Commit 64594529 authored by Daniel Scheffler

Config revision - intermediate state: Tests are running again.

parent cc4f6a67
Pipeline #1606 failed with stage in 10 minutes and 30 seconds
@@ -20,8 +20,8 @@ def run_from_jobid(args):
# set up process controller instance
PC = process_controller(args.jobid, parallelization_level='scenes', db_host='geoms') # FIXME hardcoded host
- # PC.job.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
- # PC.job.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
+ # PC.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
+ # PC.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
# run the job
PC.run_all_processors()
@@ -112,8 +112,8 @@ def _run_job(dbJob, parallelization_level='scenes'):
warnings.warn("Currently the console argument parser sets the parallelization level to 'scenes'.") # TODO
PC = process_controller(jobid, parallelization_level=parallelization_level)
- # PC.job.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
- # PC.job.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
+ # PC.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
+ # PC.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
# run the job
PC.run_all_processors()
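Note: the two hunks above are representative of the whole commit: the nested CFG.job.* / CFG.usecase.* namespaces are flattened into a single CFG.* namespace. A minimal sketch of the before/after access pattern (simplified stand-ins, not the real config classes):

    from types import SimpleNamespace

    # old layout: options were grouped into two sub-objects
    CFG_old = SimpleNamespace(
        job=SimpleNamespace(exec_mode='Flink', conn_database='<dsn>'),
        usecase=SimpleNamespace(scale_factor_TOARef=10000),
    )

    # new layout: one flat namespace
    CFG_new = SimpleNamespace(exec_mode='Flink', conn_database='<dsn>',
                              scale_factor_TOARef=10000)

    assert CFG_old.job.exec_mode == CFG_new.exec_mode
    assert CFG_old.usecase.scale_factor_TOARef == CFG_new.scale_factor_TOARef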
@@ -143,7 +143,7 @@ class L1A_object(GMS_object):
subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)
- if CFG.job.exec_mode == 'Flink' and path_output is None: # numpy array output
+ if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
self.arr = rasObj.Layerstacking(paths_files2stack)
self.path_InFilePreprocessor = paths_files2stack[0]
else: # 'MEMORY' or physical output
@@ -162,7 +162,7 @@ class L1A_object(GMS_object):
subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)
- if CFG.job.exec_mode == 'Flink' and path_output is None: # numpy array output
+ if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
self.path_InFilePreprocessor = path_file2load
@@ -190,7 +190,7 @@ class L1A_object(GMS_object):
data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
data_arr[:, :, bidx] = data
- if CFG.job.exec_mode == 'Flink' and path_output is None: # numpy array output
+ if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
self.arr = data_arr
else:
GEOP.ndarray2gdal(data_arr, path_output, geotransform=ds.GetGeoTransform(),
@@ -221,7 +221,7 @@ class L1A_object(GMS_object):
data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
data_arr[:, :, i] = data
- if CFG.job.exec_mode == 'Flink' and path_output is None: # numpy array output
+ if CFG.exec_mode == 'Flink' and path_output is None: # numpy array output
self.arr = data_arr
else:
GEOP.ndarray2gdal(data_arr, path_output, direction=3)
@@ -271,7 +271,7 @@ class L1A_object(GMS_object):
def calc_TOARadRefTemp(self, subset=None):
"""Convert DN, Rad or TOA_Ref data to TOA Reflectance, to Radiance or to Surface Temperature
- (depending on CFG.usecase.conversion_type_optical and conversion_type_thermal).
+ (depending on CFG.conversion_type_optical and conversion_type_thermal).
The function can be executed by a L1A_object representing a full scene or a tile. To process a file from disk
in tiles, provide an item of self.tile_pos as the 'subset' argument."""
@@ -305,7 +305,7 @@ class L1A_object(GMS_object):
for optical_thermal in ['optical', 'thermal']:
if optical_thermal not in self.dict_LayerOptTherm.values():
continue
- conv = getattr(CFG.usecase, 'conversion_type_%s' % optical_thermal)
+ conv = getattr(CFG, 'conversion_type_%s' % optical_thermal)
conv = conv if conv != 'BOA_Ref' else 'TOA_Ref'
assert conv in ['Rad', 'TOA_Ref', 'Temp'], 'Unsupported conversion type: %s' % conv
arr_desc = self.arr_desc.split('/')[0] if optical_thermal == 'optical' else self.arr_desc.split('/')[-1]
@@ -330,7 +330,7 @@ class L1A_object(GMS_object):
inSaturated) if conv == 'TOA_Ref' else \
GEOP.DN2DegreesCelsius_fastforward(inArray, OFF, GAI, K1, K2, 0.95, inFill, inZero, inSaturated)
if conv == 'TOA_Ref':
- self.MetaObj.ScaleFactor = CFG.usecase.scale_factor_TOARef
+ self.MetaObj.ScaleFactor = CFG.scale_factor_TOARef
elif arr_desc == 'Rad':
raise NotImplementedError("Conversion Rad to %s is currently not supported." % conv)
@@ -349,16 +349,16 @@ class L1A_object(GMS_object):
'13 bands and it is not clear for which bands the gains are provided.')
raise NotImplementedError("Conversion TOA_Ref to %s is currently not supported." % conv)
else: # conv=='TOA_Ref'
- if self.MetaObj.ScaleFactor != CFG.usecase.scale_factor_TOARef:
- res = self.rescale_array(inArray, CFG.usecase.scale_factor_TOARef, self.MetaObj.ScaleFactor)
- self.MetaObj.ScaleFactor = CFG.usecase.scale_factor_TOARef
+ if self.MetaObj.ScaleFactor != CFG.scale_factor_TOARef:
+ res = self.rescale_array(inArray, CFG.scale_factor_TOARef, self.MetaObj.ScaleFactor)
+ self.MetaObj.ScaleFactor = CFG.scale_factor_TOARef
self.log_for_fullArr_or_firstTile(
- 'Rescaling Ref data to scaling factor %d.' % CFG.usecase.scale_factor_TOARef)
+ 'Rescaling Ref data to scaling factor %d.' % CFG.scale_factor_TOARef)
else:
res = inArray
self.log_for_fullArr_or_firstTile('The input data already represents TOA '
'reflectance with the desired scale factor of %d.'
- % CFG.usecase.scale_factor_TOARef)
+ % CFG.scale_factor_TOARef)
else: # arr_desc == 'Temp'
raise NotImplementedError("Conversion Temp to %s is currently not supported." % conv)
@@ -390,8 +390,8 @@ class L1A_object(GMS_object):
self.update_spec_vals_according_to_dtype('int16')
tiles_desc = '_'.join([desc for op_th, desc in zip(['optical', 'thermal'],
- [CFG.usecase.conversion_type_optical,
- CFG.usecase.conversion_type_thermal])
+ [CFG.conversion_type_optical,
+ CFG.conversion_type_thermal])
if desc in self.dict_LayerOptTherm.values()])
self.arr = dataOut
@@ -452,12 +452,12 @@ class L1A_object(GMS_object):
dst_CS_datum='WGS84', mode='GDAL', use_workspace=True,
inFill=self.MetaObj.spec_vals['fill'])
- if CFG.job.exec_mode == 'Python':
+ if CFG.exec_mode == 'Python':
path_warped = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc)
GEOP.ndarray2gdal(rasObj.tondarray(direction=3), path_warped, importFile=rasObj.desc, direction=3)
self.MetaObj.Dataname = path_warped
self.arr = path_warped
- else: # CFG.job.exec_mode=='Flink':
+ else: # CFG.exec_mode=='Flink':
self.arr = rasObj.tondarray(direction=3)
self.shape_fullArr = [rasObj.rows, rasObj.cols, rasObj.bands]
@@ -473,7 +473,7 @@ class L1A_object(GMS_object):
self.MetaObj.CornerTieP_UTM = rasObj.get_corner_coordinates('UTM')
self.meta_odict = self.MetaObj.to_odict() # important in order to keep geotransform/projection
- if CFG.job.exec_mode == 'Flink':
+ if CFG.exec_mode == 'Flink':
self.delete_tempFiles() # these files are needed later in Python execution mode
self.MetaObj.Dataname = previous_dataname # /vsi.. pointing directly to raw data archive (which exists)
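Note: the recurring exec_mode switch in the hunks above follows one pattern throughout L1A_object: 'Flink' keeps intermediate results in memory as numpy arrays, while 'Python' writes them to disk (see the docstring of the removed set_configOLD further below). A hedged sketch of that dispatch, with np.save standing in for GEOP.ndarray2gdal:

    import numpy as np

    def produce_output(compute_array, path_output=None, exec_mode='Flink'):
        if exec_mode == 'Flink' and path_output is None:
            return compute_array()   # keep the numpy array in memory (saves I/O time)
        arr = compute_array()        # 'Python' mode: write to disk (saves memory) ...
        np.save(path_output, arr)    # ... and hand the path on instead of the array
        return path_output

    print(produce_output(lambda: np.zeros((2, 2))).shape)  # (2, 2)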
@@ -76,7 +76,7 @@ class Scene_finder(object):
SpIM = SpatialIndexMediator(timeout=timeout)
self.possib_ref_scenes = \
SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
- self.max_cloudcov, CFG.usecase.datasetid_spatial_ref,
+ self.max_cloudcov, CFG.datasetid_spatial_ref,
refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
break
except socket.timeout:
@@ -116,7 +116,7 @@ class Scene_finder(object):
# get processing level of reference scenes
procL = GeoDataFrame(
- DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
+ DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
{'sceneid': list(GDF.sceneid)}), columns=['sceneid', 'proc_level'])
GDF = GDF.merge(procL, on='sceneid', how='left')
GDF = GDF.where(GDF.notnull(), None) # replace NaN values with None
@@ -129,7 +129,7 @@ class Scene_finder(object):
GDF['refDs_exists'] = list(GDF['path_ref'].map(lambda p: os.path.exists(p) if p else False))
# check if a proper entity ID can be gathered from database
- eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['id', 'entityid'],
+ eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['id', 'entityid'],
{'id': list(GDF.sceneid)}), columns=['sceneid', 'entityid'])
GDF = GDF.merge(eID, on='sceneid', how='left')
self.GDF_ref_scenes = GDF.where(GDF.notnull(), None)
@@ -284,7 +284,7 @@ class L1B_object(L1A_object):
plusminus_days = 30
AcqDate = self.im2shift_objDict['acquisition_date']
date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
- dataset_cond = 'datasetid=%s' % CFG.usecase.datasetid_spatial_ref
+ dataset_cond = 'datasetid=%s' % CFG.datasetid_spatial_ref
cloudcov_cond = 'cloudcover < %s' % max_cloudcov
# FIXME cloudcover is not yet available for all scenes at proc_level METADATA
dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
@@ -294,7 +294,7 @@ class L1B_object(L1A_object):
def query_scenes(condlist):
return DB_T.get_overlapping_scenes_from_postgreSQLdb(
- CFG.job.conn_database,
+ CFG.conn_database,
table='scenes',
tgt_corners_lonlat=self.trueDataCornerLonLat,
conditions=condlist,
@@ -311,9 +311,9 @@ class L1B_object(L1A_object):
# this is only the result from scenes_proc
# -> a reference is only stored there if the scene was already in the dataset list when the job (CFG.job) was started
res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
- CFG.job.conn_database,
+ CFG.conn_database,
tgt_corners_lonlat=self.trueDataCornerLonLat,
- conditions=['datasetid=%s' % CFG.usecase.datasetid_spatial_ref],
+ conditions=['datasetid=%s' % CFG.datasetid_spatial_ref],
add_cmds='ORDER BY scenes.cloudcover ASC',
timeout=25000)
filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)
@@ -354,8 +354,8 @@ class L1B_object(L1A_object):
# start download of scene data if not yet available and start L1A processing
def dl_cmd(scene_ID): print('%s %s %s' % (
- CFG.job.java_commands['keyword'].strip(), # FIXME CFG.job.java_commands is deprecated
- CFG.job.java_commands["value_download"].strip(), scene_ID))
+ CFG.java_commands['keyword'].strip(), # FIXME CFG.java_commands is deprecated
+ CFG.java_commands["value_download"].strip(), scene_ID))
path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()
@@ -369,12 +369,12 @@ class L1B_object(L1A_object):
# check if scene is downloading
download_start_timeout = 5 # seconds
# set timeout for external processing
- # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG.job WITHIN CFG.job IS NOT ALLOWED
+ # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG WITHIN CFG IS NOT ALLOWED
processing_timeout = 5 # seconds # FIXME increase timeout if processing is really started
proc_level = None
while True:
proc_level_chk = DB_T.get_info_from_postgreSQLdb(
- CFG.job.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
+ CFG.conn_database, 'scenes', ['proc_level'], {'id': sc['scene_ID']})[0][0]
if proc_level != proc_level_chk:
print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
proc_level = proc_level_chk
@@ -391,7 +391,7 @@ class L1B_object(L1A_object):
warnings.warn('L1A processing of reference scene %s (entity ID %s) timed out. '
'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
break
- # DB_T.set_info_in_postgreSQLdb(CFG.job.conn_database,'scenes',
+ # DB_T.set_info_in_postgreSQLdb(CFG.conn_database,'scenes',
# {'proc_level':'METADATA'},{'id':sc['scene_ID']})
time.sleep(5)
@@ -408,7 +408,7 @@ class L1B_object(L1A_object):
self.overlap_percentage = sc['overlap percentage']
self.overlap_area = sc['overlap area']
- query_res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
+ query_res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'],
{'id': self.imref_scene_ID}, records2fetch=1)
assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
self.imref_entity_ID = query_res[0][0] # [('LC81510322013152LGN00',)]
@@ -504,7 +504,7 @@ class L1B_object(L1A_object):
if spatIdxSrv_status == 'unavailable':
self.logger.warning('Coregistration skipped due to unavailable Spatial Index Mediator Server!')
- elif CFG.job.skip_coreg:
+ elif CFG.skip_coreg:
self.logger.warning('Coregistration skipped according to user configuration.')
elif self.coreg_needed and self.spatRef_available:
@@ -604,7 +604,7 @@ class L1B_object(L1A_object):
if self.coreg_info['success']:
self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
elif cliptoextent and is_coord_grid_equal(
- geoArr.gt, CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy):
+ geoArr.gt, CFG.spatial_ref_gridx, CFG.spatial_ref_gridy):
self.logger.info("Attribute '%s' has only been clipped to its extent because no valid "
"shifts have been detected and the pixel grid equals the target grid."
% attrname)
@@ -615,12 +615,12 @@ class L1B_object(L1A_object):
# correct shifts
DS = DESHIFTER(geoArr, self.coreg_info,
- target_xyGrid=[CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
+ target_xyGrid=[CFG.spatial_ref_gridx, CFG.spatial_ref_gridy],
cliptoextent=cliptoextent,
clipextent=mapBounds,
align_grids=True,
resamp_alg='nearest' if attrname == 'masks' else 'cubic',
- CPUs=None if CFG.job.allow_subMultiprocessing else 1,
+ CPUs=None if CFG.allow_subMultiprocessing else 1,
progress=True if v else False,
q=True,
v=v)
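Note: the spatial_ref_gridx/y values passed to DESHIFTER describe the target pixel grid; they are computed near the end of this diff as np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd), i.e. [15, 45] for a 30 m GSD. A hypothetical sketch of the kind of check is_coord_grid_equal presumably performs (its implementation is not shown here):

    def is_coord_grid_equal_sketch(gt, xgrid, ygrid):
        # a raster lies on the target grid if the GSDs match and its origin is
        # offset from the grid coordinates by a whole number of GSDs
        xorigin, xgsd, _, yorigin, _, ygsd = gt
        xgsd_tgt, ygsd_tgt = abs(xgrid[1] - xgrid[0]), abs(ygrid[1] - ygrid[0])
        return (abs(xgsd) == xgsd_tgt and abs(ygsd) == ygsd_tgt and
                (xorigin - xgrid[0]) % xgsd_tgt == 0 and
                (yorigin - ygrid[0]) % ygsd_tgt == 0)

    # a 30 m raster whose origin sits on the [15, 45] grid:
    print(is_coord_grid_equal_sketch((345.0, 30.0, 0.0, 4575.0, 0.0, -30.0), [15, 45], [15, 45]))  # True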
@@ -148,8 +148,8 @@ class L1C_object(L1B_object):
meshwidth=10,
nodata_mask=None, # don't overwrite areas outside the image with nodata
outFill=get_outFillZeroSaturated(np.float32)[0],
- accurracy=CFG.usecase.SZA_SAA_calculation_accurracy,
- lonlat_arr=self.lonlat_arr if CFG.usecase.SZA_SAA_calculation_accurracy == 'fine' else None)
+ accurracy=CFG.SZA_SAA_calculation_accurracy,
+ lonlat_arr=self.lonlat_arr if CFG.SZA_SAA_calculation_accurracy == 'fine' else None)
return self._SZA_arr
@SZA_arr.setter
@@ -259,7 +259,7 @@ class AtmCorr(object):
path_logfile = inObj.pathGen.get_path_logfile()
fileHandler = logging.FileHandler(path_logfile, mode='a')
fileHandler.setFormatter(logger_atmCorr.formatter_fileH)
- fileHandler.setLevel(CFG.job.log_level)
+ fileHandler.setLevel(CFG.log_level)
logger_atmCorr.addHandler(fileHandler)
@@ -662,7 +662,7 @@ class AtmCorr(object):
# compute cloud mask if not already provided
if no_avail_CMs:
- algorithm = CFG.usecase.cloud_masking_algorithm[self.inObjs[0].satellite]
+ algorithm = CFG.cloud_masking_algorithm[self.inObjs[0].satellite]
if algorithm == 'SICOR':
return None
@@ -672,7 +672,7 @@ class AtmCorr(object):
try:
from .cloud_masking import Cloud_Mask_Creator
- CMC = Cloud_Mask_Creator(self.inObjs[0], algorithm=algorithm, tempdir_root=CFG.job.path_tempdir)
+ CMC = Cloud_Mask_Creator(self.inObjs[0], algorithm=algorithm, tempdir_root=CFG.path_tempdir)
CMC.calc_cloud_mask()
cm_geoarray = CMC.cloud_mask_geoarray
cm_array = CMC.cloud_mask_array
@@ -742,7 +742,7 @@ class AtmCorr(object):
t0 = time()
results = download_variables(date_from=self.inObjs[0].acq_datetime,
date_to=self.inObjs[0].acq_datetime,
- db_path=CFG.job.path_ECMWF_db,
+ db_path=CFG.path_ECMWF_db,
max_step=120, # default
ecmwf_variables=default_products,
processes=0, # singleprocessing
@@ -792,7 +792,7 @@ class AtmCorr(object):
script = False
# check if ECMWF data are available - if not, start the download
- if CFG.usecase.auto_download_ecmwf:
+ if CFG.auto_download_ecmwf:
self._check_or_download_ECMWF_data()
# validate SNR
@@ -887,7 +887,7 @@ class AtmCorr(object):
# update metadata
inObj.arr_desc = 'BOA_Ref'
inObj.MetaObj.bands = len(self.results.data_ac)
- inObj.MetaObj.PhysUnit = 'BOA_Reflectance in [0-%d]' % CFG.usecase.scale_factor_BOARef
+ inObj.MetaObj.PhysUnit = 'BOA_Reflectance in [0-%d]' % CFG.scale_factor_BOARef
inObj.MetaObj.LayerBandsAssignment = out_LBA
inObj.MetaObj.filter_layerdependent_metadata()
inObj.meta_odict = inObj.MetaObj.to_odict() # actually auto-updated by getter
@@ -896,7 +896,7 @@ class AtmCorr(object):
# FIXME AC output nodata values = 0 -> new nodata areas but mask not updated
oF_refl, oZ_refl, oS_refl = get_outFillZeroSaturated(inObj.arr.dtype)
surf_refl = np.dstack((self.results.data_ac[bandN] for bandN in ac_bandNs))
- surf_refl *= CFG.usecase.scale_factor_BOARef # scale using scale factor (output is float16)
+ surf_refl *= CFG.scale_factor_BOARef # scale using scale factor (output is float16)
# FIXME really set AC nodata values to GMS outZero?
surf_refl[nodata] = oZ_refl # overwrite AC nodata values with GMS outZero
# apply the original nodata mask (indicating background values)
@@ -927,8 +927,8 @@ class AtmCorr(object):
ac_bandNs = [bandN for bandN in inObj.arr.bandnames if bandN in self.results.data_ac.keys()]
ac_errors = np.dstack((self.results.data_errors[bandN] for bandN in ac_bandNs))
- ac_errors *= CFG.usecase.scale_factor_errors_ac # scale using scale factor (output is float16)
- out_dtype = np.int8 if CFG.usecase.scale_factor_errors_ac <= 255 else np.int16
+ ac_errors *= CFG.scale_factor_errors_ac # scale using scale factor (output is float16)
+ out_dtype = np.int8 if CFG.scale_factor_errors_ac <= 255 else np.int16
ac_errors[nodata] = get_outFillZeroSaturated(out_dtype)[0]
ac_errors = ac_errors.astype(out_dtype)
inObj.ac_errors = ac_errors # setter generates a GeoArray with the same bandnames like inObj.arr
@@ -989,8 +989,8 @@ class AtmCorr(object):
if self.results.mask_clouds.mask_confidence_array is not None:
cfd_arr = self.results.mask_clouds.mask_confidence_array # float32 2D array, scaled [0-1, nodata 255]
cfd_arr[cfd_arr == self.ac_input['options']['cld_mask']['nodata_value_mask']] = -1
- cfd_arr = (cfd_arr * CFG.usecase.scale_factor_BOARef).astype(np.int16)
- cfd_arr[cfd_arr == -CFG.usecase.scale_factor_BOARef] = get_outFillZeroSaturated(cfd_arr.dtype)[0]
+ cfd_arr = (cfd_arr * CFG.scale_factor_BOARef).astype(np.int16)
+ cfd_arr[cfd_arr == -CFG.scale_factor_BOARef] = get_outFillZeroSaturated(cfd_arr.dtype)[0]
joined = False
for inObj in self.inObjs:
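Note: a worked example of the confidence-array conversion above, assuming scale_factor_BOARef = 10000 and an int16 fill value of -9999 (both are assumptions; the actual values come from the config and get_outFillZeroSaturated):

    import numpy as np

    scale_factor_BOARef = 10000                                   # assumed config value
    cfd_arr = np.array([0.0, 0.5, 1.0, 255.0], dtype=np.float32)  # confidence in [0-1], nodata = 255
    cfd_arr[cfd_arr == 255] = -1                                  # flag nodata as -1
    cfd_arr = (cfd_arr * scale_factor_BOARef).astype(np.int16)    # [0, 5000, 10000, -10000]
    cfd_arr[cfd_arr == -scale_factor_BOARef] = -9999              # assumed int16 fill value
    print(cfd_arr)                                                # [0 5000 10000 -9999]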
@@ -41,7 +41,7 @@ class L2B_object(L2A_object):
def spectral_homogenization(self, kind='linear'):
src_cwls = self.meta_odict['wavelength']
# FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
- tgt_cwls = CFG.usecase.target_CWL
+ tgt_cwls = CFG.target_CWL
if src_cwls != tgt_cwls:
assert kind in ['linear', ], "%s is not a supported kind of homogenization." % kind
self.log_for_fullArr_or_firstTile(
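Note: 'linear' spectral homogenization here means interpolating each pixel spectrum from the source central wavelengths (CWLs) to CFG.target_CWL. A minimal sketch of the idea (not the actual implementation, which is not part of this diff):

    import numpy as np

    def homogenize_linear(cube, src_cwls, tgt_cwls):
        # interpolate a (rows, cols, bands) cube pixel-wise along the spectral axis
        rows, cols, bands = cube.shape
        spectra = cube.reshape(-1, bands)  # one spectrum per pixel
        out = np.array([np.interp(tgt_cwls, src_cwls, s) for s in spectra])
        return out.reshape(rows, cols, len(tgt_cwls))

    cube = np.random.rand(2, 2, 4)
    print(homogenize_linear(cube, [480, 560, 660, 830], [490, 665, 842]).shape)  # (2, 2, 3)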
@@ -71,7 +71,7 @@ class _FMASK_Runner(object):
def is_GMSConfig_available(self):
from ..config import GMS_config as CFG
try:
- if CFG.job is not None:
+ if CFG is not None:
return True
except (EnvironmentError, OSError):
return False
@@ -350,7 +350,7 @@ class FMASK_Runner_Sentinel2(_FMASK_Runner):
if not self._granule_ID and self.scene_ID and self.scene_ID != -9999 and self.is_GMSConfig_available:
from ..config import GMS_config as CFG
- res = get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'], {'id': self.scene_ID})
+ res = get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'], {'id': self.scene_ID})
assert len(res) != 0, \
"Invalid SceneID given - no corresponding scene with the ID=%s found in database.\n" % self.scene_ID
assert len(res) == 1, "Error in database. The sceneid %s exists more than once. \n" % self.scene_ID
@@ -480,7 +480,7 @@ class Cloud_Mask_Creator(object):
self.GMS_obj.logger.info("Calculating cloud mask based on '%s' algorithm..." % self.algorithm)
- if self.algorithm is 'FMASK':
+ if self.algorithm == 'FMASK':
if re.search('Landsat', self.GMS_obj.satellite, re.I):
FMR = FMASK_Runner_Landsat(self.GMS_obj.path_archive, self.GMS_obj.satellite)
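Note on the is -> == fix above: 'is' tests object identity, not string equality; it only appears to work when CPython happens to intern both strings, and Python 3.8+ emits a SyntaxWarning for it. For example:

    a = 'FMASK'
    b = ''.join(['FM', 'ASK'])  # equal content, but a different object
    print(a == b)               # True  -> correct comparison
    print(a is b)               # False -> identity check fails for non-interned strings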
@@ -100,7 +100,7 @@ class GEOPROCESSING(object):
self.shortname, self.extension = os.path.splitext(self.filename)
# ****OBJECT ATTRIBUTES***************************************************
- self.workspace = os.path.join(CFG.job.path_tempdir, 'GEOPROCESSING_temp') if workspace is None else workspace
+ self.workspace = os.path.join(CFG.path_tempdir, 'GEOPROCESSING_temp') if workspace is None else workspace
if v:
self.logger.debug("\n--")
self.logger.debug("\ttemporary geoprocessing workspace", self.workspace)
@@ -379,7 +379,7 @@ class GEOPROCESSING(object):
# %(dst_EPSG_code, in_nodataVal,out_nodataVal, translatedFile, warpedFile))
os.system('gdalwarp -of ENVI --config GDAL_CACHEMAX 2048 -wm 2048 -t_srs EPSG:%s -tps -r \
cubic -srcnodata %s -dstnodata %s -multi -overwrite -wo NUM_THREADS=%s -q %s %s'
- % (dst_EPSG_code, inFill, out_nodataVal, CFG.job.CPUs, translatedFile, warpedFile))
+ % (dst_EPSG_code, inFill, out_nodataVal, CFG.CPUs, translatedFile, warpedFile))
# import shutil
# only for bugfixing:
# shutil.copy(translatedFile, \
@@ -405,7 +405,7 @@ class GEOPROCESSING(object):
# %(dst_EPSG_code,in_nodataVal,out_nodataVal,translatedFile,warpedFile))
os.system('gdalwarp -of VRT --config GDAL_CACHEMAX 2048 -wm 2048 -ot Int16 -t_srs EPSG:%s -tps -r \
cubic -srcnodata %s -dstnodata %s -overwrite -multi -wo NUM_THREADS=%s -q %s %s'
- % (dst_EPSG_code, inFill, out_nodataVal, CFG.job.CPUs, translatedFile, warpedFile))
+ % (dst_EPSG_code, inFill, out_nodataVal, CFG.CPUs, translatedFile, warpedFile))
# print('warped')
print('GDAL warping time', time.time() - t0)
@@ -16,61 +16,25 @@ import json
from jsmin import jsmin
from cerberus import Validator
import pkgutil
- from typing import TYPE_CHECKING, Dict
+ from typing import TYPE_CHECKING
if TYPE_CHECKING:
- from .misc.database_tools import GMS_JOB
+ from .misc.database_tools import GMS_JOB # noqa F401 # flake8 issue
__author__ = 'Daniel Scheffler'
- def set_configOLD(job_ID, exec_mode='Python', db_host='localhost', reset=False, job_kwargs=None):
- # type: (int, str, str, bool, dict) -> None
- """Set up a configuration for a new gms_preprocessing job!
- :param job_ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
- :param exec_mode: 'Python': writes intermediate results to disk in order to save memory
- 'Flink': keeps intermediate results in memory in order to save IO time
- :param db_host: host name of the server that runs the postgreSQL database
- :param reset: whether to reset the job status or not (default=False)
- :param job_kwargs: keyword arguments to be passed to gms_preprocessing.config.Job() (see documentation there)
- """
- if not hasattr(builtins, 'GMS_job') or not hasattr(builtins, 'GMS_usecase') or reset:
- job_kwargs = job_kwargs if job_kwargs else {}
- builtins.GMS_job = Job(job_ID, exec_mode=exec_mode, db_host=db_host, **job_kwargs)
- builtins.GMS_usecase = Usecase(getattr(builtins, 'GMS_job'))
class GMS_configuration(object):
- @property
- def job(self):
- if hasattr(builtins, 'GMS_job'):
- return getattr(builtins, 'GMS_job')
- else:
- raise EnvironmentError("Config has not been set already on this machine. Run 'set_config()' first!'")
- @job.setter
- def job(self, job_obj):
- assert isinstance(job_obj, Job), \
- "GMS_configuration.job can only be set to an instance of the class 'config.Job'."
- builtins.GMS_job = job_obj
- @property
- def usecase(self):
- if hasattr(builtins, 'GMS_usecase'):
- return getattr(builtins, 'GMS_usecase')
+ def __getattr__(self, attr):
+ if hasattr(builtins, 'GMS_JobConfig'):
+ if attr in ['job', 'usecase']:
+ # This is only to keep compatibility with HU-INF codes
+ return getattr(builtins, 'GMS_JobConfig')
+ return getattr(builtins.GMS_JobConfig, attr)
+ else:
+ raise EnvironmentError("Config has not been set already on this machine. Run 'set_config()' first!'")
- @usecase.setter
- def usecase(self, usecase_obj):
- assert isinstance(usecase_obj, Usecase), \
- "GMS_configuration.usecase can only be set to an instance of the class 'config.Usecase'."
- builtins.GMS_usecase = usecase_obj
GMS_config = GMS_configuration()
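Note: the rewritten GMS_configuration forwards every attribute lookup to a single builtins.GMS_JobConfig object, and keeps CFG.job / CFG.usecase as aliases of the whole config for backwards compatibility. That is why every CFG.job.x / CFG.usecase.x call site in this commit can be shortened to CFG.x without breaking old code. A self-contained sketch of the mechanism (with a hypothetical stand-in for the real JobConfig):

    import builtins

    class GMS_configuration(object):
        def __getattr__(self, attr):
            if hasattr(builtins, 'GMS_JobConfig'):
                if attr in ['job', 'usecase']:
                    return getattr(builtins, 'GMS_JobConfig')  # compatibility alias
                return getattr(builtins.GMS_JobConfig, attr)
            raise EnvironmentError("Config has not been set on this machine. Run 'set_config()' first!")

    class _FakeJobConfig(object):  # hypothetical stand-in for the real JobConfig
        exec_mode = 'Flink'

    builtins.GMS_JobConfig = _FakeJobConfig()
    CFG = GMS_configuration()
    print(CFG.exec_mode)           # 'Flink'
    print(CFG.job.exec_mode)       # 'Flink' -- old style still resolves
    print(CFG.usecase.exec_mode)   # 'Flink'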
@@ -288,7 +252,7 @@ class JobConfig(object):
self.exec_mode = \
gp('exec_mode', json_opts['exec_mode'])
self.CPUs = \
- gp('CPUs', json_opts['exec_mode'], fallback=multiprocessing.cpu_count())
+ gp('CPUs', json_opts['CPUs'], fallback=multiprocessing.cpu_count())
self.allow_subMultiprocessing = \
gp('allow_subMultiprocessing', json_opts['allow_subMultiprocessing'])
self.disable_exception_handler = \
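Note: besides the namespace change, the hunk above fixes a copy-paste bug: CPUs was previously read from json_opts['exec_mode']. Assuming gp is a 'get parameter' helper that prefers an explicit user option over the options JSON over a fallback (an assumption based on its usage here), it might look roughly like:

    import multiprocessing

    def gp(key, json_value, fallback=None, user_opts=None):
        # hypothetical: user-supplied option > value from options JSON > fallback
        user_opts = user_opts or {}
        if user_opts.get(key) is not None:
            return user_opts[key]
        return json_value if json_value is not None else fallback

    json_opts = {'exec_mode': 'Python', 'CPUs': None}
    print(gp('CPUs', json_opts['CPUs'], fallback=multiprocessing.cpu_count()))  # -> cpu_count()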
@@ -459,6 +423,12 @@ class JobConfig(object):
self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd) # e.g. [15, 45]
self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd)
+ #############
+ # data list #
+ #############
+ self.data_list = self.get_data_list_of_current_jobID()
@property
def user_opts_defaults(self):
if not self._user_opts_defaults:
@@ -490,7 +460,7 @@ class JobConfig(object):
# type: () -> GMS_JOB
if not self._DB_job_record:
# check if job ID exists in database
- from .misc.database_tools import GMS_JOB
+ from .misc.database_tools import GMS_JOB # noqa F811 # redefinition of unused 'GMS_JOB' from line 22
try:
self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)
except ValueError:
@@ -544,6 +514,7 @@ class JobConfig(object):
'options_default.json'), validation=validate)
# update default options with those from DB
if self.DB_job_record.analysis_parameter:
+ db_options = json_to_python(json.loads(jsmin(self.DB_job_record.analysis_parameter))) # type: dict
default_options.update(db_options)
@@ -575,6 +546,82 @@ class JobConfig(object):
return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}
+ def get_data_list_of_current_jobID(self):
+ """
+ Get a list of datasets to be processed from database and return it together with some metadata.
+ :return: <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
+ ('datasetid', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
+ ('subsystem', ''), ('acquisition_date', datetime.datetime(2015, 2, 5, 10, 2, 52)),
+ ('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
+ ('sensormode', 'M'), ('logger', None)]), ...]
+ """
+ from .model.metadata import get_sensormode
+ data_list = []
+ with psycopg2.connect(self.conn_database) as conn:
+ with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+ cur.execute("""
+ WITH jobs_unnested AS (
+ SELECT id, unnest(sceneids) AS sceneid FROM jobs
+ )
+ SELECT jobs_unnested.sceneid,
+ scenes.datasetid,
+ scenes.acquisitiondate,
+ scenes.entityid,
+ scenes.filename,
+ COALESCE(scenes_proc.proc_level::text, 'L1A') AS proc_level,
+ datasets.image_type,
+ satellites.name AS satellite,
+ sensors.name AS sensor,
+ subsystems.name AS subsystem
+ FROM jobs_unnested
+ LEFT OUTER JOIN scenes ON scenes.id = jobs_unnested.sceneid
+ LEFT OUTER JOIN scenes_proc ON scenes_proc.sceneid = jobs_unnested.sceneid
+ LEFT OUTER JOIN datasets ON datasets.id = datasetid
+ LEFT OUTER JOIN satellites ON satellites.id = satelliteid
+ LEFT OUTER JOIN sensors ON sensors.id = sensorid
+ LEFT OUTER JOIN subsystems ON subsystems.id = subsystemid
+ WHERE jobs_unnested.id = %s
+ """,
+ (self.ID,))
+ for row in cur.fetchall():
+ ds = OrderedDict()
+ ds["proc_level"] = row["proc_level"]
+ ds["scene_ID"] = row["sceneid"]
+ ds["dataset_ID"] = row["datasetid"]
+ ds["image_type"] = row["image_type"]
+ ds["satellite"] = row["satellite"]
+ ds["sensor"] = row["sensor"]
+ ds["subsystem"] = row["subsystem"]
+ ds["acq_datetime"] = row["acquisitiondate"]
+ ds["entity_ID"] = row["entityid"]
+ ds["filename"] = row["filename"]
+ ds['sensor'] = 'ETM+' if re.search('ETM+', ds['sensor']) else ds['sensor']
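Note: the query above flattens the job's sceneids array via unnest and left-joins the scene metadata; each row is packed into an OrderedDict like the example in the docstring. The diff ends here, but assuming the loop finishes by appending each ds to data_list and returning it (as the docstring implies), typical downstream usage would be:

    # hypothetical usage once the config has been set:
    # from gms_preprocessing.config import GMS_config as CFG
    for ds in CFG.data_list:
        print(ds['scene_ID'], ds['satellite'], ds['sensor'], ds['proc_level'])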