Commit 7e371a75 authored by Daniel Scheffler

Added database_tools.USGS_FullMetaDB(), based on previous functions.

parent 178e0273
Pipeline #1657 passed in 10 minutes and 58 seconds
@@ -10,6 +10,8 @@ import traceback
import warnings
from datetime import datetime
from typing import Union  # noqa F401  # flake8 issue
from logging import getLogger
from urllib.request import urlretrieve
import numpy as np
import pandas as pd
@@ -22,6 +24,9 @@ from shapely.geometry import Polygon, box, MultiPolygon
from sqlalchemy import create_engine
from sqlalchemy.types import to_instance, TypeEngine
from py_tools_ds.compression.decompress import decompress
from py_tools_ds.processing.progress_mon import ProgressBar
from ..options.config import GMS_config as CFG
from . import path_generator as PG
from .definition_dicts import proc_chain
@@ -1384,110 +1389,6 @@ def postgreSQL_table_to_csv(conn_db, path_csv, tablename):
    raise NotImplementedError  # TODO
def download_current_ProviderMetaDB(satellite, sensor, subsystem, date, rebuild_sensorsDB=False):
sensorcode = '%s_%s_%s' % (satellite, sensor, subsystem) if subsystem not in ['', None] else \
'%s_%s' % (satellite, sensor)
sensorcode = sensorcode if sensorcode not in ['Landsat-4_TM', 'Landsat-5_TM'] else 'Landsat-4-5_TM'
    if rebuild_sensorsDB:
        print('No %s %s metadata database found. Downloading...' % (satellite, sensor))
    else:
        print('%s %s metadata database has to be updated. Downloading the newest version...' % (satellite, sensor))
rootURL = 'http://landsat.usgs.gov/metadata_service/bulk_metadata_files/'
dict_metaDB_URL = {
'Landsat-4-5_TM': [['1982-08-22_1989-12-31', rootURL + 'LANDSAT_TM-1980-1989.csv'],
['1990-01-01_1999-12-31', rootURL + 'LANDSAT_TM-1990-1999.csv'],
['2000-01-01_2009-12-31', rootURL + 'LANDSAT_TM-2000-2009.csv'],
['2010-01-01_2012-05-05', rootURL + 'LANDSAT_TM-2010-2012.csv']],
'Landsat-7_ETM+': [['1999-05-28_2003-05-31', rootURL + 'LANDSAT_ETM.csv'],
['2003-06-01_today', rootURL + 'LANDSAT_ETM_SLC_OFF.csv']],
'Landsat-8_OLI_TIRS': [['2013-04-11_today', rootURL + 'LANDSAT_8.csv']]}
    assert sensorcode in dict_metaDB_URL, \
        'The download of a metadata database for %s is not yet supported.' % sensorcode
url_list = dict_metaDB_URL[sensorcode]
outdir = os.path.dirname(CFG.path_db_meta)
paths_downloaded_files = []
    if not os.path.isdir(outdir):
        try:
            os.makedirs(outdir)
        except OSError as e:
            # occurs if other workers create the directory at the same time and it already exists
            if e.errno != 17:  # 17 == errno.EEXIST
                raise
if rebuild_sensorsDB:
for url in url_list:
            daterange_tag, url = (url[0] if 'today' not in url[0] else
                                  url[0].split('_')[0] + '_' + datetime.now().strftime('%Y-%m-%d')), url[1]
prefix = '%s_%s' % (satellite, sensor)
prefix = prefix if prefix not in ['Landsat-4_TM', 'Landsat-5_TM'] else 'Landsat-4-5_TM'
outpath = os.path.join(outdir, '%s_metaDB__%s.csv' % (prefix, daterange_tag))
# file = urllib.request.urlretrieve(url, outpath)
paths_downloaded_files.append(outpath)
else:
if sensorcode == 'Landsat-7_ETM+':
dr_url = url_list[1] if date >= datetime.strptime('2003-06-01', '%Y-%m-%d') else url_list[0]
elif sensorcode == 'Landsat-4-5_TM':
dr_url =\
url_list[0] if date <= datetime.strptime('1989-12-31', '%Y-%m-%d') else \
url_list[1] if date <= datetime.strptime('1999-12-31', '%Y-%m-%d') else \
url_list[2] if date <= datetime.strptime('2009-12-31', '%Y-%m-%d') else url_list[3]
else:
raise RuntimeError('Unexpected sensorcode.')
daterange_tag, url = dr_url[0], dr_url[1]
outpath = os.path.join(outdir, '%s_%s_metaDB__%s.csv' % (satellite, sensor, daterange_tag))
# file = urllib.request.urlretrieve(url, outpath)
paths_downloaded_files.append(outpath)
return paths_downloaded_files
def CSVdatabase2SQLdatabase(pathCSV, pathSQL, tablename, last_updated=None):
print('Building %s metadata database...' % ' '.join(tablename.split('_')))
pd_dataframe = pd.read_csv(pathCSV)
connection = sqlite3.connect(pathSQL)
if last_updated is not None:
assert isinstance(last_updated, str) and len(
last_updated) == 10, "last_updated keyword must be a date string like YYYY-MM-DD"
pd_dataframe = pd_dataframe[pd_dataframe['acquisitionDate'] >= last_updated]
pd_dataframe.to_sql(tablename, connection, if_exists='append')
connection.close()
    assert os.path.isfile(pathSQL), "Building of metadata database failed. Database still not found."
def update_metaDB_if_needed(satellite, sensor, subsystem, dates2check):
if not isinstance(dates2check, list):
dates2check = [dates2check]
date2check = max([datetime.strptime(date, '%Y-%m-%d') for date in dates2check])
tablename = '%s_%s_%s' % (satellite.replace('-', ''), sensor.replace('+', ''), subsystem) \
if subsystem not in ['', None] else '%s_%s' % (satellite.replace('-', ''), sensor.replace('+', ''))
tablename = tablename if tablename not in ['Landsat4_TM', 'Landsat5_TM'] else 'Landsat45_TM'
connection = sqlite3.connect(CFG.path_db_meta)
if connection is None:
return 'database connection fault'
cursor = connection.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
included_tables = [i[0] for i in cursor.fetchall()]
if tablename not in included_tables or not os.path.isfile(CFG.path_db_meta):
from .path_generator import get_path_metaCSV
path_csv = get_path_metaCSV(satellite, sensor)
if path_csv == 'metaCSV not found':
paths_CSVs = download_current_ProviderMetaDB(satellite, sensor, subsystem, date2check,
rebuild_sensorsDB=True)
else:
            paths_CSVs = glob.glob(os.path.join(os.path.dirname(CFG.path_db_meta),
                                                '%s_%s_metaDB__*.csv' % (satellite, sensor)))
for path_csv in paths_CSVs:
CSVdatabase2SQLdatabase(path_csv, CFG.path_db_meta, tablename, last_updated=None)
else:
cursor.execute("Select max(acquisitionDate) FROM " + tablename)
date_max = datetime.strptime(cursor.fetchone()[0], '%Y-%m-%d')
if date2check > date_max:
path_csv = download_current_ProviderMetaDB(satellite, sensor, subsystem, date2check)[0]
CSVdatabase2SQLdatabase(path_csv, CFG.path_db_meta, tablename, last_updated=date_max)
# pd_dataframe[pd_dataframe['acquisitionDate'] > '2003-05-28']
# pd_dataframe['acquisitionDate'].max()
# combined.to_sql("cps_raw.cps_basic_tabulation", engine, if_exists='append')
def archive_exists_on_fileserver(conn_DB, entityID):
    # type: (str,str) -> bool
    """Queries the postgreSQL database for the archive filename of the given entity ID and checks if the
@@ -1519,3 +1420,161 @@ def archive_exists_on_fileserver(conn_DB, entityID):
        exists = False
    return exists
class USGS_FullMetaDB(object):
def __init__(self, path_root, root_url='https://landsat.usgs.gov/landsat/metadata_service/bulk_metadata_files/'):
self.path_root = path_root
self.root_url = root_url
self.path_db = os.path.join(path_root, 'USGS_FullMetaDB.sql')
# self.dict_metaDB_URL = {
# 'Landsat-4-5_TM': [['1982-08-22_1989-12-31', root_url + 'LANDSAT_TM-1980-1989.csv'],
# ['1990-01-01_1999-12-31', root_url + 'LANDSAT_TM-1990-1999.csv'],
# ['2000-01-01_2009-12-31', root_url + 'LANDSAT_TM-2000-2009.csv'],
# ['2010-01-01_2012-05-05', root_url + 'LANDSAT_TM-2010-2012.csv']],
# 'Landsat-7_ETM+': [['1999-05-28_2003-05-31', root_url + 'LANDSAT_ETM.csv'],
# ['2003-06-01_today', root_url + 'LANDSAT_ETM_SLC_OFF.csv']],
# 'Landsat-8_OLI_TIRS': [['2013-04-11_today', root_url + 'LANDSAT_8.csv']]}
self.dict_metaDB_URL = {
# Pre-Collection
'Landsat-1-3_MSS':
['1972-1983', root_url + 'LANDSAT_MSS1.csv.gz'],
'Landsat-4-5_MSS':
['1982-1997, 2012-2013', root_url + 'LANDSAT_MSS2.csv.gz'],
# Collection 1
'Landsat-4-5_TM':
['1980-2012', root_url + 'LANDSAT_TM_C1.csv.gz'],
'Landsat-7_ETM+': # (combined SLC-on/off)
['1999-Present', root_url + 'LANDSAT_ETM_C1.csv.gz'],
'Landsat-8_OLI_TIRS': # (combined Pre- and On-WRS)
['2013_04-Present', root_url + 'LANDSAT_8_C1.csv.gz']}
    def query(self, sqlquery, limit=None):
        # type: (str, int) -> pd.DataFrame
        connection = sqlite3.connect(self.path_db)  # raises sqlite3.Error on failure
        cursor = connection.cursor()
        cursor.execute(sqlquery)
        # fetchmany() also covers limit=1 and keeps the result a list of tuples
        qres = cursor.fetchmany(limit) if limit else cursor.fetchall()
        result = pd.DataFrame(qres)
        cursor.close()
        connection.close()
        return result
@property
def tables(self):
df = self.query("SELECT name FROM sqlite_master WHERE type='table';")
return df[0].tolist() if df.size > 0 else []
@staticmethod
def _get_sensorcode(satellite, sensor, subsystem):
sensorcode = '%s_%s_%s' % (satellite, sensor, subsystem) if subsystem else '%s_%s' % (satellite, sensor)
return sensorcode if sensorcode not in ['Landsat-4_TM', 'Landsat-5_TM'] else 'Landsat-4-5_TM'
def update_all_tables(self):
for sat, sen, sub in [['Landsat-5', 'TM', ''], # also includes Landsat-4
['Landsat-7', 'ETM+', ''],
['Landsat-8', 'OLI_TIRS', '']]:
sensorcode = self._get_sensorcode(sat, sen, sub)
path_csv = self.update_metaCSV(sat, sen, sub)
            self.CSVdatabase2SQLdatabase(path_csv, sensorcode)
def update_specific_sensor(self, dates2check, satellite, sensor, subsystem):
if not isinstance(dates2check, list):
dates2check = [dates2check]
date2check = max([datetime.strptime(date, '%Y-%m-%d') for date in dates2check])
tablename = self._get_sensorcode(satellite, sensor, subsystem)
if tablename not in self.tables or not os.path.isfile(self.path_db):
# recreate the table
path_csv = self.update_metaCSV(satellite, sensor, subsystem)
self.CSVdatabase2SQLdatabase(path_csv, tablename)
else:
            # [0][0]: first column, first row of the result DataFrame; the table name
            # contains dashes, so it must be double-quoted in the SQL statement
            date_max = datetime.strptime(
                self.query('SELECT max(acquisitionDate) FROM "%s"' % tablename)[0][0], '%Y-%m-%d')
if date2check > date_max:
path_csv = self.update_metaCSV(satellite, sensor, subsystem)
self.CSVdatabase2SQLdatabase(path_csv, tablename, last_updated=date_max)
def update_metaCSV(self, satellite, sensor, subsystem=None, force_redownload=False):
# type: (str, str, str, bool) -> str
# get sensorcode
sensorcode = self._get_sensorcode(satellite, sensor, subsystem)
        assert sensorcode in self.dict_metaDB_URL, \
            'The download of a metadata database for %s is not yet supported.' % sensorcode
        # create the output directory if it does not exist
        if not os.path.isdir(self.path_root):
            try:
                os.makedirs(self.path_root)
            except OSError as e:
                # occurs if other workers create the directory at the same time and it already exists
                if e.errno != 17:  # 17 == errno.EEXIST
                    raise
daterange_tag, url = self.dict_metaDB_URL[sensorcode]
path_csv = os.path.join(self.path_root, 'FullMetaDB__%s__%s.csv'
% (sensorcode, datetime.now().strftime('%Y-%m-%d')))
path_csv_gz = path_csv + '.gz'
        if os.path.exists(path_csv) and not force_redownload:
            print('A current metadata database already exists at %s.' % path_csv)
            return path_csv
elif os.path.exists(path_csv_gz) and not force_redownload:
print('Found an already downloaded (current) metadata database for %s at %s. Unpacking CSV...'
% (sensorcode, path_csv_gz))
decompress(path_csv_gz, outputpath=path_csv, logger=getLogger('decompressor'))
        else:
            if force_redownload:
                for p in [path_csv_gz, path_csv]:
                    if os.path.exists(p):
                        os.remove(p)
            else:
                print('No %s %s metadata database found. Downloading...' % (satellite, sensor))

            pBar = ProgressBar(prefix='Download progress: ')

            def dlProgress(count, blockSize, totalSize):
                percentage = int(count * blockSize * 100 / totalSize)
                pBar.print_progress(percentage)

            urlretrieve(url, path_csv_gz, reporthook=dlProgress)
            decompress(path_csv_gz, outputpath=path_csv, logger=getLogger('decompressor'))

            # validate the extraction, then remove the .csv.gz archive
            if not os.path.exists(path_csv):
                raise RuntimeError('Extraction of %s failed.' % path_csv_gz)
            os.remove(path_csv_gz)
return path_csv
def CSVdatabase2SQLdatabase(self, pathCSV, tablename, last_updated=None):
print('Building %s metadata database...' % ' '.join(tablename.split('_')))
pd_dataframe = pd.read_csv(pathCSV)
connection = sqlite3.connect(self.path_db)
if last_updated is not None:
            assert isinstance(last_updated, str) and len(last_updated) == 10, \
                "The last_updated keyword must be a date string like 'YYYY-MM-DD'."
pd_dataframe = pd_dataframe[pd_dataframe['acquisitionDate'] >= last_updated]
pd_dataframe.to_sql(tablename, connection, if_exists='append')
connection.close()
assert os.path.isfile(self.path_db), "Building of metadata database failed. Database still not found."
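
For reference, a minimal usage sketch of the new class (not part of the commit): the flat import and the path_root value below are assumptions for illustration, while the method names and sensor codes follow the code above.

from database_tools import USGS_FullMetaDB  # import path is an assumption

metaDB = USGS_FullMetaDB(path_root='/data/usgs_metadata')  # hypothetical working directory

# download the current USGS bulk-metadata CSVs and (re)build all supported tables
metaDB.update_all_tables()

# or only ensure that a single sensor table covers the given acquisition date(s)
metaDB.update_specific_sensor('2017-05-01', 'Landsat-8', 'OLI_TIRS', '')

# run arbitrary SQL against the SQLite file; the table names contain dashes, so quote them
df = metaDB.query('SELECT * FROM "Landsat-8_OLI_TIRS"', limit=10)
print(metaDB.tables)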