Commit 12c9f4fe authored by Daniel Scheffler's avatar Daniel Scheffler


Bugfix for issues writing MGRS tiles in case of scenes that overlap the neighbouring UTM zone; added a feature for job creation from scene IDs

algorithms.gms_object:
- get_subset_obj(): implemented keyword out_prj
- to_MGRS_tiles(): fix for not respecting output projection

misc.database_tools.GMS_JOB:
- from_dictlist(): moved parts to _get_validated_sceneInfoGDFs() and _populate_jobAttrs_from_sceneInfoGDF()
- added from_sceneIDlist() (usage sketched after this list)
- added _get_validated_sceneInfoGDFs()
- added _populate_jobAttrs_from_sceneInfoGDF()

misc.path_generator.path_generator:
- added get_pathes_all_procdata(): not fully implemented yet

processing.multiproc:
- MAP(): bugfix for CFG.job.CPUs being ignored in case the CPUs argument is given

- updated __version__
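
For context, a minimal usage sketch of the new scene-ID based job creation (hypothetical: the GMS_JOB constructor argument and the final persistence call are assumptions, as they are not part of this diff; the scene IDs are example values from the docstring further below):

from misc.database_tools import GMS_JOB

job = GMS_JOB(conn_db)  # conn_db: postgreSQL connection parameters (assumed constructor signature)
job.from_sceneIDlist(list_sceneIDs=[26781907, 26781917, 26542650],
                     virtual_sensor_id=10,        # any valid ID from the 'virtual_sensors' table
                     datasetid_spatial_ref=249,   # Sentinel-2A as spatial reference
                     comment='Beta job')
# job.create()  # assumed follow-up call that persists the job in the database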


Former-commit-id: 9b5925c8
parent 1460d9b2
......@@ -877,6 +877,7 @@ class AtmCorr(object):
'input GMS object with the same dimensions.')
else:
# TODO: check if nevertheless a cloud mask is returned!
self.logger.warning('Atmospheric correction did not return a result for the input array. '
'Thus the output remains NOT atmospherically corrected.')
......
......@@ -1227,7 +1227,7 @@ class GMS_object(object):
overwrite=overwrite)
def get_subset_obj(self, imBounds=None, mapBounds=None, mapBounds_prj=None, logmsg=None, v=False):
def get_subset_obj(self, imBounds=None, mapBounds=None, mapBounds_prj=None, out_prj=None, logmsg=None, v=False):
# type: (tuple) -> self
"""Returns a subset of the given GMS object, based on the given bounds coordinates.
Array attributes are clipped and relevant metadata keys are updated according to new extent.
......@@ -1236,6 +1236,8 @@ class GMS_object(object):
:param mapBounds: <tuple> tuple of map coordinates in the form (xmin,xmax,ymin,ymax)
:param mapBounds_prj: <str> a WKT string containing projection of the given map bounds
(can be different to projection of the GMS object; ignored if map bounds not given)
:param out_prj: <str> a WKT string containing output projection.
If not given, the projection of self.arr is used.
:param logmsg: <str> a message to be logged when this method is called
:param v: <bool> verbose mode (default: False)
:return: <GMS_object> the GMS object subset
......@@ -1276,11 +1278,11 @@ class GMS_object(object):
geoArr = getattr(self,arrname)
# get subsetted and (possibly) reprojected array
rspAlg = 'near' if arrname=='masks' else 'cubic'
subArr = GeoArray(*geoArr.get_mapPos((xmin,ymin,xmax,ymax), mapBounds_prj,
rspAlg=rspAlg), progress=False,
bandnames = list(geoArr.bandnames),
nodata = geoArr.nodata)
out_prj = out_prj if out_prj else geoArr.prj
rspAlg = 'near' if arrname=='masks' else 'cubic'
subArr = GeoArray(*geoArr.get_mapPos((xmin,ymin,xmax,ymax), mapBounds_prj,
out_prj=out_prj, rspAlg=rspAlg), progress=False,
bandnames = list(geoArr.bandnames), nodata = geoArr.nodata)
# show result
if v: subArr.show(figsize=(10,10))
......@@ -1395,6 +1397,7 @@ class GMS_object(object):
for GDF_idx, GDF_row in GDF_MGRS_tiles.iterrows():
tileObj = self.get_subset_obj(mapBounds = GDF_row.map_bounds_MGRS,
mapBounds_prj = GEOP.EPSG2WKT(GDF_row['MGRStileObj'].EPSG),
out_prj = GEOP.EPSG2WKT(GDF_row['MGRStileObj'].EPSG),
logmsg = 'Producing MGRS tile %s from scene %s (entity ID %s).'
%(GDF_row.granuleid, self.scene_ID, self.entity_ID),
v = v)
......
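
The substance of the MGRS fix above: each tile subset is now warped into the projection of its target MGRS tile (out_prj) instead of silently keeping the projection of the source scene, which produced wrong output for tiles overlapping the neighbouring UTM zone. A rough stand-alone illustration of the operation with plain GDAL rather than the project's GeoArray API (file name, EPSG code and bounds are placeholders):

from osgeo import gdal

xmin, ymin, xmax, ymax = 499980, 5590200, 609780, 5700000  # MGRS tile bounds (illustrative)
ds = gdal.Warp('', 'scene.tif', format='MEM',
               dstSRS='EPSG:32633',                    # UTM zone of the target MGRS tile
               outputBounds=(xmin, ymin, xmax, ymax),  # clip to the tile extent
               outputBoundsSRS='EPSG:32633',
               resampleAlg='cubic')                    # 'near' is used for mask arrays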
......@@ -77,6 +77,7 @@ class Job:
self.allow_subMultiprocessing = True # allows multiprocessing within workers
self.disable_exception_handler = False # disables automatic handling of unexpected exceptions
self.profiling = False
# TODO add log level
self.benchmark_global = False
# bench_CLD_class = True
......
......@@ -602,9 +602,9 @@ class GMS_JOB(object):
def from_dictlist(self,dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
# type: (list,int) -> object
# type: (list, int, int, str) -> object
"""
:param dictlist_data2process: <list> a list of dictionaries containinng the keys "satellite", "sensor" and
:param dictlist_data2process: <list> a list of dictionaries containing the keys "satellite", "sensor" and
"filenames",
e.g. [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS', 'filenames': 'file.tar.gz'}, {...}]
:param virtual_sensor_id : <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
......@@ -651,28 +651,104 @@ class GMS_JOB(object):
gdf = GeoDataFrame(pd.concat(all_gdfs)).drop_duplicates()
gdf.columns = ['satellite', 'sensor', 'filename']
# validate scene information against the database
sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)
# populate attributes
self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)
return self
def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
# type: (list, int, int, str) -> object
"""
:param list_sceneIDs: <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679]
:param virtual_sensor_id : <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
:param datasetid_spatial_ref: <int> a valid dataset ID of the dataset to be chosen as spatial reference
(from the 'datasets' table of the postgreSQL database)
(default: 249 = Sentinel-2A; e.g. 104 = Landsat-8)
:param comment: <str> a comment describing the job (e.g. 'Beta job')
"""
self._set_target_sensor_specs(virtual_sensor_id,datasetid_spatial_ref)
self.comment = comment
list_sceneIDs = list(list_sceneIDs)
# query 'satellite', 'sensor', 'filename' from database and summarize in GeoDataFrame
with psycopg2.connect(CFG.job.conn_database) as conn:
with conn.cursor() as cursor:
execute_pgSQL_query(cursor,
"""SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes
LEFT JOIN satellites on scenes.satelliteid=satellites.id
LEFT JOIN sensors on scenes.sensorid=sensors.id
WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs]))
gdf = GeoDataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename'])
gdf['sensor'] = gdf['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+')
gdf = gdf.drop_duplicates()
if gdf.empty:
raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. '
'Job creation failed.')
else:
missing_IDs = [i for i in list_sceneIDs if i not in gdf['sceneid'].values]
if missing_IDs:
warnings.warn('The following scene IDs could not be found in the GeoMultiSens database: \n%s'
% '\n'.join([str(i) for i in missing_IDs]))
# validate scene information against the database
sceneInfoGDF = self._get_validated_sceneInfoGDFs(gdf)
# populate attributes
self._populate_jobAttrs_from_sceneInfoGDF(sceneInfoGDF)
return self
def _get_validated_sceneInfoGDFs(self, GDF_SatSenFname):
# type: (GeoDataFrame) -> GeoDataFrame
"""
:param GDF_SatSenFname: <GeoDataFrame> with the columns 'satellite', 'sensor' and 'filename' (and optionally 'sceneid')
:return: <GeoDataFrame> scene information for all records that could be validated against the database
"""
gdf = GDF_SatSenFname
# loop through all satellite-sensor combinations and get scene information from database
all_gdf_recs, all_gdf_miss = [],[]
all_satellites,all_sensors = zip(*[i.split('__') for i in (np.unique(gdf['satellite']+'__'+gdf['sensor']))])
all_gdf_recs, all_gdf_miss = [], []
all_satellites, all_sensors = zip(
*[i.split('__') for i in (np.unique(gdf['satellite'] + '__' + gdf['sensor']))])
for satellite,sensor in zip(all_satellites,all_sensors):
cur_gdf = gdf.loc[(gdf['satellite']==satellite) & (gdf['sensor']==sensor)]
for satellite, sensor in zip(all_satellites, all_sensors):
cur_gdf = gdf.loc[(gdf['satellite'] == satellite) & (gdf['sensor'] == sensor)]
filenames = list(cur_gdf['filename'])
satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite})
senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors' , ['id'], {'name': sensor})
senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor})
assert len(satID_res), "No satellite named '%s' found in database." % satellite
assert len(senID_res), "No sensor named '%s' found in database." %sensor
assert len(senID_res), "No sensor named '%s' found in database." % sensor
# append sceneid and wkb_hex bounds
records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'],
{'filename': filenames,
'satelliteid': satID_res[0][0], 'sensorid': senID_res[0][0]})
records = GeoDataFrame (records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
cur_gdf = cur_gdf.merge(records, on='filename', how="outer")
if 'sceneid' in gdf.columns:
sceneIDs = list(cur_gdf['sceneid'])
conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
else:
conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
records = get_info_from_postgreSQLdb(
self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict)
records = GeoDataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
if 'sceneid' in gdf.columns:
del records['sceneid']
cur_gdf = cur_gdf.merge(records, on='filename', how="outer", copy=False)
# separate records with valid matches in database from invalid matches (filename not found in database)
gdf_recs = cur_gdf[cur_gdf.sceneid.notnull()].copy() # creates a copy (needed to be able to apply maps later)
gdf_recs = cur_gdf[
cur_gdf.sceneid.notnull()].copy() # creates a copy (needed to be able to apply maps later)
gdf_miss = cur_gdf[cur_gdf.sceneid.isnull()] # creates a view
# convert scene ids from floats to integers
......@@ -684,7 +760,6 @@ class GMS_JOB(object):
all_gdf_recs.append(gdf_recs)
all_gdf_miss.append(gdf_miss)
# merge all dataframes of all satellite-sensor combinations
gdf_recs_compl = GeoDataFrame(pd.concat(all_gdf_recs))
gdf_miss_compl = GeoDataFrame(pd.concat(all_gdf_miss))
......@@ -694,15 +769,24 @@ class GMS_JOB(object):
warnings.warn('The following scene filenames could not be found in the GeoMultiSens database: \n%s'
% '\n'.join(list(gdf_miss_compl['filename'])))
if not gdf_recs_compl.empty:
self.dataframe = gdf_recs_compl
return gdf_recs_compl
def _populate_jobAttrs_from_sceneInfoGDF(self, sceneInfoGDF):
# type: (GeoDataFrame) -> None
"""
:param sceneInfoGDF: <GeoDataFrame> validated scene information as returned by _get_validated_sceneInfoGDFs()
:return:
"""
if not sceneInfoGDF.empty:
self.dataframe = sceneInfoGDF
self.sceneids = list(self.dataframe['sceneid'])
self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
self.timerange_start = self.dataframe.acquisitiondate.min().to_datetime()
self.timerange_end = self.dataframe.acquisitiondate.max().to_datetime()
return self
def from_job_ID(self,job_ID):
# type: (int) -> object
......@@ -943,7 +1027,7 @@ def add_missing_filenames_in_pgSQLdb(conn_params): # FIXME
def pdDataFrame_to_sql_k(engine, frame, name, if_exists='fail', index=True,
index_label=None, schema=None, chunksize=None, dtype=None, **kwargs):
#type: (Any,pd.DataFrame,str,str,bool,str,str,int,dict,dict) -> None
#type: (any,pd.DataFrame,str,str,bool,str,str,int,dict,dict) -> None
"""Extends the standard function pandas.io.SQLDatabase.to_sql() with 'kwargs' which allows to set the primary key
of the target table for example. This is usually not possible with the standard to_sql() function.
......
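
The body of pdDataFrame_to_sql_k() is not shown in this diff; its signature matches the well-known recipe of forwarding extra keyword arguments (e.g. keys='id' for a primary key) to pandas' internal SQLTable. A sketch of that recipe, assuming the private pandas.io.sql API of this pandas generation (not necessarily the actual implementation):

import pandas.io.sql as pd_sql

def to_sql_with_keys(engine, frame, name, if_exists='fail', index=True,
                     index_label=None, schema=None, chunksize=None, dtype=None, **kwargs):
    # kwargs (e.g. keys='id') reach SQLTable here, which plain to_sql() does not allow
    pandas_sql = pd_sql.SQLDatabase(engine, schema=schema)
    table = pd_sql.SQLTable(name, pandas_sql, frame=frame, index=index,
                            if_exists=if_exists, index_label=index_label,
                            schema=schema, dtype=dtype, **kwargs)
    table.create()
    table.insert(chunksize)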
......@@ -23,6 +23,7 @@ class GMS_logger(logging.Logger):
or to create a new logfile (0); default=1
"""
# TODO redirect different log levels to stdout and stderr
# TODO add log level keyword
# private attributes
self._captured_stream = ''
......
......@@ -136,7 +136,7 @@ class path_generator(object):
def get_outPath_hdr(self,attrName2write):
# type: (str) -> str
"""Returns the output path for the given attribute to be written.
:param attrName2write: <str> name of the GNS object attribute to be written"""
:param attrName2write: <str> name of the GMS object attribute to be written"""
outNameSuffix = 'image_data' if attrName2write=='arr' else attrName2write
outNameHdr = '%s_%s_%s.hdr' %(self.get_baseN(),outNameSuffix,self.proc_level) if outNameSuffix else \
'%s_%s.hdr' %(self.get_baseN(),self.proc_level)
......@@ -146,6 +146,19 @@ class path_generator(object):
"""Returns the path of the .dill for a dump of atmospheric correction inputs, e.g. '/path/to/file/file.dill'."""
return os.path.join(self.get_path_procdata(), '%s_ac_input_%s.dill' % (self.get_baseN(), self.proc_level))
def get_pathes_all_procdata(self): # TODO
image = self.get_path_imagedata()
mask = self.get_path_maskdata()
mask_clouds = self.get_path_cloudmaskdata()
gms_file = self.get_path_gmsfile()
log_file = self.get_path_logfile()
all_pathes = [image, mask, mask_clouds, gms_file, log_file]
warnings.warn(
'get_pathes_all_procdata() is not yet completely implemented and will not return the complete path list!')
return all_pathes
def get_tempfile(ext=None,prefix=None,tgt_dir=None):
"""Returns the path to a tempfile.mkstemp() file that can be passed to any function that expects a physical path.
......
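
A hypothetical usage sketch of the new helper (how a path_generator instance is obtained is an assumption; only get_pathes_all_procdata() itself appears in this diff):

pgen = path_generator(GMS_obj.__dict__)  # assumed: initialized from a GMS object's attribute dict
for p in pgen.get_pathes_all_procdata():
    print(p)  # paths of image data, masks, cloud mask, GMS file and log file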
......@@ -21,6 +21,7 @@ def MAP(func, args, CPUs=None, flatten_output=False):
"""
CPUs = CPUs if CPUs is not None else CFG.job.CPUs
CPUs = CPUs if CPUs <= CFG.job.CPUs else CFG.job.CPUs # treat CFG.job.CPUs as maximum number of CPUs
if CPUs is not None and CPUs > 1:
with multiprocessing.Pool(CPUs) as pool:
......
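
The effect of the added clamp, illustrated with made-up numbers (CFG.job.CPUs now acts as an upper bound on the pool size):

# assuming CFG.job.CPUs == 8:
# MAP(f, args)           -> pool of 8 workers (default taken from the config)
# MAP(f, args, CPUs=4)   -> pool of 4 workers
# MAP(f, args, CPUs=32)  -> pool of 8 workers (before the fix: 32, ignoring the config limit)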
......@@ -10,6 +10,8 @@ except ImportError:
if __name__=='__main__':
if len(sys.argv)<2:
# TODO: allow to run a job from list of scene IDs or from dictlist (use GMS_JOB)
# TODO: provide console interface
# a job ID has not been given
# ID = 26184107
......@@ -62,6 +64,8 @@ if __name__=='__main__':
# set up process controller instance
PC = process_controller(ID, parallelization_level='scenes')
#PC.job.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
#PC.job.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
# run the job
PC.run_all_processors()
......