Commit 0d17e8bb authored by Daniel Scheffler

Bugfix for wrong scene counts in job summary; added functions for deleting processed data

 DB_T:
 - added delete_processing_results(): a function for deleting the processing results of a given scene ID
 - GMS_JOB: added __delete_procdata, delete_procdata_of_entire_job and delete_procdata_of_failed_sceneIDs (see the usage sketch below)
 PC:
 - added proper counting of succeeded and failed objects
 - adjusted the maximum number of workers for L1A, L2B and L2C processing
 - added some job IDs needed for bugfixing
 IPYNB:
 - updated GMS beta usecase ## dev GMS41.ipynb
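
A minimal usage sketch of the new deletion helpers (the instantiation of the GMS_JOB object is elided here because it is not part of this diff; the scene ID is hypothetical):

    # 'job' is an already instantiated GMS_JOB object whose job ID exists in the database
    job.delete_procdata_of_failed_sceneIDs()          # prompts for confirmation ('J') unless force=True is passed
    job.delete_procdata_of_entire_job(force=True)     # deletes the results of all scenes of the job without prompting

    # delete the processing results of a single scene directly
    delete_processing_results(26186740, force=True)   # 26186740: hypothetical scene ID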
parent 02c0b1e0
@@ -118,6 +118,7 @@ class job:
    exec__L2AP = [1, 1, 1]
    exec__L2BP = [1, 1, 0]
    exec__L2CP = [1, 1, 0]
    if exec_mode=='Python':
        for i in ['L1AP','L1BP','L1CP','L2AP','L2BP','L2CP']:
            if locals()['exec__%s' %i][1]==0:
......
@@ -15,6 +15,8 @@ import numpy as np
import re
import itertools
import shutil
import traceback
from pprint import pprint
import shapely
import geopandas
@@ -779,6 +781,69 @@ class GMS_JOB(object):
        raise NotImplementedError

    def delete_procdata_of_failed_sceneIDs(self, force=False):
        """Deletes all data where processing failed within the current job ID.
        :param force:  <bool> force deletion without user interaction
        """
        self.__delete_procdata(self.failed_sceneids, 'failed', force=force)

    def delete_procdata_of_entire_job(self, force=False):
        """Deletes all scene data processed by the current job ID.
        :param force:  <bool> force deletion without user interaction
        """
        self.__delete_procdata(self.sceneids, 'processed', force=force)

    def __delete_procdata(self, list_sceneIDs, scene_desc, force=False):
        """Applies delete_processing_results to each scene given in list_sceneIDs.
        :param list_sceneIDs:  <list> a list of scene IDs
        :param scene_desc:     <str> a description like 'succeeded' or 'failed'
        :param force:          <bool> force deletion without user interaction
        """
        if self.exists_in_db:
            if list_sceneIDs:
                delete = 'J'
                if not force:
                    delete = input("Do you really want to delete the processing results of %s scenes? (J/n)"
                                   % len(list_sceneIDs))
                if delete == 'J':
                    [delete_processing_results(ScID, force) for ScID in list_sceneIDs]
            else:
                warnings.warn('\nAccording to the database the job has no %s scene IDs. Nothing to delete.'
                              % scene_desc)
        else:
            warnings.warn('The job with the ID %s does not exist in the database. Thus there are no %s scene IDs.'
                          % (self.id, scene_desc))


def delete_processing_results(scene_ID, force=False):
    """Deletes the processing results of a given scene ID.
    :param scene_ID:  <int> the scene ID to delete results from
    :param force:     <bool> force deletion without user interaction
    """
    path_procdata = PG.path_generator(scene_ID=scene_ID).get_path_procdata()
    if not os.path.isdir(path_procdata):
        print('The folder %s does not exist. Nothing to delete.' % path_procdata)
    else:
        delete = 'J'
        if not force:
            dir_list    = os.listdir(path_procdata)
            count_files = len([i for i in dir_list if os.path.isfile(os.path.join(path_procdata, i))])
            count_dirs  = len([i for i in dir_list if os.path.isdir (os.path.join(path_procdata, i))])
            delete = input("Do you really want to delete the folder %s? It contains %s files and %s directories. (J/n)"
                           % (path_procdata, count_files, count_dirs))
        if delete == 'J':
            try:
                shutil.rmtree(path_procdata)
            except OSError:
                msg = '\nNot all files of scene %s could be deleted properly. Remaining files:\n%s\n\nThe following ' \
                      'error occurred:\n%s' % (scene_ID, '\n'.join(os.listdir(path_procdata)), traceback.format_exc())
                warnings.warn(msg)


def add_externally_downloaded_data_to_GMSDB(conn_DB, src_folder, filenames, satellite, sensor):
    # type: (str,str,list,str,str)
......
@@ -53,7 +53,11 @@ if isdebugging: #override the existing settings in order to get write access eve
    #builtins.GMS_process_ID = 26185269 # 1x L7 SLC off, Bug SpatialIndexMediator
    #builtins.GMS_process_ID = 26185270 # 5x L7 SLC off, Bug SpatialIndexMediator
    #builtins.GMS_process_ID = 26185275 # 1x L8, spat. Ref. L8 Bug L1B_mask not found
    builtins.GMS_process_ID = 26185264 # 1x L8, Bug L1B_masks not found
    #builtins.GMS_process_ID = 26185264 # 1x L8, Bug L1B_masks not found
    #builtins.GMS_process_ID = 26185265 # 1x L8, Bug L2B_masks not found
    #builtins.GMS_process_ID = 26185268 # "2x L8, Bug L2B_masks not found, incl. 1x bad archive"
    builtins.GMS_process_ID = 26185269 # "10x L8, Bug L2B_masks not found"
from . import config
@@ -263,7 +267,7 @@ def L2C_map_1(L2B_obj):
    L2C_MRGS_tiles = HLP_F.cut_GMS_obj_into_MGRS_tiles(L2C_obj, pixbuffer=10)
    [OUT_W.Obj2ENVI(MGRS_tile,compression=False) for MGRS_tile in L2C_MRGS_tiles]
    L2C_obj.delete_tempFiles()
    return L2C_obj
    return L2C_obj # contains no array data in Python mode
def is_inMEM(GMS_objects, dataset):
@@ -278,7 +282,8 @@ def is_inMEM(GMS_objects, dataset):
def run_processController_in_multiprocessing(usecase_data_list):
    with multiprocessing.Pool() as pool: usecase_data_list = pool.map(L0A_P.add_local_availability, usecase_data_list)
    failed_objects = []
    failed_objects  = []
    sceneids_failed = []
    """LEVEL 0A-1A PROCESSING"""
    L1A_already_present = lambda dataset: HLP_F.proc_level_already_present(dataset['proc_level'], 'L1A')
@@ -289,7 +294,7 @@ def run_processController_in_multiprocessing(usecase_data_list):
    if job.exec__L1AP[0]:
        if parallelization_level == 'scenes':
            # map
            with multiprocessing.Pool(20) as pool:
            with multiprocessing.Pool(12) as pool:
                L1A_resObjects = pool.map(L0B_L1A_map, datalist_L1A_P)
        else: # tiles
            with multiprocessing.Pool() as pool:
@@ -308,7 +313,9 @@ def run_processController_in_multiprocessing(usecase_data_list):
                L1A_resObjects = pool.map(L1A_map_3, L1A_objects) # map_3
    L1A_newObjects = [obj for obj in L1A_resObjects if isinstance(obj, L1A_P.L1A_object)]
    failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, HLP_F.failed_GMS_object)]
    failed_objects += [obj for obj in L1A_resObjects if isinstance(obj, HLP_F.failed_GMS_object) and
                       obj.scene_ID not in sceneids_failed]
    sceneids_failed = [obj.scene_ID for obj in failed_objects]
    """LEVEL 1B PROCESSING"""
@@ -330,8 +337,10 @@ def run_processController_in_multiprocessing(usecase_data_list):
    with multiprocessing.Pool() as pool: L1B_resObjects = pool.map(L1B_map_1, L1A_Instances)
    L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj,L1B_P.L1B_object)]
    failed_objects+= [obj for obj in L1B_resObjects if isinstance(obj,HLP_F.failed_GMS_object)]
    L1B_newObjects = [obj for obj in L1B_resObjects if isinstance(obj,L1B_P.L1B_object)]
    failed_objects += [obj for obj in L1B_resObjects if isinstance(obj,HLP_F.failed_GMS_object) and
                       obj.scene_ID not in sceneids_failed]
    sceneids_failed = [obj.scene_ID for obj in failed_objects]
    """LEVEL 1C PROCESSING"""
@@ -381,7 +390,9 @@ def run_processController_in_multiprocessing(usecase_data_list):
            L1C_resObjects = pool.map(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L1C_Tiles) # reduce
    L1C_newObjects = [obj for obj in L1C_resObjects if isinstance(obj, L1C_P.L1C_object)]
    failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, HLP_F.failed_GMS_object)]
    failed_objects += [obj for obj in L1C_resObjects if isinstance(obj, HLP_F.failed_GMS_object) and
                       obj.scene_ID not in sceneids_failed]
    sceneids_failed = [obj.scene_ID for obj in failed_objects]
    """LEVEL 2A PROCESSING"""
@@ -423,7 +434,9 @@ def run_processController_in_multiprocessing(usecase_data_list):
            L2A_resTiles = list(chain.from_iterable([pool.map(L2A_map, L1C_Instances)]))
    L2A_tiles = [obj for obj in L2A_resTiles if isinstance(obj, L2A_P.L2A_object)]
    failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, HLP_F.failed_GMS_object)]
    failed_objects += [obj for obj in L2A_resTiles if isinstance(obj, HLP_F.failed_GMS_object) and
                       obj.scene_ID not in sceneids_failed]
    sceneids_failed = [obj.scene_ID for obj in failed_objects]
    # print('L2A_tiles',L2A_tiles)
@@ -449,10 +462,10 @@ def run_processController_in_multiprocessing(usecase_data_list):
        # L2A_newObjects = pool.map(HLP_F.merge_GMS_tiles_to_GMS_obj, grouped_L2A_Tiles) # reduce # will be too slow because it has to pickle back really large L2A_newObjects
        L2A_newObjects = [HLP_F.merge_GMS_tiles_to_GMS_obj(tileList) for tileList in grouped_L2A_Tiles]
    L2A_Instances = L2A_newObjects + L2A_DBObjects # combine newly and earlier processed L2A data
    L2A_Instances = L2A_newObjects + L2A_DBObjects # combine newly and earlier processed L2A data
    # print('L2A_Instances', L2A_Instances)
    with multiprocessing.Pool(8) as pool: #FIXME
    with multiprocessing.Pool(15) as pool: #FIXME
        L2B_resObjects = pool.map(L2B_map_1, L2A_Instances)
@@ -483,7 +496,9 @@ def run_processController_in_multiprocessing(usecase_data_list):
        L2B_resObjects = [HLP_F.merge_GMS_tiles_to_GMS_obj(tileList) for tileList in grouped_L2B_Tiles]
    L2B_newObjects = [obj for obj in L2B_resObjects if isinstance(obj, L2B_P.L2B_object)]
    failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, HLP_F.failed_GMS_object)]
    failed_objects += [obj for obj in L2B_resObjects if isinstance(obj, HLP_F.failed_GMS_object) and
                       obj.scene_ID not in sceneids_failed]
    sceneids_failed = [obj.scene_ID for obj in failed_objects]
    # print('L2B_newObjects',L2B_newObjects)
@@ -505,22 +520,30 @@ def run_processController_in_multiprocessing(usecase_data_list):
    L2B_Instances = L2B_newObjects + L2B_DBObjects # combine newly and earlier processed L2B data
    # print('L2B_Instances', L2B_Instances)
    with multiprocessing.Pool() as pool: L2C_resObjects = pool.map(L2C_map_1, L2B_Instances)
    with multiprocessing.Pool(8) as pool: L2C_resObjects = pool.map(L2C_map_1, L2B_Instances) # FIXME 8 workers due to heavy IO
    L2C_newObjects = [obj for obj in L2C_resObjects if isinstance(obj, L2C_P.L2C_object)]
    failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, HLP_F.failed_GMS_object)]
    failed_objects += [obj for obj in L2C_resObjects if isinstance(obj, HLP_F.failed_GMS_object) and
                       obj.scene_ID not in sceneids_failed]
    sceneids_failed = [obj.scene_ID for obj in failed_objects]
    # update the 'failed_sceneids' column of the job record within the jobs table
    sceneids_failed = list(set([obj.scene_ID for obj in failed_objects]))
    DB_T.update_records_in_postgreSQLdb(job.conn_database, 'jobs',
                                        {'failed_sceneids': sceneids_failed}, {'id': job.ID})
    # add the job finish timestamp
    DB_T.update_records_in_postgreSQLdb(job.conn_database, 'jobs', {'finishtime': datetime.datetime.now()}, {'id': job.ID})
    # get the succeeded objects of the highest processing level that was reached
    succeeded_objects = []
    for objList in [L2C_newObjects, L2B_newObjects, L2A_tiles, L1C_newObjects, L1B_newObjects, L1A_newObjects]:
        if objList:
            succeeded_objects = objList
            break
    """Create job success summary"""
    if L2C_newObjects or failed_objects:
        detailed_JS, quick_JS = HLP_F.get_job_summary(L2C_newObjects+failed_objects)
    if succeeded_objects or failed_objects:
        detailed_JS, quick_JS = HLP_F.get_job_summary(succeeded_objects+failed_objects)
        detailed_JS.to_excel(os.path.join(job.path_job_logs, '%s_summary.xlsx' % job.ID))
        detailed_JS.to_csv  (os.path.join(job.path_job_logs, '%s_summary.csv'  % job.ID), sep='\t')
        job.logger.info('\nQUICK JOB SUMMARY (ID %s):\n' % job.ID + quick_JS.to_string())
......
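
The scene count bugfix repeated after each processing level above boils down to one pattern: a failed object is only appended to failed_objects if its scene_ID has not been recorded before, and sceneids_failed is rebuilt from the deduplicated list afterwards. A self-contained sketch of that pattern (Result, Failed and collect are illustrative stand-ins, not classes or functions from the repository):

    class Result:
        def __init__(self, scene_ID):
            self.scene_ID = scene_ID

    class Failed(Result):  # stand-in for HLP_F.failed_GMS_object
        pass

    def collect(res_objects, failed_objects, sceneids_failed):
        # append only failures of scenes that have not already failed at an earlier level
        failed_objects += [o for o in res_objects if isinstance(o, Failed) and o.scene_ID not in sceneids_failed]
        sceneids_failed = [o.scene_ID for o in failed_objects]
        return failed_objects, sceneids_failed

    # a scene that fails at two consecutive levels is counted only once in the job summary
    failed, ids = [], []
    failed, ids = collect([Failed(1), Result(2)], failed, ids)  # level 1: scene 1 fails
    failed, ids = collect([Failed(1)], failed, ids)             # level 2: scene 1 fails again
    assert len(failed) == 1 and ids == [1]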