Commit 4e7a3dab authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Revised test_cli.py. Added database_tools.delete_record_in_postgreSQLdb().

Former-commit-id: 4ad99917
Former-commit-id: 52aa18f0
parent 96bd6e28
......@@ -342,6 +342,35 @@ def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeo
return newID
def delete_record_in_postgreSQLdb(conn_params, tablename, record_id, timeout=15000):
# type: (str, str, dict, dict, int) -> Union[int, str]
"""Delete a single record in a postgreSQL database.
:param conn_params: <str> connection parameters as provided by CFG.conn_params
:param tablename: <str> name of the table within the database to be updated
:param record_id: <dict> ID of the record to be deleted
:param timeout: <int> allows to set a custom statement timeout (milliseconds)
"""
conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
connection = psycopg2.connect(conn_params)
if connection is None:
warnings.warn('database connection fault')
return 'database connection fault'
cursor = connection.cursor()
execute_pgSQL_query(cursor, "DELETE FROM %s WHERE id=%s;" % (tablename, record_id))
execute_pgSQL_query(cursor, "SELECT id FROM %s WHERE id=%s" % (tablename, record_id))
res = cursor.fetchone()
if 'connection' in locals():
connection.commit()
connection.close()
return 'success' if res is None else 'fail'
def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide eihter scene_ID or tgt_corners_lonlat!"
......
......@@ -10,68 +10,68 @@ Tests for gms_preprocessing.bin.run_gms
import unittest
import os
from runpy import run_path
import warnings
from gms_preprocessing.misc.helper_functions import subcall_with_output
from gms_preprocessing import __path__
from gms_preprocessing.misc.database_tools import delete_record_in_postgreSQLdb
path_run_gms = os.path.abspath(os.path.join(__path__[0], '..', 'bin', 'run_gms.py'))
def exec_call(cmd):
out, exitcode, err = subcall_with_output(' '.join(cmd) if cmd is list else cmd)
if exitcode:
raise Exception(err.decode('latin-1'))
else:
if out:
return out.decode('UTF-8')
class Test_run_jobid(unittest.TestCase):
def setUp(self):
self.base_cmd = 'python %s jobid %d -DH geoms' % (path_run_gms, 26186261) # Landsat8_CollectionData
os.environ['GMS_IS_TEST'] = 'True'
class Base_CLITester:
class Base_CLITestCase(unittest.TestCase):
baseargs = []
def tearDown(self):
del os.environ['GMS_IS_TEST']
def setUp(self):
self.parser_run = run_path(path_run_gms)['get_gms_argparser']()
os.environ['GMS_IS_TEST'] = 'True'
def test_hostame(self):
exec_call(self.base_cmd)
def tearDown(self):
del os.environ['GMS_IS_TEST']
# delete the created test job in case the created subparser creates a new job
if self.baseargs[0] in ['sceneids', 'entityids', 'filenames']:
res = delete_record_in_postgreSQLdb(self.current_CFG.conn_database,
tablename='jobs', record_id=self.current_CFG.ID)
if res != 'success':
warnings.warn('Test job record could not be deleted from jobs table of postgreSQL database.')
class Test_run_sceneids(unittest.TestCase):
def setUp(self):
self.base_cmd = 'python %s sceneids %d -DH geoms' % (path_run_gms, 32259730) # LC81930292017233LGN00
os.environ['GMS_IS_TEST'] = 'True'
@property
def current_CFG(self):
from gms_preprocessing.options.config import GMS_config
return GMS_config
def tearDown(self):
del os.environ['GMS_IS_TEST']
####################################
# test that run for each subparser #
####################################
def test_no_kwargs(self):
exec_call(self.base_cmd)
def test_hostname_custom(self):
parsed_args = self.parser_run.parse_args(self.baseargs +
['--db_host', 'geoms'])
parsed_args.func(parsed_args)
self.assertEqual(self.current_CFG.db_host, 'geoms')
class Test_run_entityids(unittest.TestCase):
class Test_run_jobid(Base_CLITester.Base_CLITestCase):
def setUp(self):
self.base_cmd = 'python %s entityids %s -DH geoms' % (path_run_gms, 'LC81930292017233LGN00')
os.environ['GMS_IS_TEST'] = 'True'
super().setUp()
self.baseargs = ['jobid', str(26186261)] # Landsat8_CollectionData
def tearDown(self):
del os.environ['GMS_IS_TEST']
def test_no_kwargs(self):
exec_call(self.base_cmd)
class Test_run_sceneids(Base_CLITester.Base_CLITestCase):
def setUp(self):
super().setUp()
self.baseargs = ['sceneids', str(32259730)] # LC81930292017233LGN00
class Test_run_filenames(unittest.TestCase):
class Test_run_entityids(Base_CLITester.Base_CLITestCase):
def setUp(self):
self.base_cmd = 'python %s filenames %s -DH geoms' \
% (path_run_gms, 'LC08_L1TP_193029_20170821_20170911_01_T1.tar.gz')
os.environ['GMS_IS_TEST'] = 'True'
super().setUp()
self.baseargs = ['entityids', 'LC81930292017233LGN00']
def tearDown(self):
del os.environ['GMS_IS_TEST']
def test_no_kwargs(self):
exec_call(self.base_cmd)
class Test_run_filenames(Base_CLITester.Base_CLITestCase):
def setUp(self):
super().setUp()
self.baseargs = ['filenames', 'LC08_L1TP_193029_20170821_20170911_01_T1.tar.gz']
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment