Commit ce399b57 authored by Nicolas Garcia Ospina
Browse files

Included new csv importer and Albers calculator

parent 7f42e20a
Pipeline #23016 passed with stage
in 3 minutes and 58 seconds
......@@ -12,6 +12,7 @@ tiles:
output_pathname: ./results
obm_output_pathname: ./obm_results
import_pathname: ./results
number_cores: 1
batch_size: 1000
......
......@@ -18,10 +18,8 @@
import logging
import pandas
import geopandas
from sqlalchemy import create_engine
from shapely import wkt
import csv
from psycopg2 import extras
# Initialize log
logger = logging.getLogger(__name__)
......@@ -29,28 +27,49 @@ logger = logging.getLogger(__name__)
class ImportCSV:
    """Static helpers that load a CSV file into a database table.

    Two importers are provided: ``csv_to_postgis`` (GeoPandas/SQLAlchemy based,
    appends rows) and ``csv2postgis`` (psycopg2 based, inserts or updates rows
    keyed on ``(quadkey, source_id)``).
    """

    @staticmethod
    def csv_to_postgis(database, csv_filepath, tablename, geometry_field, crs="epsg:4326"):
        """Reads a CSV file with GeoPandas and inserts it into a given database table.

        Args:
            database (database.Database): Database instance with credentials and
                connection ready to perform data importing.

            csv_filepath (str): File path of the input CSV file.

            tablename (str): Name of the table the rows are appended to
                (e.g. "obm_tiles").

            geometry_field (str): Name of the column with geometries. Its values
                are parsed as WKT.

            crs (str): EPSG/SRID code for the specified table. Default = "epsg:4326"
        """

        dataframe = pandas.read_csv(csv_filepath)
        # The geometry column arrives as WKT text and must become shapely objects
        dataframe[geometry_field] = dataframe[geometry_field].apply(wkt.loads)
        geodataframe = geopandas.GeoDataFrame(dataframe, geometry=geometry_field, crs=crs)
        del dataframe

        engine_string = "postgresql://{}:{}@{}:{}/{}".format(
            database.username, database.password, database.host, database.port, database.dbname
        )
        database_engine = create_engine(engine_string)
        geodataframe.to_postgis(tablename, database_engine, if_exists="append")

    @staticmethod
    def csv2postgis(database, csv_filepath, tablename):
        """Reads a CSV file and inserts or updates its rows in a database table.

        The first CSV row is taken as the header. If it contains a "built_area"
        column, the rows are written to (quadkey, source_id, built_area,
        built_area_size, last_update) and the built_area value is passed through
        ST_GeomFromText with SRID 4326. Otherwise the rows are written to
        (quadkey, source_id, built_area_size, last_update). Values are bound by
        position, so the CSV columns must be in that same order. Rows that
        collide on (quadkey, source_id) overwrite the stored row.

        Failures during the insert are rolled back and logged; they are not
        re-raised. The database connection is closed before returning.

        Args:
            database (database.Database): Database instance with an open
                connection and cursor.

            csv_filepath (str): File path of the input CSV file.

            tablename (str): Name of the target table in the "public" schema.
                It is interpolated into the SQL text as-is, so it must come from
                trusted configuration.
        """

        # newline="" lets the csv module handle line endings inside quoted fields
        with open(csv_filepath, "r", newline="") as file:
            reader = csv.reader(file)
            # An empty file has no header row; treat it as "no columns, no rows"
            columns = next(reader, [])
            # Blank lines are yielded as empty rows and cannot fill the template
            data = [tuple(row) for row in reader if row]
        logger.info("{} rows to be imported".format(len(data)))

        if "built_area" in columns:
            sql_query = """INSERT INTO public.{} (quadkey, source_id, built_area, built_area_size, last_update)
                           VALUES %s
                           ON CONFLICT (quadkey, source_id)
                           DO UPDATE
                           SET
                           quadkey = EXCLUDED.quadkey,
                           source_id = EXCLUDED.source_id,
                           built_area = EXCLUDED.built_area,
                           built_area_size = EXCLUDED.built_area_size,
                           last_update = EXCLUDED.last_update""".format(
                tablename
            )
            values_template = "(%s, %s, ST_GeomFromText(%s,4326), %s, %s)"
        else:
            sql_query = """INSERT INTO public.{} (quadkey, source_id, built_area_size, last_update) VALUES %s
                           ON CONFLICT (quadkey, source_id)
                           DO UPDATE
                           SET
                           quadkey = EXCLUDED.quadkey,
                           source_id = EXCLUDED.source_id,
                           built_area_size = EXCLUDED.built_area_size,
                           last_update = EXCLUDED.last_update""".format(
                tablename
            )
            values_template = "(%s, %s, %s, %s)"

        try:
            with database.cursor as cursor:
                extras.execute_values(cursor, sql_query, data, template=values_template)
            database.connection.commit()
            logger.info("INSERT successful")
        except Exception as e:
            # Best-effort import: undo the partial insert and report, do not raise
            database.connection.rollback()
            logger.error("Error while importing {}".format(csv_filepath))
            logger.error("Error caught: {}".format(e))
        finally:
            database.connection.close()
......@@ -63,6 +63,7 @@ if not os.path.exists(obm_output_pathname):
if args.import_csv:
target_db_config = config["target_database"]
import_pathname = os.path.abspath(config["import_pathname"])
def multiprocess_built_estimation_batch(quadkey_batch):
......@@ -177,28 +178,20 @@ def multiprocess_write_csv(csv_filepath):
Args:
csv_filepath (str): File path of the input CSV file
"""
# Connect to the OBM_tiles database
target_database = Database(**target_db_config)
target_database.create_connection_and_cursor()
# Get table crs
target_database_crs_number = target_database.get_crs_from_geometry_field(
**target_db_config["tiles_table"]
)
target_database.connection.close()
target_database_crs = "epsg:{}".format(target_database_crs_number)
logger.info(
"Connection established to {} in {}".format(
target_db_config["dbname"], target_db_config["host"]
)
)
# Import the CSV
ImportCSV.csv_to_postgis(
ImportCSV.csv2postgis(
target_database,
csv_filepath,
**target_db_config["tiles_table"],
crs=target_database_crs
target_db_config["tiles_table"]["tablename"],
)
target_database.connection.close()
def main():
......@@ -206,7 +199,7 @@ def main():
if args.import_csv:
# List CSV files and insert into database
csv_filepaths = [
os.path.join(output_pathname, file) for file in os.listdir(output_pathname)
os.path.join(import_pathname, file) for file in os.listdir(import_pathname)
]
logger.info(
"{} CSV files will be imported into the database".format(len(csv_filepaths))
......
......@@ -27,7 +27,6 @@ from shapely.geometry import mapping
from rasterio import features
import pyproj
from shapely.ops import transform
from functools import partial
import geopandas
from obmgapanalysis.tile import Tile
......@@ -177,14 +176,15 @@ class TileProcessor:
)
input_polygon = transform(project.transform, input_polygon)
project = partial(
pyproj.transform,
pyproj.Proj("epsg:4326"),
pyproj.Proj(
proj="aea", lat_1=input_polygon.bounds[1], lat_2=input_polygon.bounds[3]
),
bbox = input_polygon.bounds
minx, maxx = sorted([bbox[0], bbox[2]])
miny, maxy = sorted([bbox[1], bbox[3]])
project_aea = pyproj.Proj(
"+proj=aea +lat_1={} +lat_2={} +lat_0={} +lon_0={}".format(
miny, maxy, (miny + maxy) / 2.0, (minx + maxx) / 2.0
)
)
geometry = transform(project, input_polygon)
geometry = transform(project_aea, input_polygon)
else:
project = pyproj.Transformer.from_proj(
......
......@@ -202,7 +202,7 @@ def test_albers_area_calculation():
polygon_difference = TileProcessor.polygon_difference(clipped_built_geometry, roads_process)
expected_built_area = 9919.946984796066
expected_built_area = 8471.52645950753
final_built_area = TileProcessor.albers_area_calculation(polygon_difference, tile.crs)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment