Python-ELMO is a Python library which offers an encapsulation of the binary tool ELMO, in order to manipulate it easily in Python and SageMath script.
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

project_base.py 13KB

  1. import os, re
  2. import numpy as np
  3. from os.path import join as pjoin
  4. from .config import (
  5. MODULE_PATH,
  6. ELMO_TOOL_REPOSITORY,
  7. ELMO_INPUT_FILE_NAME,
  8. DEFAULT_HOST,
  9. DEFAULT_PORT,
  10. )
  11. from .utils import write
  12. class SimulationProject:
  13. # TAG: EXCLUDE-FROM-SIMULATION-SEARCH
  14. """ Class to manage a simultion
  15. It contains all the parameters of the simulation and has method to use it
  16. """
  17. _nb_bits_for_nb_challenges = 16
  18. _project_directory = None
  19. ### Define the project
  20. @classmethod
  21. def get_project_directory(cl):
  22. """ Return the project directory of the simulation """
  23. if cl._project_directory:
  24. return cl._project_directory
  25. else:
  26. raise NotImplementedError()
  27. @classmethod
  28. def set_project_directory(cl, project_directory):
  29. """ Set the project directory of the simulation """
  30. cl._project_directory = project_directory
  31. @classmethod
  32. def get_binary_path(cl):
  33. """ Return the path of the leaking binary """
  34. raise NotImplementedError()
  35. def get_challenge_format(self):
  36. """ Return the format of one challenge
  37. Used by 'set_input_for_each_challenge' if not rewritten
  38. """
  39. raise NotImplementedError()
  40. ### Tools to realize the simulation of the project
  41. def __init__(self, challenges=None):
  42. """ Initialize a simulation project
  43. :challenge: The list of challenge for the simulation
  44. """
  45. self.elmo_folder = pjoin(MODULE_PATH, ELMO_TOOL_REPOSITORY)
  46. self.challenges = challenges
  47. self.reset()
  48. def reset(self):
  49. """ Reset the last simulation """
  50. self.is_executed = False
  51. self.has_been_online = False
  52. self._nb_traces = None
  53. self._complete_asmtrace = None
  54. self._complete_results = None
  55. self._complete_printed_data = None
  56. def get_number_of_challenges(self):
  57. """ Return the number of challenge """
  58. return len(self.challenges) if self.challenges else 0
  59. def get_test_challenges(self):
  60. """ Return a fixed list of challenges for test """
  61. raise NotImplementedError()
  62. def get_random_challenges(self, nb_challenges):
  63. """ Return a list of random challenges
  64. :nb_challenges: Length of the list
  65. """
  66. raise NotImplementedError()
  67. def set_challenges(self, challenges):
  68. """ Reset the simulation and set the challenges of the next simulation
  69. :challenges: The list of the challenges
  70. """
  71. self.reset()
  72. self.challenges = challenges
  73. def get_input_filename(self):
  74. """ Return (string) the path of the input file
  75. of the local installation of ELMO tool
  76. """
  77. return pjoin(self.elmo_folder, ELMO_INPUT_FILE_NAME)
  78. def get_printed_data_filename(self):
  79. """ Return the path (string) of the file containing the printed data
  80. of the local installation of ELMO tool
  81. """
  82. return pjoin(self.elmo_folder, 'output', 'printdata.txt')
  83. def get_asmtrace_filename(self):
  84. """ Return the path (string) of the file containing the ASM trace
  85. of the local installation of ELMO tool
  86. """
  87. return pjoin(self.elmo_folder, 'output', 'asmoutput', 'asmtrace00001.txt')
  88. def set_input_for_each_challenge(self, input, challenge):
  89. """ Set the input for one challenge for a simulation with ELMO tool
  90. :input: Descriptor of the input of the ELMO tool (write only)
  91. :challenge: A challenge for the simulation
  92. """
  93. format = self.get_challenge_format()
  94. def aux(sizes, data):
  95. if len(sizes) == 0:
  96. write(input, data)
  97. else:
  98. assert len(data) == sizes[0], 'Incorrect format for challenge. Get {} instead of {}'.format(len(data), sizes[0])
  99. for i in range(sizes[0]):
  100. aux(sizes[1:], data[i])
  101. for num_part in range(len(format)):
  102. aux(format[num_part], challenge[num_part])
  103. def set_input(self, input):
  104. """ Set the input for a simulation with ELMO tool
  105. First, it writes the number of challenges.
  106. Then, it writes each challenge one by one thanks to the method 'set_input_for_each_challenge'
  107. :input: Descriptor of the input of the ELMO tool (write only)
  108. """
  109. if self.challenges:
  110. nb_challenges = self.get_number_of_challenges()
  111. assert nb_challenges < (1 << self._nb_bits_for_nb_challenges), \
  112. 'The number of challenges must be strictly lower than {}. Currently, there are {} challenges.'.format(
  113. 1 << self._nb_bits_for_nb_challenges,
  114. nb_challenges,
  115. )
  116. write(input, nb_challenges, nb_bits=self._nb_bits_for_nb_challenges)
  117. for challenge in self.challenges:
  118. self.set_input_for_each_challenge(input, challenge)
  119. def run(self):
  120. """ Run the simulation thanks the local installation of ELMO tool.
  121. Using the leaking binary defined thanks to the method 'get_binary_path',
  122. it will run the ELMO tool to output the leaked power traces.
  123. The results of the simulation are available via the methods:
  124. 'get_results', 'get_traces', 'get_asmtrace' and 'get_printed_data'
  125. Return the raw output of the compiled ELMO tool.
  126. """
  127. self.reset()
  128. with open(self.get_input_filename(), 'w') as _input:
  129. self.set_input(_input)
  130. from .manage import execute_simulation
  131. res = execute_simulation(self)
  132. self.is_executed = True
  133. self.has_been_online = False
  134. self._nb_traces = res['nb_traces']
  135. return res
  136. def run_online(self, host=DEFAULT_HOST, port=DEFAULT_PORT):
  137. """ Run the simulation thanks to an ELMO server.
  138. An ELMO server can be launched thanks to the command
  139. >>> python -m elmo run-server 'host' 'port'
  140. Using the leaking binary defined thanks to the method 'get_binary_path',
  141. it will run the ELMO tool to output the leaked power traces.
  142. The results of the simulation are available via the methods:
  143. 'get_results', 'get_traces', 'get_asmtrace' and 'get_printed_data'
  144. Return the raw output of the compiled ELMO tool.
  145. :host: The host of the ELMO server
  146. :post! The port where the ELMO server is currently listening
  147. """
  148. from .server.protocol import SocketTool
  149. import socket
  150. class TempInput:
  151. def __init__(self):
  152. self._buffer = ''
  153. def write(self, data):
  154. self._buffer += data
  155. def get_string(self):
  156. return self._buffer
  157. self.reset()
  158. input = TempInput()
  159. self.set_input(input)
  160. try:
  161. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  162. s.connect((host, port))
  163. SocketTool.send_data(s, {
  164. 'input': input.get_string(),
  165. })
  166. if not SocketTool.get_ack(s):
  167. raise RuntimeError("NACK received: The request has been refused!")
  168. SocketTool.send_file(s, '{}/{}'.format(self.get_project_directory(), self.get_binary_path()))
  169. if not SocketTool.get_ack(s):
  170. raise RuntimeError("NACK received: The binary file has been refused!")
  171. data = SocketTool.get_data(s)
  172. if data['error']:
  173. raise Exception("The simulation returned an error: {}".format(data['error']))
  174. s.close()
  175. except IOError as err:
  176. raise RuntimeError("The connection refused. Has the ELMO server been switch on?") from err
  177. self.is_executed = True
  178. self.has_been_online = True
  179. self._nb_traces = data['nb_traces']
  180. self._complete_asmtrace = data['asmtrace']
  181. self._complete_results = data['results']
  182. self._complete_printed_data = data['printed_data']
  183. return { key: value
  184. for key, value in data.items()
  185. if key not in ['results', 'asmtrace', 'printed_data']
  186. }
  187. ### Manipulate the results
  188. def get_number_of_traces(self):
  189. """ Get the number of traces of the last simulation """
  190. assert self.is_executed
  191. return self._nb_traces
  192. def get_results_filenames(self):
  193. """ Get the filenames of the results of the last simulation
  194. Return a list of filenames (strings), each file containing a power trace
  195. """
  196. assert self.is_executed
  197. assert not self.has_been_online
  198. nb_traces = self.get_number_of_traces()
  199. output_path = os.path.join(self.elmo_folder, 'output')
  200. filenames = []
  201. for i in range(nb_traces):
  202. filename = os.path.join(output_path, 'traces', 'trace%05d.trc' % (i+1))
  203. assert os.path.isfile(filename)
  204. filenames.append(filename)
  205. return filenames
  206. def get_results(self):
  207. """ Get the raw outputs of the last simulation
  208. Return a list of power traces (represented by a list of floats)
  209. Warning: The output list is the same object stored in the instance.
  210. If you change this object, it will change in the instance too, and the
  211. next call to 'get_results' will return the changed object.
  212. """
  213. assert self.is_executed
  214. nb_traces = self.get_number_of_traces()
  215. # Load the power traces
  216. if self._complete_results is None:
  217. self._complete_results = []
  218. for filename in self.get_results_filenames():
  219. with open(filename, 'r') as _file:
  220. self._complete_results.append(list(map(float, _file.readlines())))
  221. return self._complete_results
  222. def get_traces(self, indexes=None):
  223. """ Get the power trace of the last simulation
  224. Return a 2-dimensional numpy array of floats.
  225. 1st dimension: number of the trace
  226. 2nd dimension: power point of the trace
  227. :indexes: if not None, return only the power points contained in :indexes:
  228. Must be a list of indexes of power points.
  229. """
  230. assert self.is_executed
  231. results = self.get_results()
  232. nb_traces = self.get_number_of_traces()
  233. trace_length = len(results[0])
  234. if indexes is None:
  235. traces = np.zeros((nb_traces, trace_length))
  236. for i in range(nb_traces):
  237. traces[i,:] = results[i]
  238. return traces
  239. else:
  240. traces = np.zeros((nb_traces, len(indexes)))
  241. for i in range(nb_traces):
  242. traces[i,:] = np.array(results[i])[indexes]
  243. return traces
  244. ### Manipulate the ASM trace
  245. def get_asmtrace(self):
  246. """ Get the ASM trace of the last simulation
  247. The ASM trace is the list of the leaking assembler instructions,
  248. one instruction each point of the leakage power trace
  249. """
  250. assert self.is_executed
  251. # Load the ASM trace
  252. if self._complete_asmtrace is None:
  253. with open(self.get_asmtrace_filename(), 'r') as _file:
  254. self._complete_asmtrace = _file.read()
  255. if type(self._complete_asmtrace) is not list:
  256. self._complete_asmtrace = self._complete_asmtrace.split('\n')
  257. return self._complete_asmtrace
  258. def get_indexes_of(self, condition):
  259. """ Get the list of indexes of the instructions
  260. verifying the 'condition' in the ASM trace
  261. :condition: Boolean function with ASM instruction (string) for input
  262. """
  263. assert self.is_executed
  264. return [i for i, instr in enumerate(self.get_asmtrace()) if condition(instr)]
  265. ### Manipulate the Printed Data
  266. def get_printed_data(self, per_trace=True):
  267. """ Get the printed data of the last simulation
  268. A printed data is a data which has been given to the function 'printbyte'
  269. during the simulation
  270. :per_trace: If True (default), split equally the printed data in a table
  271. with a length equal to the number of simulated power traces
  272. """
  273. assert self.is_executed
  274. # Load the printed data
  275. if self._complete_printed_data is None:
  276. with open(self.get_printed_data_filename(), 'r') as _file:
  277. self._complete_printed_data = list(map(lambda x: int(x, 16), _file.readlines()))
  278. if per_trace:
  279. # Return printed data for each trace
  280. data = self._complete_printed_data
  281. nb_traces = self.get_number_of_traces()
  282. nb_data_per_trace = len(data) // nb_traces
  283. return [data[nb_data_per_trace*i:nb_data_per_trace*(i+1)] for i in range(nb_traces)]
  284. else:
  285. # Return printed data
  286. return self._complete_printed_data