#!/usr/bin/env python3

# Copyright (C) 2021:
#   Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.


import logging
import os
import sys
import yaml
import numpy
import argparse

from obmgapanalysis.datasource import DataSource
from obmgapanalysis.tileprocessor import TileProcessor
from obmgapanalysis.fileprocessor import FileProcessor
from obmgapanalysis.database import Database
from obmgapanalysis.importcsv import ImportCSV
import multiprocessing

# Root logger printing error, warning, info and debug messages to the screen
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
stdout_handler = logging.StreamHandler(sys.stdout)
logger.addHandler(stdout_handler)
logger.info("Launching obmgapanalysis")


# Command-line interface: --import_csv switches the program into import mode
parser = argparse.ArgumentParser()
parser.add_argument(
    "--import_csv",
    action="store_true",
    help="Import CSV files into target database",
)
args = parser.parse_args()


47
# Get program configuration from config.yml
with open("config.yml", "r") as ymlfile:
    # safe_load is sufficient for a plain configuration file and, unlike
    # FullLoader, cannot construct arbitrary Python objects from YAML tags
    config = yaml.safe_load(ymlfile)
db_config = config["database"]
datasource_config = config["datasource"]
tiles_config = config["tiles"]
output_pathname = os.path.abspath(config["output_pathname"])
target_db_config = config["target_database"]

# Create the output directory; exist_ok avoids the check-then-create race
os.makedirs(output_pathname, exist_ok=True)
def multiprocess_chunk(quadkey_batch):
    """
    Wrapper funtion that writes a CSV file with built-up areas found in the
    quadkey_batch. The ouput is based on TileProcessor.build_dictionary.
    This wrapper function is built to work with the multiprocessing library.

    Output filenames are: <number of tiles>_<first quadkey>.csv

    Args:
        quadkey_batch (list): List of quadkeys to process with settlement data
    """
    # Each worker process builds its own DataSource instance
    datasource = DataSource(**datasource_config)
    raster_index = os.path.join(
        datasource.pathname, datasource_config["raster_files_index"]
    )
    logger.debug("DataSource raster files are registered in {}".format(raster_index))

    # Open a dedicated connection to the OBM roads database and fetch the
    # CRS of the roads table geometry column
    roads_database = Database(**db_config)
    roads_database.create_connection_and_cursor()
    roads_database_crs_number = roads_database.get_crs_from_geometry_field(
        **db_config["roads_table"]
    )
    logger.info(
        "Connection established to {} in {}".format(db_config["dbname"], db_config["host"])
    )

    # Collect the per-tile results that report a built-up area
    found_areas = []
    try:
        for tile_quadkey in quadkey_batch:
            area = TileProcessor.get_build_up_area(
                quadkey=tile_quadkey,
                database=roads_database,
                datasource=datasource,
                database_crs_number=roads_database_crs_number,
                table_config=db_config["roads_table"],
            )
            if area is not None:
                found_areas.append(area)
        if found_areas:
            # Write output into a csv file
            FileProcessor.write_tiles_to_csv(found_areas, output_pathname)
    finally:
        # Release the connection even if a tile raised
        roads_database.connection.close()


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def multiprocess_csv(csv_filepath):
    """
    Wrapper function that imports a CSV file into a configured database table.
    This wrapper function is built to work with the multiprocessing library.

    Args:
        csv_filepath (str): File path of the input CSV file
    """

    # Connect to the OBM_tiles database
    target_database = Database(**target_db_config)
    target_database.create_connection_and_cursor()
    try:
        # Get table crs
        target_database_crs_number = target_database.get_crs_from_geometry_field(
            **target_db_config["tiles_table"]
        )
        target_database_crs = "epsg:{}".format(target_database_crs_number)
        logger.info(
            "Connection established to {} in {}".format(
                target_db_config["dbname"], target_db_config["host"]
            )
        )
        # Import the CSV
        ImportCSV.csv_to_postgis(
            target_database,
            csv_filepath,
            **target_db_config["tiles_table"],
            crs=target_database_crs
        )
    finally:
        # NOTE(review): the original closed the connection BEFORE passing
        # target_database to ImportCSV.csv_to_postgis; if the import reuses
        # this connection that was a use-after-close. Closing after the
        # import (and on failure) is safe in either case.
        target_database.connection.close()


139
140
def _import_csv_files():
    """Import every CSV file found in output_pathname into the target database,
    one worker process per file."""
    # List CSV files and insert into database.
    # NOTE(review): only *.csv entries are picked up; the original listed every
    # file in the directory, which would hand non-CSV artifacts to the importer.
    csv_filepaths = [
        os.path.join(output_pathname, file)
        for file in os.listdir(output_pathname)
        if file.lower().endswith(".csv")
    ]
    logger.info(
        "{} CSV files will be imported into the database".format(len(csv_filepaths))
    )

    # Generate a parallel process pool with all CSV files to be dealt with
    num_processes = config["number_cores"]

    logger.info("Creating multiprocessing pool")
    with multiprocessing.Pool(processes=num_processes) as pool:
        logger.info("Start parallel processing")
        pool.map(multiprocess_csv, csv_filepaths)

        logger.info("Parallel processing finished, closing pool")
        pool.close()
        pool.join()


def _process_tiles():
    """Split the configured tile list into quadkey batches and process each
    batch in parallel with multiprocess_chunk."""
    # Read tiles to be processed: from a text file or inline in the config
    if tiles_config["use_txt"]:
        tiles_list = list(numpy.loadtxt(tiles_config["txt_filepath"], dtype=str))
    else:
        tiles_list = tiles_config["tiles_list"]
    logger.info("{} tiles will be processed".format(len(tiles_list)))

    # Generate a parallel process pool with each quadkey batch and process
    num_processes = config["number_cores"]
    chunk_size = config["chunk_size"]
    quadkey_batchs = [
        tiles_list[i : i + chunk_size] for i in range(0, len(tiles_list), chunk_size)
    ]

    logger.info("Creating multiprocessing pool")
    with multiprocessing.Pool(processes=num_processes) as pool:
        logger.info("Start parallel processing")
        pool.map(multiprocess_chunk, quadkey_batchs)

        logger.info("Parallel processing finished, closing pool")
        pool.close()
        pool.join()


def main():
    """Program entry point.

    With --import_csv, import previously written CSV files into the target
    database; otherwise compute built-up areas for the configured tiles.
    Exits the process when done.
    """
    # NOTE(review): progress messages now go through the configured `logger`
    # instead of the bare `logging` module calls the original mixed in; both
    # reach the root logger, so output is unchanged.
    if args.import_csv:
        _import_csv_files()
    else:
        _process_tiles()

    # Leave the program
    logger.info("Task finished, closing obmgapanalysis")
    sys.exit()


# Run the pipeline only when executed as a script, not when imported
if __name__ == "__main__":
    main()