Commit c7e3832e authored by Daniel Scheffler

Revision of the algorithm for selecting the spatial reference scene during L1B_P

L1B_P:
- COREG:
    - __init__(): changed the condition that determines whether coregistration is executed or not
    - major revision of get_reference_image_params(): the reference scene is now chosen from ALL scenes included in the spatial query result (not only the first one); revised selection criteria; implementation now fully based on geopandas (see the sketch below)
    - calculate_spatial_shifts(): bugfix for not leaving while loop in case no match has been found
    - added class reference_scene_getter(): so far only a prototype that is not used yet
PC:
- changed job starting message
- disabled L2C compression
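
Sketch of the revised selection logic: instead of taking the first hit of the spatial query, the full result set is filtered step by step and the least cloudy scene with maximum overlap is chosen. This is a minimal, self-contained illustration with dummy records (column names follow the new code; the real implementation additionally filters by processing level, file existence, entity ID and projection):

from geopandas import GeoDataFrame

def choose_reference_scene(candidates, min_overlap=20):
    # candidates: list of dicts with 'sceneid', 'cloudcover' and 'overlap percentage'
    GDF = GeoDataFrame(candidates)
    # 1) drop scenes with insufficient spatial overlap
    GDF = GDF[GDF['overlap percentage'] >= min_overlap]
    if GDF.empty:
        return None  # no reference scene available
    # 2) keep only the least cloudy scenes ...
    GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'].min()]
    # 3) ... and among those the scene(s) with maximum overlap
    GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]
    return GDF.iloc[0]

candidates = [{'sceneid': 1, 'cloudcover': 5, 'overlap percentage': 85.0},
              {'sceneid': 2, 'cloudcover': 2, 'overlap percentage': 91.5},
              {'sceneid': 3, 'cloudcover': 2, 'overlap percentage': 60.0}]
print(choose_reference_scene(candidates)['sceneid'])  # -> 2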
parent 1776e726
......@@ -537,9 +537,11 @@ class COREG(object):
self.y_shift_map = None # set by self.get_updated_map_info()
self.updated_map_info = None # set by self.get_updated_map_info()
self.ref_available = None
self.success = False # default
if self.coreg_needed:
# POPULATE ATTRIBUTES
self.get_reference_image_params()
......@@ -550,7 +552,7 @@ class COREG(object):
'target dataset ID equals reference dataset ID.')
if self.imref_scene_ID:
if self.coreg_needed and self.ref_available:
"""read input data (from disk or memory)"""
if v: print('reference image:', self.path_imref)
if v: print('shift image: ', self.path_im2shift, '\n')
......@@ -610,55 +612,117 @@ class COREG(object):
timeEnd = add_years(AcqDate,+plusminus_years)
timeEnd = timeEnd if timeEnd <= datetime.now() else datetime.now()
# place query
scenes = SpatialIndexMediator().getFullSceneDataForDataset(boundsLonLat, timeStart, timeEnd, min_cloudcov,
max_cloudcov, usecase.datasetid_spatial_ref,
refDate=AcqDate,maxDaysDelta=plusminus_days)
GDF = GeoDataFrame(scenes, columns=['object'])
GDF['sceneid'] = [*GDF['object'].map(lambda obj: obj.sceneid)]
GDF['acquisitiondate'] = [*GDF['object'].map(lambda obj: obj.acquisitiondate)]
GDF['cloudcover'] = [*GDF['object'].map(lambda obj: obj.cloudcover)]
GDF['polyLonLat'] = [*GDF['object'].map(lambda obj: obj.polyLonLat)]
LonLat2UTM = lambda polyLL: GEOP.reproject_shapelyPoly(polyLL,self.shift_prj)
GDF['polyUTM'] = [*GDF['polyLonLat'].map(LonLat2UTM)]
#get overlap parameter
get_OL_prms = lambda poly: GEOP.get_overlap_polygon(poly, self.footprint_poly)
GDF['overlapParams'] = [*GDF['polyLonLat'].map(get_OL_prms)]
GDF['overlap area'] = [*GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area'])]
GDF['overlap percentage'] = [*GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage'])]
GDF['overlap poly'] = [*GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly'])]
del GDF['overlapParams']
GDF_filt_overlap = GDF[GDF['overlap percentage']>=min_overlap]
GDF_filt_overlap_cloud = GDF_filt_overlap[GDF_filt_overlap['cloudcover']==GDF_filt_overlap['cloudcover'].min()]
GDF_res = GDF_filt_overlap_cloud[GDF_filt_overlap_cloud['overlap percentage']== \
GDF_filt_overlap_cloud['overlap percentage'].max()]
if GDF_res.shape[0]>0:
ref_available = True
ref_scene_record = GDF_res.iloc[0]
self.imref_scene_ID = int(ref_scene_record['sceneid'])
self.imref_footprint_poly = ref_scene_record['polyUTM']
self.overlap_poly = ref_scene_record['overlap poly']
self.overlap_percentage = ref_scene_record['overlap percentage']
self.overlap_area = ref_scene_record['overlap area']
ref_procL = DB_T.get_info_from_postgreSQLdb(job.conn_database,'scenes_proc',['proc_level'],
{'sceneid':self.imref_scene_ID})
if not ref_procL:
raise RuntimeError('The requested spatial reference scene (ID %s) has not been processed yet.'
%self.imref_scene_ID)
assert ref_procL, '%s / %s' %(ref_procL, self.imref_scene_ID)
self.path_imref = PG.path_generator(scene_ID=self.imref_scene_ID,
proc_level=ref_procL[0][0]).get_path_imagedata()
query_res = DB_T.get_info_from_postgreSQLdb(job.conn_database, 'scenes', ['entityid'],
{'id': self.imref_scene_ID}, records2fetch=1)
assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
self.imref_entity_ID = query_res[0][0] # [('LC81510322013152LGN00',)]
temp_logger = HLP_F.setup_logger('log__' + self.baseN, self.path_logfile, append=1)
# place query (retry 10 times in case of timeout)
temp_logger.info('Querying database in order to find a suitable reference scene for co-registration.')
scenes = None
for i in range(10):
try:
SpIM = SpatialIndexMediator(timeout=None)
scenes = SpIM.getFullSceneDataForDataset(boundsLonLat, timeStart, timeEnd, min_cloudcov,
max_cloudcov, usecase.datasetid_spatial_ref, refDate=AcqDate,maxDaysDelta=plusminus_days)
temp_logger.info('Query result: %s reference scenes with matching metadata.' % len(scenes))
break
except socket.timeout:
if i<9:
continue
else:
raise TimeoutError('Spatial query timed out 10 times!')
if scenes:
GDF = GeoDataFrame(scenes, columns=['object'])
GDF['sceneid'] = [*GDF['object'].map(lambda obj: obj.sceneid)]
GDF['acquisitiondate'] = [*GDF['object'].map(lambda obj: obj.acquisitiondate)]
GDF['cloudcover'] = [*GDF['object'].map(lambda obj: obj.cloudcover)]
GDF['polyLonLat'] = [*GDF['object'].map(lambda obj: obj.polyLonLat)]
LonLat2UTM = lambda polyLL: GEOP.reproject_shapelyPoly(polyLL,self.shift_prj)
GDF['polyUTM'] = [*GDF['polyLonLat'].map(LonLat2UTM)]
#get overlap parameter
get_OL_prms = lambda poly: GEOP.get_overlap_polygon(poly, self.footprint_poly)
GDF['overlapParams'] = [*GDF['polyLonLat'].map(get_OL_prms)]
GDF['overlap area'] = [*GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap area'])]
GDF['overlap percentage'] = [*GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap percentage'])]
GDF['overlap poly'] = [*GDF['overlapParams'].map(lambda OL_prms: OL_prms['overlap poly'])]
del GDF['overlapParams']
# filter scenes out that have too little overlap
GDF = GDF[GDF['overlap percentage'] >= min_overlap]
if not GDF.empty:
# get processing level of reference scenes
query_procL = lambda sceneID: DB_T.get_info_from_postgreSQLdb(job.conn_database,'scenes_proc',
['proc_level'], {'sceneid':sceneID})
GDF['temp_queryRes'] = [*GDF['sceneid'].map(query_procL)]
GDF['proc_level'] = [*GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None)]
del GDF['temp_queryRes']
# filter scenes out that have not yet been processed
GDF = GDF[GDF['proc_level'].notnull()]
if not GDF.empty:
# get path of binary file and check if the corresponding dataset exists
get_path_binary = lambda GDF_row: \
PG.path_generator(scene_ID=GDF_row['sceneid'], proc_level=GDF_row['proc_level']).get_path_imagedata()
check_exists = lambda path: os.path.exists(path)
GDF['path_ref'] = GDF.apply(lambda GDF_row: get_path_binary(GDF_row), axis=1)
GDF['refDs_exists'] = [*GDF['path_ref'].map(check_exists)]
# filter scenes out where the corresponding dataset does not exist on fileserver
GDF = GDF[GDF['refDs_exists'] == True]
if not GDF.empty:
# check if a proper entity ID can be gathered from database
query_eID = lambda sceneID: DB_T.get_info_from_postgreSQLdb(job.conn_database, 'scenes', ['entityid'],
{'id': sceneID}, records2fetch=1)
GDF['temp_queryRes'] = [*GDF['sceneid'] .map(query_eID)]
GDF['entityid'] = [*GDF['temp_queryRes'].map(lambda queryRes: queryRes[0][0] if queryRes else None)]
del GDF['temp_queryRes']
# filter scenes out that have no entity ID (database errors)
GDF = GDF[GDF['entityid'].notnull()]
if not GDF.empty:
# compare projections of target and reference image
from ..io.Input_reader import read_ENVIhdr_to_dict
get_prj = lambda path_binary: \
read_ENVIhdr_to_dict(os.path.splitext(path_binary)[0]+'.hdr')['coordinate system string']
is_prj_equal = lambda path_binary: GEOP.prj_equal(self.shift_prj, get_prj(path_binary))
GDF['prj_equal'] = [*GDF['path_ref'].map(is_prj_equal)]
# filter scenes out that have a different projection
GDF = GDF[GDF['prj_equal'] == True]
if not GDF.empty:
# choose reference scene with minimum cloud cover and maximum overlap
GDF = GDF[GDF['cloudcover'] == GDF['cloudcover'] .min()]
GDF = GDF[GDF['overlap percentage'] == GDF['overlap percentage'].max()]
if not GDF.empty:
GDF_res = GDF.iloc[0]
self.ref_available = True
self.imref_scene_ID = int(GDF_res['sceneid'])
self.imref_entity_ID = GDF_res['entityid']
self.path_imref = PG.path_generator(scene_ID = self.imref_scene_ID,
proc_level = GDF_res['proc_level']).get_path_imagedata()
self.imref_footprint_poly = GDF_res['polyUTM']
self.overlap_poly = GDF_res['overlap poly']
self.overlap_percentage = GDF_res['overlap percentage']
self.overlap_area = GDF_res['overlap area']
temp_logger.info('Found a suitable reference image for coregistration: scene ID %s (entity ID %s).'
% (self.imref_scene_ID, self.imref_entity_ID))
else:
self.ref_available = False
temp_logger.warning('No scene fulfills all requirements to serve as spatial reference for scene %s '
'(entity ID %s). Coregistration failed.' % (self.scene_ID, self.entity_ID))
def get_reference_image_params_pgSQL(self):
......@@ -1253,28 +1317,31 @@ class COREG(object):
y_intshift.append(y_tempshift)
if not self.q: print('Detected integer shifts (X/Y): %s/%s' %(x_tempshift,y_tempshift))
if count_iter > self.max_iter:
warnings.warn('No match found in the given window. Deshifting of %s has been skipped' %self.entity_ID)
warnings.warn('No match found in the given window. Deshifting of %s has been skipped.' %self.entity_ID)
self.success = False
x_intshift, y_intshift = sum(x_intshift), sum(y_intshift)
x_subshift, y_subshift = self.calc_subpixel_shifts(scps,v=self.v)
x_totalshift, y_totalshift = self.get_total_shifts(x_intshift, y_intshift, x_subshift, y_subshift)
self.x_shift_px, self.y_shift_px = x_totalshift * gsd_factor, y_totalshift * gsd_factor
if not self.q:
print('Detected subpixel shifts (X/Y): %s/%s' %(x_subshift,y_subshift))
print('Calculated total shifts in fft image units (X/Y): %s/%s' %(x_totalshift,y_totalshift))
print('Calculated total shifts in reference image units (X/Y): %s/%s' %(x_totalshift,y_totalshift))
if max([abs(x_totalshift),abs(y_totalshift)]) > self.max_shift:
warnings.warn("The calculated shift (X: %s px / Y: %s px) is recognized as too large to be valid. "
"If you know that it is valid, just set the '-max_shift' parameter to an appropriate value. Otherwise "
"try to use a different window size for matching via the '-ws' parameter or define the spectral bands "
"to be used for matching manually ('-br' and '-bs'). Deshifting of %s has been skipped"
%(self.x_shift_px,self.y_shift_px,self.entity_ID))
self.success = False
self.updated_map_info = self.map_info_to_update
else:
self.success = True
self.get_updated_map_info()
break
if self.success:
x_intshift, y_intshift = sum(x_intshift), sum(y_intshift)
x_subshift, y_subshift = self.calc_subpixel_shifts(scps,v=self.v)
x_totalshift, y_totalshift = self.get_total_shifts(x_intshift, y_intshift, x_subshift, y_subshift)
self.x_shift_px, self.y_shift_px = x_totalshift * gsd_factor, y_totalshift * gsd_factor
if not self.q:
print('Detected subpixel shifts (X/Y): %s/%s' %(x_subshift,y_subshift))
print('Calculated total shifts in fft image units (X/Y): %s/%s' %(x_totalshift,y_totalshift))
print('Calculated total shifts in reference image units (X/Y): %s/%s' %(x_totalshift,y_totalshift))
if max([abs(x_totalshift),abs(y_totalshift)]) > self.max_shift:
warnings.warn("The calculated shift (X: %s px / Y: %s px) is recognized as too large to be valid. "
"If you know that it is valid, just set the '-max_shift' parameter to an appropriate value. Otherwise "
"try to use a different window size for matching via the '-ws' parameter or define the spectral bands "
"to be used for matching manually ('-br' and '-bs'). Deshifting of %s has been skipped"
%(self.x_shift_px,self.y_shift_px,self.entity_ID))
self.success = False
self.updated_map_info = self.map_info_to_update
else:
self.success = True
self.get_updated_map_info()
def get_updated_map_info(self):
......@@ -1292,6 +1359,42 @@ class COREG(object):
if not self.q: print('Updated map info:',self.updated_map_info)
class reference_scene_getter(object):
def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, logger=None, min_overlap=20,
min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10):
self.boundsLonLat = src_boundsLonLat
self.src_AcqDate = src_AcqDate
self.src_prj = src_prj
self.src_footprint_poly = src_footprint_poly
self.logger = logger
self.min_overlap = min_overlap
self.min_cloudcov = min_cloudcov
self.max_cloudcov = max_cloudcov
self.plusminus_days = plusminus_days
self.plusminus_years = plusminus_years
# get temporal constraints
add_years = lambda dt, years: dt.replace(dt.year + years) \
if not (dt.month==2 and dt.day==29) else dt.replace(dt.year+years,3,1)
self.timeStart = add_years(self.src_AcqDate,-plusminus_years)
timeEnd = add_years(self.src_AcqDate,+plusminus_years)
self.timeEnd = timeEnd if timeEnd <= datetime.now() else datetime.now()
self.possib_ref_scenes = None
def spatial_query(self, timeout=5):
SpIM = SpatialIndexMediator(timeout=timeout)
self.possib_ref_scenes = \
SpIM.getFullSceneDataForDataset(self.boundsLonLat , self.timeStart, self.timeEnd, self.min_cloudcov, self.max_cloudcov,
usecase.datasetid_spatial_ref, refDate=self.src_AcqDate,
maxDaysDelta=self.plusminus_days)
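# Hypothetical usage of the reference_scene_getter prototype (not wired into the
# processing chain by this commit; get_reference_image_params() above is still used).
# The variable names below (bounds, acq_date, prj, footprint) are placeholders:
#
#   getter = reference_scene_getter(bounds, acq_date, prj, footprint,
#                                   min_overlap=20, max_cloudcov=20)
#   getter.spatial_query(timeout=5)          # fills getter.possib_ref_scenes
#   n_candidates = len(getter.possib_ref_scenes)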
class L1B_object(L1A_object):
def __init__(self, L1A_obj, COREG_obj):
super().__init__(None)
......
......@@ -124,7 +124,7 @@ class job:
if locals()['exec__%s' %i][1]==0:
warnings.warn("If job.exec_mode is set to 'Python' the output writer for %s has to be enabled because "
"any operations on GMS_obj.arr read the intermediate results from disk. Turning it on.." %i)
locals()['exec__%s' %i][1] = 1
locals()['exec__%s' %i][1] = 1 # FIXME
assert os.path.isdir(job.path_archive), "Given archive folder '%s' does not exist. Execution stopped" % job.path_archive
if not os.path.isdir(job.path_job_logs): os.makedirs(job.path_job_logs)
......
......@@ -46,8 +46,8 @@ if isdebugging: #override the existing settings in order to get write access eve
#builtins.GMS_process_ID = 26185255 # 1x L8, bug: 5 corners found -> cause = L1A write error in tiled Python mode when the job contains more than 1 scene
#builtins.GMS_process_ID = 26185256 # 1x L7 SLC off, target sensor L8, spat.ref L8
#builtins.GMS_process_ID = 26185257 # Beta-Job - 219 x L8, 172 x L7, 111 x S2, spatref L8
builtins.GMS_process_ID = 26185258 # Beta-Job - 219 x L8, spatref L8
#builtins.GMS_process_ID = 26185259 # Beta-Job - 172 x L7, spatref L8
#builtins.GMS_process_ID = 26185258 # Beta-Job - 219 x L8, spatref L8
builtins.GMS_process_ID = 26185259 # Beta-Job - 172 x L7, spatref L8
#builtins.GMS_process_ID = 26185260 # Beta-Job - 111 x S2, spatref L8
#builtins.GMS_process_ID = 26185268 # 25x L7 SLC off, target sensor L8, spat.ref L8
#builtins.GMS_process_ID = 26185269 # 1x L7 SLC off, SpatialIndexMediator bug
......@@ -69,8 +69,8 @@ parallelization_level = 'scenes'
assert parallelization_level in ['scenes', 'tiles']
CPUs= job.CPUs
#job.path_procdata_scenes = '/home/danscheff/database/processed_scenes/'
#job.path_procdata_MGRS = '/home/danscheff/database/processed_mgrs_tiles/'
#job.path_procdata_scenes = '/data1/gms/mount/db/data/processed_scenes_dev/'
#job.path_procdata_MGRS = '/data1/gms/mount/db/data/processed_mgrs_tiles_dev/'
t1 = time.time()
if job.profiling:
......@@ -106,7 +106,7 @@ from .algorithms import L2C_P # Level 2C Processor
########################### core functions ####################################
job.logger = HLP_F.setup_logger('log__%s' %job.ID, os.path.join(job.path_job_logs,'%s.log' % job.ID), 0)
job.logger.info('Execution started.')
job.logger.info('Starting job with ID %s...' %job.ID)
@HLP_F.log_uncaught_exceptions
def L0B_L1A_map(data_list_item): #map (scene-wise parallelization)
......@@ -265,7 +265,7 @@ def L2C_map_1(L2B_obj):
L2C_obj = L2C_P.L2C_object(L2B_obj)
if job.exec__L2BP[1]:
L2C_MRGS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=10)
[OUT_W.Obj2ENVI(MGRS_tile,compression=True) for MGRS_tile in L2C_MRGS_tiles]
[OUT_W.Obj2ENVI(MGRS_tile,compression=False) for MGRS_tile in L2C_MRGS_tiles]
L2C_obj.delete_tempFiles()
return L2C_obj # contains no array data in Python mode
......@@ -325,6 +325,7 @@ def run_processController_in_multiprocessing(usecase_data_list):
L1B_newObjects = []
if job.exec__L1BP[0]:
# TODO implement check for running spatial index mediator server
# run on full cubes
# get earlier processed L1A data
GMSfile_list_L1A_inDB = INP_R.get_list_GMSfiles(datalist_L1B_P, 'L1A')
......@@ -335,6 +336,12 @@ def run_processController_in_multiprocessing(usecase_data_list):
L1A_Instances = L1A_newObjects + L1A_DBObjects # combine newly and earlier processed L1A data
## add SpatialIndexMediator connection to L1A_Instances # FIXME dirty hack
#from .misc.SpatialIndexMediator import SpatialIndexMediator
#SpIM_connection = SpatialIndexMediator(timeout=5)
#for obj in L1A_Instances:
# obj.SpIM_connection = SpIM_connection
with multiprocessing.Pool() as pool: L1B_resObjects = pool.map(L1B_map_1, L1A_Instances)
L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj,L1B_P.L1B_object)]
......