@@ -1,2 +1,5 @@ | |||
*.pyc | |||
elmo/* | |||
**/__pycache__/** | |||
build/* | |||
python_elmo.egg-info/* | |||
*.pyc | |||
.DS_Store |
@@ -0,0 +1,3 @@ | |||
recursive-include elmo/projects * | |||
recursive-include elmo/templates * | |||
recursive-exclude elmo/elmo-tool * |
@@ -1,6 +1,6 @@ | |||
# Online ELMO | |||
# Python ELMO | |||
_Online ELMO_ is a Python library which proposes an encapsulation of the project _ELMO_. | |||
_Python ELMO_ is a Python library which proposes an encapsulation of the project _ELMO_. | |||
[MOW17] **Towards Practical Tools for Side | |||
Channel Aware Software Engineering : ’Grey Box’ Modelling for Instruction Leakages** | |||
@@ -11,13 +11,9 @@ https://www.usenix.org/conference/usenixsecurity17/technical-sessions/presentati | |||
## Requirements | |||
To use _Online ELMO_, you need at least Python3.5 and the packages present in the file requirements.txt | |||
To use _Python ELMO_, you need at least Python3.5 and ```numpy```. | |||
```bash | |||
pip3 install -r requirements.txt | |||
``` | |||
The library will install and compile ELMO. So, you need the GCC compiler collection and the command/utility 'make' (for more details, see the documentation of ELMO). On Ubuntu/Debian, | |||
The library will install and compile ELMO. So, you need the GCC compiler collection and the command/utility 'make' (for more details, see the documentation of ELMO). On Ubuntu/Debian, | |||
```bash | |||
sudo apt install build-essential | |||
@@ -27,16 +23,16 @@ To use ELMO on a leaking binary program, you need to compile the C implementatio | |||
## Installation | |||
First, download _Online ELMO_. | |||
First, download _Python ELMO_. | |||
```bash | |||
git clone https://git.aprilas.fr/tfeneuil/OnlineELMO | |||
git clone https://git.aprilas.fr/tfeneuil/python-elmo | |||
``` | |||
And then, install ELMO thanks to the script of installation. | |||
```bash | |||
./install | |||
python setup.py install | |||
``` | |||
## Usage | |||
@@ -51,7 +47,7 @@ What is a _simulation project_ ? It is a project to simulate the traces of _one_ | |||
To start a new project, you can use the following function. | |||
```python | |||
from elmo_online.manage import create_simulation | |||
from elmo.manage import create_simulation | |||
create_simulation( | |||
'dilithium', # The (relative) path of the project | |||
'DilithiumSimulation' # The classname of the simulation | |||
@@ -66,7 +62,7 @@ This function will create a repository _dilithium_ with all the complete squelet | |||
### List all the available simulation | |||
```python | |||
from elmo_online.manage import search_simulations | |||
from elmo.manage import search_simulations | |||
search_simulations('.') | |||
``` | |||
@@ -75,14 +71,14 @@ search_simulations('.') | |||
'KyberNTTSimulation': <class 'KyberNTTSimulation'>} | |||
``` | |||
_Online ELMO_ offers a example project to you in the repository _projects/Examples_ of the module. This example is a project to generate traces of the execution of the NTT implemented in the cryptosystem [Kyber](https://pq-crystals.org/kyber/). | |||
_Python ELMO_ offers a example project to you in the repository _projects/Examples_ of the module. This example is a project to generate traces of the execution of the NTT implemented in the cryptosystem [Kyber](https://pq-crystals.org/kyber/). | |||
### Use a simulation project | |||
Warning! Before using it, you have to compile your project thanks to the provided Makefile. | |||
```python | |||
from elmo_online.manage import get_simulation | |||
from elmo.manage import get_simulation | |||
KyberNTTSimulation = get_simulation_via_classname('KyberNTTSimulation') | |||
import numpy as np | |||
@@ -97,21 +93,21 @@ traces = simulation.get_traces() | |||
# And now, I can draw and analyse the traces | |||
``` | |||
### Use a simulution project thanks to a server | |||
### Use a simulation project thanks to a server | |||
Sometimes, it is impossible to run the simulation thanks the simple method _run_ of the project class. Indeed, sometimes the Python script is executed in the environment where _Online ELMO_ cannot launch the ELMO tool. For example, it is the case where _Online ELMO_ is used in SageMath on Windows. On Windows, SageMath installation relies on the Cygwin POSIX emulation system and it can be a problem. | |||
Sometimes, it is impossible to run the simulation thanks the simple method _run_ of the project class. Indeed, sometimes the Python script is executed in the environment where _Python ELMO_ cannot launch the ELMO tool. For example, it is the case where _Python ELMO_ is used in SageMath on Windows. On Windows, SageMath installation relies on the Cygwin POSIX emulation system and it can be a problem. | |||
To offer a solution, _Online ELMO_ can be used thanks to a link client-server. The idea is you must launch the script _run_server.py_ which will listen (by default) at port 5000 in localhost. | |||
To offer a solution, _Online ELMO_ can be used thanks to a link client-server. The idea is you must launch the script _run\_server.py_ which will listen (by default) at port 5000 in localhost. | |||
```bash | |||
python3 run_server.py | |||
python -m elmo run-server | |||
``` | |||
And after, you can manipulate the projects as described in the previous section by replacing _run_ to _run_online_. | |||
And after, you can manipulate the projects as described in the previous section by replacing _run_ to _run\_online_. | |||
```python | |||
from elmo_online.manage import get_simulation | |||
KyberNTTSimulation = get_simulation_via_classname('KyberNTTSimulation') | |||
from elmo.manage import get_simulation | |||
KyberNTTSimulation = get_simulation('KyberNTTSimulation') | |||
import numpy as np | |||
Kyber512 = {'k': 2, 'n': 256} | |||
@@ -125,7 +121,7 @@ traces = simulation.get_traces() | |||
# And now, I can draw and analyse the traces | |||
``` | |||
Warning! Using the _run_online_ method doesn't exempt you from compiling the project with the provided Makefile. | |||
Warning! Using the _run\_online_ method doesn't exempt you from compiling the project with the provided Makefile. | |||
### Use the ELMO Engine | |||
@@ -143,7 +139,7 @@ The type of the instructions are: | |||
- "_**OTHER**_" for the other instructions. | |||
```python | |||
from elmo_online.engine import ELMOEngine, Instr | |||
from elmo.engine import ELMOEngine, Instr | |||
engine = ELMOEngine() | |||
for i in range(0, 256): | |||
engine.add_point( |
@@ -1,41 +0,0 @@ | |||
import os, shutil | |||
import re | |||
from .manage import create_simulation | |||
print('Creation of a new simulation project...') | |||
### Create the repository of the projects | |||
global_path = 'projects' | |||
os.makedirs(global_path, exist_ok=True) | |||
### Get the project classname | |||
project_classname = '' | |||
search = re.compile(r'[^a-zA-Z0-9_]').search | |||
while not project_classname: | |||
classname = input(' - What is the project classname? ') | |||
if search(classname): | |||
print(' > Illegal characters detected! Please enter a name with only the following characters : a-z, A-Z, 0-9, and "_".') | |||
else: | |||
project_classname = classname.strip() | |||
### Get and create the project repository | |||
search = re.compile(r'[^a-zA-Z0-9.-_/]').search | |||
project_repository = '' | |||
while not project_repository: | |||
repository = input(' - What is the project repository? ') | |||
if search(repository): | |||
print('Illegal characters detected! Please enter a name with only the following characters : a-z, A-Z, 0-9, ".", "-", "_" and "/".') | |||
else: | |||
project_repository = repository | |||
project_path = global_path+'/'+repository | |||
project_path = create_simulation(project_path, classname) | |||
print('') | |||
print('Creation complete !') | |||
print(' - Project repository: {}'.format(project_path)) | |||
print(' - Project class "{}" in {}'.format(project_classname, project_path+'/projectclass.py')) | |||
print(' - Linker script: {}'.format(project_path+'/project.ld'))) | |||
print('') | |||
print('Please don\'t to compile the project with the present Makefile before using it!') |
@@ -0,0 +1,2 @@ | |||
elmo-tool/* | |||
*.pyc |
@@ -0,0 +1,2 @@ | |||
from .manage import * | |||
from .engine import * |
@@ -0,0 +1,89 @@ | |||
import sys | |||
from .utils import Color | |||
if len(sys.argv) <= 1: | |||
print(Color.FAIL + 'Please enter an instruction.' + Color.ENDC) | |||
exit() | |||
command = sys.argv[1] | |||
if command == 'create-simulation': | |||
import os, shutil | |||
import re | |||
print('Creation of a new simulation project...') | |||
### Create the repository of the projects | |||
global_path = 'projects' | |||
os.makedirs(global_path, exist_ok=True) | |||
### Get the project classname | |||
project_classname = '' | |||
search = re.compile(r'[^a-zA-Z0-9_]').search | |||
while not project_classname: | |||
classname = input(' - What is the project classname? ') | |||
if search(classname): | |||
print(' > Illegal characters detected! Please enter a name with only the following characters : a-z, A-Z, 0-9, and "_".') | |||
else: | |||
project_classname = classname.strip() | |||
### Get and create the project repository | |||
search = re.compile(r'[^a-zA-Z0-9.-_/]').search | |||
project_repository = '' | |||
while not project_repository: | |||
repository = input(' - What is the project repository? ') | |||
if search(repository): | |||
print('Illegal characters detected! Please enter a name with only the following characters : a-z, A-Z, 0-9, ".", "-", "_" and "/".') | |||
else: | |||
project_repository = repository | |||
project_path = global_path+'/'+repository | |||
from .manage import create_simulation | |||
project_path = create_simulation(project_path, classname) | |||
print('') | |||
print('Creation complete !') | |||
print(' - Project repository: {}'.format(project_path)) | |||
print(' - Project class "{}" in {}'.format(project_classname, project_path+'/projectclass.py')) | |||
print(' - Linker script: {}'.format(project_path+'/project.ld')) | |||
print('') | |||
print('Please don\'t to compile the project with the present Makefile before using it!') | |||
exit() | |||
if command == 'run-server': | |||
from .server.servicethread import ListeningThread | |||
from .executorthread import ExecutorThread | |||
def do_main_program(): | |||
global thread, stop | |||
thread = ListeningThread('localhost', 5000, ExecutorThread) | |||
thread.start() | |||
def program_cleanup(signum, frame): | |||
global thread, stop | |||
thread.stop() | |||
stop = True | |||
thread = None | |||
stop = False | |||
# Execute | |||
print("Executing...") | |||
do_main_program() | |||
print("Done ! And now, listening...") | |||
import signal | |||
signal.signal(signal.SIGINT, program_cleanup) | |||
signal.signal(signal.SIGTERM, program_cleanup) | |||
# Wait | |||
import time | |||
while not stop: | |||
time.sleep(1) | |||
exit() | |||
print(Color.FAIL + 'Unknown Command.' + Color.ENDC) | |||
exit() |
@@ -0,0 +1,14 @@ | |||
import os | |||
### Configuration of the module | |||
TEMPLATE_REPOSITORY = 'templates' | |||
PROJECTS_REPOSITORY = 'projects' | |||
ELMO_TOOL_REPOSITORY = 'elmo-tool' | |||
ELMO_EXECUTABLE_NAME = 'elmo' | |||
ELMO_OUTPUT_ENCODING = 'latin-1' | |||
ELMO_INPUT_FILE_NAME = 'input.txt' | |||
MODULE_PATH = os.path.dirname(os.path.abspath(__file__)) | |||
SEARCH_EXCLUSION_TAG = 'EXCLUDE-FROM-SIMULATION-SEARCH' |
@@ -2,37 +2,20 @@ import numpy as np | |||
import os | |||
from enum import IntEnum, unique | |||
def hweight(n): | |||
c = 0 | |||
while n>0: | |||
c += (n & 1) | |||
n >>= 1 | |||
return c | |||
def hdistance(x,y): | |||
return hweight(x^y) | |||
def binary_writing(n, nb_bits=32, with_hamming=False): | |||
n = np.array(n) | |||
w, h = np.zeros((nb_bits, len(n))), np.zeros((len(n))) | |||
for ind in range(nb_bits): | |||
w[ind] = (n & 1) | |||
h += w[ind] | |||
n >>= 1 | |||
ind += 1 | |||
return (w, h) if with_hamming else w | |||
from .utils import binary_writing | |||
from .config import ELMO_TOOL_REPOSITORY | |||
@unique | |||
class Instr(IntEnum): | |||
class Instruction(IntEnum): | |||
EOR = 0 | |||
LSL = 1 | |||
STR = 2 | |||
LDR = 3 | |||
MUL = 4 | |||
OTHER = 5 | |||
# Short name for 'Instruction' class | |||
Instr = Instruction | |||
PREVIOUS = 0 | |||
CURRENT = 1 | |||
@@ -49,7 +32,11 @@ class ELMOEngine: | |||
return coeffs | |||
def load_coefficients(self): | |||
filename = os.path.dirname(os.path.abspath(__file__))+'/elmo/coeffs.txt' | |||
filename = os.path.join( | |||
os.path.dirname(os.path.abspath(__file__)), | |||
ELMO_TOOL_REPOSITORY, | |||
'coeffs.txt', | |||
) | |||
self.coefficients = None | |||
with open(filename, 'r') as _file: | |||
self.coefficients = np.array([list(map(float, line.split())) for line in _file.readlines()[:2153]]) |
@@ -0,0 +1,252 @@ | |||
import os, shutil | |||
import re | |||
import inspect | |||
import subprocess | |||
import sys | |||
from os.path import join as pjoin | |||
from .config import ( | |||
TEMPLATE_REPOSITORY, | |||
PROJECTS_REPOSITORY, | |||
ELMO_TOOL_REPOSITORY, | |||
ELMO_EXECUTABLE_NAME, | |||
MODULE_PATH, | |||
ELMO_OUTPUT_ENCODING, | |||
SEARCH_EXCLUSION_TAG, | |||
) | |||
from .project_base import SimulationProject | |||
from .utils import Color | |||
############ GETTERS ############ | |||
def search_simulations_in_repository(repository, criteria=lambda x:True): | |||
""" Search simulation classes in the 'repository' verifying the 'criteria' | |||
Return a list of 'SimulationProject' subclasses | |||
:repository: Repository of the searched simulation classes (string) | |||
:criteria: Boolean function with 'SimulationProject' subclasses for input | |||
""" | |||
projects = {} | |||
from .utils import write | |||
for root, repositories, files in os.walk(repository): | |||
for filename in files: | |||
if re.fullmatch(r'.*project.*\.py', filename): | |||
complete_filename = pjoin(root, filename) | |||
# Encapsulate the project | |||
globals = { | |||
#'__builtins__': {'__build_class__': __build_class__}, | |||
'SimulationProject': SimulationProject, | |||
'write': write, | |||
} | |||
locals = {} | |||
# Read the project code | |||
with open(complete_filename, 'r') as _file: | |||
project = ''.join(_file.read()) | |||
if ('SimulationProject' not in project) or (SEARCH_EXCLUSION_TAG in project): | |||
continue # Exclude this file | |||
exec(project, globals, locals) | |||
# Extract the simulations | |||
for key, obj in locals.items(): | |||
if inspect.isclass(obj) and issubclass(obj, SimulationProject): | |||
if criteria(obj): | |||
if key in projects: | |||
print(Color.WARNING + \ | |||
'Warning! Many simulations with the same name. ' + \ | |||
'Simulation ignored: {} in {}'.format( | |||
key, complete_filename[len(repository)+1:]) + \ | |||
Color.ENDC | |||
) | |||
else: | |||
obj.set_project_directory(os.path.abspath(root)) | |||
projects[key] = obj | |||
return projects | |||
def search_simulations_in_module(criteria=lambda x:True): | |||
""" Search simulation classes among the module projects verifying the 'criteria' | |||
Return a list of 'SimulationProject' subclasses | |||
:criteria: Boolean function with 'SimulationProject' subclasses for input | |||
""" | |||
projects_path = pjoin(MODULE_PATH, PROJECTS_REPOSITORY) | |||
return search_simulations_in_repository(projects_path, criteria) | |||
def search_simulations(repository='.', criteria=lambda x:True): | |||
""" Search simulation classes in the 'repository' and among | |||
the module projects verifying the 'criteria' | |||
Return a list of 'SimulationProject' subclasses | |||
:repository: Repository of the searched simulation classes (string) | |||
:criteria: Boolean function with 'SimulationProject' subclasses for input | |||
""" | |||
projects = search_simulations_in_repository(repository, criteria) | |||
module_projects = search_simulations_in_module(criteria) | |||
for key, project in module_projects.items(): | |||
if key not in projects: | |||
projects[key] = project | |||
return projects | |||
class SimulationNotFoundError(Exception): | |||
pass | |||
class TooManySimulationsError(Exception): | |||
pass | |||
def get_simulation(classname=None, repository='.'): | |||
""" Get a simulation class in the 'repository' with the specific 'classname' | |||
Return a subclass of 'SimulationProject' class | |||
:classname: Name of the searched simulation class (string, optional) | |||
:repository: Repository of the searched simulation class (string, optional) | |||
""" | |||
criteria = lambda x: True | |||
if classname is not None: | |||
criteria = lambda x: x.__name__ == classname.strip() | |||
projects = search_simulations(repository, criteria) | |||
if len(projects) == 1: | |||
return list(projects.values())[0] | |||
elif len(projects) < 1: | |||
raise SimulationNotFoundError() | |||
else: | |||
raise TooManySimulationsError() | |||
############ SETTERS ############ | |||
def create_simulation(repository, classname, is_a_module_project=False): | |||
""" Create a Simulation Project. | |||
It builds a repository with the standard content for the project. | |||
Return the absolute path of the created repository | |||
:repository: Name of repository which will be created (string) | |||
:classname: Name of the Python class to manage the project (string) | |||
:is_a_module_project: If True and :repository: is a relative path, | |||
create the project among the module projects. | |||
""" | |||
# Update the repository, if necessary | |||
if is_a_module_project and (not os.path.isabs(repository)): | |||
repository = pjoin( | |||
MODULE_PATH, | |||
PROJECTS_REPOSITORY, | |||
repository, | |||
) | |||
# Build the project repository | |||
try: | |||
os.makedirs(repository, exist_ok=False) | |||
except FileExistsError as err: | |||
raise FileExistsError('Error, a project with this repository already exists!') from err | |||
elmo_path = pjoin(MODULE_PATH, ELMO_TOOL_REPOSITORY) | |||
template_path = pjoin(MODULE_PATH, TEMPLATE_REPOSITORY) | |||
project_path = repository | |||
# Add standard content in the project | |||
files_from_ELMO = [ | |||
pjoin('Examples', 'elmoasmfunctions.o'), | |||
pjoin('Examples', 'elmoasmfunctions.s'), | |||
pjoin('Examples', 'elmoasmfunctionsdef.h'), | |||
pjoin('Examples', 'DPATraces', 'MBedAES', 'vector.o'), | |||
] | |||
files_from_templates = [ | |||
'elmoasmfunctionsdef-extension.h', | |||
'Makefile', | |||
'project.c' | |||
] | |||
for filename in files_from_ELMO: | |||
shutil.copy(pjoin(elmo_path, filename), project_path) | |||
for filename in files_from_templates: | |||
shutil.copy(pjoin(template_path, filename), project_path) | |||
shutil.copy( | |||
pjoin(elmo_path, 'Examples', 'DPATraces', 'MBedAES', 'MBedAES.ld'), | |||
pjoin(project_path, 'project.ld') | |||
) | |||
# Create the project class | |||
with open(pjoin(template_path, 'projectclass.txt')) as _source: | |||
code = ''.join(_source.readlines()) | |||
code = code.replace('{{PROJECTCLASSNAME}}', classname) | |||
with open(pjoin(project_path, 'projectclass.py'), 'w') as _dest: | |||
_dest.write(code) | |||
# Return the path of the created repository | |||
return os.path.abspath(project_path) | |||
############ USAGE FUNCTIONS ############ | |||
class DontFindBinaryError(Exception): | |||
pass | |||
def execute_simulation(project): | |||
""" Execute a simulation of the power leakage using ELMO tool | |||
Return the output and the errors of the execution of ELMO tool | |||
:project: Subclass of 'SimulationProject' defining all the parameters of the simulation | |||
""" | |||
elmo_path = pjoin(MODULE_PATH, ELMO_TOOL_REPOSITORY) | |||
# Some checking before executing | |||
if not isinstance(project, SimulationProject): | |||
raise TypeError('The project is not an instance of \'SimulationProject\' class.') | |||
leaking_binary_path = pjoin(project.get_project_directory(), project.get_binary_path()) | |||
if not os.path.isfile(leaking_binary_path): | |||
raise BinaryNotFoundError('Binary not found. Did you compile your project?') | |||
if not os.path.isfile(pjoin(elmo_path, ELMO_EXECUTABLE_NAME)): | |||
raise Exception('Installation Error: the executable of the ELMO tool is not found.') | |||
# Launch generation of the traces by launching ELMO | |||
command = '{} "{}"'.format( | |||
pjoin('.', ELMO_EXECUTABLE_NAME), | |||
leaking_binary_path, | |||
) | |||
process = subprocess.Popen(command, shell=True, | |||
cwd=elmo_path, executable='/bin/bash', | |||
stdout=subprocess.PIPE, stderr=subprocess.PIPE | |||
) | |||
# Follow the generation | |||
output, error = b'', b'' | |||
num_trace = 0 | |||
while True: | |||
output_line = process.stdout.readline() | |||
error_line = process.stderr.readline() | |||
if (not output_line) and (not error_line) and (process.poll() is not None): | |||
break | |||
if error_line: | |||
error += error_line | |||
if output_line: | |||
output += output_line | |||
if 'TRACE NO' in output_line.decode(ELMO_OUTPUT_ENCODING): | |||
num_trace += 1 | |||
return_code = process.poll() | |||
# Treat data | |||
output = output.decode(ELMO_OUTPUT_ENCODING) if output else None | |||
error = error.decode(ELMO_OUTPUT_ENCODING) if error else None | |||
nb_traces = None | |||
nb_instructions = None | |||
if output: | |||
nb_traces = output.count('TRACE NO') | |||
nb_instructions_pattern = re.compile(r'\s*instructions/cy..es\s+(\d+)\s*') | |||
for line in output.split('\n'): | |||
res = nb_instructions_pattern.fullmatch(line) | |||
if res: | |||
nb_instructions = res.group(1) | |||
# Return results | |||
return { | |||
'nb_traces': nb_traces, | |||
'nb_instructions': nb_instructions, | |||
'output': output, | |||
'error': error, | |||
} | |||
@@ -1,41 +1,21 @@ | |||
import os, re | |||
import socket | |||
from .protocol import SocketTool | |||
import numpy as np | |||
def to_hex(v, nb_bits=16): | |||
try: | |||
v_hex = v.hex() | |||
except AttributeError: | |||
v_hex = hex(v)[2:] | |||
return '0'*(nb_bits//4-len(v_hex)) + v_hex | |||
def split_octet(hexstr): | |||
return [hexstr[i:i+2] for i in range(0, len(hexstr), 2)] | |||
def to_signed_hex(v, nb_bits=16): | |||
try: | |||
return split_octet(to_hex(v & (2**nb_bits-1), nb_bits=nb_bits)) | |||
except TypeError as err: | |||
raise TypeError('Error to transform a <{}> into signed hex.'.format(type(v))) from err | |||
def write(_input, uintXX, nb_bits=16): | |||
uintXX = to_signed_hex(uintXX, nb_bits=nb_bits) | |||
for i in range(nb_bits//8): | |||
_input.write(uintXX[i]+'\n') | |||
def write_list(_input, uint16_list): | |||
for uint16 in uint16_list: | |||
write(_input, uint16) | |||
from .config import MODULE_PATH, ELMO_TOOL_REPOSITORY | |||
from .utils import write | |||
class SimulationProject: | |||
# TAG: EXCLUDE-FROM-SIMULATION-SEARCH | |||
""" Class to manage a simultion | |||
It contains all the parameters of the simulation and has method to use it | |||
""" | |||
_nb_bits_for_nb_challenges = 16 | |||
_project_directory = None | |||
### Define the project | |||
@classmethod | |||
def get_project_directory(cl): | |||
""" """ | |||
if cl._project_directory: | |||
return cl._project_directory | |||
else: | |||
@@ -54,24 +34,20 @@ class SimulationProject: | |||
return '' | |||
@classmethod | |||
def get_binary(cl): | |||
def get_binary_path(cl): | |||
raise NotImplementedError() | |||
@classmethod | |||
def get_parameters_names(cl): | |||
return set() | |||
@classmethod | |||
def adapt_project(cl, parameters): | |||
return | |||
def get_challenge_format(self): | |||
raise NotImplementedError() | |||
### Tools to realize the simulation of the project | |||
def __init__(self, challenges=None): | |||
self.elmo_folder = os.path.dirname(os.path.abspath(__file__))+'/elmo' | |||
self.elmo_folder = os.path.join(MODULE_PATH, ELMO_TOOL_REPOSITORY) | |||
self.challenges = challenges | |||
self.reset() | |||
@@ -83,6 +59,12 @@ class SimulationProject: | |||
self._complete_results = None | |||
self._complete_printed_data = None | |||
def get_test_challenges(self): | |||
raise NotImplementedError() | |||
def get_random_challenges(self, nb_challenges): | |||
raise NotImplementedError() | |||
def set_challenges(self, challenges): | |||
self.challenges = challenges | |||
@@ -104,10 +86,15 @@ class SimulationProject: | |||
aux(format[num_part], challenge[num_part]) | |||
def set_input(self, input): | |||
assert len(self.challenges) < 2**16, 'The number of challenges must be strictly lower than 65536. Currently, there are {} challenges.'.format(len(self.challenges)) | |||
write(input, len(self.challenges), nb_bits=self._nb_bits_for_nb_challenges) | |||
for challenge in self.challenges: | |||
self.set_input_for_each_challenge(input, challenge) | |||
if self.challenges: | |||
assert len(self.challenges) < (1 << self._nb_bits_for_nb_challenges), \ | |||
'The number of challenges must be strictly lower than {}. Currently, there are {} challenges.'.format( | |||
1 << self._nb_bits_for_nb_challenges, | |||
len(self.challenges), | |||
) | |||
write(input, len(self.challenges), nb_bits=self._nb_bits_for_nb_challenges) | |||
for challenge in self.challenges: | |||
self.set_input_for_each_challenge(input, challenge) | |||
def run(self): | |||
self.reset() | |||
@@ -120,6 +107,9 @@ class SimulationProject: | |||
return res | |||
def run_online(self, host='localhost', port=5000): | |||
from .server.protocol import SocketTool | |||
import socket | |||
class TempInput: | |||
def __init__(self): | |||
self._buffer = '' |
@@ -1,6 +1,5 @@ | |||
*.elf | |||
*.list | |||
*.bin | |||
*.map | |||
*.d | |||
*.o |
@@ -8,8 +8,11 @@ | |||
### - get_simulation_via_classname(classname) | |||
class KyberNTTSimulation(SimulationProject): | |||
KYBER_K = 2 #k=2 for Kyber512 | |||
KYBER_N = 256 #n=256 for Kyber512 | |||
@classmethod | |||
def get_binary(cl): | |||
def get_binary_path(cl): | |||
return 'project.bin' | |||
def __init__(self, *args, **kwargs): | |||
@@ -26,6 +29,23 @@ class KyberNTTSimulation(SimulationProject): | |||
secret = challenge | |||
# Write the secret vector | |||
for j in range(2): #k=2 for Kyber512 | |||
for k in range(256): #n=256 for Kyber512 | |||
write(input, secret[j,k]) | |||
for j in range(self.KYBER_K): | |||
for k in range(self.KYBER_N): | |||
write(input, secret[j,k]) | |||
def get_test_challenges(self): | |||
import numpy as np | |||
just_ones = np.ones((self.KYBER_K, self.KYBER_N), dtype=int) | |||
return [ | |||
0 * just_ones, | |||
1 * just_ones, | |||
-2 * just_ones, | |||
] | |||
def get_random_challenges(self, nb_challenges=5): | |||
import numpy as np | |||
return [ np.random.choice( | |||
[-2, -1, 0, 1, 2], | |||
(self.KYBER_K, self.KYBER_N), | |||
p=[1/16, 4/16, 6/16, 4/16, 1/16], | |||
) for _ in range(nb_challenges) ] |
@@ -1,4 +1,4 @@ | |||
from servicethread import OneShotServiceThread | |||
from .server.servicethread import OneShotServiceThread | |||
import subprocess | |||
import shutil |
@@ -1,5 +1,5 @@ | |||
import threading | |||
from protocol import Protocol, ClosureException | |||
from .server.protocol import Protocol, ClosureException | |||
import socket | |||
@@ -4,12 +4,11 @@ | |||
### to write an integer of 'nb_bits' bits in the 'input_file'. | |||
### To get this simulation class in Python scripts, please use the functions in manage.py as | |||
### - search_simulations(repository) | |||
### - get_simulation(repository, classname=None) | |||
### - get_simulation_via_classname(classname) | |||
### - get_simulation(repository='.', classname=None) | |||
class {{PROJECTCLASSNAME}}(SimulationProject): | |||
@classmethod | |||
def get_binary(cl): | |||
def get_binary_path(cl): | |||
return 'project.bin' | |||
def __init__(self, *args, **kwargs): |
@@ -0,0 +1,64 @@ | |||
import numpy as np | |||
### Binary operations | |||
def hweight(n): | |||
c = 0 | |||
while n>0: | |||
c += (n & 1) | |||
n >>= 1 | |||
return c | |||
def hdistance(x,y): | |||
return hweight(x^y) | |||
def binary_writing(n, nb_bits=32, with_hamming=False): | |||
n = np.array(n) | |||
w, h = np.zeros((nb_bits, len(n))), np.zeros((len(n))) | |||
for ind in range(nb_bits): | |||
w[ind] = (n & 1) | |||
h += w[ind] | |||
n >>= 1 | |||
ind += 1 | |||
return (w, h) if with_hamming else w | |||
### Conversion | |||
def to_hex(v, nb_bits=16): | |||
try: | |||
v_hex = v.hex() | |||
except AttributeError: | |||
v_hex = hex(v)[2:] | |||
return '0'*(nb_bits//4-len(v_hex)) + v_hex | |||
def split_octet(hexstr): | |||
return [hexstr[i:i+2] for i in range(0, len(hexstr), 2)] | |||
def to_signed_hex(v, nb_bits=16): | |||
try: | |||
return split_octet(to_hex(v & ((1<<nb_bits)-1), nb_bits=nb_bits)) | |||
except TypeError as err: | |||
raise TypeError('Error to transform a <{}> into signed hex.'.format(type(v))) from err | |||
### Write function | |||
def write(_input, uint, nb_bits=16): | |||
uint = to_signed_hex(uint, nb_bits=nb_bits) | |||
for i in range(nb_bits//8): | |||
_input.write(uint[i]+'\n') | |||
def write_list(_input, uint_list, nb_bits=16): | |||
for uint in uint_list: | |||
write(_input, uint, nb_bits=nb_bits) | |||
### Print Utils | |||
class Color: | |||
HEADER = '\033[95m' | |||
OKBLUE = '\033[94m' | |||
OKCYAN = '\033[96m' | |||
OKGREEN = '\033[92m' | |||
WARNING = '\033[93m' | |||
FAIL = '\033[91m' | |||
ENDC = '\033[0m' | |||
BOLD = '\033[1m' | |||
UNDERLINE = '\033[4m' |
@@ -1,18 +0,0 @@ | |||
#!/bin/bash | |||
elmo_repository="elmo" | |||
elmo_source="https://github.com/sca-research/ELMO.git" | |||
if [ -d ${elmo_repository} ]; then | |||
echo "ELMO tool already installed." | |||
else | |||
echo "ELMO tool NOT installed... Installing via online resources..." | |||
# Download the tool | |||
git clone --depth=1 --branch=master ${elmo_source} ${elmo_repository} | |||
rm -rf ./${elmo_repository}/.git | |||
# Compile the tool | |||
cd ./${elmo_repository} | |||
make | |||
cd .. | |||
fi |
@@ -1,163 +0,0 @@ | |||
import os, shutil | |||
import re | |||
import inspect | |||
import subprocess | |||
def search_simulations_in_repository(repository, criteria=lambda x:True): | |||
""" To search simulation classes in the 'repository' verifying the 'criteria' """ | |||
projects = {} | |||
from .project_base import SimulationProject, write | |||
for root, repositories, files in os.walk(repository): | |||
for filename in files: | |||
if re.fullmatch(r'.*project.*\.py', filename): | |||
# Encapsulation the project | |||
complete_filename = root+'/'+filename | |||
globals = { | |||
#'__builtins__': {'__build_class__': __build_class__}, | |||
'SimulationProject': SimulationProject, | |||
'write': write, | |||
} | |||
locals = {} | |||
# Read the project code | |||
with open(complete_filename, 'r') as _file: | |||
project = '\n'.join(_file.readlines()) | |||
exec(project, globals, locals) | |||
# Extract the simulations | |||
for key, obj in locals.items(): | |||
if inspect.isclass(obj) and issubclass(obj, SimulationProject): | |||
if criteria(obj): | |||
if key in projects: | |||
print('Warning! Multiplie simulation with the same name. Simulation ignored: {} in {}'.format(key, complete_filename[len(repository)+1:])) | |||
else: | |||
obj.set_project_directory(os.path.abspath(root)) | |||
projects[key] = obj | |||
return projects | |||
def search_simulations_in_module(criteria=lambda x:True): | |||
module_path = os.path.dirname(os.path.abspath(__file__)) | |||
projects_path = module_path+'/projects' | |||
return search_simulations_in_repository(projects_path, criteria) | |||
def search_simulations(repository, criteria=lambda x:True): | |||
projects = search_simulations_in_repository(repository, criteria) | |||
module_projects = search_simulations_in_module(criteria) | |||
for key, project in module_projects.items(): | |||
if key not in projects: | |||
projects[key] = project | |||
return projects | |||
class SimulationNotFoundError(Exception): | |||
pass | |||
class TooManySimulationsError(Exception): | |||
pass | |||
def get_simulation(repository, classname=None): | |||
""" Get a simulation class in the 'repository' with the specific 'classname' """ | |||
criteria = lambda x: True | |||
if classname is not None: | |||
criteria = lambda x: x.__name__ == classname.strip() | |||
projects = search_simulations(repository, criteria) | |||
if len(projects) == 1: | |||
return list(projects.values())[0] | |||
elif len(projects) < 1: | |||
raise SimulationNotFoundError() | |||
else: | |||
raise TooManySimulationsError() | |||
def get_simulation_via_classname(classname): | |||
return get_simulation('.', classname) | |||
def create_simulation(repository, classname): | |||
""" Create a simulation class """ | |||
try: | |||
os.makedirs(repository, exist_ok=False) | |||
except FileExistsError as err: | |||
raise FileExistsError('Error, a project with this repository already exists!') from err | |||
module_path = os.path.dirname(os.path.abspath(__file__)) | |||
elmo_path = module_path+'/elmo' | |||
template_path = module_path+'/templates' | |||
project_path = repository | |||
### Add contents in the project | |||
files_from_ELMO = [ | |||
'Examples/elmoasmfunctions.o', | |||
'Examples/elmoasmfunctions.s', | |||
'Examples/elmoasmfunctionsdef.h', | |||
'Examples/DPATraces/MBedAES/vector.o', | |||
] | |||
files_from_templates = [ | |||
'elmoasmfunctionsdef-extension.h', | |||
'Makefile', | |||
'project.c' | |||
] | |||
for filename in files_from_ELMO: | |||
shutil.copy(elmo_path+'/'+filename, project_path) | |||
for filename in files_from_templates: | |||
shutil.copy(template_path+'/'+filename, project_path) | |||
shutil.copy(elmo_path+'/'+'Examples/DPATraces/MBedAES/MBedAES.ld', project_path+'/'+'project.ld') | |||
### Create the project class | |||
with open(template_path+'/projectclass.py') as _source: | |||
code = ''.join(_source.readlines()) | |||
code = code.replace('{{PROJECTCLASSNAME}}', classname) | |||
with open(project_path+'/'+'projectclass.py', 'w') as _dest: | |||
_dest.write(code) | |||
return os.path.abspath(project_path) | |||
def execute_simulation(project, data=None): | |||
""" Execute a simulation with 'data' """ | |||
# Make the compilation | |||
if False: | |||
print('Compiling binary...') | |||
# Adapt the project source | |||
with open('binaries/Frodo/frodo-base.c', 'r') as _src_file: | |||
content = _src_file.read() | |||
with open('binaries/Frodo/frodo.c', 'w') as _dst_file: | |||
_dst_file.write(content.replace('%NEED_TO_FILL%', str(n))) | |||
# Compile the project | |||
make_directory = 'projects/{}/{}'.format(project.get_project_directory(), project.get_make_directory()) | |||
process = subprocess.Popen('make', shell=True, cwd=make_directory, executable='/bin/bash', stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |||
output, error = process.communicate() | |||
if error and ('error' in error.decode('latin-1')): | |||
print("Error to compile") | |||
print(error) | |||
raise Exception() | |||
# Save last compilation data | |||
global_variables[project_name] = { | |||
'last_n': n, | |||
} | |||
# Generate the trace by launching ELMO | |||
command = './elmo "{}/{}"'.format( | |||
project.get_project_directory(), | |||
project.get_binary() | |||
) | |||
cwd = os.path.dirname(os.path.abspath(__file__))+'/elmo' | |||
process = subprocess.Popen(command, shell=True, cwd=cwd, executable='/bin/bash', stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |||
output, error = process.communicate() | |||
output = output.decode('latin-1') if output else None | |||
error = error.decode('latin-1') if error else None | |||
nb_traces = output.count('TRACE NO') | |||
# Return results | |||
return { | |||
'output': output, | |||
'error': error, | |||
'nb_traces': nb_traces, | |||
} | |||
@@ -1 +0,0 @@ | |||
numpy==1.19.1 |
@@ -1,29 +0,0 @@ | |||
from servicethread import ListeningThread | |||
from executorthread import ExecutorThread | |||
def do_main_program(): | |||
global thread, stop | |||
thread = ListeningThread('localhost', 5000, ExecutorThread) | |||
thread.start() | |||
def program_cleanup(signum, frame): | |||
global thread, stop | |||
thread.stop() | |||
stop = True | |||
thread = None | |||
stop = False | |||
# Execute | |||
print("Executing...") | |||
do_main_program() | |||
print("Done ! And now, listening...") | |||
import signal | |||
signal.signal(signal.SIGINT, program_cleanup) | |||
signal.signal(signal.SIGTERM, program_cleanup) | |||
# Wait | |||
import time | |||
while not stop: | |||
time.sleep(1) |
@@ -0,0 +1,96 @@ | |||
import setuptools | |||
from setuptools.command.build_py import build_py | |||
from setuptools.command.install import install | |||
from subprocess import check_call | |||
import os, shutil | |||
import re | |||
### CONFIGURATION | |||
PELMO_SOURCE = 'https://github.com/ThFeneuil/python-elmo' | |||
ELMO_MODULE_NAME = 'elmo' | |||
ELMO_SOURCE = 'https://github.com/sca-research/ELMO.git' | |||
# Import configuration of ELMO_MODULE, | |||
# cannot use 'import elmo' because it can be ambiguous which module it will call | |||
with open(os.path.join(ELMO_MODULE_NAME, 'config.py')) as _file: | |||
globals = {'__file__': os.path.join( | |||
os.path.abspath(ELMO_MODULE_NAME), | |||
'config.py' | |||
)} | |||
exec(_file.read(), globals) | |||
ELMO_TOOL_REPOSITORY = globals['ELMO_TOOL_REPOSITORY'] | |||
ELMO_INPUT_FILE_NAME = globals['ELMO_INPUT_FILE_NAME'] | |||
def install_elmo_tool(elmo_complete_path): | |||
## Download the tool | |||
elmo_download_command = "git clone --depth=1 --branch=master {url} {elmo_path}".format( | |||
url=ELMO_SOURCE, | |||
elmo_path=elmo_complete_path, | |||
) | |||
check_call(elmo_download_command.split()) | |||
shutil.rmtree(os.path.join(elmo_complete_path, '.git')) | |||
# 'test' contains a Python2 test, and it raises an error during byte-compiling | |||
shutil.rmtree(os.path.join(elmo_complete_path, 'test')) | |||
## Setup the tool | |||
elmodefines_h = None | |||
elmodefines_h_path = os.path.join(elmo_complete_path, 'elmodefines.h') | |||
with open(elmodefines_h_path, 'r') as _file: | |||
elmodefines_lines = _file.readlines() | |||
for i, line in enumerate(elmodefines_lines): | |||
if re.match(r'\s*#define\s+DATAFILEPATH', line): | |||
elmodefines_lines[i] = '#define DATAFILEPATH "{}"'.format(ELMO_INPUT_FILE_NAME) | |||
elmodefines_h = ''.join(elmodefines_lines) | |||
with open(elmodefines_h_path, 'w') as _file: | |||
_file.write(elmodefines_h) | |||
# Compile the tool | |||
check_call("make clean".split(), cwd=elmo_complete_path) | |||
check_call("make".split(), cwd=elmo_complete_path) | |||
class PostBuildCommand(build_py): | |||
""" Build Command to add the ELMO installation """ | |||
def run(self): | |||
build_py.run(self) | |||
# ELMO Installation | |||
elmo_complete_path = os.path.join( | |||
self.build_lib, | |||
ELMO_MODULE_NAME, | |||
ELMO_TOOL_REPOSITORY, | |||
) | |||
shutil.rmtree(elmo_complete_path, ignore_errors=True) | |||
install_elmo_tool(elmo_complete_path) | |||
if __name__ == '__main__': | |||
with open("README.md", "r") as fh: | |||
long_description = fh.read() | |||
setuptools.setup( | |||
name="python-elmo", | |||
version="0.0.1", | |||
author="Thibauld Feneuil", | |||
author_email="thibauld.feneuil@cryptoexperts.com", | |||
description="Emulator for power Leakage for the M0", | |||
long_description=long_description, | |||
long_description_content_type="text/markdown", | |||
url=PELMO_SOURCE, | |||
project_urls={ | |||
'ELMO Source': ELMO_SOURCE, | |||
'pELMO Source': PELMO_SOURCE, | |||
}, | |||
packages=setuptools.find_packages(), | |||
classifiers=[ | |||
"Programming Language :: Python :: 3", | |||
], | |||
python_requires='>=3.5', | |||
cmdclass={ | |||
'build_py': PostBuildCommand, | |||
}, | |||
install_requires=['numpy'], | |||
include_package_data=True, | |||
) |
@@ -0,0 +1,83 @@ | |||
import os, shutil | |||
from subprocess import check_call | |||
### Install ELMO | |||
from elmo.config import ELMO_TOOL_REPOSITORY | |||
ELMO_SOURCE = 'https://github.com/sca-research/ELMO.git' | |||
elmo_complete_path = os.path.join('elmo', ELMO_TOOL_REPOSITORY) | |||
if not os.path.isdir(elmo_complete_path): | |||
from setup import install_elmo_tool | |||
install_elmo_tool(elmo_complete_path) | |||
def print_success(text): | |||
OKGREEN = '\033[92m' | |||
ENDC = '\033[0m' | |||
print(OKGREEN+text+ENDC) | |||
######################################################### | |||
# TEST 1 : MANAGE A FRESH SIMULATION # | |||
######################################################### | |||
foldername = 'test-project' | |||
classname = 'TestSimu' | |||
### Create Simulation | |||
from elmo.manage import create_simulation | |||
shutil.rmtree('test-project', ignore_errors=True) # Clean before | |||
create_simulation(foldername, classname) | |||
assert os.path.isdir(foldername), 'Folder not created' | |||
### List Available Simulations | |||
from elmo.manage import search_simulations | |||
simulations = search_simulations(foldername) | |||
assert classname in simulations, 'Test Simulation not found' | |||
### Use a Simulation | |||
from elmo.manage import get_simulation | |||
simu = get_simulation(classname, foldername) | |||
shutil.rmtree(foldername) | |||
print_success(' - Test 1 "Manage A Fresh Simulation": Success!') | |||
######################################################### | |||
# TEST 2 : USE ELMO ENGINE # | |||
######################################################### | |||
### Use the ELMO Engine | |||
from elmo.engine import ELMOEngine, Instr | |||
engine = ELMOEngine() | |||
for i in range(0, 256): | |||
engine.add_point( | |||
(Instr.LDR, Instr.MUL, Instr.OTHER), # Types of the previous, current and next instructions | |||
(0x0000, i), # Operands of the previous instructions | |||
(0x2BAC, i) # Operands of the current instructions | |||
) | |||
engine.run() # Compute the power consumption of all these points | |||
power = engine.power # Numpy 1D array with an entry for each previous point | |||
engine.reset_points() # Reset the engine to study other points | |||
assert power.shape == (256, ) | |||
print_success(' - Test 2 "Use ELMO Engine": Success!') | |||
######################################################### | |||
# TEST 3 : USE A REAL SIMULATION # | |||
######################################################### | |||
from elmo import get_simulation | |||
KyberNTTSimulation = get_simulation('KyberNTTSimulation') | |||
simulation = KyberNTTSimulation() | |||
simulation.set_challenges(simulation.get_random_challenges(10)) | |||
res = simulation.run() | |||
assert not res['error'] | |||
assert res['nb_traces'] == 10 | |||
assert res['nb_instructions'] | |||
print_success(' - Test 3 "Use A Real Simulation": Success!') | |||
######################################################### | |||
print_success('All seems fine!') |