Commit 2ebaa6c5 authored by Daniel Scheffler
Browse files

Moved database_tools.USGS_FullMetaDB() to external library 'usgs_metadb'.

parent 7e371a75
Pipeline #1658 passed with stage
in 10 minutes and 59 seconds
......@@ -4,14 +4,11 @@ import itertools
import os
import re
import shutil
import sqlite3
import sys
import traceback
import warnings
from datetime import datetime
from typing import Union # noqa F401 # flake8 issue
from logging import getLogger
from urllib.request import urlretrieve
import numpy as np
import pandas as pd
......@@ -24,15 +21,11 @@ from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine
from py_tools_ds.compression.decompress import decompress
from py_tools_ds.processing.progress_mon import ProgressBar
from ..options.config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain
# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
# + misc.path_generator.get_path_metaCSV: (left out here in order to avoid circular dependencies)
__author__ = 'Daniel Scheffler'
......@@ -1420,161 +1413,3 @@ def archive_exists_on_fileserver(conn_DB, entityID):
exists = False
return exists
class USGS_FullMetaDB(object):
    """Local SQLite copy of the USGS bulk metadata CSV files for Landsat sensors.

    The database file is kept at ``<path_root>/USGS_FullMetaDB.sql``. Every sensor gets its own table that is
    named after its sensorcode (see :meth:`_get_sensorcode`) and is filled from a gzipped CSV file that is
    downloaded from ``root_url``.
    """

    def __init__(self, path_root, root_url=''):
        # type: (str, str) -> None
        """Set up the paths and the sensorcode -> download URL mapping (no I/O is done here).

        :param path_root:   directory holding the SQLite database and the downloaded CSV files
        :param root_url:    URL prefix that is prepended to the CSV file names listed in ``dict_metaDB_URL``
        """
        self.path_root = path_root
        self.root_url = root_url
        self.path_db = os.path.join(path_root, 'USGS_FullMetaDB.sql')

        # previous URL mapping (disabled):
        # self.dict_metaDB_URL = {
        #     'Landsat-4-5_TM': [['1982-08-22_1989-12-31', root_url + 'LANDSAT_TM-1980-1989.csv'],
        #                        ['1990-01-01_1999-12-31', root_url + 'LANDSAT_TM-1990-1999.csv'],
        #                        ['2000-01-01_2009-12-31', root_url + 'LANDSAT_TM-2000-2009.csv'],
        #                        ['2010-01-01_2012-05-05', root_url + 'LANDSAT_TM-2010-2012.csv']],
        #     'Landsat-7_ETM+': [['1999-05-28_2003-05-31', root_url + 'LANDSAT_ETM.csv'],
        #                        ['2003-06-01_today', root_url + 'LANDSAT_ETM_SLC_OFF.csv']],
        #     'Landsat-8_OLI_TIRS': [['2013-04-11_today', root_url + 'LANDSAT_8.csv']]}

        # sensorcode -> [date range tag, URL of the gzipped CSV]
        self.dict_metaDB_URL = {
            # Pre-Collection
            # NOTE(review): the keys of the two MSS entries are inferred from their date ranges - confirm.
            'Landsat-1-3_MSS':
                ['1972-1983', root_url + 'LANDSAT_MSS1.csv.gz'],
            'Landsat-4-5_MSS':
                ['1982-1997, 2012-2013', root_url + 'LANDSAT_MSS2.csv.gz'],

            # Collection 1
            'Landsat-4-5_TM':
                ['1980-2012', root_url + 'LANDSAT_TM_C1.csv.gz'],
            'Landsat-7_ETM+':  # (combined SLC-on/off)
                ['1999-Present', root_url + 'LANDSAT_ETM_C1.csv.gz'],
            'Landsat-8_OLI_TIRS':  # (combined Pre- and On-WRS)
                ['2013_04-Present', root_url + 'LANDSAT_8_C1.csv.gz'],
        }

    def query(self, sqlquery, limit=None):
        # type: (str, int) -> pd.DataFrame
        """Run an SQL statement against the metadata database and return the fetched rows.

        :param sqlquery:    SQL statement to execute
        :param limit:       maximum number of rows to fetch (None or 0: fetch all rows)
        :return:            one DataFrame row per fetched record (empty DataFrame if nothing was fetched)
        """
        connection = sqlite3.connect(self.path_db)
        try:
            cursor = connection.cursor()
            cursor.execute(sqlquery)

            if not limit:
                qres = cursor.fetchall()
            elif limit > 1:
                qres = cursor.fetchmany(limit)
            else:
                # fetchone() returns a bare tuple (or None); wrap it to keep the one-row-per-record layout
                row = cursor.fetchone()
                qres = [row] if row is not None else []
        finally:
            connection.close()

        return pd.DataFrame(qres)

    @property
    def tables(self):
        # type: () -> list
        """Return the names of all tables that currently exist within the metadata database."""
        df = self.query("SELECT name FROM sqlite_master WHERE type='table';")
        return df[0].tolist() if df.size > 0 else []

    @staticmethod
    def _get_sensorcode(satellite, sensor, subsystem):
        # type: (str, str, str) -> str
        """Return '<satellite>_<sensor>[_<subsystem>]'; Landsat-4 and Landsat-5 TM share 'Landsat-4-5_TM'."""
        sensorcode = '%s_%s_%s' % (satellite, sensor, subsystem) if subsystem else '%s_%s' % (satellite, sensor)
        return sensorcode if sensorcode not in ['Landsat-4_TM', 'Landsat-5_TM'] else 'Landsat-4-5_TM'

    @staticmethod
    def _quote_identifier(name):
        # type: (str) -> str
        """Return name as a double-quoted SQL identifier (sensorcodes contain '-' and '+')."""
        return '"%s"' % name.replace('"', '""')

    def update_all_tables(self):
        """Fetch the metadata CSV of each supported Landsat sensor and append it to the matching table."""
        for sat, sen, sub in [['Landsat-5', 'TM', ''],  # also includes Landsat-4
                              ['Landsat-7', 'ETM+', ''],
                              ['Landsat-8', 'OLI_TIRS', '']]:
            sensorcode = self._get_sensorcode(sat, sen, sub)
            path_csv = self.update_metaCSV(sat, sen, sub)
            self.CSVdatabase2SQLdatabase(path_csv, sensorcode)

    def update_specific_sensor(self, dates2check, satellite, sensor, subsystem):
        """Make sure the table of the given sensor covers the most recent of the given dates.

        :param dates2check:     date string ('YYYY-MM-DD') or a list of such strings
        :param satellite:       satellite name, e.g., 'Landsat-8'
        :param sensor:          sensor name, e.g., 'OLI_TIRS'
        :param subsystem:       subsystem name (may be empty)
        """
        if not isinstance(dates2check, list):
            dates2check = [dates2check]

        date2check = max([datetime.strptime(date, '%Y-%m-%d') for date in dates2check])
        tablename = self._get_sensorcode(satellite, sensor, subsystem)

        # check for the file first: reading the table list opens (and thereby creates) the database file
        if not os.path.isfile(self.path_db) or tablename not in self.tables:
            # recreate the table
            path_csv = self.update_metaCSV(satellite, sensor, subsystem)
            self.CSVdatabase2SQLdatabase(path_csv, tablename)

        qres = self.query('SELECT max(acquisitionDate) FROM %s' % self._quote_identifier(tablename))
        date_max_str = qres.iloc[0, 0] if qres.size > 0 else None

        if date_max_str is None:
            # the table holds no records at all -> import the complete CSV
            path_csv = self.update_metaCSV(satellite, sensor, subsystem)
            self.CSVdatabase2SQLdatabase(path_csv, tablename)
            return

        date_max = datetime.strptime(date_max_str, '%Y-%m-%d')

        if date2check > date_max:
            path_csv = self.update_metaCSV(satellite, sensor, subsystem)
            self.CSVdatabase2SQLdatabase(path_csv, tablename, last_updated=date_max.strftime('%Y-%m-%d'))

    def update_metaCSV(self, satellite, sensor, subsystem=None, force_redownload=False):
        # type: (str, str, str, bool) -> str
        """Provide today's metadata CSV of the given sensor within path_root and return its path.

        An already existing CSV is reused, an already downloaded .csv.gz is only unpacked; otherwise the
        archive is downloaded first.

        :param satellite:           satellite name, e.g., 'Landsat-8'
        :param sensor:              sensor name, e.g., 'OLI_TIRS'
        :param subsystem:           subsystem name (optional)
        :param force_redownload:    delete existing files and download the archive again
        :return:                    path of the unpacked CSV file
        :raises AssertionError:     if no download URL is known for the sensor
        :raises RuntimeError:       if the CSV file does not exist after unpacking
        """
        # get sensorcode
        sensorcode = self._get_sensorcode(satellite, sensor, subsystem)
        assert sensorcode in list(self.dict_metaDB_URL.keys()), \
            'The Download of a metadata database for %s is not yet supported.' % sensorcode

        # create output directory if it not exists
        # (exist_ok also covers other workers creating the directory at the same time)
        os.makedirs(self.path_root, exist_ok=True)

        daterange_tag, url = self.dict_metaDB_URL[sensorcode]
        path_csv = os.path.join(self.path_root, 'FullMetaDB__%s__%s.csv'
                                % (sensorcode, datetime.now().strftime('%Y-%m-%d')))
        path_csv_gz = path_csv + '.gz'

        if os.path.exists(path_csv) and not force_redownload:
            print('Current metadata database for %s already exists.' % path_csv)
            return path_csv

        if os.path.exists(path_csv_gz) and not force_redownload:
            print('Found an already downloaded (current) metadata database for %s at %s. Unpacking CSV...'
                  % (sensorcode, path_csv_gz))
        else:
            if force_redownload:
                for path in (path_csv_gz, path_csv):
                    if os.path.exists(path):
                        os.remove(path)

            print('No %s %s metadata database found. Downloading..' % (satellite, sensor))
            pBar = ProgressBar(prefix='Download progress: ')

            def dlProgress(count, blockSize, totalSize):
                # urlretrieve reports a non-positive total size if the server does not send it
                if totalSize > 0:
                    percentage = min(100, int(count * blockSize * 100 / totalSize))
                    # NOTE(review): assumes ProgressBar offers print_progress(percent) - confirm
                    pBar.print_progress(percentage)

            urlretrieve(url, path_csv_gz, reporthook=dlProgress)

        decompress(path_csv_gz, outputpath=path_csv, logger=getLogger('decompressor'))

        if not os.path.isfile(path_csv):
            raise RuntimeError('Extraction failed ')

        # remove .csv.gz file
        if os.path.exists(path_csv_gz):
            os.remove(path_csv_gz)

        return path_csv

    def CSVdatabase2SQLdatabase(self, pathCSV, tablename, last_updated=None):
        """Append the records of a metadata CSV file to a table of the SQLite database.

        :param pathCSV:         path of the CSV file to import
        :param tablename:       name of the target table (created if it does not exist)
        :param last_updated:    only import records with acquisitionDate >= this date
                                (string like 'YYYY-MM-DD' or a datetime instance)
        """
        if isinstance(last_updated, datetime):
            last_updated = last_updated.strftime('%Y-%m-%d')
        if last_updated is not None:
            assert isinstance(last_updated, str) and len(last_updated) == 10, \
                "Last_updated keyword must be a date string like YYYY-MM-DD"

        print('Building %s metadata database...' % ' '.join(tablename.split('_')))
        pd_dataframe = pd.read_csv(pathCSV)

        if last_updated is not None:
            pd_dataframe = pd_dataframe[pd_dataframe['acquisitionDate'] >= last_updated]

        connection = sqlite3.connect(self.path_db)
        try:
            pd_dataframe.to_sql(tablename, connection, if_exists='append')
        finally:
            connection.close()

        assert os.path.isfile(self.path_db), "Building of metadata database failed. Database still not found."
