import os, re import numpy as np from os.path import join as pjoin from .config import ( MODULE_PATH, ELMO_TOOL_REPOSITORY, ELMO_INPUT_FILE_NAME, DEFAULT_HOST, DEFAULT_PORT, ) from .utils import write class SimulationProject: # TAG: EXCLUDE-FROM-SIMULATION-SEARCH """ Class to manage a simultion It contains all the parameters of the simulation and has method to use it """ _nb_bits_for_nb_challenges = 16 _project_directory = None ### Define the project @classmethod def get_project_directory(cl): """ Return the project directory of the simulation """ if cl._project_directory: return cl._project_directory else: raise NotImplementedError() @classmethod def set_project_directory(cl, project_directory): """ Set the project directory of the simulation """ cl._project_directory = project_directory @classmethod def get_binary_path(cl): """ Return the path of the leaking binary """ raise NotImplementedError() def get_challenge_format(self): """ Return the format of one challenge Used by 'set_input_for_each_challenge' if not rewritten """ raise NotImplementedError() ### Tools to realize the simulation of the project def __init__(self, challenges=None): """ Initialize a simulation project :challenge: The list of challenge for the simulation """ self.elmo_folder = pjoin(MODULE_PATH, ELMO_TOOL_REPOSITORY) self.challenges = challenges self.reset() def reset(self): """ Reset the last simulation """ self.is_executed = False self.has_been_online = False self._nb_traces = None self._complete_asmtrace = None self._complete_results = None self._complete_printed_data = None def get_number_of_challenges(self): """ Return the number of challenge """ return len(self.challenges) if self.challenges else 0 def get_test_challenges(self): """ Return a fixed list of challenges for test """ raise NotImplementedError() def get_random_challenges(self, nb_challenges): """ Return a list of random challenges :nb_challenges: Length of the list """ raise NotImplementedError() def set_challenges(self, challenges): """ Reset the simulation and set the challenges of the next simulation :challenges: The list of the challenges """ self.reset() self.challenges = challenges def get_input_filename(self): """ Return (string) the path of the input file of the local installation of ELMO tool """ return pjoin(self.elmo_folder, ELMO_INPUT_FILE_NAME) def get_printed_data_filename(self): """ Return the path (string) of the file containing the printed data of the local installation of ELMO tool """ return pjoin(self.elmo_folder, 'output', 'printdata.txt') def get_asmtrace_filename(self): """ Return the path (string) of the file containing the ASM trace of the local installation of ELMO tool """ return pjoin(self.elmo_folder, 'output', 'asmoutput', 'asmtrace00001.txt') def set_input_for_each_challenge(self, input, challenge): """ Set the input for one challenge for a simulation with ELMO tool :input: Descriptor of the input of the ELMO tool (write only) :challenge: A challenge for the simulation """ format = self.get_challenge_format() def aux(sizes, data): if len(sizes) == 0: write(input, data) else: assert len(data) == sizes[0], 'Incorrect format for challenge. Get {} instead of {}'.format(len(data), sizes[0]) for i in range(sizes[0]): aux(sizes[1:], data[i]) for num_part in range(len(format)): aux(format[num_part], challenge[num_part]) def set_input(self, input): """ Set the input for a simulation with ELMO tool First, it writes the number of challenges. Then, it writes each challenge one by one thanks to the method 'set_input_for_each_challenge' :input: Descriptor of the input of the ELMO tool (write only) """ if self.challenges: nb_challenges = self.get_number_of_challenges() assert nb_challenges < (1 << self._nb_bits_for_nb_challenges), \ 'The number of challenges must be strictly lower than {}. Currently, there are {} challenges.'.format( 1 << self._nb_bits_for_nb_challenges, nb_challenges, ) write(input, nb_challenges, nb_bits=self._nb_bits_for_nb_challenges) for challenge in self.challenges: self.set_input_for_each_challenge(input, challenge) def run(self): """ Run the simulation thanks the local installation of ELMO tool. Using the leaking binary defined thanks to the method 'get_binary_path', it will run the ELMO tool to output the leaked power traces. The results of the simulation are available via the methods: 'get_results', 'get_traces', 'get_asmtrace' and 'get_printed_data' Return the raw output of the compiled ELMO tool. """ self.reset() with open(self.get_input_filename(), 'w') as _input: self.set_input(_input) from .manage import execute_simulation res = execute_simulation(self) self.is_executed = True self.has_been_online = False self._nb_traces = res['nb_traces'] return res def run_online(self, host=DEFAULT_HOST, port=DEFAULT_PORT): """ Run the simulation thanks to an ELMO server. An ELMO server can be launched thanks to the command >>> python -m elmo run-server 'host' 'port' Using the leaking binary defined thanks to the method 'get_binary_path', it will run the ELMO tool to output the leaked power traces. The results of the simulation are available via the methods: 'get_results', 'get_traces', 'get_asmtrace' and 'get_printed_data' Return the raw output of the compiled ELMO tool. :host: The host of the ELMO server :post! The port where the ELMO server is currently listening """ from .server.protocol import SocketTool import socket class TempInput: def __init__(self): self._buffer = '' def write(self, data): self._buffer += data def get_string(self): return self._buffer self.reset() input = TempInput() self.set_input(input) try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((host, port)) SocketTool.send_data(s, { 'input': input.get_string(), }) if not SocketTool.get_ack(s): raise RuntimeError("NACK received: The request has been refused!") SocketTool.send_file(s, '{}/{}'.format(self.get_project_directory(), self.get_binary_path())) if not SocketTool.get_ack(s): raise RuntimeError("NACK received: The binary file has been refused!") data = SocketTool.get_data(s) if data['error']: raise Exception("The simulation returned an error: {}".format(data['error'])) s.close() except IOError as err: raise RuntimeError("The connection refused. Has the ELMO server been switch on?") from err self.is_executed = True self.has_been_online = True self._nb_traces = data['nb_traces'] self._complete_asmtrace = data['asmtrace'] self._complete_results = data['results'] self._complete_printed_data = data['printed_data'] return { key: value for key, value in data.items() if key not in ['results', 'asmtrace', 'printed_data'] } ### Manipulate the results def get_number_of_traces(self): """ Get the number of traces of the last simulation """ assert self.is_executed return self._nb_traces def get_results_filenames(self): """ Get the filenames of the results of the last simulation Return a list of filenames (strings), each file containing a power trace """ assert self.is_executed assert not self.has_been_online nb_traces = self.get_number_of_traces() output_path = os.path.join(self.elmo_folder, 'output') filenames = [] for i in range(nb_traces): filename = os.path.join(output_path, 'traces', 'trace%05d.trc' % (i+1)) assert os.path.isfile(filename) filenames.append(filename) return filenames def get_results(self): """ Get the raw outputs of the last simulation Return a list of power traces (represented by a list of floats) Warning: The output list is the same object stored in the instance. If you change this object, it will change in the instance too, and the next call to 'get_results' will return the changed object. """ assert self.is_executed nb_traces = self.get_number_of_traces() # Load the power traces if self._complete_results is None: self._complete_results = [] for filename in self.get_results_filenames(): with open(filename, 'r') as _file: self._complete_results.append(list(map(float, _file.readlines()))) return self._complete_results def get_traces(self, indexes=None): """ Get the power trace of the last simulation Return a 2-dimensional numpy array of floats. 1st dimension: number of the trace 2nd dimension: power point of the trace :indexes: if not None, return only the power points contained in :indexes: Must be a list of indexes of power points. """ assert self.is_executed results = self.get_results() nb_traces = self.get_number_of_traces() trace_length = len(results[0]) if indexes is None: traces = np.zeros((nb_traces, trace_length)) for i in range(nb_traces): traces[i,:] = results[i] return traces else: traces = np.zeros((nb_traces, len(indexes))) for i in range(nb_traces): traces[i,:] = np.array(results[i])[indexes] return traces ### Manipulate the ASM trace def get_asmtrace(self): """ Get the ASM trace of the last simulation The ASM trace is the list of the leaking assembler instructions, one instruction each point of the leakage power trace """ assert self.is_executed # Load the ASM trace if self._complete_asmtrace is None: with open(self.get_asmtrace_filename(), 'r') as _file: self._complete_asmtrace = _file.read() if type(self._complete_asmtrace) is not list: self._complete_asmtrace = self._complete_asmtrace.split('\n') return self._complete_asmtrace def get_indexes_of(self, condition): """ Get the list of indexes of the instructions verifying the 'condition' in the ASM trace :condition: Boolean function with ASM instruction (string) for input """ assert self.is_executed return [i for i, instr in enumerate(self.get_asmtrace()) if condition(instr)] ### Manipulate the Printed Data def get_printed_data(self, per_trace=True): """ Get the printed data of the last simulation A printed data is a data which has been given to the function 'printbyte' during the simulation :per_trace: If True (default), split equally the printed data in a table with a length equal to the number of simulated power traces """ assert self.is_executed # Load the printed data if self._complete_printed_data is None: with open(self.get_printed_data_filename(), 'r') as _file: self._complete_printed_data = list(map(lambda x: int(x, 16), _file.readlines())) if per_trace: # Return printed data for each trace data = self._complete_printed_data nb_traces = self.get_number_of_traces() nb_data_per_trace = len(data) // nb_traces return [data[nb_data_per_trace*i:nb_data_per_trace*(i+1)] for i in range(nb_traces)] else: # Return printed data return self._complete_printed_data