Commit 9a310e45 authored by Javier Quinteros
Browse files

First version of a TDMS class with a very basic metadata iterator

Still needs to be tested.
parent 22351ff7
......@@ -22,218 +22,245 @@ import struct
import datetime
HEADERLEN = 28
kTocMetaData = 1 << 1
kTocNewObjList = 1 << 2
kTocRawData = 1 << 3
kTocInterleavedData = 1 << 5
kTocBigEndian = 1 << 6
kTocDAQmxRawData = 1 << 7
FF64b = 0xFFFFFFFFFFFFFFFF
FF32b = 0xFFFFFFFF
def main():
    """Parse the command line, configure logging and read the file lead-in."""
    # Check verbosity in the output
    msg = 'Read and convert waveforms generated by a DAS system.'
    parser = argparse.ArgumentParser(description=msg)
    parser.add_argument('-l', '--loglevel',
                        help='Verbosity in the output.',
                        choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO',
                                 'DEBUG'],
                        default='WARNING')
    parser.add_argument('filename', help='File name to read and process.')
    cli = parser.parse_args()

    logging.basicConfig(level=cli.loglevel)
    logs = logging.getLogger('OpenFile')
    logs.setLevel(cli.loglevel)

    # Read only the fixed-size lead-in of the file for now
    with open(cli.filename, 'rb') as fin:
        leadin = fin.read(HEADERLEN)
class TDMS(object):
    """Minimal reader for TDMS files with a basic iterator over segment metadata."""

    def __init__(self, filename, loglevel='INFO'):
        """Open a TDMS file and parse its lead-in (first 28 bytes).

        :param filename: Path to the TDMS file to read.
        :param loglevel: Verbosity for this class' logger.
        :raises TypeError: If no filename was given.
        :raises Exception: If the file does not start with the 'TDSm' tag.
        """
        logs = logging.getLogger('OpenFile')
        logs.setLevel(loglevel)

        # Lead-in layout constant and Table-of-Contents flag masks
        HEADERLEN = 28
        kTocMetaData = 1 << 1
        kTocNewObjList = 1 << 2
        kTocRawData = 1 << 3
        kTocInterleavedData = 1 << 5
        kTocBigEndian = 1 << 6
        kTocDAQmxRawData = 1 << 7
        self.__FF64b = 0xFFFFFFFFFFFFFFFF
        self.__FF32b = 0xFFFFFFFF

        # Map TDMS data-type code -> (struct format char, size in bytes).
        # BUG FIX: unsigned types (codes 5-8) now use the unsigned format
        # characters 'B','H','I','Q'; the signed ones would decode large
        # values as negative numbers.
        self.__data2mask = {
            0: ('c', 1),      # tdsTypeVoid
            1: ('b', 1),      # tdsTypeI8
            2: ('h', 2),      # tdsTypeI16
            3: ('i', 4),      # tdsTypeI32
            4: ('q', 8),      # tdsTypeI64
            5: ('B', 1),      # tdsTypeU8
            6: ('H', 2),      # tdsTypeU16
            7: ('I', 4),      # tdsTypeU32
            8: ('Q', 8),      # tdsTypeU64
            9: ('f', 4),      # tdsTypeSingleFloat
            10: ('d', 8),     # tdsTypeDoubleFloat
            0x20: ('I', 4),   # tdsTypeString (length prefix)
            0x21: ('?', 1),   # tdsTypeBoolean
            0x44: ('Qq', 16)  # tdsTypeTimeStamp (fraction, seconds)
        }
        # Data types that are not (yet) supported:
        # tdsTypeFixedPoint = 0x4F,
        # tdsTypeComplexSingleFloat = 0x08000c,
        # tdsTypeComplexDoubleFloat = 0x10000d,
        # tdsTypeDAQmxRawData = 0xFFFFFFFF

        # BUG FIX: the original tested the builtin 'input' instead of the
        # 'filename' parameter, so the check could never fire.
        if filename is None:
            logs.error('No input file was specified!')
            raise TypeError('No input file was specified!')

        self.__fi = open(filename, 'rb')
        leadin = self.__fi.read(HEADERLEN)

        # Tag and ToC mask are always stored little-endian
        (tag, ToCmask) = struct.unpack('<4si', leadin[:8])

        self.metadata = bool(ToCmask & kTocMetaData)
        self.newObjects = bool(ToCmask & kTocNewObjList)
        self.rawData = bool(ToCmask & kTocRawData)
        self.interleavedData = bool(ToCmask & kTocInterleavedData)
        self.DAQmxRawData = bool(ToCmask & kTocDAQmxRawData)

        # All input from now on will be formatted by this
        self.__endian = '>' if ToCmask & kTocBigEndian else '<'

        if tag.decode() != 'TDSm':
            raise Exception('Tag is not TDSm!')

        (version, self.__segmentOffset, self.__dataOffset) = \
            struct.unpack('%ciQQ' % self.__endian, leadin[8:])
        logs.info((tag, ToCmask, version, self.__segmentOffset, self.__dataOffset))

        if version != 4713:
            logs.warning('Version number is not 4713!')

        # All 1s in the segment offset signals an interrupted write
        if self.__segmentOffset == self.__FF64b:
            logs.error('Severe problem while writing data (crash, power outage)')

        if self.metadata and not self.__dataOffset:
            logs.error('Flag indicates Metadata but its length is 0!')

        if self.DAQmxRawData:
            logs.warning('DAQmx raw data is still not supported!')

        # Turn the offsets into absolute positions (lead-in included)
        self.__segmentOffset += HEADERLEN
        self.__dataOffset += HEADERLEN

        logs.info('Metadata: ' + ('yes' if self.metadata else 'no'))
        logs.info('Object list: ' + ('yes' if self.newObjects else 'no'))
        logs.info('Raw data: ' + ('yes' if self.rawData else 'no'))
        logs.info('Interleaved data: ' + ('yes' if self.interleavedData else 'no'))
        # BUG FIX: '>' means big-endian; the original compared against '<'
        logs.info('BigEndian: ' + ('yes' if self.__endian == '>' else 'no'))
        logs.info('DAQmx raw data: ' + ('yes' if self.DAQmxRawData else 'no'))
    def __iter__(self):
        # The object is its own iterator (see __next__).
        return self
    def __next__(self):
        # Only the metadata iterator is implemented so far.
        return self.__next__metadata__()
def __next__metadata__(self):
logs = logging.getLogger('Iterate Metadata')
# Metadata
# Number of objects (unsigned int - 32b)
numObjects = struct.unpack('%cI' % endian, fi.read(4))[0]
numObjects = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
logs.info('Number of objects in metadata: %s' % numObjects)
numChannels = 0
chunkSize = 0
for obj in range(numObjects):
channelSize = 0
objPath = readstring(fi, endian)
logs.debug('Object %s: %s' % (obj, objPath))
objPath = self.__readstring()
logs.info('Object %s: %s' % (obj, objPath))
rawDataIdx = struct.unpack('%cI' % endian, fi.read(4))[0]
rawDataIdx = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
if rawDataIdx == FF32b:
if rawDataIdx == self.__FF32b:
logs.info('No raw data assigned to this segment')
readproperties(fi, endian)
self.__readproperties()
continue
elif not rawDataIdx:
logs.info('Raw data index in this segment matches the index the same object had in the previous segment')
else:
numChannels = numChannels + 1
numChannels += 1
# There is raw data!
sizeBytes = None
datatype, arraylen, numValues = struct.unpack('%cIIQ' % endian, fi.read(16))
datatype, arraylen, numValues = struct.unpack('%cIIQ' % self.__endian, self.__fi.read(16))
if datatype == 0x20:
sizeBytes = struct.unpack('%cQ' % endian, fi.read(8))[0]
sizeBytes = struct.unpack('%cQ' % self.__endian, self.__fi.read(8))[0]
if arraylen != 1:
logs.error('Array length MUST be 1! Actual value: %s' % arraylen)
# logs.debug('obj %s; datatype: %s; numValues: %s; size: %s' % (obj, datatype, numValues,
# data2mask[datatype][1]*numValues))
channelSize = channelSize + data2mask[datatype][1]*numValues
readproperties(fi, endian)
logs.debug('obj %s; datatype: %s; numValues: %s; size: %s' % (obj, datatype, numValues,
self.__data2mask[datatype][1]*numValues))
channelSize = self.__data2mask[datatype][1]*numValues
self.__readproperties()
logs.debug('channelSize: %s bytes' % channelSize)
# logs.debug('channelSize: %s bytes' % channelSize)
chunkSize = chunkSize + channelSize
samples = int((segmentOffset - dataOffset)/numChannels/data2mask[datatype][1])
logs.info('chunkSize: %s bytes' % chunkSize)
logs.info('Total chunks size: %s' % (segmentOffset - dataOffset))
logs.info('Length of channel: %d' % ((segmentOffset - dataOffset)/numChannels/data2mask[datatype][1]))
# New or changed objects
newObjects = struct.unpack('%cI' % endian, fi.read(4))[0]
# Got to the beginning of the raw data
fi.seek(dataOffset + HEADERLEN, 0)
result = readdata(fi, data2mask[datatype][0], data2mask[datatype][1], samples, ToCmask & kTocInterleavedData,
numChannels, endian)
print(result)
def readdata(fi, datatype, datasize, samples, interleaved, numChannels, endian='<'):
    """Read `samples` values of the first channel from a raw-data area.

    :param fi: Binary file-like object positioned at the raw data.
    :param datatype: struct format character of the stored values.
    :param datasize: Size in bytes of one value.
    :param samples: Number of values to read.
    :param interleaved: Whether channels alternate value by value.
    :param numChannels: Number of channels stored in the file.
    :param endian: struct byte-order character ('<' or '>').
    :returns: List of decoded values.
    """
    if interleaved:
        # Channels alternate: read one frame per sample, keep channel 0
        frame = '%c%s' % (endian, datatype * numChannels)
        framelen = datasize * numChannels
        values = []
        for _ in range(samples):
            values.append(struct.unpack(frame, fi.read(framelen))[0])
        return values
    # Contiguous layout: the first channel is one solid block
    packed = fi.read(datasize * samples)
    return list(struct.unpack('%c%s' % (endian, datatype * samples), packed))
def readproperties(fi, endian='<'):
    """Read and log all properties attached to the current object."""
    logs = logging.getLogger('readproperties')
    (numProps,) = struct.unpack('%cI' % endian, fi.read(4))
    logs.debug('%s properties' % numProps)
    for _ in range(numProps):
        name = readstring(fi, endian)
        logs.debug('Prop: %s' % name)
        value = readvalue(fi, endian)
        logs.debug(value)
# Map from TDMS data-type code to (struct format character, size in bytes),
# used to decode property values and raw data.
# NOTE(review): the unsigned types (codes 5-8) reuse the *signed* format
# characters 'b','h','i','q', so large unsigned values would decode as
# negative numbers — confirm whether 'B','H','I','Q' were intended.
data2mask = {
    0: ('c', 1), # tdsTypeVoid
    1: ('b', 1), # tdsTypeI8
    2: ('h', 2), # tdsTypeI16
    3: ('i', 4), # tdsTypeI32
    4: ('q', 8), # tdsTypeI64
    5: ('b', 1), # tdsTypeU8
    6: ('h', 2), # tdsTypeU16
    7: ('i', 4), # tdsTypeU32
    8: ('q', 8), # tdsTypeU64
    9: ('f', 4), # tdsTypeSingleFloat
    10: ('d', 8), # tdsTypeDoubleFloat
    0x20: ('I', 4), # tdsTypeString
    0x21: ('?', 1), # tdsTypeBoolean
    0x44: ('Qq', 16) # tdsTypeTimeStamp
}
# Data types that are not supported here:
# tdsTypeFixedPoint = 0x4F,
# tdsTypeComplexSingleFloat = 0x08000c,
# tdsTypeComplexDoubleFloat = 0x10000d,
# tdsTypeDAQmxRawData = 0xFFFFFFFF
def readdatatype(fi, endian='<'):
    """Read the 4-byte data-type code of the next value."""
    (code,) = struct.unpack('%cI' % endian, fi.read(4))
    return code
def readvalue(fi, endian='<'):
    """Read one property value: its data-type code, then the payload."""
    logs = logging.getLogger('readvalue')
    datatype = readdatatype(fi, endian)
    logs.debug('datatype: 0x%x' % datatype)

    # Consider cases which need another read.
    # 0x20 is a string: a length-prefixed payload follows.
    if datatype == 0x20:
        return readstring(fi, endian)

    (mask, numBytes) = data2mask[datatype]
    logs.debug('Mask: %s; Bytes: %s' % (mask, numBytes))
    # unpack returns a tuple; timestamps (0x44) need both members
    raw = struct.unpack('%c%s' % (endian, mask), fi.read(numBytes))
    if datatype == 0x44:
        value = tup2time(*raw)
    else:
        # Disassemble the tuple if not a timestamp
        value = raw[0]
    logs.debug('result: %s' % value)
    return value
def tup2time(fraction, seconds):
    """Convert a TDMS timestamp pair into a datetime.

    :param fraction: 2**-64 fractions of a second.
    :param seconds: Whole seconds since the TDMS epoch 1904-01-01.
    :returns: datetime.datetime for the instant described.
    """
    logs = logging.getLogger('tup2time')
    logs.debug('seconds: %s' % seconds)
    logs.debug('fraction: %s' % fraction)
    epoch = datetime.datetime(1904, 1, 1)
    elapsed = seconds + fraction * 2**(-64)
    stamp = epoch + datetime.timedelta(seconds=elapsed)
    logs.debug('Date-time %s' % stamp)
    return stamp
def readstring(fi, endian='<'):
    """Read a length-prefixed UTF-8 string at the current file position."""
    (length,) = struct.unpack('%cI' % endian, fi.read(4))
    return fi.read(length).decode()
# print(self.__segmentOffset, self.__dataOffset, numChannels, self.__data2mask[datatype][1])
# samples = int((self.__segmentOffset - self.__dataOffset)/numChannels/self.__data2mask[datatype][1])
# logs.info('chunkSize: %s bytes' % chunkSize)
# logs.info('Total chunks size: %s' % (self.__segmentOffset - self.__dataOffset))
# logs.info('Length of channel: %d' % ((self.__segmentOffset - self.__dataOffset)/numChannels/self.__data2mask[datatype][1]))
# # New or changed objects
# newObjects = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
#
# # Loop through channels
# for ch in range(1):
# result = self.__readdata(self.__data2mask[datatype][0], self.__data2mask[datatype][1], samples,
# numChannels, channel=ch)
def __readstring(self):
# logs = logging.getLogger('readstring')
strlen = struct.unpack('%cI' % self.__endian, self.__fi.read(4))
# logs.debug('String of length %s' % strlen)
return self.__fi.read(strlen[0]).decode()
def __readvalue(self):
logs = logging.getLogger('readvalue')
datatype = self.__readdatatype()
logs.debug('datatype: 0x%x' % datatype)
# Consider cases which need another read
# 0x20 is a string. Read again!
if datatype == 0x20:
return self.__readstring()
(mask, numBytes) = self.__data2mask[datatype]
logs.debug('Mask: %s; Bytes: %s' % (mask, numBytes))
# This instruction returns a tuple. Needed for timestamps
result = struct.unpack('%c%s' % (self.__endian, mask), self.__fi.read(numBytes))
# 0x44 is a timestamp. Read again!
if datatype == 0x44:
result = self.__tup2time(*result)
else:
# Disassemble the tuple if not a timestamp
result = result[0]
logs.debug('result: %s' % result)
return result
def __readproperties(self):
logs = logging.getLogger('readproperties')
numProps = struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
logs.debug('%s properties' % numProps)
for prop in range(numProps):
propStr = self.__readstring()
logs.debug('Prop: %s' % propStr)
value = self.__readvalue()
logs.debug(value)
def __readdatatype(self):
return struct.unpack('%cI' % self.__endian, self.__fi.read(4))[0]
def __tup2time(self, fraction, seconds):
logs = logging.getLogger('tup2time')
logs.debug('seconds: %s' % seconds)
logs.debug('fraction: %s' % fraction)
dt1904 = datetime.datetime(1904, 1, 1)
delta = seconds + fraction * 2**(-64)
result = dt1904 + datetime.timedelta(seconds=delta)
logs.debug('Date-time %s' % result)
return result
def __readdata(self, datatype, datasize, samples, numChannels, channel=0):
if not self.interleavedData:
# Seek where the channel starts
self.__fi.seek(self.__dataOffset + datasize*samples*channel, 0)
result = struct.unpack('%c%s' % (self.__endian, datatype*samples), self.__fi.read(datasize*samples))
# result = list(result)
else:
# Seek where the raw data starts
self.__fi.seek(self.__dataOffset, 0)
result = list()
for ch in range(samples):
# Read from all channels and select the specific one with an index (channel)
result.append(struct.unpack('%c%s' % (self.__endian, datatype*numChannels), self.__fi.read(datasize*numChannels))[channel])
return result
def main():
    """Entry point: parse arguments, configure logging and open the TDMS file."""
    # Check verbosity in the output
    msg = 'Read and convert waveforms generated by a DAS system.'
    parser = argparse.ArgumentParser(description=msg)
    parser.add_argument('-l', '--loglevel',
                        help='Verbosity in the output.',
                        choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO',
                                 'DEBUG'],
                        default='WARNING')
    parser.add_argument('filename', help='File name to read and process.')
    cli = parser.parse_args()

    logging.basicConfig(level=cli.loglevel)
    logs = logging.getLogger('OpenFile')
    logs.setLevel(cli.loglevel)

    # Opening the file parses the lead-in and logs its contents
    TDMS(cli.filename)
if __name__ == '__main__':
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment