Commit 18f12712 authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Bugfix for wrong no data value in case of cloud classification masks that have...

Bugfix for wrong no data value in case of cloud classification masks that have no no-data values inside

OUT_W:
- mask_to_ENVI_Classification(): Bugfix for wrong no data value in case of cloud classification masks that have no no-data values inside

DB_T:
removed deprecated import statement

PC:
- deleted deprecated versions of process_controller modules
parent 2116759f
......@@ -149,7 +149,7 @@ def HDR_writer(meta_dic,outpath_hdr,logger=None):
def ASCII_writer(In,path_out_baseN):
warnings.warn(DeprecationWarning())
warnings.warn(DeprecationWarning('Use <GMS_object>.to_GMS_file() instead.'))
assert isinstance(In,dict), 'Input for ASCII writer is expected to be a dictionary. Got %s.' %type(In)
for k,v in list(In.items()):
......@@ -328,6 +328,7 @@ def mask_to_ENVI_Classification(InObj,maskname):
remaining_pixelVals = range(1,len(pixelVals_in_mask)+1)
else:
remaining_pixelVals = range(len(pixelVals_in_mask))
del mask_md['data ignore value']
for in_val,out_val in zip(pixelVals_in_mask,remaining_pixelVals):
classif_array[classif_array==in_val] = out_val
......
......@@ -18,7 +18,6 @@ import shutil
import traceback
from pprint import pprint
import shapely
import geopandas
from shapely.geometry import Polygon,box,MultiPolygon
from geopandas import GeoDataFrame
......
......@@ -84,7 +84,7 @@ assert parallelization_level in ['scenes', 'tiles']
CPUs= job.CPUs
#job.path_procdata_scenes = '/data1/gms/mount/db/data/processed_scenes_dev/'
job.path_procdata_MGRS = '/data1/gms/mount/db/data/processed_mgrs_tiles_dev/'
#job.path_procdata_MGRS = '/data1/gms/mount/db/data/processed_mgrs_tiles_dev/'
t1 = time.time()
if job.profiling:
......
This diff is collapsed.
# -*- coding: utf-8 -*-
###############################################################################
#
# BigData process controller
# only for test purpose
#
###############################################################################
########################### Library import ####################################
from __future__ import (division, print_function, unicode_literals,absolute_import)
import datetime
import inspect
import multiprocessing
import os
import algorithms.L2A_P
import builtins
import dill
import sys
import time
import misc.logging
# Banner separating this run's console output from anything printed before.
print('##################################################################################################')
# Detect whether the script was started from inside an IPython notebook kernel.
called_from_iPyNb = 1 if 'ipykernel/__main__.py' in sys.argv[0] else 0
# check if process_controller is executed by debugger
isdebugging = 1 if True in [frame[1].endswith("pydevd.py") for frame in inspect.stack()] else False
isdebugging = 1 # FIXME hard-coded override: debugger mode is forced regardless of the detection above
# 'console' when started without a job-ID CLI argument (or from a notebook), 'webapp' otherwise.
# Stored in builtins so every module of the package can read it without imports.
builtins.GMS_call_type = 'console' if len(sys.argv) < 2 or called_from_iPyNb else 'webapp'
# Process ID: a timestamp string in console mode, otherwise the integer job ID from the CLI.
builtins.GMS_process_ID = datetime.datetime.now().strftime("%Y%m%d__%Hh%Mm%Ss") if len(sys.argv) < 2\
    or called_from_iPyNb else int(sys.argv[1])
if isdebugging: #override the existing settings in order to get write access everywhere
    builtins.GMS_call_type = 'webapp'
    #builtins.GMS_process_ID = 26184107 # FIXME
    builtins.GMS_process_ID = 26185175 # FIXME # 1x TM5
# NOTE(review): config presumably reads GMS_call_type / GMS_process_ID from builtins at
# import time, so it must only be imported after they are set above — verify in config.py.
import config
builtins.GMS_config = config
# Expose every exec__* processing flag (e.g. exec__L1AP) as a module-level global.
for i in [attr for attr in dir(config) if attr.startswith('exec__')]:
    globals()[i] = getattr(config, i)
job, usecase, GMS_call_type = builtins.GMS_config.job, builtins.GMS_config.usecase, builtins.GMS_config.GMS_call_type
# Overall wall-clock timer; evaluated at the very end of the script.
t1 = time.time()
if job.profiling:
    # Optional profiling: pyinstrument is only imported when job.profiling is enabled.
    from pyinstrument import Profiler
    profiler = Profiler() # or Profiler(use_signal=False), see below
    profiler.start()
