Commit 1d6ad7ba authored by Sebastian Heimann's avatar Sebastian Heimann
Browse files

can now suspend and continue

parent 2f626600
# -*- coding: utf-8 -*-
import numpy as num
import logging, os, shutil, sys, glob, math, copy
from multiprocessing import Pool
import logging, os, shutil, sys, glob, math, copy, signal, errno
from tempfile import mkdtemp
from subprocess import Popen, PIPE
......@@ -13,6 +9,7 @@ from guts import *
from guts_array import *
from pyrocko import trace, util, cake
from pyrocko import gf
from pyrocko.parimap import parimap
Timing = gf.meta.Timing
......@@ -521,8 +518,14 @@ taken finally for the sublayer thickness.
return template % d
class QSeisError(Exception):
class QSeisError(gf.store.StoreError):
pass
class Interrupted(gf.store.StoreError):
def __str__(self):
return 'Interrupted.'
class QSeisRunner:
......@@ -532,6 +535,7 @@ class QSeisRunner:
self.keep_tmp = keep_tmp
self.program = program_bins['qseis']
self.config = None
def run(self, config):
self.config = config
......@@ -551,20 +555,32 @@ class QSeisRunner:
old_wd = os.getcwd()
os.chdir(self.tempdir)
interrupted = []
def signal_handler(signum, frame):
os.kill(proc.pid, signal.SIGTERM)
interrupted.append(True)
original = signal.signal(signal.SIGINT, signal_handler)
try:
proc = Popen(program, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError:
os.chdir(old_wd)
raise QSeisError('could not start qseis: "%s"' % program)
try:
proc = Popen(program, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError:
os.chdir(old_wd)
raise QSeisError('could not start qseis: "%s"' % program)
(output_str, error_str) = proc.communicate('input\n')
(output_str, error_str) = proc.communicate('input\n')
finally:
signal.signal(signal.SIGINT, original)
if interrupted:
raise KeyboardInterrupt()
logger.debug('===== begin qseis output =====\n'
'%s===== end qseis output =====' % output_str)
if error_str:
logger.error('===== begin qseis error =====\n'
'%s===== end qseis error =====' % error_str)
errmess = []
if proc.returncode != 0:
......@@ -620,17 +636,18 @@ in the directory %s'''.lstrip() %
meta=dict(
distance = distance*km,
azimuth = azimuth ))
traces.append(tr)
return traces
def __del__(self):
if not self.keep_tmp:
shutil.rmtree(self.tempdir)
else:
logger.warn('not removing temporary directory: %s' % self.tempdir)
if self.tempdir:
if not self.keep_tmp:
shutil.rmtree(self.tempdir)
self.tempdir = None
else:
logger.warn('not removing temporary directory: %s' % self.tempdir)
class QSeisGFBuilder(gf.builder.Builder):
def __init__(self, store_dir, shared, block_size=None, tmp=None ):
......@@ -729,37 +746,56 @@ class QSeisGFBuilder(gf.builder.Builder):
rawtraces = runner.get_traces()
interrupted = []
def signal_handler(signum, frame):
interrupted.append(True)
original = signal.signal(signal.SIGINT, signal_handler)
self.store.lock()
duplicate_inserts = 0
try:
for itr, tr in enumerate(rawtraces):
if tr.channel not in gfmap:
continue
x = tr.meta['distance']
if x > firstx + (nx-1)*dx:
continue
for itr, tr in enumerate(rawtraces):
if tr.channel not in gfmap:
continue
x = tr.meta['distance']
if x > firstx + (nx-1)*dx:
continue
ig, factor = gfmap[tr.channel]
ig, factor = gfmap[tr.channel]
if len(self.store.config.ns) == 2:
args = (sz,x,ig)
else:
args = (rz,sz,x,ig)
if len(self.store.config.ns) == 2:
args = (sz,x,ig)
else:
args = (rz,sz,x,ig)
if conf.cut:
tmin = self.store.t(conf.cut[0], args[:-1])
tmax = self.store.t(conf.cut[1], args[:-1])
if None in (tmin, tmax):
continue
if conf.cut:
tmin = self.store.t(conf.cut[0], args[:-1])
tmax = self.store.t(conf.cut[1], args[:-1])
if None in (tmin, tmax):
continue
tr.chop(tmin, tmax)
tr.chop(tmin, tmax)
gf_tr = gf.store.GFTrace.from_trace(tr)
gf_tr.data *= factor
gf_tr = gf.store.GFTrace.from_trace(tr)
gf_tr.data *= factor
try:
self.store.put(args, gf_tr)
except gf.store.DuplicateInsert, e:
duplicate_inserts += 1
self.store.put(args, gf_tr)
finally:
if duplicate_inserts:
logger.warn('%i insertions skipped (duplicates)' %
duplicate_inserts)
self.store.unlock()
signal.signal(signal.SIGINT, original)
if interrupted:
raise KeyboardInterrupt()
self.store.unlock()
conf.gf_sw_source_types = (0,0,0,0,0,0)
logger.info('Done with block %i / %i' %
......@@ -821,25 +857,54 @@ def init(store_dir):
return gf.store.Store.create_editables(store_dir, config=config, extra={'qseis': qseis})
def __work_block(args):
store_dir, iblock, shared = args
builder = QSeisGFBuilder(store_dir, shared)
builder.work_block(iblock)
try:
store_dir, iblock, shared = args
builder = QSeisGFBuilder(store_dir, shared)
builder.work_block(iblock)
except KeyboardInterrupt:
raise Interrupted()
except IOError, e:
if e.errno == errno.EINTR:
raise Interrupted()
else:
raise
return store_dir, iblock
def build(store_dir, force=False, nworkers=None):
def build(store_dir, force=False, nworkers=None, continue_=False):
gf.store.Store.create_dependants(store_dir, force)
done = set()
status_fn = pjoin(store_dir, '.status')
if not continue_:
gf.store.Store.create_dependants(store_dir, force)
with open(status_fn, 'w') as status:
pass
else:
try:
with open(status_fn, 'r') as status:
for line in status:
done.add(int(line))
except IOError:
raise gf.StoreError('nothing to continue')
shared = {}
builder = QSeisGFBuilder(store_dir, shared)
iblocks = builder.all_block_indices()
iblocks = [ x for x in builder.all_block_indices() if x not in done ]
del builder
if nworkers is None or nworkers > 1:
p = Pool(nworkers)
p.map(__work_block, [ (store_dir, iblock, shared) for iblock in iblocks ])
original = signal.signal(signal.SIGINT, signal.SIG_IGN)
try:
for x in parimap(__work_block, [ (store_dir, iblock, shared) for iblock in iblocks ],
nprocs=nworkers):
else:
map(__work_block, [ (store_dir, iblock, shared) for iblock in iblocks ])
store_dir, iblock = x
with open(status_fn, 'a') as status:
status.write('%i\n' % iblock)
finally:
signal.signal(signal.SIGINT, original)
os.remove(status_fn)
if __name__ == '__main__':
......@@ -850,7 +915,6 @@ if __name__ == '__main__':
runner = QSeisRunner()
runner.run(conf)
traces = runner.get_traces()
trace.snuffle(traces)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment