Commit d1531591 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Added recording of memory usage via new database table 'stats_mem_usage_homo'....

Added recording of memory usage via new database table 'stats_mem_usage_homo'. Allows intelligent estimation of memory usage.
parent 30ee85fa
......@@ -8,7 +8,7 @@ import sys
import traceback
import warnings
from datetime import datetime
from typing import Union # noqa F401 # flake8 issue
from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
import numpy as np
import pandas as pd
......@@ -25,6 +25,9 @@ from ..options.config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain
if TYPE_CHECKING:
from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
__author__ = 'Daniel Scheffler'
......@@ -1413,3 +1416,39 @@ def archive_exists_on_fileserver(conn_DB, entityID):
exists = False
return exists
def record_stats_memusage(conn_db, GMS_obj):
    # type: (str, GMS_object) -> None
    """Write a memory-usage statistics record for the given scene to the database.

    Collects the job configuration parameters that influence memory consumption
    together with the per-processing-level memory usage recorded on the GMS
    object and inserts them as one row into the 'stats_mem_usage_homo' table.

    :param conn_db:  database connection string
    :param GMS_obj:  the fully processed GMS_object whose stats shall be recorded
    """
    record = {
        'creationtime': datetime.now(),
        'software_version': CFG.version,
        'datasetid': GMS_obj.dataset_ID,
        'virtual_sensor_id': CFG.virtual_sensor_id,
        'target_gsd': CFG.target_gsd[0],  # respects only xgsd
        'target_nbands': len(CFG.target_CWL),
        'inmem_serialization': CFG.inmem_serialization,
        'target_radunit_optical': CFG.target_radunit_optical,
        'skip_coreg': CFG.skip_coreg,
        'ac_estimate_accuracy': CFG.ac_estimate_accuracy,
        'ac_bandwise_accuracy': CFG.ac_bandwise_accuracy,
        'spathomo_estimate_accuracy': CFG.spathomo_estimate_accuracy,
        'spechomo_estimate_accuracy': CFG.spechomo_estimate_accuracy,
        'spechomo_bandwise_accuracy': CFG.spechomo_bandwise_accuracy,
        'parallelization_level': CFG.parallelization_level,
        'skip_thermal': CFG.skip_thermal,
        'skip_pan': CFG.skip_pan,
        'mgrs_pixel_buffer': CFG.mgrs_pixel_buffer,
        'cloud_masking_algorithm': CFG.cloud_masking_algorithm[GMS_obj.satellite],
        # memory actually consumed at each processing level, as recorded by
        # GMS_object.record_mem_usage()
        'used_mem_l1a': GMS_obj.mem_usage['L1A'],
        'used_mem_l1b': GMS_obj.mem_usage['L1B'],
        'used_mem_l1c': GMS_obj.mem_usage['L1C'],
        'used_mem_l2a': GMS_obj.mem_usage['L2A'],
        'used_mem_l2b': GMS_obj.mem_usage['L2B'],
        'used_mem_l2c': GMS_obj.mem_usage['L2C'],
        'dims_x_l2a': GMS_obj.arr.cols,
        'dims_y_l2a': GMS_obj.arr.rows,
        'is_test': CFG.is_test,
    }

    create_record_in_postgreSQLdb(conn_db, 'stats_mem_usage_homo', vals2write_dict=record)
......@@ -22,7 +22,7 @@ except RedisConnectionError:
class MultiSlotLock(Semaphore):
def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
self.disabled = redis_conn is None
self.disabled = redis_conn is None or allowed_slots in [None, False]
self.namespace = name
self.allowed_slots = allowed_slots
self.logger = logger or GMS_logger("RedisLock: '%s'" % name)
......@@ -241,16 +241,6 @@ class MemoryLock(SharedResourceLock):
self.client.decr('GMS_mem_reserved', self.mem2lock_gb)
# def ensure_enough_memory(needed_mem_gb, usage_threshold=80):
# if redis_conn:
# usable_memory_gb = int((virtual_memory().total * usage_threshold / 100 - virtual_memory().used) / 1024**3) \
# - int(needed_mem_gb)
#
# with Lock(redis_conn, 'GMS_mem_checker'):
# if usable_memory_gb >= needed_mem_gb:
# redis_conn.
def acquire_process_lock(**processlock_kwargs):
"""Decorator function for ProcessLock.
......@@ -306,8 +296,6 @@ def release_unclosed_locks():
if ML.client.hlen(ML.grabbed_key) == 0:
ML.delete()
# redis_conn.delete('GMS_mem_checker')
class MemoryReserver(Semaphore):
def __init__(self, mem2lock_gb, max_usage=90, logger=None, **kwargs):
......@@ -328,10 +316,7 @@ class MemoryReserver(Semaphore):
@property
def mem_reserved_gb(self):
    """Return the amount of currently reserved memory in gigabytes.

    Reads the 'GMS_mem_reserved' counter from Redis. If the key does not
    exist (yet), 0 is returned instead of raising on the None value.
    """
    # redis returns None for a missing key; `or 0` maps that (and an empty
    # string) to 0 before the int conversion.
    return int(redis_conn.get('GMS_mem_reserved') or 0)
@property
def usable_memory_gb(self):
......
......@@ -15,6 +15,7 @@ import logging
from collections import OrderedDict
from itertools import chain
from typing import Iterable, List, Union, TYPE_CHECKING # noqa F401 # flake8 issue
import psutil
import numpy as np
import spectral
......@@ -137,6 +138,8 @@ class GMS_object(object):
self.lonlat_arr = None # set by self.write_tiles_to_ENVIfile
self.trueDataCornerUTM = None # set by self.from_tiles
self.mem_usage = {}
# set pathes
self.path_cloud_class_obj = ''
......@@ -2082,6 +2085,12 @@ class GMS_object(object):
# close logger
self.close_loggers()
def record_mem_usage(self):
    """Record the current memory usage of this process for the current processing level.

    The resident set size of the running process is stored in self.mem_usage
    (in megabytes, rounded to one decimal). Once the final level 'L2C' is
    reached, the collected statistics are written to the database.
    """
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    self.mem_usage[self.proc_level] = round(rss_bytes / 1024 ** 2, 1)

    # L2C is the last processing level -> persist the complete stats record
    if self.proc_level == 'L2C':
        DB_T.record_stats_memusage(CFG.conn_database, self)
def close_loggers(self):
if self._logger not in [None, 'not set']:
self.logger.close() # this runs misc.logging.GMS_logger.close()
......@@ -2195,7 +2204,7 @@ class GMS_object(object):
class GMS_identifier(object):
def __init__(self, image_type, satellite, sensor, subsystem, proc_level, dataset_ID, logger=None):
# type: (str, str, str, str, str, int, logging.Logger) -> None
# type: (str, str, str, str, str, int, DatasetLogger) -> None
self.image_type = image_type
self.satellite = satellite
self.sensor = sensor
......@@ -2371,3 +2380,47 @@ def GMS_object_2_dataset_dict(GMS_obj):
('entity_ID', GMS_obj.entity_ID),
('filename', GMS_obj.filename)
])
def estimate_mem_usage(dataset_ID, satellite):
    """Estimate the memory needed to process a scene of the given dataset.

    Queries the 'stats_mem_usage_homo' table for previous runs with a matching
    job configuration, takes the per-record maximum over all processing levels,
    averages it over the records of the three most recent software versions and
    adds a 10 % safety margin.

    :param dataset_ID:  dataset ID of the scene to be processed
    :param satellite:   satellite name of the scene to be processed (currently unused
                        because the cloud masking algorithm is excluded from the query)
    :return:  estimated memory usage in gigabytes (rounded up), or None if no
              statistics have been recorded for this configuration yet
    """
    memcols = ['used_mem_l1a', 'used_mem_l1b', 'used_mem_l1c',
               'used_mem_l2a', 'used_mem_l2b', 'used_mem_l2c']

    # parameters commented out below are deliberately excluded from the match
    # so that more records qualify for the estimation
    records = DB_T.get_info_from_postgreSQLdb(
        CFG.conn_database, 'stats_mem_usage_homo',
        vals2return=['software_version'] + memcols,
        cond_dict=dict(
            datasetid=dataset_ID,
            virtual_sensor_id=CFG.virtual_sensor_id,
            target_gsd=CFG.target_gsd[0],  # respects only xgsd
            target_nbands=len(CFG.target_CWL),
            inmem_serialization=CFG.inmem_serialization,
            target_radunit_optical=CFG.target_radunit_optical,
            # skip_coreg=CFG.skip_coreg,
            ac_estimate_accuracy=CFG.ac_estimate_accuracy,
            ac_bandwise_accuracy=CFG.ac_bandwise_accuracy,
            spathomo_estimate_accuracy=CFG.spathomo_estimate_accuracy,
            spechomo_estimate_accuracy=CFG.spechomo_estimate_accuracy,
            spechomo_bandwise_accuracy=CFG.spechomo_bandwise_accuracy,
            # parallelization_level=CFG.parallelization_level,
            # skip_thermal=CFG.skip_thermal,
            # skip_pan=CFG.skip_pan,
            # mgrs_pixel_buffer=CFG.mgrs_pixel_buffer,
            # cloud_masking_algorithm=CFG.cloud_masking_algorithm[satellite],
            is_test=CFG.is_test
        ))
    df = DataFrame(records, columns=['software_version'] + memcols)

    if df.empty:
        # no experience with this configuration yet -> no estimation possible
        return None

    # per record: the maximum memory used over all processing levels
    df['used_mem_max'] = df[memcols].max(axis=1)

    # restrict the estimation to the 3 most recent gms_preprocessing versions
    versions = sorted(df.software_version, key=lambda v: [int(i) for i in v.split('.')])
    df_recent = df[df.software_version.isin(versions[-3:])]

    mem_estim_gb = np.mean(list(df_recent.used_mem_max)) / 1024  # MB -> GB

    # add a 10 % safety buffer and round up to full gigabytes
    return int(np.ceil(mem_estim_gb + .1 * mem_estim_gb))
......@@ -6,14 +6,14 @@ from ..options.config import GMS_config as CFG
from ..misc import exception_handler as EXC_H
from ..misc.path_generator import path_generator
from ..misc.logging import GMS_logger
from ..misc.locks import ProcessLock, MemoryReserver
from ..misc.locks import ProcessLock, MemoryReserver, redis_conn
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
from ..model.gms_object import failed_GMS_object, update_proc_status, return_GMS_objs_without_arrays
from ..model.gms_object import failed_GMS_object, update_proc_status, return_GMS_objs_without_arrays, estimate_mem_usage
from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
from ..algorithms.geoprocessing import get_common_extent
......@@ -41,6 +41,7 @@ def L1A_map(dataset_dict): # map (scene-wise parallelization)
if CFG.exec_L1AP[1]:
L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
L1A_obj.record_mem_usage()
L1A_obj.delete_tempFiles()
return L1A_obj
......@@ -87,6 +88,7 @@ def L1A_map_3(L1A_obj): # map (scene-wise parallelization)
L1A_obj.delete_tempFiles()
else:
L1A_obj.delete_tempFiles()
L1A_obj.record_mem_usage()
return L1A_obj
......@@ -102,6 +104,7 @@ def L1B_map(L1A_obj):
if CFG.exec_L1BP[1]:
L1B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
L1B_obj.record_mem_usage()
L1B_obj.delete_tempFiles()
return L1B_obj
......@@ -141,6 +144,7 @@ def L1C_map(L1B_objs):
L1C_obj.delete_tempFiles()
L1C_obj.delete_ac_input_arrays()
[L1C_obj.record_mem_usage() for L1C_obj in L1C_objs]
return L1C_objs
......@@ -193,8 +197,10 @@ def L2A_map(L1C_objs, block_size=None, return_tiles=True):
if return_tiles:
L2A_tiles = L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY)
[L2A_tile.record_mem_usage() for L2A_tile in L2A_tiles]
return list(L2A_tiles)
else:
L2A_obj.record_mem_usage()
return L2A_obj
......@@ -208,6 +214,7 @@ def L2B_map(L2A_obj):
L2B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
if L2B_obj.arr_shape == 'cube':
L2B_obj.delete_tempFiles()
L2B_obj.record_mem_usage()
return L2B_obj
......@@ -220,6 +227,7 @@ def L2C_map(L2B_obj):
L2C_MRGS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
[MGRS_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask,
compression=CFG.output_data_compression) for MGRS_tile in L2C_MRGS_tiles]
L2C_obj.record_mem_usage()
L2C_obj.delete_tempFiles()
return L2C_obj # contains no array data in Python mode
......@@ -237,8 +245,22 @@ def run_complete_preprocessing(list_dataset_dicts_per_scene): # map (scene-wise
pipeline_logger = GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
log_level=CFG.log_level, append=True)
with MemoryReserver(mem2lock_gb=15, logger=pipeline_logger),\
ProcessLock(allowed_slots=CFG.CPUs_all_jobs, logger=pipeline_logger):
# set CPU and memory limits
cpus2use = CFG.CPUs_all_jobs
mem2reserve = 15
if redis_conn:
mem2reserve = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
list_dataset_dicts_per_scene[0]['satellite'])
if not mem2reserve:
pipeline_logger.info('Homogenization has never been run with this config before. Memory usage not is '
'therfore estimatable. Limiting processes to %s in order to get some experience how '
'much memory is needed.' % cpus2use)
cpus2use = 5
# start processing
with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger),\
ProcessLock(allowed_slots=cpus2use, logger=pipeline_logger):
if len(list(set([ds['proc_level'] for ds in list_dataset_dicts_per_scene]))) != 1:
raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
......
......@@ -455,7 +455,7 @@ testdata.append('MultipleDatasetsInOneJob')
summary_testResults, summary_errors, summary_failures, summary_skipped, jobstatus = [[] for _ in range(5)]
# @unittest.SkipTest
@unittest.SkipTest
class Test_in_normal_mode(unittest.TestCase):
def setUp(self):
# self.job_id = 26184107
......@@ -515,17 +515,17 @@ class Test_in_normal_mode(unittest.TestCase):
# self.job_id = 26187053 # GMS41: AC: The input 'list_GMS_objs' contains duplicates: ['', '']
# self.job_id = 26187750 # GEOMS: [AC]: RuntimeWarning: All-NaN slice encountered
# self.job_id = 26187760 # GEOMS: [L2C]: ValueError: 'axis' entry is out of bounds
# self.job_id = 26187804 # GEOMS: Spatial homogenization leaves resampling artifacs at the image edges.
self.job_id = 26187922 # GEOMS: AssertionError (self.job_id = 26187922 # GEOMS: AssertionError)
self.job_id = 26187804 # GEOMS: Spatial homogenization leaves resampling artifacs at the image edges.
# self.job_id = 26187922 # GEOMS: AssertionError (self.job_id = 26187922 # GEOMS: AssertionError)
self.PC = process_controller(self.job_id, **dict(is_test=False, parallelization_level='scenes', db_host=db_host,
delete_old_output=True, disable_exception_handler=True))
# self.PC.config.spathomo_estimate_accuracy = True
# self.PC.config.ac_estimate_accuracy = True
# self.PC.config.spechomo_estimate_accuracy = True
self.PC.config.exec_L1CP = [1, 1, 0]
self.PC.config.exec_2ACP = [1, 1, 0]
self.PC.config.path_procdata_scenes = '/storage/gms/processed_scenes/20180227_MGRS33UUU_S2_L8_L7/'
# self.PC.config.exec_L1CP = [1, 1, 0]
# self.PC.config.exec_2ACP = [1, 1, 0]
# self.PC.config.path_procdata_scenes = '/storage/gms/processed_scenes/20180227_MGRS33UUU_S2_L8_L7/'
def test(self):
self.PC.run_all_processors()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment