Commit 0b05218d authored by Daniel Scheffler

Moved database_tools.USGS_FullMetaDB() to external library 'usgs_metadb'.


Former-commit-id: 2ebaa6c5
parent c12b5920
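
With this change the class is provided by the external 'usgs_metadb' package. A minimal sketch of the adjusted import, assuming the package re-exports the class under its original name (the exact import path is an assumption):

from usgs_metadb import USGS_FullMetaDB  # assumed import path after the move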
@@ -4,14 +4,11 @@ import itertools
import os
import re
import shutil
import sqlite3
import sys
import traceback
import warnings
from datetime import datetime
from typing import Union  # noqa F401  # flake8 issue
from logging import getLogger
from urllib.request import urlretrieve

import numpy as np
import pandas as pd
@@ -24,15 +21,11 @@ from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine

from py_tools_ds.compression.decompress import decompress
from py_tools_ds.processing.progress_mon import ProgressBar

from ..options.config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain

# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
# + misc.path_generator.get_path_metaCSV: (left out here in order to avoid circular dependencies)

__author__ = 'Daniel Scheffler'

@@ -1420,161 +1413,3 @@ def archive_exists_on_fileserver(conn_DB, entityID):
        exists = False
    return exists


class USGS_FullMetaDB(object):
    def __init__(self, path_root, root_url='https://landsat.usgs.gov/landsat/metadata_service/bulk_metadata_files/'):
        self.path_root = path_root
        self.root_url = root_url
        self.path_db = os.path.join(path_root, 'USGS_FullMetaDB.sql')

        # self.dict_metaDB_URL = {
        #     'Landsat-4-5_TM': [['1982-08-22_1989-12-31', root_url + 'LANDSAT_TM-1980-1989.csv'],
        #                        ['1990-01-01_1999-12-31', root_url + 'LANDSAT_TM-1990-1999.csv'],
        #                        ['2000-01-01_2009-12-31', root_url + 'LANDSAT_TM-2000-2009.csv'],
        #                        ['2010-01-01_2012-05-05', root_url + 'LANDSAT_TM-2010-2012.csv']],
        #     'Landsat-7_ETM+': [['1999-05-28_2003-05-31', root_url + 'LANDSAT_ETM.csv'],
        #                        ['2003-06-01_today', root_url + 'LANDSAT_ETM_SLC_OFF.csv']],
        #     'Landsat-8_OLI_TIRS': [['2013-04-11_today', root_url + 'LANDSAT_8.csv']]}

        self.dict_metaDB_URL = {
            # Pre-Collection
            'Landsat-1-3_MSS':
                ['1972-1983', root_url + 'LANDSAT_MSS1.csv.gz'],
            'Landsat-4-5_MSS':
                ['1982-1997, 2012-2013', root_url + 'LANDSAT_MSS2.csv.gz'],
            # Collection 1
            'Landsat-4-5_TM':
                ['1980-2012', root_url + 'LANDSAT_TM_C1.csv.gz'],
            'Landsat-7_ETM+':  # (combined SLC-on/off)
                ['1999-Present', root_url + 'LANDSAT_ETM_C1.csv.gz'],
            'Landsat-8_OLI_TIRS':  # (combined Pre- and On-WRS)
                ['2013_04-Present', root_url + 'LANDSAT_8_C1.csv.gz']}

    def query(self, sqlquery, limit=None):
        # type: (str, int) -> pd.DataFrame
        # NOTE: sqlite3.connect() raises sqlite3.Error on failure instead of returning None
        connection = sqlite3.connect(self.path_db)
        cursor = connection.cursor()
        cursor.execute(sqlquery)

        if not limit:
            qres = cursor.fetchall()
        elif limit > 1:
            qres = cursor.fetchmany(limit)
        else:
            # wrap the single record in a list so that pd.DataFrame() creates one row
            qres = [cursor.fetchone()]
        result = pd.DataFrame(qres)

        cursor.close()
        connection.close()

        return result

    @property
    def tables(self):
        df = self.query("SELECT name FROM sqlite_master WHERE type='table';")
        return df[0].tolist() if df.size > 0 else []

    @staticmethod
    def _get_sensorcode(satellite, sensor, subsystem):
        sensorcode = '%s_%s_%s' % (satellite, sensor, subsystem) if subsystem else '%s_%s' % (satellite, sensor)
        return sensorcode if sensorcode not in ['Landsat-4_TM', 'Landsat-5_TM'] else 'Landsat-4-5_TM'

    def update_all_tables(self):
        for sat, sen, sub in [['Landsat-5', 'TM', ''],  # also includes Landsat-4
                              ['Landsat-7', 'ETM+', ''],
                              ['Landsat-8', 'OLI_TIRS', '']]:
            sensorcode = self._get_sensorcode(sat, sen, sub)
            path_csv = self.update_metaCSV(sat, sen, sub)
            self.CSVdatabase2SQLdatabase(path_csv, sensorcode)

    def update_specific_sensor(self, dates2check, satellite, sensor, subsystem):
        if not isinstance(dates2check, list):
            dates2check = [dates2check]
        date2check = max([datetime.strptime(date, '%Y-%m-%d') for date in dates2check])

        tablename = self._get_sensorcode(satellite, sensor, subsystem)

        if tablename not in self.tables or not os.path.isfile(self.path_db):
            # (re)create the table from a freshly downloaded CSV
            path_csv = self.update_metaCSV(satellite, sensor, subsystem)
            self.CSVdatabase2SQLdatabase(path_csv, tablename)
        else:
            # only append records acquired after the latest date already in the table
            date_max = datetime.strptime(
                self.query('SELECT max(acquisitionDate) FROM "%s"' % tablename).iloc[0, 0], '%Y-%m-%d')
            if date2check > date_max:
                path_csv = self.update_metaCSV(satellite, sensor, subsystem)
                self.CSVdatabase2SQLdatabase(path_csv, tablename,
                                             last_updated=date_max.strftime('%Y-%m-%d'))

    def update_metaCSV(self, satellite, sensor, subsystem=None, force_redownload=False):
        # type: (str, str, str, bool) -> str

        # get sensorcode
        sensorcode = self._get_sensorcode(satellite, sensor, subsystem)
        assert sensorcode in list(self.dict_metaDB_URL.keys()), \
            'The download of a metadata database for %s is not yet supported.' % sensorcode

        # create the output directory if it does not exist
        if not os.path.isdir(self.path_root):
            try:
                os.makedirs(self.path_root)
            except OSError as e:
                # occurs if other workers try to create the directory at the same time
                # and it already exists (errno 17 = EEXIST)
                if e.errno != 17:
                    raise

        daterange_tag, url = self.dict_metaDB_URL[sensorcode]
        path_csv = os.path.join(self.path_root, 'FullMetaDB__%s__%s.csv'
                                % (sensorcode, datetime.now().strftime('%Y-%m-%d')))
        path_csv_gz = path_csv + '.gz'

        if os.path.exists(path_csv) and not force_redownload:
            print('Current metadata database for %s already exists.' % path_csv)
            return path_csv

        elif os.path.exists(path_csv_gz) and not force_redownload:
            print('Found an already downloaded (current) metadata database for %s at %s. Unpacking CSV...'
                  % (sensorcode, path_csv_gz))
            decompress(path_csv_gz, outputpath=path_csv, logger=getLogger('decompressor'))

        else:
            if force_redownload:
                for p in [path_csv_gz, path_csv]:
                    if os.path.exists(p):
                        os.remove(p)
            else:
                print('No %s %s metadata database found. Downloading...' % (satellite, sensor))

            pBar = ProgressBar(prefix='Download progress: ')

            def dlProgress(count, blockSize, totalSize):
                percentage = int(count * blockSize * 100 / totalSize)
                pBar.print_progress(percentage)

            urlretrieve(url, path_csv_gz, reporthook=dlProgress)
            decompress(path_csv_gz, outputpath=path_csv, logger=getLogger('decompressor'))

            if not os.path.exists(path_csv):
                raise RuntimeError('Extraction failed.')

            # remove the no longer needed .csv.gz file
            os.remove(path_csv_gz)

        return path_csv

    def CSVdatabase2SQLdatabase(self, pathCSV, tablename, last_updated=None):
        print('Building %s metadata database...' % ' '.join(tablename.split('_')))
        pd_dataframe = pd.read_csv(pathCSV)
        connection = sqlite3.connect(self.path_db)

        if last_updated is not None:
            assert isinstance(last_updated, str) and len(last_updated) == 10, \
                "The last_updated keyword must be a date string like 'YYYY-MM-DD'"
            # only keep records that were acquired after the last database update
            pd_dataframe = pd_dataframe[pd_dataframe['acquisitionDate'] >= last_updated]

        pd_dataframe.to_sql(tablename, connection, if_exists='append')
        connection.close()

        assert os.path.isfile(self.path_db), 'Building of metadata database failed. Database still not found.'
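
# A minimal usage sketch of the class above (hypothetical, not part of the original
# module); the 'path_root' cache directory is an assumption:
#
#     metadb = USGS_FullMetaDB(path_root='/data/usgs_metadb')
#     metadb.update_specific_sensor('2017-01-01', 'Landsat-8', 'OLI_TIRS', '')
#     df = metadb.query('SELECT sceneID, acquisitionDate FROM "Landsat-8_OLI_TIRS"', limit=10)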

@@ -182,14 +182,6 @@ def get_tempfile(ext=None, prefix=None, tgt_dir=None):
    return path


def get_path_metaCSV(satellite, sensor):
    sensorcode = '%s_%s' % (satellite, sensor)
    sensorcode = sensorcode if sensorcode not in ['Landsat-4_TM', 'Landsat-5_TM'] else 'Landsat-4-5_TM'
    path_metaCSV = glob.glob(os.path.join(CFG.path_db_meta, '%s_metaDB_*.csv' % sensorcode))
    path_metaCSV = path_metaCSV[-1] if path_metaCSV != [] else []
    return path_metaCSV if path_metaCSV != [] else 'metaCSV not found'


def get_path_cloud_class_obj(GMS_identifier, get_all=False):
    """Returns the absolute path of the training data used by the cloud classifier.

    :param GMS_identifier:
...