Unverified Commit 60d33599 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

The EnPTAlgorithm class now also uses a subcommand to run EnPT to be able to use multiprocessing.

Updated EnPT entry point.
parent 9198324a
......@@ -11,6 +11,8 @@ History
* Split enpt_enmapboxapp.py into separate modules - one on case EnPT is installed externally and
one in case it is part of the QGIS environment. Added EnPTAlgorithm for the latter case and respective test.
* Adapted new --exclude-patterns parameter of urlchecker.
* The EnPTAlgorithm class now also uses a subcommand to run EnPT to be able to use multiprocessing.
* Updated EnPT entry point.
0.4.7 (2021-01-11)
......
......@@ -37,11 +37,11 @@ call %ENPT_PYENV_ACTIVATION% enpt
:: echo %*
:: run enpt_cli.py with the provided arguments
FOR /F %%i IN ('where enpt_cli.py') do (SET PATH_ENPT_CLI=%%i)
FOR /F %%i IN ('where enpt') do (SET PATH_ENPT_CLI=%%i)
echo.
echo _______________________________________
python %PATH_ENPT_CLI% %*
%PATH_ENPT_CLI% %*
echo _______________________________________
echo.
......@@ -26,4 +26,4 @@
# activate EnPT Python environment
source $ENPT_PYENV_ACTIVATION enpt
enpt_cli.py "$@"
enpt "$@"
......@@ -26,8 +26,13 @@
import os
from os.path import expanduser
import psutil
from datetime import date
from multiprocessing import cpu_count
from threading import Thread
from queue import Queue
from subprocess import Popen, PIPE
from qgis.core import \
(QgsProcessingAlgorithm,
QgsProcessingParameterFile,
......@@ -319,3 +324,48 @@ class _EnPTBaseAlgorithm(QgsProcessingAlgorithm):
@staticmethod
def helpUrl(*args, **kwargs):
return 'https://enmap.git-pages.gfz-potsdam.de/GFZ_Tools_EnMAP_BOX/enpt_enmapboxapp/doc/'
@staticmethod
def _run_cmd(cmd, qgis_feedback=None, **kwargs):
"""Execute external command and get its stdout, exitcode and stderr.
Code based on: https://stackoverflow.com/a/31867499
:param cmd: a normal shell command including parameters
"""
def reader(pipe, queue):
try:
with pipe:
for line in iter(pipe.readline, b''):
queue.put((pipe, line))
finally:
queue.put(None)
process = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True, **kwargs)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
# for _ in range(2):
for source, line in iter(q.get, None):
if qgis_feedback.isCanceled():
# qgis_feedback.reportError('CANCELED')
proc2kill = psutil.Process(process.pid)
for proc in proc2kill.children(recursive=True):
proc.kill()
proc2kill.kill()
raise KeyboardInterrupt
linestr = line.decode('latin-1').rstrip()
# print("%s: %s" % (source, linestr))
if source.name == 3:
qgis_feedback.pushInfo(linestr)
if source.name == 4:
qgis_feedback.reportError(linestr)
exitcode = process.poll()
return exitcode
......@@ -38,6 +38,34 @@ from ._enpt_alg_base import _EnPTBaseAlgorithm
class EnPTAlgorithm(_EnPTBaseAlgorithm):
@staticmethod
def _prepare_enpt_environment() -> dict:
os.environ['PYTHONUNBUFFERED'] = '1'
enpt_env = os.environ.copy()
enpt_env["PATH"] = ';'.join([i for i in enpt_env["PATH"].split(';') if 'OSGEO' not in i]) # actually not needed
if "PYTHONHOME" in enpt_env.keys():
del enpt_env["PYTHONHOME"]
if "PYTHONPATH" in enpt_env.keys():
del enpt_env["PYTHONPATH"]
# FIXME is this needed?
enpt_env['IPYTHONENABLE'] = 'True'
enpt_env['PROMPT'] = '$P$G'
enpt_env['PYTHONDONTWRITEBYTECODE'] = '1'
enpt_env['PYTHONIOENCODING'] = 'UTF-8'
enpt_env['TEAMCITY_VERSION'] = 'LOCAL'
enpt_env['O4W_QT_DOC'] = 'C:/OSGEO4~3/apps/Qt5/doc'
if 'SESSIONNAME' in enpt_env.keys():
del enpt_env['SESSIONNAME']
# import pprint
# s = pprint.pformat(enpt_env)
# with open('D:\\env.json', 'w') as fp:
# fp.write(s)
return enpt_env
def processAlgorithm(self, parameters, context, feedback):
assert isinstance(parameters, dict)
assert isinstance(context, QgsProcessingContext)
......@@ -47,9 +75,6 @@ class EnPTAlgorithm(_EnPTBaseAlgorithm):
raise ImportError("enpt", "EnPT must be installed into the QGIS Python environment "
"when calling 'EnPTAlgorithm'.")
from enpt.options.config import EnPTConfig
from enpt.execution.controller import EnPT_Controller
feedback.pushInfo("The log messages of the EnMAP processing tool are written to the *.log file "
"in the specified output folder.")
......@@ -58,11 +83,30 @@ class EnPTAlgorithm(_EnPTBaseAlgorithm):
if k not in ['anaconda_root']
and v not in [None, NULL, 'NULL', '']}
# print parameters and console call to log
# for key in sorted(parameters):
# feedback.pushInfo('{} = {}'.format(key, repr(parameters[key])))
keyval_str = ' '.join(['--{} {}'.format(key, parameters[key])
for key in sorted(parameters)
if parameters[key] not in [None, NULL, 'NULL', '']])
print(parameters)
print(keyval_str + '\n\n')
feedback.pushInfo("\nCalling EnPT with the following command:\n"
"enpt %s\n\n" % keyval_str)
# prepare environment for subprocess
enpt_env = self._prepare_enpt_environment()
# path_enpt_runscript = self._locate_enpt_run_script()
# run EnPT in subprocess that activates the EnPT Anaconda environment
# os.environ['PYTHONUNBUFFERED'] = '1'
# from contextlib import redirect_stdout
CTR = EnPT_Controller(EnPTConfig(**parameters))
CTR.run_all_processors()
# feedback.pushDebugInfo('Using %s to start EnPT.' % path_enpt_runscript)
feedback.pushInfo("The log messages of the EnMAP processing tool are written to the *.log file "
"in the specified output folder.")
self._run_cmd(f"enpt {keyval_str}",
qgis_feedback=feedback,
env=enpt_env
)
# list output dir
outdir = parameters['output_dir']
......
......@@ -25,10 +25,7 @@
"""This module provides the ExternalEnPTAlgorithm which is used in case EnPT is installed into separate environment."""
import os
import psutil
from subprocess import Popen, PIPE, check_output, CalledProcessError
from threading import Thread
from queue import Queue
from subprocess import check_output, CalledProcessError
from glob import glob
from qgis.core import \
(QgsProcessingContext,
......@@ -63,51 +60,6 @@ class ExternalEnPTAlgorithm(_EnPTBaseAlgorithm):
else:
return '' # FIXME is there a default location in Linux/OSX?
@staticmethod
def _run_cmd(cmd, qgis_feedback=None, **kwargs):
"""Execute external command and get its stdout, exitcode and stderr.
Code based on: https://stackoverflow.com/a/31867499
:param cmd: a normal shell command including parameters
"""
def reader(pipe, queue):
try:
with pipe:
for line in iter(pipe.readline, b''):
queue.put((pipe, line))
finally:
queue.put(None)
process = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True, **kwargs)
q = Queue()
Thread(target=reader, args=[process.stdout, q]).start()
Thread(target=reader, args=[process.stderr, q]).start()
# for _ in range(2):
for source, line in iter(q.get, None):
if qgis_feedback.isCanceled():
# qgis_feedback.reportError('CANCELED')
proc2kill = psutil.Process(process.pid)
for proc in proc2kill.children(recursive=True):
proc.kill()
proc2kill.kill()
raise KeyboardInterrupt
linestr = line.decode('latin-1').rstrip()
# print("%s: %s" % (source, linestr))
if source.name == 3:
qgis_feedback.pushInfo(linestr)
if source.name == 4:
qgis_feedback.reportError(linestr)
exitcode = process.poll()
return exitcode
@staticmethod
def _locate_EnPT_Anaconda_environment(user_root):
anaconda_rootdir = None
......@@ -164,7 +116,7 @@ class ExternalEnPTAlgorithm(_EnPTBaseAlgorithm):
'is correctly installed into your QGIS Python environment.')
@staticmethod
def _prepare_enpt_environment():
def _prepare_enpt_environment() -> dict:
os.environ['PYTHONUNBUFFERED'] = '1'
enpt_env = os.environ.copy()
......@@ -215,7 +167,8 @@ class ExternalEnPTAlgorithm(_EnPTBaseAlgorithm):
}
# remove all parameters not to be forwarded to the EnPT CLI
parameters = {k: v for k, v in parameters.items() if k not in ['anaconda_root']}
parameters = {k: v for k, v in parameters.items()
if k not in ['anaconda_root']}
# print parameters and console call to log
# for key in sorted(parameters):
......@@ -226,7 +179,7 @@ class ExternalEnPTAlgorithm(_EnPTBaseAlgorithm):
print(parameters)
print(keyval_str + '\n\n')
feedback.pushInfo("\nCalling EnPT with the following command:\n"
"python enpt_cli.py %s\n\n" % keyval_str)
"enpt %s\n\n" % keyval_str)
# prepare environment for subprocess
enpt_env = self._prepare_enpt_environment()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment