Commit 726ed68e authored by Marius Kriegerowski

move to own repo

parent 85558ec5
Prereqs
-------
- tensorflow
- scikit-optimize
- swarming: git clone git@github.com:HerrMuellerluedenscheid/swarming.git
Invoke
------
To start training:
pinky --config <config_filename> --train
You can dump your examples to TFRecordDatasets to accelerate I/O operations:
pinky --config <config_filename> --write <new_config_filename>
and use the newly created config file to run `--train`.
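For example (the config file names here are placeholders):
pinky --config example.config --write example_tfrecord.config
pinky --config example_tfrecord.config --train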
Tests
-----
- basic learning (synthetics, real data)
# TODO: split data -> training, evaluation
- synthetics, layered cake: train with events in the top and bottom layers;
validate with events in the middle layer
- evaluation using 'unknown' velocity model
- test extrapolation with fault plane geometry
- synthetic pretraining
- hyperparameter optimization using skopt (see the sketch after this list)
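A minimal sketch of how scikit-optimize could drive such a search. The objective below is a placeholder (in practice it would train pinky with the given hyperparameters and return the validation loss), and the parameter names and ranges are assumptions:

from skopt import gp_minimize
from skopt.space import Real, Integer

def objective(params):
    learning_rate, n_filters = params
    # Placeholder objective standing in for a full training run.
    return (learning_rate - 1e-3)**2 + (n_filters - 32)**2 * 1e-4

result = gp_minimize(
    objective,
    dimensions=[Real(1e-5, 1e-2, prior='log-uniform'), Integer(8, 64)],
    n_calls=20)
print(result.x, result.fun)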
Outlook
-------
- increase z error weight -> improve z estimates (see the loss-weighting sketch below)
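One way to realize this is a per-component weighted loss. The sketch below is illustrative only: the function name and weight values are assumptions, while the (north, east, depth) label layout follows `extract_labels` in the data generator.

import tensorflow as tf

def weighted_mse(labels, predictions, weights=(1., 1., 3.)):
    # labels and predictions have shape (batch, 3): north, east, depth (z).
    # The larger last weight penalizes depth errors more strongly.
    w = tf.constant(weights, dtype=tf.float32)
    return tf.reduce_mean(w * tf.squared_difference(labels, predictions))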
from setuptools import setup
packname = 'pinky'
version = '0.1'
setup(
name=packname,
version=version,
license='GPLv3',
python_requires='>=3.3, <4',
install_requires=[],  # runtime requirements (tensorflow, scikit-optimize, swarming) are listed in the README
packages=[packname],
package_dir={'pinky': 'src'},
entry_points={
'console_scripts': [
'pinky = pinky.model:main',
]
}
)
from pyrocko.gui.marker import PhaseMarker, EventMarker, associate_phases_to_events
from pyrocko.model.event import load_events
from pyrocko import util
import numpy as num
debug = True
def onset_stats(fn_markers, fn_events, fn_events_all):
markers = PhaseMarker.load_markers(fn_markers)
markers_all = markers + [EventMarker(e) for e in load_events(fn_events_all)]  # combine phase and event markers so phases can be associated
associate_phases_to_events(markers_all)
tt_by_nsl = get_tt_by_nsl(markers_all)
if debug:
plot_onsets(tt_by_nsl)
def power(tr):
'''Signal power of a trace: L2 norm of the samples divided by the trace duration.'''
return num.sqrt(num.sum(tr.get_ydata()**2))/(tr.tmax - tr.tmin)
def normalize(tr):
tr.ydata = tr.ydata / power(tr)
def get_tt_by_nsl(markers):
''' Return a dictionary with nsl tuples as keys and lists of P travel times as values.'''
onsets = {}
for m in markers:
if not isinstance(m, PhaseMarker):
continue
if m.get_phasename().upper() != 'P':  # keep P onsets only
continue
e = m.get_event()
if not e:
continue
tt = m.tmin - e.time
# discard outliers: keep only plausible local P travel times
if 0 < tt < 14:
append_to_dict(onsets, m.one_nslc()[:3], tt)
return onsets
def get_average_onsets(markers):
onsets = {}
for m in markers:
if not isinstance(m, PhaseMarker):
continue
if m.get_phasename().upper() != 'P':  # keep P onsets only
continue
e = m.get_event()
if e is not None:
append_to_dict(
onsets, m.one_nslc()[:3], m.tmin - e.time)
for k, v in onsets.items():
onsets[k] = num.mean(v)
return onsets
def get_average_scaling(trs_dict, reference_nsl_pattern):
'''Return the average power of each channel relative to the reference channel matched by `reference_nsl_pattern`.'''
scalings = {}
for event, trs in trs_dict.items():
refs = [tr for tr in trs if util.match_nslc(
reference_nsl_pattern, tr.nslc_id)]
for ref in refs:
for tr in trs:
if tr.channel == ref.channel:
append_to_dict(
scalings, tr.nslc_id, power(tr)/power(ref))
for k, v in scalings.items():
scalings[k] = num.mean(v)
return scalings
def two_sided_percentile(vals, percentile=95.):
'''Return the upper and lower bounds, e.g. the 95th and 5th percentiles for percentile=95.'''
p = percentile
vals = num.array(vals)
return num.percentile(vals, p), -1. * num.percentile(-1.*vals, p)
def plot_onsets(onsets):
import matplotlib.pyplot as plt
fig = plt.figure()
vals = list(onsets.values())
for i, v in enumerate(vals):
ax = fig.add_subplot(4, 1 + len(onsets)//4, i+1)
v = num.array(v)
ax.hist(v, bins=31)
ax.axvspan(*two_sided_percentile(v), alpha=0.2)
plt.show()
if __name__ == '__main__':
fn_markers = '/home/marius/josef_dd/markers_with_polarities.pf'
fn_events = '/home/marius/josef_dd/events_from_sebastian_subset.pf'
onset_stats(fn_markers, fn_events, fn_events)
import tensorflow as tf
from pyrocko.guts import Object, Float, Int, String, Bool
from pyrocko.pile import make_pile
# from .data import DataGeneratorBase
import os
class PremadeGeneratorConfig(Object):
window_length = Float.T(default=5.)
data_noise = String.T()
data_events = String.T()
out_fn = String.T()
target_deltat = Int.T(optional=True)
ref_lat = Float.T()
ref_lon = Float.T()
batch_size = Int.T()
n_classes = Int.T()
def __init__(self, *args, **kwargs):
Object.__init__(self, *args, **kwargs)
self._writer = None
self._p1 = None
self._p2 = None
self._effective_deltat = None
def load_data(self):
if None in (self._p1, self._p2):
print('load data... should happen just once!')
self._p1 = make_pile(self.data_noise)
self._p2 = make_pile(self.data_events)
if (len(self._p1.deltats) > 1 or len(self._p2.deltats) > 1) and self.target_deltat is None:
raise Exception('target_deltat is not defined but the data contain different sampling rates')
return self._p1, self._p2
@property
def effective_deltat(self):
if self._effective_deltat is None:
if self.target_deltat is not None:
self._effective_deltat = self.target_deltat
else:
self._effective_deltat = list(self._p1.deltats.keys())[0]
return self._effective_deltat
@property
def nslc_ids(self):
p1, p2 = self.load_data()
k1 = tuple(p1.nslc_ids.keys())
k2 = tuple(p2.nslc_ids.keys())
ids = list(set(k1 + k2))
return ids
@property
def writer(self):
if self._writer is None:
outpath = os.path.dirname(self.out_fn)
if outpath and not os.path.exists(outpath):
os.makedirs(outpath)
self._writer = tf.python_io.TFRecordWriter(self.out_fn)
return self._writer
#!/usr/bin/env python3
import tensorflow as tf
from pyrocko.io import save, load
from pyrocko.model import load_stations
from pyrocko.guts import Object, String, Int, Float, Tuple, Bool
from pyrocko.gui import marker
from pyrocko.gf.seismosizer import Engine, Target, LocalEngine
from pyrocko import orthodrome
from pyrocko import pile
from swarm import synthi, source_region
import logging
import random
import numpy as num
import os
import glob
import sys
from .tf_util import _FloatFeature, _Int64Feature, _BytesFeature
pjoin = os.path.join
EPSILON = 1E-4
# TODOS:
# - remove 'double events'
class Noise(Object):
level = Float.T(default=1.)
def __init__(self, *args, **kwargs):
super(Noise, self).__init__(*args, **kwargs)
logging.info('Applying noise to input data with level %s' % self.level)
def get_chunk(self, n_channels, n_samples):
...
class WhiteNoise(Noise):
def get_chunk(self, n_channels, n_samples):
return num.random.random((n_channels, n_samples)).astype(num.float32) * self.level
class DataGeneratorBase(Object):
_shape = Tuple.T(2, Int.T(), optional=True)
fn_tfrecord = String.T(optional=True)
n_classes = Int.T(default=3)
@property
def tensor_shape(self):
return self._shape
@property
def generate_output_types(self):
return tf.float32, tf.float32
@tensor_shape.setter
def tensor_shape(self, v):
if v != self._shape:
self._shape = v
@property
def output_shapes(self):
return (self.tensor_shape, self.n_classes)
def unpack_examples(self, record_iterator):
'''Parse examples stored in a TFRecord iterator and decode them to (chunk, label) numpy arrays.'''
for string_record in record_iterator:
example = tf.train.Example()
example.ParseFromString(string_record)
chunk = example.features.feature['data'].bytes_list.value[0]
label = example.features.feature['label'].bytes_list.value[0]
chunk = num.frombuffer(chunk, dtype=num.float32)
chunk = chunk.reshape(self.tensor_shape)
label = num.frombuffer(label, dtype=num.float32)
yield chunk, label
def generate(self):
record_iterator = tf.python_io.tf_record_iterator(
path=self.fn_tfrecord)
return self.unpack_examples(record_iterator)
def get_dataset(self):
return tf.data.Dataset.from_generator(
self.generate,
self.generate_output_types,
output_shapes=self.output_shapes)
def pack_examples(self, generator):
'''Serialize (chunk, label) examples from `generator` to `tf.train.Example` protos.'''
for ydata, label in generator:
yield tf.train.Example(
features=tf.train.Features(
feature={
'data': _BytesFeature(ydata.tobytes()),
'label': _BytesFeature(num.array(label, dtype=num.float32).tobytes()),
}))
def write(self, path):
'''Write packed examples to a TFRecord file at `path`.'''
writer = tf.python_io.TFRecordWriter(path)
for ex in self.pack_examples(self.generate()):
writer.write(ex.SerializeToString())
writer.close()
class DataGenerator(DataGeneratorBase):
sample_length = Float.T(help='length [s] of data window')
fn_stations = String.T()
absolute = Bool.T(help='Use absolute amplitudes', default=False)
effective_deltat = Float.T(optional=True)
reference_target = Target.T(
default=Target(
codes=('', 'NKC', '', 'SHZ'),
lat=50.2331,
lon=12.448,
elevation=546))
noise = Noise.T(optional=True, help='Add noise to your feature chunks')
highpass = Float.T(optional=True)
lowpass = Float.T(optional=True)
highpass_order = Int.T(default=4, optional=True)
lowpass_order = Int.T(default=4, optional=True)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.setup()
def setup(self):
pass
@property
def output_shapes(self):
return (self.tensor_shape, self.n_classes)
def extract_labels(self, source):
n, e = orthodrome.latlon_to_ne(
self.reference_target.lat, self.reference_target.lon,
source.lat, source.lon)
return (n, e, source.depth)
def get_raw_data_chunk(self):
'''
Return an array of size (Nchannels x Nsamples_max).
When working with noisy data, replace this function.
'''
return num.zeros(self.tensor_shape, dtype=num.float32)
def attach_graph(self, dataset, shape):
'''
Use this method to attach any preprocessing to be done in tensorflow
graph.
'''
return dataset
def regularize_deltat(self, tr):
'''Equalize sampling rates across the data set according to
`self.effective_deltat`.'''
if abs(tr.deltat - self.effective_deltat)>0.0001:
tr.resample(self.effective_deltat)
def fit_data_into_chunk(self, traces, chunk, indices=None, tref=0):
indices = indices or range(len(traces))
for i, tr in zip(indices, traces):
if self.absolute:
tr.data = num.abs(tr.data)
data_len = len(tr.data)
istart_trace = int((tr.tmin - tref) / tr.deltat)
istart_array = max(istart_trace, 0)
istart_trace = max(-istart_trace, 0)
istop_array = istart_array + (data_len - 2* istart_trace)
ydata = tr.data[ \
istart_trace: min(data_len, self.n_samples_max-istart_array) \
+ istart_trace]
chunk[i, istart_array: istart_array+ydata.shape[0]] += ydata
chunk -= num.min(chunk)
chunk /= num.max(chunk)
class PileData(DataGenerator):
'''Data generator for locally saved data.'''
data_path = String.T()
data_format = String.T(default='mseed')
fn_markers = String.T()
deltat_want = Float.T(optional=True)
shuffle = Bool.T(default=False)
def setup(self):
self.data_pile = pile.make_pile(
self.data_path, fileformat=self.data_format)
if self.data_pile.is_empty():
sys.exit('Data pile is empty!')
self.deltat_want = self.deltat_want or \
min(self.data_pile.deltats.keys())
self.n_samples_max = int(self.sample_length/self.deltat_want)
logging.debug('loading markers')
markers = marker.load_markers(self.fn_markers)
marker.associate_phases_to_events(markers)
markers_by_nsl = {}
for m in markers:
if not m.match_nsl(self.reference_target.codes[:3]):
continue
key = m.one_nslc()[:3]
_ms = markers_by_nsl.get(key, [])
_ms.append(m)
markers_by_nsl[key] = _ms
assert len(markers_by_nsl) == 1, 'expected markers from exactly one station (the reference target)'
self.markers = list(markers_by_nsl.values())[0]
if self.shuffle:
random.shuffle(self.markers)
else:
self.markers.sort(key=lambda x: x.tmin)
self.channels = list(self.data_pile.nslc_ids.keys())
self.tensor_shape = (len(self.channels), self.n_samples_max)
def check_inputs(self):
if len(self.data_pile.deltats) > 1:
logging.warning(
'Different sampling rates in dataset. Preprocessing will be slow.')
def preprocess(self, tr):
'''Trace preprocessing: ensure a uniform sampling rate across traces.

:param tr: pyrocko.trace.Trace object'''
if tr.deltat - self.deltat_want > EPSILON:
tr.resample(self.deltat_want)
elif tr.deltat - self.deltat_want < -EPSILON:
tr.downsample_to(self.deltat_want)
def generate(self):
tr_len = self.n_samples_max * self.deltat_want
nslc_to_index = {nslc: idx for idx, nslc in enumerate(self.channels)}
for m in self.markers:
event = m.get_event()
if event is None:
logging.debug('No event: %s' % m)
continue
for trs in self.data_pile.chopper(
tmin=m.tmin, tmax=m.tmin+tr_len, keep_current_files_open=True):
if not len(trs):
logging.debug('No data at tmin=%s' % m.tmin)
continue
for tr in trs:
tr.data = tr.ydata.astype(num.float)
chunk = self.get_raw_data_chunk()
indices = [nslc_to_index[tr.nslc_id] for tr in trs]
self.fit_data_into_chunk(trs, chunk, indices=indices, tref=m.tmin)
if self.noise is not None:
chunk += self.noise.get_chunk(*self.tensor_shape)
yield chunk, self.extract_labels(event)
class GFSwarmData(DataGenerator):
swarm = source_region.Swarm.T()
n_sources = Int.T(default=100)
onset_phase = String.T(default='p')
quantity = String.T(default='velocity')
tpad = Float.T(default=1., help='padding between P phase onset and data chunk start')
def setup(self):
stations = load_stations(self.fn_stations)
self.targets = synthi.guess_targets_from_stations(
stations, quantity=self.quantity)
self.store = self.swarm.engine.get_store() # uses default store
self.n_samples_max = int(self.sample_length/self.store.config.deltat)
self.tensor_shape = (len(self.targets), self.n_samples_max)
def make_data_chunk(self, source, results):
ydata_stacked = self.get_raw_data_chunk()
tref = self.store.t(
self.onset_phase, (
source.depth,
source.distance_to(self.reference_target))
)