Commit 77a105fc authored by Daniel Scheffler's avatar Daniel Scheffler Committed by Mathias Peters
Browse files

Bugfix for not properly closed logfiles; added environment checker for Spatial...

Bugfix for not properly closed logfiles; added environment checker for Spatial Index Mediator Server
 L1A_P:
 - L1A_Obj.__getstate__(): bugfix for not properly closed logfiles
L1B_P:
- get_reference_image_params_pgSQL(): bugfix for not properly closed logfile
L2A_P:
- correct_shifts(): bugfix for not properly closed logfile
ENV:
- check_dependencies(): added functionality to check current status of Spatial Index Mediator Server and to start the server if needed
- added _log_or_print()
HLP_F:
- added close_logger()
- added draft of class GMS_logger (not usable so far)
- subcall_with_output(): added keywords no_stdout and no_stderr
SpatialIndexMediator:
- added class SpatialIndexMediatorServer: a class for interacting with java server (status, start, stop, restart, is_running, port, ...)
CFG:
- added job.path_spatIdxSrv
PC:
- added calls of environment checkers
pgSQL_DB:
- added 'path_spatial_index_mediator_server' to config table
parent e9f82e2a
......@@ -195,10 +195,13 @@ class L1A_object(object):
"""Defines how the attributes of GMS object are pickled."""
if hasattr(self,'logger'):
HLP_F.close_logger(self.logger)
self.logger = 'not set'
if self.GMS_identifier:
HLP_F.close_logger(self.GMS_identifier['logger'])
self.GMS_identifier['logger'] = 'not set'
if hasattr(self,'MetaObj') and self.MetaObj:
HLP_F.close_logger(self.MetaObj.logger)
self.MetaObj.logger = 'not set'
return self.__dict__
......
......@@ -477,6 +477,7 @@ class Scene_finder(object):
assert query_res != [], 'No entity-ID found for scene number %s' %self.imref_scene_ID
self.imref_entity_ID = query_res[0][0] # [('LC81510322013152LGN00',)]
break
HLP_F.close_logger(temp_logger)
def sceneIDList_to_filt_overlap_scenes(self,sceneIDList,min_overlap):
......
......@@ -428,6 +428,7 @@ class DESHIFTER(object):
self.set_deshift_results()
temp_logger.info("Calculated shift-corrected array for attribute '%s', band %s... %.2fs"
%(self.attrname2deshift, self.band2process,time.time()-t0))
HLP_F.close_logger(temp_logger)
return self.deshift_results
......
......@@ -97,6 +97,7 @@ class job:
path_procdata_scenes = joinP(path_fileserver, query_cfg(conn_db_meta, 'foldername_procdata_scenes'))
path_procdata_MGRS = joinP(path_fileserver, query_cfg(conn_db_meta, 'foldername_procdata_MGRS'))
path_archive = joinP(path_fileserver, query_cfg(conn_db_meta, 'foldername_download'))
path_spatIdxSrv = absP(query_cfg(conn_db_meta, 'path_spatial_index_mediator_server'))
path_earthSunDist = absP(query_cfg(conn_db_meta, 'path_earthSunDist'))
#path_earthSunDist = '/home/gfz-fe/GeoMultiSens/database/earth_sun_distance/Earth_Sun_distances_per_day_edited.csv' # FIXME!!
path_SRFs = absP(query_cfg(conn_db_meta, 'path_SRFs'))
......
......@@ -2,10 +2,88 @@
import socket
import struct
import os
import re
import warnings
from datetime import datetime, timedelta
from shapely.geometry import Polygon
from shapely.geometry import Polygon
class SpatialIndexMediatorServer:
controller = 'index-mediator-server'
def __init__(self, rootDir):
self.rootDir = rootDir
@property
def is_running(self):
return self.status[0]
@property
def port(self):
return self.status[1]
@property
def status(self):
"""Check server status.
:return running(bool): running or not?
:return port(int):
"""
outputStr = self._communicate('status')
# decrypt
running = 'is running' in outputStr
_port = re.search('with pid ([\d]*)', outputStr)
port = int(_port.group(1)) if _port else None
return running, port
def start(self):
outputStr = self._communicate('start')
if outputStr=='success' and self.is_running:
return 'started'
else:
warnings.warn("\nStarting Spatial Index Mediator Server failed with message '%s'!"
%outputStr.replace('\n',''))
def stop(self):
outputStr = self._communicate('stop')
if re.search('index-mediator-server stopped', outputStr, re.I):
return 'stopped'
else:
warnings.warn("\nStopping Spatial Index Mediator Server failed with message '%s'!"
%outputStr.replace('\n',''))
def restart(self):
outputStr = self._communicate('restart')
if outputStr=='success' and self.is_running:
return 'restarted'
else:
warnings.warn("\nRestarting Spatial Index Mediator Server failed with message '%s'!"
%outputStr.replace('\n',''))
def _communicate(self, controller_cmd):
curdir = os.path.abspath(os.curdir)
os.chdir(self.rootDir)
no_stdout = no_stderr = controller_cmd in ['start','restart'] # FIXME workaround: otherwise subcall_with_output hangs at proc.communicate (waiting for EOF forever)
from ..misc.helper_functions import subcall_with_output
output, exitcode, err = subcall_with_output('bash %s %s' % (os.path.join(self.rootDir, self.controller),
controller_cmd), no_stdout, no_stderr)
os.chdir(curdir)
if exitcode:
raise err
else:
if output:
return output.decode('UTF-8')
else: # FIXME actually there should be always an output (also with controller commands 'start' and 'restart'
return 'success'
class SpatialIndexMediator:
FULL_SCENE_QUERY_MSG = 3
......@@ -154,9 +232,6 @@ class SpatialIndexMediator:
return scenes
class Connection:
""" Connection to the spatial index mediator server """
......
# -*- coding: utf-8 -*-
__author__='Daniel Scheffler'
import builtins
from .SpatialIndexMediator import SpatialIndexMediatorServer
job = builtins.GMS_config.job # read from builtins (set by process_controller)
def check_ports():
portsDict = {
......@@ -14,11 +20,21 @@ def check_read_write_permissions():
pass
def check_dependencies():
pyLibs = []
def check_dependencies(logger=None):
pyLibs = []
javaLibs = []
# check if spatial index mediator is running
pass
# check if spatial index mediator is running
SpatIdxSrv = SpatialIndexMediatorServer(job.path_spatIdxSrv)
if not SpatIdxSrv.is_running:
SpatIdxSrv.start()
_log_or_print('Spatial Index Mediator Server started successfully.', logger.info)
if not SpatIdxSrv.is_running:
_log_or_print('Attempt to start Spatial Index Mediator Server failed.', logger.info)
def _log_or_print(msg, loggerLvl = None):
if loggerLvl:
loggerLvl(msg)
else: print(msg)
......@@ -123,6 +123,54 @@ def setup_logger(name_logfile, path_logfile,append=1):
return logger
def close_logger(logger):
for handler in logger.handlers:
handler.close()
logger.removeHandler(handler)
class GMS_logger(logging.Logger):
raise NotImplementedError
def __init__(self):
super.__init__(self)
@staticmethod
def setup(name_logfile, path_logfile,append=1):
"""Returns a logging.logger instance pointing to the given logfile path.
:param name_logfile:
:param path_logfile:
:param append: <bool> whether to append the log message to an existing logfile (1)
or to create a new logfile (0); default=1
"""
logger = logging.getLogger(name_logfile)
formatter = logging.Formatter('%(asctime)s: %(message)s', datefmt='%Y/%m/%d %H:%M:%S')
while not os.path.isdir(os.path.dirname(path_logfile)):
try:
os.makedirs(os.path.dirname(path_logfile))
except OSError as e:
if e.errno != 17:
raise
else:
pass
fileHandler = logging.FileHandler(path_logfile, mode='a' if append else 'w')
fileHandler.setFormatter(formatter)
fileHandler.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
streamHandler.setFormatter(formatter)
logger.setLevel(logging.DEBUG)
if not logger.handlers:
logger.addHandler(fileHandler)
logger.addHandler(streamHandler)
return logger
def close(self):
for handler in self.handlers:
handler.close()
self.removeHandler(handler)
def MAP(func, *args, CPUs=None):
import multiprocessing
with multiprocessing.Pool() as pool:
......@@ -297,13 +345,14 @@ def ENVIfile_to_ENVIcompressed(inPath_hdr,outPath_hdr=None):
# FIXME include file reordering
def subcall_with_output(cmd):
def subcall_with_output(cmd, no_stdout=False, no_stderr=False):
"""Execute external command and get its stdout, exitcode and stderr.
:param cmd: a normal shell command including parameters
"""
proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
proc = Popen(shlex.split(cmd), stdout=None if no_stdout else PIPE, stderr=None if no_stderr else PIPE)
out, err = proc.communicate()
exitcode = proc.returncode
return out, exitcode, err
......
......@@ -45,12 +45,12 @@ if isdebugging: #override the existing settings in order to get write access eve
#builtins.GMS_process_ID = 26185253 # 25x L8, Zielsensor L8, spat.ref L8
#builtins.GMS_process_ID = 26185254 # 10x L8, Zielsensor L8, spat.ref L8
#builtins.GMS_process_ID = 26185255 # 1x L8 Bug 5 corners found -> Grund=Schreibfehler L1A im tiled Python-mode bei mehr als 1 Szene im Job
builtins.GMS_process_ID = 26185256 # 1x L7 SLC off, Zielsensor L8, spat.ref L8
#builtins.GMS_process_ID = 26185256 # 1x L7 SLC off, Zielsensor L8, spat.ref L8
#builtins.GMS_process_ID = 26185257 # Beta-Job - 219 x L8, 172 x L7, 111 x S2, spatref L8
#builtins.GMS_process_ID = 26185258 # Beta-Job - 219 x L8, spatref L8
#builtins.GMS_process_ID = 26185259 # Beta-Job - 172 x L7, spatref L8
#builtins.GMS_process_ID = 26185260 # Beta-Job - 111 x S2, spatref L8
#builtins.GMS_process_ID = 26185268 # 25x L7 SLC off, Zielsensor L8, spat.ref L8
builtins.GMS_process_ID = 26185268 # 25x L7 SLC off, Zielsensor L8, spat.ref L8
#builtins.GMS_process_ID = 26185269 # 1x L7 SLC off, Bug SpatialIndexMediator
#builtins.GMS_process_ID = 26185270 # 5x L7 SLC off, Bug SpatialIndexMediator
#builtins.GMS_process_ID = 26185275 # 1x L8, spat. Ref. L8 Bug L1B_mask not found
......@@ -103,6 +103,7 @@ from .algorithms import L1C_P # Level 1C Processor
from .algorithms import L2A_P # Level 2A Processor
from .algorithms import L2B_P # Level 2B Processor
from .algorithms import L2C_P # Level 2C Processor
from .misc import environment as ENV # environment
########################### core functions ####################################
......@@ -276,6 +277,11 @@ def is_inMEM(GMS_objects, dataset):
########################################### MAIN/ARGUMENT PARSER #######################################################
def run_processController_in_multiprocessing(usecase_data_list):
# check environment
job.logger('Checking system environment...')
ENV.check_dependencies(job.logger)
with multiprocessing.Pool() as pool: usecase_data_list = pool.map(L0A_P.add_local_availability, usecase_data_list)
failed_objects = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment