Commit 50bf3772 authored by Daniel Scheffler

- Revised config.Job -> now features its own configuration for test mode. Passing arguments from outside is now much easier.
- Refactored some attributes of config.Job.
- Added cloud classifiers for the included test data
- misc.exceptions: added GMSConfigParameterError
- misc.path_generator: revised get_path_cloud_class_obj(): merged subfolders for cloud classifiers on disk
- processing.pipeline: refactored 'exec__...' to 'exec_...'
- tests.test_geomultisens: removed superfluous path configs
- Removed cloud_classifiers from .gitignore
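
For reference, a minimal sketch of how the revised config.Job is meant to be driven from outside after this change (the import path, job ID, and all values are placeholders, not verified against the repository):

    # hypothetical usage of the revised config.Job; names are taken from this diff
    from geomultisens import config

    cfg = config.Job(
        call_type='webapp',       # the call_type attribute is marked deprecated in this diff
        ID=12345,                 # placeholder job ID
        exec_mode='Python',
        db_host='localhost',
        exec_L1AP=[1, 1, 1],      # [run processor, write output, delete output]
        is_test=True,             # switch to the self-contained test-mode paths
    )
    # note the single underscore: 'exec_L1AP' replaces the old 'exec__L1AP'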
Former-commit-id: 86ded16f
Former-commit-id: 021aa530
parent 16d07c5c
......@@ -67,7 +67,6 @@ target/
.idea/
BAK/
OLD/
geomultisens/database/cloud_classifier/
tests/data/output_mgrs_tiles/
tests/data/output_scenes/
tests/data/sampledata/
......
......@@ -688,7 +688,6 @@
{
"ename": "AssertionError",
"evalue": "Mixed data types in postgreSQL matching expressions are not supported. Got [].",
"output_type": "error",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[1;31mAssertionError\u001b[0m Traceback (most recent call last)",
......@@ -699,7 +698,8 @@
"\u001b[1;32m/home/gfz-fe/GeoMultiSens/misc/database_tools.py\u001b[0m in \u001b[0;36m<listcomp>\u001b[1;34m(.0)\u001b[0m\n\u001b[0;32m 110\u001b[0m \u001b[1;32mreturn\u001b[0m \u001b[1;34m'database connection fault'\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 111\u001b[0m \u001b[0mcursor\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mconnection\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcursor\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 112\u001b[1;33m \u001b[0mcondition\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;34m\"WHERE \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m\" AND \"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m[\u001b[0m\u001b[0mget_postgreSQL_matchingExp\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mk\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mv\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mk\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0mv\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mcond_dict\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mitems\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 113\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0mcond_dict\u001b[0m \u001b[1;32melse\u001b[0m \u001b[1;34m\"\"\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 114\u001b[0m \u001b[0mcmd\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;34m\"SELECT \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m','\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mvals2return\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m\" FROM \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[0mtablename\u001b[0m \u001b[1;33m+\u001b[0m \u001b[1;34m\" \"\u001b[0m \u001b[1;33m+\u001b[0m \u001b[0mcondition\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;32m/home/gfz-fe/GeoMultiSens/misc/database_tools.py\u001b[0m in \u001b[0;36mget_postgreSQL_matchingExp\u001b[1;34m(key, value)\u001b[0m\n\u001b[0;32m 83\u001b[0m \u001b[0mdTypes_in_value\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlist\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mset\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m[\u001b[0m\u001b[0mtype\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mi\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 84\u001b[0m \u001b[1;32massert\u001b[0m \u001b[0mlen\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mdTypes_in_value\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m==\u001b[0m\u001b[1;36m1\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0;31m\\\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 85\u001b[1;33m \u001b[1;34m'Mixed data types in postgreSQL matching expressions are not supported. Got %s.'\u001b[0m \u001b[1;33m%\u001b[0m\u001b[0mdTypes_in_value\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 86\u001b[0m \u001b[1;32massert\u001b[0m \u001b[0mdTypes_in_value\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m]\u001b[0m \u001b[1;32min\u001b[0m \u001b[1;33m[\u001b[0m\u001b[0mint\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0mstr\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 87\u001b[0m \u001b[0mpgList\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;34m\",\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;34m\"'%s'\"\u001b[0m \u001b[1;33m%\u001b[0m\u001b[0mi\u001b[0m \u001b[1;32mif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mvalue\u001b[0m\u001b[1;33m[\u001b[0m\u001b[1;36m0\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m,\u001b[0m\u001b[0mstr\u001b[0m\u001b[1;33m)\u001b[0m \u001b[1;32melse\u001b[0m \u001b[1;34m\"%s\"\u001b[0m \u001b[1;33m%\u001b[0m\u001b[0mi\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[1;32min\u001b[0m \u001b[0mvalue\u001b[0m\u001b[1;33m]\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;31mAssertionError\u001b[0m: Mixed data types in postgreSQL matching expressions are not supported. Got []."
]
],
"output_type": "error"
}
],
"source": [
......@@ -1500,8 +1500,8 @@
"start_time": "2016-09-10T01:29:51.279093"
},
"code_folding": [
0,
23
0.0,
23.0
],
"collapsed": false,
"hidden": true,
......@@ -1529,7 +1529,7 @@
" L1A_obj.calc_orbit_overpassParams() # requires corner positions\n",
" L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds',0)\n",
" L1A_obj.MetaObj2ODict()\n",
" if job.exec__L1AP[1]:\n",
" if job.exec_L1AP[1]:\n",
" OUT_W.Obj2ENVI(L1A_obj)\n",
" L1A_obj.delete_tempFiles()\n",
" else:\n",
......@@ -1566,7 +1566,7 @@
" L1A_obj.calc_orbit_overpassParams() # requires corner positions\n",
" L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds',0)\n",
" L1A_obj.MetaObj2ODict() # requires Meta dict\n",
" if job.exec__L1AP[1]:\n",
" if job.exec_L1AP[1]:\n",
" OUT_W.Obj2ENVI(L1A_obj)\n",
" L1A_obj.delete_tempFiles()\n",
" else:\n",
......@@ -1587,7 +1587,7 @@
"\n",
" \"\"\"2. get L1B object with attribute coreg_info\"\"\"\n",
" L1B_obj = L1B_P.L1B_object(L1A_obj,COREG_obj)\n",
" if job.exec__L1BP[1]:\n",
" if job.exec_L1BP[1]:\n",
" OUT_W.Obj2ENVI(L1B_obj)\n",
" L1B_obj.delete_tempFiles()\n",
" return L1B_obj\n",
......@@ -1599,7 +1599,7 @@
" L1C_obj.get_lonlat_coord_array()\n",
" L1C_obj.calc_acquisition_illumination_geometry()\n",
" L1C_obj.atm_corr()\n",
" if job.exec__L1CP[1]:\n",
" if job.exec_L1CP[1]:\n",
" OUT_W.Obj2ENVI(L1C_obj)\n",
" if L1C_obj.arr_shape=='cube':\n",
" L1C_obj.delete_tempFiles()\n",
......@@ -1623,7 +1623,7 @@
" L2A_obj.calc_mask_nodata() # update no data mask\n",
" L2A_obj.mask_clouds = L2A_obj.masks[:, :, 1]\n",
" L2A_obj.calc_corner_positions() # update corner coordinates\n",
" if job.exec__L2AP[1]:\n",
" if job.exec_L2AP[1]:\n",
" OUT_W.Obj2ENVI(L2A_obj)\n",
" L2A_obj.delete_tempFiles()\n",
" L2A_tiles = HLP_F.cut_GMS_obj_into_blocks((L2A_obj, [2048, 2048]))\n",
......@@ -1646,7 +1646,7 @@
" L2A_obj.calc_mask_nodata() # update no data mask\n",
" #L2A_obj.calc_cloud_mask() # update cloud mask # FIXME expects TOA-Ref but will get BOA-Ref -> is that a problem? Andre?\n",
" L2A_obj.calc_corner_positions() # update corner coordinates\n",
" if job.exec__L2AP[1]:\n",
" if job.exec_L2AP[1]:\n",
" OUT_W.Obj2ENVI(L2A_obj)\n",
" L2A_obj.delete_tempFiles()\n",
" L2A_tiles = HLP_F.cut_GMS_obj_into_blocks((L2A_obj,[2048,2048]))\n",
......@@ -1657,7 +1657,7 @@
" # type: (L2A_P.L2A_object) -> L2B_P.L2B_object\n",
" L2B_obj = L2B_P.L2B_object(L2A_obj)\n",
" L2B_obj.spectral_homogenization()\n",
" if job.exec__L2BP[1]:\n",
" if job.exec_L2BP[1]:\n",
" OUT_W.Obj2ENVI(L2B_obj)\n",
" if L2B_obj.arr_shape=='cube':\n",
" L2B_obj.delete_tempFiles()\n",
......@@ -1667,7 +1667,7 @@
"def L2C_map_1(L2B_obj):\n",
" # type: (L2B_P.L2B_object) -> L2C_P.L2C_object\n",
" L2C_obj = L2C_P.L2C_object(L2B_obj)\n",
" if job.exec__L2BP[1]:\n",
" if job.exec_L2BP[1]:\n",
" L2C_MRGS_tiles = HLP_F.cut_GMS_obj_into_MGRS_tiles(L2C_obj, pixbuffer=10)\n",
" [OUT_W.Obj2ENVI(MGRS_tile,compression=False) for MGRS_tile in L2C_MRGS_tiles]\n",
" L2C_obj.delete_tempFiles()\n",
......@@ -1689,11 +1689,11 @@
"start_time": "2016-09-09T22:32:09.810Z"
},
"code_folding": [
84,
145,
204,
243,
271
84.0,
145.0,
204.0,
243.0,
271.0
],
"collapsed": false,
"run_control": {
......@@ -2043,7 +2043,7 @@
"\n",
"\n",
"L1A_newObjects = []\n",
"if job.exec__L1AP[0]:\n",
"if job.exec_L1AP[0]:\n",
" if parallelization_level == 'scenes':\n",
" # map\n",
" with multiprocessing.Pool(12) as pool:\n",
......@@ -2076,7 +2076,7 @@
" if not L1B_already_present(dataset) and not is_inMEM(L1A_newObjects+failed_objects,dataset)]\n",
"\n",
"L1B_newObjects = []\n",
"if job.exec__L1BP[0]:\n",
"if job.exec_L1BP[0]:\n",
" # run on full cubes\n",
" # get earlier processed L1A data\n",
" GMSfile_list_L1A_inDB = INP_R.get_list_GMSfiles(datalist_L1B_P, 'L1A')\n",
......@@ -2101,7 +2101,7 @@
" if not L1C_already_present(dataset) and not is_inMEM(L1B_newObjects+failed_objects,dataset)]\n",
"\n",
"L1C_newObjects = []\n",
"if job.exec__L1CP[0]:\n",
"if job.exec_L1CP[0]:\n",
" GMSfile_list_L1B_inDB = INP_R.get_list_GMSfiles(datalist_L1C_P, 'L1B')\n",
" if parallelization_level == 'scenes':\n",
" work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L1B_inDB]\n",
......@@ -2153,7 +2153,7 @@
" if not L2A_already_present(dataset) and not is_inMEM(L1C_newObjects+failed_objects,dataset)]\n",
"\n",
"L2A_tiles = []\n",
"if job.exec__L2AP[0]:\n",
"if job.exec_L2AP[0]:\n",
" # get earlier processed L1C data\n",
" GMSfile_list_L1C_inDB = INP_R.get_list_GMSfiles(datalist_L2A_P, 'L1C')\n",
" work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L1C_inDB]\n",
......@@ -2199,7 +2199,7 @@
" if not L2B_already_present(dataset) and not is_inMEM(L2A_tiles+failed_objects,dataset)]\n",
"\n",
"L2B_newObjects = []\n",
"if job.exec__L2BP[0]:\n",
"if job.exec_L2BP[0]:\n",
" # get earlier processed L2A data\n",
" GMSfile_list_L2A_inDB = INP_R.get_list_GMSfiles(datalist_L2B_P, 'L2A')\n",
"\n",
......@@ -2260,7 +2260,7 @@
"datalist_L2C_P = [dataset for dataset in usecase_data_list\n",
" if not L2C_already_present(dataset) and not is_inMEM(L2B_newObjects+failed_objects,dataset)]\n",
"L2C_newObjects = []\n",
"if job.exec__L2CP[0]:\n",
"if job.exec_L2CP[0]:\n",
" # get earlier processed L2B data\n",
" GMSfile_list_L2B_inDB = INP_R.get_list_GMSfiles(datalist_L2C_P, 'L2B')\n",
"\n",
......@@ -3248,7 +3248,6 @@
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)",
......@@ -3268,7 +3267,8 @@
"\u001b[0;32m/home/gms/python_gms/python/lib/python3.5/site-packages/ipykernel/kernelbase.py\u001b[0m in \u001b[0;36mraw_input\u001b[0;34m(self, prompt)\u001b[0m\n\u001b[1;32m 687\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_parent_ident\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 688\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_parent_header\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 689\u001b[0;31m \u001b[0mpassword\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 690\u001b[0m )\n\u001b[1;32m 691\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/home/gms/python_gms/python/lib/python3.5/site-packages/ipykernel/kernelbase.py\u001b[0m in \u001b[0;36m_input_request\u001b[0;34m(self, prompt, ident, parent, password)\u001b[0m\n\u001b[1;32m 717\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mKeyboardInterrupt\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 718\u001b[0m \u001b[0;31m# re-raise KeyboardInterrupt, to truncate traceback\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 719\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mKeyboardInterrupt\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 720\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 721\u001b[0m \u001b[0;32mbreak\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mKeyboardInterrupt\u001b[0m: "
]
],
"output_type": "error"
}
],
"source": [
......@@ -3534,7 +3534,7 @@
"navigate_menu": true,
"number_sections": true,
"sideBar": true,
"threshold": 6,
"threshold": 6.0,
"toc_cell": false,
"toc_section_display": "block",
"toc_window_display": true
......
......@@ -1154,7 +1154,7 @@
" L1A_obj.calc_orbit_overpassParams() # requires corner positions\n",
" L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds',0)\n",
" L1A_obj.MetaObj2ODict()\n",
" if job.exec__L1AP[1]:\n",
" if job.exec_L1AP[1]:\n",
" OUT_W.Obj2ENVI(L1A_obj)\n",
" L1A_obj.delete_tempFiles()\n",
" else:\n",
......@@ -1189,7 +1189,7 @@
" L1A_obj.calc_orbit_overpassParams() # requires corner positions\n",
" L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds',0)\n",
" L1A_obj.MetaObj2ODict() # requires Meta dict\n",
" if job.exec__L1AP[1]:\n",
" if job.exec_L1AP[1]:\n",
" OUT_W.Obj2ENVI(L1A_obj)\n",
" L1A_obj.delete_tempFiles()\n",
" else:\n",
......@@ -1209,7 +1209,7 @@
"\n",
" \"\"\"2. get L1B object with attribute coreg_info\"\"\"\n",
" L1B_obj = L1B_P.L1B_object(L1A_obj,COREG_obj)\n",
" if job.exec__L1BP[1]:\n",
" if job.exec_L1BP[1]:\n",
" OUT_W.Obj2ENVI(L1B_obj)\n",
" L1B_obj.delete_tempFiles()\n",
" return L1B_obj\n",
......@@ -1220,7 +1220,7 @@
" L1C_obj.get_lonlat_coord_array()\n",
" L1C_obj.calc_acquisition_illumination_geometry()\n",
" L1C_obj.atm_corr()\n",
" if job.exec__L1CP[1]:\n",
" if job.exec_L1CP[1]:\n",
" OUT_W.Obj2ENVI(L1C_obj)\n",
" if L1C_obj.arr_shape=='cube':\n",
" L1C_obj.delete_tempFiles()\n",
......@@ -1242,7 +1242,7 @@
" L2A_obj.calc_mask_nodata() # update no data mask\n",
" L2A_obj.mask_clouds = L2A_obj.masks[:,:,1] \n",
" L2A_obj.calc_corner_positions() # update corner coordinates\n",
" if job.exec__L2AP[1]:\n",
" if job.exec_L2AP[1]:\n",
" OUT_W.Obj2ENVI(L2A_obj)\n",
" L2A_obj.delete_tempFiles()\n",
" L2A_tiles = HLP_F.cut_GMS_obj_into_blocks((L2A_obj,[2048,2048]))\n",
......@@ -1263,7 +1263,7 @@
" L2A_obj.calc_mask_nodata() # update no data mask\n",
" #L2A_obj.calc_cloud_mask() # update cloud mask # FIXME expects TOA-Ref but will get BOA-Ref -> is that a problem? Andre?\n",
" L2A_obj.calc_corner_positions() # update corner coordinates\n",
" if job.exec__L2AP[1]:\n",
" if job.exec_L2AP[1]:\n",
" OUT_W.Obj2ENVI(L2A_obj)\n",
" L2A_obj.delete_tempFiles()\n",
" L2A_tiles = HLP_F.cut_GMS_obj_into_blocks((L2A_obj,[2048,2048]))\n",
......@@ -1273,7 +1273,7 @@
" # type: (L2A_P.L2A_object) -> L2B_P.L2B_object\n",
" L2B_obj = L2B_P.L2B_object(L2A_obj)\n",
" L2B_obj.spectral_homogenization()\n",
" if job.exec__L2BP[1]:\n",
" if job.exec_L2BP[1]:\n",
" OUT_W.Obj2ENVI(L2B_obj)\n",
" if L2B_obj.arr_shape=='cube':\n",
" L2B_obj.delete_tempFiles()\n",
......@@ -1282,7 +1282,7 @@
"def L2C_map_1(L2B_obj):\n",
" # type: (L2B_P.L2B_object) -> L2C_P.L2C_object\n",
" L2C_obj = L2C_P.L2C_object(L2B_obj)\n",
" if job.exec__L2BP[1]:\n",
" if job.exec_L2BP[1]:\n",
" L2C_MRGS_tiles = HLP_F.cut_GMS_obj_into_MGRS_tiles(L2C_obj, pixbuffer=10)\n",
" [OUT_W.Obj2ENVI(MGRS_tile,compression=False) for MGRS_tile in L2C_MRGS_tiles]\n",
" L2C_obj.delete_tempFiles()\n",
......@@ -1354,7 +1354,7 @@
"\n",
"\n",
"L1A_newObjects = []\n",
"if job.exec__L1AP[0]:\n",
"if job.exec_L1AP[0]:\n",
" if parallelization_level == 'scenes':\n",
" # map\n",
" with multiprocessing.Pool() as pool:\n",
......@@ -1381,7 +1381,7 @@
" if not L1B_already_present(dataset) and dataset not in datalist_L1A_P]\n",
"\n",
"L1B_newObjects = []\n",
"if job.exec__L1BP[0]:\n",
"if job.exec_L1BP[0]:\n",
" # run on full cubes\n",
" # get earlier processed L1A data\n",
" GMSfile_list_L1A_inDB = INP_R.get_list_GMSfiles([datalist_L1B_P, 'L1A'])\n",
......@@ -1401,7 +1401,7 @@
" if not L1C_already_present(dataset) and dataset not in datalist_L1B_P]\n",
"\n",
"L1C_newObjects = []\n",
"if job.exec__L1CP[0]:\n",
"if job.exec_L1CP[0]:\n",
" GMSfile_list_L1B_inDB = INP_R.get_list_GMSfiles([datalist_L1C_P, 'L1B'])\n",
" if parallelization_level == 'scenes':\n",
" work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L1B_inDB]\n",
......@@ -1447,7 +1447,7 @@
" if not L2A_already_present(dataset) and dataset not in datalist_L1C_P]\n",
"\n",
"L2A_tiles = []\n",
"if job.exec__L2AP[0]:\n",
"if job.exec_L2AP[0]:\n",
" # get earlier processed L1C data\n",
" GMSfile_list_L1C_inDB = INP_R.get_list_GMSfiles([datalist_L2A_P, 'L1C'])\n",
" work = [[GMS, ['cube', None]] for GMS in GMSfile_list_L1C_inDB]\n",
......@@ -1486,7 +1486,7 @@
" if not L2B_already_present(dataset) and dataset not in datalist_L2A_P]\n",
"\n",
"L2B_newObjects = []\n",
"if job.exec__L2BP[0]:\n",
"if job.exec_L2BP[0]:\n",
" # get earlier processed L2A data\n",
" GMSfile_list_L2A_inDB = INP_R.get_list_GMSfiles([datalist_L2B_P, 'L2A'])\n",
"\n",
......@@ -1537,7 +1537,7 @@
"datalist_L2C_P = [dataset for dataset in usecase_data_list\n",
" if not L2C_already_present(dataset) and dataset not in datalist_L2B_P]\n",
"L2C_newObjects = []\n",
"if job.exec__L2CP[0]:\n",
"if job.exec_L2CP[0]:\n",
" # get earlier processed L2B data\n",
" GMSfile_list_L2B_inDB = INP_R.get_list_GMSfiles([datalist_L2C_P, 'L2B'])\n",
" \n",
......@@ -2111,4 +2111,4 @@
},
"nbformat": 4,
"nbformat_minor": 0
}
\ No newline at end of file
}
......@@ -738,7 +738,7 @@ class L1A_object(GMS_object):
self.GMS_identifier['logger'] = self.logger
if not CFG.job.bench_CLD_class:
if not CFG.job.bench_cloudMask:
self.path_cloud_class_obj = PG.get_path_cloud_class_obj(self.GMS_identifier)
CLD_obj = CLD_P.GmsCloudClassifier(classifier=self.path_cloud_class_obj)
assert CLD_obj, 'Error loading cloud classifier.'
......
......@@ -665,7 +665,7 @@ class AtmCorr(object):
'creation of the DEM corresponding to scene %s (entity ID: %s). Error message was: '
'\n%s\n' % (self.inObjs[0].scene_ID, self.inObjs[0].entity_ID, repr(e)))
self.logger.info("Print traceback in case you care:")
self.logger.info(traceback.format_exc())
self.logger.warning(traceback.format_exc())
return dem
......@@ -712,9 +712,8 @@ class AtmCorr(object):
cm_array = CMC.cloud_mask_array
cm_legend = CMC.cloud_mask_legend
except Exception as err:
print('\nAn error occurred during FMASK cloud masking.')
print("Print traceback in case you care:")
print(traceback.format_exc())
self.logger.error('\nAn error occurred during FMASK cloud masking. Error message was: ')
self.logger.error(traceback.format_exc())
return None
else:
......@@ -815,7 +814,6 @@ class AtmCorr(object):
self.logger.error('\nAn error occurred during atmospheric correction. BE AWARE THAT THE SCENE %s '
'(ENTITY ID %s) HAS NOT BEEN ATMOSPHERICALLY CORRECTED! Error message was: \n%s\n'
% (self.inObjs[0].scene_ID, self.inObjs[0].entity_ID, repr(e)))
self.logger.error("Print traceback in case you care:")
self.logger.error(traceback.format_exc())
# TODO include that in the job summary
......
......@@ -15,6 +15,7 @@ import psycopg2
import psycopg2.extras
from collections import OrderedDict
import multiprocessing
from inspect import getargvalues, stack, getfullargspec, signature, _empty
def set_config(call_type, job_ID, exec_mode='Python', db_host='localhost', reset=False, job_kwargs=None):
......@@ -72,12 +73,12 @@ GMS_config = GMS_configuration()
class Job:
class Job(object):
def __init__(self, call_type, ID, exec_mode='Python', db_host='localhost', exec_L1AP=None, exec_L1BP=None,
exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None, sub_multiProc=True,
exc_handler=True, log_level='INFO', blocksize=(2048,2048), is_test=False, profiling=False,
bench_all=False, bench_cloudMask=False, path_procdata_scenes=None, path_procdata_MGRS=None,
path_archive=None):
exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None, allow_subMultiprocessing=True,
disable_exception_handler=True, log_level='INFO', tiling_block_size_XY=(2048,2048), is_test=False,
profiling=False, benchmark_global=False, bench_cloudMask=False, path_procdata_scenes=None,
path_procdata_MGRS=None, path_archive=None):
"""Create a job configuration
......@@ -93,23 +94,33 @@ class Job:
:param exec_L2BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L2CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param CPUs: number of CPU cores to be used for processing (default: None -> use all available)
:param sub_multiProc: allow multiprocessing within workers
:param exc_handler: enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
:param allow_subMultiprocessing:
allow multiprocessing within workers
:param disable_exception_handler:
enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
:param log_level: the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
default: 'INFO')
:param blocksize: X/Y block size to be used for any tiling process (default: (2048,2048))
:param tiling_block_size_XY:
X/Y block size to be used for any tiling process (default: (2048,2048))
:param is_test: whether the current job represents a test job (run by a test runner) or not (default=False)
:param profiling: enable/disable code profiling (default: False)
:param bench_all: enable/disable benchmark of the whole processing pipeline
:param benchmark_global:
enable/disable benchmark of the whole processing pipeline
:param bench_cloudMask: enable/disable benchmark of the cloud mask generator module
:param path_procdata_scenes: output path to store processed scenes
:param path_procdata_MGRS: output path to store processed MGRS tiles
:param path_procdata_scenes:
output path to store processed scenes
:param path_procdata_MGRS:
output path to store processed MGRS tiles
:param path_archive: input path where downloaded data are stored
"""
# private attributes
self._DB_config = None
self._DB_job_record = None
## args
self.ID = ID
self.call_type = call_type
self.call_type = call_type # FIXME deprecated
self.exec_mode = exec_mode
assert exec_mode in ['Flink','Python']
self.db_host = db_host
......@@ -117,23 +128,23 @@ class Job:
## kwargs
# processor configuration: [run processor, write output, delete output if not needed anymore]
self.exec__L1AP = [1, 1, 1] if not exec_L1AP else exec_L1AP
self.exec__L1BP = [1, 1, 1] if not exec_L1BP else exec_L1BP
self.exec__L1CP = [1, 1, 1] if not exec_L1CP else exec_L1CP
self.exec__L2AP = [1, 1, 1] if not exec_L2AP else exec_L2AP
self.exec__L2BP = [1, 1, 0] if not exec_L2BP else exec_L2BP
self.exec__L2CP = [1, 1, 0] if not exec_L2CP else exec_L2CP
self.exec_L1AP = [1, 1, 1]
self.exec_L1BP = [1, 1, 1]
self.exec_L1CP = [1, 1, 1]
self.exec_L2AP = [1, 1, 1]
self.exec_L2BP = [1, 1, 0]
self.exec_L2CP = [1, 1, 0]
self.validate_exec_configs()
self.CPUs = CPUs if CPUs else multiprocessing.cpu_count()
self.allow_subMultiprocessing = sub_multiProc
self.disable_exception_handler = exc_handler is False
self.CPUs = multiprocessing.cpu_count()
self.allow_subMultiprocessing = allow_subMultiprocessing
self.disable_exception_handler = disable_exception_handler is False
self.log_level = log_level
self.tiling_block_size_XY = blocksize
self.tiling_block_size_XY = tiling_block_size_XY
self.is_test = is_test
self.profiling = profiling
self.benchmark_global = bench_all
self.bench_CLD_class = bench_cloudMask
self.benchmark_global = benchmark_global
self.bench_cloudMask = bench_cloudMask
self.cloud_masking_algorithm = {'Landsat-4': 'FMASK',
'Landsat-5': 'FMASK',
'Landsat-7': 'FMASK',
......@@ -151,69 +162,61 @@ class Job:
self.end_time = None
self.computation_time = None
self.hostname = socket.gethostname()
self._DB_job_record = None
## set all the paths
absP, joinP = lambda r: os.path.join(os.path.dirname(__file__), os.path.relpath(r)), lambda *x: os.path.join(*x)
self.path_earthSunDist = absP('./database/earth_sun_distance/Earth_Sun_distances_per_day_edited.csv')
self.path_SRFs = absP('./database/srf/')
self.path_cloud_classif = absP('./database/cloud_classifier/')
self.path_solar_irr = absP('./database/solar_irradiance/SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')
self.path_testing = absP('./sandbox/') # FIXME
self.path_benchmarks = absP('./benchmarks/')
self.path_job_logs = absP('./logs/job_logs/')
if self.call_type == 'console':
"""path_fileserver is to be replaced by Fileserver URL"""
self.path_fileserver = '/misc/gms2/scheffler/GeoMultiSens/' if self.hostname != 'geoms' else absP('./')
self.path_tempdir = '/dev/shm/GeoMultiSens/'
# path_procdata = absP('./database/processed_data/')
# path_procdata = '/srv/gms2/scheffler/GeoMultiSens/database/processed_data/'
self.path_procdata = joinP(self.path_fileserver, 'database/processed_scenes%s/'
%('_bench' if self.benchmark_global else ''))
self.path_procdata_MGRS = joinP(self.path_fileserver, 'database/processed_mgrs_tiles%s/'
%('_bench' if self.benchmark_global else ''))
self.path_database = joinP(self.path_fileserver, 'database/processed_scenes%s/data_DB.db'
%('_bench' if self.benchmark_global else ''))
# path_database = absP('./database/processed_data/data_DB.db')
# path_db_meta = absP('./database/metadata/')
self.path_db_meta = absP('./database/metadata/metadata_DB.db') # ('geoms.gfz-potsdam.de:5432')
# path_archive = absP('./database/sampledata/')
# path_archive = '/srv/gms2/scheffler/GeoMultiSens/database/sampledata/'
self.path_archive = joinP(self.path_fileserver, 'database/sampledata/')
elif self.call_type == 'webapp':
from .misc.database_tools import get_info_from_postgreSQLdb
query_cfg = lambda key: \
get_info_from_postgreSQLdb(self.conn_database, 'config', ['value'], {'key': "%s" % key})[0][0]
self.path_fileserver = query_cfg('path_data_root')
self.path_tempdir = query_cfg('path_tempdir')
self.path_procdata_scenes = joinP(self.path_fileserver, query_cfg('foldername_procdata_scenes'))\
if not path_procdata_scenes else path_procdata_scenes
self.path_procdata_MGRS = joinP(self.path_fileserver, query_cfg('foldername_procdata_MGRS'))\
if not path_procdata_MGRS else path_procdata_MGRS
self.path_archive = joinP(self.path_fileserver, query_cfg('foldername_download'))\
if not path_archive else path_archive
self.path_spatIdxSrv = query_cfg('path_spatial_index_mediator_server')
self.path_earthSunDist = absP(query_cfg('path_earthSunDist'))
self.path_SRFs = absP(query_cfg('path_SRFs'))
self.path_cloud_classif = query_cfg('path_cloud_classif')
self.path_solar_irr = absP(query_cfg('path_solar_irr'))
self.path_ac_tables = query_cfg('path_ac_tables')
self.path_SNR_models = absP(query_cfg('path_SNR_models'))
self.path_dem_proc_srtm_90m = query_cfg('path_dem_proc_srtm_90m')
self.path_ECMWF_db = query_cfg('path_ECMWF_db')
self.path_testing = absP(query_cfg('path_testing'))
self.path_benchmarks = absP(query_cfg('path_benchmarks'))
self.path_job_logs = absP(query_cfg('path_job_logs'))
self.java_commands = collections.OrderedDict()
self.java_commands["keyword"] = query_cfg('command_keyword')
self.java_commands["value_download"] = query_cfg('command_value_download')
assert os.path.isdir(self.path_archive), \
"Given archive folder '%s' does not exist. Execution stopped" % self.path_archive
## set all the default paths
# TODO: HOW TO DEAL WITH THESE ATTRIBUTES IN TEST MODE?
self.path_spatIdxSrv = self.DB_config['path_spatial_index_mediator_server']
self.path_tempdir = self.DB_config['path_tempdir']
self.path_ac_tables = self.DB_config['path_ac_tables']
self.path_SNR_models = self.DB_config['path_SNR_models']
self.path_dem_proc_srtm_90m = self.DB_config['path_dem_proc_srtm_90m']
self.path_ECMWF_db = self.DB_config['path_ECMWF_db']
self.java_commands = collections.OrderedDict([
("keyword", self.DB_config['command_keyword']),
("value_download", self.DB_config['command_value_download'])])
if not self.is_test:
self.path_fileserver = self.DB_config['path_data_root']
self.path_archive = self.joinP(self.path_fileserver, self.DB_config['foldername_download'])
self.path_procdata_scenes = self.joinP(self.path_fileserver, self.DB_config['foldername_procdata_scenes'])
self.path_procdata_MGRS = self.joinP(self.path_fileserver, self.DB_config['foldername_procdata_MGRS'])
self.path_earthSunDist = self.DB_config['path_earthSunDist']
self.path_SRFs = self.DB_config['path_SRFs']
self.path_cloud_classif = self.DB_config['path_cloud_classif']
self.path_solar_irr = self.DB_config['path_solar_irr']
self.path_testing = self.DB_config['path_testing']
self.path_benchmarks = self.DB_config['path_benchmarks']
self.path_job_logs = self.DB_config['path_job_logs']
self.java_commands = collections.OrderedDict([
("keyword", self.DB_config['command_keyword']),
("value_download", self.DB_config['command_value_download'])])
else:
# in test mode, the repository should be self-contained -> use only relative paths
self.path_archive = self.absP('./tests/data/')
self.path_archive = self.absP('./tests/data/archive_data/')
self.path_procdata_scenes = self.absP('./tests/data/output_scenes/')
self.path_procdata_MGRS = self.absP('./tests/data/output_mgrs_tiles/')
self.path_earthSunDist = self.absP('./database/earth_sun_distance/Earth_Sun_distances_per_day_edited.csv')
self.path_SRFs = self.absP('./database/srf/')
self.path_cloud_classif = self.absP('./database/cloud_classifier/')
self.path_solar_irr = self.absP('./database/solar_irradiance/SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')
self.path_testing = self.absP('./sandbox/') # FIXME
self.path_benchmarks = self.absP('./benchmarks/')
self.path_job_logs = self.absP('./logs/job_logs/')
# overwrite defaults with user provided keyword arguments
kwargs = self.get_init_argskwargs()['kwargs']
defaults = self.get_init_kwdefaults()
for kwName, kwVal in kwargs.items():
if not hasattr(self, kwName):
from .misc.exceptions import GMSConfigParameterError
raise GMSConfigParameterError("'%s' is not a valid parameter for config.Job." %kwName)
else:
if kwVal != defaults[kwName]:
setattr(self, kwName, kwVal)
# create missing output directories
if not os.path.isdir(self.path_job_logs):
os.makedirs(self.path_job_logs)
......@@ -224,6 +227,48 @@ class Job:
if self.call_type=='webapp' else ''
def get_init_argskwargs(self, ignore=("logger",)):
"""
Return a tuple containing a dictionary of the calling function's named arguments and a list of the
calling function's unnamed positional arguments.
"""
posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
argskwargs.update(argskwargs.pop(kwname, []))
argskwargs = {k: v for k, v in argskwargs.items() if k not in ignore and k != 'self' and not k.startswith('__')}
sig = signature(self.__init__)
argsnames = [k for k in sig.parameters if sig.parameters[k].default==_empty]
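
For clarity, a minimal, self-contained sketch of the frame-introspection pattern used by get_init_argskwargs above (standard library only; function and variable names below are illustrative):

    # sketch: read the caller's named arguments via its stack frame
    from inspect import getargvalues, stack

    def report_caller_args():
        # stack()[1][0] is the caller's frame; getargvalues() returns
        # ArgInfo(args, varargs, keywords, locals)
        args, _varargs, _kwname, frame_locals = getargvalues(stack()[1][0])
        return {k: frame_locals[k] for k in args if k != 'self'}

    def demo(a, b=2, c=3):
        return report_caller_args()

    print(demo(1, c=30))  # -> {'a': 1, 'b': 2, 'c': 30}

This is the same mechanism config.Job uses to collect the keyword arguments it was actually called with, compare them against the declared defaults, and overwrite only the values the caller changed.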