Source code for trsfile.trsfile

import os
import mmap
import struct
import numpy

from .trace import Trace
from .common import Header, SampleCoding

[docs]class TrsFile: handle = None file_handle = None temp_folder = None headers = {} # All our magic function to support easy usage of the Trs file format def __init__(self, path): self.path = path self.file_handle = open(path, 'rb', 0) # Disable buffering self.handle = mmap.mmap(self.file_handle.fileno(), 0, access = mmap.ACCESS_READ) self.__initialize_headers() def __del__(self): # Sanity close, no harm from calling twice, but we do expect user calling close! self.close() def __iter__(self): """ reset pointer """ self.iterator_index = -1 return self def __next__(self): self.iterator_index = self.iterator_index + 1 if self.iterator_index >= len(self): raise StopIteration return self[self.iterator_index] def __enter__(self): """Called when entering a `with` block""" if self.handle is None or self.handle.closed: raise ValueError('I/O operation on closed file') return self def __exit__(self, *args): """Called when exiting a `with` block""" self.close() def __repr__(self): return repr([self[i] for i in range(0, len(self))]) def __len__(self): """Returns the total number of traces""" return self.headers[Header.NUMBER_TRACES] if Header.NUMBER_TRACES in self.headers else 0 def __delitem__(self, index): raise TypeError('cannot modify existing trace set') def __setitem__(self, index, trace): raise TypeError('cannot modify existing trace set') def __getitem__(self, index): # check for slicing if isinstance(index, slice): # No bounds checking when using slices, that's how python rolls! r = range(*index.indices(self.headers[Header.NUMBER_TRACES])) else: # Wrap around for negative indexes if index < 0: index = index % self.headers[Header.NUMBER_TRACES] # Check if we are still in bounds if index >= self.headers[Header.NUMBER_TRACES]: raise IndexError('list index out of range') r = range(index, index + 1) # Now read in all traces traces = [] for i in r: # Seek to the beginning of the trace self.handle.seek(self.data_offset + i * self.trace_length) # Read the title if Header.TITLE_SPACE in self.headers: title = self.handle.read(self.headers[Header.TITLE_SPACE]).decode('utf-8') else: title = Header.TRACE_TITLE.default # Read data if Header.LENGTH_DATA in self.headers: data = self.handle.read(self.headers[Header.LENGTH_DATA]) else: data = bytes(Header.LENGTH_DATA.default) # Read all the samples samples = numpy.frombuffer(self.handle.read(self.trace_length), self.headers[Header.SAMPLE_CODING].format, self.headers[Header.NUMBER_SAMPLES]) traces.append(Trace(self.headers[Header.SAMPLE_CODING], samples, data, title, self.headers)) # Return the select item(s) if isinstance(index, slice): return traces else: # Earlier logic should ensure traces contains one element! return traces[0]
[docs] def save(self): raise TypeError('cannot save existing trace set')
[docs] def append(self, trace): raise TypeError('cannot modify existing trace set')
[docs] def extend(self, traces): raise TypeError('cannot modify existing trace set')
[docs] def insert(self, index, trace): raise TypeError('cannot modify existing trace set')
[docs] def reverse(self): return self[::-1]
[docs] def close(self): """Closes the open file handle if it is opened""" if self.file_handle is not None and not self.file_handle.closed: self.file_handle.close() if self.handle is not None and not self.handle.closed: self.handle.close()
def __initialize_headers(self): """Read all internal headers from the file""" self.headers = {} # Add default headers and values if new self.handle.seek(0) # Parse all headers until the TRACE_BLOCK while Header.TRACE_BLOCK not in self.headers: # Obtain the Tag tag = self.handle.read(1)[0] # Obtain the Length tag_length = self.handle.read(1)[0] if (tag_length & 0x80) != 0: tag_length = int.from_bytes(self.handle.read(tag_length & 0x7F), 'little') # Obtain the Value tag_value = self.handle.read(tag_length) if tag_length > 0 else None # Interpreter it header = None if Header.has_value(tag): header = Header(tag) if header.type is int: tag_value = int.from_bytes(tag_value, 'little') elif header.type is float: tag_value, = struct.unpack('<f', tag_value) elif header.type is bool: tag_value, = struct.unpack('<?', tag_value) elif header.type is str: tag_value = tag_value.decode('utf-8') elif header.type is SampleCoding: tag_value = SampleCoding(tag_value[0]) else: print('Warning: tag {tag:02X} is not supported by the library, please let us know.'.format(tag=tag)) self.headers[tag if header is None else header] = tag_value # Pre-compute some static information based on headers self.data_offset = self.handle.tell() self.sample_length = self.headers[Header.NUMBER_SAMPLES] * self.headers[Header.SAMPLE_CODING].size self.trace_length = self.sample_length + self.headers.get(Header.LENGTH_DATA, 0) + self.headers.get(Header.TITLE_SPACE, 0) # Sanity: Check if we have all mandatory headers, if not, throw an error if we are in reading mode! if not Header.get_mandatory().issubset(self.headers): raise IOError('trace set does not contain all mandatory headers') # Sanity: Check if the file has the proper size self.handle.seek(0, os.SEEK_END) file_size = self.handle.tell() if file_size != self.data_offset + self.headers[Header.NUMBER_TRACES] * self.trace_length: raise IOError('trace set has an unexpected length')