Commit c588839b authored by Daniel Scheffler, committed by Mathias Peters

moved add_local_availability() to process_controller; changed output file names of image data

L0A_P:
- moved add_local_availability() to process_controller
L1B_P.L1B_object:
- coregister_spatially(): fixed deprecated keyword name due to API changes within CoReg_Sat
misc.path_generator:
- get_path_imagedata(): output file names of image data now end with '_image_data_<procLevel>.bsq' (see the naming sketch below)
- get_outPath_hdr(): changed outNameSuffix
processing.process_controller.process_controller:
- get_data_list(): edited docstring
- added add_local_availability from L0A_P
- updated __version__
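For illustration, the renamed image-data output looks as follows. This is a minimal sketch, not the library's own code: the directory, base name, and processing level are made-up placeholders; only the '%s_image_data_%s.bsq' format string is taken from the get_path_imagedata() change below.

```python
import os

def image_data_path(procdata_dir, base_name, proc_level):
    # Format string as introduced by this commit in path_generator.get_path_imagedata()
    return os.path.join(procdata_dir, '%s_image_data_%s.bsq' % (base_name, proc_level))

print(image_data_path('/path/to/procdata', 'LC81930242015036LGN00', 'L1A'))
# -> /path/to/procdata/LC81930242015036LGN00_image_data_L1A.bsq
```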
parent 3cc34718
@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller
-__version__ = '20161202.01'
+__version__ = '20161202.03'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
@@ -23,11 +23,8 @@ import psycopg2
import psycopg2.extras
from ..config import GMS_config as CFG
from ..io import Input_reader as INP_R
-from ..misc import database_tools as DB_T
-from ..misc import helper_functions as HLP_F
-from ..misc import path_generator as PG
from .METADATA import get_LayerBandsAssignment
########################### core functions ####################################
@@ -167,73 +164,3 @@ def get_sensormode(dataset):
else:
return 'M'
-def add_local_availability(dataset):
-    if CFG.job.call_type == 'webapp':
-        DB_match = DB_T.get_info_from_postgreSQLdb \
-            (CFG.job.conn_database,'scenes_proc',['proc_level','layer_bands_assignment'], {'sceneid':dataset['scene_ID']})
-    else: ## CFG.job.call_type == 'console'
-        DB_match = DB_T.get_info_from_SQLdb(CFG.job.path_database,'processed_data',['proc_level','LayerBandsAssignment'],
-            {'image_type':dataset['image_type'],'satellite':dataset['satellite'], 'sensor':dataset['sensor'],
-             'subsystem':dataset['subsystem'], 'sensormode':dataset['sensormode'], 'entity_ID':dataset['entity_ID']})
-    path_logfile = PG.path_generator(dataset).get_path_logfile()
-
-    def get_AllWrittenProcL_dueLog(path_log):
-        if not os.path.exists(path_log):
-            print ("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed." \
-                % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
-            AllWrittenProcL_dueLog = []
-        else:
-            logfile = open(path_log, 'r').read()
-            AllWrittenProcL_dueLog = re.findall(":*(\S*\s*) data successfully saved.",logfile,re.I)
-            if not AllWrittenProcL_dueLog: # AllWrittenProcL_dueLog = []
-                print ('%s: According to logfile no completely processed data exist at any processing level. ' \
-                    'Dataset has to be reprocessed.' %dataset['entity_ID'])
-            else:
-                AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
-        return AllWrittenProcL_dueLog
-
-    if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':
-        AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
-        dataset['proc_level'] = None # default (dataset has to be reprocessed)
-        for ProcL in reversed(AllWrittenProcL):
-            if dataset['proc_level']: break # proc_level found; no further searching for lower proc_levels
-            assumed_path_GMS_file = '%s_%s.gms' %(os.path.splitext(path_logfile)[0],ProcL)
-            if os.path.isfile(assumed_path_GMS_file):
-                GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
-                target_LayerBandsAssignment = get_LayerBandsAssignment({'image_type': dataset['image_type'],
-                    'Satellite': dataset['satellite'], 'Sensor': dataset['sensor'],'Subsystem': dataset['subsystem'],
-                    'logger': None}, 1 if dataset['sensormode'] == 'P' else None)
-                if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:
-                    if DB_match == [] or DB_match == 'database connection fault':
-                        print ('The dataset %s is not included in the database of processed data but according to ' \
-                            'logfile %s has been written successfully. Recreating missing database entry.' \
-                            %(dataset['entity_ID'],ProcL))
-                        DB_T.data_DB_updater(GMS_file_dict)
-                        if CFG.job.call_type == 'console':
-                            DB_T.SQL_DB_to_csv()
-                        dataset['proc_level'] = ProcL
-                    elif len(DB_match) == 1:
-                        try:
-                            print('Found a matching %s dataset for %s. Processing skipped until %s.' \
-                                %(ProcL,dataset['entity_ID'],HLP_F.proc_chain[HLP_F.proc_chain.index(ProcL)+1]))
-                        except IndexError:
-                            print('Found a matching %s dataset for %s. Processing already done.' \
-                                %(ProcL,dataset['entity_ID']))
-                        if DB_match[0][0] == ProcL:
-                            dataset['proc_level'] = DB_match[0][0]
-                        else:
-                            dataset['proc_level'] = ProcL
-                else:
-                    print('Found a matching dataset for %s but with a different LayerBandsAssignment. ' \
-                        'Dataset has to be reprocessed.' %dataset['entity_ID'])
-            else:
-                print('%s for dataset %s has been written due to logfile but no corresponding dataset has been found.' \
-                    %(ProcL,dataset['entity_ID'])+' Searching for lower processing level...'
-                    if AllWrittenProcL.index(ProcL)!=0 else '')
-    elif len(DB_match) > 1:
-        print ('According to database there are multiple matches for the dataset %s. Dataset has to be reprocessed.' \
-            %dataset['entity_ID'])
-        dataset['proc_level'] = None
-    else:
-        dataset['proc_level'] = None
-    return dataset
@@ -642,8 +642,8 @@ class L1B_object(L1A_object):
's_b4match' : s_b4match,
'align_grids' : True, # FIXME not needed here
'match_gsd' : True, # FIXME not needed here
-'data_corners_im0' : [[x,y] for x,y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
-'data_corners_im1' : [transform_any_prj(EPSG2WKT(4326),self.meta['coordinate system string'],
+'data_corners_ref' : [[x,y] for x,y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
+'data_corners_tgt' : [transform_any_prj(EPSG2WKT(4326),self.meta['coordinate system string'],
x,y) for x,y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
'nodata' : (HLP_F.get_outFillZeroSaturated(geoArr_ref .dtype)[0],
HLP_F.get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
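The keyword rename above follows an API change in CoReg_Sat. As a minimal sketch of the migration (the helper name update_coreg_kwargs is hypothetical; the key mapping is taken from this diff):

```python
# Deprecated CoReg_Sat keyword arguments and their replacements, as renamed in this diff
KW_RENAMES = {'data_corners_im0': 'data_corners_ref',
              'data_corners_im1': 'data_corners_tgt'}

def update_coreg_kwargs(kwargs):
    """Return a copy of a keyword dict with the deprecated CoReg_Sat keys renamed."""
    return {KW_RENAMES.get(key, key): value for key, value in kwargs.items()}

# Hypothetical usage:
old_kwargs = {'data_corners_im0': [[300000, 5800000], [300000, 5900000]], 'align_grids': True}
print(update_coreg_kwargs(old_kwargs))
# {'data_corners_ref': [[300000, 5800000], [300000, 5900000]], 'align_grids': True}
```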
@@ -106,7 +106,7 @@ class path_generator(object):
def get_path_imagedata(self):
"""Returns the path of the .bsq file belonging to the given processing level, e.g. '/path/to/file/file.bsq'."""
-return os.path.join(self.get_path_procdata(),'%s_%s.bsq' %(self.get_baseN(), self.proc_level))
+return os.path.join(self.get_path_procdata(),'%s_image_data_%s.bsq' %(self.get_baseN(), self.proc_level))
def get_path_maskdata(self):
"""Returns the path of the *_masks_*.bsq file belonging to the given processing level,
@@ -129,7 +129,7 @@ class path_generator(object):
# type: (str) -> str
"""Returns the output path for the given attribute to be written.
:param attrName2write: <str> name of the GNS object attribute to be written"""
-outNameSuffix = '' if attrName2write=='arr' else attrName2write
+outNameSuffix = 'image_data' if attrName2write=='arr' else attrName2write
outNameHdr = '%s_%s_%s.hdr' %(self.get_baseN(),outNameSuffix,self.proc_level) if outNameSuffix else \
'%s_%s.hdr' %(self.get_baseN(),self.proc_level)
return os.path.join(self.get_path_procdata(), outNameHdr)
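The outNameSuffix change means the header written for the main image array ('arr') now carries the same 'image_data' tag as the .bsq file, so header and data file names stay in sync. A minimal sketch with made-up base name and processing level (the format strings are taken from get_outPath_hdr() above):

```python
def out_hdr_name(base_name, proc_level, attrName2write):
    # 'arr' (the main image array) now maps to the 'image_data' suffix
    outNameSuffix = 'image_data' if attrName2write == 'arr' else attrName2write
    return ('%s_%s_%s.hdr' % (base_name, outNameSuffix, proc_level) if outNameSuffix else
            '%s_%s.hdr' % (base_name, proc_level))

print(out_hdr_name('LC81930242015036LGN00', 'L1A', 'arr'))    # LC81930242015036LGN00_image_data_L1A.hdr
print(out_hdr_name('LC81930242015036LGN00', 'L1A', 'masks'))  # LC81930242015036LGN00_masks_L1A.hdr
```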
@@ -8,18 +8,21 @@ import sys
import time
from itertools import chain
import signal
+import re
from ..io import Output_writer as OUT_W
from ..io import Input_reader as INP_R
from ..misc import database_tools as DB_T
from ..misc import helper_functions as HLP_F
from ..misc import environment as ENV
-from ..misc.logging import GMS_logger, shutdown_loggers
-from ..algorithms import L0A_P, L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
-from .pipeline import (L0B_L1A_map, L0B_L1A_map_1, L1A_map_2, L1A_map_3, L1B_map_1, L1C_map_1,
-                       L2A_map, L2A_map_1, L2A_map_2, L2B_map_1, L2C_map_1)
-from ..config import set_config, GMS_config
-from .multiproc import MAP
+from ..misc.path_generator import path_generator
+from ..misc.logging import GMS_logger, shutdown_loggers
+from ..algorithms.METADATA import get_LayerBandsAssignment
+from ..algorithms import L0A_P, L1A_P, L1B_P, L1C_P, L2A_P, L2B_P, L2C_P
+from .pipeline import (L0B_L1A_map, L0B_L1A_map_1, L1A_map_2, L1A_map_3, L1B_map_1, L1C_map_1,
+                       L2A_map, L2A_map_1, L2A_map_2, L2B_map_1, L2C_map_1)
+from ..config import set_config, GMS_config
+from .multiproc import MAP
@@ -106,6 +109,12 @@ class process_controller(object):
def get_data_list(self):
"""
Get a list of datasets to be processed from database and return it together with some metadata.
+:return: <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
+('datasetid', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
+('subsystem', ''), ('acquisition_date', datetime.datetime(2015, 2, 5, 10, 2, 52)),
+('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
+('sensormode', 'M'), ('logger', None)]), ...]
"""
# parse cli arguments
if self.call_type == 'console':
@@ -117,6 +126,89 @@
return self.usecase.data_list
+    @staticmethod
+    def add_local_availability(dataset):
+        # TODO revise this function
+        if GMS_config.job.call_type == 'webapp':
+            DB_match = DB_T.get_info_from_postgreSQLdb \
+                (GMS_config.job.conn_database, 'scenes_proc', ['proc_level', 'layer_bands_assignment'],
+                 {'sceneid': dataset['scene_ID']})
+        else:  ## call_type == 'console'
+            DB_match = DB_T.get_info_from_SQLdb(GMS_config.job.path_database, 'processed_data',
+                                                ['proc_level', 'LayerBandsAssignment'],
+                                                {'image_type': dataset['image_type'], 'satellite': dataset['satellite'],
+                                                 'sensor': dataset['sensor'],
+                                                 'subsystem': dataset['subsystem'], 'sensormode': dataset['sensormode'],
+                                                 'entity_ID': dataset['entity_ID']})
+        path_logfile = path_generator(dataset).get_path_logfile()
+
+        def get_AllWrittenProcL_dueLog(path_log):  # TODO replace this by database query + os.path.exists
+            if not os.path.exists(path_log):
+                print("No logfile named '%s' found for %s at %s. Dataset has to be reprocessed." \
+                      % (os.path.basename(path_log), dataset['entity_ID'], os.path.dirname(path_log)))
+                AllWrittenProcL_dueLog = []
+            else:
+                logfile = open(path_log, 'r').read()
+                AllWrittenProcL_dueLog = re.findall(":*(\S*\s*) data successfully saved.", logfile, re.I)
+                if not AllWrittenProcL_dueLog:  # AllWrittenProcL_dueLog = []
+                    print('%s: According to logfile no completely processed data exist at any processing level. ' \
+                          'Dataset has to be reprocessed.' % dataset['entity_ID'])
+                else:
+                    AllWrittenProcL_dueLog = HLP_F.sorted_nicely(list(set(AllWrittenProcL_dueLog)))
+            return AllWrittenProcL_dueLog
+
+        if len(DB_match) == 1 or DB_match == [] or DB_match == 'database connection fault':
+            AllWrittenProcL = get_AllWrittenProcL_dueLog(path_logfile)
+            dataset['proc_level'] = None  # default (dataset has to be reprocessed)
+            for ProcL in reversed(AllWrittenProcL):
+                if dataset['proc_level']: break  # proc_level found; no further searching for lower proc_levels
+                assumed_path_GMS_file = '%s_%s.gms' % (os.path.splitext(path_logfile)[0], ProcL)
+                if os.path.isfile(assumed_path_GMS_file):
+                    GMS_file_dict = INP_R.GMSfile2dict(assumed_path_GMS_file)
+                    target_LayerBandsAssignment = get_LayerBandsAssignment({'image_type': dataset['image_type'],
+                                                                            'Satellite': dataset['satellite'],
+                                                                            'Sensor': dataset['sensor'],
+                                                                            'Subsystem': dataset['subsystem'],
+                                                                            'logger': None},
+                                                                           1 if dataset['sensormode'] == 'P' else None)
+                    if target_LayerBandsAssignment == GMS_file_dict['LayerBandsAssignment']:
+                        if DB_match == [] or DB_match == 'database connection fault':
+                            print('The dataset %s is not included in the database of processed data but according to ' \
+                                  'logfile %s has been written successfully. Recreating missing database entry.' \
+                                  % (dataset['entity_ID'], ProcL))
+                            DB_T.data_DB_updater(GMS_file_dict)
+                            if GMS_config.job.call_type == 'console':
+                                DB_T.SQL_DB_to_csv()
+                            dataset['proc_level'] = ProcL
+                        elif len(DB_match) == 1:
+                            try:
+                                print('Found a matching %s dataset for %s. Processing skipped until %s.' \
+                                      % (ProcL, dataset['entity_ID'], HLP_F.proc_chain[HLP_F.proc_chain.index(ProcL) + 1]))
+                            except IndexError:
+                                print('Found a matching %s dataset for %s. Processing already done.' \
+                                      % (ProcL, dataset['entity_ID']))
+                            if DB_match[0][0] == ProcL:
+                                dataset['proc_level'] = DB_match[0][0]
+                            else:
+                                dataset['proc_level'] = ProcL
+                    else:
+                        print('Found a matching dataset for %s but with a different LayerBandsAssignment. ' \
+                              'Dataset has to be reprocessed.' % dataset['entity_ID'])
+                else:
+                    print('%s for dataset %s has been written due to logfile but no corresponding dataset has been '
+                          'found.' % (ProcL, dataset['entity_ID']) + ' Searching for lower processing level...'
+                          if AllWrittenProcL.index(ProcL) != 0 else '')
+        elif len(DB_match) > 1:
+            print('According to database there are multiple matches for the dataset %s. Dataset has to be reprocessed.' \
+                  % dataset['entity_ID'])
+            dataset['proc_level'] = None
+        else:
+            dataset['proc_level'] = None
+
+        return dataset
@staticmethod
def _is_inMEM(GMS_objects, dataset):
# type: (list, list, collections.OrderedDict)
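For reference, the log-scanning regex in get_AllWrittenProcL_dueLog() extracts processing levels from 'data successfully saved' lines. A small demonstration on a made-up log excerpt (the pattern is the one used above, written as a raw string here to avoid the invalid-escape warning):

```python
import re

# Made-up excerpt; the actual wording of GMS logfiles is assumed from the pattern above
log_excerpt = ("14:05:32: L1A data successfully saved.\n"
               "14:22:10: L1B data successfully saved.\n")

proc_levels = re.findall(r":*(\S*\s*) data successfully saved.", log_excerpt, re.I)
print(proc_levels)  # ['L1A', 'L1B'] -> deduplicated and sorted via HLP_F.sorted_nicely() above
```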
@@ -224,7 +316,7 @@
self.get_data_list() # sets self.usecase.data_list
# add local availability
-self.usecase.data_list = MAP(L0A_P.add_local_availability, self.usecase.data_list)
+self.usecase.data_list = MAP(self.add_local_availability, self.usecase.data_list)
self.L1A_processing()
self.L1B_processing()
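MAP is imported from .multiproc and its implementation is not part of this diff; assuming it behaves like a parallel version of the built-in map (an assumption, not confirmed by this commit), the call above is conceptually equivalent to:

```python
from multiprocessing import Pool

def parallel_map(func, iterable):
    # Stand-in for .multiproc.MAP (assumption: a process-pool-based map)
    with Pool() as pool:
        return pool.map(func, iterable)

# self.usecase.data_list = parallel_map(self.add_local_availability, self.usecase.data_list)
```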