Commit f44d4d58 authored by Javier Quinteros's avatar Javier Quinteros
Browse files

Read a TDMS file and cycle through objects in the metadata

All objects, its properties and its values can be read.
Functions are included to cope with the different types of data as well as how
they are represented in a TDMS file.
parent ec659c59
......@@ -18,6 +18,22 @@
import argparse
import logging
import struct
import datetime
HEADERLEN = 28
kTocMetaData = 1 << 1
kTocNewObjList = 1 << 2
kTocRawData = 1 << 3
kTocInterleavedData = 1 << 5
kTocBigEndian = 1 << 6
kTocDAQmxRawData = 1 << 7
FF64b = 0xFFFFFFFFFFFFFFFF
FF32b = 0xFFFFFFFF
def main():
# Check verbosity in the output
......@@ -28,12 +44,181 @@ def main():
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO',
'DEBUG'],
default='WARNING')
# parser.add_argument('-c', '--config',
# help='Config file to use.',
# default='../routing.cfg')
parser.add_argument('filename', help='File name to read and process.')
args = parser.parse_args()
logging.basicConfig(level=args.loglevel)
logs = logging.getLogger('OpenFile')
logs.setLevel(args.loglevel)
with open(args.filename, 'rb') as fi:
leadin = fi.read(HEADERLEN)
(tag, ToCmask, version, segmentOffset, dataOffset) = struct.unpack('<4siiQQ', leadin)
if tag.decode() != 'TDSm':
raise Exception('Tag is not TDSm!')
if version != 4713:
logs.warning('Version number is not 4713!')
logs.debug('Metadata: ' + ('yes' if ToCmask & kTocMetaData else 'no'))
logs.debug('Object list: ' + ('yes' if ToCmask & kTocNewObjList else 'no'))
logs.debug('Raw data: ' + ('yes' if ToCmask & kTocRawData else 'no'))
logs.debug('Interleaved data: ' + ('yes' if ToCmask & kTocInterleavedData else 'no'))
logs.debug('BigEndian: ' + ('yes' if ToCmask & kTocBigEndian else 'no'))
logs.debug('DAQmx raw data: ' + ('yes' if ToCmask & kTocDAQmxRawData else 'no'))
if segmentOffset == FF64b:
logs.error('Severe problem while writing data (crash, power outage)')
if ((ToCmask & kTocMetaData) and not dataOffset):
logs.error('Flag indicates Metadata but its length is 0!')
logs.info((tag, ToCmask, version, segmentOffset, dataOffset))
# Metadata
# Number of objects (unsigned int - 32b)
numObjects = struct.unpack('<I', fi.read(4))[0]
logs.info('Number of objects in metadata: %s' % numObjects)
for obj in range(numObjects):
objPath = readstring(fi)
logs.info('Object %s: %s' % (obj, objPath))
rawDataIdx = struct.unpack('<I', fi.read(4))[0]
if rawDataIdx == FF32b:
logs.warning('No raw data assigned to this segment')
readproperties(fi)
elif not rawDataIdx:
logs.info('Raw data index in this segment matches the index the same object had in the previous segment')
else:
# There is raw data!
sizeBytes = None
datatype = readdatatype(fi)
arraylen = struct.unpack('<I', fi.read(4))[0]
numValues = struct.unpack('<Q', fi.read(8))[0]
if datatype == 0x20:
sizeBytes = struct.unpack('<Q', fi.read(8))[0]
logs.debug('rawDataIdx %s' % rawDataIdx)
logs.debug(datatype)
logs.debug(arraylen)
logs.debug(numValues)
logs.debug(sizeBytes)
readproperties(fi)
# logs.debug('rawDataIdx: %s %x %o' % (rawDataIdx, rawDataIdx, rawDataIdx))
# raise Exception()
def readproperties(fi):
logs = logging.getLogger('readproperties')
numProps = struct.unpack('<I', fi.read(4))[0]
logs.debug('%s properties' % numProps)
for prop in range(numProps):
propStr = readstring(fi)
logs.debug('Prop: %s' % propStr)
value = readvalue(fi)
logs.debug(value)
# enum
# {
# tdsTypeVoid,
# tdsTypeI8,
# tdsTypeI16,
# tdsTypeI32,
# tdsTypeI64,
# tdsTypeU8,
# tdsTypeU16,
# tdsTypeU32,
# tdsTypeU64,
# tdsTypeSingleFloat,
# tdsTypeDoubleFloat,
# tdsTypeExtendedFloat,
# tdsTypeSingleFloatWithUnit = 0x19,
# tdsTypeDoubleFloatWithUnit,
# tdsTypeExtendedFloatWithUnit,
# tdsTypeString = 0x20,
# tdsTypeBoolean = 0x21,
# tdsTypeTimeStamp = 0x44,
# tdsTypeFixedPoint = 0x4F,
# tdsTypeComplexSingleFloat = 0x08000c,
# tdsTypeComplexDoubleFloat = 0x10000d,
# tdsTypeDAQmxRawData = 0xFFFFFFFF
# } tdsDataType;
data2mask = {
0: ('c', 1),
1: ('b', 1),
2: ('h', 2),
3: ('i', 4),
4: ('q', 8),
5: ('b', 1),
6: ('h', 2),
7: ('i', 4),
8: ('q', 8),
9: ('f', 4),
10: ('d', 8),
0x20: ('I', 4),
0x21: ('?', 1),
0x44: ('Qq', 16)
}
def readdatatype(fi):
return struct.unpack('<I', fi.read(4))[0]
def readvalue(fi):
logs = logging.getLogger('readvalue')
datatype = readdatatype(fi)
logs.debug('datatype: 0x%x' % datatype)
# Consider cases which need another read
# 0x20 is a string. Read again!
if datatype == 0x20:
return readstring(fi)
(mask, numBytes) = data2mask[datatype]
logs.debug('Mask: %s; Bytes: %s' % (mask, numBytes))
# This instruction returns a tuple. Needed for timestamps
result = struct.unpack('<%s' % mask, fi.read(numBytes))
# 0x44 is a timestamp. Read again!
if datatype == 0x44:
result = tup2time(*result)
else:
# Disassemble the tuple if not a timestamp
result = result[0]
logs.debug('result: %s' % result)
return result
def tup2time(fraction, seconds):
logs = logging.getLogger('tup2time')
logs.debug('seconds: %s' % seconds)
logs.debug('fraction: %s' % fraction)
dt1904 = datetime.datetime(1904, 1, 1)
delta = seconds + fraction * 2**(-64)
result = dt1904 + datetime.timedelta(seconds=delta)
logs.debug('Date-time %s' % result)
return result
def readstring(fi):
logs = logging.getLogger('readstring')
strlen = struct.unpack('<I', fi.read(4))
logs.debug('String of length %s' % strlen)
return fi.read(strlen[0]).decode()
if __name__ == '__main__':
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment