Skip to content

Commit b16e2c7

Browse files
committed
feat(r2): wrapper class R2Qiling and R2Mem
1 parent e310cb8 commit b16e2c7

File tree

4 files changed

+232
-6
lines changed

4 files changed

+232
-6
lines changed

examples/extensions/r2/deflat_r2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from qiling import Qiling
1111
from qiling.const import QL_VERBOSE
12-
from qiling.extensions.r2 import R2, R2Deflator
12+
from qiling.extensions.r2 import R2Qiling
1313

1414

1515

qiling/extensions/r2/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,39 @@
1+
from qiling import Qiling
12
from .r2 import R2
3+
from .mem import R2Mem
24
from .deflat import R2Deflator
5+
6+
from unicorn.unicorn_const import UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC
7+
8+
9+
class R2Qiling(Qiling):
10+
def __init__(self, *args, **kwargs):
11+
super().__init__(*args, **kwargs)
12+
self._mem = R2Mem(self.mem)
13+
self.r2 = R2(self)
14+
15+
16+
def uc2perm(ps: int) -> str:
17+
perms_d = {
18+
UC_PROT_READ : 'r',
19+
UC_PROT_WRITE : 'w',
20+
UC_PROT_EXEC : 'x'
21+
}
22+
23+
return ''.join(val if idx & ps else '-' for idx, val in perms_d.items())
24+
25+
def assert_mem_equal(ql: 'R2Qiling'):
26+
map_info = ql.mem.map_info
27+
mem_regions = list(ql.uc.mem_regions())
28+
assert len(map_info) == len(mem_regions), f'len: map_info={len(map_info)} != mem_regions={len(mem_regions)}'
29+
for i, mem_region in enumerate(mem_regions):
30+
s, e, p, _, _, data = map_info[i]
31+
if (s, e - 1, p) != mem_region:
32+
ql.log.error('map_info:')
33+
print('\n'.join(ql.mem.get_formatted_mapinfo()))
34+
ql.log.error('uc.mem_regions:')
35+
print('\n'.join(f'{s:010x} - {e:010x} {uc2perm(p)}' for (s, e, p) in mem_regions))
36+
raise AssertionError(f'(start, end, perm): map_info={(s, e - 1, p)} != mem_region={mem_region}')
37+
uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1)
38+
assert len(data) == len(uc_mem), f'len of {i} mem: map_info={len(data)} != mem_region={len(uc_mem)}'
39+
assert data == uc_mem, f'Memory region {i} {mem_region[0]:#x} - {mem_region[1]:#x} not equal to map_info[{i}]'

qiling/extensions/r2/mem.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import ctypes
2+
3+
4+
from qiling.os.memory import QlMemoryManager, MapInfoEntry
5+
from qiling.exception import QlMemoryMappedError
6+
7+
from typing import Any, Callable, Iterator, List, Mapping, MutableSequence, Optional, Pattern, Sequence, Tuple, Union
8+
9+
from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL
10+
11+
class R2Mem(QlMemoryManager):
12+
'''A wrapper for QlMemoryManager that uses map_ptr and store raw memory in map_info
13+
NOTE: ql.mem already contains map_infor after loader.run(), so instead of super().__init__(),
14+
we accept mem object to simulate inheritance by composition
15+
'''
16+
17+
def __init__(self, mem: QlMemoryManager):
18+
self.__dict__.update(mem.__dict__)
19+
self._convert_map()
20+
21+
def _convert_map(self):
22+
'''Clean existing map_info and remap memory'''
23+
mapinfo = self.map_info.copy()
24+
self.map_info = []
25+
self.cmap = {}
26+
for s, e, p, label, _mmio in mapinfo:
27+
data = self.read(s, e - s)
28+
self.ql.uc.mem_unmap(s, e - s)
29+
self.map(s, e - s, p, label, data)
30+
31+
def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str] = None, ptr: Optional[bytearray] = None):
32+
"""Map a new memory range.
33+
34+
Args:
35+
addr: memory range base address
36+
size: memory range size (in bytes)
37+
perms: requested permissions mask
38+
info: range label string
39+
ptr: pointer to use (if any)
40+
41+
Raises:
42+
QlMemoryMappedError: in case requested memory range is not fully available
43+
"""
44+
45+
assert perms & ~UC_PROT_ALL == 0, f'unexpected permissions mask {perms}'
46+
47+
if not self.is_available(addr, size):
48+
for line in self.get_formatted_mapinfo():
49+
print(line)
50+
raise QlMemoryMappedError(f'Requested memory {addr:#x} + {size:#x} is unavailable')
51+
52+
buf = self.map_ptr(addr, size, perms, ptr)
53+
self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False, data=buf)
54+
55+
def map_ptr(self, addr: int, size: int, perms: int = UC_PROT_ALL, buf: Optional[bytearray] = None) -> bytearray:
56+
"""Map a new memory range allocated as Python bytearray, will not affect map_info
57+
58+
Args:
59+
addr: memory range base address
60+
size: memory range size (in bytes)
61+
perms: requested permissions mask
62+
buf: bytearray already allocated (if any)
63+
64+
Returns:
65+
bytearray with size, should be added to map_info by caller
66+
"""
67+
buf = buf or bytearray(size)
68+
buf_type = ctypes.c_ubyte * size
69+
cdata = buf_type.from_buffer(buf)
70+
self.cmap[addr] = cdata
71+
self.ql.uc.mem_map_ptr(addr, size, perms, cdata)
72+
return buf
73+
74+
def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False, data : bytearray = None):
75+
"""Add a new memory range to map.
76+
77+
Args:
78+
mem_s: memory range start
79+
mem_e: memory range end
80+
mem_p: permissions mask
81+
mem_info: map entry label
82+
is_mmio: memory range is mmio
83+
"""
84+
self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio, data))
85+
self.map_info.sort(key=lambda tp: tp[0])
86+
87+
def del_mapinfo(self, mem_s: int, mem_e: int):
88+
"""Subtract a memory range from map, will destroy data and unmap uc mem in the range.
89+
90+
Args:
91+
mem_s: memory range start
92+
mem_e: memory range end
93+
"""
94+
95+
tmp_map_info: MutableSequence[MapInfoEntry] = []
96+
97+
for s, e, p, info, mmio, data in self.map_info:
98+
if e <= mem_s:
99+
tmp_map_info.append((s, e, p, info, mmio, data))
100+
continue
101+
102+
if s >= mem_e:
103+
tmp_map_info.append((s, e, p, info, mmio, data))
104+
continue
105+
106+
del self.cmap[s] # remove cdata reference starting at s
107+
if s < mem_s:
108+
self.ql.uc.mem_unmap(s, mem_s - s)
109+
self.map_ptr(s, mem_s - s, p, data[:mem_s - s])
110+
tmp_map_info.append((s, mem_s, p, info, mmio, data[:mem_s - s]))
111+
112+
if s == mem_s:
113+
pass
114+
115+
if e > mem_e:
116+
self.ql.uc.mem_unmap(mem_e, e - mem_e)
117+
self.map_ptr(mem_e, e - mem_e, p, data[mem_e - e:])
118+
tmp_map_info.append((mem_e, e, p, info, mmio, data[mem_e - e:]))
119+
120+
if e == mem_e:
121+
pass
122+
123+
del data[mem_s - s:mem_e - s]
124+
125+
self.map_info = tmp_map_info
126+
127+
def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None, data: Optional[bytearray] = None):
128+
tmp_map_info: Optional[MapInfoEntry] = None
129+
info_idx: int = None
130+
131+
for idx, map_info in enumerate(self.map_info):
132+
if mem_s >= map_info[0] and mem_e <= map_info[1]:
133+
tmp_map_info = map_info
134+
info_idx = idx
135+
break
136+
137+
if tmp_map_info is None:
138+
self.ql.log.error(f'Cannot change mapinfo at {mem_s:#08x}-{mem_e:#08x}')
139+
return
140+
141+
if mem_p is not None:
142+
data = data or self.read(mem_s, mem_e - mem_s).copy()
143+
assert(len(data) == mem_e - mem_s)
144+
self.unmap(mem_s, mem_e - mem_s)
145+
self.map_ptr(mem_s, mem_e - mem_s, mem_p, data)
146+
self.add_mapinfo(mem_s, mem_e, mem_p, mem_info or tmp_map_info[3], tmp_map_info[4], data)
147+
return
148+
149+
if mem_info is not None:
150+
self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4], tmp_map_info[5])
151+
152+
def save(self):
153+
"""Save entire memory content.
154+
"""
155+
156+
mem_dict = {
157+
"ram" : [],
158+
"mmio" : []
159+
}
160+
161+
for lbound, ubound, perm, label, is_mmio, data in self.map_info:
162+
if is_mmio:
163+
mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)]))
164+
else:
165+
data = self.read(lbound, ubound - lbound) # read instead of using data from map_info to avoid error
166+
mem_dict['ram'].append((lbound, ubound, perm, label, data))
167+
168+
return mem_dict
169+
170+
def restore(self, mem_dict):
171+
"""Restore saved memory content.
172+
"""
173+
174+
for lbound, ubound, perms, label, data in mem_dict['ram']:
175+
self.ql.log.debug(f'restoring memory range: {lbound:#08x} {ubound:#08x} {label}')
176+
177+
size = ubound - lbound
178+
if self.is_available(lbound, size):
179+
self.ql.log.debug(f'mapping {lbound:#08x} {ubound:#08x}, mapsize = {size:#x}')
180+
self.map(lbound, size, perms, label, data)
181+
182+
self.ql.log.debug(f'writing {len(data):#x} bytes at {lbound:#08x}')
183+
self.write(lbound, bytes(data))
184+
185+
for lbound, ubound, perms, label, read_cb, write_cb in mem_dict['mmio']:
186+
self.ql.log.debug(f"restoring mmio range: {lbound:#08x} {ubound:#08x} {label}")
187+
188+
#TODO: Handle overlapped MMIO?
189+
self.map_mmio(lbound, ubound - lbound, read_cb, write_cb, info=label)

qiling/extensions/r2/r2.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from .deflat import R2Deflator
1818

1919
if TYPE_CHECKING:
20-
from qiling.core import Qiling
20+
from qiling.extensions.r2 import R2Qiling
2121

2222
def perm2uc(permstr: str) -> int:
2323
'''convert "-rwx" to unicorn const'''
@@ -208,7 +208,7 @@ def end(self):
208208

209209

210210
class R2:
211-
def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0):
211+
def __init__(self, ql: 'R2Qiling', baseaddr=(1 << 64) - 1, loadaddr=0):
212212
super().__init__()
213213
self.ql = ql
214214
# r2 -B [baddr] set base address for PIE binaries
@@ -242,7 +242,7 @@ def _rbuf_map(self, cbuf: ctypes.Array, perm: int = UC_PROT_ALL, addr: int = 0,
242242
desc = libr.r_io_open_buffer(self._r2i, rbuf, UC_PROT_ALL, 0) # last arg `mode` is always 0 in r2 code
243243
libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(cbuf))
244244

245-
def _setup_mem(self, ql: 'Qiling'):
245+
def _setup_mem(self, ql: 'R2Qiling'):
246246
if not hasattr(ql, '_mem'):
247247
return
248248
for start, _end, perms, _label, _mmio, _buf in ql.mem.map_info:
@@ -417,11 +417,11 @@ def set_backtrace(self, target: Union[int, str]):
417417
'''Set backtrace at target address before executing'''
418418
if isinstance(target, str):
419419
target = self.where(target)
420-
def bt_hook(__ql: "Qiling", *args):
420+
def bt_hook(__ql: 'R2Qiling', *args):
421421
print(self._backtrace_fuzzy())
422422
self.ql.hook_address(bt_hook, target)
423423

424-
def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int:
424+
def disassembler(self, ql: 'R2Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int:
425425
'''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code
426426
:param ql: Qiling instance
427427
:param addr: start address for disassembly

0 commit comments

Comments
 (0)