Included program to push CSV into SQL

...@@ -22,3 +22,13 @@ database: ...@@ -22,3 +22,13 @@ database:
roads_table: roads_table:
tablename: planet_osm_roads tablename: planet_osm_roads
geometry_field: way geometry_field: way
host: your_host.dir.request_data
dbname: gis
port: 5433
username: postgres
password: secret_pass
tablename: planet_osm_roads
geometry_field: way
...@@ -23,7 +23,6 @@ import pyproj ...@@ -23,7 +23,6 @@ import pyproj
import geopandas import geopandas
from shapely.ops import transform from shapely.ops import transform
# Initialize log # Initialize log
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -36,7 +36,7 @@ class FileProcessor: ...@@ -36,7 +36,7 @@ class FileProcessor:
list_of_dictionaries (list): List of dictionaries with built-up areas to list_of_dictionaries (list): List of dictionaries with built-up areas to
write. write.
outdir (srt): Target path name to write the csv file. output_pathname (str): Target path name for the csv file.
column_geometry (str): Name of the field that contains geometries. column_geometry (str): Name of the field that contains geometries.
Default = "built_area" Default = "built_area"
...@@ -48,7 +48,7 @@ class FileProcessor: ...@@ -48,7 +48,7 @@ class FileProcessor:
list_of_dictionaries, geometry=column_geometry, crs=crs list_of_dictionaries, geometry=column_geometry, crs=crs
) )
filepath_out = os.path.join( filepath_out = os.path.join(
output_pathname, "{}_{}.csv".format(tiles_gdf.tile_id.iloc[0], len(tiles_gdf.index)) output_pathname, "{}_{}.csv".format(tiles_gdf.quadkey.iloc[0], len(tiles_gdf.index))
) )"Creating {}".format(filepath_out))"Creating {}".format(filepath_out))
tiles_gdf.to_csv(filepath_out, index=False) tiles_gdf.to_csv(filepath_out, index=False)
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see
import logging
import pandas
import geopandas
from sqlalchemy import create_engine
from shapely import wkt
# Initialize log
logger = logging.getLogger(__name__)
class ImportCSV:
def csv_to_postgis(database, csv_filepath, tablename, geometry_field, crs="epsg:4326"):
"""Reads a CSV file with GeoPandas and inserts it into a given database table.
database (database.Database): Database instance with credentials and
connection ready to perform data importing.
csv_filepath (tile.Tile): Tile object with quadkey, crs and geometry attributes.
tablename (str): Table name within database for searching (e.g. "obm_tiles")
geometry_field (str): Name of the column with geometries.
crs (str): EPSG/SRID code for the specified table. Default = "epsg:4326"
dataframe = pandas.read_csv(csv_filepath)
dataframe[geometry_field] = dataframe[geometry_field].apply(wkt.loads)
geodataframe = geopandas.GeoDataFrame(dataframe, geometry=geometry_field, crs=crs)
del dataframe
engine_string = "postgresql://{}:{}@{}:{}/{}".format(
database.username, database.password,, database.port, database.dbname
database_engine = create_engine(engine_string)
geodataframe.to_postgis(tablename, database_engine, if_exists="append")
...@@ -22,11 +22,13 @@ import os ...@@ -22,11 +22,13 @@ import os
import sys import sys
import yaml import yaml
import numpy import numpy
import argparse
from obmgapanalysis.datasource import DataSource from obmgapanalysis.datasource import DataSource
from obmgapanalysis.tileprocessor import TileProcessor from obmgapanalysis.tileprocessor import TileProcessor
from obmgapanalysis.fileprocessor import FileProcessor from obmgapanalysis.fileprocessor import FileProcessor
from obmgapanalysis.database import Database from obmgapanalysis.database import Database
from obmgapanalysis.importcsv import ImportCSV
import multiprocessing import multiprocessing
# Add a logger printing error, warning, info and debug messages to the screen # Add a logger printing error, warning, info and debug messages to the screen
...@@ -35,6 +37,13 @@ logger.setLevel(logging.DEBUG) ...@@ -35,6 +37,13 @@ logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout)) logger.addHandler(logging.StreamHandler(sys.stdout))"Launching obmgapanalysis")"Launching obmgapanalysis")
parser = argparse.ArgumentParser()
"--import_csv", action="store_true", help="Import CSV files into target database"
args = parser.parse_args()
# Get program configuration from config.yml # Get program configuration from config.yml
with open("config.yml", "r") as ymlfile: with open("config.yml", "r") as ymlfile:
config = yaml.load(ymlfile, Loader=yaml.FullLoader) config = yaml.load(ymlfile, Loader=yaml.FullLoader)
...@@ -42,6 +51,7 @@ db_config = config["database"] ...@@ -42,6 +51,7 @@ db_config = config["database"]
datasource_config = config["datasource"] datasource_config = config["datasource"]
tiles_config = config["tiles"] tiles_config = config["tiles"]
output_pathname = os.path.abspath(config["output_pathname"]) output_pathname = os.path.abspath(config["output_pathname"])
target_db_config = config["target_database"]
if not os.path.exists(output_pathname): if not os.path.exists(output_pathname):
os.makedirs(output_pathname) os.makedirs(output_pathname)
...@@ -95,8 +105,64 @@ def multiprocess_chunk(quadkey_batch): ...@@ -95,8 +105,64 @@ def multiprocess_chunk(quadkey_batch):
roads_database.connection.close() roads_database.connection.close()
def multiprocess_csv(csv_filepath):
Wrapper function that imports a CSV file into a configured database table
csv_filepath (str): File path of the input CSV file
# Connect to the OBM_tiles database
target_database = Database(**target_db_config)
# Get table crs
target_database_crs_number = target_database.get_crs_from_geometry_field(
target_database_crs = "epsg:{}".format(target_database_crs_number)
"Connection established to {} in {}".format(
target_db_config["dbname"], target_db_config["host"]
# Import the CSV
def main(): def main():
if args.import_csv:
# List CSV files and insert into database
csv_filepaths = [
os.path.join(output_pathname, file) for file in os.listdir(output_pathname)
"{} CSV files will be imported into the database".format(len(csv_filepaths))
# Generate a parallel process pool with all CSV files to be dealt with
num_processes = config["number_cores"]"Creating multiprocessing pool")
with multiprocessing.Pool(processes=num_processes) as pool:"Start parallel processing"), csv_filepaths)"Parallel processing finished, closing pool")
# Leave the program"Task finished, closing obmgapanalysis")
# Read tiles to be processed # Read tiles to be processed
if tiles_config["use_txt"]: if tiles_config["use_txt"]:
tiles_list = list(numpy.loadtxt(tiles_config["txt_filepath"], dtype=str)) tiles_list = list(numpy.loadtxt(tiles_config["txt_filepath"], dtype=str))
...@@ -290,7 +290,7 @@ class TileProcessor: ...@@ -290,7 +290,7 @@ class TileProcessor:
return return
results = { results = {
"tile_id": tile.quadkey, "quadkey": tile.quadkey,
"method_id": datasource.method_id, "method_id": datasource.method_id,
"built_area": TileProcessor.reproject_polygon(built_polygon,, "epsg:4326"), "built_area": TileProcessor.reproject_polygon(built_polygon,, "epsg:4326"),
"size_built_area": TileProcessor.albers_area_calculation(built_polygon,, "size_built_area": TileProcessor.albers_area_calculation(built_polygon,,
...@@ -39,6 +39,8 @@ setup( ...@@ -39,6 +39,8 @@ setup(
"geopandas", "geopandas",
"rasterio", "rasterio",
"psycopg2", "psycopg2",
"pyyaml", "pyyaml",
], ],
extras_require={ extras_require={
