|
- import os, re
- import socket
- from .protocol import SocketTool
- import numpy as np
-
- def to_hex(v, nb_bits=16):
- try:
- v_hex = v.hex()
- except AttributeError:
- v_hex = hex(v)[2:]
- return '0'*(nb_bits//4-len(v_hex)) + v_hex
-
- def split_octet(hexstr):
- return [hexstr[i:i+2] for i in range(0, len(hexstr), 2)]
-
- def to_signed_hex(v, nb_bits=16):
- try:
- return split_octet(to_hex(v & (2**nb_bits-1), nb_bits=nb_bits))
- except TypeError as err:
- raise TypeError('Error to transform a <{}> into signed hex.'.format(type(v))) from err
-
- def write(_input, uintXX, nb_bits=16):
- uintXX = to_signed_hex(uintXX, nb_bits=nb_bits)
- for i in range(nb_bits//8):
- _input.write(uintXX[i]+'\n')
-
- def write_list(_input, uint16_list):
- for uint16 in uint16_list:
- write(_input, uint16)
-
- def launch_simulation(quiet=False, **kwargs):
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', 5000))
- SocketTool.send_data(s, kwargs)
- if not SocketTool.get_ack(s):
- raise RuntimeError("NACK received: The request has been refused !")
- else:
- data = SocketTool.get_data(s)
- if data['error'] and not quiet:
- raise Exception("The simulation return an error.")
- return data['output'], data['error']
- s.close()
- except IOError as err:
- raise RuntimeError("The connection refused. Has the ELMO server been switch on ?") from err
-
-
- class SimulationProject:
- _nb_bits_for_nb_challenges = 16
- _project_directory = None
-
- ### Define the project
- @classmethod
- def get_project_directory(cl):
- if cl._project_directory:
- return cl._project_directory
- else:
- raise NotImplementedError()
-
- @classmethod
- def set_project_directory(cl, project_directory):
- cl._project_directory = project_directory
-
- @classmethod
- def get_project_label(cl):
- return cl.get_project_directory()
-
- @classmethod
- def get_make_directory(cl):
- return ''
-
- @classmethod
- def get_binary(cl):
- raise NotImplementedError()
-
- @classmethod
- def get_parameters_names(cl):
- return set()
-
- @classmethod
- def adapt_project(cl, parameters):
- return
-
- def get_challenge_format(self):
- raise NotImplementedError()
-
-
- ### Tools to realize the simulation of the project
- def __init__(self, challenges=None):
- self.elmo_folder = os.path.dirname(os.path.abspath(__file__))+'/elmo'
- self.challenges = challenges
- self.is_executed = False
-
- def set_challenges(self, challenges):
- self.challenges = challenges
-
- def get_writable_input_file(self):
- return open('{}/input.txt'.format(self.elmo_folder), 'w')
-
- def set_input_for_each_challenge(self, input, challenge):
- format = self.get_challenge_format()
-
- def aux(sizes, data):
- if len(sizes) == 0:
- write(input, data)
- else:
- assert len(data) == sizes[0], 'Incorrect format for challenge. Get {} instead of {}'.format(len(data), sizes[0])
- for i in range(sizes[0]):
- aux(sizes[1:], data[i])
-
- for num_part in range(len(format)):
- aux(format[num_part], challenge[num_part])
-
- def set_input(self, input):
- assert len(self.challenges) < 2**16, 'The number of challenges must be strictly lower than 65536. Currently, there are {} challenges.'.format(len(self.challenges))
- write(input, len(self.challenges), nb_bits=self._nb_bits_for_nb_challenges)
- for challenge in self.challenges:
- self.set_input_for_each_challenge(input, challenge)
-
- def run(self):
- with open('{}/input.txt'.format(self.elmo_folder), 'w') as _input:
- self.set_input(_input)
- from .manage import execute_simulation
- execute_simulation(self)
- self.is_executed = True
-
- def run_online(self):
- with open('{}/input.txt'.format(self.elmo_folder), 'w') as _input:
- self.set_input(_input)
- launch_simulation(project=self.get_project_label(), quiet=False)
- self.is_executed = True
-
- def get_asmtrace_filename(self):
- return '{}/output/asmoutput/asmtrace00001.txt'.format(self.elmo_folder)
-
- def get_asmtrace(self):
- with open(self.get_asmtrace_filename(), 'r') as _file:
- return [line.strip() for line in _file.readlines()]
-
- def get_indexes_of(self, condition):
- with open(self.get_asmtrace_filename(), 'r') as _file:
- asmtrace = _file.readlines()
- return [i for i, instr in enumerate(asmtrace) if condition(instr)]
-
- def get_number_of_traces(self):
- return len(self.challenges)
-
- def get_results(self, only_filenames=False, reorganise=None, indexes=None):
- assert self.is_executed
- nb_traces = self.get_number_of_traces()
-
- trace_filenames = []
- for filename in os.listdir('{}/output/traces/'.format(self.elmo_folder)):
- if re.search(r'^trace\d+\.trc$', filename):
- trace_filenames.append('{}/output/traces/{}'.format(self.elmo_folder, filename))
- if len(trace_filenames) >= nb_traces:
- break
-
- assert len(trace_filenames) == nb_traces
- results = trace_filenames
-
- if not only_filenames:
- for i in range(len(results)):
- with open(results[i], 'r') as _file:
- if indexes is not None:
- results[i] = list(map(float, _file.readlines()[indexes]))
- else:
- results[i] = list(map(float, _file.readlines()))
-
- if reorganise is not None:
- results = reorganise(results)
-
- return results
-
- def get_traces(self, reorganise=None, indexes=None):
- results = self.get_results(only_filenames=False, reorganise=reorganise,indexes=indexes)
-
- nb_traces = self.get_number_of_traces()
- trace_length = len(results[0])
-
- traces = np.zeros((nb_traces, trace_length))
- for i in range(nb_traces):
- traces[i,:] = results[i]
-
- if reorganise is not None:
- traces = reorganise(traces)
-
- return traces
-
- def get_printed_data(self):
- with open('{}/output/printdata.txt'.format(self.elmo_folder), 'r') as _file:
- data = list(map(lambda x: int(x, 16), _file.readlines()))
-
- nb_traces = self.get_number_of_traces()
- nb_data_per_trace = len(data) // nb_traces
-
- return [data[nb_data_per_trace*i:nb_data_per_trace*(i+1)] for i in range(nb_traces)]
-
- def analyse_operands(self, num_line, num_trace=1):
- num_str = str(num_trace)
- num_str = '0'*(5-len(num_str)) + num_str
-
- operands_filename = self.elmo_folder + '/output/operands/operands{}.txt'.format(num_str)
- trace_filename = self.elmo_folder + '/output/traces/trace0000{}.trc'.format(num_trace)
-
- is_multiple = (type(num_line) is list)
- if not is_multiple:
- num_line = [num_line]
- output = [{}]*len(num_line)
-
- with open(operands_filename, 'r') as _file:
- lines = _file.readlines()
- for i, num in enumerate(num_line):
- line = lines[num].split()
- data = list(map(int, line[0:7]))
- output[i]['previous'] = data[0:2]
- output[i]['current'] = data[2:4]
- output[i]['triplet'] = data[4:7]
- output[i]['other'] = list(map(float, line[7:]))
-
- with open(trace_filename, 'r') as _file:
- lines = _file.readlines()
- for i, num in enumerate(num_line):
- line = lines[num]
- output[i]['power'] = float(line)
-
- return output if is_multiple else output[0]
-
|