Commit 357719f5 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Merge branch 'master' into feature/add_locks

parents e475587d 31828c1a
......@@ -40,6 +40,8 @@ class L1A_object(GMS_object):
"""
# TODO docstring
# NOTE: kwargs is in here to allow instanciating with additional arg 'proc_level'
# load default attribute values and methods
super(L1A_object, self).__init__()
......@@ -274,7 +276,6 @@ class L1A_object(GMS_object):
Works for: RapidEye (metadata.xml),SPOT(metadata.dim),LANDSAT(mtl.txt),ASTER(downloaded coremetadata),
ALOS(summary.txt & Leader file)
:param v:
:return:
"""
......@@ -522,7 +523,6 @@ class L1A_object(GMS_object):
if rasObj.get_projection_type() == 'UTM':
self.MetaObj.CornerTieP_UTM = rasObj.get_corner_coordinates('UTM')
self.meta_odict = self.MetaObj.to_odict() # important in order to keep geotransform/projection
if CFG.inmem_serialization:
self.delete_tempFiles() # these files are needed later in Python execution mode
self.MetaObj.Dataname = previous_dataname # /vsi.. pointing directly to raw data archive (which exists)
......
......@@ -16,7 +16,8 @@ import numpy as np
from geopandas import GeoDataFrame
from shapely.geometry import box
import pytz
from typing import Union # noqa F401 # flake8 issue
import traceback
from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
from arosics import COREG, DESHIFTER
from geoarray import GeoArray
......@@ -36,6 +37,10 @@ from ..misc.logging import GMS_logger
from ..misc.spatial_index_mediator import SpatialIndexMediator
from ..misc.definition_dicts import get_GMS_sensorcode, get_outFillZeroSaturated
if TYPE_CHECKING:
from shapely.geometry import Polygon # noqa F401 # flake8 issue
from logging import Logger # noqa F401 # flake8 issue
__author__ = 'Daniel Scheffler'
......@@ -44,6 +49,7 @@ class Scene_finder(object):
def __init__(self, src_boundsLonLat, src_AcqDate, src_prj, src_footprint_poly, sceneID_excluded=None,
min_overlap=20, min_cloudcov=0, max_cloudcov=20, plusminus_days=30, plusminus_years=10, logger=None):
# type: (list, datetime, str, Polygon, int, int, int, int, int, int, Logger) -> None
"""Initialize Scene_finder.
:param src_boundsLonLat:
......@@ -271,7 +277,7 @@ class L1B_object(L1A_object):
def get_spatial_reference_scene(self):
boundsLonLat = corner_coord_to_minmax(self.trueDataCornerLonLat)
footprint_poly = HLP_F.CornerLonLat_to_shapelyPoly(self.trueDataCornerLonLat)
RSF = Scene_finder(boundsLonLat, self.acq_datetime, self.meta_odict['coordinate system string'],
RSF = Scene_finder(boundsLonLat, self.acq_datetime, self.MetaObj.projection,
footprint_poly, self.scene_ID,
min_overlap=CFG.spatial_ref_min_overlap,
min_cloudcov=CFG.spatial_ref_min_cloudcov,
......@@ -480,13 +486,14 @@ class L1B_object(L1A_object):
ref_obj = GMS_object.from_disk((path_gmsFile, ['cube', None]))
# get spectral characteristics
ref_cwl, shift_cwl = [[float(i) for i in GMS_obj.meta_odict['wavelength']] for GMS_obj in [ref_obj, self]]
ref_fwhm, shift_fwhm = [[float(i) for i in GMS_obj.meta_odict['bandwidths']] for GMS_obj in [ref_obj, self]]
ref_cwl = [float(ref_obj.MetaObj.CWL[bN]) for bN in ref_obj.MetaObj.LayerBandsAssignment]
shift_cwl = [float(self.MetaObj.CWL[bN]) for bN in self.MetaObj.LayerBandsAssignment]
shift_fwhm = [float(self.MetaObj.FWHM[bN]) for bN in self.MetaObj.LayerBandsAssignment]
# exclude cirrus/oxygen band of Landsat-8/Sentinel-2
shift_bbl, ref_bbl = [False] * len(shift_cwl), [False] * len(ref_cwl) # bad band lists
for GMS_obj, s_r, bbl in zip([self, ref_obj], ['shift', 'ref'], [shift_bbl, ref_bbl]):
GMS_obj.GMS_identifier['logger'] = None # set a dummy value in order to avoid Exception
GMS_obj.GMS_identifier.logger = None # set a dummy value in order to avoid Exception
sensorcode = get_GMS_sensorcode(GMS_obj.GMS_identifier)
if sensorcode in ['LDCM', 'S2A', 'S2B'] and '9' in GMS_obj.LayerBandsAssignment:
bbl[GMS_obj.LayerBandsAssignment.index('9')] = True
......@@ -541,6 +548,9 @@ class L1B_object(L1A_object):
self.logger.warning('Coregistration skipped according to user configuration.')
elif self.coreg_needed and self.spatRef_available:
self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID})
self.coreg_info.update({'reference entity ID': self.spatRef_scene.entity_ID})
geoArr_ref = GeoArray(self.spatRef_scene.filePath)
geoArr_shift = GeoArray(self.arr)
r_b4match, s_b4match = self.get_opt_bands4matching(target_cwlPos_nm=CFG.coreg_band_wavelength_for_matching)
......@@ -552,7 +562,7 @@ class L1B_object(L1A_object):
max_shift=CFG.coreg_max_shift_allowed,
ws=CFG.coreg_window_size,
data_corners_ref=[[x, y] for x, y in self.spatRef_scene.polyUTM.convex_hull.exterior.coords],
data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.meta_odict['coordinate system string'], x, y)
data_corners_tgt=[transform_any_prj(EPSG2WKT(4326), self.MetaObj.projection, x, y)
for x, y in HLP_F.reorder_CornerLonLat(self.trueDataCornerLonLat)],
nodata=(get_outFillZeroSaturated(geoArr_ref.dtype)[0],
get_outFillZeroSaturated(geoArr_shift.dtype)[0]),
......@@ -561,26 +571,36 @@ class L1B_object(L1A_object):
q=True
)
COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
COREG_obj.calculate_spatial_shifts()
self.coreg_info.update(
COREG_obj.coreg_info) # no clipping to trueCornerLonLat until here -> only shift correction
self.coreg_info.update({'reference scene ID': self.spatRef_scene.scene_ID})
self.coreg_info.update({'reference entity ID': self.spatRef_scene.entity_ID})
self.coreg_info.update({'shift_reliability': COREG_obj.shift_reliability})
if COREG_obj.success:
self.coreg_info['success'] = True
self.logger.info("Calculated map shifts (X,Y): %s / %s"
% (COREG_obj.x_shift_map,
COREG_obj.y_shift_map)) # FIXME direkt in calculate_spatial_shifts loggen
self.logger.info("Reliability of calculated shift: %.1f percent" % COREG_obj.shift_reliability)
# initialize COREG object
try:
COREG_obj = COREG(geoArr_ref, geoArr_shift, **coreg_kwargs)
except Exception as e:
COREG_obj = None
self.logger.error('\nAn error occurred during coregistration. BE AWARE THAT THE SCENE %s '
'(ENTITY ID %s) HAS NOT BEEN COREGISTERED! Error message was: \n%s\n'
% (self.scene_ID, self.entity_ID, repr(e)))
self.logger.error(traceback.format_exc())
# TODO include that in the job summary
# calculate_spatial_shifts
if COREG_obj:
COREG_obj.calculate_spatial_shifts()
self.coreg_info.update(
COREG_obj.coreg_info) # no clipping to trueCornerLonLat until here -> only shift correction
self.coreg_info.update({'shift_reliability': COREG_obj.shift_reliability})
if COREG_obj.success:
self.coreg_info['success'] = True
self.logger.info("Calculated map shifts (X,Y): %s / %s"
% (COREG_obj.x_shift_map,
COREG_obj.y_shift_map)) # FIXME direkt in calculate_spatial_shifts loggen
self.logger.info("Reliability of calculated shift: %.1f percent" % COREG_obj.shift_reliability)
else:
# TODO add database entry with error hint
[self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
% (self.scene_ID, self.entity_ID, err)) for err in COREG_obj.tracked_errors]
else:
# TODO add database entry with error hint
[self.logger.error('ERROR during coregistration of scene %s (entity ID %s):\n%s'
% (self.scene_ID, self.entity_ID, err)) for err in COREG_obj.tracked_errors]
else:
if self.coreg_needed:
......@@ -671,10 +691,10 @@ class L1B_object(L1A_object):
# update geoinformations and array shape related attributes
self.logger.info("Updating geoinformations of '%s' attribute..." % attrname)
if attrname == 'arr':
self.meta_odict['map info'] = DS.updated_map_info
self.meta_odict['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
self.MetaObj.map_info = DS.updated_map_info
self.MetaObj.projection = EPSG2WKT(WKT2EPSG(DS.updated_projection))
self.shape_fullArr = DS.arr_shifted.shape
self.meta_odict['lines'], self.meta_odict['samples'] = DS.arr_shifted.shape[:2]
self.MetaObj.rows, self.MetaObj.cols = DS.arr_shifted.shape[:2]
else:
self.masks_meta['map info'] = DS.updated_map_info
self.masks_meta['coordinate system string'] = EPSG2WKT(WKT2EPSG(DS.updated_projection))
......@@ -686,7 +706,7 @@ class L1B_object(L1A_object):
geoArr.arr, geoArr.gt, geoArr.prj = \
DS.GeoArray_shifted.arr, DS.GeoArray_shifted.gt, DS.GeoArray_shifted.prj
# setattr(self,attrname, DS.GeoArray_shifted) # NOTE: don't set array earlier because setter will also
# # update arr.gt/.prj/.nodata from meta_odict
# # update arr.gt/.prj/.nodata from MetaObj
self.resamp_needed = False
self.coreg_needed = False
......
......@@ -64,8 +64,8 @@ class L1C_object(L1B_object):
self.logger.info('Calculating LonLat array...')
self._lonlat_arr = \
GEOP.get_lonlat_coord_array(self.shape_fullArr, self.arr_pos,
mapinfo2geotransform(self.meta_odict['map info']),
self.meta_odict['coordinate system string'],
mapinfo2geotransform(self.MetaObj.map_info),
self.MetaObj.projection,
meshwidth=10, # for faster processing
nodata_mask=None, # dont overwrite areas outside the image with nodata
outFill=get_outFillZeroSaturated(np.float32)[0])[0]
......@@ -87,17 +87,18 @@ class L1C_object(L1B_object):
"""
if self._VZA_arr is None:
self.logger.info('Calculating viewing zenith array...')
if 'ViewingAngle_arrProv' in self.meta_odict and self.meta_odict['ViewingAngle_arrProv']:
if self.MetaObj.ViewingAngle_arrProv:
# Sentinel-2
self._VZA_arr = GEOP.adjust_acquisArrProv_to_shapeFullArr(self.meta_odict['ViewingAngle_arrProv'],
self.shape_fullArr,
meshwidth=10, # for faster processing
subset=None,
bandwise=0)
self._VZA_arr = GEOP.adjust_acquisArrProv_to_shapeFullArr(
{k: v.tolist() for k, v in self.MetaObj.ViewingAngle_arrProv.items()},
self.shape_fullArr,
meshwidth=10, # for faster processing
subset=None,
bandwise=0)
else:
self._VZA_arr = GEOP.calc_VZA_array(self.shape_fullArr, self.arr_pos, self.fullSceneCornerPos,
float(self.meta_odict['ViewingAngle']),
float(self.meta_odict['FieldOfView']),
float(self.MetaObj.ViewingAngle),
float(self.MetaObj.FOV),
self.logger,
nodata_mask=None, # dont overwrite areas outside image with nodata
outFill=get_outFillZeroSaturated(np.float32)[0],
......@@ -120,13 +121,14 @@ class L1C_object(L1B_object):
"""
if self._VAA_arr is None:
self.logger.info('Calculating viewing azimuth array...')
if 'IncidenceAngle_arrProv' in self.meta_odict and self.meta_odict['IncidenceAngle_arrProv']:
if self.MetaObj.IncidenceAngle_arrProv:
# Sentinel-2
self._VAA_arr = GEOP.adjust_acquisArrProv_to_shapeFullArr(self.meta_odict['IncidenceAngle_arrProv'],
self.shape_fullArr,
meshwidth=10, # for faster processing
subset=None,
bandwise=0)
self._VAA_arr = GEOP.adjust_acquisArrProv_to_shapeFullArr(
{k: v.tolist() for k, v in self.MetaObj.IncidenceAngle_arrProv.items()},
self.shape_fullArr,
meshwidth=10, # for faster processing
subset=None,
bandwise=0)
else:
# only a mean VAA is available
if self.VAA_mean is None:
......@@ -156,11 +158,11 @@ class L1C_object(L1B_object):
self._SZA_arr, self._SAA_arr = \
GEOP.calc_SZA_SAA_array(
self.shape_fullArr, self.arr_pos,
self.meta_odict['AcqDate'],
self.meta_odict['AcqTime'],
self.MetaObj.AcqDate,
self.MetaObj.AcqTime,
self.fullSceneCornerPos,
self.fullSceneCornerLonLat,
self.meta_odict['overpass duraction sec'],
self.MetaObj.overpassDurationSec,
self.logger,
meshwidth=10,
nodata_mask=None, # dont overwrite areas outside the image with nodata
......@@ -294,7 +296,7 @@ class AtmCorr(object):
logger_atmCorr.addHandler(fileHandler)
inObj.close_GMS_loggers()
inObj.close_loggers()
self._logger = logger_atmCorr
return self._logger
......@@ -315,7 +317,7 @@ class AtmCorr(object):
self._logger.close()
self._logger = None
[inObj.close_GMS_loggers() for inObj in self.inObjs]
[inObj.close_loggers() for inObj in self.inObjs]
@property
def GSDs(self):
......@@ -360,7 +362,7 @@ class AtmCorr(object):
# float32! -> conversion to np.float16 will convert -9999 to -10000
arr2pass = inObj.arr[:, :, bandIdx].astype(np.float32)
arr2pass[arr2pass == inObj.arr.nodata] = np.nan # set nodata values to np.nan
data_dict[bandN] = (arr2pass / inObj.meta_odict['ScaleFactor']).astype(np.float16)
data_dict[bandN] = (arr2pass / inObj.MetaObj.ScaleFactor).astype(np.float16)
else:
inObj.logger.warning("Band '%s' cannot be included into atmospheric correction because it "
"exists multiple times." % bandN)
......@@ -472,14 +474,14 @@ class AtmCorr(object):
del self.logger # otherwise each input object would have multiple fileHandlers
metadata = dict(
U=self.inObjs[0].meta_odict['EarthSunDist'],
U=self.inObjs[0].MetaObj.EarthSunDist,
SENSING_TIME=self.inObjs[0].acq_datetime,
# SENSING_TIME=datetime.strptime('2015-08-12 10:40:21 +0000', '%Y-%m-%d %H:%M:%S %z'),
viewing_zenith=self._meta_get_viewing_zenith(),
viewing_azimuth=self._meta_get_viewing_azimuth(),
relative_viewing_azimuth=self._meta_get_relative_viewing_azimuth(),
sun_mean_azimuth=self.inObjs[0].meta_odict['SunAzimuth'],
sun_mean_zenith=90 - self.inObjs[0].meta_odict['SunElevation'],
sun_mean_azimuth=self.inObjs[0].MetaObj.SunAzimuth,
sun_mean_zenith=90 - self.inObjs[0].MetaObj.SunElevation,
solar_irradiance=self._meta_get_solar_irradiance(),
aux_data=self._meta_get_aux_data(),
spatial_samplings=self._meta_get_spatial_samplings()
......@@ -676,8 +678,8 @@ class AtmCorr(object):
# FIXME calculation of center wavelengths within SRF() used not the GMS algorithm
# SRF instance must be created for all bands and the previous proc level
GMS_identifier_fullScene = self.inObjs[0].GMS_identifier
GMS_identifier_fullScene['Subsystem'] = ''
GMS_identifier_fullScene['proc_level'] = proc_chain[proc_chain.index(self.inObjs[0].proc_level) - 1]
GMS_identifier_fullScene.subsystem = ''
GMS_identifier_fullScene.proc_level = proc_chain[proc_chain.index(self.inObjs[0].proc_level) - 1]
return SRF(GMS_identifier_fullScene, wvl_unit='nanometers', format_bandnames=True)
......@@ -690,7 +692,7 @@ class AtmCorr(object):
tgt_res = self.inObjs[0].ac_options['cld_mask']['target_resolution']
# check if input GMS objects provide a cloud mask
avail_cloud_masks = {inObj.GMS_identifier['Subsystem']: inObj.mask_clouds for inObj in self.inObjs}
avail_cloud_masks = {inObj.GMS_identifier.subsystem: inObj.mask_clouds for inObj in self.inObjs}
no_avail_CMs = list(set(avail_cloud_masks.values())) == [None]
# compute cloud mask if not already provided
......@@ -950,7 +952,6 @@ class AtmCorr(object):
inObj.MetaObj.LayerBandsAssignment = out_LBA
inObj.LayerBandsAssignment = out_LBA
inObj.MetaObj.filter_layerdependent_metadata()
inObj.meta_odict = inObj.MetaObj.to_odict() # actually auto-updated by getter
##################################################################################
# join SURFACE REFLECTANCE as 3D int16 array, scaled to scale factor from config #
......
......@@ -36,6 +36,7 @@ from ..misc.definition_dicts import datasetid_to_sat_sen, sat_sen_to_datasetid
from ..misc.exceptions import ClassifierNotAvailableError
from ..model.metadata import get_LayerBandsAssignment
from .L2A_P import L2A_object
from ..model.gms_object import GMS_identifier
__author__ = 'Daniel Scheffler'
......@@ -60,13 +61,25 @@ class L2B_object(L2A_object):
method = CFG.spechomo_method
src_dsID = sat_sen_to_datasetid(self.satellite, self.sensor)
src_cwls = self.meta_odict['wavelength']
src_cwls = [float(self.MetaObj.CWL[bN]) for bN in self.MetaObj.LayerBandsAssignment]
# FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
tgt_sat, tgt_sen = datasetid_to_sat_sen(CFG.datasetid_spectral_ref)
# NOTE: get target LBA at L2A, because spectral characteristics of target sensor do not change after AC
tgt_LBA = get_LayerBandsAssignment(
dict(Satellite=tgt_sat, Sensor=tgt_sen, Subsystem=None,
image_type='RSD', proc_level='L2A', dataset_ID=src_dsID, logger=None))
GMS_identifier(satellite=tgt_sat, sensor=tgt_sen, subsystem='',
image_type='RSD', proc_level='L2A', dataset_ID=src_dsID, logger=None))
if CFG.datasetid_spectral_ref is None:
tgt_cwl = CFG.target_CWL
tgt_fwhm = CFG.target_FWHM
else:
# exclude those bands from CFG.target_CWL and CFG.target_FWHM that have been removed after AC
full_LBA = get_LayerBandsAssignment(
GMS_identifier(satellite=tgt_sat, sensor=tgt_sen, subsystem='',
image_type='RSD', proc_level='L1A', dataset_ID=src_dsID, logger=None),
no_thermal=True, no_pan=False, return_fullLBA=True, sort_by_cwl=True, proc_level='L1A')
tgt_cwl = [dict(zip(full_LBA, CFG.target_CWL))[bN] for bN in tgt_LBA]
tgt_fwhm = [dict(zip(full_LBA, CFG.target_FWHM))[bN] for bN in tgt_LBA]
####################################################
# special cases where homogenization is not needed #
......@@ -92,7 +105,7 @@ class L2B_object(L2A_object):
if method == 'LI' or CFG.datasetid_spectral_ref is None:
# linear interpolation (if intended by user or in case of custom spectral characteristics of target sensor)
# -> no classifier for that case available -> linear interpolation
im = SpH.interpolate_cube(self.arr, src_cwls, CFG.target_CWL, kind='linear')
im = SpH.interpolate_cube(self.arr, src_cwls, tgt_cwl, kind='linear')
if CFG.spechomo_estimate_accuracy:
self.logger.warning("Unable to compute any error information in case spectral homogenization algorithm "
......@@ -114,7 +127,7 @@ class L2B_object(L2A_object):
compute_errors=CFG.spechomo_estimate_accuracy,
bandwise_errors=CFG.spechomo_bandwise_accuracy,
fallback_argskwargs=dict(
args=dict(source_CWLs=src_cwls, target_CWLs=CFG.target_CWL,),
args=dict(source_CWLs=src_cwls, target_CWLs=tgt_cwl,),
kwargs=dict(kind='linear')
))
......@@ -123,11 +136,9 @@ class L2B_object(L2A_object):
###################
self.LayerBandsAssignment = tgt_LBA
self.meta_odict['wavelength'] = list(CFG.target_CWL)
self.meta_odict['bandwidths'] = list(CFG.target_FWHM)
self.meta_odict['bands'] = len(CFG.target_CWL)
if 'band names' in self.meta_odict: # FIXME bug workaround
del self.meta_odict['band names'] # TODO
self.MetaObj.CWL = dict(zip(tgt_LBA, tgt_cwl))
self.MetaObj.FWHM = dict(zip(tgt_LBA, tgt_fwhm))
self.MetaObj.bands = len(tgt_LBA)
self.arr = im # type: GeoArray
self.spec_homo_errors = errs # type: Union[np.ndarray, None] # int16, None if ~CFG.spechomo_estimate_accuracy
......@@ -936,15 +947,15 @@ class ReferenceCube_Generator(object):
return self._refcubes
def _get_tgt_GMS_identifier(self, tgt_sat, tgt_sen):
# type: (str, str) -> dict
# type: (str, str) -> GMS_identifier
"""Get a GMS identifier for the specified target sensor such that all possible bands are included (L1A)
:param tgt_sat: target satellite
:param tgt_sen: target sensor
:return:
"""
return dict(Satellite=tgt_sat, Sensor=tgt_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger) # use L1A to have all bands available
return GMS_identifier(satellite=tgt_sat, sensor=tgt_sen, subsystem=None, image_type='RSD', dataset_ID=None,
proc_level='L1A', logger=self.logger) # use L1A to have all bands available
def _get_tgt_LayerBandsAssignment(self, tgt_sat, tgt_sen):
# type: (str, str) -> list
......@@ -1035,8 +1046,8 @@ class ReferenceCube_Generator(object):
# first, perform spectral resampling to Sentinel-2 to reduce dimensionality
if downsamp_sat and downsamp_sen:
tgt_srf = SRF(dict(Satellite=downsamp_sat, Sensor=downsamp_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger))
tgt_srf = SRF(GMS_identifier(satellite=downsamp_sat, sensor=downsamp_sen, subsystem=None, image_type='RSD',
dataset_ID=None, proc_level='L1A', logger=self.logger))
im2clust = self.resample_image_spectrally(im2clust, tgt_srf, progress=progress)
# compute KMeans clusters for the spectrally resampled image
......@@ -1299,9 +1310,11 @@ class Classifier_Generator(object):
['1', '2', '3', '4', '5', '6', '7'],
['1', '2', '3', '4', '5', '6', '7'], ...]
"""
L1A_GMSid = dict(Satellite=satellite, Sensor=sensor, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=None)
L1C_GMSid = {**L1A_GMSid, **dict(proc_level='L1C')} # different numbers of bands after AC
L1A_GMSid = GMS_identifier(satellite=satellite, sensor=sensor, subsystem=None, image_type='RSD',
dataset_ID=None, proc_level='L1A', logger=None)
# different numbers of bands after AC
L1C_GMSid = GMS_identifier(satellite=satellite, sensor=sensor, subsystem=None, image_type='RSD',
dataset_ID=None, proc_level='L1C', logger=None)
return [get_LayerBandsAssignment(L1A_GMSid, no_pan=False, sort_by_cwl=True), # L1A_withPan_cwlSorted
get_LayerBandsAssignment(L1C_GMSid, no_pan=False, sort_by_cwl=True), # L1C_withPan_cwlSorted
......
......@@ -12,7 +12,7 @@ import zipfile
from tempfile import NamedTemporaryFile as tempFile
from logging import Logger
from matplotlib import pyplot as plt
from typing import Union, Dict, List, Tuple # noqa F401 # flake8 issue
from typing import Union, Dict, List, Tuple, TYPE_CHECKING # noqa F401 # flake8 issue
from datetime import datetime
import logging
......@@ -38,6 +38,9 @@ from ..misc.database_tools import get_overlapping_scenes_from_postgreSQLdb
from ..misc.path_generator import path_generator
from ..misc.spatial_index_mediator import SpatialIndexMediator
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0):
hdr_path = os.path.splitext(path)[0] + '.hdr' if not os.path.splitext(path)[1] == '.hdr' else path
......@@ -172,11 +175,11 @@ def get_list_GMSfiles(dataset_list, target):
return GMS_list
def SRF_reader(GMS_identifier, no_thermal=None, no_pan=None, v=False):
# type: (dict, bool) -> collections.OrderedDict
def SRF_reader(GMS_id, no_thermal=None, no_pan=None, v=False):
# type: (GMS_identifier, bool) -> collections.OrderedDict
"""Read SRF for any sensor and return a dictionary containing band names as keys and SRF numpy arrays as values.
:param GMS_identifier:
:param GMS_id:
:param no_thermal: whether to exclude thermal bands from the returned bands list
(default: CFG.skip_thermal)
:param no_pan: whether to exclude panchromatic bands from the returned bands list
......@@ -188,37 +191,36 @@ def SRF_reader(GMS_identifier, no_thermal=None, no_pan=None, v=False):
no_thermal = no_thermal if no_thermal is not None else CFG.skip_thermal
no_pan = no_pan if no_pan is not None else CFG.skip_pan
satellite, sensor = GMS_identifier['Satellite'], GMS_identifier['Sensor']
logger = GMS_identifier['logger'] or Logger(__name__)
LayerBandsAssignment = META.get_LayerBandsAssignment(GMS_identifier, no_thermal=no_thermal, no_pan=no_pan)
logger = GMS_id.logger or Logger(__name__)
LayerBandsAssignment = META.get_LayerBandsAssignment(GMS_id, no_thermal=no_thermal, no_pan=no_pan)
SRF_dict = collections.OrderedDict()
SRF_dir = PG.get_path_srf_file(GMS_identifier)
SRF_dir = PG.get_path_srf_file(GMS_id)
if os.path.isdir(SRF_dir):
for bandname in LayerBandsAssignment:
SRF_path = PG.get_path_srf_file(GMS_identifier, bandname=bandname)
SRF_path = PG.get_path_srf_file(GMS_id, bandname=bandname)
try:
SRF_dict[bandname] = np.loadtxt(SRF_path, skiprows=1)
if v:
logger.info('Reading SRF for %s %s, %s...' % (satellite, sensor, bandname))
logger.info('Reading SRF for %s %s, %s...' % (GMS_id.satellite, GMS_id.sensor, bandname))
except FileNotFoundError:
logger.warning('No spectral response function found for %s %s %s at %s! >None< is returned.'
% (satellite, sensor, bandname, SRF_path))
% (GMS_id.satellite, GMS_id.sensor, bandname, SRF_path))
else:
warnings.warn('SRF database directory not available at %s.' % SRF_dir)
SRF_dict = {}
logger.warning("No spectral response functions available for '%s %s'. Preconfigured values are used for solar "
"irradiance and central wavelength instead." % (satellite, sensor))
"irradiance and central wavelength instead." % (GMS_id.satellite, GMS_id.sensor))
return SRF_dict
class SRF(object):
def __init__(self, GMS_identifier=None, wvl_unit='nanometers', specres_nm=1, format_bandnames=False,
def __init__(self, GMS_id=None, wvl_unit='nanometers', specres_nm=1, format_bandnames=False,
no_thermal=None, no_pan=None, v=False):
# type: (dict, str, float, bool, bool) -> None
# type: (GMS_identifier, str, float, bool, bool) -> None
"""SRF instance provides SRF functions, wavelength positions, etc..
:param GMS_identifier: dictionary as provided by GMS_object
:param GMS_id: GMS_identifier as provided by GMS_object
:param wvl_unit: the wavelengths unit to be used within SRF instance ('nanometers' or 'micrometers)
:param specres_nm: output spectral resolution of SRFs in nanometers
:param format_bandnames: whether to format default strings from LayerBandsAssignment as 'B01', 'B02' etc..
......@@ -244,18 +246,18 @@ class SRF(object):
self.wvl_unit = wvl_unit
self.specres_nm = specres_nm
self.format_bandnames = format_bandnames
self.satellite = GMS_identifier['Satellite'] if GMS_identifier else ''
self.sensor = GMS_identifier['Sensor'] if GMS_identifier else ''
self.satellite = GMS_id.satellite if GMS_id else ''
self.sensor = GMS_id.sensor if GMS_id else ''
self.conv = {}
self.no_thermal = no_thermal
self.no_pan = no_pan
self.v = v
if GMS_identifier:
self.from_GMS_identifier(GMS_identifier)
if GMS_id:
self.from_GMS_identifier(GMS_id)
def from_GMS_identifier(self, GMS_identifier):
srf_dict = SRF_reader(GMS_identifier, no_thermal=self.no_thermal, no_pan=self.no_pan,
def from_GMS_identifier(self, GMS_id):
srf_dict = SRF_reader(GMS_id, no_thermal=self.no_thermal, no_pan=self.no_pan,
v=self.v) # type: collections.OrderedDict # (ordered according to LBA)
return self.from_dict(srf_dict)
......
......@@ -297,8 +297,7 @@ def mask_to_ENVI_Classification(InObj, maskname):
rows, cols = classif_array.shape[:2]
bands = 1 if classif_array.ndim == 2 else classif_array.shape[2]
mapI, CSS = (InObj.meta_odict['map info'], InObj.meta_odict['coordinate system string']) \
if hasattr(InObj, 'meta_odict') and InObj.meta_odict else (InObj.MetaObj.map_info, InObj.MetaObj.projection)
mapI, CSS = InObj.MetaObj.map_info, InObj.MetaObj.projection
mask_md = {'file type': 'ENVI Classification', 'map info': mapI, 'coordinate system string': CSS, 'lines': rows,
'samples': cols, 'bands': bands, 'header offset': 0, 'byte order': 0, 'interleave': 'bsq',
'data type': 1, 'data ignore value': get_outFillZeroSaturated(classif_array.dtype)[0]}
......
......@@ -4,9 +4,13 @@ import collections
import re
import numpy as np
from typing import TYPE_CHECKING # noqa F401 # flake8 issue
from ..options.config import GMS_config as CFG
if TYPE_CHECKING:
from ..model.gms_object import GMS_identifier # noqa F401 # flake8 issue
__author__ = 'Daniel Scheffler'
dtype_lib_Python_IDL = {'bool_': 0, 'uint8': 1, 'int8': 1, 'int_': 1, 'int16': 2, 'uint16': 12, 'int32': 3,
......@@ -22,10 +26,10 @@ db_jobs_statistics_def = {'pending': 1, 'started': 2, None: 2, 'L1A': 3, 'L1B':
bandslist_all_errors = ['ac_errors', 'mask_clouds_confidence', 'spat_homo_errors', 'spec_homo_errors']
def get_GMS_sensorcode(GMS_identifier):
# type: (dict) -> str
def get_GMS_sensorcode(GMS_id):
# type: (GMS_identifier) -> str
Satellite, Sensor, Subsystem = (GMS_identifier['Satellite'], GMS_identifier['Sensor'], GMS_identifier['Subsystem'])