Commit 4c2a7803 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Fixed typing issues.

parent 9a446c79
Pipeline #1233 passed with stage
in 7 minutes and 43 seconds
...@@ -6,7 +6,7 @@ import re ...@@ -6,7 +6,7 @@ import re
import logging import logging
import dill import dill
import traceback import traceback
from typing import List, TypeVar from typing import List # noqa F401 # flake8 issue
import numpy as np import numpy as np
...@@ -197,9 +197,6 @@ class L1C_object(L1B_object): ...@@ -197,9 +197,6 @@ class L1C_object(L1B_object):
del self.dem del self.dem
_T_list_L1Cobjs = TypeVar(List[L1C_object])
class AtmCorr(object): class AtmCorr(object):
def __init__(self, *L1C_objs, reporting=False): def __init__(self, *L1C_objs, reporting=False):
"""Wrapper around atmospheric correction by Andre Hollstein, GFZ Potsdam """Wrapper around atmospheric correction by Andre Hollstein, GFZ Potsdam
...@@ -226,7 +223,7 @@ class AtmCorr(object): ...@@ -226,7 +223,7 @@ class AtmCorr(object):
assert len(list(set(scene_IDs))) == 1, \ assert len(list(set(scene_IDs))) == 1, \
"Input GMS objects for 'AtmCorr' must all belong to the same scene ID!. Received %s." % scene_IDs "Input GMS objects for 'AtmCorr' must all belong to the same scene ID!. Received %s." % scene_IDs
self.inObjs = L1C_objs # type: _T_list_L1Cobjs self.inObjs = L1C_objs # type: List[L1C_object]
self.reporting = reporting self.reporting = reporting
self.ac_input = {} # set by self.run_atmospheric_correction() self.ac_input = {} # set by self.run_atmospheric_correction()
self.results = None # direct output of external atmCorr module (set by run_atmospheric_correction) self.results = None # direct output of external atmCorr module (set by run_atmospheric_correction)
...@@ -677,7 +674,7 @@ class AtmCorr(object): ...@@ -677,7 +674,7 @@ class AtmCorr(object):
cm_geoarray = CMC.cloud_mask_geoarray cm_geoarray = CMC.cloud_mask_geoarray
cm_array = CMC.cloud_mask_array cm_array = CMC.cloud_mask_array
cm_legend = CMC.cloud_mask_legend cm_legend = CMC.cloud_mask_legend
except Exception as err: except Exception:
self.logger.error('\nAn error occurred during FMASK cloud masking. Error message was: ') self.logger.error('\nAn error occurred during FMASK cloud masking. Error message was: ')
self.logger.error(traceback.format_exc()) self.logger.error(traceback.format_exc())
return None return None
......
...@@ -7,16 +7,13 @@ import numpy as np ...@@ -7,16 +7,13 @@ import numpy as np
from scipy.interpolate import interp1d from scipy.interpolate import interp1d
import scipy as sp import scipy as sp
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from typing import TypeVar
from ..config import GMS_config as CFG from ..config import GMS_config as CFG
from ..io.input_reader import SRF from ..io.input_reader import SRF # noqa F401 # flake8 issue
from .L2A_P import L2A_object from .L2A_P import L2A_object
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
_T_SRF = TypeVar(SRF)
class L2B_object(L2A_object): class L2B_object(L2A_object):
def __init__(self, L2A_obj=None): def __init__(self, L2A_obj=None):
...@@ -67,7 +64,7 @@ class SpectralResampler(object): ...@@ -67,7 +64,7 @@ class SpectralResampler(object):
"""Class for spectral resampling of a single spectral signature (1D-array) or an image (3D-array).""" """Class for spectral resampling of a single spectral signature (1D-array) or an image (3D-array)."""
def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers'): def __init__(self, wvl_src, srf_tgt, wvl_unit='nanometers'):
# type: (np.ndarray, _T_SRF, str) -> None # type: (np.ndarray, SRF, str) -> None
"""Get an instance of the SpectralResampler1D class. """Get an instance of the SpectralResampler1D class.
:param wvl_src: center wavelength positions of the source spectrum :param wvl_src: center wavelength positions of the source spectrum
......
...@@ -10,7 +10,7 @@ import tarfile ...@@ -10,7 +10,7 @@ import tarfile
import warnings import warnings
import zipfile import zipfile
from tempfile import NamedTemporaryFile as tempFile from tempfile import NamedTemporaryFile as tempFile
from typing import TYPE_CHECKING, TypeVar from typing import TYPE_CHECKING
import dill import dill
import numpy as np import numpy as np
...@@ -33,8 +33,7 @@ from py_tools_ds.geo.coord_calc import corner_coord_to_minmax ...@@ -33,8 +33,7 @@ from py_tools_ds.geo.coord_calc import corner_coord_to_minmax
from py_tools_ds.geo.coord_trafo import transform_any_prj from py_tools_ds.geo.coord_trafo import transform_any_prj
if TYPE_CHECKING: if TYPE_CHECKING:
from logging import Logger from logging import Logger # noqa F401 # flake8 issue
_T_logger = TypeVar(Logger)
def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0): def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0):
...@@ -43,7 +42,7 @@ def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0) ...@@ -43,7 +42,7 @@ def read_ENVIfile(path, arr_shape, arr_pos, logger=None, return_meta=False, q=0)
def read_ENVIhdr_to_dict(hdr_path, logger=None): def read_ENVIhdr_to_dict(hdr_path, logger=None):
# type: (str, _T_logger) -> dict # type: (str, Logger) -> dict
if not os.path.isfile(hdr_path): if not os.path.isfile(hdr_path):
if logger: if logger:
logger.critical('read_ENVIfile: Input data not found at %s.' % hdr_path) logger.critical('read_ENVIfile: Input data not found at %s.' % hdr_path)
......
...@@ -8,12 +8,11 @@ import datetime ...@@ -8,12 +8,11 @@ import datetime
import gzip import gzip
import json import json
import os import os
from typing import TYPE_CHECKING, TypeVar from typing import TYPE_CHECKING
import pickle import pickle
import dill import dill
import numpy as np import numpy as np
import spectral
from osgeo import gdal_array from osgeo import gdal_array
from spectral.io import envi from spectral.io import envi
from spectral.io.envi import check_compatibility, check_new_filename, write_envi_header, _write_header_param from spectral.io.envi import check_compatibility, check_new_filename, write_envi_header, _write_header_param
...@@ -40,12 +39,7 @@ from ..misc.definition_dicts import \ ...@@ -40,12 +39,7 @@ from ..misc.definition_dicts import \
from ..misc.logging import GMS_logger from ..misc.logging import GMS_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from ..model.gms_object import GMS_object from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
_T_GMSobj = TypeVar(GMS_object)
if float(spectral.__version__) < 0.16:
warnings.warn('Your version of the library "Spectral Python" has some problems with writing signed integer '
'8bit data. Please update the library (pip install spectral --upgrade) to avoid writing issues.')
enviHdr_keyOrder = \ enviHdr_keyOrder = \
['ENVI', 'description', 'samples', 'lines', 'bands', 'header offset', 'file type', 'data type', ['ENVI', 'description', 'samples', 'lines', 'bands', 'header offset', 'file type', 'data type',
...@@ -290,7 +284,7 @@ def reorder_ENVI_header(path_hdr, tgt_keyOrder): ...@@ -290,7 +284,7 @@ def reorder_ENVI_header(path_hdr, tgt_keyOrder):
def mask_to_ENVI_Classification(InObj, maskname): def mask_to_ENVI_Classification(InObj, maskname):
# type: (_T_GMSobj, str) -> (np.ndarray, dict, list, list) # type: (GMS_object, str) -> (np.ndarray, dict, list, list)
cd = get_mask_classdefinition(maskname, InObj.satellite) cd = get_mask_classdefinition(maskname, InObj.satellite)
if cd is None: if cd is None:
InObj.logger.warning("No class definition available for mask '%s'." % maskname) InObj.logger.warning("No class definition available for mask '%s'." % maskname)
......
...@@ -10,7 +10,7 @@ import sys ...@@ -10,7 +10,7 @@ import sys
import traceback import traceback
import warnings import warnings
from datetime import datetime from datetime import datetime
from typing import Union, TypeVar from typing import Union # noqa F401 # flake8 issue
import numpy as np import numpy as np
import pandas as pd import pandas as pd
...@@ -31,8 +31,6 @@ from .definition_dicts import proc_chain ...@@ -31,8 +31,6 @@ from .definition_dicts import proc_chain
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
_T_list_str = TypeVar(Union[list, str])
def execute_pgSQL_query(cursor, query_command): def execute_pgSQL_query(cursor, query_command):
"""Executes a postgreSQL query catches the full error message if there is one. """Executes a postgreSQL query catches the full error message if there is one.
...@@ -75,7 +73,7 @@ def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid): ...@@ -75,7 +73,7 @@ def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
def get_info_from_SQLdb(path_db, tablename, vals2return, cond_dict, records2fetch=0): def get_info_from_SQLdb(path_db, tablename, vals2return, cond_dict, records2fetch=0):
# type: (str,str,list,dict,int) -> _T_list_str # type: (str,str,list,dict,int) -> Union[list, str]
"""Queries an SQL database for the given parameters. """Queries an SQL database for the given parameters.
:param path_db: <str> the physical path of the SQL database on disk :param path_db: <str> the physical path of the SQL database on disk
...@@ -151,7 +149,7 @@ def get_postgreSQL_matchingExp(key, value): ...@@ -151,7 +149,7 @@ def get_postgreSQL_matchingExp(key, value):
def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000): def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
# type: (str, str, _T_list_str, dict, int, int) -> _T_list_str # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
"""Queries a postgreSQL database for the given parameters. """Queries a postgreSQL database for the given parameters.
:param conn_params: <str> connection parameters as provided by CFG.job.conn_params :param conn_params: <str> connection parameters as provided by CFG.job.conn_params
......
...@@ -10,7 +10,7 @@ import time ...@@ -10,7 +10,7 @@ import time
from itertools import chain from itertools import chain
import signal import signal
import re import re
from typing import TYPE_CHECKING, TypeVar from typing import TYPE_CHECKING
from ..io import output_writer as OUT_W from ..io import output_writer as OUT_W
from ..io import input_reader as INP_R from ..io import input_reader as INP_R
...@@ -29,8 +29,7 @@ from .multiproc import MAP ...@@ -29,8 +29,7 @@ from .multiproc import MAP
from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def from ..misc.definition_dicts import proc_chain, db_jobs_statistics_def
if TYPE_CHECKING: if TYPE_CHECKING:
from collections import OrderedDict from collections import OrderedDict # noqa F401 # flake8 issue
_T_OrdDict = TypeVar(OrderedDict)
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
...@@ -261,7 +260,7 @@ class process_controller(object): ...@@ -261,7 +260,7 @@ class process_controller(object):
@staticmethod @staticmethod
def _is_inMEM(GMS_objects, dataset): def _is_inMEM(GMS_objects, dataset):
# type: (list, _T_OrdDict) -> bool # type: (list, OrderedDict) -> bool
"""Checks whether a dataset within a dataset list has been processed in the previous processing level. """Checks whether a dataset within a dataset list has been processed in the previous processing level.
:param GMS_objects: <list> a list of GMS objects that has been recently processed :param GMS_objects: <list> a list of GMS objects that has been recently processed
:param dataset: <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID() :param dataset: <collections.OrderedDict> as generated by L0A_P.get_data_list_of_current_jobID()
......
...@@ -12,8 +12,9 @@ with open('HISTORY.rst') as history_file: ...@@ -12,8 +12,9 @@ with open('HISTORY.rst') as history_file:
requirements = [ requirements = [
'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill', 'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill',
'pandas', 'numba', 'spectral', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy', 'psycopg2', 'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy', 'psycopg2',
'py_tools_ds', 'geoarray', 'arosics' 'py_tools_ds', 'geoarray', 'arosics'
# spectral<0.16 has some problems with writing signed integer 8bit data
# fmask # conda install -c conda-forge python-fmask # fmask # conda install -c conda-forge python-fmask
# 'pyhdf', # conda install --yes -c conda-forge pyhdf # 'pyhdf', # conda install --yes -c conda-forge pyhdf
# 'sicor', # pip install git+https://gitext.gfz-potsdam.de/hollstei/sicor.git # 'sicor', # pip install git+https://gitext.gfz-potsdam.de/hollstei/sicor.git
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment