- The file _projectclass.py_ where there is the class of the simulation which will enable you to generate traces of the project in Python scripts; | - The file _projectclass.py_ where there is the class of the simulation which will enable you to generate traces of the project in Python scripts; | ||||
- A _Makefile_ ready to be used with a compiler _arm-none-eabi-gcc_. | - A _Makefile_ ready to be used with a compiler _arm-none-eabi-gcc_. | ||||
Usually a leaking code runs challenges, one challenge giving a power trace. A challenge is the execution of a code with a specific set of data. This set of data is given in the input of the leakage simulation. For example, one can imagine that the leaking code is a symmetric encryption and one wants to study its power leakage according to the message and the secret key. Then, a challenge is the simulation of the leakage for a specific message and for a specific secret key. | |||||
So, the classical form of _project.c_ is the following one: | |||||
- It gets a number of challenges with ```readbyte```. | |||||
- Then, it loops for each challenge. | |||||
- For the challenge, load the specific set of data with ```readbyte```. | |||||
- Start the record of the power leakage (start a power trace) | |||||
- Realise the leaking operations with the loaded set of data | |||||
- Stop the record of the power leakage (end a power trace) | |||||
- Eventually output some data with ```printbyte``` | |||||
- Indicate to ELMO tool that the simulation is finished | |||||
The file _projectclass.py_ contains a subclass of ```SimulationProject```. It contains the description of the _project.c_ file for the ELMO tool, in order to correctly realise the simulation. The classmethod ```get_binary_path(cl)``` must return the relative path of the leakage binary (_project.c_ correctly compiled). The method ```set_input_for_each_challenge(self, input, challenge)``` must write a ```challenge``` in ```input``` using the function ```write```. Many methods of ```SimulationProject``` can be rewritten in the subclass if necessary. For example, in the case where your _project.c_ doesn't run challenges, you can rewrite the method ```set_input(self, input)```. | |||||
Important! Don't forget that _ELMO_ (and so _Python-ELMO_) needs a **compiled** version of _project.c_ (see the "Requirements" section for more details). The provided _Makefile_ is here to help you to compile. | |||||
### List all the available simulation | ### List all the available simulation | ||||
```python | ```python | ||||
Sometimes, it is impossible to run the simulation thanks the simple method ```run``` of the project class. Indeed, sometimes the Python script is executed in the environment where _Python-ELMO_ cannot launch the ELMO tool. For example, it is the case where _Python-ELMO_ is used in SageMath on Windows. On Windows, SageMath installation relies on the Cygwin POSIX emulation system and it can be a problem. | Sometimes, it is impossible to run the simulation thanks the simple method ```run``` of the project class. Indeed, sometimes the Python script is executed in the environment where _Python-ELMO_ cannot launch the ELMO tool. For example, it is the case where _Python-ELMO_ is used in SageMath on Windows. On Windows, SageMath installation relies on the Cygwin POSIX emulation system and it can be a problem. | ||||
To offer a solution, _Python-ELMO_ can be used thanks to a client-server link. The idea is you must launch the script ```run_server.py``` which will listen (by default) at port 5000 in localhost. | |||||
To offer a solution, _Python-ELMO_ can be used thanks to a client-server link. The idea is you must launch the following script which will listen (by default) at port 5000 in localhost. | |||||
```bash | ```bash | ||||
python -m elmo run-server | python -m elmo run-server |
SUBSEQUENT = 2 | SUBSEQUENT = 2 | ||||
class ELMOEngine: | class ELMOEngine: | ||||
### Initialization | |||||
def __init__(self): | def __init__(self): | ||||
self.load_coefficients() | self.load_coefficients() | ||||
self.reset_points() | self.reset_points() | ||||
return coeffs | return coeffs | ||||
def load_coefficients(self): | def load_coefficients(self): | ||||
""" Load the coefficients for the ELMO model about power leakage """ | |||||
filename = os.path.join( | filename = os.path.join( | ||||
os.path.dirname(os.path.abspath(__file__)), | os.path.dirname(os.path.abspath(__file__)), | ||||
ELMO_TOOL_REPOSITORY, | ELMO_TOOL_REPOSITORY, | ||||
self.BitFlip1_bitinteractions = self._extract_data(496) | self.BitFlip1_bitinteractions = self._extract_data(496) | ||||
self.BitFlip2_bitinteractions = self._extract_data(496) | self.BitFlip2_bitinteractions = self._extract_data(496) | ||||
def reset_points(self): | |||||
self.points = [] | |||||
self.power = None | |||||
def add_point(self, triplet, previous_ops, current_ops): | |||||
self.points.append((triplet, previous_ops, current_ops)) | |||||
### Computation core | |||||
def _dot(self, a, b): | def _dot(self, a, b): | ||||
return np.sum(a * b, axis=0) | return np.sum(a * b, axis=0) | ||||
BitFlip1_bitinteractions_data, BitFlip2_bitinteractions_data]) | BitFlip1_bitinteractions_data, BitFlip2_bitinteractions_data]) | ||||
return power | return power | ||||
### To manage studied points | |||||
def reset_points(self): | |||||
""" Reset all the points previously added """ | |||||
self.points = [] | |||||
self.power = None | |||||
def add_point(self, triplet, previous_ops, current_ops): | |||||
""" Add a new point to analyse """ | |||||
self.points.append((triplet, previous_ops, current_ops)) | |||||
def run(self): | def run(self): | ||||
""" Compute the power leakage of all the points previously added | |||||
Store the results in 'self.power' | |||||
""" | |||||
nb_points = len(self.points) | nb_points = len(self.points) | ||||
triplet = np.array([p[0] for p in self.points]).T # shape = (3, nb_points) | triplet = np.array([p[0] for p in self.points]).T # shape = (3, nb_points) | ||||
previous_ops = np.array([p[1] for p in self.points]).T # shape = (2, nb_points) | previous_ops = np.array([p[1] for p in self.points]).T # shape = (2, nb_points) | ||||
self.power = self.calculate_point(triplet, previous_ops, current_ops) | self.power = self.calculate_point(triplet, previous_ops, current_ops) | ||||
def oneshot_point(self, triplet, previous_ops, current_ops): | def oneshot_point(self, triplet, previous_ops, current_ops): | ||||
""" Compute the power of a single point | |||||
defined by 'triplet', 'previous_ops', and 'current_ops' | |||||
""" | |||||
self.reset_points() | self.reset_points() | ||||
self.add_point(triplet, previous_ops, current_ops) | self.add_point(triplet, previous_ops, current_ops) | ||||
self.run() | self.run() |
class Executor(OneShotServiceThread): | class Executor(OneShotServiceThread): | ||||
def execute(self): | def execute(self): | ||||
""" Answer a request of simulation """ | |||||
# Get simulation data | |||||
data = self.protocol.get_data() | data = self.protocol.get_data() | ||||
self.protocol.please_assert(data) | self.protocol.please_assert(data) | ||||
def launch_executor(host=DEFAULT_HOST, port=DEFAULT_PORT, waiting_function=True): | def launch_executor(host=DEFAULT_HOST, port=DEFAULT_PORT, waiting_function=True): | ||||
""" Launch ELMO server on 'host' listening to the 'port' """ | |||||
from .server.servicethread import ListeningThread | from .server.servicethread import ListeningThread | ||||
def do_main_program(): | def do_main_program(): | ||||
def program_cleanup(signum, frame): | def program_cleanup(signum, frame): | ||||
thread.stop() | thread.stop() | ||||
# Launch the listening server | |||||
thread = do_main_program() | thread = do_main_program() | ||||
# Give a way to stop the server | |||||
import signal | import signal | ||||
signal.signal(signal.SIGINT, program_cleanup) | signal.signal(signal.SIGINT, program_cleanup) | ||||
signal.signal(signal.SIGTERM, program_cleanup) | signal.signal(signal.SIGTERM, program_cleanup) | ||||
# Do somthing during the execution of the server | |||||
if waiting_function is True: | if waiting_function is True: | ||||
import time | import time | ||||
while thread.is_running(): | while thread.is_running(): |
### Define the project | ### Define the project | ||||
@classmethod | @classmethod | ||||
def get_project_directory(cl): | def get_project_directory(cl): | ||||
""" """ | |||||
""" Return the project directory of the simulation """ | |||||
if cl._project_directory: | if cl._project_directory: | ||||
return cl._project_directory | return cl._project_directory | ||||
else: | else: | ||||
@classmethod | @classmethod | ||||
def set_project_directory(cl, project_directory): | def set_project_directory(cl, project_directory): | ||||
""" Set the project directory of the simulation """ | |||||
cl._project_directory = project_directory | cl._project_directory = project_directory | ||||
@classmethod | @classmethod | ||||
def get_project_label(cl): | |||||
return cl.get_project_directory() | |||||
@classmethod | |||||
def get_make_directory(cl): | |||||
return '' | |||||
@classmethod | |||||
def get_binary_path(cl): | def get_binary_path(cl): | ||||
""" Return the path of the leaking binary """ | |||||
raise NotImplementedError() | raise NotImplementedError() | ||||
@classmethod | |||||
def get_parameters_names(cl): | |||||
return set() | |||||
def get_challenge_format(self): | def get_challenge_format(self): | ||||
""" Return the format of one challenge | |||||
Used by 'set_input_for_each_challenge' if not rewritten | |||||
""" | |||||
raise NotImplementedError() | raise NotImplementedError() | ||||
### Tools to realize the simulation of the project | ### Tools to realize the simulation of the project | ||||
def __init__(self, challenges=None): | def __init__(self, challenges=None): | ||||
""" Initialize a simulation project | |||||
:challenge: The list of challenge for the simulation | |||||
""" | |||||
self.elmo_folder = pjoin(MODULE_PATH, ELMO_TOOL_REPOSITORY) | self.elmo_folder = pjoin(MODULE_PATH, ELMO_TOOL_REPOSITORY) | ||||
self.challenges = challenges | self.challenges = challenges | ||||
self.reset() | self.reset() | ||||
def reset(self): | def reset(self): | ||||
""" Reset the last simulation """ | |||||
self.is_executed = False | self.is_executed = False | ||||
self.has_been_online = False | self.has_been_online = False | ||||
self._complete_results = None | self._complete_results = None | ||||
self._complete_printed_data = None | self._complete_printed_data = None | ||||
def get_number_of_challenges(self): | |||||
""" Return the number of challenge """ | |||||
return len(self.challenges) if self.challenges else 0 | |||||
def get_test_challenges(self): | def get_test_challenges(self): | ||||
""" Return a fixed list of challenges for test """ | |||||
raise NotImplementedError() | raise NotImplementedError() | ||||
def get_random_challenges(self, nb_challenges): | def get_random_challenges(self, nb_challenges): | ||||
""" Return a list of random challenges | |||||
:nb_challenges: Length of the list | |||||
""" | |||||
raise NotImplementedError() | raise NotImplementedError() | ||||
def set_challenges(self, challenges): | def set_challenges(self, challenges): | ||||
""" Reset the simulation and set the challenges of the next simulation | |||||
:challenges: The list of the challenges | |||||
""" | |||||
self.reset() | self.reset() | ||||
self.challenges = challenges | self.challenges = challenges | ||||
def get_input_filename(self): | def get_input_filename(self): | ||||
""" Return (string) the path of the input file | |||||
of the local installation of ELMO tool | |||||
""" | |||||
return pjoin(self.elmo_folder, ELMO_INPUT_FILE_NAME) | return pjoin(self.elmo_folder, ELMO_INPUT_FILE_NAME) | ||||
def get_printed_data_filename(self): | def get_printed_data_filename(self): | ||||
""" Return the path (string) of the file containing the printed data | |||||
of the local installation of ELMO tool | |||||
""" | |||||
return pjoin(self.elmo_folder, 'output', 'printdata.txt') | return pjoin(self.elmo_folder, 'output', 'printdata.txt') | ||||
def get_asmtrace_filename(self): | def get_asmtrace_filename(self): | ||||
""" Return the path (string) of the file containing the ASM trace | |||||
of the local installation of ELMO tool | |||||
""" | |||||
return pjoin(self.elmo_folder, 'output', 'asmoutput', 'asmtrace00001.txt') | return pjoin(self.elmo_folder, 'output', 'asmoutput', 'asmtrace00001.txt') | ||||
def set_input_for_each_challenge(self, input, challenge): | def set_input_for_each_challenge(self, input, challenge): | ||||
""" Set the input for one challenge for a simulation with ELMO tool | |||||
:input: Descriptor of the input of the ELMO tool (write only) | |||||
:challenge: A challenge for the simulation | |||||
""" | |||||
format = self.get_challenge_format() | format = self.get_challenge_format() | ||||
def aux(sizes, data): | def aux(sizes, data): | ||||
aux(format[num_part], challenge[num_part]) | aux(format[num_part], challenge[num_part]) | ||||
def set_input(self, input): | def set_input(self, input): | ||||
""" Set the input for a simulation with ELMO tool | |||||
First, it writes the number of challenges. | |||||
Then, it writes each challenge one by one thanks to the method 'set_input_for_each_challenge' | |||||
:input: Descriptor of the input of the ELMO tool (write only) | |||||
""" | |||||
if self.challenges: | if self.challenges: | ||||
assert len(self.challenges) < (1 << self._nb_bits_for_nb_challenges), \ | |||||
nb_challenges = self.get_number_of_challenges() | |||||
assert nb_challenges < (1 << self._nb_bits_for_nb_challenges), \ | |||||
'The number of challenges must be strictly lower than {}. Currently, there are {} challenges.'.format( | 'The number of challenges must be strictly lower than {}. Currently, there are {} challenges.'.format( | ||||
1 << self._nb_bits_for_nb_challenges, | 1 << self._nb_bits_for_nb_challenges, | ||||
len(self.challenges), | |||||
nb_challenges, | |||||
) | ) | ||||
write(input, len(self.challenges), nb_bits=self._nb_bits_for_nb_challenges) | |||||
write(input, nb_challenges, nb_bits=self._nb_bits_for_nb_challenges) | |||||
for challenge in self.challenges: | for challenge in self.challenges: | ||||
self.set_input_for_each_challenge(input, challenge) | self.set_input_for_each_challenge(input, challenge) | ||||
def run(self): | def run(self): | ||||
""" Run the simulation thanks the local installation of ELMO tool. | |||||
Using the leaking binary defined thanks to the method 'get_binary_path', | |||||
it will run the ELMO tool to output the leaked power traces. | |||||
The results of the simulation are available via the methods: | |||||
'get_results', 'get_traces', 'get_asmtrace' and 'get_printed_data' | |||||
Return the raw output of the compiled ELMO tool. | |||||
""" | |||||
self.reset() | self.reset() | ||||
with open(self.get_input_filename(), 'w') as _input: | with open(self.get_input_filename(), 'w') as _input: | ||||
self.set_input(_input) | self.set_input(_input) | ||||
return res | return res | ||||
def run_online(self, host=DEFAULT_HOST, port=DEFAULT_PORT): | def run_online(self, host=DEFAULT_HOST, port=DEFAULT_PORT): | ||||
""" Run the simulation thanks to an ELMO server. | |||||
An ELMO server can be launched thanks to the command | |||||
>>> python -m elmo run-server 'host' 'port' | |||||
Using the leaking binary defined thanks to the method 'get_binary_path', | |||||
it will run the ELMO tool to output the leaked power traces. | |||||
The results of the simulation are available via the methods: | |||||
'get_results', 'get_traces', 'get_asmtrace' and 'get_printed_data' | |||||
Return the raw output of the compiled ELMO tool. | |||||
:host: The host of the ELMO server | |||||
:post! The port where the ELMO server is currently listening | |||||
""" | |||||
from .server.protocol import SocketTool | from .server.protocol import SocketTool | ||||
import socket | import socket | ||||
} | } | ||||
### Manipulate the results | ### Manipulate the results | ||||
def get_number_of_challenges(self): | |||||
return len(self.challenges) | |||||
def get_number_of_traces(self): | def get_number_of_traces(self): | ||||
""" Get the number of traces of the last simulation """ | |||||
assert self.is_executed | assert self.is_executed | ||||
return self._nb_traces | return self._nb_traces | ||||
def get_results_filenames(self): | def get_results_filenames(self): | ||||
""" Get the filenames of the results of the last simulation | |||||
Return a list of filenames (strings), each file containing a power trace | |||||
""" | |||||
assert self.is_executed | assert self.is_executed | ||||
assert not self.has_been_online | assert not self.has_been_online | ||||
nb_traces = self.get_number_of_traces() | nb_traces = self.get_number_of_traces() | ||||
return filenames | return filenames | ||||
def get_results(self): | def get_results(self): | ||||
""" | |||||
""" Get the raw outputs of the last simulation | |||||
Return a list of power traces (represented by a list of floats) | |||||
Warning: The output list is the same object stored in the instance. | Warning: The output list is the same object stored in the instance. | ||||
If you change this object, it will change in the instance too, and the | If you change this object, it will change in the instance too, and the | ||||
next call to 'get_results' will return the changed object. | next call to 'get_results' will return the changed object. | ||||
return self._complete_results | return self._complete_results | ||||
def get_traces(self, indexes=None): | def get_traces(self, indexes=None): | ||||
""" | |||||
""" Get the power trace of the last simulation | |||||
Return a 2-dimensional numpy array of floats. | |||||
1st dimension: number of the trace | |||||
2nd dimension: power point of the trace | |||||
:indexes: if not None, return only the power points contained in :indexes: | |||||
Must be a list of indexes of power points. | |||||
""" | """ | ||||
assert self.is_executed | assert self.is_executed | ||||
results = self.get_results() | results = self.get_results() | ||||
else: | else: | ||||
traces = np.zeros((nb_traces, len(indexes))) | traces = np.zeros((nb_traces, len(indexes))) | ||||
for i in range(nb_traces): | for i in range(nb_traces): | ||||
traces[i,:] = results[i][indexes] | |||||
traces[i,:] = np.array(results[i])[indexes] | |||||
return traces | return traces | ||||
### Manipulate the ASM trace | ### Manipulate the ASM trace | ||||
if self._complete_asmtrace is None: | if self._complete_asmtrace is None: | ||||
with open(self.get_asmtrace_filename(), 'r') as _file: | with open(self.get_asmtrace_filename(), 'r') as _file: | ||||
self._complete_asmtrace = _file.read() | self._complete_asmtrace = _file.read() | ||||
if type(self._complete_asmtrace): | |||||
if type(self._complete_asmtrace) is not list: | |||||
self._complete_asmtrace = self._complete_asmtrace.split('\n') | self._complete_asmtrace = self._complete_asmtrace.split('\n') | ||||
return self._complete_asmtrace | return self._complete_asmtrace |
read2bytes(&nb_challenges); | read2bytes(&nb_challenges); | ||||
for(num_challenge=0; num_challenge<nb_challenges; num_challenge++) { | for(num_challenge=0; num_challenge<nb_challenges; num_challenge++) { | ||||
// Set variables for the current challenge | |||||
starttrigger(); // To start a new trace | starttrigger(); // To start a new trace | ||||
// Do the leaking operations here... | // Do the leaking operations here... |
### Binary operations | ### Binary operations | ||||
def hweight(n): | def hweight(n): | ||||
""" Return the Hamming weight of 'n' """ | |||||
c = 0 | c = 0 | ||||
while n>0: | while n>0: | ||||
c += (n & 1) | c += (n & 1) | ||||
return c | return c | ||||
def hdistance(x,y): | def hdistance(x,y): | ||||
""" Return the Hamming distance between 'x' and 'y' """ | |||||
return hweight(x^y) | return hweight(x^y) | ||||
def binary_writing(n, nb_bits=32, with_hamming=False): | def binary_writing(n, nb_bits=32, with_hamming=False): | ||||
""" Return the binary writing 'w' of 'n' with 'nb_bits' | |||||
If with_hamming is True, return a couple (w, h) with 'h' the Hamming weight of 'n' | |||||
""" | |||||
n = np.array(n) | n = np.array(n) | ||||
w, h = np.zeros((nb_bits, len(n))), np.zeros((len(n))) | w, h = np.zeros((nb_bits, len(n))), np.zeros((len(n))) | ||||
### Conversion | ### Conversion | ||||
def to_hex(v, nb_bits=16): | def to_hex(v, nb_bits=16): | ||||
""" Convert the value 'v' into a hexadecimal string (without the prefix 0x)""" | |||||
try: | try: | ||||
v_hex = v.hex() | v_hex = v.hex() | ||||
except AttributeError: | except AttributeError: | ||||
return '0'*(nb_bits//4-len(v_hex)) + v_hex | return '0'*(nb_bits//4-len(v_hex)) + v_hex | ||||
def split_octet(hexstr): | def split_octet(hexstr): | ||||
""" Split a hexadecimal string (without the prefix 0x) | |||||
into a list of bytes described with a 2-length hexadecimal string | |||||
""" | |||||
return [hexstr[i:i+2] for i in range(0, len(hexstr), 2)] | return [hexstr[i:i+2] for i in range(0, len(hexstr), 2)] | ||||
def to_signed_hex(v, nb_bits=16): | def to_signed_hex(v, nb_bits=16): | ||||
""" Convert the signed value 'v' | |||||
into a list of bytes described with a 2-length hexadecimal string | |||||
""" | |||||
try: | try: | ||||
return split_octet(to_hex(v & ((1<<nb_bits)-1), nb_bits=nb_bits)) | return split_octet(to_hex(v & ((1<<nb_bits)-1), nb_bits=nb_bits)) | ||||
except TypeError as err: | except TypeError as err: | ||||
### Write function | ### Write function | ||||
def write(_input, uint, nb_bits=16): | def write(_input, uint, nb_bits=16): | ||||
""" Write in '_input' the value 'uint' by writing each byte | |||||
with hexadecimal format in a new line | |||||
""" | |||||
uint = to_signed_hex(uint, nb_bits=nb_bits) | uint = to_signed_hex(uint, nb_bits=nb_bits) | ||||
for i in range(nb_bits//8): | for i in range(nb_bits//8): | ||||
_input.write(uint[i]+'\n') | _input.write(uint[i]+'\n') | ||||
def write_list(_input, uint_list, nb_bits=16): | def write_list(_input, uint_list, nb_bits=16): | ||||
""" Write in '_input' the list of values 'uint_list' by writing each byte | |||||
with hexadecimal format in a new line | |||||
""" | |||||
for uint in uint_list: | for uint in uint_list: | ||||
write(_input, uint, nb_bits=nb_bits) | write(_input, uint, nb_bits=nb_bits) | ||||
### Print Utils | ### Print Utils | ||||
class Color: | class Color: | ||||
""" Color codes to print colored text in stdout """ | |||||
HEADER = '\033[95m' | HEADER = '\033[95m' | ||||
OKBLUE = '\033[94m' | OKBLUE = '\033[94m' | ||||
OKCYAN = '\033[96m' | OKCYAN = '\033[96m' |
assert res['nb_traces'] == 10 | assert res['nb_traces'] == 10 | ||||
assert res['nb_instructions'] | assert res['nb_instructions'] | ||||
# Test the methods for analysis | |||||
multiplication_indexes = simulation.get_indexes_of(lambda instr: 'mul' in instr) | |||||
asmtrace = simulation.get_asmtrace() | |||||
for index, instr in enumerate(asmtrace): | |||||
assert ('mul' in instr) == (index in multiplication_indexes) | |||||
traces = simulation.get_traces() | |||||
traces = simulation.get_traces(multiplication_indexes) | |||||
assert traces.shape == (10, len(multiplication_indexes)) | |||||
printed_data = simulation.get_printed_data() | |||||
print_success(' - Test 3 "Use A Real Simulation": Success!') | print_success(' - Test 3 "Use A Real Simulation": Success!') | ||||
######################################################### | ######################################################### |