Commit 5c9644e0 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Further developed recording of memory usage statistics. Deleted deprecated...

Further developed recording of memory usage statistics. Deleted deprecated classes within 'locks' module. Added system overload blocking. Revised default resource limits. Bugfix for not raising exceptions during GMS_pipeline.
parent 09edbfa3
......@@ -9,6 +9,7 @@ import traceback
import warnings
from datetime import datetime
from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
from pkg_resources import parse_version
import numpy as np
import pandas as pd
......@@ -1418,8 +1419,8 @@ def archive_exists_on_fileserver(conn_DB, entityID):
return exists
def record_stats_memusage(conn_db, GMS_obj):
# type: (str, GMS_object) -> None
def record_stats_memusage(conn_db, GMS_obj, min_version='0.13.16'):
# type: (str, GMS_object) -> bool
vals2write_dict = dict(
creationtime=datetime.now(),
software_version=CFG.version,
......@@ -1448,7 +1449,31 @@ def record_stats_memusage(conn_db, GMS_obj):
used_mem_l2c=GMS_obj.mem_usage['L2C'],
dims_x_l2a=GMS_obj.arr.cols,
dims_y_l2a=GMS_obj.arr.rows,
is_test=CFG.is_test
is_test=CFG.is_test,
sceneid=GMS_obj.scene_ID
)
create_record_in_postgreSQLdb(conn_db, 'stats_mem_usage_homo', vals2write_dict=vals2write_dict)
# get all existing database records matching the respective config)
vals2get = list(vals2write_dict.keys())
df_existing_recs = pd.DataFrame(
get_info_from_postgreSQLdb(conn_db, 'stats_mem_usage_homo',
vals2return=vals2get,
cond_dict={k: v for k, v in vals2write_dict.items()
if k not in ['creationtime', 'used_mem_l1a', 'used_mem_l1b',
'used_mem_l1c', 'used_mem_l2a', 'used_mem_l2b',
'used_mem_l2c', 'dims_x_l2a', 'dims_x_l2b']}),
columns=vals2get)
# filter the existing records by gms_preprocessing software version number (higher than min_version)
vers = list(df_existing_recs.software_version)
vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(min_version)]
df_existing_recs_usable = df_existing_recs.loc[df_existing_recs.software_version.isin(vers_usable)]
# add memory stats to database
# (but skip if there are already 10 records matching the respective config and software version number
# or if the current scene ID is already among the matching records)
if len(df_existing_recs_usable) < 10 and GMS_obj.scene_ID not in list(df_existing_recs_usable.sceneid):
create_record_in_postgreSQLdb(conn_db, 'stats_mem_usage_homo', vals2write_dict=vals2write_dict)
return True
else:
return False
......@@ -8,7 +8,6 @@ from redis_lock import Lock
from redis.exceptions import ConnectionError as RedisConnectionError
import functools
from psutil import virtual_memory
import timeout_decorator
from ..misc.logging import GMS_logger
from ..options.config import GMS_config as CFG
......@@ -100,203 +99,6 @@ class ProcessLock(SharedResourceLock):
super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)
# class MemoryLock(SharedResourceLock):
# def __init__(self, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
# """
#
# :param allowed_slots:
# :param usage_th: Memory usage threshold above which the memory lock is active (percent).
# :param logger:
# :param kwargs:
# """
# self.usage_th = usage_threshold
# self.blocking_th = blocking_threshold
# self.logger = logger or GMS_logger("RedisLock: '%s'" % self.namespace)
# self.mem_usage = virtual_memory().percent
#
# self._acquired = None
#
# self.check_system_overload()
#
# if self.mem_usage >= usage_threshold:
# self.logger.info('Memory usage is %.1f percent. Acquiring memory lock..' % self.mem_usage)
# self.disabled = False
# else:
# self.logger.debug('Memory usage is %.1f percent. Memory lock not needed -> skipped.' % self.mem_usage)
# self.disabled = True
#
# super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=self.logger,
# disabled=self.disabled, **kwargs)
#
# @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
# exception_message='Memory lock could not be acquired after waiting 15 minutes '
# 'due to memory overload.')
# def check_system_overload(self):
# logged = False
# while self.mem_usage >= self.blocking_th:
# if not logged:
# self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
# % (self.mem_usage, self.blocking_th))
# logged = True
# time.sleep(5)
# self.mem_usage = virtual_memory().percent
#
# def acquire(self, blocking=True, timeout=None):
# if self.mem_usage > self.usage_th:
# # self._acquired = super(MemoryLock, self).acquire(blocking=blocking, timeout=timeout)
# self._acquired = super(MemoryLock, self).acquire(timeout=timeout)
# else:
# self._acquired = True
#
# return self._acquired
#
# def release(self):
# if self._acquired and not self.disabled:
# super(MemoryLock, self).release()
class MemoryLock(SharedResourceLock):
    def __init__(self, mem2lock_gb, allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=None, **kwargs):
        """A redis-based lock that reserves a given amount of memory while it is acquired.

        :param mem2lock_gb:         amount of memory to be reserved while the lock is acquired (gigabytes)
        :param allowed_slots:       number of allowed slots of the underlying SharedResourceLock
        :param usage_threshold:     memory usage threshold above which the memory lock is active (percent)
        :param blocking_threshold:  memory usage threshold above which processing is blocked (percent)
        :param logger:              logger instance
        :param kwargs:              further keyword arguments passed to SharedResourceLock
        """
        super(MemoryLock, self).__init__(name='MemoryLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

        self.mem2lock_gb = mem2lock_gb
        self.usage_th = usage_threshold
        self.blocking_th = blocking_threshold
        self.mem_usage = virtual_memory().percent  # system memory usage at instantiation time (percent)

    @property
    def mem_reserved_gb(self):
        # total amount of memory currently reserved in redis by all processes (gigabytes)
        # NOTE(review): redis returns a bytes/str value here, not an int — callers must cast
        # (usable_memory_gb does) — confirm this is intended
        val = redis_conn.get('GMS_mem_reserved')
        if val is not None:
            return val
        return 0

    @property
    def usable_memory_gb(self):
        # memory still available below the usage threshold, minus what is already reserved (gigabytes)
        return int((virtual_memory().total * self.usage_th / 100 - virtual_memory().used) / 1024**3) \
            - int(self.mem_reserved_gb)

    @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
                               exception_message='Memory lock could not be acquired after waiting 15 minutes '
                                                 'due to memory overload.')
    def check_system_overload(self):
        """Block (polling every 5 seconds) until system memory usage drops below the blocking threshold."""
        logged = False
        while self.mem_usage >= self.blocking_th:
            if not logged:
                self.logger.info('Memory usage is %.1f percent. Waiting until memory usage is below %s percent.'
                                 % (self.mem_usage, self.blocking_th))
                logged = True
            time.sleep(5)
            self.mem_usage = virtual_memory().percent

    @timeout_decorator.timeout(seconds=15 * 60, timeout_exception=TimeoutError,
                               exception_message='Memory lock could not be acquired after waiting 15 minutes '
                                                 'because not enough memory could reserved.')
    def ensure_enough_memory(self):
        """Wait until the requested amount of memory is usable, then reserve it in redis."""
        if not self.disabled:
            logged = False

            with Lock(self.client, 'GMS_mem_checker'):
                # NOTE(review): this condition looks inverted — it waits WHILE mem2lock_gb is SMALLER than
                # the usable memory, i.e. while enough memory IS available; 'mem2lock_gb > usable_memory_gb'
                # would match the log message below — confirm the intended behavior
                while self.mem2lock_gb < self.usable_memory_gb:
                    if not logged:
                        self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
                                         % (self.usable_memory_gb, self.mem2lock_gb))
                        logged = True
                    time.sleep(1)

                self.client.incr('GMS_mem_reserved', self.mem2lock_gb)

    def acquire(self, blocking=True, timeout=None):
        # TODO: maybe implement this as a float redis value via increment/decrement plus a lock with 1 slot
        if not self.disabled:
            with Lock(self.client, 'GMS_mem_checker'):
                # NOTE(review): ensure_enough_memory() acquires the same 'GMS_mem_checker' lock again;
                # if redis_lock.Lock is not reentrant this deadlocks — confirm
                self.check_system_overload()
                self.ensure_enough_memory()

        if self.usage_th < self.mem_usage < self.blocking_th:
            # NOTE(review): recursion without refreshing self.mem_usage or sleeping, and super().acquire()
            # is never called, so no lock slot is actually taken — this method looks unfinished
            self.acquire(blocking=blocking, timeout=timeout)

        # # TODO add an acquire lock here
        # with MultiSlotLock(name='GMS_mem_acquire_key', allowed_slots=1):
        #     if self.usable_memory_gb >= self.mem2lock_gb:
        #         for i in range(self.mem2lock_gb):
        #             super(MemoryLock, self).acquire(timeout=timeout)
        #         self.client.incr('GMS_mem_reserved', self.mem2lock_gb)
        #
        #     else:
        #         time.sleep(1)
        #         self.acquire(blocking=blocking, timeout=timeout)

    def release(self):
        """Release the lock and free the reserved memory in redis."""
        if not self.disabled:
            # for token in self._local_tokens:
            #     self.signal(token)
            super(MemoryLock, self).release()
            self.client.decr('GMS_mem_reserved', self.mem2lock_gb)
def acquire_process_lock(**processlock_kwargs):
    """Decorate a callable so that it is executed within a ProcessLock context.

    :param processlock_kwargs: Keyword arguments passed through to the ProcessLock class.
    """
    def decorator(func):
        @functools.wraps(func)  # keep the original function identity (avoids pickling errors)
        def wrapped_func(*args, **kwargs):
            lock = ProcessLock(**processlock_kwargs)
            with lock:
                return func(*args, **kwargs)

        return wrapped_func

    return decorator
def acquire_mem_lock(**memlock_kwargs):
    """Decorate a callable so that it is executed within a MemoryLock context.

    :param memlock_kwargs: Keyword arguments passed through to the MemoryLock class.
    """
    def decorator(func):
        @functools.wraps(func)  # keep the original function identity (avoids pickling errors)
        def wrapped_func(*args, **kwargs):
            lock = MemoryLock(**memlock_kwargs)
            with lock:
                return func(*args, **kwargs)

        return wrapped_func

    return decorator
def release_unclosed_locks():
    """Release all lock tokens of the current job ID and clean up empty redis lock namespaces."""
    if not redis_conn:
        return

    for lock_class in (IOLock, ProcessLock):
        lock_inst = lock_class(allowed_slots=1)
        lock_inst.release_all_jobID_tokens()

        # drop the whole redis namespace once no lock slot is grabbed anymore
        if lock_inst.client.hlen(lock_inst.grabbed_key) == 0:
            lock_inst.delete()

    mem_reserver = MemoryReserver(1)
    mem_reserver.release_all_jobID_tokens()

    # drop the whole redis namespace once no lock slot is grabbed anymore
    if mem_reserver.client.hlen(mem_reserver.grabbed_key) == 0:
        mem_reserver.delete()
class MemoryReserver(Semaphore):
def __init__(self, mem2lock_gb, max_usage=90, logger=None, **kwargs):
"""
......@@ -387,3 +189,59 @@ class MemoryReserver(Semaphore):
if self.mem_reserved_gb <= 0:
self.client.delete(self.reserved_key)
def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

    Runs the decorated function within a ProcessLock context manager so that only a limited
    number of processes can execute it at the same time.

    :param processlock_kwargs: Keyword arguments to be passed to ProcessLock class.
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                result = func(*args, **kwargs)

            return result

        return wrapped_func

    return decorator
def reserve_mem(**memlock_kwargs):
    """Decorate a callable so that it is executed with memory reserved via MemoryReserver.

    :param memlock_kwargs: Keyword arguments passed through to the MemoryReserver class.
    """
    def decorator(func):
        @functools.wraps(func)  # keep the original function identity (avoids pickling errors)
        def wrapped_func(*args, **kwargs):
            reserver = MemoryReserver(**memlock_kwargs)
            with reserver:
                return func(*args, **kwargs)

        return wrapped_func

    return decorator
def release_unclosed_locks():
    """Release all lock tokens of the current job ID and delete empty redis lock namespaces."""
    if redis_conn:
        for L in [IOLock, ProcessLock]:
            lock = L(allowed_slots=1)
            lock.release_all_jobID_tokens()

            # delete the complete redis namespace if no lock slot is acquired anymore
            if lock.client.hlen(lock.grabbed_key) == 0:
                lock.delete()

        ML = MemoryReserver(1)
        ML.release_all_jobID_tokens()

        # delete the complete redis namespace if no lock slot is acquired anymore
        if ML.client.hlen(ML.grabbed_key) == 0:
            ML.delete()
# -*- coding: utf-8 -*-
import collections
import copy
import datetime
import functools
......@@ -16,6 +15,8 @@ from collections import OrderedDict
from itertools import chain
from typing import Iterable, List, Union, TYPE_CHECKING # noqa F401 # flake8 issue
import psutil
import time
from pkg_resources import parse_version
import numpy as np
import spectral
......@@ -998,7 +999,7 @@ class GMS_object(object):
attr_val = getattr(GMS_obj.MetaObj, attrN)
if isinstance(attr_val, list):
attrDic_fullLBA.update(dict(zip(GMS_obj.LayerBandsAssignment, attr_val)))
elif isinstance(attr_val, (dict, collections.OrderedDict)):
elif isinstance(attr_val, (dict, OrderedDict)):
attrDic_fullLBA.update(attr_val)
else:
raise ValueError(attrN)
......@@ -1737,7 +1738,7 @@ class GMS_object(object):
elif isinstance(v, GMS_identifier):
dict2write[k] = v.to_odict(include_logger=False)
elif isinstance(v, collections.OrderedDict) or isinstance(v, dict):
elif isinstance(v, OrderedDict) or isinstance(v, dict):
dict2write[k] = dict2write[k].copy()
if 'logger' in v:
if hasattr(dict2write[k]['logger'], 'handlers') and dict2write[k]['logger'].handlers[:]:
......@@ -2086,10 +2087,36 @@ class GMS_object(object):
self.close_loggers()
def record_mem_usage(self):
    """Record memory usage of the current process ID for the current processing level (megabytes).

    Keeps the MAXIMUM memory usage observed per processing level and pushes the collected
    statistics to the database once the last processing level ('L2C') is reached.
    """
    # get memory usage of current process ID
    mem_usage = round(psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2, 1)

    # update the value only if there is no larger value recorded yet
    # (we want to record the maximum memory needed per processing level)
    # FIX: the comparison was inverted ('>' instead of '<'), which kept the MINIMUM value
    if self.proc_level not in self.mem_usage or self.mem_usage[self.proc_level] < mem_usage:
        self.mem_usage[self.proc_level] = mem_usage

    # push recorded memory stats to database
    if self.proc_level == 'L2C':
        stats_pushed = DB_T.record_stats_memusage(CFG.conn_database, self)

        if stats_pushed:
            self.logger.info('Recorded memory usage statistics.')
def block_at_system_overload(self, max_usage=95, timeout=15*60):
    """Block execution (polling every 5 seconds) until system memory usage falls below the given threshold.

    :param max_usage:  system memory usage in percent above which processing is paused
    :param timeout:    maximum time to wait in seconds before aborting
    :raises TimeoutError: if memory usage stays above max_usage for longer than the timeout
    """
    logged = False
    t_start = time.time()

    while psutil.virtual_memory().percent >= max_usage:
        if not logged:
            self.logger.warning('Memory usage is %.1f percent. Waiting until memory usage is below %s percent. '
                                'Timeout: %s minutes.' % (psutil.virtual_memory().percent, max_usage, timeout / 60))
            logged = True

        if time.time() - t_start > timeout:
            # FIX: previously the timeout was only logged (via logger.exception outside of any
            # except block) and the loop kept waiting forever; now processing is actually aborted
            msg = 'Processing could not be continued due to memory overload after waiting ' \
                  '%s minutes.' % (timeout / 60)
            self.logger.error(msg)
            raise TimeoutError(msg)

        time.sleep(5)
def close_loggers(self):
if self._logger not in [None, 'not set']:
......@@ -2246,7 +2273,7 @@ class failed_GMS_object(GMS_object):
needed_attr = ['proc_level', 'image_type', 'scene_ID', 'entity_ID', 'satellite', 'sensor', 'subsystem',
'arr_shape', 'arr_pos']
if isinstance(GMS_object_or_OrdDict, collections.OrderedDict): # in case of unhandled exception within L1A_map
if isinstance(GMS_object_or_OrdDict, OrderedDict): # in case of unhandled exception within L1A_map
OrdDic = GMS_object_or_OrdDict
[setattr(self, k, OrdDic[k]) for k in needed_attr[:-2]]
self.arr_shape = 'cube' if 'arr_shape' not in OrdDic else OrdDic['arr_shape']
......@@ -2332,10 +2359,6 @@ def return_GMS_objs_without_arrays(GMS_pipeline):
@functools.wraps(GMS_pipeline) # needed to avoid pickling errors
def wrapped_GMS_pipeline(*args, **kwargs):
###################################################################
# prepare results to be back-serialized to multiprocessing master #
###################################################################
def flush_arrays(GMS_obj):
# type: (Union[GMS_object, L1C_object]) -> GMS_object
GMS_obj.flush_array_data()
......@@ -2346,21 +2369,24 @@ def return_GMS_objs_without_arrays(GMS_pipeline):
return GMS_obj
returnVal = None
try:
# NOTE: The GMS pipeline will never raise an exception as long as the exception handler is turned on.
returnVal = GMS_pipeline(*args, **kwargs)
finally:
# flush array data because they are too big to be kept in memory for many GMS_objects
if isinstance(returnVal, (GMS_object, failed_GMS_object)):
returnVal = flush_arrays(returnVal)
elif isinstance(returnVal, Iterable):
returnVal = [flush_arrays(obj) for obj in returnVal]
else: # OrderedDict (dataset)
# the OrderedDict will not contain any arrays
pass
###################################################################
# prepare results to be back-serialized to multiprocessing master #
###################################################################
# NOTE: Exceptions within GMS pipeline will be forwarded to calling function.
# NOTE: Exceptions within GMS mappers are catched by exception handler (if enabled)
returnVal = GMS_pipeline(*args, **kwargs)
# flush array data because they are too big to be kept in memory for many GMS_objects
if isinstance(returnVal, (GMS_object, failed_GMS_object)):
returnVal = flush_arrays(returnVal)
elif isinstance(returnVal, Iterable):
returnVal = [flush_arrays(obj) for obj in returnVal]
else: # OrderedDict (dataset)
# the OrderedDict will not contain any arrays
pass
return returnVal
return returnVal
return wrapped_GMS_pipeline
......@@ -2382,7 +2408,7 @@ def GMS_object_2_dataset_dict(GMS_obj):
])
def estimate_mem_usage(dataset_ID, satellite):
def estimate_mem_usage(dataset_ID, satellite, min_version='0.13.16'):
memcols = ['used_mem_l1a', 'used_mem_l1b', 'used_mem_l1c',
'used_mem_l2a', 'used_mem_l2b', 'used_mem_l2c']
......@@ -2415,10 +2441,12 @@ def estimate_mem_usage(dataset_ID, satellite):
if not df.empty:
df['used_mem_max'] = df[memcols].max(axis=1)
# get records of 3 most recent gms_preprocessing versions
# get records from gms_preprocessing versions higher than min_version
vers = list(df.software_version)
vers.sort(key=lambda s: list(map(int, s.split('.'))))
df_sub = df.loc[df.software_version.isin(vers[-3:])]
# vers.sort(key=lambda s: list(map(int, s.split('.'))))
vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(min_version)]
df_sub = df.loc[df.software_version.isin(vers_usable)]
mem_estim_mb = np.mean(list(df_sub.used_mem_max)) # megabytes
mem_estim_gb = mem_estim_mb / 1024 # gigabytes
......
# -*- coding: utf-8 -*-
from typing import List, Tuple, Generator, Iterable, Union # noqa F401 # flake8 issue
from psutil import virtual_memory
from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
......@@ -26,6 +27,7 @@ def L1A_map(dataset_dict): # map (scene-wise parallelization)
# type: (dict) -> L1A_P.L1A_object
L1A_obj = L1A_P.L1A_object(**dataset_dict)
L1A_obj.block_at_system_overload(max_usage=95)
L1A_obj.import_rasterdata()
L1A_obj.import_metadata()
L1A_obj.validate_GeoTransProj_GeoAlign() # sets self.GeoTransProj_ok and self.GeoAlign_ok
......@@ -53,6 +55,7 @@ def L1A_map_1(dataset_dict, block_size=None): # map (scene-wise parallelization
# type: (dict) -> List[L1A_P.L1A_object]
L1A_obj = L1A_P.L1A_object(**dataset_dict)
L1A_obj.block_at_system_overload(max_usage=95)
L1A_obj.import_rasterdata()
L1A_obj.import_metadata()
L1A_obj.validate_GeoTransProj_GeoAlign() # sets self.GeoTransProj_ok and self.GeoAlign_ok
......@@ -99,6 +102,8 @@ def L1B_map(L1A_obj):
"""L1A_obj enthält in Python- (im Gegensatz zur inmem_serialization-) Implementierung KEINE ARRAY-DATEN!,
nur die für die ganze Szene gültigen Metadaten"""
L1A_obj.block_at_system_overload(max_usage=95)
L1B_obj = L1B_P.L1B_object(L1A_obj)
L1B_obj.compute_global_shifts()
......@@ -119,6 +124,7 @@ def L1C_map(L1B_objs):
:param L1B_objs: list containing one or multiple L1B objects belonging to the same scene ID.
"""
list(L1B_objs)[0].block_at_system_overload(max_usage=95)
# initialize L1C objects
L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]
......@@ -163,6 +169,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
:param return_tiles: return computed L2A object in tiles
:return: list of L2A_object tiles
"""
L1C_objs[0].block_at_system_overload(max_usage=95)
# initialize L2A objects
L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]
......@@ -208,6 +215,7 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
@update_proc_status
def L2B_map(L2A_obj):
# type: (L2A_P.L2A_object) -> L2B_P.L2B_object
L2A_obj.block_at_system_overload(max_usage=95)
L2B_obj = L2B_P.L2B_object(L2A_obj)
L2B_obj.spectral_homogenization()
if CFG.exec_L2BP[1]:
......@@ -221,7 +229,8 @@ def L2B_map(L2A_obj):
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
# type: (L2B_P.L2B_object) -> L2C_P.L2C_object
# type: (L2B_P.L2B_object) -> L2C_P.L2C_object
L2B_obj.block_at_system_overload(max_usage=95)
L2C_obj = L2C_P.L2C_object(L2B_obj)
if CFG.exec_L2CP[1]:
L2C_MRGS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
......@@ -246,22 +255,22 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
log_level=CFG.log_level, append=True)
# set CPU and memory limits
cpus2use = CFG.CPUs_all_jobs
cpulimit = CFG.CPUs_all_jobs
mem2reserve = 15
if redis_conn:
mem2reserve = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
list_dataset_dicts_per_scene[0]['satellite'])
if not mem2reserve:
pipeline_logger.info('Homogenization has never been run with this config before. Memory usage not is '
'therfore estimatable. Limiting processes to %s in order to get some experience how '
'much memory is needed.' % cpus2use)
cpus2use = 5
mem2reserve = 15
mem_estim = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
list_dataset_dicts_per_scene[0]['satellite'])
if mem_estim:
mem2reserve = mem_estim
else:
cpulimit = int((virtual_memory().total * .8 - virtual_memory().used) / 1024 ** 3 / mem2reserve)
pipeline_logger.info('No memory usage statistics from earlier jobs found for the current configuration.'
'Limiting processes to %s in order to collect memory statistics first.' % cpulimit)
# start processing
with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger),\
ProcessLock(allowed_slots=cpus2use, logger=pipeline_logger):
ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
......@@ -275,7 +284,6 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
L1A_objects = []
if CFG.exec_L1AP[0] and input_proc_level is None:
# with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1A_objects = \
[L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]
......@@ -298,7 +306,6 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))
# with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]
del L1A_objects
......@@ -318,7 +325,6 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))
# with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L1C_objects = L1C_map(L1B_objects)
del L1B_objects
......@@ -339,7 +345,6 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))
# with MemoryLock(allowed_slots=1, usage_threshold=80, blocking_threshold=90, logger=pipeline_logger):
L2A_obj = L2A_map(L1C_objects, return_tiles=False)