......@@ -182,14 +182,6 @@ def get_tempfile(ext=None, prefix=None, tgt_dir=None):
return path
def get_path_metaCSV(satellite, sensor):
    """Return the path of a metadata CSV file of the given sensor within CFG.path_db_meta.

    Files are matched by the pattern '<sensorcode>_metaDB_*.csv'; Landsat-4 and Landsat-5 TM share the
    sensorcode 'Landsat-4-5_TM'. If several files match, the one that sorts last by name is returned.

    :param satellite:   satellite name, e.g., 'Landsat-8'
    :param sensor:      sensor name, e.g., 'OLI_TIRS'
    :return:            path of the matching CSV file or the string 'metaCSV not found' if there is none
    """
    sensorcode = '%s_%s' % (satellite, sensor)
    if sensorcode in ('Landsat-4_TM', 'Landsat-5_TM'):
        sensorcode = 'Landsat-4-5_TM'

    # glob returns matches in arbitrary order -> sort them to make the choice of the last one deterministic
    candidates = sorted(glob.glob(os.path.join(CFG.path_db_meta, '%s_metaDB_*.csv' % sensorcode)))

    return candidates[-1] if candidates else 'metaCSV not found'
def get_path_cloud_class_obj(GMS_identifier, get_all=False):
"""Returns the absolute path of the the training data used by cloud classifier.
:param GMS_identifier:
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment