Commit c78dff7a authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Test implementation for expiring ACQUISITION key for MemoryReserver.

Moved computation of media ac_errors for datasets with multiple subsystems from L2C to L2A to avoid memory overflows in L2B or L2C.
parent ea441106
......@@ -152,7 +152,7 @@ class L2B_object(L2A_object):
"number..")
outarr = interp1d(np.array(src_cwls), self.ac_errors,
axis=2, kind='linear', fill_value='extrapolate')(tgt_cwl)
self.ac_errors = outarr.astype(np.int16)
self.ac_errors = outarr.astype(self.ac_errors.dtype)
class SpectralHomogenizer(object):
......
......@@ -82,17 +82,15 @@ class AccuracyCube(GeoArray):
def generate_array(self):
err_layers = self.layers.copy() # copy OrdDict, otherwise attributes of GMS_object are overwritten
# handle CFG.ac_bandwise_accuracy (average ac_errors if needed)
if 'ac_errors' in err_layers and not CFG.ac_bandwise_accuracy and err_layers['ac_errors'].bands > 1:
err_layers['ac_errors'].arr = \
np.median(err_layers['ac_errors'], axis=2).astype(err_layers['ac_errors'].dtype)
# handle CFG.spechomo_bandwise_accuracy (average spec_homo_errors if needed)
if 'spec_homo_errors' in err_layers and not CFG.spechomo_bandwise_accuracy and \
err_layers['spec_homo_errors'].bands > 1:
err_layers['spec_homo_errors'].arr = \
np.median(err_layers['spec_homo_errors'], axis=2).astype(err_layers['spec_homo_errors'].dtype)
# validate dimensionality of ac_errors array
if 'ac_errors' in err_layers and not CFG.ac_bandwise_accuracy:
assert err_layers['ac_errors'].ndim == 2, "Received a 3D 'ac_errors' array although CFG.ac_bandwise " \
"accuracy is False."
# # validate dimensionality of spec_homo_errors array
if 'spec_homo_errors' in err_layers and not CFG.spechomo_bandwise_accuracy:
assert err_layers['spec_homo_errors'].ndim == 2, "Received a 3D 'spec_homo_errors' array although " \
"CFG.spechomo_bandwise_accuracy is False."
# stack all accuracy layers together
accArr = np.dstack(err_layers.values()).astype('int16')
......
......@@ -142,7 +142,11 @@ class MemoryReserver(Semaphore):
def reserved_key_jobID(self):
return self._get_and_set_key('_reserved_key_jobID', 'MEM_RESERVED_BY_GMSJOB_%s' % CFG.ID)
def acquire(self, timeout=0, target=None):
@property
def acquisition_key(self):
return self._get_and_set_key('_acquisition_key', 'ACQUISITION_LOCK')
def acquire_old(self, timeout=0, target=None):
if not self.disabled:
with MemoryReserverAcquisitionLock():
......@@ -166,6 +170,40 @@ class MemoryReserver(Semaphore):
time.sleep(1)
self.acquire(timeout=timeout)
def acquire(self, timeout=0, target=None):
if not self.disabled:
token = self.client.getset(self.acquisition_key, self.exists_val)
if token:
time.sleep(1)
self.acquire(timeout=timeout)
self.client.expire(self.acquisition_key, 10)
try:
if self.usable_memory_gb >= self.mem2lock_gb:
for i in range(self.mem2lock_gb):
token = super(MemoryReserver, self).acquire(timeout=timeout)
self.client.hset(self.grabbed_key_jobID, token, self.current_time)
self.client.incr(self.reserved_key, self.mem2lock_gb)
self.client.incr(self.reserved_key_jobID, self.mem2lock_gb)
self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)
self._waiting = False
else:
if not self._waiting:
self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are usable.'
% (self.usable_memory_gb, self.mem2lock_gb))
self._waiting = True
time.sleep(1)
self.acquire(timeout=timeout)
finally:
self.client.delete(self.acquisition_key)
def release(self):
if not self.disabled:
for token in self._local_tokens:
......
......@@ -1084,6 +1084,15 @@ class GMS_object(object):
geoArrs_same_extent[0].gt, geoArrs_same_extent[0].prj,
bandnames=bandnames,
nodata=geoArrs_same_extent[0].nodata)
# handle CFG.ac_bandwise_accuracy (average ac_errors if needed)
# NOTE: full_geoArr will only have 3 dims in case of multiple subsystems
# -> median cannot directly computed during AC due to different GSDs of the subsystems
# -> first perform spatial homogenization, directly after that compute median
# -> this is not done later (e.g., in L2C) to avoid memory overflows in L2B or L2C
if attrname == 'ac_errors' and not CFG.ac_bandwise_accuracy and full_geoArr.bands > 1:
full_geoArr = np.median(full_geoArr, axis=2).astype(full_geoArr.dtype)
setattr(GMS_obj_merged, attrname, full_geoArr)
# handle the remaining arrays
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment