-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsimprocedures.py
227 lines (181 loc) · 7.46 KB
/
simprocedures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# Copyright 2023 VMware, Inc.
# SPDX-License-Identifier: BSD-2-Clause
import inspect
from typing import Optional, Set, Type, Tuple
import angr
from controlstateplugin import ControlStatePlugin
from arch import arch
import capstone
def state_ip(s:angr.SimState) -> Optional[int]:
v = s.registers.load(arch.ip_reg_name)
try:
return s.solver.eval_one(v)
except angr.SimValueError:
return None
def track_to_ret(proc: angr.SimProcedure):
state = proc.state
control = state.control
assert isinstance(control, ControlStatePlugin)
if control.backtracking:
return
ip = state_ip(state)
assert(ip is not None)
# TODO: Check if we need better way
ret_ip = state.callstack.ret_addr
assert(ret_ip is not None and ret_ip != 0)
# TODO: let the arch give the address width
if ret_ip < 0:
ret_ip += 1 << arch.address_width
br = control.current_branch
while br is not None and br['to_ip'] != ret_ip:
control.next_branch()
br = control.current_branch
if br is None:
# We would not be able to return to the correct address
control.diverged = True
control.expected_ip = None
else:
br.update({
'from_ip': None,
'from_sym': None,
'from_offset': None
})
def track_out_of_syms(proc: angr.SimProcedure, sym_names:Set[str]):
state = proc.state
control = state.control
assert isinstance(control, ControlStatePlugin)
if control.backtracking:
return
ip = state_ip(state)
assert(ip is not None)
br = control.current_branch
while br is not None and br['from_ip'] in sym_names:
control.next_branch()
br = control.current_branch
if br is None:
control.diverged = True
control.expected_ip = None
class CopyProcedure(angr.SimProcedure):
#pylint:disable=arguments-differ
def run(self, dst_addr, src_addr, limit):
track_to_ret(self)
copied = self.state.solver.BVS('copied', 64)
self.state.add_constraints(copied >= 0)
if False and 'unconstrained' in str(limit):
old_limit = limit
limit = self.state.solver.BVS('limit', arch.address_width)
self.state.add_constraints(old_limit == limit)
self.state.add_constraints(limit <= self.state.libc.max_memcpy_size)
#self.state.add_constraints(copied <= self.state.libc.max_memcpy_size)
self.state.add_constraints(copied <= limit)
if not self.state.solver.is_true(copied == 0):
src_mem = self.state.memory.load(src_addr, copied, endness='Iend_LE')
self.state.memory.store(dst_addr, src_mem, size=copied, endness='Iend_LE')
return self.ret(limit - copied)
def __rept__(self) -> str:
return 'CopyProcedure'
class ReturnProcedure(angr.SimProcedure):
def __init__(self):
super(ReturnProcedure, self).__init__()
def run(self):
control = self.state.control
assert isinstance(control, ControlStatePlugin)
if control.backtracking:
self.ret()
track_out_of_syms(self, {'zen_untrain_ret', '__x86_return_thunk'})
if control.diverged:
return None
# Force the correct return address
self.ret_to = control.current_branch['to_ip']
r = self.ret()
self.ret_to = None
control.next_branch()
return r
class ProcedureWrapper(angr.SimProcedure):
def __init__(self, proc_class:Type[angr.SimProcedure], limits:Optional[Tuple[Optional[int], Optional[int]]]=None):
super(ProcedureWrapper, self).__init__()
self.proc_class = proc_class
sig = inspect.signature(proc_class.run)
self.n_parameters = len(sig.parameters) - 1
self.limits = limits and enumerate(limits)
def run(self):
# Collect arguments from the state registers according to the calling convention
track_to_ret(self)
cc = self.state.project.factory.cc()
args = cc.ARG_REGS
# Fetch arguments from the registers
arg_values = [self.state.registers.load(reg) for reg in args][:self.n_parameters]
if self.limits:
for i, (min_val, max_val) in self.limits:
if min_val is None and max_val is None:
continue
val = arg_values[i]
if max_val is not None:
self.state.add_constraints(val <= max_val)
if min_val is not None:
self.state.add_constraints(val >= min_val)
# call the procedure with the fetched arguments
result = self.inline_call(self.proc_class, *arg_values).ret_expr
if result.length == arch.address_width:
return result
return result.sign_extend(arch.address_width - result.length)
class RepHook(angr.exploration_techniques.tracer.RepHook):
def __init__(self, mnemonic):
super().__init__(mnemonic.split(" ")[1])
def trace_to_next(self, state):
c = state.control
assert isinstance(c, ControlStatePlugin)
if not c.backtracking:
addr = state.addr
br = c.current_branch
while br is not None and br['from_ip'] == addr and br['to_ip'] == addr:
c.next_branch()
br = c.current_branch
def run(self, state, procedure=None, *arguments, **kwargs):
self.trace_to_next(state)
if procedure is not None:
result = self._inline_call(state, procedure, *arguments, **kwargs)
print(f'Result of inline call: {result}')
# Invoke the run() method from the parent class
super().run(state)
# TODO: Move to AngrSim
class RetpolineProcedure(angr.SimProcedure):
def __init__(self, reg: str):
super(RetpolineProcedure, self).__init__()
self.reg = reg
def run(self):
state = self.state
reg = getattr(state.regs, self.reg)
control = state.control
if control.backtracking:
return self.jump(reg)
trace_from_ip = control.current_branch['from_ip']
trace_to_ip = control.current_branch['to_ip']
control.expected_ip = trace_to_ip
angr_mgr = control.angr_mgr
current_state_ip = state_ip(state)
prev_state_ip = state.history and state.history.parent and state.history.parent.addr
def in_retpoline(ip:int) -> bool:
sym_name = angr_mgr.get_sym_name(ip)
return (sym_name.startswith('__x86_indirect_thunk') or
sym_name in {'__x86_return_thunk', 'zen_untrain_ret'})
# When using kprobes we skip the retpolines, but when using hardware tracer
# we keep them.
if (current_state_ip == trace_from_ip or
(not in_retpoline(trace_from_ip) and prev_state_ip == trace_from_ip)):
# TODO: Handle the case in which the trace ends with a retpoline
while in_retpoline(trace_to_ip):
control.next_branch()
trace_to_ip = control.current_branch['to_ip']
trace_from_ip = control.current_branch['from_ip']
if not in_retpoline(trace_from_ip):
control.diverged = True
break
control.expected_ip = trace_to_ip
else:
control.diverged = True
if not control.diverged:
state.add_constraints(reg == trace_to_ip)
control.next_branch()
return self.jump(trace_to_ip)
return self.jump(reg)