# Python ELMO | |||||
# Python-ELMO | |||||
_Python ELMO_ is a Python library which proposes an encapsulation of the project _ELMO_. | |||||
_Python-ELMO_ is a Python library which proposes an encapsulation of the project _ELMO_. | |||||
[MOW17] **Towards Practical Tools for Side | [MOW17] **Towards Practical Tools for Side | ||||
Channel Aware Software Engineering : ’Grey Box’ Modelling for Instruction Leakages** | Channel Aware Software Engineering : ’Grey Box’ Modelling for Instruction Leakages** | ||||
**ELMO GitHub**: https://github.com/sca-research/ELMO | **ELMO GitHub**: https://github.com/sca-research/ELMO | ||||
**Python-ELMO Contributors**: Thibauld Feneuil | |||||
## Requirements | ## Requirements | ||||
To use _Python ELMO_, you need at least Python3.5 and ```numpy```. | |||||
To use _Python-ELMO_, you need at least Python3.5 and ```numpy```. | |||||
The library will install and compile ELMO. So, you need the GCC compiler collection and the command/utility 'make' (for more details, see the documentation of ELMO). On Ubuntu/Debian, | |||||
The library will install and compile ELMO. So, you need the GCC compiler collection and the command/utility ```make``` (for more details, see the documentation of ELMO). On Ubuntu/Debian, | |||||
```bash | ```bash | ||||
sudo apt install build-essential | sudo apt install build-essential | ||||
## Installation | ## Installation | ||||
First, download _Python ELMO_. | |||||
First, download _Python-ELMO_. | |||||
```bash | ```bash | ||||
git clone https://git.aprilas.fr/tfeneuil/python-elmo | git clone https://git.aprilas.fr/tfeneuil/python-elmo | ||||
``` | ``` | ||||
And then, install ELMO thanks to the script of installation. | |||||
And then, install ELMO thanks to the installation script. It will use Internet to download the [ELMO project](https://github.com/sca-research/ELMO). | |||||
```bash | ```bash | ||||
python setup.py install | python setup.py install | ||||
To start a new project, you can use the following function. | To start a new project, you can use the following function. | ||||
```python | ```python | ||||
from elmo.manage import create_simulation | |||||
from elmo import create_simulation | |||||
create_simulation( | create_simulation( | ||||
'dilithium', # The (relative) path of the project | 'dilithium', # The (relative) path of the project | ||||
'DilithiumSimulation' # The classname of the simulation | 'DilithiumSimulation' # The classname of the simulation | ||||
### List all the available simulation | ### List all the available simulation | ||||
```python | ```python | ||||
from elmo.manage import search_simulations | |||||
from elmo import search_simulations | |||||
search_simulations('.') | search_simulations('.') | ||||
``` | ``` | ||||
```python | |||||
```text | |||||
{'DilithiumSimulation': <class 'DilithiumSimulation'>, | {'DilithiumSimulation': <class 'DilithiumSimulation'>, | ||||
'KyberNTTSimulation': <class 'KyberNTTSimulation'>} | 'KyberNTTSimulation': <class 'KyberNTTSimulation'>} | ||||
``` | ``` | ||||
_Python ELMO_ offers a example project to you in the repository _projects/Examples_ of the module. This example is a project to generate traces of the execution of the NTT implemented in the cryptosystem [Kyber](https://pq-crystals.org/kyber/). | |||||
_Python-ELMO_ offers a example project to you in the repository _projects/Examples_ of the module. This example is a project to generate traces of the execution of the NTT implemented in the cryptosystem [Kyber](https://pq-crystals.org/kyber/). | |||||
### Use a simulation project | ### Use a simulation project | ||||
Warning! Before using it, you have to compile your project thanks to the provided Makefile. | Warning! Before using it, you have to compile your project thanks to the provided Makefile. | ||||
```python | ```python | ||||
from elmo.manage import get_simulation | |||||
KyberNTTSimulation = get_simulation_via_classname('KyberNTTSimulation') | |||||
from elmo import get_simulation | |||||
KyberNTTSimulation = get_simulation('KyberNTTSimulation') | |||||
import numpy as np | |||||
Kyber512 = {'k': 2, 'n': 256} | |||||
challenges = [ | |||||
np.ones((Kyber512['k'], Kyber512['n']), dtype=int), | |||||
] | |||||
simulation = KyberNTTSimulation() | |||||
challenges = simulation.get_random_challenges(10) | |||||
simulation.set_challenges(challenges) | |||||
simulation = KyberNTTSimulation(challenges) | |||||
simulation.run() # Launch the simulation | simulation.run() # Launch the simulation | ||||
traces = simulation.get_traces() | traces = simulation.get_traces() | ||||
# And now, I can draw and analyse the traces | # And now, I can draw and analyse the traces | ||||
### Use a simulation project thanks to a server | ### Use a simulation project thanks to a server | ||||
Sometimes, it is impossible to run the simulation thanks the simple method _run_ of the project class. Indeed, sometimes the Python script is executed in the environment where _Python ELMO_ cannot launch the ELMO tool. For example, it is the case where _Python ELMO_ is used in SageMath on Windows. On Windows, SageMath installation relies on the Cygwin POSIX emulation system and it can be a problem. | |||||
Sometimes, it is impossible to run the simulation thanks the simple method ```run``` of the project class. Indeed, sometimes the Python script is executed in the environment where _Python-ELMO_ cannot launch the ELMO tool. For example, it is the case where _Python-ELMO_ is used in SageMath on Windows. On Windows, SageMath installation relies on the Cygwin POSIX emulation system and it can be a problem. | |||||
To offer a solution, _Online ELMO_ can be used thanks to a link client-server. The idea is you must launch the script _run\_server.py_ which will listen (by default) at port 5000 in localhost. | |||||
To offer a solution, _Python-ELMO_ can be used thanks to a client-server link. The idea is you must launch the script ```run_server.py``` which will listen (by default) at port 5000 in localhost. | |||||
```bash | ```bash | ||||
python -m elmo run-server | python -m elmo run-server | ||||
``` | ``` | ||||
And after, you can manipulate the projects as described in the previous section by replacing _run_ to _run\_online_. | |||||
And after, you can manipulate the projects as described in the previous section by replacing ```run``` to ```run_online```. | |||||
```python | ```python | ||||
from elmo.manage import get_simulation | from elmo.manage import get_simulation | ||||
KyberNTTSimulation = get_simulation('KyberNTTSimulation') | KyberNTTSimulation = get_simulation('KyberNTTSimulation') | ||||
import numpy as np | |||||
Kyber512 = {'k': 2, 'n': 256} | |||||
challenges = [ | |||||
np.ones((Kyber512['k'], Kyber512['n']), dtype=int), | |||||
] | |||||
simulation = KyberNTTSimulation() | |||||
challenges = simulation.get_random_challenges(10) | |||||
simulation.set_challenges(challenges) | |||||
simulation = KyberNTTSimulation(challenges) | |||||
simulation.run_online() # Launch the simulation THANKS TO A SERVER | simulation.run_online() # Launch the simulation THANKS TO A SERVER | ||||
traces = simulation.get_traces() | traces = simulation.get_traces() | ||||
# And now, I can draw and analyse the traces | # And now, I can draw and analyse the traces | ||||
``` | ``` | ||||
Warning! Using the _run\_online_ method doesn't exempt you from compiling the project with the provided Makefile. | |||||
Warning! Using the ```run_online``` method doesn't exempt you from compiling the project with the provided Makefile. | |||||
### Use the ELMO Engine | ### Use the ELMO Engine | ||||
- the type of the next assembler instruction | - the type of the next assembler instruction | ||||
The type of the instructions are: | The type of the instructions are: | ||||
- "_**EOR**_" for ADD(1-4), AND, CMP, CPY, EOR, MOV, ORR, ROR, SUB; | |||||
- "_**LSL**_" for LSL(2), LSR(2); | |||||
- "_**STR**_" for STR, STRB, STRH; | |||||
- "_**LDR**_" for LDR, LDRB, LDRH; | |||||
- "_**MUL**_" for MUL; | |||||
- "_**OTHER**_" for the other instructions. | |||||
- ```EOR``` for ADD(1-4), AND, CMP, CPY, EOR, MOV, ORR, ROR, SUB; | |||||
- ```LSL``` for LSL(2), LSR(2); | |||||
- ```STR``` for STR, STRB, STRH; | |||||
- ```LDR``` for LDR, LDRB, LDRH; | |||||
- ```MUL``` for MUL; | |||||
- ```OTHER``` for the other instructions. | |||||
```python | ```python | ||||
from elmo.engine import ELMOEngine, Instr | from elmo.engine import ELMOEngine, Instr | ||||
engine.reset_points() # Reset the engine to study other points | engine.reset_points() # Reset the engine to study other points | ||||
``` | ``` | ||||
## Limitations | |||||
Since the [ELMO project](https://github.com/sca-research/ELMO) takes its inputs and outputs from files, _Python-ELMO_ **can not** manage simultaneous runs. | |||||
## Licences | ## Licences | ||||
[MIT](LICENCE.txt) | [MIT](LICENCE.txt) |
exit() | exit() | ||||
if command == 'run-server': | if command == 'run-server': | ||||
from .server.servicethread import ListeningThread | |||||
from .executorthread import ExecutorThread | |||||
def do_main_program(): | |||||
global thread, stop | |||||
thread = ListeningThread('localhost', 5000, ExecutorThread) | |||||
thread.start() | |||||
def program_cleanup(signum, frame): | |||||
global thread, stop | |||||
thread.stop() | |||||
stop = True | |||||
thread = None | |||||
stop = False | |||||
# Execute | |||||
print("Executing...") | |||||
do_main_program() | |||||
print("Done ! And now, listening...") | |||||
import signal | |||||
signal.signal(signal.SIGINT, program_cleanup) | |||||
signal.signal(signal.SIGTERM, program_cleanup) | |||||
from .executor import launch_executor | |||||
host = sys.argv[2] if len(sys.argv) >= 3 else 'localhost' | |||||
port = int(sys.argv[3]) if len(sys.argv) >= 4 else 5000 | |||||
# Wait | |||||
import time | |||||
while not stop: | |||||
time.sleep(1) | |||||
launch_executor(host, port) | |||||
exit() | exit() | ||||
print(Color.FAIL + 'Unknown Command.' + Color.ENDC) | print(Color.FAIL + 'Unknown Command.' + Color.ENDC) | ||||
exit() | exit() |
import shutil | |||||
import os, re | |||||
import sys | |||||
from .server.servicethread import OneShotServiceThread | |||||
from .config import MODULE_PATH, ELMO_TOOL_REPOSITORY, ELMO_INPUT_FILE_NAME | |||||
from .project_base import SimulationProject | |||||
from .manage import execute_simulation | |||||
from .utils import Color | |||||
class Executor(OneShotServiceThread): | |||||
def execute(self): | |||||
data = self.protocol.get_data() | |||||
self.protocol.please_assert(data) | |||||
elmo_path = os.path.join(MODULE_PATH, ELMO_TOOL_REPOSITORY) | |||||
# Set the input of ELMO | |||||
self.protocol.please_assert('input' in data) | |||||
with open(os.path.join(elmo_path, ELMO_INPUT_FILE_NAME), 'w') as _input_file: | |||||
_input_file.write(data['input']) | |||||
self.protocol.send_ack() | |||||
# Get the binary | |||||
binary_content = self.protocol.get_file() | |||||
binary_path = os.path.join(elmo_path, 'project.bin') | |||||
with open(binary_path, 'wb') as _binary_file: | |||||
_binary_file.write(binary_content) | |||||
self.protocol.send_ack() | |||||
### Generate the traces by launching ELMO | |||||
print(Color.OKGREEN + ' - Simulation accepted...' + Color.ENDC) | |||||
simulation = SimulationProject() | |||||
simulation.get_binary_path = lambda: os.path.abspath(binary_path) | |||||
data = execute_simulation(simulation) | |||||
if data['error']: | |||||
print(Color.FAIL + ' - Simulation failed.' + Color.ENDC) | |||||
self.protocol.send_data(results) | |||||
self.protocol.close() | |||||
return | |||||
print(Color.OKGREEN + ' - Simulation finished: {} traces, {} instructions'.format( | |||||
data['nb_traces'], | |||||
data['nb_instructions'], | |||||
) + Color.ENDC) | |||||
output_path = os.path.join(elmo_path, 'output') | |||||
### Get the trace | |||||
data['results'] = [] | |||||
for i in range(data['nb_traces']): | |||||
filename = os.path.join(output_path, 'traces', 'trace%05d.trc' % (i+1)) | |||||
with open(filename, 'r') as _file: | |||||
data['results'].append( | |||||
list(map(float, _file.readlines())) | |||||
) | |||||
### Get asmtrace and printed data | |||||
asmtrace = None | |||||
if ('asmtrace' not in data) or data['asmtrace']: | |||||
with open(os.path.join(output_path, 'asmoutput', 'asmtrace00001.txt'), 'r') as _file: | |||||
data['asmtrace'] = _file.read() | |||||
printed_data = None | |||||
if ('printdata' not in data) or data['printdata']: | |||||
with open(os.path.join(output_path, 'printdata.txt'), 'r') as _file: | |||||
data['printed_data'] = list(map(lambda x: int(x, 16), _file.readlines())) | |||||
### Send results | |||||
print(Color.OKCYAN + ' - Sending results...' + Color.ENDC, end='') | |||||
sys.stdout.flush() | |||||
self.protocol.send_data(data) | |||||
print(Color.OKGREEN + ' Sent!' + Color.ENDC) | |||||
self.protocol.close() | |||||
def launch_executor(host, port, waiting_function=True): | |||||
from .server.servicethread import ListeningThread | |||||
def do_main_program(): | |||||
thread = ListeningThread(host, port, Executor, debug=True) | |||||
thread.start() | |||||
return thread | |||||
def program_cleanup(signum, frame): | |||||
thread.stop() | |||||
thread = do_main_program() | |||||
import signal | |||||
signal.signal(signal.SIGINT, program_cleanup) | |||||
signal.signal(signal.SIGTERM, program_cleanup) | |||||
if waiting_function is True: | |||||
import time | |||||
while thread.is_running(): | |||||
time.sleep(1) | |||||
return | |||||
return_value = None | |||||
if waiting_function: | |||||
return_value = waiting_function() | |||||
if thread.is_running(): | |||||
program_cleanup(None, None) | |||||
return return_value |
if not isinstance(project, SimulationProject): | if not isinstance(project, SimulationProject): | ||||
raise TypeError('The project is not an instance of \'SimulationProject\' class.') | raise TypeError('The project is not an instance of \'SimulationProject\' class.') | ||||
leaking_binary_path = pjoin(project.get_project_directory(), project.get_binary_path()) | |||||
leaking_binary_path = project.get_binary_path() | |||||
if not os.path.isabs(leaking_binary_path): | |||||
leaking_binary_path = pjoin(project.get_project_directory(), project.get_binary_path()) | |||||
if not os.path.isfile(leaking_binary_path): | if not os.path.isfile(leaking_binary_path): | ||||
raise BinaryNotFoundError('Binary not found. Did you compile your project?') | raise BinaryNotFoundError('Binary not found. Did you compile your project?') | ||||
'input': input.get_string(), | 'input': input.get_string(), | ||||
}) | }) | ||||
if not SocketTool.get_ack(s): | if not SocketTool.get_ack(s): | ||||
raise RuntimeError("NACK received: The request has been refused !") | |||||
else: | |||||
SocketTool.send_file(s, '{}/{}'.format(self.get_project_directory(), self.get_binary())) | |||||
data = SocketTool.get_data(s) | |||||
if data['error']: | |||||
raise Exception("The simulation returned an error: {}".format(data['error'])) | |||||
raise RuntimeError("NACK received: The request has been refused!") | |||||
SocketTool.send_file(s, '{}/{}'.format(self.get_project_directory(), self.get_binary_path())) | |||||
if not SocketTool.get_ack(s): | |||||
raise RuntimeError("NACK received: The binary file has been refused!") | |||||
data = SocketTool.get_data(s) | |||||
if data['error']: | |||||
raise Exception("The simulation returned an error: {}".format(data['error'])) | |||||
s.close() | s.close() | ||||
except IOError as err: | except IOError as err: | ||||
raise RuntimeError("The connection refused. Has the ELMO server been switch on ?") from err | |||||
raise RuntimeError("The connection refused. Has the ELMO server been switch on?") from err | |||||
self.is_executed = True | self.is_executed = True | ||||
self.has_been_online = True | self.has_been_online = True | ||||
self._complete_asmtrace = data['asmtrace'] | self._complete_asmtrace = data['asmtrace'] | ||||
self._complete_results = data['results'] | self._complete_results = data['results'] | ||||
self._complete_printed_data = data['printed_data'] | self._complete_printed_data = data['printed_data'] | ||||
return { key: value | |||||
for key, value in data.items() | |||||
if key not in ['results', 'asmtrace', 'printed_data'] | |||||
} | |||||
### Manipulate the ASM trace | ### Manipulate the ASM trace | ||||
def get_asmtrace_filename(self): | def get_asmtrace_filename(self): |
from .server.servicethread import OneShotServiceThread | |||||
import subprocess | |||||
import shutil | |||||
import os, re | |||||
class ExecutorThread(OneShotServiceThread): | |||||
def __init__(self, ip, port, clientsocket, **kwargs): | |||||
super().__init__(ip, port, clientsocket) | |||||
def execute(self): | |||||
data = self.protocol.get_data() | |||||
self.protocol.please_assert(data) | |||||
# Set the input of ELMO | |||||
self.protocol.please_assert('input' in data) | |||||
with open('elmo/input.txt', 'w') as _input_file: | |||||
_input_file.write(data['input']) | |||||
self.protocol.send_ack() | |||||
# Get the binary | |||||
binary_content = self.protocol.get_file() | |||||
with open('elmo/project.bin', 'wb') as _binary_file: | |||||
_binary_file.write(binary_content) | |||||
### Generate the traces by launching ELMO | |||||
command = './elmo ./project.bin' | |||||
cwd = './elmo/' | |||||
process = subprocess.Popen( | |||||
command, shell=True, cwd=cwd, | |||||
executable='/bin/bash', | |||||
stdout=subprocess.PIPE, | |||||
stderr=subprocess.PIPE, | |||||
) | |||||
output, error = process.communicate() | |||||
output = output.decode('latin-1') if output else None | |||||
error = error.decode('latin-1') if error else None | |||||
if error: | |||||
self.protocol.send_data({ | |||||
'output': output, | |||||
'error': error, | |||||
}) | |||||
self.protocol.close() | |||||
return | |||||
### Get traces | |||||
nb_traces = output.count('TRACE NO') | |||||
trace_filenames = [] | |||||
for filename in os.listdir('elmo/output/traces/'): | |||||
if len(trace_filenames) < nb_traces: | |||||
if re.search(r'^trace\d+\.trc$', filename): | |||||
trace_filenames.append('elmo/output/traces/{}'.format(filename)) | |||||
else: | |||||
break | |||||
assert len(trace_filenames) == nb_traces | |||||
results = trace_filenames | |||||
for i in range(len(results)): | |||||
with open(results[i], 'r') as _file: | |||||
results[i] = list(map(float, _file.readlines())) | |||||
### Get asmtrace and printed data | |||||
asmtrace = None | |||||
if not ('asmtrace' in data and not data['asmtrace']): | |||||
with open('elmo/output/asmoutput/asmtrace00001.txt', 'r') as _file: | |||||
asmtrace = ''.join(_file.readlines()) | |||||
printed_data = None | |||||
if not ('printdata' in data and not data['printdata']): | |||||
with open('elmo/output/printdata.txt', 'r') as _file: | |||||
printed_data = list(map(lambda x: int(x, 16), _file.readlines())) | |||||
### Send results | |||||
self.protocol.send_data({ | |||||
'output': output, | |||||
'error': error, | |||||
'nb_traces': nb_traces, | |||||
'results': results, | |||||
'asmtrace': asmtrace, | |||||
'printed_data': printed_data, | |||||
}) | |||||
self.protocol.close() |
import threading | import threading | ||||
from .server.protocol import Protocol, ClosureException | |||||
from .protocol import Protocol, ClosureException | |||||
import socket | import socket | ||||
class ListeningThread(PermanentServiceThread): | class ListeningThread(PermanentServiceThread): | ||||
def __init__(self, host, port, threadclass, **kwargs): | |||||
def __init__(self, host, port, threadclass, debug=False, **kwargs): | |||||
super().__init__() | super().__init__() | ||||
self.hostname = host | self.hostname = host | ||||
self.port = port | self.port = port | ||||
self.threadclass = threadclass | self.threadclass = threadclass | ||||
self.kwargs = kwargs | self.kwargs = kwargs | ||||
self.debug = debug | |||||
def execute(self): | def execute(self): | ||||
self.tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | self.tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
# self.tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_ATTACH_REUSEPORT_CBPF, 1) | # self.tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_ATTACH_REUSEPORT_CBPF, 1) | ||||
self.tcpsock.bind((self.hostname, self.port)) | self.tcpsock.bind((self.hostname, self.port)) | ||||
self.tcpsock.listen(5) | self.tcpsock.listen(5) | ||||
print('[port][%s] Listening' % self.port) | |||||
if self.debug: | |||||
print('[port][%s] Listening' % self.port) | |||||
while self.is_running(): | while self.is_running(): | ||||
try: | try: | ||||
(clientsocket, (ip, port)) = self.tcpsock.accept() | (clientsocket, (ip, port)) = self.tcpsock.accept() | ||||
print('[port][{}] Accepted: {} <=> {}'.format( | |||||
self.port, | |||||
clientsocket.getsockname(), | |||||
clientsocket.getpeername(), | |||||
)) | |||||
newthread = self.threadclass(ip, port, clientsocket, **self.kwargs) | |||||
newthread.start() | |||||
if self.is_running(): | |||||
if self.debug: | |||||
print('[port][{}] Accepted: {} <=> {}'.format( | |||||
self.port, | |||||
clientsocket.getsockname(), | |||||
clientsocket.getpeername(), | |||||
)) | |||||
newthread = self.threadclass(ip, port, clientsocket, **self.kwargs) | |||||
newthread.start() | |||||
else: | |||||
break | |||||
except socket.timeout: | except socket.timeout: | ||||
pass | pass | ||||
print('[port][%s] Stop listening' % self.port) | |||||
def stop(self): | def stop(self): | ||||
super().stop() | super().stop() | ||||
clientsocker = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | clientsocker = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
clientsocker.connect( (self.hostname, self.port) ) | |||||
clientsocker.connect((self.hostname, self.port)) | |||||
self.tcpsock.close() | self.tcpsock.close() | ||||
print('[port][%s] Stop listening' % self.port) |
print_success(' - Test 3 "Use A Real Simulation": Success!') | print_success(' - Test 3 "Use A Real Simulation": Success!') | ||||
######################################################### | ######################################################### | ||||
# TEST 4 : USE ELMO BY RUNNING ONLINE # | |||||
######################################################### | |||||
def test_online_server(): | |||||
from elmo import get_simulation | |||||
KyberNTTSimulation = get_simulation('KyberNTTSimulation') | |||||
simulation = KyberNTTSimulation() | |||||
simulation.set_challenges(simulation.get_random_challenges(10)) | |||||
return simulation.run_online() | |||||
from elmo.executor import launch_executor | |||||
# Launch the server and realize the test | |||||
res = launch_executor('localhost', 5000, waiting_function=test_online_server) | |||||
assert not res['error'] | |||||
assert res['nb_traces'] == 10 | |||||
assert res['nb_instructions'] | |||||
print_success(' - Test 4 "Use ELMO By Running Online": Success!') | |||||
######################################################### | |||||
print_success('All seems fine!') | print_success('All seems fine!') |