Commit f1d8ba19 authored by Daniel Scheffler


Previously processed L2A and L2B Sentinel-2 datasets are now properly found by subsequent jobs (issue #58). Added tests that verify already written datasets are found by subsequent jobs.

Fixed issue #9 (L2C MGRS output has no logfile).
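A minimal usage sketch of the new lookup introduced below (not part of the commit itself): the import path is assumed, and 'dataset' stands for a dataset dictionary with the keys produced by GMS_object_2_dataset_dict() further down in this diff.

# Sketch: locating the logfile of a scene whose subsystems were merged in L2A+.
from gms_preprocessing.misc.path_generator import path_generator  # assumed module location

pgen = path_generator(dataset)  # 'dataset': dataset dictionary as used by the process controller
path_log_subsystem = pgen.get_path_logfile(merged_subsystems=False)  # per-subsystem logfile (up to L1C)
path_log_merged = pgen.get_path_logfile(merged_subsystems=True)      # subsystem-free logfile written from L2A on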


Former-commit-id: 8dd18769
Former-commit-id: 668a0c33
parent 24abf4b6
......@@ -76,17 +76,28 @@ class path_generator(object):
(CFG.path_procdata_scenes, self.satellite, self.sensor, self.entity_ID)
return os.path.join(*pOrd)
def get_baseN(self):
"""Returns the basename belonging to the given scene."""
items2include = (self.satellite, self.sensor, self.subsystem, self.entity_ID) if self.subsystem else \
(self.satellite, self.sensor, self.entity_ID)
def get_baseN(self, merged_subsystems=False):
"""Returns the basename belonging to the given scene.
:param merged_subsystems: if True, a subsystem is not included in the returned basename
(useful for merged subsystems in L2A+)
"""
if self.subsystem and not merged_subsystems:
items2include = (self.satellite, self.sensor, self.subsystem, self.entity_ID)
else:
items2include = (self.satellite, self.sensor, self.entity_ID)
if self.MGRS_info:
items2include += (self.MGRS_info['tile_ID'],)
return '__'.join(list(items2include))
def get_path_logfile(self):
"""Returns the path of the logfile belonging to the given scene, e.g. '/path/to/file/file.log'."""
return os.path.join(self.get_path_procdata(), self.get_baseN() + '.log')
def get_path_logfile(self, merged_subsystems=False):
"""Returns the path of the logfile belonging to the given scene, e.g. '/path/to/file/file.log'.
:param merged_subsystems: if True, a subsystem is not included in the returned logfile path
(useful for merged subsystems in L2A+)
"""
return os.path.join(self.get_path_procdata(), self.get_baseN(merged_subsystems=merged_subsystems) + '.log')
def get_local_archive_path_baseN(self):
"""Returns the path of the downloaded raw data archive, e.g. '/path/to/file/file.tar.gz'."""
......
......@@ -360,6 +360,7 @@ class Dataset(object):
@property
def pathGen(self): # TODO keep that in the base class?
# type: () -> PG.path_generator
"""
Returns the path generator object for generating file paths belonging to the GMS object.
"""
......
......@@ -461,7 +461,8 @@ class GMS_object(Dataset):
return copy.copy(self)
def from_sensor_subsystems(self, list_GMS_objs):
# type: (list) -> GMS_object
# type: (List[GMS_object]) -> GMS_object
# TODO convert to classmethod
"""Merge separate GMS objects belonging to the same scene-ID into ONE GMS object.
:param list_GMS_objs: <list> of GMS objects covering the same geographic area but representing different
......@@ -480,19 +481,49 @@ class GMS_object(Dataset):
assert len(subsystems) == len(list(set(subsystems))), \
"The input 'list_GMS_objs' contains duplicates: %s" % subsystems
##################
# merge logfiles #
##################
# read all logs into DataFrame, sort it by the first column
[GMS_obj.close_GMS_loggers() for GMS_obj in list_GMS_objs] # close the loggers of the input objects
paths_inLogs = [GMS_obj.pathGen.get_path_logfile() for GMS_obj in list_GMS_objs]
allLogs_df = DataFrame()
for log in paths_inLogs:
df = read_csv(log, sep='\n', delimiter=': ', header=None,
engine='python') # engine suppresses a pandas warning
allLogs_df = allLogs_df.append(
df) # FIXME this will log e.g. atm. corr 3 times for S2A -> use captured streams instead?
allLogs_df = allLogs_df.sort_values(0)
# set common metadata, needed for logfile
self.baseN = list_GMS_objs[0].pathGen.get_baseN(merged_subsystems=True)
self.path_logfile = list_GMS_objs[0].pathGen.get_path_logfile(merged_subsystems=True)
self.scene_ID = list_GMS_objs[0].scene_ID
# write the merged logfile and flush previous logger
np.savetxt(self.path_logfile, np.array(allLogs_df), delimiter=': ', fmt="%s")
self.close_GMS_loggers()
# log
list_GMS_objs[0].logger.info('Merging the subsystems %s to a single GMS object...'
% ', '.join([GMS_obj.subsystem for GMS_obj in list_GMS_objs]))
self.logger.info('Merging the subsystems %s to a single GMS object...'
% ', '.join([GMS_obj.subsystem for GMS_obj in list_GMS_objs]))
# find the common extent. NOTE: boundsMap is expected in the order [xmin,xmax,ymin,ymax]
geoExtents = np.array([GMS_obj.arr.box.boundsMap for GMS_obj in list_GMS_objs])
common_extent = (min(geoExtents[:, 0]), max(geoExtents[:, 1]), min(geoExtents[:, 2]), max(geoExtents[:, 3]))
# MERGE METADATA
##################
# MERGE METADATA #
##################
# copy all attributes from the first input GMS file (private attributes are not touched)
for key, value in list_GMS_objs[0].__dict__.copy().items():
if key in ['GMS_identifier', 'georef', 'dict_LayerOptTherm']:
continue # properties that should better be created on the fly
elif key in ['baseN', 'path_logfile', 'scene_ID', 'subsystem']:
continue # either previously set with common values or not needed for merged GMS_object
try:
setattr(self, key, value)
except Exception:
......@@ -536,20 +567,10 @@ class GMS_object(Dataset):
if isinstance(getattr(list_GMS_objs[0].MetaObj, attrN), list) else attrDic_fullLBA
setattr(self.MetaObj, attrN, val2set)
# merge logfiles (read all logs into DataFrame, sort it by the first column and write to new logfile)
[GMS_obj.close_GMS_loggers() for GMS_obj in list_GMS_objs] # close the loggers of the input objects
paths_inLogs = [GMS_obj.pathGen.get_path_logfile() for GMS_obj in list_GMS_objs]
allLogs_df = DataFrame()
for log in paths_inLogs:
df = read_csv(log, sep='\n', delimiter=': ', header=None,
engine='python') # engine suppresses a pandas warning
allLogs_df = allLogs_df.append(
df) # FIXME this will log e.g. atm. corr 3 times for S2A -> use captured streams instead?
allLogs_df = allLogs_df.sort_values(0)
np.savetxt(self.pathGen.get_path_logfile(), np.array(allLogs_df), delimiter=': ', fmt="%s")
####################
# MERGE ARRAY DATA #
####################
# MERGE ARRAY DATA
# overwrite array data with merged arrays, clipped to common_extent and reordered according to FullLayerBandsAss
for attrname in ['arr', 'ac_errors', 'dem', 'mask_nodata', 'mask_clouds', 'mask_clouds_confidence', 'masks']:
......@@ -582,7 +603,8 @@ class GMS_object(Dataset):
# skip get_mapPos() if all input GMS objects have the same extent
geoArrs_same_extent = all_arrays
# validate output GeoArrays
# validate output GeoArrays #
#############################
if len([gA for gA in geoArrs_same_extent if gA is not None]) > 1:
equal_bounds = all([geoArrs_same_extent[0].box.boundsMap == gA.box.boundsMap
for gA in geoArrs_same_extent[1:]])
......@@ -593,7 +615,8 @@ class GMS_object(Dataset):
'input GMS objects. The extents, projections or pixel dimensions of the '
'calculated input GMS objects are not equal.')
# set output arrays
# set output arrays #
#####################
if attrname in ['arr', 'ac_errors'] and list(set(geoArrs_same_extent)) != [None]:
# the bands of these arrays have to be reordered according to FullLayerBandsAssignment
......@@ -607,7 +630,8 @@ class GMS_object(Dataset):
"%s" % (bN, attrname, str(available_bandNs)))
# merge arrays
def get_band(bandN): return [gA[bandN] for gA in geoArrs_same_extent if gA and bandN in gA.bandnames][0]
def get_band(bandN):
return [gA[bandN] for gA in geoArrs_same_extent if gA and bandN in gA.bandnames][0]
full_geoArr = GeoArray(np.dstack((get_band(bandN) for bandN in bandnames)),
geoArrs_same_extent[0].gt, geoArrs_same_extent[0].prj,
bandnames=bandnames,
......@@ -620,15 +644,18 @@ class GMS_object(Dataset):
# use the DEM of the first input object
# (if the grid is the same, the DEMs should be the same anyway)
self.dem = geoArrs_same_extent[0]
elif attrname == 'mask_nodata':
# must not be merged -> self.arr is already merged, so just recalculate it (np.all)
self.mask_nodata = self.calc_mask_nodata(overwrite=True)
elif attrname == 'mask_clouds':
# possibly only present in ONE subsystem (set by atm. Corr.)
mask_clouds = [msk for msk in geoArrs_same_extent if msk is not None]
if len(mask_clouds) > 1:
raise ValueError('Expected mask clouds in only one subsystem. Got %s.' % len(mask_clouds))
self.mask_clouds = mask_clouds[0] if mask_clouds else None
elif attrname == 'mask_clouds_confidence':
# possibly only present in ONE subsystem (set by atm. Corr.)
mask_clouds_conf = [msk for msk in geoArrs_same_extent if msk is not None]
......@@ -636,6 +663,7 @@ class GMS_object(Dataset):
raise ValueError(
'Expected mask_clouds_conf in only one subsystem. Got %s.' % len(mask_clouds_conf))
self.mask_clouds_confidence = mask_clouds_conf[0] if mask_clouds_conf else None
elif attrname == 'masks':
# self.mask_nodata and self.mask_clouds will already be set here -> so just recreate it from there
self.masks = None
......@@ -1290,6 +1318,11 @@ class GMS_object(Dataset):
'Found multiple database records for the last updated record. sceneid: %s' % self.scene_ID
self.scenes_proc_ID = res[0][0]
# copy logfile to MGRS output directory
if self.arr_shape == 'MGRS_tile':
shutil.copy(self.path_logfile, os.path.join(self.pathGen.get_path_procdata(),
os.path.basename(self.path_logfile)))
if not is_tempfile:
self.log_for_fullArr_or_firstTile('%s data successfully saved.' % self.proc_level)
......@@ -1524,3 +1557,20 @@ def return_GMS_objs_without_arrays(GMS_pipeline):
return returnVal
return wrapped_GMS_pipeline
def GMS_object_2_dataset_dict(GMS_obj):
# type: (GMS_object) -> OrderedDict
return OrderedDict([
('proc_level', GMS_obj.proc_level),
('scene_ID', GMS_obj.scene_ID),
('dataset_ID', GMS_obj.dataset_ID),
('image_type', GMS_obj.image_type),
('satellite', GMS_obj.satellite),
('sensor', GMS_obj.sensor),
('subsystem', GMS_obj.subsystem),
('sensormode', GMS_obj.sensormode),
('acq_datetime', GMS_obj.acq_datetime),
('entity_ID', GMS_obj.entity_ID),
('filename', GMS_obj.filename)
])
......@@ -131,11 +131,8 @@ class process_controller(object):
dict(sceneid=dataset['scene_ID']))
# get the corresponding logfile
path_logfile = path_generator(
dataset).get_path_logfile() # FIXME this always returns the logfile for the subsystem.
# FIXME -> merged logfiles (L2A+) are ignored
# FIXME -> for subsystems the highest start procL is L2A
path_logfile = path_generator(dataset).get_path_logfile(merged_subsystems=False)
path_logfile_merged_ss = path_generator(dataset).get_path_logfile(merged_subsystems=True)
def get_AllWrittenProcL_dueLog(path_log): # TODO replace this by database query + os.path.exists
"""Returns all processing level that have been successfully written according to logfile."""
......@@ -147,7 +144,7 @@ class process_controller(object):
else:
logfile = open(path_log, 'r').read()
AllWrittenProcL_dueLog = re.findall(":*(\S*\s*) data successfully saved.", logfile, re.I)
if not AllWrittenProcL_dueLog: # AllWrittenProcL_dueLog = []
if not AllWrittenProcL_dueLog and path_logfile == path_logfile_merged_ss: # AllWrittenProcL_dueLog = []
self.logger.info('%s: According to logfile no completely processed data exist at any '
'processing level. Dataset has to be reprocessed.' % dataset['entity_ID'])
else:
......@@ -159,6 +156,9 @@ class process_controller(object):
# get all processing levels that have been successfully written
AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
if not AllWrittenProcL:
AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)
dataset['proc_level'] = None # default (dataset has to be reprocessed)
# loop through all the found proc. levels and find the one that fulfills all requirements
......
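The change to add_local_availability above can be condensed as follows; this is a sketch of the logic, not the verbatim implementation (the variable names are taken from the hunk above):

# Per-subsystem logfiles only report levels up to L1C, because from L2A on the subsystems are
# merged and log to a single, subsystem-free logfile. That merged logfile is therefore checked
# as a fallback before deciding that the dataset has to be reprocessed.
AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)  # logfile of the individual subsystem
if not AllWrittenProcL:
    AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile_merged_ss)  # merged-subsystem logfile (L2A+)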
......@@ -53,6 +53,7 @@ from gms_preprocessing.algorithms.L2A_P import L2A_object
from gms_preprocessing.algorithms.L2B_P import L2B_object
from gms_preprocessing.algorithms.L2C_P import L2C_object
from gms_preprocessing.misc.database_tools import get_info_from_postgreSQLdb
from gms_preprocessing.model.gms_object import GMS_object_2_dataset_dict
from . import db_host
......@@ -105,41 +106,67 @@ class BaseTestCases:
[cls.validate_db_entry(ds['filename']) for ds in cls.PC.config.data_list]
def check_availability(self, GMS_objs, tgt_procL):
dss = self.PC.add_local_availability([GMS_object_2_dataset_dict(obj) for obj in GMS_objs])
for ds in dss:
self.assertEqual(ds['proc_level'], tgt_procL,
msg='Written %s dataset %s %s %s is not found by PC.add_local_availability.'
% (ds['proc_level'], ds['satellite'], ds['sensor'], ds['subsystem']))
def test_L1A_processing(self):
self.L1A_newObjects = self.PC.L1A_processing()
self.assertIsInstance(self.L1A_newObjects, list)
self.assertNotEqual(len(self.L1A_newObjects), 0, msg='L1A_processing did not output an L1A object.')
self.assertIsInstance(self.L1A_newObjects[0], L1A_object)
# check if PC.add_local_availability finds the written dataset
self.check_availability(self.L1A_newObjects, 'L1A')
def test_L1B_processing(self):
self.L1B_newObjects = self.PC.L1B_processing()
self.assertIsInstance(self.L1B_newObjects, list)
self.assertNotEqual(len(self.L1B_newObjects), 0, msg='L1B_processing did not output an L1B object.')
self.assertIsInstance(self.L1B_newObjects[0], L1B_object)
# check if PC.add_local_availability finds the written dataset
self.check_availability(self.L1B_newObjects, 'L1B')
def test_L1C_processing(self):
self.L1C_newObjects = self.PC.L1C_processing()
self.assertIsInstance(self.L1C_newObjects, list)
self.assertNotEqual(len(self.L1C_newObjects), 0, msg='L1C_processing did not output an L1C object.')
self.assertIsInstance(self.L1C_newObjects[0], L1C_object)
# check if PC.add_local_availability finds the written dataset
self.check_availability(self.L1C_newObjects, 'L1C')
def test_L2A_processing(self):
self.L2A_newObjects = self.PC.L2A_processing()
self.assertIsInstance(self.L2A_newObjects, list)
self.assertNotEqual(len(self.L2A_newObjects), 0, msg='L2A_processing did not output an L2A object.')
self.assertIsInstance(self.L2A_newObjects[0], L2A_object)
# check if PC.add_local_availability finds the written dataset
self.check_availability(self.L2A_newObjects, 'L2A')
def test_L2B_processing(self):
self.L2B_newObjects = self.PC.L2B_processing()
self.assertIsInstance(self.L2B_newObjects, list)
self.assertNotEqual(len(self.L2B_newObjects), 0, msg='L2B_processing did not output an L2B object.')
self.assertIsInstance(self.L2B_newObjects[0], L2B_object)
# check if PC.add_local_availability finds the written dataset
self.check_availability(self.L2B_newObjects, 'L2B')
def test_L2C_processing(self):
self.L2C_newObjects = self.PC.L2C_processing()
self.assertIsInstance(self.L2C_newObjects, list)
self.assertNotEqual(len(self.L2C_newObjects), 0, msg='L2C_processing did not output an L2C object.')
self.assertIsInstance(self.L2C_newObjects[0], L2C_object)
# check if PC.add_local_availability finds the written dataset
# self.check_availability(self.L2C_newObjects, 'L2C') # FIXME fails (not yet working)
# Setting the config.status manually.
# if self.L2C_newObjects:
# self.PC.config.status = "finished"
......@@ -264,12 +291,11 @@ class Test_Sentinel2A_SingleGranuleFormat_CompletePipeline(BaseTestCases.TestCom
"""
@classmethod
def setUpClass(cls):
# job_config_kwargs['CPUs'] = 1
cls.create_job(26186268, job_config_kwargs)
@classmethod
def tearDownClass(cls):
super().tearDownClass()
# @classmethod
# def tearDownClass(cls):
# super().tearDownClass()
# PC = cls.PC
......@@ -312,9 +338,9 @@ class Test_MultipleDatasetsInOneJob_CompletePipeline(BaseTestCases.TestCompleteP
def setUpClass(cls):
cls.create_job(26186273, job_config_kwargs)
@classmethod
def tearDownClass(cls):
super().tearDownClass()
# @classmethod
# def tearDownClass(cls):
# super().tearDownClass()
# PC = cls.PC
......