import io.Output_writer as OUT_W # Output_writer
import io.Input_reader as INP_R # Input_reader
import misc.helper_functions as HLP_F # Helper functions
import misc.database_tools as DB_T # Database tools
import algorithms.L0A_P as L0A_P # Level 0A Processor
import algorithms.L0B_P as L0B_P # Level 0B Processor
import algorithms.L1A_P as L1A_P # Level 1A Processor
import algorithms.L1B_P as L1B_P # Level 1B Processor
#import algorithms.L1C_P as L1C_P # Level 1C Processor
#import algorithms.L2A_P as L2A_P # Level 2A Processor
#import algorithms.L2B_P as L2B_P # Level 2B Processor
#import algorithms.L2C_P as L2C_P # Level 2C Processor
#import algorithms.L2D_P as L2D_P # Level 2D Processor
########################### core functions ####################################
# Job-specific logger writing to <job.path_job_logs>/<job.ID>.log
job.logger = misc.logging.setup_logger('log__%s' % job.ID, os.path.join(job.path_job_logs, '%s.log' % job.ID), 1, 0)
job.logger.info('Execution started.')
def run_processController_in_multiprocessing(usecase_data_list):
    """Run the L0B/L1A/L1B/L1C processing chain on the given datasets.

    NOTE(review): the source indentation was lost in transfer; the block
    structure below was reconstructed from the control-flow logic — verify
    against the repository before relying on branch boundaries.

    :param usecase_data_list: list of dataset dicts (each providing at least a
        'proc_level' key) describing the scenes to process
    Side effects: writes ENVI files via OUT_W, stores 'time_IO' in globals()
    for the benchmark mode, and (console mode) dumps the SQL DB to CSV.
    """
    # One worker pool is shared by all map()/imap() calls below.
    pool = multiprocessing.Pool(job.CPUs)
    # Annotate each dataset with its local availability / processing level.
    usecase_data_list = pool.map(L0A_P.add_local_availability, usecase_data_list)
    #[print(i) for i in usecase_data_list]
    # Split input into datasets still needing L1A processing and those already in the DB.
    L1A_already_present = lambda dataset: HLP_F.proc_level_already_present(dataset['proc_level'],'L1A')
    datalist_newData = [dataset for dataset in usecase_data_list if not L1A_already_present(dataset)]
    datalist_inDB = [dataset for dataset in usecase_data_list if L1A_already_present(dataset)]
    """LEVEL 0B PROCESSING"""
    L0B_objects = pool.map(L0B_P.L0B_object, datalist_newData)
    """LEVEL 1A PROCESSING"""
    L1A_newObjects = []
    if job.exec__L1AP[0]:
        [print(obj.baseN) for obj in L0B_objects]
        t1_init = time.time()
        L1A_newObjects = pool.map(L1A_P.L1A_object, L0B_objects)
        time_L1A_init = time.time()-t1_init
        time_OUT_W_allDs = []
        for obj in L1A_newObjects: # (block-wise parallelization)
            """Berechnete Arrays müssen in Python-Implementierung direkt auf Disk geschrieben werden, weil Liste
            'L1A_newObjects' sonst im Speicher zu groß wird und die Berechnung mit jeder Szene langsamer wird. Das
            lässt sich vielleicht in Flink anders lösen."""
            # (translation: in the Python implementation computed arrays must be written
            # to disk immediately, because the list 'L1A_newObjects' would otherwise grow
            # too large in memory and slow processing down with every scene; this might be
            # solved differently in Flink)
            obj.calc_mask_nodata()
            obj.apply_nodata_mask_to_ObjAttr('arr')
            # map
            obj.get_tilepos('block', [1024,1024])
            tiles = pool.map(obj.DN2TOARadRefTemp, obj.tile_pos)
            obj.add_rasterInfo_to_MetaObj()
            #reduce
            obj.write_tiles_to_ENVIfile(tiles)
            if obj.sensormode == 'M' and not obj.GeoAlign_ok: # if geoalignment has to be fixed # FIXME
                # map
                nodata_tiles = pool.map(obj.calc_mask_nodata, obj.tile_pos)
                obj.combine_tiles_to_ObjAttr(nodata_tiles,'mask_1bit')
                obj.apply_nodata_mask_to_saved_ENVIfile(obj.MetaObj.Dataname) # prevents warping artifacts at image borders
                obj.reference_data('LonLat') # FIXME UTM?
                del obj.mask_1bit
            else: # panchromatic data or multispectral data with a proper geoalignment
                obj.reference_data('LonLat') # FIXME UTM?
            # Common post-georeferencing steps: tile positions must be recomputed
            # because reference_data() changed the image geometry.
            obj.get_tilepos('block', [1024,1024])
            nodata_tiles = pool.map(obj.calc_mask_nodata, obj.tile_pos)
            cloud_mask_tiles = pool.map(obj.calc_cloud_mask, obj.tile_pos)
            obj.combine_tiles_to_ObjAttr(nodata_tiles,'mask_1bit')
            obj.combine_tiles_to_ObjAttr(cloud_mask_tiles,'mask_clouds')
            obj.calc_corner_positions() # requires mask_1bit
            obj.calc_center_AcqTime() # (if neccessary); requires corner positions
            obj.calc_mean_VAA()
            obj.calc_orbit_overpassParams() # requires corner positions
            obj.MetaObj2ODict()
            obj.apply_nodata_mask_to_saved_ENVIfile(obj.arr)
            obj.apply_nodata_mask_to_ObjAttr('mask_clouds',0)
            masks = obj.build_L1A_masks()
            obj.write_tiles_to_ENVIfile([masks])
            if job.exec__L1AP[1]:
                t1_IO = time.time()
                OUT_W.Obj2ENVI(obj)
                obj.delete_tempFiles()
                time_OUT_W_allDs.append(time.time()-t1_IO)
        # Export total I/O time for the benchmark mode of the main block.
        globals()['time_IO'] = time_L1A_init+sum(time_OUT_W_allDs)
    # preparation for L1B-P (scene-wise parallelization)
    L1B_newObjects = []
    if job.exec__L1BP[0]:
        # run on full cubes
        # get earlier processed L1A data
        GMSfile_list_L1A_inDB = INP_R.get_list_GMSfiles([datalist_inDB, 'L1A'])
        work = [[GMS,['cube',None]] for GMS in GMSfile_list_L1A_inDB]
        L1A_DBObjects = pool.imap(L1A_P.L1A_object(None).fill_from_disk, work)
        L1A_DBObjects = list(L1A_DBObjects)
        L1A_Instances = L1A_newObjects + L1A_DBObjects # combine newly and earlier processed L1A data
        """LEVEL 1B PROCESSING"""
        for L1A_obj in L1A_Instances: # scene-wise processing
            # TODO should later run scene-wise parallelized
            """L1A_obj enthält in Python- (im Gegensatz zur Flink-) Implementierung KEINE ARRAY-DATEN!,
            nur die für die ganze Szene gültigen Metadaten"""
            # (translation: in the Python implementation — as opposed to Flink — L1A_obj
            # contains NO ARRAY DATA, only the metadata valid for the whole scene)
            if not HLP_F.proc_level_already_present(L1A_obj.proc_level,'L1B') and not L1A_obj.georef:
                """1. calculate shifts"""
                COREG_obj = L1B_P.COREG(L1A_obj.__dict__.copy(), v=0)
                if not COREG_obj.success: # FIXME
                    continue # skip coregistration if L1B_P.COREG returns 0 (e.g. if no reference scene exists)
                COREG_obj.calculate_spatial_shifts()
                """2. get L1B object with attribute coreg_info"""
                L1B_obj = L1B_P.L1B_object(L1A_obj, COREG_obj)
                del L1A_obj
                """3. perform deshifting"""
                var='RASTERIO'
                #var='GDAL' # FIXME deprecated
                if var=='GDAL': # fast in singleprocessing
                    deshift_configs = algorithms.L2A_P.get_DESHIFTER_configs(L1B_obj.__dict__.copy(),
                        ['arr','masks']) # array-wise parallelization
                    DESHIFT_instances = [algorithms.L2A_P.DESHIFTER(obj, attr, **kwargs) for obj, attr, kwargs in deshift_configs]
                    deshift_results = []
                    for inst in DESHIFT_instances:
                        deshift_results.append(inst.correct_shifts())
                elif var=='RASTERIO': # fast in multiprocessing
                    deshift_configs = algorithms.L2A_P.get_DESHIFTER_configs(L1B_obj.__dict__.copy(),
                        ['arr','masks'], proc_bandwise=True) # array-wise parallelization
                    DESHIFT_instances = [algorithms.L2A_P.DESHIFTER(obj, attr, **kwargs) for obj, attr, kwargs in deshift_configs]
                    deshift_results = pool.map(algorithms.L2A_P.DESHIFTER.correct_shifts, DESHIFT_instances)
                else:
                    raise Exception
                # Group per-attribute deshift results and apply them group-wise.
                grouped_deshift_results = HLP_F.group_dicts_by_key(deshift_results,'attrname')
                [L1B_obj.apply_deshift_results(deshift_results) for deshift_results in grouped_deshift_results]
                """write L1B"""
                if job.exec__L1BP[1]:
                    OUT_W.Obj2ENVI(L1B_obj,1)
                    L1B_P.delete_tempFiles()
            else:
                print('%s skipped in L1B_P' %L1A_obj.baseN)
        # NOTE(review): the L1A instance list is reused as the L1B object list here —
        # presumably the loop above upgraded the instances in place; verify.
        L1B_newObjects = L1A_Instances
    # preparation for L1C-P (block-wise parallelization)
    L1C_newObjects = []
    if job.exec__L1CP[0]:
        if L1B_newObjects:
            """if newly processed L1B objects are present: cut them into tiles"""
            tuple__newL1Bobjects__blocksize = [[obj, [1000, 1000]] for obj in L1B_newObjects]
            L1B_newObjects = pool.map(L1A_P.cut_L1A_obj_into_blocks, # FIXME should not be implemented in L1A-P
                tuple__newL1Bobjects__blocksize) # returns [[obj,obj,obj]]
            # Flatten the nested result list returned by cut_L1A_obj_into_blocks.
            L1B_newObjects = L1B_newObjects[0] if isinstance(L1B_newObjects[0], list) else L1B_newObjects
        """prepare earlier processed L1B data for further processing"""
        GMSfile_list_L1B_inDB = INP_R.get_list_GMSfiles([datalist_inDB, 'L1B'])
        get_tilepos_list = lambda GMSfile: HLP_F.get_image_tileborders('block', [500, 500],
            shape_fullArr=INP_R.GMSfile2dict(GMSfile)['shape_fullArr']) # """defines tile positions and size"""
        GMSfile_tilepos_list_L1B_inDB = [[GMSfile, ['block', tp]] for GMSfile in GMSfile_list_L1B_inDB \
            for i, tp in enumerate(get_tilepos_list(GMSfile))]
        if GMSfile_tilepos_list_L1B_inDB:
            """ create L1B objects from processed and saved L1B data """
            # Instancing a L1B object with argument L0B_object set to 'None' creates an empty L1B object that is
            # filled with the data from disk. # FIXME not yet implemented
            L1B_DBObjects = pool.imap(L1B_P.L1B_object(None).fill_from_disk, GMSfile_tilepos_list_L1B_inDB)
            L1B_DBObjects = list(L1B_DBObjects)
        else:
            L1B_DBObjects = []
        """combine newly and earlier processed L1B data"""
        L1B_Instances = L1B_newObjects + L1B_DBObjects
        grouped_L1B_Tiles = HLP_F.group_objects_by_attributes(L1B_Instances, 'baseN')
        """LEVEL 1C PROCESSING"""
        """process all L1B data to fullfill the requirements of L1C processing (atmospheric correction)"""
        processed_grouped_L1B_Tiles = []
        for scene_tilelist in grouped_L1B_Tiles: # scene-wise processing (grouped by baseN)
            L1B_obj = L1B_P.merge_L1B_tiles_to_L1B_obj(scene_tilelist) # FIXME not yet implemented
            t1 = time.time()
            print (scene_tilelist[0].baseN)
            # Attach lon/lat and illumination/acquisition geometry arrays per tile.
            processed_tiles = pool.map(apply_L1C_preparationFuncs_to_tiles,scene_tilelist)
            # processed_tiles = pool.imap_unordered(apply_L1A_funcs_to_tiles,scene_tilelist)
            print('func applying time: ', time.time()-t1)
            if job.export_VZA_SZA_SAA_RAA_stats:
                """optionally export some statistics about illumination and acquisition geometry"""
                t2 = time.time()
                L1B_obj = L1B_P.merge_L1B_tiles_to_L1B_obj(processed_tiles)
                print('tile merging time: ', time.time()-t2)
                OUT_W.export_VZA_SZA_SAA_RAA_stats(L1B_obj)
            if job.export_L1C_obj_dumps:
                """optionally export dill dump files containing L1A-instances"""
                print("dumping a tile...")
                # Dump the first tile beyond the (1500, 1500) position as a sample.
                for obj in processed_tiles:
                    if obj.arr_pos[0][0] > 1500 and obj.arr_pos[1][0] > 1500: # FIXME
                        with open(os.path.join(obj.path_procdata,'%s_%s_%s.pkl'%(obj.baseN,obj.proc_level,
                                obj.arr_pos)), 'wb') as outFile:
                            dill.dump(obj,outFile,protocol=2)
                        break
                print("dumping full L1A_obj...")
                # Merge tiles first unless the stats export above already did it.
                if 'L1B_obj' not in locals():
                    t2 = time.time()
                    L1B_obj = L1B_P.merge_L1B_tiles_to_L1B_obj(processed_tiles)
                    print('tile merging time: ', time.time()-t2)
                with open(os.path.join(L1B_obj.path_procdata,'%s_%s.pkl'
                        %(L1B_obj.baseN,L1B_obj.proc_level)), 'wb') as outFile:
                    dill.dump(L1B_obj,outFile,protocol=2)
                    # dill.dump(L1B_obj,outFile)
                # Read the dump back as a sanity check that it can be loaded.
                with open(os.path.join(L1B_obj.path_procdata,'%s_%s.pkl'
                        %(L1B_obj.baseN,L1B_obj.proc_level)), 'rb') as validateFile:
                    loadedDillFile = dill.load(validateFile)
                print(L1B_obj.path_procdata)
                print (dir(loadedDillFile))
        """atmosperic correction"""
        pass # FIXME not yet implemented
    L2A_newObjects = []
    if job.exec__L2AP[0]:
        pass
        # and so on...
    pool.close()
    pool.join()
    if GMS_call_type == 'console':
        DB_T.SQL_DB_to_csv()
