Commit 8c7ce88f authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Revised SensorMapGeometryTransformer (now fully operable) and added...

Revised SensorMapGeometryTransformer (now fully operable) and added corresponding tests. Added boxObj.buffer_mapXY() + test. Added type hints.
parent b0306dfb
Pipeline #3364 failed with stages
in 1 minute and 23 seconds
# -*- coding: utf-8 -*-
import warnings
from typing import Iterable, Tuple # noqa: F401
import numpy as np
# custom
......@@ -146,6 +147,7 @@ def calc_FullDataset_corner_positions(mask_1bit, assert_four_corners=True, algor
def corner_coord_to_minmax(corner_coords):
# type: (Iterable[Tuple[float, float], Tuple[float, float], Tuple[float, float], Tuple[float, float]]) -> tuple
"""Returns the bounding coordinates for a given set of XY coordinates.
:param corner_coords: # four XY tuples of corner coordinates. Their order does not matter.
......@@ -154,4 +156,5 @@ def corner_coord_to_minmax(corner_coords):
x_vals = [i[0] for i in corner_coords]
y_vals = [i[1] for i in corner_coords]
xmin, xmax, ymin, ymax = min(x_vals), max(x_vals), min(y_vals), max(y_vals)
return xmin, xmax, ymin, ymax
......@@ -157,7 +157,7 @@ def EPSG2WKT(EPSG_code):
def WKT2EPSG(wkt, epsgfile=''):
# type: (str) -> Union[int, None]
# type: (str, str) -> Union[int, None]
""" Transform a WKT string to an EPSG code
:param wkt: WKT definition
:param epsgfile: the proj.4 epsg file (automatically located if no path is provided)
......
......@@ -25,13 +25,14 @@ from pyresample.kd_tree import resample_nearest, resample_gauss, resample_custom
from pyresample.utils import get_area_def
from ...dtypes.conversion import dTypeDic_NumPy2GDAL
from ..projection import WKT2EPSG, isProjectedOrGeographic, prj_equal
from ..coord_trafo import pixelToLatLon, get_proj4info, proj4_to_dict
from ..projection import EPSG2WKT, WKT2EPSG, isProjectedOrGeographic, prj_equal, proj4_to_WKT
from ..coord_trafo import pixelToLatLon, get_proj4info, proj4_to_dict, transform_coordArray, transform_any_prj
from ..coord_calc import corner_coord_to_minmax, get_corner_coordinates
from ...io.raster.gdal import get_GDAL_ds_inmem
from ...io.raster.writer import write_numpy_to_image
from ...processing.progress_mon import ProgressBar
from ...compatibility.gdal import get_gdal_func
from ...processing.shell import subcall_with_output
__author__ = "Daniel Scheffler"
......@@ -501,8 +502,8 @@ class SensorMapGeometryTransformer(object):
"""Get an instance of SensorMapGeometryTransformer.
:param data: numpy array to be warped to sensor or map geometry
:param lons: longitude array
:param lats: latitude array
:param lons: 2D longitude array
:param lats: 2D latitude array
:Keyword Arguments: (further documentation here: https://pyresample.readthedocs.io/en/latest/swath.html)
- resamp_alg: resampling algorithm ('nearest', 'bilinear', 'gauss', 'custom')
......@@ -527,87 +528,138 @@ class SensorMapGeometryTransformer(object):
- with_uncert: <bool> [ONLY 'gauss' and 'custom'] Calculate uncertainty estimates
NOTE: resampling function has 3 return values instead of 1: result, stddev, count
"""
# validation
if lons.ndim > 2:
raise NotImplementedError(lons, '%dD longitude arrays are not yet supported.' % lons.ndim)
if lats.ndim > 2:
raise NotImplementedError(lats, '%dD latitude arrays are not yet supported.' % lats.ndim)
self.data = data
self.resamp_alg = resamp_alg
self.opts = dict(radius_of_influence=radius_of_influence,
sigmas=(radius_of_influence / 2))
self.opts.update(opts)
self.area_definition = None
if resamp_alg == 'bilinear':
del self.opts['radius_of_influence']
self.opts['radius'] = radius_of_influence
self.lats = lats
self.lons = lons
self.swath_definition = SwathDefinition(lons=lons, lats=lats)
self.area_extent = [np.min(lons), np.min(lats), np.max(lons), np.max(lats)]
self.area_extent_ll = [np.min(lons), np.min(lats), np.max(lons), np.max(lats)]
self.area_definition = None
def compute_output_shape(self):
# type: () -> Tuple[int, int]
def _get_target_extent(self, tgt_epsg):
if tgt_epsg == 4326:
tgt_extent = self.area_extent_ll
else:
corner_coords_ll = [[self.lons[0, 0], self.lats[0, 0]], # UL_xy
[self.lons[0, -1], self.lats[0, -1]], # UR_xy
[self.lons[-1, 0], self.lats[-1, 0]], # LL_xy
[self.lons[-1, -1], self.lats[-1, -1]], # LR_xy
]
corner_coords_tgt_prj = [transform_any_prj(EPSG2WKT(4326), EPSG2WKT(tgt_epsg), x, y)
for x, y in corner_coords_ll]
corner_coords_tgt_prj_np = np.array(corner_coords_tgt_prj)
x_coords, y_coords = corner_coords_tgt_prj_np[:, 0], corner_coords_tgt_prj_np[:, 1]
tgt_extent = [np.min(x_coords), np.min(y_coords), np.max(x_coords), np.max(y_coords)]
return tgt_extent
def compute_output_shape(self, tgt_prj, tgt_extent, tgt_res=None):
# type: (Union[int, str], List[float, float, float, float], Tuple[float, float]) -> (int, int, tuple, str, ...)
"""Estimates the map geometry output shape of a sensor geometry array resampled to map geometry.
:param tgt_prj: target projection (WKT or 'epsg:1234' or <EPSG_int>)
:param tgt_extent: extent coordinates of output map geometry array (LL_x, LL_y, UR_x, UR_y) in the tgt_prj
:param tgt_res: target X/Y resolution (e.g., (30, 30))
:return:
"""
tgt_epsg = WKT2EPSG(proj4_to_WKT(get_proj4info(proj=tgt_prj)))
tgt_extent = tgt_extent or self._get_target_extent(tgt_epsg)
with TemporaryDirectory() as td:
path_lons_lats = os.path.join(td, 'lons_lats.bsq')
path_lons_lats_vrt = os.path.join(td, 'lons_lats.vrt')
path_xycoords = os.path.join(td, 'xy_coords.bsq')
path_xycoords_vrt = os.path.join(td, 'xy_coords.vrt')
path_data = os.path.join(td, 'data.bsq')
path_datavrt = os.path.join(td, 'data.vrt')
path_data_out = os.path.join(td, 'data_out.bsq')
# write lons and lats
# lons_lats = np.dstack([self.swath_definition.lons[::10, ::10],
# self.swath_definition.lats[::10, ::10]])
lons_lats = np.dstack([self.swath_definition.lons,
# write X/Y coordinate array
if tgt_epsg == 4326:
xy_coords = np.dstack([self.swath_definition.lons,
self.swath_definition.lats])
write_numpy_to_image(lons_lats, path_lons_lats, 'ENVI')
# xy_coords = np.dstack([self.swath_definition.lons[::10, ::10],
# self.swath_definition.lats[::10, ::10]])
else:
xy_coords = np.dstack(list(transform_coordArray(EPSG2WKT(4326), EPSG2WKT(tgt_epsg),
self.swath_definition.lons,
self.swath_definition.lats)))
write_numpy_to_image(xy_coords, path_xycoords, 'ENVI')
# create VRT for lons and lats
ds_lons_lats = gdal.Open(path_lons_lats)
# create VRT for X/Y coordinate array
ds_xy_coords = gdal.Open(path_xycoords)
drv_vrt = gdal.GetDriverByName("VRT")
vrt = drv_vrt.CreateCopy(path_lons_lats_vrt, ds_lons_lats)
del ds_lons_lats, vrt
vrt = drv_vrt.CreateCopy(path_xycoords_vrt, ds_xy_coords)
del ds_xy_coords, vrt
# create VRT for one data band
mask_band = np.ones((self.data.shape[:2]), np.int32)
write_numpy_to_image(mask_band, path_data, 'ENVI')
ds_data = gdal.Open(path_data)
vrt = drv_vrt.CreateCopy(path_datavrt, ds_data)
vrt.SetMetadata({"X_DATASET": path_lons_lats_vrt,
"Y_DATASET": path_lons_lats_vrt,
vrt.SetMetadata({"X_DATASET": path_xycoords_vrt,
"Y_DATASET": path_xycoords_vrt,
"X_BAND": "1",
"Y_BAND": "2",
"PIXEL_OFFSET": "0",
"LINE_OFFSET": "0",
"PIXEL_STEP": "1",
"LINE_STEP": "1",
"SRS": EPSG2WKT(tgt_epsg),
}, "GEOLOCATION")
vrt.FlushCache()
del ds_data, vrt
os.system('gdalwarp -geoloc -t_srs EPSG:4326 %s %s -srcnodata 0 -r near -of ENVI '
'-dstnodata none -et 0 -overwrite' % (path_datavrt, path_data_out))
subcall_with_output('gdalwarp %s %s '
'-geoloc '
'-t_srs EPSG:%d '
'-srcnodata 0 '
'-r near '
'-of ENVI '
'-dstnodata none '
'-et 0 '
'-overwrite '
'-te %s'
'%s' % (path_datavrt, path_data_out, tgt_epsg,
' '.join([str(i) for i in tgt_extent]),
' -tr %s %s' % tgt_res if tgt_res else '',),
v=True)
# get output X/Y size
ds_out = gdal.Open(path_data_out)
if not ds_out:
raise Exception(gdal.GetLastErrorMsg())
x_size = ds_out.RasterXSize
y_size = ds_out.RasterYSize
out_gt = ds_out.GetGeoTransform()
out_prj = ds_out.GetProjection()
del ds_out
return x_size, y_size
def get_area_definition(self, proj4_args, cols_out, rows_out):
# type: (Union[str, list], int, int) -> AreaDefinition
"""Get output area definition.
# add 1 px buffer around out_extent to avoid cutting the output image
x_size += 2
y_size += 2
out_gt = list(out_gt)
out_gt[0] -= out_gt[1]
out_gt[3] += abs(out_gt[5])
out_gt = tuple(out_gt)
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=out_gt, cols=x_size, rows=y_size))
out_extent = list((xmin, ymin, xmax, ymax))
:param proj4_args: Proj4 arguments as list of arguments or string
:param cols_out: number of output columns
:param rows_out: number of output rows
"""
area_def_out = get_area_def(area_id='',
area_name='',
proj_id='',
proj4_args=proj4_args,
x_size=cols_out,
y_size=rows_out,
area_extent=self.area_extent # xmin, ymin, xmax, ymax
) # type: AreaDefinition
return area_def_out
return x_size, y_size, out_gt, out_prj, out_extent
def _resample(self, source_geo_def, target_geo_def):
# type: (Union[AreaDefinition, SwathDefinition], Union[AreaDefinition, SwathDefinition]) -> np.ndarray
......@@ -640,18 +692,29 @@ class SensorMapGeometryTransformer(object):
return result
def to_map_geometry(self, tgt_prj):
# type: (Union[str, int]) -> np.ndarray
def to_map_geometry(self, tgt_prj, tgt_extent=None, tgt_res=None):
# type: (Union[str, int], List[float, float, float, float], Tuple[float, float]) -> (np.ndarray, tuple, str)
"""Transform the input sensor geometry array into map geometry.
:param tgt_prj: target projection (WKT or 'epsg:1234' or <EPSG_int>)
:param tgt_extent: extent coordinates of output map geometry array (LL_x, LL_y, UR_x, UR_y) in the tgt_prj
:param tgt_res: target X/Y resolution (e.g., (30, 30))
"""
cols_out, rows_out = self.compute_output_shape()
cols_out, rows_out, out_gt, out_prj, out_extent = \
self.compute_output_shape(tgt_prj=tgt_prj, tgt_extent=tgt_extent, tgt_res=tgt_res)
self.area_definition = self.get_area_definition(proj4_args=get_proj4info(proj=tgt_prj),
cols_out=cols_out, rows_out=rows_out,
)
return self._resample(self.swath_definition, self.area_definition)
if not self.area_definition:
self.area_definition = \
get_area_def(area_id='',
area_name='',
proj_id='',
proj4_args=get_proj4info(proj=tgt_prj),
x_size=cols_out,
y_size=rows_out,
area_extent=out_extent,
) # type: AreaDefinition
return self._resample(self.swath_definition, self.area_definition), out_gt, out_prj
def to_sensor_geometry(self, src_prj, src_extent):
# type: (Union[str, int], List[float, float, float, float]) -> np.ndarray
......
......@@ -165,6 +165,16 @@ class boxObj(object):
xmin, xmax, ymin, ymax = xmin - buffImX, xmax + buffImX, ymin - buffImY, ymax + buffImY
self.imPoly = box(xmin, ymin, xmax, ymax)
def buffer_mapXY(self, buffMapX=0, buffMapY=0):
# type: (float, float) -> None
"""Buffer the box in X- and/or Y-direction.
:param buffMapX: <float> buffer value in x-direction as MAP UNITS
:param buffMapY: <float> buffer value in y-direction as MAP UNITS
"""
xmin, xmax, ymin, ymax = self.boundsMap
xmin, xmax, ymin, ymax = xmin - buffMapX, xmax + buffMapX, ymin - buffMapY, ymax + buffMapY
self.mapPoly = box(xmin, ymin, xmax, ymax)
def is_larger_DimXY(self, boundsIm2test):
# type: (tuple) -> (bool,bool)
"""Checks if the boxObj is larger than a given set of bounding image coordinates (in X- and/or Y-direction).
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_reproject
--------------
Tests for `py_tools_ds.geo.raster.reproject` module.
"""
import os
from unittest import TestCase
import numpy as np
from gdalnumeric import LoadFile
from py_tools_ds import __path__
from py_tools_ds.geo.coord_calc import corner_coord_to_minmax, get_corner_coordinates
from py_tools_ds.geo.raster.reproject import SensorMapGeometryTransformer
tests_path = os.path.abspath(os.path.join(__path__[0], "..", "tests"))
class Test_SensorMapGeometryTransformer(TestCase):
def setUp(self):
self.dem_map_geo = LoadFile(os.path.join(tests_path, 'data', 'dem_map_geo.bsq'))
self.dem_sensor_geo = LoadFile(os.path.join(tests_path, 'data', 'dem_sensor_geo.bsq'))
self.lons = LoadFile(os.path.join(tests_path, 'data', 'lons_full_vnir.bsq'))
self.lats = LoadFile(os.path.join(tests_path, 'data', 'lats_full_vnir.bsq'))
self.dem_area_extent_coarse_subset_utm = [622613.864409047, # LL_x
5254111.40255343, # LL_x
660473.864409047, # LL_x
5269351.40255343] # UR_y
self.expected_dem_area_extent_lonlat = [10.685733901515151, # LL_x
47.44113415492957, # LL_y
11.073066098484848, # UR_x
47.54576584507042] # UR_y
self.expected_dem_area_extent_utm = [626938.928052, # LL_x
5256253.56579, # LL_y
656188.928052, # UR_x
5267203.56579] # UR_y
def test_to_sensor_geometry(self):
SMGT = SensorMapGeometryTransformer(self.dem_map_geo,
lons=self.lons,
lats=self.lats,
resamp_alg='nearest')
dem_sensors_geo = SMGT.to_sensor_geometry(src_prj=32632, src_extent=self.dem_area_extent_coarse_subset_utm)
self.assertIsInstance(dem_sensors_geo, np.ndarray)
self.assertEquals(dem_sensors_geo.shape, (150, 1000))
def test_to_map_geometry_lonlat(self):
SMGT = SensorMapGeometryTransformer(self.dem_sensor_geo,
lons=self.lons,
lats=self.lats,
resamp_alg='nearest')
# to Lon/Lat
dem_map_geo, dem_gt, dem_prj = SMGT.to_map_geometry(tgt_prj=4326)
self.assertIsInstance(dem_map_geo, np.ndarray)
self.assertEquals(dem_map_geo.shape, (286, 1058))
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
cols=dem_map_geo.shape[1],
rows=dem_map_geo.shape[0]))
self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]),
np.array(self.expected_dem_area_extent_lonlat)))
def test_to_map_geometry_utm(self):
SMGT = SensorMapGeometryTransformer(self.dem_sensor_geo,
lons=self.lons,
lats=self.lats,
resamp_alg='nearest')
# to UTM32
dem_map_geo, dem_gt, dem_prj = SMGT.to_map_geometry(tgt_prj=32632, tgt_res=(30, 30))
self.assertIsInstance(dem_map_geo, np.ndarray)
self.assertEquals(dem_map_geo.shape, (365, 975))
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
cols=dem_map_geo.shape[1],
rows=dem_map_geo.shape[0]))
self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]),
np.array(self.expected_dem_area_extent_utm)))
......@@ -128,6 +128,9 @@ class Test_boxObj(TestCase):
def test_buffer_imXY(self):
self.box.buffer_imXY(buffImX=1, buffImY=1)
def test_buffer_mapXY(self):
self.box.buffer_imXY(buffImX=0.6, buffImY=0.6)
def test_is_larger_DimXY(self):
box_bounds = (4014, 4270, 4476, 4732) # xmin, xmax, ymin, ymax
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment