Commit f61a7ec3 authored by Javier Quinteros

Clean code moving TDMS class to a new file called tdms.py

parent 660ff7b4
@@ -17,460 +17,17 @@
###############################################################################
import argparse
import os
import sys
import logging
import struct
import pprint
import datetime
import numpy as np
from math import floor
from math import ceil
from obspy import UTCDateTime
from obspy import Trace
from obspy import Stream
from .tdms import TDMS
version = '0.2b2'
class TDMS(object):
    """Reader for TDMS (LabVIEW) files produced by a DAS interrogator.

    Files in ``directory`` whose names start with ``filename`` are discovered
    at construction time; the start time of each file is decoded from the
    timestamp embedded in its name. The object can then be iterated to obtain
    either the data ('D') or the metadata ('M') within the selected time
    window and channel range.
    """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: close the currently open TDMS file, if any."""
        if self.__fi is not None:
            self.__fi.close()

    def __init__(self, filename, directory='.', chstart=0, chstop=None, chstep=1,
                 starttime=None, endtime=None, iterate='D', loglevel='INFO'):
        """Discover the available files and prepare the selection.

        :param filename: prefix shared by all TDMS files of the experiment
        :param directory: directory containing the TDMS files
        :param chstart: first channel to export
        :param chstop: last channel to export (None means "up to the last one")
        :param chstep: step between exported channels
        :param starttime: start of the requested time window (datetime or None)
        :param endtime: end of the requested time window (datetime or None)
        :param iterate: 'D' to iterate over data, 'M' over metadata
        :param loglevel: verbosity of the logging output
        """
        logs = logging.getLogger('Init TDMS')
        logs.setLevel(loglevel)
        # Log level
        self.__loglevel = loglevel
        # Channel from and to
        self.__chstart = chstart
        self.__chstop = chstop
        self.__chstep = chstep
        # Timewindow selection
        self.__twstart = starttime
        self.__twend = endtime
        # Available time window
        self.starttime = None
        self.endtime = None
        # Sampling Rate
        self.sampling_rate = None
        # File currently being processed
        self.__currentfile = None
        # Name of file
        self.__filename = filename
        self.__directory = directory
        self.__available = list()
        for file in sorted(os.listdir(directory)):
            if not file.startswith(filename):
                continue
            # The start time of each file is encoded in its name between the
            # common prefix and the '.tdms' extension
            dt = datetime.datetime.strptime(file[len(filename):-len('.tdms')], '_%Z_%Y%m%d_%H%M%S.%f')
            self.__available.append({'dt': dt, 'name': file, 'samples': None})
            if self.__twstart is None:
                self.__twstart = dt
        # Never start before the beginning of the first available file
        if self.__twstart < self.__available[0]['dt']:
            self.__twstart = self.__available[0]['dt']
        # Keep the values in case we need to reset them
        self.__origstarttime = self.__twstart
        self.__origendtime = self.__twend
        # What should we iterate? D: Data; M: Metadata
        self.iterate = iterate
        # Datatype of each channel
        self.__datatypes = dict()
        # Dictionary to save the metadata defined in the file
        self.metadata = dict()
        # Initialization of local variables
        self.__HEADERLEN = 28
        self.__MAXSAMPLES = 30000
        self.__FF64b = 0xFFFFFFFFFFFFFFFF
        self.__FF32b = 0xFFFFFFFF
        # Map from TDMS data-type code to (struct format char, size in bytes)
        self.__data2mask = {
            0: ('c', 1),     # tdsTypeVoid
            1: ('b', 1),     # tdsTypeI8
            2: ('h', 2),     # tdsTypeI16
            3: ('i', 4),     # tdsTypeI32
            4: ('q', 8),     # tdsTypeI64
            5: ('b', 1),     # tdsTypeU8
            6: ('h', 2),     # tdsTypeU16
            7: ('i', 4),     # tdsTypeU32
            8: ('q', 8),     # tdsTypeU64
            9: ('f', 4),     # tdsTypeSingleFloat
            10: ('d', 8),    # tdsTypeDoubleFloat
            0x20: ('I', 4),  # tdsTypeString
            0x21: ('?', 1),  # tdsTypeBoolean
            0x44: ('Qq', 16)  # tdsTypeTimeStamp
        }
        # Not (yet) supported data types:
        # tdsTypeFixedPoint = 0x4F,
        # tdsTypeComplexSingleFloat = 0x08000c,
        # tdsTypeComplexDoubleFloat = 0x10000d,
        # tdsTypeDAQmxRawData = 0xFFFFFFFF

    def __select_file(self):
        """Select and open the file covering the current time-window start.

        Reads the lead-in of the file, decodes the Table-of-Contents bitmask
        and the segment/data offsets, and finally parses the metadata.

        :raises Exception: if no data is available in the time window, or the
            file does not carry the expected 'TDSm' tag
        :raises IndexError: if there are no more files to process
        """
        logs = logging.getLogger('Select file')
        logs.setLevel(self.__loglevel)
        if self.__currentfile is None:
            # First call: pick the file whose start time precedes __twstart.
            # NOTE(review): a window starting after the start of the LAST file
            # also raises here (for-else), even though the last file might
            # still contain the data — limitation of this version.
            for idx, fi in enumerate(self.__available):
                if self.__twstart < fi['dt']:
                    if not idx:
                        raise Exception('Data not available in the specified time window')
                    filename = os.path.join(self.__directory, self.__available[idx-1]['name'])
                    self.__currentfile = idx-1
                    break
            else:
                raise Exception('Data not available in the specified time window')
        elif self.__currentfile >= len(self.__available):
            logs.debug('Last file already processed')
            # No more data to iterate
            raise IndexError
        else:
            filename = os.path.join(self.__directory, self.__available[self.__currentfile]['name'])
            self.__twstart = self.__available[self.__currentfile]['dt']
            if (self.__twend is not None) and (self.__twstart > self.__twend):
                logs.debug('Start is greater than end. %s %s' % (self.__twstart, self.__twend))
                raise IndexError
        logs.debug('Opening %s; Startime: %s' % (self.__available[self.__currentfile]['name'], self.__twstart))
        # Reset some properties before opening the new file
        self.starttime = self.__available[self.__currentfile]['dt']
        self.endtime = None
        self.metadata = dict()
        self.__fi = open(filename, 'rb')
        leadin = self.__fi.read(self.__HEADERLEN)
        # The lead-in starts with a 4-byte tag and the ToC bitmask
        (tag, ToCmask) = struct.unpack('<4si', leadin[:8])
        # Flags defined by the TDMS Table of Contents
        kTocMetaData = 1 << 1
        kTocNewObjList = 1 << 2
        kTocRawData = 1 << 3
        kTocInterleavedData = 1 << 5
        kTocBigEndian = 1 << 6
        kTocDAQmxRawData = 1 << 7
        self.hasmetadata = bool(ToCmask & kTocMetaData)
        self.hasnewObjects = bool(ToCmask & kTocNewObjList)
        self.hasrawData = bool(ToCmask & kTocRawData)
        self.hasInterleavedData = bool(ToCmask & kTocInterleavedData)
        self.hasDAQmxRawData = bool(ToCmask & kTocDAQmxRawData)
        # All input from now on will be formatted by this
        self.__endian = '>' if ToCmask & kTocBigEndian else '<'
        if tag.decode() != 'TDSm':
            raise Exception('Tag is not TDSm!')
        (versionTDMS, self.__segmentOffset, self.__dataOffset) = \
            struct.unpack('%ciQQ' % self.__endian, leadin[8:])
        logs.debug((tag, ToCmask, versionTDMS, self.__segmentOffset, self.__dataOffset))
        if versionTDMS != 4713:
            logs.warning('Version number is not 4713!')
        if self.__segmentOffset == self.__FF64b:
            logs.error('Severe problem while writing data (crash, power outage)')
        if self.hasmetadata and not self.__dataOffset:
            logs.error('Flag indicates Metadata but its length is 0!')
        if self.hasDAQmxRawData:
            logs.warning('DAQmx raw data is still not supported!')
        # Absolute offsets
        self.__segmentOffset += self.__HEADERLEN
        self.__dataOffset += self.__HEADERLEN
        logs.debug('Metadata: ' + ('yes' if self.hasmetadata else 'no'))
        logs.debug('Object list: ' + ('yes' if self.hasnewObjects else 'no'))
        logs.debug('Raw data: ' + ('yes' if self.hasrawData else 'no'))
        logs.debug('Interleaved data: ' + ('yes' if self.hasInterleavedData else 'no'))
        # FIX: the original compared against '<' (little endian) and thus
        # logged the opposite of the truth
        logs.debug('BigEndian: ' + ('yes' if self.__endian == '>' else 'no'))
        logs.debug('DAQmx raw data: ' + ('yes' if self.hasDAQmxRawData else 'no'))
        self.readMetadata()

    def resetcurrenttime(self):
        """Restore the original time window and reopen the proper file."""
        self.__twstart = self.__origstarttime
        self.__twend = self.__origendtime
        self.__currentfile = None
        self.__select_file()

    def __enter__(self):
        """Context-manager entry: select and open the first file."""
        self.__select_file()
        return self

    def readMetadata(self):
        """Parse the metadata section of the currently open file.

        Fills ``self.metadata``, determines the number of channels, the data
        type, the number of samples and the sample indices delimiting the
        requested time window.

        :raises IndexError: if ``chstart`` points beyond the last channel
        :raises Exception: if the data type cannot be exported
        """
        logs = logging.getLogger('Read Metadata')
        handler = logging.StreamHandler(sys.stdout)
        logs.addHandler(handler)
        self.__fi.seek(self.__HEADERLEN, 0)
        # Number of objects (unsigned int - 32b)
        numObjects = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
        logs.debug('Number of objects in metadata: %s' % numObjects)
        numChannels = 0
        for obj in range(numObjects):
            objPath = self.__readstring()
            self.metadata[obj] = {'path': objPath}
            rawDataIdx = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
            if rawDataIdx == self.__FF32b:
                logs.debug('No raw data assigned to this segment')
                self.metadata[obj]['data'] = False
                self.__readproperties(self.metadata[obj])
                # Global attributes live in the objects without raw data.
                # FIX: narrowed the original bare "except:" clauses — only a
                # missing key is expected here.
                try:
                    if self.sampling_rate is None:
                        self.sampling_rate = self.metadata[obj]['SamplingFrequency[Hz]']
                except KeyError:
                    pass
                try:
                    if self.starttime is None:
                        self.starttime = self.metadata[obj]['GPSTimeStamp']
                except KeyError:
                    pass
                continue
            elif not rawDataIdx:
                logs.debug('Raw data index in this segment matches the index the same object had in the previous segment')
            else:
                self.metadata[obj]['data'] = True
                numChannels += 1
                # There is raw data!
                datatype, arraylen, numValues = struct.unpack('%cIIQ' % self.__endian, self.__fi.read(16))
                if datatype == 0x20:
                    # Strings carry an extra total-size field
                    self.metadata[obj]['sizeBytes'] = struct.unpack('%cQ' % self.__endian, self.__fi.read(8))[0]
                if arraylen != 1:
                    logs.error('Array length MUST be 1! Actual value: %s' % arraylen)
                self.metadata[obj]['datatype'] = self.__data2mask[datatype][0]
                self.__readproperties(self.metadata[obj])
        # Set the data type as numpy expects it
        # NOTE(review): 'datatype' is the value seen for the LAST raw-data
        # object; a file without raw data would raise NameError here
        if self.__data2mask[datatype][0] == 'h':
            self.datatype = '%ci2' % self.__endian
        elif self.__data2mask[datatype][0] == 'f':
            self.datatype = '%cf4' % self.__endian
        else:
            raise Exception('Data type not supported! (%s)' % self.__data2mask[datatype][0])
        self.datatypesize = self.__data2mask[datatype][1]
        self.numChannels = numChannels
        self.samples = int((self.__segmentOffset - self.__dataOffset) / numChannels / self.datatypesize)
        # Calculate endtime based on the number of samples declared and the sampling rate
        self.endtime = self.starttime + datetime.timedelta(seconds=(self.samples-1)/self.sampling_rate)
        # First sample of the selection (never negative)
        self.__samplestart = max(floor((self.__twstart - self.starttime).total_seconds() * self.sampling_rate), 0)
        # Readjust __twstart to align it exactly with the time of the samples
        self.__twstart = self.starttime + datetime.timedelta(seconds=self.__samplestart/self.sampling_rate)
        self.__samplecur = self.__samplestart
        # Last sample of the selection
        if (self.__twend is None) or (self.__twend >= self.endtime):
            self.__sampleend = self.samples-1
        else:
            self.__sampleend = ceil((self.__twend - self.starttime).total_seconds() * self.sampling_rate)
        logs.debug('Samples: %s' % self.samples)
        logs.debug('Samples selected: %s-%s' % (self.__samplestart, self.__sampleend))
        logs.debug('Total chunks size: %s' % (self.__segmentOffset - self.__dataOffset))
        logs.debug('Length of channel: %d' % ((self.__segmentOffset - self.__dataOffset)/numChannels/self.__data2mask[datatype][1]))
        if self.__chstart >= numChannels:
            logs.error('Cannot export from channel %s. Only %s channels present.' % (self.__chstart, numChannels))
            raise IndexError
        if self.__chstop is None:
            self.__chstop = numChannels-1
        elif self.__chstop >= numChannels:
            # FIX: the original message said "chstart" although chstop is reset
            logs.warning('Resetting chstop to %s' % (numChannels-1))
            self.__chstop = numChannels-1
        # New or changed objects: read (and discard) to advance the stream
        newObjects = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]

    def __iter__(self):
        """Return the data or the metadata iterator, depending on 'iterate'."""
        if self.iterate == 'M':
            return self.__iter_metadata__()
        else:
            return self.__iter_data__()

    def __iter_data__(self):
        """Generator yielding (data, stats) tuples ready to build obspy Traces."""
        logs = logging.getLogger('Iterate Data')
        # Only 16-bit integers and 32-bit floats can be exported in MiniSEED
        # (the original assigned an unused 'enc' variable; the check is kept)
        if not (self.datatype.endswith('i2') or self.datatype.endswith('f4')):
            raise Exception('Encoding type not supported to export in MiniSEED!')
        for ch in range(self.__chstart, self.__chstop + 1, self.__chstep):
            self.resetcurrenttime()
            while (self.__twend is None) or (self.__twstart < self.__twend):
                # Loop through channels
                while self.__samplecur <= self.__sampleend:
                    # Read data in blocks
                    data = self.__readdata(channel=ch)
                    stats = {'network': 'XX', 'station': '%05d' % ch, 'location': '',
                             'channel': 'FH1', 'npts': len(data),
                             'sampling_rate': self.sampling_rate,
                             'starttime': UTCDateTime(self.__twstart),
                             'mseed': {'byteorder': self.__endian,
                                       'reclen': 512}}
                    logs.debug('Stats: %s' % stats)
                    logs.debug('Data length: %d; First component: %s' % (len(data), data[0]))
                    yield (data, stats)
                    self.__samplecur += len(data)
                # No more data in this file. Skip to the next one.
                self.__currentfile += 1
                try:
                    self.__select_file()
                except IndexError:
                    break

    def __iter_metadata__(self):
        """Generator yielding the metadata of every object, file by file."""
        while (self.__twend is None) or (self.__twstart < self.__twend):
            for ch in self.metadata:
                yield self.metadata[ch]
            # No more data in this file. Skip to the next one.
            self.__currentfile += 1
            try:
                self.__select_file()
            except IndexError:
                break

    def __readstring(self):
        """Read a length-prefixed string from the current file position."""
        strlen = struct.unpack('%cI' % self.__endian, self.__fi.read(4))
        return self.__fi.read(strlen[0]).decode()

    def __readvalue(self):
        """Read one value preceded by its data-type code.

        :returns: the decoded value; strings and timestamps get special handling
        """
        datatype = self.__readdatatype()
        # 0x20 is a string. Read again!
        if datatype == 0x20:
            return self.__readstring()
        (mask, numBytes) = self.__data2mask[datatype]
        # This instruction returns a tuple. Needed for timestamps
        result = struct.unpack('%c%s' % (self.__endian, mask), self.__fi.read(numBytes))
        # 0x44 is a timestamp. Convert the (fraction, seconds) pair
        if datatype == 0x44:
            result = self.__tup2time(*result)
        else:
            # Disassemble the tuple if not a timestamp
            result = result[0]
        return result

    def __readproperties(self, result=None):
        """Read the properties of an object into *result*.

        FIX: the original used a mutable default argument (result=dict()),
        which silently shared one dict between all no-argument calls.

        :param result: dictionary to fill (a new one is created if None)
        :returns: the filled dictionary
        """
        if result is None:
            result = dict()
        logs = logging.getLogger('readproperties')
        numProps = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
        if numProps:
            logs.debug('%s properties' % numProps)
        for prop in range(numProps):
            propStr = self.__readstring()
            value = self.__readvalue()
            result[propStr] = value
            logs.debug('%s: %s' % (propStr, value))
        return result

    def __readdatatype(self):
        """Read a 4-byte data-type code."""
        return struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]

    def __tup2time(self, fraction, seconds):
        """Convert a TDMS timestamp (2^-64 fractions, seconds since 1904) to datetime."""
        dt1904 = datetime.datetime(1904, 1, 1)
        delta = seconds + fraction * 2**(-64)
        return dt1904 + datetime.timedelta(seconds=delta)

    def __readdata(self, channel=0):
        """Read up to __MAXSAMPLES samples of one channel from the current selection.

        :param channel: index of the channel to read
        :returns: numpy array with the samples read
        """
        numSamples = min(self.__sampleend - self.__samplecur + 1, self.__MAXSAMPLES)
        if not self.hasInterleavedData:
            # Seek where the channel starts and add the offset to the first
            # sample to read based on the time window selection.
            # FIX: the sample offset must be scaled to BYTES (datatypesize),
            # as is already done in the interleaved branch below; the
            # original added __samplecur unscaled and read misaligned data.
            self.__fi.seek(self.__dataOffset + self.datatypesize*self.samples*channel
                           + self.__samplecur*self.datatypesize, 0)
            # Read all selected data from the channel in one step
            result = np.fromfile(self.__fi, dtype=self.datatype, count=numSamples)
        else:
            # Seek where the raw data starts and add the offset to the first
            # sample to read based on the time window selection
            self.__fi.seek(self.__dataOffset + self.__samplecur*self.datatypesize*self.numChannels, 0)
            # Reserve the memory for the result
            result = np.zeros((numSamples,), dtype=self.datatype)
            for ch in range(numSamples):
                # Read one frame (all channels) and keep the requested one
                result[ch] = np.fromfile(self.__fi, dtype=self.datatype, count=self.numChannels)[channel]
        return result
def str2date(dStr):
......
import logging
import datetime
import os
import sys
import struct
from obspy import UTCDateTime
import numpy as np
from math import floor
from math import ceil
class TDMS(object):
def __exit__(self, exc_type, exc_val, exc_tb):
if self.__fi is not None:
self.__fi.close()
    def __init__(self, filename, directory='.', chstart=0, chstop=None, chstep=1,
                 starttime=None, endtime=None, iterate='D', loglevel='INFO'):
        """Discover the available files and prepare the selection.

        :param filename: prefix shared by all TDMS files of the experiment
        :param directory: directory containing the TDMS files
        :param chstart: first channel to export
        :param chstop: last channel to export (None means "up to the last one")
        :param chstep: step between exported channels
        :param starttime: start of the requested time window (datetime or None)
        :param endtime: end of the requested time window (datetime or None)
        :param iterate: 'D' to iterate over data, 'M' over metadata
        :param loglevel: verbosity of the logging output
        """
        logs = logging.getLogger('Init TDMS')
        logs.setLevel(loglevel)
        # Log level
        self.__loglevel = loglevel
        # Channel from and to
        self.__chstart = chstart
        self.__chstop = chstop
        self.__chstep = chstep
        # Timewindow selection
        self.__twstart = starttime
        self.__twend = endtime
        # Available time window
        self.starttime = None
        self.endtime = None
        # Sampling Rate
        self.sampling_rate = None
        # File currently being processed
        self.__currentfile = None
        # Name of file
        self.__filename = filename
        self.__directory = directory
        self.__available = list()
        for file in sorted(os.listdir(directory)):
            if not file.startswith(filename):
                continue
            # The start time of each file is encoded in its name between the
            # common prefix and the '.tdms' extension
            dt = datetime.datetime.strptime(file[len(filename):-len('.tdms')], '_%Z_%Y%m%d_%H%M%S.%f')
            self.__available.append({'dt': dt, 'name': file, 'samples': None})
            if self.__twstart is None:
                self.__twstart = dt
        # Never start before the beginning of the first available file
        # NOTE(review): raises IndexError if no file matched the prefix — verify
        if self.__twstart < self.__available[0]['dt']:
            self.__twstart = self.__available[0]['dt']
        # Keep the values in case we need to reset them
        self.__origstarttime = self.__twstart
        self.__origendtime = self.__twend
        # What should we iterate? D: Data; M: Metadata
        self.iterate = iterate
        # Datatype of each channel
        self.__datatypes = dict()
        # Dictionary to save the metadata defined in the file
        self.metadata = dict()
        # Initialization of local variables
        self.__HEADERLEN = 28
        self.__MAXSAMPLES = 30000
        self.__FF64b = 0xFFFFFFFFFFFFFFFF
        self.__FF32b = 0xFFFFFFFF
        # Map from TDMS data-type code to (struct format char, size in bytes)
        self.__data2mask = {
            0: ('c', 1),     # tdsTypeVoid
            1: ('b', 1),     # tdsTypeI8
            2: ('h', 2),     # tdsTypeI16
            3: ('i', 4),     # tdsTypeI32
            4: ('q', 8),     # tdsTypeI64
            5: ('b', 1),     # tdsTypeU8
            6: ('h', 2),     # tdsTypeU16
            7: ('i', 4),     # tdsTypeU32
            8: ('q', 8),     # tdsTypeU64
            9: ('f', 4),     # tdsTypeSingleFloat
            10: ('d', 8),    # tdsTypeDoubleFloat
            0x20: ('I', 4),  # tdsTypeString
            0x21: ('?', 1),  # tdsTypeBoolean
            0x44: ('Qq', 16)  # tdsTypeTimeStamp
        }
        # Not (yet) supported data types:
        # tdsTypeFixedPoint = 0x4F,
        # tdsTypeComplexSingleFloat = 0x08000c,
        # tdsTypeComplexDoubleFloat = 0x10000d,
        # tdsTypeDAQmxRawData = 0xFFFFFFFF
def __select_file(self):
logs = logging.getLogger('Select file')
logs.setLevel(self.__loglevel)
if self.__currentfile is None:
for idx, fi in enumerate(self.__available):
if self.__twstart < fi['dt']:
if not idx:
raise Exception('Data not available in the specified time window')
filename = os.path.join(self.__directory, self.__available[idx-1]['name'])
self.__currentfile = idx-1
break
else:
raise Exception('Data not available in the specified time window')
elif self.__currentfile >= len(self.__available):
logs.debug('Last file already processed')
# No more data to iterate
raise IndexError
else:
filename = os.path.join(self.__directory, self.__available[self.__currentfile]['name'])
self.__twstart = self.__available[self.__currentfile]['dt']
if (self.__twend is not None) and (self.__twstart > self.__twend):
logs.debug('Start is greater than end. %s %s' % (self.__twstart, self.__twend))
raise IndexError
logs.debug('Opening %s; Startime: %s' % (self.__available[self.__currentfile]['name'], self.__twstart))
# Reset some properties before opening the new file
self.starttime = self.__available[self.__currentfile]['dt']
self.endtime = None
self.metadata = dict()
self.__fi = open(filename, 'rb')
leadin = self.__fi.read(self.__HEADERLEN)
(tag, ToCmask) = struct.unpack('<4si', leadin[:8])
kTocMetaData = 1 << 1
kTocNewObjList = 1 << 2
kTocRawData = 1 << 3
kTocInterleavedData = 1 << 5
kTocBigEndian = 1 << 6
kTocDAQmxRawData = 1 << 7