Source code for trsfile.trsfile_mutable

import os
import struct
import shutil
import numpy
import math

from .trace import Trace
from .common import Header, SampleCoding, TracePadding
from .trsfile import TrsFile

[docs]class TrsFileMutable(TrsFile): temp_folder = None is_saved = False def __init__(self, path, padding_mode = TracePadding.PAD, force_overwrite = False): # Check if trs file already exists, warn if it does if not force_overwrite and os.path.isfile(path): raise IOError('trs file already exists, use force_overwrite argument to overwrite.') self.path = path self.padding_mode = padding_mode # Shadow list of traces in files self.shadow_trace_index = -1 self.shadow_traces = [] # Create the temporary folder self.file_path, self.file_ext = os.path.splitext(self.path) if len(self.file_ext) <= 1: self.file_ext = '.trs' self.temp_folder = self.file_path + '.tmp' # Make sure to always start with a fresh start if os.path.isdir(self.temp_folder): shutil.rmtree(self.temp_folder, True) os.mkdir(self.temp_folder) self.__initialize_headers() def __del__(self): if self.temp_folder is not None and os.path.isdir(self.temp_folder): shutil.rmtree(self.temp_folder, True) def __enter__(self): """Called when entering a `with` block""" if not os.path.isdir(self.temp_folder): raise ValueError('temp folder for traces does not exists') return self def __len__(self): """Returns the total number of traces""" return len(self.shadow_traces) def __delitem__(self, index): self.is_saved = False # Remove the shadow traces and with that check if indexes are correct exception = None try: indices = self.shadow_traces[index] del self.shadow_traces[index] if not isinstance(index, slice): indices = [indices] except IndexError as err: exception = err # Delete all traces on the file system for trace_index in indices: for category in ['title', 'data', 'samples']: path = self.__get_temp_trace(trace_index, category) if os.path.isfile(path): os.remove(path) # Do we have an exception, re-raise if exception is not None: raise IndexError(exception) def __getitem__(self, index): self.is_saved = False traces = [] # Try access, and re-raise if wrong for fancy indexing errors try: indices = self.shadow_traces[index] if not isinstance(index, slice): indices = [indices] except IndexError as exception: raise IndexError(exception) # Now obtain all requested traces from file for i in indices: samples = None # Read the samples path = self.__get_temp_trace(i, 'samples') if os.path.isfile(path): with open(path, 'rb') as tmp_file: # First byte is always sample coding sample_coding = SampleCoding(tmp_file.read(1)[0]) samples = numpy.fromfile(tmp_file, sample_coding.format, -1) # Title path = self.__get_temp_trace(i, 'title') if os.path.isfile(path): with open(path, 'rb') as tmp_file: title = tmp_file.read().decode('utf-8') else: title = Header.TRACE_TITLE.default # Read the data path = self.__get_temp_trace(i, 'data') if os.path.isfile(path): with open(path, 'rb') as tmp_file: data = tmp_file.read() else: data = b'' # Sanity check if samples is None: raise IOError('unable to read samples from trace {0:d}'.format(i)) traces.append(Trace(sample_coding, samples, data, title, self.headers)) # Return the list of traces, or single trace if isinstance(index, slice): return traces else: # Earlier logic should ensure traces contains one element! return traces[0] def __setitem__(self, index, traces): self.is_saved = False # Make sure we have iterable traces if isinstance(traces, Trace): traces = [traces] # Store all traces with the next sequence numbers and keep these numbers as a list new_traces = [] for trace in traces: # Make sure we do not do something weird if not isinstance(trace, Trace): raise ValueError('the data added to the trace set is not of type Trace') self.shadow_trace_index += 1 new_traces.append(self.shadow_trace_index) # Save the trace data # Write the title as ascii with open(self.__get_temp_trace(self.shadow_trace_index, 'title'), 'wb') as tmp_file: tmp_file.write(trace.title if not isinstance(trace.title, str) else trace.title.encode('utf-8')) # Write the data file if trace.data is not None and len(trace.data) > 0: with open(self.__get_temp_trace(self.shadow_trace_index, 'data'), 'wb') as tmp_file: tmp_file.write(trace.data) # Write the sample file with open(self.__get_temp_trace(self.shadow_trace_index, 'samples'), 'wb') as tmp_file: tmp_file.write(bytes([trace.sample_coding.value])) numpy.array(trace.samples).astype(trace.sample_coding.format).tofile(tmp_file) # Lets delete the files that we are replacing to prevent storage explosions try: indices = self.shadow_traces[index] if not isinstance(index, slice): indices = [indices] except IndexError as exception: raise IndexError(exception) for i in indices: for category in ['title', 'data', 'samples']: path = self.__get_temp_trace(i, category) if os.path.isfile(path): os.remove(path) # Now we just assign the new_traces however, the slicing works if isinstance(index, slice): self.shadow_traces[index] = new_traces else: if len(new_traces) != 1: raise TypeError('assigning multiple new traces to single trace') self.shadow_traces[index] = new_traces[0]
[docs] def save(self, padding_mode = None): # Do not save twice, just a waste! if self.is_saved: return # Get argument or default padding_mode = self.padding_mode if padding_mode is None else padding_mode # Calculate some fields check_values = {} check_values[Header.NUMBER_SAMPLES] = set([len(trace) for trace in self]) or [Header.NUMBER_SAMPLES.default] check_values[Header.SAMPLE_CODING] = set([trace.sample_coding for trace in self]) or [Header.SAMPLE_CODING.default] check_values[Header.LENGTH_DATA] = set([len(trace.data) for trace in self]) or [Header.LENGTH_DATA.default] # Calculate headers based on shadow list for header in Header.get_mandatory(): if not header in self.headers: self.headers[header] = header.default # Check padding mode if padding_mode == TracePadding.PAD: check_values[Header.NUMBER_SAMPLES] = [max(check_values[Header.NUMBER_SAMPLES])] elif padding_mode == TracePadding.TRUNCATE: check_values[Header.NUMBER_SAMPLES] = [min(check_values[Header.NUMBER_SAMPLES])] # Set the headers and make sure all traces share the common header settings self.headers[Header.NUMBER_TRACES] = len(self) self.headers[Header.TITLE_SPACE] = max([len(trace.title) for trace in self] or [Header.TITLE_SPACE.default]) for header, check_values in check_values.items(): if len(check_values) != 1: raise TypeError('traces have different values for header {0:s}'.format(header.name)) self.headers[header] = check_values.pop() # Save the file with open(self.path, 'wb') as trs_file: # Save the headers for header, value in self.headers.items(): # Skip TRACE_BLOCK header as we write that last! if header == Header.TRACE_BLOCK: continue # Check if we have definitions for all headers if not isinstance(header, Header): raise TypeError('cannot write unknown header to trace set') # Obtain the tag value if header.type is int: tag_value = value.to_bytes(header.length, 'little') elif header.type is float: tag_value = struct.pack('<f', value) elif header.type is bool: tag_value = struct.pack('<?', value) elif header.type is str: tag_value = value.encode('utf-8') elif header.type is SampleCoding: tag_value = value.value.to_bytes(1, 'little') elif header.type is bytes: tag_value = value else: raise TypeError('header has a type that can not be serialized') # Write the Tag trs_file.write(bytes([header.value])) # Write the Length tag_length = len(tag_value) if tag_length >= 0x80: tag_length_length = math.ceil(x.bit_length() / 8.0) trs_file.write(bytes([0x80 | tag_length_length]) + tag_length.to_bytes(tag_length_length, 'little')) else: trs_file.write(bytes([tag_length])) # Write the value trs_file.write(tag_value) # Save the TRACE_BLOCK trs_file.write(bytes([Header.TRACE_BLOCK.value, 0])) # Start saving all traces for trace in self: # Title and title padding title = trace.title.encode('utf-8') trs_file.write(title) if len(title) < self.headers[Header.TITLE_SPACE]: trs_file.write(bytes([0] * (self.headers[Header.TITLE_SPACE] - len(title)))) # Data trs_file.write(trace.data) # Automatic truncate numpy.array(trace.samples[:self.headers[Header.NUMBER_SAMPLES]]) \ .astype(self.headers[Header.SAMPLE_CODING].format).tofile(trs_file) # Add any required padding if len(trace.samples) < self.headers[Header.NUMBER_SAMPLES]: numpy.array([0] * (self.headers[Header.NUMBER_SAMPLES] - len(trace.samples))) \ .astype(self.headers[Header.SAMPLE_CODING].format).tofile(trs_file) # Set save flag to true self.is_saved = True
[docs] def close(self): # Make sure to save any changes if we are creating a trace file self.save()
[docs] def append(self, trace): self[len(self):len(self)] = trace
[docs] def extend(self, traces): self[len(self):len(self)] = traces
[docs] def insert(self, index, trace): self[index:index] = [trace]
def __get_temp_trace(self, i, name): return self.temp_folder + '/{0:d}.{1:s}'.format(i, name) def __initialize_headers(self): """Read all internal headers from the file""" self.headers = {}