Commit 428ee490 authored by Daniel Scheffler

algorithms.gms_object.GMS_Object:

- __getstate__(): added code for deleting array attributes in order to work around the pickle object size limitation
- added deleters for 'arr', 'mask_nodata', 'mask_clouds', 'masks'
- fixed some broken type hints
algorithms.L1B_P.L1B_Object:
- correct_spatial_shifts(): bugfix for missing masks attribute in Flink mode
algorithms.L1C_P.L1C_Object:
- atm_corr(): bugfix for not deleting lonlat_arr
misc.database_tools:
- fixed some broken type hints
misc.helper_functions:
- fixed broken type hint
processing.multiproc:
- MAP(): bugfix for not always returning a list
processing.process_controller:
- _is_inMEM(): fixed broken type hint
- updated __version__
parent d48276ad
@@ -15,7 +15,7 @@ from . import config
from .processing.process_controller import process_controller
-__version__ = '20170104.01'
+__version__ = '20170105.01'
__author__ = 'Daniel Scheffler'
__all__ = ['algorithms',
'io',
@@ -702,6 +702,10 @@ class L1B_object(L1A_object):
mapBounds = box(xmin, ymin, xmax, ymax).bounds
# correct shifts and clip to extent
# ensure self.masks exists (does not exist in Flink mode because in that case self.fill_from_disk() is skipped)
if not hasattr(self, 'masks') or self.masks is None:
self.build_combined_masks_array() # creates self.masks and self.masks_meta
for attrname in ['arr', 'masks']:
if self.coreg_needed:
if self.coreg_info['success']:
@@ -713,9 +717,7 @@
self.logger.info("Resampling attribute '%s' to target grid..." % attrname)
# correct shifts
-#print(attrname)
geoArr = getattr(self, attrname)
-#print('L1B718 geoArr', geoArr)
DS = DESHIFTER(geoArr, self.coreg_info,
target_xyGrid = [CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
cliptoextent = cliptoextent,
@@ -748,6 +750,8 @@
self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
self.masks_meta['lines'], self.masks_meta['samples'] = DS.arr_shifted.shape[:2]
# NOTE: mask_nodata and mask_clouds are updated later by L2A_map mapper function (module pipeline)
self.resamp_needed = False
self.coreg_needed = False
@@ -135,7 +135,9 @@ class L1C_object(L1B_object):
#SRF_dict = INP_R.SRF_reader(SRF_fold,RSD_md_L1B)
self.arr = self.arr # FIXME implement atmospheric correction
# delete array data that is not needed anymore
self.delete_acquisition_illumination_geometry()
self.lonlat_arr = None
def delete_acquisition_illumination_geometry(self):
self.VZA_arr = None # not needed anymore
@@ -150,6 +150,13 @@ class GMS_object(object):
self.close_GMS_loggers()
del self.pathGen # path generator can only be used for the current processing level
# delete arrays if their in-mem size is too big to be pickled
# => (avoids MaybeEncodingError: Error sending result: '[<GeoMultiSens_dev.algorithms.L2C_P.L2C_object
# object at 0x7fc44f6399e8>]'. Reason: 'error("'i' format requires -2147483648 <= number <= 2147483647",)')
#if self.proc_level=='L2C' and CFG.job.exec_mode=='Flink':
# del self.arr, self.masks
return self.__dict__
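The commented-out lines above sketch the intended workaround: multiprocessing serializes results with pickle, whose length header is a signed 32-bit integer, so objects whose pickled size exceeds ~2 GiB raise exactly the `'i' format requires ...` error quoted in the comment. A minimal sketch of the `__getstate__()` this enables, closely following the commented-out code (not necessarily the project's final implementation):

```python
def __getstate__(self):
    """Define the state to be pickled, e.g., when multiprocessing sends results."""
    self.close_GMS_loggers()  # logger objects hold file handles and cannot be pickled
    del self.pathGen          # path generator is only valid for the current level
    # drop huge array attributes so the pickled byte stream stays below the
    # 32-bit size limit; the new deleters reset the private attributes to None
    if self.proc_level == 'L2C' and CFG.job.exec_mode == 'Flink':
        del self.arr, self.masks  # triggers @arr.deleter and @masks.deleter
    return self.__dict__
```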
@@ -283,6 +290,11 @@ class GMS_object(object):
self._arr.prj = self.meta_odict['coordinate system string']
@arr.deleter
def arr(self):
self._arr = None
@property
def mask_nodata(self):
if self._mask_nodata is not None:
@@ -317,11 +329,16 @@ class GMS_object(object):
self._mask_nodata.prj = self.arr.prj
@mask_nodata.deleter
def mask_nodata(self):
self._mask_nodata = None
@property
def mask_clouds(self):
if self._mask_clouds:
if not self._mask_clouds.is_inmem and self._mask_clouds.bands > 1:
-# CloudMask object self._mask_clouds points to multi-band image file (bands mask_nodata/mask_clouds)
+# CloudMask object self._mask_clouds points to a multi-band image file on disk (bands mask_nodata/mask_clouds)
# -> reading the bands that are not needed has to be avoided
self._mask_clouds = CloudMask(self._mask_clouds[:,:,1],
geotransform = self._mask_clouds.gt,
@@ -347,6 +364,11 @@ class GMS_object(object):
self._mask_clouds.prj = self.arr.prj
@mask_clouds.deleter
def mask_clouds(self):
self._mask_clouds = None
@property
def masks(self):
return self._masks
@@ -354,12 +376,21 @@ class GMS_object(object):
@masks.setter
def masks(self, *geoArr_initArgs, **geoArr_initKwargs):
"""
NOTE: This does not automatically update mask_nodata and mask_clouds. However, if mask_nodata and
mask_clouds are None, their getters will synchronize automatically!
"""
self._masks = GeoArray(*geoArr_initArgs, **geoArr_initKwargs)
self._masks.nodata = 0
self._masks.gt = self.arr.gt
self._masks.prj = self.arr.prj
@masks.deleter
def masks(self):
self._masks = None
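All four new deleters follow the same pattern: `del obj.attr` does not remove the attribute but resets its private backing field to None, so later accesses stay safe and the lazy getters can rebuild or re-read the data. A self-contained sketch of the pattern, stripped of the GeoArray specifics:

```python
class LazyArrayHolder(object):
    """Minimal illustration of the property/deleter pattern used above."""

    def __init__(self):
        self._arr = None

    @property
    def arr(self):
        # a real getter could lazily (re-)read the array from disk here
        return self._arr

    @arr.setter
    def arr(self, value):
        self._arr = value

    @arr.deleter
    def arr(self):
        # 'del holder.arr' frees the memory reference but keeps the attribute
        # defined, so no AttributeError is raised afterwards
        self._arr = None


holder = LazyArrayHolder()
holder.arr = list(range(10))
del holder.arr            # does not raise; holder.arr is now None
assert holder.arr is None
```

This is what makes the `__getstate__()` workaround safe: after `del self.arr` the attribute survives as None and pickling the remaining `__dict__` stays cheap.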
@property
def pathGen(self):
"""
@@ -595,7 +626,7 @@ class GMS_object(object):
def apply_nodata_mask_to_ObjAttr(self, attrname, out_nodata_val=None):
-# type: (str,int)
+# type: (str,int) -> None
"""Applies self.mask_nodata to the specified array attribute by setting all values where mask_nodata is 0 to the
given nodata value.
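A hedged sketch of what the method's core amounts to, assuming numpy arrays and a 2D nodata mask (the function name, default nodata value, and in-place handling are illustrative, not taken from the diff):

```python
import numpy as np

def apply_nodata_mask(arr, mask_nodata, out_nodata_val=-9999):
    # type: (np.ndarray, np.ndarray, int) -> np.ndarray
    """Set all pixels where mask_nodata is 0 to the given nodata value."""
    # a 2D boolean mask indexes the first two axes, so a 3D (row, col, band)
    # array gets all bands filled at the masked pixels
    arr[mask_nodata == 0] = out_nodata_val
    return arr
```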
@@ -657,7 +688,7 @@ class GMS_object(object):
def apply_nodata_mask_to_saved_ENVIfile(self, path_saved_ENVIhdr, custom_nodata_val=None, update_spec_vals=False):
-# type: (str,int,bool)
+# type: (str,int,bool) -> None
"""Applies self.mask_nodata to a saved ENVI file with the same X/Y dimensions like self.mask_nodata by setting all
values where mask_nodata is 0 to the given nodata value.
@@ -683,7 +714,7 @@ class GMS_object(object):
def combine_tiles_to_ObjAttr(self, tiles, target_attr):
-# type: (list,str)
+# type: (list,str) -> None
"""Combines tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single attribute.
If CFG.usecase.CFG.job.exec_mode == 'Python' the produced attribute is additionally written to disk.
@@ -722,7 +753,7 @@ class GMS_object(object):
def write_tiles_to_ENVIfile(self, tiles, overwrite=True):
-# type: (list,bool)
+# type: (list,bool) -> None
"""Writes tiles, e.g. produced by L1A_P.L1A_object.DN2TOARadRefTemp() to a single output ENVI file.
:param tiles: <list> a list of dictionaries with the keys 'desc', 'data', 'row_start','row_end',
@@ -750,7 +781,7 @@ class GMS_object(object):
def get_subset_obj(self, imBounds=None, mapBounds=None, mapBounds_prj=None, logmsg=None, v=False):
-# type: (L1A_object,tuple) -> L1A_object
+# type: (tuple) -> self
"""Returns a subset of the given GMS object, based on the given bounds coordinates.
Array attributes are clipped and relevant metadata keys are updated according to new extent.
@@ -853,7 +884,7 @@ class GMS_object(object):
def to_MGRS_tiles(self, pixbuffer=10, v=False):
-# type: (int) -> list
+# type: (int) -> self
"""Returns a generator object where items represent the MGRS tiles for the given GMS object.
:param pixbuffer: <int> a buffer in pixel values used to generate an overlap between the returned MGRS tiles
@@ -880,7 +911,7 @@ class GMS_object(object):
zip(GDF_MGRS_tiles['granuleid'], GDF_MGRS_tiles['map_bounds_MGRS'])}
firstTile_ID = dictIDxminymin[min(dictIDxminymin.keys())]
-# ensure self.masks exists (does not exist in Flink mode because this is skipped by self.fill_from_disk() )
+# ensure self.masks exists (does not exist in Flink mode because in that case self.fill_from_disk() is skipped)
if not hasattr(self, 'masks') or self.masks is None:
self.build_combined_masks_array() # creates self.masks and self.masks_meta
@@ -966,7 +997,7 @@ class GMS_object(object):
def to_ENVI(self, write_masks_as_ENVI_classification=True, is_tempfile=False, compression=False):
-# type: (object, bool, bool)
+# type: (object, bool, bool) -> None
"""Write GMS object to disk. Supports full cubes AND 'block' tiles.
:param self: <object> GMS object, e.g. L1A_P.L1A_object
@@ -531,7 +531,7 @@ class GMS_JOB(object):
"""GeoMultiSens job manager"""
def __init__(self, conn_db):
-# type: (str,list,int)
+# type: (str) -> None
"""
:param conn_db: <str> the database connection parameters as given by CFG.job.conn_params
"""
@@ -762,7 +762,7 @@ class GMS_JOB(object):
def create(self):
-# type: -> int
+# type: () -> int
"""
Add the job to the 'jobs' table of the database
:return: <int> the job ID of the newly created job
@@ -888,7 +888,7 @@ def delete_processing_results(scene_ID, proc_level='all', force=False):
def add_externally_downloaded_data_to_GMSDB(conn_DB, src_folder, filenames, satellite, sensor):
-# type: (str,str,list,str,str)
+# type: (str,str,list,str,str) -> None
"""Adds externally downloaded satellite scenes to GMS fileserver AND updates the corresponding postgreSQL records
by adding a filename and setting the processing level to 'DOWNLOADED'.:
@@ -937,7 +937,7 @@ def add_missing_filenames_in_pgSQLdb(conn_params): # FIXME
def pdDataFrame_to_sql_k(engine, frame, name, if_exists='fail', index=True,
index_label=None, schema=None, chunksize=None, dtype=None, **kwargs):
-# type: (Any,pd.DataFrame,str,str,bool,str,str,int,dict,dict)
+# type: (Any,pd.DataFrame,str,str,bool,str,str,int,dict,dict) -> None
"""Extends the standard function pandas.io.SQLDatabase.to_sql() with 'kwargs' which allows to set the primary key
of the target table for example. This is usually not possible with the standard to_sql() function.
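The motivation is that the public `DataFrame.to_sql()` does not expose the `keys` argument of pandas' internal `SQLTable`, so tables are always created without a primary key. A hedged sketch of how such a wrapper can work; `SQLDatabase` and `SQLTable` are pandas-internal classes whose signatures may differ between pandas versions:

```python
from pandas.io import sql as pd_sql

def df_to_sql_with_pk(engine, frame, name, if_exists='fail', index=True, **kwargs):
    # 'kwargs' may contain e.g. keys='id', which the public to_sql() rejects
    pandas_db = pd_sql.SQLDatabase(engine)
    table = pd_sql.SQLTable(name, pandas_db, frame=frame, index=index,
                            if_exists=if_exists, **kwargs)
    table.create()  # emits CREATE TABLE including the PRIMARY KEY constraint
    table.insert()  # inserts the DataFrame rows
```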
@@ -968,7 +968,7 @@ def pdDataFrame_to_sql_k(engine, frame, name, if_exists='fail', index=True,
def import_shapefile_into_postgreSQL_database(path_shp, tablename, cols2import=None, dtype_dic=None,
if_exists='fail', index_label=None, primarykey=None):
-# type: (str,str,list,dict,str,bool,str)
+# type: (str,str,list,dict,str,bool,str) -> None
"""Imports all features of shapefile into the specified table of the postgreSQL database. Geometry is automatically
converted to postgreSQL geometry data type.
:param path_shp: <str> path of the shapefile to be imported
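A minimal sketch of the general approach, assuming geopandas and SQLAlchemy are available (the project's actual implementation may go through GDAL/OGR or psycopg2 instead; the WKT detour and column name 'geom' are illustrative):

```python
import geopandas as gpd
from sqlalchemy import create_engine

def shapefile_to_postgres(path_shp, tablename, conn_str, if_exists='fail'):
    gdf = gpd.read_file(path_shp)  # read all shapefile features
    # serialize shapely geometries to WKT so a plain to_sql() call can store
    # them; an ALTER TABLE ... USING ST_GeomFromText(...) afterwards could
    # cast the column to the native PostGIS geometry type
    gdf['geom'] = gdf.geometry.apply(lambda geom: geom.wkt)
    engine = create_engine(conn_str)
    gdf.drop('geometry', axis=1).to_sql(tablename, engine,
                                        if_exists=if_exists, index=False)
```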
@@ -1010,7 +1010,7 @@ def import_shapefile_into_postgreSQL_database(path_shp, tablename, cols2import=N
def data_DB_updater(obj_dict):
-# type: (dict)
+# type: (dict) -> None
"""Updates the table "scenes_proc" or "mgrs_tiles_proc within a postgreSQL or an SQL database
according to the given dictionary of a GMS object.
@@ -216,7 +216,7 @@ def get_job_summary(list_GMS_objects):
def silentremove(filename):
-# type: str
+# type: (str) -> None
"""Remove the given file without raising OSError exceptions, e.g. if the file does not exist."""
try:
os.remove(filename)
@@ -6,4 +6,5 @@ import multiprocessing
def MAP(func, *args, CPUs=None):
with multiprocessing.Pool(CPUs) as pool:
results = pool.map(func,*args)
-return results
\ No newline at end of file
+return results if isinstance(results, list) else [results]
\ No newline at end of file
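`pool.map()` already returns a list for ordinary iterables, so the added guard is defensive: it makes the "always returns a list" contract explicit for callers that iterate over the result. The fixed helper, runnable standalone:

```python
import multiprocessing

def MAP(func, *args, CPUs=None):
    """Apply 'func' in parallel over the given arguments; always return a list."""
    with multiprocessing.Pool(CPUs) as pool:
        results = pool.map(func, *args)
    # defensive: pool.map() should already return a list
    return results if isinstance(results, list) else [results]

if __name__ == '__main__':
    print(MAP(abs, [-1, -2, 3]))  # -> [1, 2, 3]
```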
@@ -8,6 +8,7 @@ import time
from itertools import chain
import signal
import re
+import collections
from ..io import Output_writer as OUT_W
from ..io import Input_reader as INP_R
@@ -214,7 +215,7 @@ class process_controller(object):
@staticmethod
def _is_inMEM(GMS_objects, dataset):
-# type: (list, list, collections.OrderedDict)
+# type: (list, collections.OrderedDict) -> bool
"""Checks whether a dataset within a dataset list has been processed in the previous processing level.
:param GMS_objects: <list> a list of GMS objects that have been recently processed
:param dataset: <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
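A hypothetical sketch of what such a check can look like; comparing scene IDs is an assumed matching criterion for illustration, not taken from the diff:

```python
class process_controller_sketch(object):
    @staticmethod
    def _is_inMEM(GMS_objects, dataset):
        # type: (list, collections.OrderedDict) -> bool
        # assumed criterion: the dataset counts as 'in memory' if any recently
        # processed GMS object refers to the same scene ID (key name assumed)
        return any(getattr(obj, 'scene_ID', None) == dataset.get('scene_ID')
                   for obj in GMS_objects)
```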