def apply_L1C_preparationFuncs_to_tiles(L1B_obj):
    """Attach geometry arrays to one L1B tile in preparation for L1C processing.

    Computes the lon/lat coordinate array plus the viewing (VZA), sun
    (SZA/SAA) and relative (RAA) angle arrays for the tile's subset and
    stores them as attributes on the object.

    :param L1B_obj: L1B tile object providing arr_shape/arr_pos and the
                    geometry calculation methods used below
    :return:        the same object with lonlat_arr, VZA_arr, SZA_arr,
                    SAA_arr and RAA_arr set
    """
    tile_subset = [L1B_obj.arr_shape, L1B_obj.arr_pos]
    # longitude/latitude coordinates of the tile
    L1B_obj.lonlat_arr = L1B_obj.get_lonlat_coord_array(tile_subset)['data']
    # viewing zenith angle
    L1B_obj.VZA_arr = L1B_obj.calc_VZA_array(tile_subset)['data']
    # sun zenith and sun azimuth angles come from a single call
    sza_result, saa_result = L1B_obj.calc_SZA_SAA_array(tile_subset)
    L1B_obj.SZA_arr = sza_result['data']
    L1B_obj.SAA_arr = saa_result['data']
    # relative azimuth angle; NOTE(review): presumably derived from the
    # angles above, so the original call order is preserved
    L1B_obj.RAA_arr = L1B_obj.calc_RAA_array(tile_subset)['data']
    return L1B_obj
