Commit 096bddfd authored by Daniel Scheffler, committed by Mathias Peters

fixed a memory issue during L2C processing; revised process_controller

algorithms.L1A_P.L1A_object:
- get_subset_obj(): minor revisions; removed deprecated code
- to_MGRS_tiles() now returns a generator object instead of a list => the output writer can be used directly after each tile is generated (see the usage sketch below)
misc.path_generator:
- fixed some PyCharm warnings
processing.process_controller:
- _is_already_present(): added docstring
- added get_DB_objects() -> simplifies L1A-L2C processor functions
- run_all_processors(): a KeyboardInterrupt now causes the process controller to shut down gracefully
- added stop()
- simplified L1A-L2C processor functions
config:
- set_config(): added 'reset' kwarg
- updated __version__
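
A minimal consumption sketch for the generator-based to_MGRS_tiles() (the writer call OUT_W.Obj2ENVI and the variable names are assumptions for illustration, not part of this commit):

    # hypothetical usage: each yielded tile can be written and freed immediately,
    # so only one MGRS tile has to be kept in memory at a time
    for tile in L1A_obj.to_MGRS_tiles(pixbuffer=10, v=False):
        OUT_W.Obj2ENVI(tile)   # assumed output writer call
        del tile               # release the tile before the next one is produced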
parent db515a3b
......@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller
__version__ = '20161201.04'
__version__ = '20161202.01'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
......
......@@ -1249,7 +1249,7 @@ class L1A_object(object):
self.logger.info(logmsg)
# copy object
sub_GMS_obj = HLP_F.parentObjDict[self.proc_level](*HLP_F.initArgsDict[self.proc_level])
sub_GMS_obj = HLP_F.parentObjDict[self.proc_level](*HLP_F.initArgsDict[self.proc_level]) # init
sub_GMS_obj.__dict__ = {k: getattr(self,k) for k in self.__dict__.keys() if not isinstance(getattr(self,k), np.ndarray)}.copy()
sub_GMS_obj = copy.deepcopy(sub_GMS_obj)
......@@ -1267,19 +1267,20 @@ class L1A_object(object):
# avoid disk IO if requested area is within the input array # TODO
# subset all array attributes and update directly related metadata
for arrname in list_arraynames:
# get input data for array subsetting
geoArr = getattr(self,arrname) if isinstance(getattr(self,arrname),GeoArray) else getattr(self,arrname)
nodata = HLP_F.get_outFillZeroSaturated(geoArr)[0]
meta2update = sub_GMS_obj.meta if arrname == 'arr' else sub_GMS_obj.masks_meta
gt = mapinfo2geotransform(meta2update['map info'])
prj = meta2update['coordinate system string']
meta2update = sub_GMS_obj.meta if arrname == 'arr' else sub_GMS_obj.masks_meta
geoArr = getattr(self,arrname) if isinstance(getattr(self,arrname),GeoArray) else \
GeoArray(getattr(self,arrname))
geoArr.nodata = HLP_F.get_outFillZeroSaturated(geoArr)[0]
geoArr.gt = mapinfo2geotransform(meta2update['map info'])
geoArr.prj = meta2update['coordinate system string']
# get subsetted and (possibly) reprojected array
xmin,xmax,ymin,ymax = mapBounds
rspAlg = 'near' if arrname=='masks' else 'cubic'
sub_arr,sub_gt,sub_prj = geoArr.get_mapPos((xmin,ymin,xmax,ymax), mapBounds_prj,
fillVal=nodata, arr_gt=gt, arr_prj=prj, rspAlg=rspAlg)
sub_arr,sub_gt,sub_prj = geoArr.get_mapPos((xmin,ymin,xmax,ymax), mapBounds_prj, rspAlg=rspAlg)
# show result
if v: GeoArray(sub_arr).show(figsize=(10,10))
......@@ -1313,15 +1314,9 @@ class L1A_object(object):
sub_GMS_obj.bounds_LonLat = HLP_F.corner_coord_to_minmax(sub_GMS_obj.corner_lonlat)
sub_GMS_obj.bounds_utm = HLP_F.corner_coord_to_minmax(sub_GMS_obj.corner_utm)
# calculate data_corners_imXY
if isinstance(sub_GMS_obj.mask_1bit, np.ndarray):
corners_imYX = calc_FullDataset_corner_positions(
sub_GMS_obj.mask_1bit, assert_four_corners=False, algorithm='shapely')
else: # str # FIXME not needed anymore because geoArr.get_mapPos always returns an array?
from ..io.Input_reader import read_mask_subset
subset = ('block', ((rS, rE), (cS, cE)))
mask_1bit = read_mask_subset(sub_GMS_obj.mask_1bit, 'mask_1bit', sub_GMS_obj.logger, subset)
corners_imYX = calc_FullDataset_corner_positions(mask_1bit, assert_four_corners=False, algorithm='shapely')
# calculate data_corners_imXY (mask_1bit is always an array here because get_mapPos always returns an array)
corners_imYX = calc_FullDataset_corner_positions(
sub_GMS_obj.mask_1bit, assert_four_corners=False, algorithm='shapely')
sub_GMS_obj.data_corners_imXY = [(YX[1], YX[0]) for YX in corners_imYX]
# calculate data_corners_LonLat
......@@ -1344,11 +1339,11 @@ class L1A_object(object):
def to_MGRS_tiles(self, pixbuffer=10, v=False):
# type: (int, bool) -> Generator
"""Returns a list of GMS objects representing the MGRS tiles for the given GMS object.
"""Returns a generator object where items represent the MGRS tiles for the given GMS object.
:param pixbuffer: <int> a buffer in pixel values used to generate an overlap between the returned MGRS tiles
:param v: <bool> verbose mode
:return:
:return: <generator> of GMS objects representing the MGRS tiles
"""
assert self.arr_shape == 'cube', "Only 'cube' objects can be cut into MGRS tiles. Got %s." % self.arr_shape
self.logger.info(
......@@ -1366,30 +1361,28 @@ class L1A_object(object):
GDF_MGRS_tiles['MGRStileObj'] = [*GDF_MGRS_tiles['granuleid'] .map(lambda mgrsTileID: MGRS_tile(mgrsTileID))]
GDF_MGRS_tiles['map_bounds_MGRS'] = [*GDF_MGRS_tiles['MGRStileObj'].map(get_arrBounds)] # xmin,xmax,ymin,ymax
# find first tile to log and assign 'logAtThisTile' later
dictIDxminymin = {(b[0] + b[2]): ID for ID, b in
zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['map_bounds_MGRS'])}
firstTile_ID = dictIDxminymin[min(dictIDxminymin.keys())]
# ensure self.masks exists (does not exist in Flink mode because this is skipped by self.fill_from_disk() )
if not hasattr(self, 'masks') or self.masks is None:
self.build_combined_masks_array() # creates self.masks and self.masks_meta
# read whole dataset into RAM in order to speed up subsetting
self.arr = GeoArray(self.arr) .to_mem() # FIXME change that if GeoArray implemented into L1A
self.masks = GeoArray(self.masks).to_mem() # to_mem ensures that the whole dataset is present in memory
get_GMSobj_MGRS = lambda GDF_row: \
self.get_subset_obj(mapBounds = GDF_row.map_bounds_MGRS,
mapBounds_prj = GEOP.EPSG2WKT(GDF_row['MGRStileObj'].EPSG),
logmsg = 'Producing MGRS tile %s from scene %s (entity ID %s).'
%(GDF_row.granuleid, self.scene_ID, self.entity_ID),
v = v)
GDF_MGRS_tiles['GMS_obj_MGRS_tile'] = GDF_MGRS_tiles.apply(lambda GDF_row: get_GMSobj_MGRS(GDF_row), axis=1)
# produce data for each MGRS tile in loop
for GDF_idx, GDF_row in GDF_MGRS_tiles.iterrows():
tileObj = self.get_subset_obj(mapBounds = GDF_row.map_bounds_MGRS,
mapBounds_prj = GEOP.EPSG2WKT(GDF_row['MGRStileObj'].EPSG),
logmsg = 'Producing MGRS tile %s from scene %s (entity ID %s).'
%(GDF_row.granuleid, self.scene_ID, self.entity_ID),
v = v)
MGRS_tileID = GDF_row['granuleid']
# set array attributes back to file path if they had been a filePath before
if self.arr .filePath: self.arr = self.arr .filePath # FIXME change that to self.arr .to_disk()
if self.masks.filePath: self.masks = self.masks.filePath # FIXME change that to self.masks.to_disk()
# find first tile to log and assign 'logAtThisTile'
dictIDxminymin = {(b[0] + b[2]): ID for ID, b in
zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['map_bounds_MGRS'])}
firstTile_ID = dictIDxminymin[min(dictIDxminymin.keys())]
# add data for each MGRS tile in loop
for MGRS_tileID, tileObj in zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['GMS_obj_MGRS_tile']):
# set MGRS info
tileObj.arr_shape = 'MGRS_tile'
tileObj.MGRS_info = {'tile_ID': MGRS_tileID, 'grid1mil': MGRS_tileID[:3], 'grid100k': MGRS_tileID[3:]}
......@@ -1397,9 +1390,11 @@ class L1A_object(object):
# set logAtThisTile
tileObj.logAtThisTile = MGRS_tileID == firstTile_ID
# convert records of the GeoDataFrame into a list of MGRS tiles
GMS_obj_MGRS_tiles = GDF_MGRS_tiles['GMS_obj_MGRS_tile'].values
return GMS_obj_MGRS_tiles
yield tileObj
# set array attributes back to file path if they had been a filePath before
if self.arr .filePath: self.arr = self.arr .filePath # FIXME change that to self.arr .to_disk()
if self.masks.filePath: self.masks = self.masks.filePath # FIXME change that to self.masks.to_disk()
def to_GMS_file(self, path_gms_file=None):
......
......@@ -11,8 +11,8 @@ import numpy as np
import builtins
def set_config(call_type, ID, exec_mode='Python'):
if not hasattr(builtins, 'GMS_job') or not hasattr(builtins, 'GMS_usecase'):
def set_config(call_type, ID, exec_mode='Python', reset=False):
if not hasattr(builtins, 'GMS_job') or not hasattr(builtins, 'GMS_usecase') or reset:
builtins.GMS_job = Job(call_type, ID, exec_mode=exec_mode)
builtins.GMS_usecase = Usecase(getattr(builtins, 'GMS_job'))
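A rough sketch of what the new 'reset' kwarg enables (package/import path, call_type value and job IDs are assumptions for illustration):

    from GeoMultiSens import config                          # assumed import path
    config.set_config(call_type='webapp', ID=123)            # creates GMS_job / GMS_usecase
    config.set_config(call_type='webapp', ID=456)            # no effect: config already set
    config.set_config(call_type='webapp', ID=456, reset=True)  # forces re-creation for a new job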
......@@ -26,6 +26,7 @@ class GMS_configuration(object):
else:
raise EnvironmentError("Config has not been set on this machine yet. Run 'set_config()' first!")
@job.setter
def job(self, job_obj):
assert isinstance(job_obj, Job), \
......@@ -40,6 +41,7 @@ class GMS_configuration(object):
else:
raise EnvironmentError("Config has not been set on this machine yet. Run 'set_config()' first!")
@usecase.setter
def usecase(self, usecase_obj):
assert isinstance(usecase_obj, Usecase), \
......@@ -82,10 +84,10 @@ class Job:
self.path_job_logs = absP('./logs/job_logs/')
# processor configuration: [run processor, write output, delete output if not needed anymore]
self.exec__L1AP = [1, 1, 1]
self.exec__L1BP = [1, 1, 1]
self.exec__L1CP = [1, 1, 1]
self.exec__L2AP = [1, 1, 1]
self.exec__L1AP = [1, 0, 0]
self.exec__L1BP = [1, 0, 0]
self.exec__L1CP = [1, 0, 0]
self.exec__L2AP = [1, 0, 0]
self.exec__L2BP = [1, 1, 0]
self.exec__L2CP = [1, 1, 0]
......@@ -95,9 +97,12 @@ class Job:
self.path_tempdir = '/dev/shm/GeoMultiSens/'
# path_procdata = absP('./database/processed_data/')
# path_procdata = '/srv/gms2/scheffler/GeoMultiSens/database/processed_data/'
self.path_procdata = joinP(self.path_fileserver, 'database/processed_scenes%s/' %('_bench' if self.benchmark_global else ''))
self.path_procdata_MGRS = joinP(self.path_fileserver, 'database/processed_mgrs_tiles%s/' %('_bench' if self.benchmark_global else ''))
self.path_database = joinP(self.path_fileserver, 'database/processed_scenes%s/data_DB.db' %('_bench' if self.benchmark_global else ''))
self.path_procdata = joinP(self.path_fileserver, 'database/processed_scenes%s/'
%('_bench' if self.benchmark_global else ''))
self.path_procdata_MGRS = joinP(self.path_fileserver, 'database/processed_mgrs_tiles%s/'
%('_bench' if self.benchmark_global else ''))
self.path_database = joinP(self.path_fileserver, 'database/processed_scenes%s/data_DB.db'
%('_bench' if self.benchmark_global else ''))
# path_database = absP('./database/processed_data/data_DB.db')
# path_db_meta = absP('./database/metadata/')
self.path_db_meta = absP('./database/metadata/metadata_DB.db') # ('geoms.gfz-potsdam.de:5432')
......
......@@ -28,7 +28,7 @@ class path_generator(object):
if not isdict and len(args)==8:
args += (None,) # set logger to None if not given in tuple
elif isdict and 'logger' not in args[0]:
args[0]['logger'] = None # set logger to None if not given in dict
args[0].update({'logger':None}) # set logger to None if not given in dict
argsdict = args[0] if isdict else dict(zip(['proc_level','image_type','satellite','sensor','subsystem',
'acquisition_date','entity_ID','filename','logger'],[*args]))
......@@ -75,12 +75,12 @@ class path_generator(object):
outP = os.path.join(folder_rawdata, self.filename)
else:
extensions_found = [ext for ext in ['.tar.gz','.zip','.hdf'] \
if os.path.exists(os.path.join(folder_rawdata, self.filename+ext))]
if os.path.exists(os.path.join(folder_rawdata, '%s%s'%(self.filename,ext)))]
if extensions_found:
assert len(extensions_found) > 0, 'The dataset %s.* cannot be found at %s' %(self.filename,folder_rawdata)
assert len(extensions_found) == 1, "The folder %s contains multiple files identified as raw data " \
"to be processed. Choosing first one.." %folder_rawdata
outP = os.path.join(folder_rawdata, self.filename+extensions_found[0])
outP = os.path.join(folder_rawdata, '%s%s'%(self.filename,extensions_found[0]))
else:
if self.filename.endswith('.SAFE') and \
os.path.exists(os.path.join(folder_rawdata,os.path.splitext(self.filename)[0])+'.zip'):
......@@ -91,9 +91,9 @@ class path_generator(object):
if self.image_type == 'DGM':
if self.satellite and re.search(self.satellite,'SRTM',re.I):
return os.path.join(CFG.job.path_archive,'srtm2/', self.entity_ID+'_sub.bsq')
return os.path.join(CFG.job.path_archive,'srtm2/', '%s%s'%(self.entity_ID,'_sub.bsq'))
if self.image_type == 'ATM':
return os.path.join(CFG.job.path_archive,'atm_data/', self.entity_ID + '.bsq')
return os.path.join(CFG.job.path_archive,'atm_data/', '%s%s'%(self.entity_ID,'.bsq'))
try: self.logger.critical('Given dataset specification is not yet supported. Specified parameters: ' \
'image_type: %s; satellite: %s; sensor: %s' %(self.image_type,self.satellite,self.sensor))
......
......@@ -7,6 +7,7 @@ import os
import sys
import time
from itertools import chain
import signal
from ..io import Output_writer as OUT_W
from ..io import Input_reader as INP_R
......@@ -59,7 +60,7 @@ class process_controller(object):
self.summary_quick = None
# set GMS configuration
set_config(call_type=call_type, exec_mode=exec_mode, ID=job_ID)
set_config(call_type=call_type, exec_mode=exec_mode, ID=job_ID, reset=True)
self.job = GMS_config.job
self.usecase = GMS_config.usecase
......@@ -123,16 +124,23 @@ class process_controller(object):
:param GMS_objects: <list> a list of GMS objects that have been recently processed
:param dataset: <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
"""
# check if the scene ID of the given dataset is in the scene IDs of the previously processed datasets
return dataset['scene_ID'] in [obj.scene_ID for obj in GMS_objects]
@staticmethod
def _is_already_present(dataset, procLvl):
"""Checks if the given dataset is already available on disk.
:param dataset: <GMS object>
:param procLvl: <str> processing level to be checked
:return: <bool>
"""
return HLP_F.proc_level_already_present(dataset['proc_level'], procLvl)
def _get_processor_data_list(self, procLvl, prevLvl_objects=None):
"""Returns a list of datasets to be processed by a specific processor.
"""Returns a list of datasets that have to be read from disk and then processed by a specific processor.
:param procLvl:
:param prevLvl_objects:
......@@ -143,14 +151,61 @@ class process_controller(object):
if prevLvl_objects is None:
return [dataset for dataset in self.usecase.data_list if not is_already_present(dataset)] # TODO generator?
else:
return [dataset for dataset in self.usecase.data_list if not is_already_present(dataset) and not
self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]
return [dataset for dataset in self.usecase.data_list if not is_already_present(dataset) and \
not self._is_inMEM(prevLvl_objects + self.failed_objects, dataset)]
def get_DB_objects(self, procLvl, prevLvl_objects=None, parallLev=None, blocksize=None):
"""
Returns a list of GMS objects for datasets available on disk that have to be processed by the current processor.
:param procLvl: <str> processing level of the current processor
:param prevLvl_objects: <list> of in-mem GMS objects produced by the previous processor
:param parallLev: <str> parallelization level ('scenes' or 'tiles')
-> defines whether full cubes or blocks are to be returned
:param blocksize: <tuple> block size in case blocks are to be returned, e.g. (2000,2000)
:return: <list> of GMS objects read from disk
"""
# TODO get prevLvl_objects automatically from self
if procLvl=='L1A':
return []
else:
# handle input parameters
parallLev = parallLev if parallLev else self.parallLev
blocksize = blocksize if blocksize else self.job.tiling_block_size_XY
prevLvl = HLP_F.proc_chain[HLP_F.proc_chain.index(procLvl)-1] # TODO replace by enum
# get GMSfile list
dataset_dicts = self._get_processor_data_list(procLvl, prevLvl_objects)
GMSfile_list_prevLvl_inDB = INP_R.get_list_GMSfiles(dataset_dicts, prevLvl)
# create GMS objects from disk with respect to parallelization level and block size
if parallLev == 'scenes':
# get input parameters for creating GMS objects as full cubes
work = [[GMS, ['cube', None]] for GMS in GMSfile_list_prevLvl_inDB]
else:
# define tile positions and size
get_tilepos_list = lambda GMSfile: HLP_F.get_image_tileborders(
'block', blocksize, shape_fullArr=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'])
# get input parameters for creating GMS objects as blocks
work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_prevLvl_inDB
for tp in get_tilepos_list(GMSfile)]
DB_objs = MAP(HLP_F.parentObjDict[prevLvl](*HLP_F.initArgsDict[prevLvl]).fill_from_disk, work) # init
if DB_objs:
DB_objs = list(chain.from_iterable(DB_objs)) if list in [type(i) for i in DB_objs] else list(DB_objs)
return DB_objs
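For clarity, a sketch of the two shapes a 'work' item can take before being passed to MAP above (file paths and tile bounds are made-up placeholders; tile positions follow the ((rS, rE), (cS, cE)) convention used elsewhere in the codebase):

    # parallLev == 'scenes': one item per GMS file, read as a full cube
    work = [['/path/to/scene_1__L1B.gms', ['cube', None]]]
    # parallLev == 'tiles': one item per (GMS file, tile position)
    work = [['/path/to/scene_1__L1B.gms', ['block', ((0, 1999), (0, 1999))]],
            ['/path/to/scene_1__L1B.gms', ['block', ((2000, 3999), (0, 1999))]]]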
def run_all_processors(self, custom_data_list=None):
"""
Run all processors at once.
"""
signal.signal(signal.SIGINT, self.stop) # enable a clean shutdown on SIGINT (Ctrl-C)
# TODO handle errors
if self.job.profiling:
from pyinstrument import Profiler
......@@ -160,6 +215,8 @@ class process_controller(object):
self.logger.info('Starting job with ID %s (comment: %s)...'
% (self.job.ID, self.DB_job_record.comment))
self.failed_objects = []
# get list of datasets to be processed
if custom_data_list:
self.usecase.data_list = custom_data_list
......@@ -194,6 +251,13 @@ class process_controller(object):
shutdown_loggers()
def stop(self, signum, frame):
del self.logger
shutdown_loggers()
print('Process controller stopped by user.')
raise KeyboardInterrupt
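A self-contained sketch of the shutdown pattern implemented above, i.e. routing SIGINT through a handler that raises KeyboardInterrupt (class name and messages are illustrative):

    import signal

    class MiniController(object):
        def stop(self, signum, frame):
            # place to shut down loggers / release resources before aborting
            print('Process controller stopped by user.')
            raise KeyboardInterrupt

        def run_all_processors(self):
            signal.signal(signal.SIGINT, self.stop)  # Ctrl-C is now handled by stop()
            try:
                while True:
                    pass  # stands in for the L1A-L2C processing chain
            except KeyboardInterrupt:
                print('Job stopped gracefully.')

    MiniController().run_all_processors()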
def benchmark(self):
"""
Run a benchmark.
......@@ -226,7 +290,7 @@ class process_controller(object):
L1A_resObjects = MAP(L0B_L1A_map, datalist_L1A_P, CPUs=12)
else: # tiles
L1A_tiles_map1 = MAP(L0B_L1A_map_1, datalist_L1A_P) # map_1
all_L1A_tiles_map1 = chain.from_iterable(L1A_tiles_map1) # merge results to new list of splits
all_L1A_tiles_map1 = chain.from_iterable(L1A_tiles_map1) # merge results to new list of splits
L1A_obj_tiles = MAP(L1A_map_2, all_L1A_tiles_map1) # map_2
grouped_L1A_Tiles = HLP_F.group_objects_by_attributes(
......@@ -251,14 +315,7 @@ class process_controller(object):
# run on full cubes
if self.job.exec__L1BP[0]:
datalist_L1B_P = self._get_processor_data_list('L1B', self.L1A_newObjects)
# get earlier processed L1A data
GMSfile_list_L1A_inDB = INP_R.get_list_GMSfiles(datalist_L1B_P, 'L1A')
work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L1A_inDB]
L1A_DBObjects = MAP(L1A_P.L1A_object(None).fill_from_disk, work)
L1A_DBObjects = list(L1A_DBObjects)
L1A_DBObjects = self.get_DB_objects('L1B', self.L1A_newObjects, parallLev='scenes')
L1A_Instances = self.L1A_newObjects + L1A_DBObjects # combine newly and earlier processed L1A data
L1B_resObjects = MAP(L1B_map_1, L1A_Instances)
......@@ -275,14 +332,8 @@ class process_controller(object):
Run Level 1C processing.
"""
if self.job.exec__L1CP[0]:
datalist_L1C_P = self._get_processor_data_list('L1C', self.L1B_newObjects)
GMSfile_list_L1B_inDB = INP_R.get_list_GMSfiles(datalist_L1C_P, 'L1B')
if self.parallLev == 'scenes':
work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L1B_inDB]
L1B_DBObjects = MAP(L1B_P.L1B_object(None).fill_from_disk, work)
L1B_DBObjects = list(L1B_DBObjects)
L1B_DBObjects = self.get_DB_objects('L1C', self.L1B_newObjects)
L1B_Instances = self.L1B_newObjects + L1B_DBObjects # combine newly and earlier processed L1B data
L1C_resObjects = MAP(L1C_map_1, L1B_Instances)
else: # tiles
......@@ -294,14 +345,8 @@ class process_controller(object):
L1B_newTiles = MAP(HLP_F.cut_GMS_obj_into_blocks, tuples_obj_blocksize)
L1B_newTiles = list(chain.from_iterable(L1B_newTiles))
"""prepare earlier processed L1A data for further processing"""
get_tilepos_list = lambda GMSfile: HLP_F.get_image_tileborders(
'block', blocksize, shape_fullArr=INP_R.GMSfile2dict(GMSfile)['shape_fullArr']) # """defines tile positions and size"""
work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_L1B_inDB
for tp in get_tilepos_list(GMSfile)]
L1B_newDBTiles = MAP(L1B_P.L1B_object(None).fill_from_disk, work)
"""combine newly and earlier processed L1A data"""
"""combine newly and earlier processed L1B data"""
L1B_newDBTiles = self.get_DB_objects('L1C', self.L1B_newObjects, blocksize=blocksize)
L1B_tiles = L1B_newTiles + L1B_newDBTiles
# TODO merge subsets of S2/Aster in order to provide all bands for atm.correction
......@@ -323,13 +368,8 @@ class process_controller(object):
Run Level 2A processing.
"""
if self.job.exec__L2AP[0]:
datalist_L2A_P = self._get_processor_data_list('L2A', self.L1C_newObjects)
# get earlier processed L1C data
GMSfile_list_L1C_inDB = INP_R.get_list_GMSfiles(datalist_L2A_P, 'L1C')
work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L1C_inDB]
L1C_DBObjects = MAP(L1C_P.L1C_object(None).fill_from_disk, work)
L1C_DBObjects = list(L1C_DBObjects)
"""combine newly and earlier processed L1C data"""
L1C_DBObjects = self.get_DB_objects('L2A', self.L1C_newObjects, parallLev='scenes')
L1C_Instances = self.L1C_newObjects + L1C_DBObjects # combine newly and earlier processed L1C data
if self.job.exec_mode == 'Flink':
......@@ -364,38 +404,26 @@ class process_controller(object):
Run Level 2B processing.
"""
if self.job.exec__L2BP[0]:
datalist_L2B_P = self._get_processor_data_list('L2B', self.L2A_tiles)
# get earlier processed L2A data
GMSfile_list_L2A_inDB = INP_R.get_list_GMSfiles(datalist_L2B_P, 'L2A')
if self.parallLev == 'scenes':
# don't know if scenes makes sense in L2B processing because full objects are very big!
work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L2A_inDB]
L2A_DBObjects = MAP(L2A_P.L2A_object(None).fill_from_disk, work)
L2A_DBObjects = list(L2A_DBObjects)
"""if newly processed L2A objects are present: merge them to scenes"""
grouped_L2A_Tiles = HLP_F.group_objects_by_attributes(self.L2A_tiles, 'scene_ID') # group results
# reduce # will be too slow because it has to pickle back really large L2A_newObjects
# L2A_newObjects = MAP(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles)
L2A_newObjects = [HLP_F.merge_GMS_tiles_to_GMS_obj(tileList) for tileList in grouped_L2A_Tiles]
L2A_newObjects = [HLP_F.merge_GMS_tiles_to_GMS_obj(tileList) for tileList in grouped_L2A_Tiles]
"""combine newly and earlier processed L2A data"""
L2A_DBObjects = self.get_DB_objects('L2B', self.L2A_tiles)
L2A_Instances = L2A_newObjects + L2A_DBObjects # combine newly and earlier processed L2A data
L2B_resObjects = MAP(L2B_map_1, L2A_Instances)
else: # tiles
blocksize = (2048, 2048) # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
L2A_newTiles = self.L2A_tiles
"""prepare earlier processed L2A data for further processing"""
# define tile positions and size
get_tilepos_list = lambda GMSfile: HLP_F.get_image_tileborders(
'block', blocksize, shape_fullArr=INP_R.GMSfile2dict(GMSfile)['shape_fullArr'])
work = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_L2A_inDB for tp in
get_tilepos_list(GMSfile)]
L2A_newDBTiles = MAP(L2A_P.L2A_object(None).fill_from_disk, work)
L2A_newTiles = self.L2A_tiles # tiles have the block size specified in L2A_map_2
"""combine newly and earlier processed L2A data"""
blocksize = (2048, 2048) # must be equal to the blocksize of L2A_newTiles specified in L2A_map_2
L2A_newDBTiles = self.get_DB_objects('L2B', self.L2A_tiles, blocksize=blocksize)
L2A_tiles = L2A_newTiles + L2A_newDBTiles
L2B_tiles = MAP(L2B_map_1, L2A_tiles)
......@@ -417,20 +445,14 @@ class process_controller(object):
"""
Run Level 2C processing.
"""
# FIXME only parallelization_level == 'scenes' implemented
if self.job.exec__L2CP[0]:
datalist_L2C_P = self._get_processor_data_list('L2C', self.L2B_newObjects)
# get earlier processed L2B data
GMSfile_list_L2B_inDB = INP_R.get_list_GMSfiles(datalist_L2C_P, 'L2B')
# FIXME only parallelization_level == 'scenes' implemented
work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L2B_inDB]
L2B_DBObjects = MAP(L2B_P.L2B_object(None).fill_from_disk, work)
L2B_DBObjects = list(L2B_DBObjects)
"""combine newly and earlier processed L2A data"""
L2B_DBObjects = self.get_DB_objects('L2C', self.L2B_newObjects, parallLev='scenes')
L2B_Instances = self.L2B_newObjects + L2B_DBObjects # combine newly and earlier processed L2A data
# print('L2B_Instances', L2B_Instances)
L2C_resObjects = MAP(L2C_map_1, L2B_Instances, CPUs=8) # FIXME 8 workers due to heavy IO
L2C_resObjects = MAP(L2C_map_1, L2B_Instances, CPUs=8) # FIXME 8 workers due to heavy IO
# FIXME in Flink mode results are too big to be back-pickled
self.L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
self.failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, HLP_F.failed_GMS_object) and
obj.scene_ID not in self.sceneids_failed]
......@@ -476,6 +498,4 @@ class process_controller(object):
self.L1C_newObjects = []
self.L2A_tiles = []
self.L2B_newObjects = []
self.L2C_newObjects = []
self.L2C_newObjects = []
\ No newline at end of file