########################################### MAIN/ARGUMENT PARSER #######################################################
# Script entry point: builds the data list, runs the process controller (once,
# or repeatedly in benchmark mode), and logs total execution time.
try:
    """LEVEL 0A PROCESSING"""
    # parse cli arguments
    if GMS_call_type == 'console':
        sys.stderr.write("No scene ids from CLI received. Using old data_list.\n")
        usecase.data_list = L0A_P.get_entity_IDs_within_AOI()
    else: # webapp
        usecase.data_list = L0A_P.get_data_list_of_current_jobID()
    if not job.benchmark_global:
        run_processController_in_multiprocessing(usecase.data_list)
    else:
        # BENCHMARKS: process growing subsets of the data list (1..N datasets),
        # 10 runs per subset size, recording processing and I/O times.
        for count_datasets in range(len(usecase.data_list)):
            t_processing_all_runs, t_IO_all_runs = [], []
            for count_run in range(10):
                current_data_list = usecase.data_list[0:count_datasets+1]
                # start every benchmark run from a clean database
                if os.path.exists(job.path_database):
                    os.remove(job.path_database)
                t_start = time.time()
                run_processController_in_multiprocessing(current_data_list)
                t_processing_all_runs.append(time.time()-t_start)
                # 'time_IO' is stored in globals() by the process controller
                t_IO_all_runs.append(globals()['time_IO'])
            OUT_W.write_global_benchmark_output(t_processing_all_runs,t_IO_all_runs,current_data_list)
except BaseException:
    # Log-and-reraise boundary. BaseException replaces the original bare
    # 'except:' (E722) with identical behavior: KeyboardInterrupt/SystemExit
    # are still logged and then propagated unchanged.
    job.logger.exception('EXCEPTION!')
    raise
job.logger.info('Execution finished.')
t2 = time.time()
job.logger.info('Time for execution: %s' % str(datetime.timedelta(seconds=t2-t1)))
if job.profiling:
    profiler.stop()
    print(profiler.output_text(unicode=True, color=True))
del job, usecase
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment