old htb folders
This commit is contained in:
2023-08-29 21:53:22 +02:00
parent 62ab804867
commit 82b0759f1e
21891 changed files with 6277643 additions and 0 deletions

View File

@@ -0,0 +1,73 @@
__version__ = "9.2.35"
import logging
from typing import Set
from .block import Block
from . import statement as Stmt
from . import expression as Expr
from .statement import Assignment
from .expression import Expression, Const, Tmp, Register, UnaryOp, BinaryOp
from .converter_common import Converter
from .manager import Manager
log = logging.getLogger(__name__)
available_converters: Set[str] = set()
# Optional VEX backend: it is registered in available_converters only when both the converter module and pyvex
# can be imported.
try:
from .converter_vex import VEXIRSBConverter
import pyvex
available_converters.add("vex")
except ImportError as e:
# A missing backend is not an error: the failure is logged at debug level and the backend stays unregistered.
log.debug("Could not import VEXIRSBConverter")
log.debug(e)
# Optional p-code backend: same probing scheme, keyed on the p-code converter module and angr's pcode engine.
try:
from .converter_pcode import PCodeIRSBConverter
from angr.engines import pcode
available_converters.add("pcode")
except ImportError as e:
# As above, an unavailable backend is only reported at debug level.
log.debug("Could not import PCodeIRSBConverter")
log.debug(e)
class IRSBConverter(Converter):
    """
    Front-end converter that dispatches to the p-code or VEX backend depending on the type of the IRSB.
    """

    @staticmethod
    def convert(irsb, manager):  # pylint:disable=arguments-differ
        """
        Convert the given IRSB to an AIL block.

        :param irsb: The IRSB to convert.
        :param manager: The manager to use.
        :return: The converted block.
        :raises ValueError: If no registered backend handles the type of ``irsb``.
        """
        # Backend modules are only bound when their import succeeded, so membership is tested before they are used.
        if "pcode" in available_converters and isinstance(irsb, pcode.lifter.IRSB):
            return PCodeIRSBConverter.convert(irsb, manager)
        if "vex" in available_converters and isinstance(irsb, pyvex.IRSB):
            return VEXIRSBConverter.convert(irsb, manager)
        raise ValueError("No converter available for %s" % type(irsb))
# Public API of the package. The backend converter classes are exported only when their import succeeded.
__all__ = [
    "available_converters",
    "Block",
    "Stmt",
    "Expr",
    "Assignment",
    "Expression",
    "Const",
    "Tmp",
    "Register",
    "UnaryOp",
    "BinaryOp",
    "Manager",
    "IRSBConverter",
]
if "pcode" in available_converters:
    __all__.append("PCodeIRSBConverter")
if "vex" in available_converters:
    __all__.append("VEXIRSBConverter")

View File

@@ -0,0 +1,70 @@
from typing import List, TYPE_CHECKING
if TYPE_CHECKING:
from .statement import Statement
class Block:
    """
    An AIL block: an address, the size of the original block, a list of statements and an optional index.
    """

    __slots__ = ("addr", "original_size", "statements", "idx")

    def __init__(self, addr, original_size, statements=None, idx=None):
        """
        :param addr: Address of the block.
        :param original_size: Size of the block this AIL block was created from.
        :param statements: Statements of the block; a fresh empty list is used when omitted.
        :param idx: Optional index, shown after the address in the textual forms.
        """
        self.addr = addr
        self.original_size = original_size
        self.statements: List["Statement"] = statements if statements is not None else []
        self.idx = idx

    def copy(self, statements=None):
        """
        Create a new Block with the same address, size and index.

        :param statements: Statements for the new block; defaults to a shallow copy of this block's statements.
        :return: The new Block.
        """
        if statements is None:
            statements = self.statements[:]
        return Block(addr=self.addr, original_size=self.original_size, statements=statements, idx=self.idx)

    def __repr__(self):
        count = len(self.statements)
        if self.idx is not None:
            return "<AILBlock %#x.%d of %d statements>" % (self.addr, self.idx, count)
        return "<AILBlock %#x of %d statements>" % (self.addr, count)

    def __str__(self):
        if self.idx is not None:
            header = "## Block %x.%d\n" % (self.addr, self.idx)
        else:
            header = "## Block %x\n" % self.addr

        # One line per statement: index, instruction address (0 when the statement has none), statement text.
        lines = []
        for i, stmt in enumerate(self.statements):
            prefix = "%02d | %s | " % (i, hex(getattr(stmt, "ins_addr", 0)))
            lines.append(prefix + str(stmt))
        return header + "\n".join(lines)

    def __eq__(self, other):
        if type(other) is not Block:
            return False
        return self.addr == other.addr and self.statements == other.statements and self.idx == other.idx

    def likes(self, other):
        """
        Test whether ``other`` is a Block with the same number of statements, each pair of which ``likes`` the other.

        :param other: The object to compare against.
        :return: True if the blocks are alike, False otherwise.
        """
        if type(other) is not Block or len(self.statements) != len(other.statements):
            return False
        return all(mine.likes(theirs) for mine, theirs in zip(self.statements, other.statements))

    def __hash__(self):
        # Statements are deliberately excluded so that they can be edited in place without changing the hash.
        return hash((Block, self.addr, self.idx))

View File

@@ -0,0 +1,8 @@
class SkipConversionNotice(Exception):
    """
    Raised by a converter to signal that the current item should be skipped rather than converted.
    """
class Converter:
    """
    Base class of all converters; subclasses override :meth:`convert`.
    """

    @staticmethod
    def convert(thing):
        """
        Convert ``thing``. The base implementation always raises.

        :param thing: The object to convert.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError

View File

@@ -0,0 +1,505 @@
import logging
from angr.utils.constants import DEFAULT_STATEMENT
from angr.engines.pcode.lifter import IRSB
from pypcode import OpCode, Varnode
from .block import Block
from .statement import Statement, Assignment, Store, Jump, ConditionalJump, Return, Call
from .expression import Expression, DirtyExpression, Const, Register, Tmp, UnaryOp, BinaryOp, Load, Convert
# FIXME: Convert, ITE
from .manager import Manager
from .converter_common import Converter
log = logging.getLogger(name=__name__)
# FIXME: Not all ops are mapped to AIL expressions!
# Maps p-code opcodes to the generic AIL operation name used when building UnaryOp/BinaryOp expressions.
opcode_to_generic_name = {
    # Integer comparisons
    OpCode.INT_EQUAL: "CmpEQ",
    OpCode.INT_NOTEQUAL: "CmpNE",
    OpCode.INT_SLESS: "CmpLTs",
    OpCode.INT_SLESSEQUAL: "CmpLEs",
    OpCode.INT_LESS: "CmpLT",
    OpCode.INT_LESSEQUAL: "CmpLE",
    # Integer addition and subtraction
    OpCode.INT_ADD: "Add",
    OpCode.INT_SUB: "Sub",
    # Bitwise logic, shifts, multiplication and division
    OpCode.INT_XOR: "Xor",
    OpCode.INT_AND: "And",
    OpCode.INT_OR: "Or",
    OpCode.INT_LEFT: "Shl",
    OpCode.INT_RIGHT: "Shr",
    OpCode.INT_SRIGHT: "Sar",
    OpCode.INT_MULT: "Mul",
    OpCode.INT_DIV: "Div",
    # Boolean operations
    OpCode.BOOL_NEGATE: "Not",
    OpCode.BOOL_AND: "LogicalAnd",
    OpCode.BOOL_OR: "LogicalOr",
    # Opcodes that have no entry in this table yet:
    #   MULTIEQUAL, INDIRECT, PIECE, SUBPIECE, INT_ZEXT, INT_SEXT, INT_CARRY, INT_SCARRY, INT_SBORROW,
    #   INT_2COMP, INT_NEGATE, INT_SDIV, INT_REM, INT_SREM, BOOL_XOR, CAST, PTRADD, PTRSUB,
    #   FLOAT_EQUAL, FLOAT_NOTEQUAL, FLOAT_LESS, FLOAT_LESSEQUAL, FLOAT_NAN, FLOAT_ADD, FLOAT_DIV,
    #   FLOAT_MULT, FLOAT_SUB, FLOAT_NEG, FLOAT_ABS, FLOAT_SQRT, FLOAT_INT2FLOAT, FLOAT_FLOAT2FLOAT,
    #   FLOAT_TRUNC, FLOAT_CEIL, FLOAT_FLOOR, FLOAT_ROUND, SEGMENTOP, CPOOLREF, NEW, INSERT, EXTRACT,
    #   POPCOUNT
}
class PCodeIRSBConverter(Converter):
"""
Converts a p-code IRSB to an AIL block
"""
@staticmethod
def convert(irsb: IRSB, manager: Manager):  # pylint:disable=arguments-differ
    """
    Build an AIL block from a p-code IRSB.

    :param irsb: IRSB to convert
    :param manager: Manager to use
    :return: Converted block
    """
    converter = PCodeIRSBConverter(irsb, manager)
    return converter._convert()
def __init__(self, irsb: IRSB, manager: Manager):
    """
    Set up the conversion state for one IRSB and reset the per-block fields of the manager.

    :param irsb: IRSB to convert
    :param manager: Manager to use
    """
    # Point the manager at this block. No type environment is set for p-code.
    manager.tyenv = None
    manager.block_addr = irsb.addr
    manager.vex_stmt_idx = DEFAULT_STATEMENT  # Reset after loop. Necessary?

    self._irsb = irsb
    self._manager = manager
    self._statements = []

    # Cursor over the instruction/op currently being converted
    self._current_ins = None
    self._current_op = None
    self._current_behavior = None
    self._statement_idx = 0

    # Remap all uniques s.t. they are write-once with values starting from 0
    self._unique_tracker = {}
    self._unique_counter = 0

    # Opcodes with a dedicated handler; everything else goes through the generic unary/binary conversion.
    self._special_op_handlers = {
        OpCode.COPY: self._convert_copy,
        OpCode.INT_ZEXT: self._convert_zext,
        OpCode.LOAD: self._convert_load,
        OpCode.STORE: self._convert_store,
        OpCode.BRANCH: self._convert_branch,
        OpCode.CBRANCH: self._convert_cbranch,
        OpCode.BRANCHIND: self._convert_branchind,
        OpCode.CALL: self._convert_call,
        OpCode.CALLIND: self._convert_callind,
        OpCode.CALLOTHER: self._convert_callother,
        OpCode.RETURN: self._convert_ret,
        OpCode.MULTIEQUAL: self._convert_multiequal,
        OpCode.INDIRECT: self._convert_indirect,
        OpCode.SEGMENTOP: self._convert_segment_op,
        OpCode.CPOOLREF: self._convert_cpool_ref,
        OpCode.NEW: self._convert_new,
    }
def _convert(self) -> Block:
    """
    Walk every op of every instruction in the IRSB and collect the resulting AIL statements into a Block.

    :return: The AIL block built from the IRSB
    """
    self._statement_idx = 0
    for instruction in self._irsb._instructions:
        self._current_ins = instruction
        self._manager.ins_addr = instruction.address.offset
        for pcode_op in instruction.ops:
            self._current_op = pcode_op
            self._current_behavior = self._irsb.behaviors.get_behavior_for_opcode(pcode_op.opcode)
            self._convert_current_op()
            # The statement index advances once per p-code op.
            self._statement_idx += 1

    return Block(self._irsb.addr, self._irsb.size, statements=self._statements)
def _convert_current_op(self) -> None:
"""
Convert the current op to corresponding AIL statement
"""
assert self._current_behavior is not None
is_special = self._current_behavior.opcode in self._special_op_handlers
if is_special:
self._special_op_handlers[self._current_behavior.opcode]()
elif self._current_behavior.is_unary:
self._convert_unary()
else:
self._convert_binary()
def _convert_unary(self) -> None:
    """
    Convert the current unary op into an assignment of a UnaryOp, or of a DirtyExpression when the opcode has no
    generic name.
    """
    pcode_op = self._current_op
    opcode = pcode_op.opcode
    generic_name = opcode_to_generic_name.get(opcode)

    # The operand is converted even for unsupported opcodes.
    operand = self._get_value(pcode_op.inputs[0])

    if generic_name is not None:
        result = UnaryOp(self._manager.next_atom(), generic_name, operand, ins_addr=self._manager.ins_addr)
    else:
        log.warning("p-code: Unsupported opcode of type %s", opcode.name)
        result = DirtyExpression(self._manager.next_atom(), opcode.name, bits=pcode_op.output.size * 8)

    self._statements.append(self._set_value(pcode_op.output, result))
def _convert_binary(self) -> None:
    """
    Convert the current binary op to the corresponding AIL statement.

    Opcodes without an entry in ``opcode_to_generic_name`` are emitted as a ``DirtyExpression``. Results of the
    integer comparison opcodes are zero-extended from 1 bit to the width of the output varnode.
    """
    opcode = self._current_op.opcode
    op = opcode_to_generic_name.get(opcode, None)

    in1 = self._get_value(self._current_op.inputs[0])
    in2 = self._get_value(self._current_op.inputs[1])

    # Both signed comparisons that the opcode table produces (INT_SLESS -> "CmpLTs", INT_SLESSEQUAL -> "CmpLEs")
    # must be flagged as signed. "CmpGTs" is not produced by the table and is kept only for compatibility.
    signed = op in {"CmpLTs", "CmpLEs", "CmpGTs"}

    if op is None:
        log.warning("p-code: Unsupported opcode of type %s.", opcode.name)
        out = DirtyExpression(self._manager.next_atom(), opcode.name, bits=self._current_op.output.size * 8)
    else:
        out = BinaryOp(self._manager.next_atom(), op, [in1, in2], signed, ins_addr=self._manager.ins_addr)

    # Zero-extend 1-bit results
    zextend_ops = {
        OpCode.INT_EQUAL,
        OpCode.INT_NOTEQUAL,
        OpCode.INT_SLESS,
        OpCode.INT_SLESSEQUAL,
        OpCode.INT_LESS,
        OpCode.INT_LESSEQUAL,
    }
    if opcode in zextend_ops:
        out = Convert(self._manager.next_atom(), 1, self._current_op.output.size * 8, False, out)

    stmt = self._set_value(self._current_op.output, out)
    self._statements.append(stmt)
def _map_register_name(self, varnode: Varnode) -> int:
    """
    Translate a SLEIGH register varnode into an ArchInfo register file offset, looked up by register name.

    When the architecture does not know the (lower-cased) name, the varnode's own offset plus 0x100000 is used.

    :param varnode: The varnode to translate
    :return: The register file offset
    """
    # FIXME: Will need performance optimization
    # FIXME: Should not get trans object this way. Moreover, should have a
    #        faster mapping method than going through trans
    name = varnode.get_register_name()
    try:
        mapped = self._manager.arch.get_register_offset(name.lower())
    except ValueError:
        mapped = varnode.offset + 0x100000
        log.warning("Could not map register '%s' from archinfo. Mapping to %x", name, mapped)
        return mapped

    log.debug("Mapped register '%s' to offset %x", name, mapped)
    return mapped
def _remap_temp(self, offset: int, is_write: bool) -> int:
"""
Remap any unique space addresses such that they are written only once
:param offset: The unique space address
:param is_write: Whether the access is a write or a read
:return: The remapped temporary register index
"""
if is_write:
self._unique_tracker[offset] = self._unique_counter
self._unique_counter += 1
else:
assert offset in self._unique_tracker
return self._unique_tracker[offset]
def _convert_varnode(self, varnode: Varnode, is_write: bool) -> Expression:
    """
    Build the AIL expression for a varnode, chosen by the name of the address space it lives in.

    :param varnode: The varnode to remap
    :param is_write: Whether the varnode is being read or written to
    :return: The corresponding AIL expression
    :raises NotImplementedError: If the varnode lives in an unsupported address space
    """
    space = varnode.space.name
    bits = varnode.size * 8

    if space == "const":
        return Const(self._manager.next_atom(), None, varnode.offset, bits)

    if space == "register":
        reg_offset = self._map_register_name(varnode)
        return Register(self._manager.next_atom(), None, reg_offset, bits, reg_name=varnode.get_register_name())

    if space == "unique":
        tmp_idx = self._remap_temp(varnode.offset, is_write)
        return Tmp(self._manager.next_atom(), None, tmp_idx, bits)

    if space in ["ram", "mem"]:
        # Memory varnodes are only converted to expressions for reads.
        assert not is_write
        address = Const(self._manager.next_atom(), None, varnode.offset, self._manager.arch.bits)
        # Note: Load takes bytes, not bits, for size
        return Load(
            self._manager.next_atom(),
            address,
            varnode.size,
            self._manager.arch.memory_endness,
            ins_addr=self._manager.ins_addr,
        )

    raise NotImplementedError()
def _set_value(self, varnode: Varnode, value: Expression) -> Statement:
    """
    Build the statement that writes ``value`` to a varnode.

    Register and unique varnodes become an Assignment; ram/mem varnodes become a Store to a constant address.

    :param varnode: Varnode to store into
    :param value: Value to store
    :return: Corresponding AIL statement
    :raises NotImplementedError: If the varnode lives in an unsupported address space
    """
    space = varnode.space.name

    if space in ["register", "unique"]:
        target = self._convert_varnode(varnode, True)
        return Assignment(self._statement_idx, target, value, ins_addr=self._manager.ins_addr)

    if space in ["ram", "mem"]:
        address = Const(self._manager.next_atom(), None, varnode.offset, self._manager.arch.bits)
        return Store(
            self._statement_idx,
            address,
            value,
            varnode.size,
            self._manager.arch.memory_endness,
            ins_addr=self._manager.ins_addr,
        )

    raise NotImplementedError()
def _get_value(self, varnode: Varnode) -> Expression:
    """
    Build the expression that reads a varnode, i.e. convert it as a non-write access.

    :param varnode: Varnode to load from.
    :return: Value loaded
    """
    read_expr = self._convert_varnode(varnode, False)
    return read_expr
def _convert_copy(self) -> None:
"""
Convert copy operation
"""
out = self._current_op.output
inp = self._get_value(self._current_op.inputs[0])
stmt = self._set_value(out, inp)
self._statements.append(stmt)
def _convert_zext(self) -> None:
    """
    Convert a zext operation into an unsigned Convert from the input width to the output width.
    """
    pcode_op = self._current_op
    source = pcode_op.inputs[0]
    destination = pcode_op.output
    widened = Convert(
        self._manager.next_atom(),
        source.size * 8,
        destination.size * 8,
        False,
        self._get_value(source),
    )
    self._statements.append(self._set_value(destination, widened))
def _convert_negate(self) -> None:
    """
    Convert a bool negate operation into a comparison of the input against zero.
    """
    pcode_op = self._current_op
    operand = self._get_value(pcode_op.inputs[0])
    zero = Const(self._manager.next_atom(), None, 0, pcode_op.inputs[0].size * 8)
    is_zero = BinaryOp(
        self._manager.next_atom(), "CmpEQ", [operand, zero], signed=False, ins_addr=self._manager.ins_addr
    )
    self._statements.append(self._set_value(pcode_op.output, is_zero))
def _convert_load(self) -> None:
    """
    Convert a p-code load operation into an assignment of a Load expression to the output varnode.
    """
    pcode_op = self._current_op

    # Only loads from the ram/mem address spaces are handled.
    space = pcode_op.inputs[0].get_space_from_const()
    assert space.name in ["ram", "mem"]

    address = self._get_value(pcode_op.inputs[1])
    loaded = Load(
        self._manager.next_atom(),
        address,
        pcode_op.output.size,
        self._manager.arch.memory_endness,
        ins_addr=self._manager.ins_addr,
    )
    self._statements.append(self._set_value(pcode_op.output, loaded))
def _convert_store(self) -> None:
    """
    Convert a p-code store operation into a Store statement.
    """
    pcode_op = self._current_op

    # Only stores to the ram/mem address spaces are handled.
    space = pcode_op.inputs[0].get_space_from_const()
    assert space.name in ["ram", "mem"]

    address = self._get_value(pcode_op.inputs[1])
    payload = self._get_value(pcode_op.inputs[2])
    log.debug("Storing %s at offset %s", payload, address)

    self._statements.append(
        Store(
            self._statement_idx,
            address,
            payload,
            pcode_op.inputs[2].size,
            self._manager.arch.memory_endness,
            ins_addr=self._manager.ins_addr,
        )
    )
def _convert_branch(self) -> None:
    """
    Convert a p-code branch operation into a Jump to a constant address.

    :raises NotImplementedError: If the branch target is a constant (p-code relative branch)
    """
    target = self._current_op.inputs[0].get_addr()
    if target.is_constant:
        raise NotImplementedError("p-code relative branch not supported yet")

    destination = Const(self._manager.next_atom(), None, target.offset, self._manager.arch.bits)
    self._statements.append(Jump(self._statement_idx, destination, ins_addr=self._manager.ins_addr))
def _convert_cbranch(self) -> None:
    """
    Convert a p-code conditional branch operation into a ConditionalJump.

    The jump is taken when the condition input is non-zero; the fall-through target is the address right after
    the current instruction.

    :raises NotImplementedError: If the branch target is a constant (p-code relative branch)
    """
    pcode_op = self._current_op
    cond = self._get_value(pcode_op.inputs[1])

    target = pcode_op.inputs[0].get_addr()
    if target.is_constant:
        raise NotImplementedError("p-code relative branch not supported yet")

    arch_bits = self._manager.arch.bits
    zero = Const(self._manager.next_atom(), None, 0, cond.bits)
    condition = BinaryOp(self._manager.next_atom(), "CmpNE", [cond, zero], signed=False)
    taken = Const(self._manager.next_atom(), None, target.offset, arch_bits)
    not_taken = Const(
        self._manager.next_atom(), None, self._manager.ins_addr + self._current_ins.length, arch_bits
    )

    self._statements.append(
        ConditionalJump(self._statement_idx, condition, taken, not_taken, ins_addr=self._manager.ins_addr)
    )
def _convert_ret(self) -> None:
    """
    Convert a p-code return operation into a Return statement whose target is the IRSB's ``next``.
    """
    target = Const(self._manager.next_atom(), None, self._irsb.next, self._manager.arch.bits)
    ret_stmt = Return(
        self._statement_idx,
        target,
        [],
        ins_addr=self._manager.ins_addr,
        vex_block_addr=self._manager.block_addr,
        vex_stmt_idx=DEFAULT_STATEMENT,
    )
    self._statements.append(ret_stmt)
def _convert_branchind(self) -> None:
    """
    Convert a p-code indirect branch operation into a Jump whose target is the value of the first input.
    """
    target = self._get_value(self._current_op.inputs[0])
    self._statements.append(Jump(self._statement_idx, target, ins_addr=self._manager.ins_addr))
def _convert_call(self) -> None:
    """
    Convert a p-code call operation into a Call statement whose target is the IRSB's ``next``.
    """
    arch = self._manager.arch

    # The return value is only modelled when the architecture defines a return register offset.
    ret_reg_offset = arch.ret_offset
    ret_expr = None
    if ret_reg_offset is not None:
        ret_expr = Register(None, None, ret_reg_offset, arch.bits)  # ???

    target = Const(self._manager.next_atom(), None, self._irsb.next, arch.bits)
    call_stmt = Call(
        self._manager.next_atom(),
        target,
        ret_expr=ret_expr,
        ins_addr=self._manager.ins_addr,
        vex_block_addr=self._manager.block_addr,
        vex_stmt_idx=DEFAULT_STATEMENT,
    )
    self._statements.append(call_stmt)
def _convert_callind(self) -> None:
    """
    Convert a p-code indirect call operation into a Call statement.

    The call target is the value of the op's first input. The return-value register is only modelled when the
    architecture defines a return register offset, which is the same rule ``_convert_call`` applies.
    """
    ret_reg_offset = self._manager.arch.ret_offset
    if ret_reg_offset is not None:
        ret_expr = Register(None, None, ret_reg_offset, self._manager.arch.bits)  # ???
    else:
        # No return register offset: do not fabricate a Register whose offset is None.
        ret_expr = None

    dest = self._get_value(self._current_op.inputs[0])
    stmt = Call(
        self._manager.next_atom(),
        dest,
        ret_expr=ret_expr,
        ins_addr=self._manager.ins_addr,
        vex_block_addr=self._manager.block_addr,
        vex_stmt_idx=DEFAULT_STATEMENT,
    )
    self._statements.append(stmt)
def _convert_callother(self) -> None:
raise NotImplementedError("CALLOTHER emulation not currently supported")
def _convert_multiequal(self) -> None:
raise NotImplementedError("MULTIEQUAL appearing in unheritaged code?")
def _convert_indirect(self) -> None:
raise NotImplementedError("INDIRECT appearing in unheritaged code?")
def _convert_segment_op(self) -> None:
raise NotImplementedError("SEGMENTOP emulation not currently supported")
def _convert_cpool_ref(self) -> None:
raise NotImplementedError("Cannot currently emulate cpool operator")
def _convert_new(self) -> None:
raise NotImplementedError("Cannot currently emulate new operator")

View File

@@ -0,0 +1,716 @@
import logging
import pyvex
from angr.utils.constants import DEFAULT_STATEMENT
from angr.engines.vex.claripy.irop import vexop_to_simop
from .block import Block
from .statement import Assignment, Store, Jump, Call, ConditionalJump, DirtyStatement, Return
from .expression import (
Const,
Register,
Tmp,
DirtyExpression,
UnaryOp,
Convert,
BinaryOp,
Load,
ITE,
Reinterpret,
VEXCCallExpression,
TernaryOp,
)
from .converter_common import SkipConversionNotice, Converter
log = logging.getLogger(name=__name__)
class VEXExprConverter(Converter):
@staticmethod
def simop_from_vexop(vex_op):
    """
    Look up the SimOp object for a VEX operation.

    :param vex_op: The VEX operation to look up.
    :return: Whatever ``vexop_to_simop`` returns for it.
    """
    simop = vexop_to_simop(vex_op)
    return simop


@staticmethod
def generic_name_from_vex_op(vex_op):
    """
    Look up the generic operation name of a VEX operation.

    :param vex_op: The VEX operation to look up.
    :return: The ``_generic_name`` attribute of the corresponding SimOp.
    """
    simop = vexop_to_simop(vex_op)
    return simop._generic_name
@staticmethod
def convert(expr, manager):  # pylint:disable=arguments-differ
    """
    Convert a VEX expression (or VEX constant) into an AIL expression.

    Types listed in ``EXPRESSION_MAPPINGS`` are handled by their mapped converter. Other VEX constants go through
    ``const_n``, CCalls are wrapped in a DirtyExpression, and anything else is logged and wrapped in a
    DirtyExpression as well.

    :param expr: The VEX expression to convert.
    :param manager: The manager supplying atoms and the type environment.
    :return: The converted AIL expression.
    """
    handler = EXPRESSION_MAPPINGS.get(type(expr))
    if handler is not None:
        return handler(expr, manager)

    if isinstance(expr, pyvex.const.IRConst):
        return VEXExprConverter.const_n(expr, manager)

    if isinstance(expr, pyvex.IRExpr.CCall):
        converted_args = tuple(VEXExprConverter.convert(arg, manager) for arg in expr.args)
        ccall_atom = manager.next_atom()
        ccall = VEXCCallExpression(
            ccall_atom, expr.cee.name, converted_args, bits=expr.result_size(manager.tyenv)
        )
        return DirtyExpression(manager.next_atom(), ccall, bits=expr.result_size(manager.tyenv))

    log.warning("VEXExprConverter: Unsupported VEX expression of type %s.", type(expr))
    return DirtyExpression(manager.next_atom(), expr, bits=expr.result_size(manager.tyenv))
@staticmethod
def convert_list(exprs, manager):
    """
    Convert every VEX expression in ``exprs``, in order.

    :param exprs: An iterable of VEX expressions.
    :param manager: The manager passed on to ``convert``.
    :return: A new list holding the converted expressions.
    """
    return [VEXExprConverter.convert(expr, manager) for expr in exprs]
@staticmethod
def register(offset, bits, manager):
    """
    Create a Register expression tagged with the manager's current instruction, block and statement.

    :param offset: Register file offset.
    :param bits: Size of the register access in bits.
    :param manager: The manager supplying the atom, the architecture and the address tags.
    :return: The Register expression.
    """
    # The architecture translates names by size in bytes.
    size_in_bytes = bits // manager.arch.byte_width
    name = manager.arch.translate_register_name(offset, size_in_bytes)
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }
    return Register(manager.next_atom(), None, offset, bits, reg_name=name, **tags)
@staticmethod
def tmp(tmp_idx, bits, manager):
    """
    Create a Tmp expression tagged with the manager's current instruction, block and statement.

    :param tmp_idx: Index of the temporary.
    :param bits: Size of the temporary in bits.
    :param manager: The manager supplying the atom and the address tags.
    :return: The Tmp expression.
    """
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }
    return Tmp(manager.next_atom(), None, tmp_idx, bits, **tags)
@staticmethod
def RdTmp(expr, manager):
    """
    Convert a VEX RdTmp expression into a Tmp expression of the same index and result size.
    """
    bits = expr.result_size(manager.tyenv)
    return VEXExprConverter.tmp(expr.tmp, bits, manager)


@staticmethod
def Get(expr, manager):
    """
    Convert a VEX Get expression into a Register expression of the same offset and result size.
    """
    bits = expr.result_size(manager.tyenv)
    return VEXExprConverter.register(expr.offset, bits, manager)
@staticmethod
def Load(expr, manager):
    """
    Convert a VEX Load expression into an AIL Load.

    :param expr: The VEX Load expression.
    :param manager: The manager supplying atoms, the type environment and the address tags.
    :return: The AIL Load expression.
    """
    # The atom of the Load itself is allocated before the address expression is converted.
    atom = manager.next_atom()
    address = VEXExprConverter.convert(expr.addr, manager)
    size_in_bytes = expr.result_size(manager.tyenv) // 8
    return Load(
        atom,
        address,
        size_in_bytes,
        expr.end,
        ins_addr=manager.ins_addr,
        vex_block_addr=manager.block_addr,
        vex_stmt_idx=manager.vex_stmt_idx,
    )
@staticmethod
def Unop(expr, manager):
    """
    Convert a VEX unary operation.

    "Reinterp" operations become a Reinterpret, operations with another generic name become a UnaryOp, and
    operations without a generic name must be conversions, which become a Convert (shifted right first when the
    conversion takes the high half of its argument).

    :param expr: The VEX Unop expression.
    :param manager: The manager supplying atoms and the address tags.
    :return: The converted AIL expression.
    :raises NotImplementedError: If the operation has no generic name and is not a conversion.
    """
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }
    op_name = VEXExprConverter.generic_name_from_vex_op(expr.op)

    if op_name == "Reinterp":
        simop = vexop_to_simop(expr.op)
        return Reinterpret(
            manager.next_atom(),
            simop._from_size,
            simop._from_type,
            simop._to_size,
            simop._to_type,
            VEXExprConverter.convert(expr.args[0], manager),
            **tags,
        )

    if op_name is not None:
        return UnaryOp(manager.next_atom(), op_name, VEXExprConverter.convert(expr.args[0], manager), **tags)

    # is it a conversion?
    simop = vexop_to_simop(expr.op)
    if not simop._conversion:
        raise NotImplementedError("Unsupported operation")

    if simop._from_side == "HI":
        # returns the high-half of the argument
        inner = VEXExprConverter.convert(expr.args[0], manager)
        shifted = BinaryOp(
            manager.next_atom(), "Shr", [inner, Const(manager.next_atom(), None, simop._to_size, 8)], False
        )
        return Convert(manager.next_atom(), simop._from_size, simop._to_size, simop.is_signed, shifted, **tags)

    return Convert(
        manager.next_atom(),
        simop._from_size,
        simop._to_size,
        simop.is_signed,
        VEXExprConverter.convert(expr.args[0], manager),
        **tags,
    )
@staticmethod
def Binop(expr, manager):
    """
    Convert a VEX binary operation.

    An Add of a constant whose sign bit is set is rewritten into a Sub of the negated constant. Binary operations
    that are conversions with a rounding mode become a Convert; "HL" conversions become a "Concat" BinaryOp.
    Everything else becomes a BinaryOp.

    :param expr: The VEX Binop expression.
    :param manager: The manager supplying atoms and the address tags.
    :return: The converted AIL expression.
    """
    simop = VEXExprConverter.simop_from_vexop(expr.op)
    op_name = simop._generic_name
    operands = VEXExprConverter.convert_list(expr.args, manager)
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }

    if op_name == "Add" and type(operands[1]) is Const and operands[1].sign_bit == 1:
        # convert it to a sub
        negative = operands[1]
        op_name = "Sub"
        operands[1] = Const(negative.idx, None, (1 << negative.bits) - negative.value, negative.bits)

    # Signedness is only recorded for the operations where it matters.
    sign_sensitive = {"CmpLE", "CmpLT", "CmpGE", "CmpGT", "Div", "DivMod", "Mul", "Mull"}
    signed = bool(op_name in sign_sensitive and simop.is_signed)

    if op_name == "Cmp" and simop._float:
        # Rename Cmp to CmpF
        op_name = "CmpF"

    if op_name is None and simop._conversion:
        # conversion
        # TODO: Finish this
        type_pair = (simop._from_type, simop._to_type)
        kinds = None
        if type_pair == ("I", "F"):
            # integer to floating point
            kinds = (Convert.TYPE_INT, Convert.TYPE_FP)
        elif simop._from_side == "HL":
            # Concatenating the two arguments and form a new value
            op_name = "Concat"
        elif type_pair == ("F", "F"):
            # floating point to floating point
            kinds = (Convert.TYPE_FP, Convert.TYPE_FP)
        elif type_pair == ("F", "I"):
            # floating point to integer
            kinds = (Convert.TYPE_FP, Convert.TYPE_INT)

        if kinds is not None:
            # The first argument is the rounding mode, the second one is the value being converted.
            rounding_mode = operands[0]
            value = operands[1]
            return Convert(
                manager.next_atom(),
                simop._from_size,
                simop._to_size,
                simop.is_signed,
                value,
                from_type=kinds[0],
                to_type=kinds[1],
                rounding_mode=rounding_mode,
                **tags,
            )

    return BinaryOp(manager.next_atom(), op_name, operands, signed, bits=simop._output_size_bits, **tags)
@staticmethod
def Triop(expr, manager):
    """
    Convert a VEX ternary operation.

    Floating-point operations carry the rounding mode as their first argument and are emitted as a BinaryOp over
    the remaining two operands; all other operations become a TernaryOp.

    :param expr: The VEX Triop expression.
    :param manager: The manager supplying atoms and the address tags.
    :return: The converted AIL expression.
    """
    simop = VEXExprConverter.simop_from_vexop(expr.op)
    op_name = simop._generic_name
    operands = VEXExprConverter.convert_list(expr.args, manager)
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
        "bits": simop._output_size_bits,
    }

    if not simop._float:
        return TernaryOp(manager.next_atom(), op_name, operands, **tags)

    # this is a floating-point operation where the first argument is the rounding mode. in fact, we have a
    # BinaryOp here.
    rounding_mode, *values = operands
    return BinaryOp(
        manager.next_atom(),
        op_name,
        values,
        True,  # all floating-point operations are signed
        floating_point=True,
        rounding_mode=rounding_mode,
        **tags,
    )
@staticmethod
def Const(expr, manager):
    """
    Convert a ``pyvex.IRExpr.Const`` into an AIL Const holding the value of the wrapped constant.

    :param expr: The VEX Const expression.
    :param manager: The manager supplying the atom, the type environment and the address tags.
    :return: The AIL Const expression.
    """
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }
    return Const(manager.next_atom(), None, expr.con.value, expr.result_size(manager.tyenv), **tags)
@staticmethod
def const_n(expr, manager):
    """
    Convert a bare ``pyvex.const`` constant into an AIL Const using the constant's own value and size.

    :param expr: The VEX constant.
    :param manager: The manager supplying the atom and the address tags.
    :return: The AIL Const expression.
    """
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }
    return Const(manager.next_atom(), None, expr.value, expr.size, **tags)
@staticmethod
def ITE(expr, manager):
    """
    Convert a VEX if-then-else expression into an AIL ITE.

    :param expr: The VEX ITE expression.
    :param manager: The manager supplying atoms and the address tags.
    :return: The AIL ITE expression.
    """
    # Conversion order is condition, false branch, true branch; the ITE atom is allocated last.
    condition, false_value, true_value = (
        VEXExprConverter.convert(part, manager) for part in (expr.cond, expr.iffalse, expr.iftrue)
    )
    return ITE(
        manager.next_atom(),
        condition,
        false_value,
        true_value,
        ins_addr=manager.ins_addr,
        vex_block_addr=manager.block_addr,
        vex_stmt_idx=manager.vex_stmt_idx,
    )
# Dispatch table used by VEXExprConverter.convert: exact VEX expression/constant type -> converter function.
EXPRESSION_MAPPINGS = {
    # Temporaries and registers
    pyvex.IRExpr.RdTmp: VEXExprConverter.RdTmp,
    pyvex.IRExpr.Get: VEXExprConverter.Get,
    # Operations
    pyvex.IRExpr.Unop: VEXExprConverter.Unop,
    pyvex.IRExpr.Binop: VEXExprConverter.Binop,
    pyvex.IRExpr.Triop: VEXExprConverter.Triop,
    # Constants
    pyvex.IRExpr.Const: VEXExprConverter.Const,
    pyvex.const.U32: VEXExprConverter.const_n,
    pyvex.const.U64: VEXExprConverter.const_n,
    # Memory reads and conditionals
    pyvex.IRExpr.Load: VEXExprConverter.Load,
    pyvex.IRExpr.ITE: VEXExprConverter.ITE,
}
class VEXStmtConverter(Converter):
@staticmethod
def convert(idx, stmt, manager):  # pylint:disable=arguments-differ
    """
    Convert a VEX statement into an AIL statement (or a list of AIL statements).

    Statement types without an entry in ``STATEMENT_MAPPINGS`` are wrapped in a DirtyStatement.

    :param idx: Index to give the resulting statement.
    :param stmt: The VEX statement to convert.
    :param manager: The manager passed on to the statement-specific converter.
    :return: Whatever the statement-specific converter returns, or a DirtyStatement.
    """
    handler = STATEMENT_MAPPINGS.get(type(stmt))
    if handler is None:
        return DirtyStatement(idx, stmt, ins_addr=manager.ins_addr)
    return handler(idx, stmt, manager)
@staticmethod
def WrTmp(idx, stmt, manager):
    """
    Convert a VEX WrTmp statement into an Assignment to a Tmp.

    :param idx: Index to give the resulting statement.
    :param stmt: The VEX WrTmp statement.
    :param manager: The manager supplying atoms, the type environment and the address tags.
    :return: The Assignment statement.
    """
    # The destination Tmp is created before the source expression is converted.
    destination = VEXExprConverter.tmp(stmt.tmp, stmt.data.result_size(manager.tyenv), manager)
    source = VEXExprConverter.convert(stmt.data, manager)
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }
    return Assignment(idx, destination, source, **tags)
@staticmethod
def Put(idx, stmt, manager):
    """
    Convert a VEX Put statement into an Assignment to a Register sized like the written data.

    :param idx: Index to give the resulting statement.
    :param stmt: The VEX Put statement.
    :param manager: The manager supplying atoms and the address tags.
    :return: The Assignment statement.
    """
    # The data is converted first because the register takes its width from the converted expression.
    source = VEXExprConverter.convert(stmt.data, manager)
    destination = VEXExprConverter.register(stmt.offset, source.bits, manager)
    tags = {
        "ins_addr": manager.ins_addr,
        "vex_block_addr": manager.block_addr,
        "vex_stmt_idx": manager.vex_stmt_idx,
    }
    return Assignment(idx, destination, source, **tags)
@staticmethod
def Store(idx, stmt, manager):
    """
    Convert a VEX Store statement into an AIL Store.

    :param idx: Index to give the resulting statement.
    :param stmt: The VEX Store statement.
    :param manager: The manager supplying atoms, the type environment and the address tags.
    :return: The Store statement.
    """
    address = VEXExprConverter.convert(stmt.addr, manager)
    payload = VEXExprConverter.convert(stmt.data, manager)
    size_in_bytes = stmt.data.result_size(manager.tyenv) // 8
    return Store(
        idx,
        address,
        payload,
        size_in_bytes,
        stmt.endness,
        ins_addr=manager.ins_addr,
        vex_block_addr=manager.block_addr,
        vex_stmt_idx=manager.vex_stmt_idx,
    )
@staticmethod
def Exit(idx, stmt, manager):
    """
    Convert a VEX Exit statement into a ConditionalJump whose false target is left as None.

    :param idx: Index to give the resulting statement.
    :param stmt: The VEX Exit statement.
    :param manager: The manager supplying atoms and the address tags.
    :return: The ConditionalJump statement.
    :raises SkipConversionNotice: If the exit's jumpkind is one of the kinds that are not converted.
    """
    skipped_jumpkinds = {
        "Ijk_EmWarn",
        "Ijk_NoDecode",
        "Ijk_MapFail",
        "Ijk_NoRedir",
        "Ijk_SigTRAP",
        "Ijk_SigSEGV",
        "Ijk_ClientReq",
        "Ijk_SigFPE_IntDiv",
    }
    if stmt.jumpkind in skipped_jumpkinds:
        raise SkipConversionNotice()

    guard = VEXExprConverter.convert(stmt.guard, manager)
    target = VEXExprConverter.convert(stmt.dst, manager)
    return ConditionalJump(
        idx,
        guard,
        target,
        None,  # it will be filled in right afterwards
        ins_addr=manager.ins_addr,
        vex_block_addr=manager.block_addr,
        vex_stmt_idx=manager.vex_stmt_idx,
    )
@staticmethod
def LoadG(idx, stmt: pyvex.IRStmt.LoadG, manager):
    """
    Convert a VEX guarded load into an Assignment of a guarded Load (optionally widened) to a Tmp.

    :param idx: Index to give the resulting statement.
    :param stmt: The VEX LoadG statement.
    :param manager: The manager supplying atoms, the type environment and the address tags.
    :return: The Assignment statement.
    :raises KeyError: If ``stmt.cvt`` is not one of the known ILGop conversions.
    """
    # conversion name -> (bits loaded from memory, bits after conversion, whether the extension is signed)
    sizes = {
        "ILGop_Ident32": (32, 32, False),
        "ILGop_Ident64": (64, 64, False),
        "ILGop_IdentV128": (128, 128, False),
        "ILGop_8Uto32": (8, 32, False),
        "ILGop_8Sto32": (8, 32, True),
        "ILGop_16Uto32": (16, 32, False),
        "ILGop_16Sto32": (16, 32, True),
    }

    # tmp() expects the size in bits, which is what the type environment reports; dividing by 8 would size the
    # destination temporary in bytes and disagree with the Tmp created when the same temporary is read.
    dst = VEXExprConverter.tmp(stmt.dst, manager.tyenv.sizeof(stmt.dst), manager)
    load_bits, convert_bits, signed = sizes[stmt.cvt]
    src = Load(
        manager.next_atom(),
        VEXExprConverter.convert(stmt.addr, manager),
        load_bits // 8,  # Load takes its size in bytes
        stmt.end,
        guard=VEXExprConverter.convert(stmt.guard, manager),
        alt=VEXExprConverter.convert(stmt.alt, manager),
    )
    if convert_bits != load_bits:
        src = Convert(manager.next_atom(), load_bits, convert_bits, signed, src)

    return Assignment(
        idx,
        dst,
        src,
        ins_addr=manager.ins_addr,
        vex_block_addr=manager.block_addr,
        vex_stmt_idx=manager.vex_stmt_idx,
    )
@staticmethod
def StoreG(idx, stmt: pyvex.IRStmt.StoreG, manager):
    """
    Convert a VEX guarded store into an AIL Store carrying the guard expression.

    :param idx: Index to give the resulting statement.
    :param stmt: The VEX StoreG statement.
    :param manager: The manager supplying atoms, the type environment and the address tags.
    :return: The guarded Store statement.
    """
    # Conversion order is address, data, guard.
    address = VEXExprConverter.convert(stmt.addr, manager)
    payload = VEXExprConverter.convert(stmt.data, manager)
    size_in_bytes = stmt.data.result_size(manager.tyenv) // 8
    guard = VEXExprConverter.convert(stmt.guard, manager)
    return Store(
        idx,
        address,
        payload,
        size_in_bytes,
        stmt.endness,
        guard=guard,
        ins_addr=manager.ins_addr,
        vex_block_addr=manager.block_addr,
        vex_stmt_idx=manager.vex_stmt_idx,
    )
@staticmethod
def CAS(idx, stmt: pyvex.IRStmt.CAS, manager):
    """
    Convert a VEX compare-and-swap into a guarded Store plus assignment(s) of the value read from memory.

    compare-and-swap is translated into multiple statements. the atomic property is lost.

    :param idx: Index given to every statement and expression created here.
    :param stmt: The VEX CAS statement.
    :param manager: The manager supplying the type environment and the address tags.
    :return: A list holding the guarded Store followed by one Assignment (single CAS) or two Assignments
             (double CAS: low half first, then high half).
    :raises KeyError: If the type of the expected value is not one of the handled integer types.
    """
    stmts = []

    double = stmt.dataHi is not None
    ty = stmt.expdLo.result_type(manager.tyenv)
    if double:
        # For a double CAS the memory access is twice as wide as each half.
        ty, narrow_to_bits, widen_from_bits, widen_to_bits = {
            "Ity_I8": ("Ity_I16", 8, 8, 16),
            "Ity_I16": ("Ity_I32", 16, 16, 32),
            "Ity_I32": ("Ity_I64", 32, 32, 64),
            "Ity_I64": ("Ity_V128", 64, 64, 128),
        }[ty]
        dataHi = VEXExprConverter.convert(stmt.dataHi, manager)
        dataLo = VEXExprConverter.convert(stmt.dataLo, manager)
        data = BinaryOp(idx, "Concat", (dataHi, dataLo), False)

        # The expected value is built from expdHi/expdLo, not from the data that is about to be stored.
        expdHi = Convert(idx, widen_from_bits, widen_to_bits, False, VEXExprConverter.convert(stmt.expdHi, manager))
        expdLo = Convert(idx, widen_from_bits, widen_to_bits, False, VEXExprConverter.convert(stmt.expdLo, manager))
        expd = BinaryOp(idx, "Concat", (expdHi, expdLo), False)
    else:
        narrow_to_bits = widen_to_bits = None
        data = VEXExprConverter.convert(stmt.dataLo, manager)
        expd = VEXExprConverter.convert(stmt.expdLo, manager)

    # size of the memory access in bytes
    size = {
        "Ity_I8": 1,
        "Ity_I16": 2,
        "Ity_I32": 4,
        "Ity_I64": 8,
        "Ity_V128": 16,
    }[ty]

    # load value from memory
    addr = VEXExprConverter.convert(stmt.addr, manager)
    val = Load(
        idx,
        addr,
        size,
        stmt.endness,
    )

    # the store only happens when the loaded value equals the expected value
    cmp = BinaryOp(idx, "CmpEQ", (val, expd), False)
    store = Store(
        idx,
        addr.copy(),
        data,
        size,
        stmt.endness,
        guard=cmp,
        ins_addr=manager.ins_addr,
        vex_block_addr=manager.block_addr,
        vex_stmt_idx=manager.vex_stmt_idx,
    )
    stmts.append(store)

    if double:
        # NOTE(review): the shift amount is passed as a plain int rather than a Const expression - confirm that
        # BinaryOp accepts this.
        val_shifted = BinaryOp(idx, "Shr", (val, narrow_to_bits), False)
        valHi = Convert(idx, widen_to_bits, narrow_to_bits, False, val_shifted)
        valLo = Convert(idx, widen_to_bits, narrow_to_bits, False, val)

        wrtmp_0 = Assignment(
            idx,
            Tmp(idx, None, stmt.oldLo, narrow_to_bits),
            valLo,
            ins_addr=manager.ins_addr,
            vex_block_addr=manager.block_addr,
            vex_stmt_idx=manager.vex_stmt_idx,
        )
        wrtmp_1 = Assignment(
            idx,
            Tmp(idx, None, stmt.oldHi, narrow_to_bits),
            valHi,
            ins_addr=manager.ins_addr,
            vex_block_addr=manager.block_addr,
            vex_stmt_idx=manager.vex_stmt_idx,
        )
        stmts.append(wrtmp_0)
        stmts.append(wrtmp_1)
    else:
        wrtmp = Assignment(
            idx,
            # Tmp takes its size in bits, whereas ``size`` is in bytes.
            Tmp(idx, None, stmt.oldLo, size * 8),
            val,
            ins_addr=manager.ins_addr,
            vex_block_addr=manager.block_addr,
            vex_stmt_idx=manager.vex_stmt_idx,
        )
        stmts.append(wrtmp)

    return stmts
# Dispatch table used by VEXStmtConverter.convert: exact VEX statement type -> converter function.
STATEMENT_MAPPINGS = {
    # Register and temporary writes
    pyvex.IRStmt.Put: VEXStmtConverter.Put,
    pyvex.IRStmt.WrTmp: VEXStmtConverter.WrTmp,
    # Unconditional memory writes and conditional exits
    pyvex.IRStmt.Store: VEXStmtConverter.Store,
    pyvex.IRStmt.Exit: VEXStmtConverter.Exit,
    # Guarded memory accesses and compare-and-swap
    pyvex.IRStmt.StoreG: VEXStmtConverter.StoreG,
    pyvex.IRStmt.LoadG: VEXStmtConverter.LoadG,
    pyvex.IRStmt.CAS: VEXStmtConverter.CAS,
}
class VEXIRSBConverter(Converter):
    """
    Converts a whole VEX IRSB into an AIL block.
    """

    @staticmethod
    def convert(irsb, manager):  # pylint:disable=arguments-differ
        """
        Convert every statement of a VEX IRSB, plus its default exit, into AIL statements.

        IMark statements are not converted; they only update `manager.ins_addr` (and, for the first one, the
        address of the resulting block). AbiHint statements are dropped. The default exit is translated according
        to `irsb.jumpkind`: `Ijk_Call` appends a Call, `Ijk_Ret` appends a Return, and `Ijk_Boring` either fills
        the false target of the block's only conditional jump or appends a Jump. Other jump kinds add nothing.

        :param irsb:    The VEX IRSB to convert.
        :param manager: The manager that provides atom IDs and architecture info; its `tyenv`, `block_addr`,
                        `ins_addr` and `vex_stmt_idx` attributes are updated during conversion.
        :return:        The converted AIL block.
        """

        # convert each VEX statement into an AIL statement
        statements = []
        idx = 0

        manager.tyenv = irsb.tyenv
        manager.block_addr = irsb.addr

        addr = irsb.addr
        first_imark = True

        conditional_jumps = []

        for vex_stmt_idx, stmt in enumerate(irsb.statements):
            if type(stmt) is pyvex.IRStmt.IMark:
                if first_imark:
                    # update block address
                    addr = stmt.addr + stmt.delta
                    first_imark = False
                manager.ins_addr = stmt.addr + stmt.delta
                continue
            if type(stmt) is pyvex.IRStmt.AbiHint:
                # TODO: How can we use AbiHint?
                continue

            manager.vex_stmt_idx = vex_stmt_idx
            try:
                converted = VEXStmtConverter.convert(idx, stmt, manager)
                if type(converted) is list:
                    # got multiple statements
                    statements.extend(converted)
                    idx += len(converted)
                else:
                    # got one statement
                    statements.append(converted)
                    if type(converted) is ConditionalJump:
                        conditional_jumps.append(converted)
                    idx += 1
            except SkipConversionNotice:
                # the statement converter asked for this statement to be left out of the AIL block
                pass

        manager.vex_stmt_idx = DEFAULT_STATEMENT
        if irsb.jumpkind == "Ijk_Call":
            # call

            # FIXME: Move ret_expr and fp_ret_expr creation into angr because we cannot reliably determine which
            #  expressions can be returned from the call without performing further analysis
            ret_reg_offset = manager.arch.ret_offset
            ret_expr = Register(
                manager.next_atom(),
                None,
                ret_reg_offset,
                manager.arch.bits,
                reg_name=manager.arch.translate_register_name(ret_reg_offset, size=manager.arch.bits),
            )
            fp_ret_reg_offset = manager.arch.fp_ret_offset
            # Only create a separate floating-point return register when it really is a different register.
            # Compare the two offsets: comparing the offset against the Register object is always unequal.
            if fp_ret_reg_offset is not None and fp_ret_reg_offset != ret_reg_offset:
                fp_ret_expr = Register(
                    manager.next_atom(),
                    None,
                    fp_ret_reg_offset,
                    manager.arch.bits,
                    reg_name=manager.arch.translate_register_name(fp_ret_reg_offset, size=manager.arch.bits),
                )
            else:
                fp_ret_expr = None

            statements.append(
                Call(
                    manager.next_atom(),
                    VEXExprConverter.convert(irsb.next, manager),
                    ret_expr=ret_expr,
                    fp_ret_expr=fp_ret_expr,
                    ins_addr=manager.ins_addr,
                    vex_block_addr=manager.block_addr,
                    vex_stmt_idx=DEFAULT_STATEMENT,
                )
            )
        elif irsb.jumpkind == "Ijk_Boring":
            if len(conditional_jumps) == 1:
                # fill in the false target
                cond_jump = conditional_jumps[0]
                cond_jump.false_target = VEXExprConverter.convert(irsb.next, manager)
            else:
                # jump
                statements.append(
                    Jump(
                        manager.next_atom(),
                        VEXExprConverter.convert(irsb.next, manager),
                        ins_addr=manager.ins_addr,
                        vex_block_addr=manager.block_addr,
                        vex_stmt_idx=DEFAULT_STATEMENT,
                    )
                )
        elif irsb.jumpkind == "Ijk_Ret":
            # return
            statements.append(
                Return(
                    manager.next_atom(),
                    VEXExprConverter.convert(irsb.next, manager),
                    [],
                    ins_addr=manager.ins_addr,
                    vex_block_addr=manager.block_addr,
                    vex_stmt_idx=DEFAULT_STATEMENT,
                )
            )

        return Block(addr, irsb.size, statements=statements)

View File

@@ -0,0 +1,33 @@
import itertools
from typing import Optional
class Manager:
    """
    Holds the state shared across a conversion: a name, the architecture, a counter that hands out atom IDs, the
    address of the instruction currently being converted, and a few VEX-specific fields.
    """

    def __init__(self, name: Optional[str] = None, arch=None):
        """
        :param name: An optional name for this manager.
        :param arch: The architecture object; stored as-is.
        """
        self.name = name
        self.arch = arch

        # monotonically increasing source of atom IDs, starting from 0
        self.atom_ctr = itertools.count()
        # backing field of the `ins_addr` property
        self._ins_addr: Optional[int] = None

        ###
        # vex specific
        ###
        self.vex_stmt_idx: Optional[int] = None
        self.tyenv = None
        self.block_addr = None

    def next_atom(self):
        """
        Return the next unused atom ID.
        """
        atom_id = next(self.atom_ctr)
        return atom_id

    def reset(self):
        """
        Restart atom numbering from 0. No other attribute is touched.
        """
        self.atom_ctr = itertools.count()

    @property
    def ins_addr(self) -> Optional[int]:
        """
        The address of the instruction that is currently being converted, or None if it has not been set.
        """
        return self._ins_addr

    @ins_addr.setter
    def ins_addr(self, value):
        self._ins_addr = value

View File

@@ -0,0 +1,663 @@
# pylint:disable=isinstance-second-argument-not-valid-type
from typing import Optional, TYPE_CHECKING
try:
import claripy
except ImportError:
claripy = None
from .utils import stable_hash, is_none_or_likeable
from .tagged_object import TaggedObject
from .expression import Expression
if TYPE_CHECKING:
from angr.calling_conventions import SimCC
class Statement(TaggedObject):
    """
    The base class of all AIL statements.

    Subclasses are expected to provide `__repr__`, `__str__` and `replace`; the versions here only raise.
    """

    __slots__ = ()

    def __repr__(self):
        raise NotImplementedError

    def __str__(self):
        raise NotImplementedError

    def replace(self, old_expr, new_expr):
        raise NotImplementedError

    def eq(self, expr0, expr1):  # pylint:disable=no-self-use
        """
        Compare two operands. If claripy is available and either operand is a claripy AST, compare by identity;
        otherwise fall back to `==`.
        """
        if claripy is not None and any(isinstance(operand, claripy.ast.Base) for operand in (expr0, expr1)):
            return expr0 is expr1
        return expr0 == expr1
class Assignment(Statement):
    """
    Assignment statement: expr_a = expr_b
    """

    __slots__ = ("dst", "src")

    def __init__(self, idx, dst, src, **kwargs):
        """
        :param idx:    The ID of this statement.
        :param dst:    The expression being assigned to.
        :param src:    The expression whose value is assigned.
        :param kwargs: Arbitrary tags to attach to the statement.
        """
        super().__init__(idx, **kwargs)

        self.dst = dst
        self.src = src

    def __eq__(self, other):
        if type(other) is not Assignment:
            return False
        return self.idx == other.idx and self.dst == other.dst and self.src == other.src

    def likes(self, other):
        """
        Like `__eq__`, but ignores `idx` and compares both sides with their own `likes`.
        """
        if type(other) is not Assignment:
            return False
        return self.dst.likes(other.dst) and self.src.likes(other.src)

    __hash__ = TaggedObject.__hash__

    def _hash_core(self):
        key = (Assignment, self.idx, self.dst, self.src)
        return stable_hash(key)

    def __repr__(self):
        return f"Assignment ({self.dst}, {self.src})"

    def __str__(self):
        return f"{str(self.dst)} = {str(self.src)}"

    def replace(self, old_expr, new_expr):
        """
        Replace `old_expr` with `new_expr` on both sides of the assignment.

        :return: (True, new Assignment) if anything was replaced, otherwise (False, self).
        """
        # a side that equals old_expr is swapped wholesale; otherwise the replacement recurses into that side
        if self.dst == old_expr:
            dst_changed, dst = True, new_expr
        else:
            dst_changed, dst = self.dst.replace(old_expr, new_expr)

        if self.src == old_expr:
            src_changed, src = True, new_expr
        else:
            src_changed, src = self.src.replace(old_expr, new_expr)

        if not (dst_changed or src_changed):
            return False, self
        return True, Assignment(self.idx, dst, src, **self.tags)

    def copy(self) -> "Assignment":
        """
        Return a new Assignment with the same idx, operands (not copied) and tags.
        """
        return Assignment(self.idx, self.dst, self.src, **self.tags)
class Store(Statement):
    """
    Memory store statement: write `data` to `addr`, optionally only when `guard` holds.
    """

    __slots__ = (
        "addr",
        "size",
        "data",
        "endness",
        "variable",
        "offset",
        "guard",
    )

    def __init__(self, idx, addr, data, size, endness, guard=None, variable=None, offset=None, **kwargs):
        """
        :param idx:      The ID of this statement.
        :param addr:     The expression of the address to store to.
        :param data:     The data to store.
        :param size:     The size of the store.
        :param endness:  The endianness of the store (e.g. "Iend_LE").
        :param guard:    An optional condition expression guarding the store.
        :param variable: An optional variable object associated with the stored-to location.
        :param offset:   An optional offset into `variable`.
        :param kwargs:   Arbitrary tags to attach to the statement.
        """
        super().__init__(idx, **kwargs)

        self.addr = addr
        self.data = data
        self.size = size
        self.endness = endness
        self.variable = variable
        self.guard = guard
        self.offset = offset  # variable_offset

    def __eq__(self, other):
        return (
            type(other) is Store
            and self.idx == other.idx
            and self.eq(self.addr, other.addr)
            and self.eq(self.data, other.data)
            and self.size == other.size
            and self.guard == other.guard
            and self.endness == other.endness
        )

    def likes(self, other):
        """
        Like `__eq__`, but ignores `idx` and compares `addr` and `data` with their own `likes`.
        """
        return (
            type(other) is Store
            and self.addr.likes(other.addr)
            and self.data.likes(other.data)
            and self.size == other.size
            and self.guard == other.guard
            and self.endness == other.endness
        )

    __hash__ = TaggedObject.__hash__

    def _hash_core(self):
        return stable_hash((Store, self.idx, self.addr, self.data, self.size, self.endness, self.guard))

    def __repr__(self):
        return "Store (%s, %s[%d])%s" % (
            self.addr,
            str(self.data),
            self.size,
            "" if self.guard is None else "[%s]" % self.guard,
        )

    def __str__(self):
        if self.variable is None:
            return "STORE(addr={}, data={}, size={}, endness={}, guard={})".format(
                self.addr, str(self.data), self.size, self.endness, self.guard
            )
        else:
            return "%s =%s %s<%d>%s" % (
                self.variable.name,
                "L" if self.endness == "Iend_LE" else "B",
                str(self.data),
                self.size,
                "" if self.guard is None else "[%s]" % self.guard,
            )

    def replace(self, old_expr, new_expr):
        """
        Replace `old_expr` with `new_expr` in the address, the data (when it is an Expression) and the guard.

        :return: (True, new Store) if anything was replaced, otherwise (False, self). The new Store keeps the
                 size, endness, variable, variable offset and tags of this one.
        """
        if self.addr.likes(old_expr):
            r_addr = True
            replaced_addr = new_expr
        else:
            r_addr, replaced_addr = self.addr.replace(old_expr, new_expr)

        if isinstance(self.data, Expression):
            if self.data.likes(old_expr):
                r_data = True
                replaced_data = new_expr
            else:
                r_data, replaced_data = self.data.replace(old_expr, new_expr)
        else:
            # data that is not an AIL expression cannot be searched for sub-expressions
            r_data, replaced_data = False, self.data

        if self.guard is not None:
            r_guard, replaced_guard = self.guard.replace(old_expr, new_expr)
        else:
            r_guard, replaced_guard = False, None

        if r_addr or r_data or r_guard:
            return True, Store(
                self.idx,
                replaced_addr,
                replaced_data,
                self.size,
                self.endness,
                guard=replaced_guard,
                variable=self.variable,
                # carry the variable offset over, as copy() does; otherwise it is silently reset to None
                offset=self.offset,
                **self.tags,
            )
        else:
            return False, self

    def copy(self) -> "Store":
        """
        Return a new Store with the same fields (operands are not copied) and tags.
        """
        return Store(
            self.idx,
            self.addr,
            self.data,
            self.size,
            self.endness,
            guard=self.guard,
            variable=self.variable,
            offset=self.offset,
            **self.tags,
        )
class Jump(Statement):
    """
    Unconditional jump statement.
    """

    __slots__ = (
        "target",
        "target_idx",
    )

    def __init__(self, idx, target, target_idx: Optional[int] = None, **kwargs):
        """
        :param idx:        The ID of this statement.
        :param target:     The expression of the jump target.
        :param target_idx: An optional index that is printed after the target as "<target>.<target_idx>".
                           NOTE(review): presumably disambiguates blocks sharing an address -- confirm.
        :param kwargs:     Arbitrary tags to attach to the statement.
        """
        super().__init__(idx, **kwargs)

        self.target = target
        self.target_idx = target_idx

    def __eq__(self, other):
        return type(other) is Jump and self.idx == other.idx and self.target == other.target

    def likes(self, other):
        """
        Like `__eq__`, but ignores `idx` and compares the targets with `is_none_or_likeable`.
        """
        return type(other) is Jump and is_none_or_likeable(self.target, other.target)

    __hash__ = TaggedObject.__hash__

    def _hash_core(self):
        return stable_hash((Jump, self.idx, self.target))

    def __repr__(self):
        if self.target_idx is not None:
            return f"Jump ({self.target}.{self.target_idx})"
        return "Jump (%s)" % self.target

    def __str__(self):
        if self.target_idx is not None:
            return f"Goto({self.target}.{self.target_idx})"
        return "Goto(%s)" % self.target

    def replace(self, old_expr, new_expr):
        """
        Replace `old_expr` with `new_expr` inside the jump target.

        :return: (True, new Jump) if anything was replaced, otherwise (False, self). The new Jump keeps
                 `target_idx` and the tags of this one.
        """
        r, replaced_target = self.target.replace(old_expr, new_expr)

        if r:
            # target_idx must be passed on explicitly; it is a slot, not a tag
            return True, Jump(self.idx, replaced_target, target_idx=self.target_idx, **self.tags)
        else:
            return False, self

    def copy(self):
        """
        Return a new Jump with the same idx, target (not copied), target_idx and tags.
        """
        return Jump(
            self.idx,
            self.target,
            target_idx=self.target_idx,
            **self.tags,
        )
class ConditionalJump(Statement):
    """
    Conditional jump statement: go to `true_target` if `condition` holds, otherwise to `false_target`.
    """

    __slots__ = ("condition", "true_target", "false_target")

    def __init__(self, idx, condition, true_target, false_target, **kwargs):
        """
        :param idx:          The ID of this statement.
        :param condition:    The condition expression.
        :param true_target:  The target taken when the condition holds; may be None.
        :param false_target: The target taken otherwise; may be None.
        :param kwargs:       Arbitrary tags to attach to the statement.
        """
        super().__init__(idx, **kwargs)

        self.condition = condition
        self.true_target = true_target
        self.false_target = false_target

    def __eq__(self, other):
        if type(other) is not ConditionalJump:
            return False
        return (
            self.idx == other.idx
            and self.condition == other.condition
            and self.true_target == other.true_target
            and self.false_target == other.false_target
        )

    def likes(self, other):
        """
        Like `__eq__`, but ignores `idx`; the targets are compared with `is_none_or_likeable`.
        """
        if type(other) is not ConditionalJump:
            return False
        return (
            self.condition.likes(other.condition)
            and is_none_or_likeable(self.true_target, other.true_target)
            and is_none_or_likeable(self.false_target, other.false_target)
        )

    __hash__ = TaggedObject.__hash__

    def _hash_core(self):
        key = (ConditionalJump, self.idx, self.condition, self.true_target, self.false_target)
        return stable_hash(key)

    def __repr__(self):
        return "ConditionalJump (condition: {}, true: {}, false: {})".format(
            self.condition, self.true_target, self.false_target
        )

    def __str__(self):
        return "if ({}) {{ Goto {} }} else {{ Goto {} }}".format(
            self.condition,
            self.true_target,
            self.false_target,
        )

    def replace(self, old_expr, new_expr):
        """
        Replace `old_expr` with `new_expr` in the condition and in both targets.

        :return: (True, new ConditionalJump) if anything was replaced, otherwise (False, self).
        """

        def _substitute(operand):
            # an operand that equals old_expr is swapped wholesale; otherwise recurse into it
            if operand == old_expr:
                return True, new_expr
            return operand.replace(old_expr, new_expr)

        cond_changed, cond = _substitute(self.condition)

        # missing targets are passed through untouched
        if self.true_target is None:
            true_changed, true_target = False, self.true_target
        else:
            true_changed, true_target = _substitute(self.true_target)

        if self.false_target is None:
            false_changed, false_target = False, self.false_target
        else:
            false_changed, false_target = _substitute(self.false_target)

        if cond_changed or true_changed or false_changed:
            return True, ConditionalJump(self.idx, cond, true_target, false_target, **self.tags)
        return False, self

    def copy(self) -> "ConditionalJump":
        """
        Return a new ConditionalJump with the same idx, operands (not copied) and tags.
        """
        return ConditionalJump(self.idx, self.condition, self.true_target, self.false_target, **self.tags)
class Call(Expression, Statement):
    """
    Call is both an expression and a statement. The return expression of a call is defined as the ret_expr if and only
    if the callee function has one return expression.
    """

    __slots__ = (
        "target",
        "calling_convention",
        "prototype",
        "args",
        "ret_expr",
        "fp_ret_expr",
    )

    def __init__(
        self,
        idx,
        target,
        calling_convention: Optional["SimCC"] = None,
        prototype=None,
        args=None,
        ret_expr=None,
        fp_ret_expr=None,
        **kwargs,
    ):
        """
        :param idx:                The ID of this call.
        :param target:             The call target; an Expression or any other object.
        :param calling_convention: The calling convention of the callee, if known.
        :param prototype:          The prototype of the callee, if known.
        :param args:               The list of argument expressions, or None when no list has been set.
        :param ret_expr:           The expression that receives the return value, or None.
        :param fp_ret_expr:        The expression that receives the floating-point return value, or None.
        :param kwargs:             Arbitrary tags to attach to the call.
        """
        # the depth of a call is one more than the depth of its target expression
        super().__init__(idx, target.depth + 1 if isinstance(target, Expression) else 1, **kwargs)

        self.target = target
        self.calling_convention = calling_convention
        self.prototype = prototype
        self.args = args
        self.ret_expr = ret_expr
        self.fp_ret_expr = fp_ret_expr

    def likes(self, other):
        """
        Return whether `other` is a Call whose target, calling convention, prototype, arguments and return
        expressions all match this one. `idx` is ignored.
        """
        return (
            type(other) is Call
            and is_none_or_likeable(self.target, other.target)
            and self.calling_convention == other.calling_convention
            and self.prototype == other.prototype
            and is_none_or_likeable(self.args, other.args, is_list=True)
            and is_none_or_likeable(self.ret_expr, other.ret_expr)
            and is_none_or_likeable(self.fp_ret_expr, other.fp_ret_expr)
        )

    __hash__ = TaggedObject.__hash__

    def _hash_core(self):
        return stable_hash((Call, self.idx, self.target))

    def __repr__(self):
        return f"Call (target: {self.target}, prototype: {self.prototype}, args: {self.args})"

    def __str__(self):
        cc = "Unknown CC" if self.calling_convention is None else "%s" % self.calling_convention
        if self.args is None:
            if self.calling_convention is not None:
                s = (
                    ("%s" % cc)
                    if self.prototype is None
                    else f"{self.calling_convention}: {self.calling_convention.arg_locs(self.prototype)}"
                )
            else:
                s = ("%s" % cc) if self.prototype is None else repr(self.prototype)
        else:
            s = (f"{cc}: {self.args}") if self.prototype is None else f"{self.calling_convention}: {self.args}"

        if self.ret_expr is None:
            ret_s = "no-ret-value"
        else:
            ret_s = f"{self.ret_expr}"
        if self.fp_ret_expr is None:
            fp_ret_s = "no-fp-ret-value"
        else:
            fp_ret_s = f"{self.fp_ret_expr}"

        return f"Call({self.target}, {s}, ret: {ret_s}, fp_ret: {fp_ret_s})"

    @property
    def bits(self):
        # the size of the call expression is the size of its return expression
        return self.ret_expr.bits

    @property
    def size(self):
        return self.bits // 8

    @property
    def verbose_op(self):
        return "call"

    @property
    def op(self):
        return "call"

    def replace(self, old_expr, new_expr):
        """
        Replace `old_expr` with `new_expr` in the target (when it is an Expression), the arguments and the
        return expressions.

        :return: (True, new Call) if anything was replaced, otherwise (False, self). An argument list that is
                 set stays a list in the new Call, even when it is empty.
        """
        if isinstance(self.target, Expression):
            r0, replaced_target = self.target.replace(old_expr, new_expr)
        else:
            r0 = False
            replaced_target = self.target

        r = r0

        new_args = None
        # test against None rather than truthiness so that an empty argument list is not turned into None
        if self.args is not None:
            new_args = []
            for arg in self.args:
                if arg == old_expr:
                    r_arg = True
                    replaced_arg = new_expr
                else:
                    r_arg, replaced_arg = arg.replace(old_expr, new_expr)
                r |= r_arg
                new_args.append(replaced_arg)

        new_ret_expr = None
        if self.ret_expr:
            if self.ret_expr == old_expr:
                r_ret = True
                replaced_ret = new_expr
            else:
                r_ret, replaced_ret = self.ret_expr.replace(old_expr, new_expr)
            r |= r_ret
            new_ret_expr = replaced_ret

        new_fp_ret_expr = None
        if self.fp_ret_expr:
            if self.fp_ret_expr == old_expr:
                r_ret = True
                replaced_fp_ret = new_expr
            else:
                r_ret, replaced_fp_ret = self.fp_ret_expr.replace(old_expr, new_expr)
            r |= r_ret
            new_fp_ret_expr = replaced_fp_ret

        if r:
            return True, Call(
                self.idx,
                replaced_target,
                calling_convention=self.calling_convention,
                prototype=self.prototype,
                args=new_args,
                ret_expr=new_ret_expr,
                fp_ret_expr=new_fp_ret_expr,
                **self.tags,
            )
        else:
            return False, self

    def copy(self):
        """
        Return a new Call with the same fields and tags. The argument list is shallow-copied.
        """
        return Call(
            self.idx,
            self.target,
            calling_convention=self.calling_convention,
            prototype=self.prototype,
            args=self.args[::] if self.args is not None else None,
            ret_expr=self.ret_expr,
            fp_ret_expr=self.fp_ret_expr,
            **self.tags,
        )
class Return(Statement):
    """
    Return statement: return to `target`, yielding zero or more return expressions.
    """

    __slots__ = (
        "target",
        "ret_exprs",
    )

    def __init__(self, idx, target, ret_exprs, **kwargs):
        """
        :param idx:       The ID of this statement.
        :param target:    The expression of the return target; may be None.
        :param ret_exprs: An iterable of return expressions; stored as a list.
        :param kwargs:    Arbitrary tags to attach to the statement.
        """
        super().__init__(idx, **kwargs)

        self.target = target
        self.ret_exprs = ret_exprs if isinstance(ret_exprs, list) else list(ret_exprs)

    def __eq__(self, other):
        return (
            type(other) is Return
            and self.idx == other.idx
            and self.target == other.target
            and self.ret_exprs == other.ret_exprs
        )

    def likes(self, other):
        """
        Like `__eq__`, but ignores `idx` and compares target and return expressions with `is_none_or_likeable`.
        """
        return (
            type(other) is Return
            and is_none_or_likeable(self.target, other.target)
            and is_none_or_likeable(self.ret_exprs, other.ret_exprs, is_list=True)
        )

    __hash__ = TaggedObject.__hash__

    def _hash_core(self):
        return stable_hash((Return, self.idx, self.target, tuple(self.ret_exprs)))

    def __repr__(self):
        return "Return to {!r} ({})".format(self.target, ",".join(repr(x) for x in self.ret_exprs))

    def __str__(self):
        exprs = ",".join(str(ret_expr) for ret_expr in self.ret_exprs)
        if not exprs:
            return "return;"
        else:
            return "return %s;" % exprs

    def replace(self, old_expr, new_expr):
        """
        Replace `old_expr` with `new_expr` in the return target and in every return expression.

        :return: (True, new Return) if anything was replaced, otherwise (False, self). Return expressions that
                 do not contain `old_expr` are carried over unchanged.
        """
        new_ret_exprs = []
        replaced = False

        if self.target is not None:
            r, new_target = self.target.replace(old_expr, new_expr)
            if r:
                replaced = True
            else:
                new_target = self.target
        else:
            new_target = None

        for expr in self.ret_exprs:
            if expr == old_expr:
                r_expr = True
                replaced_expr = new_expr
            else:
                r_expr, replaced_expr = expr.replace(old_expr, new_expr)
            if r_expr:
                replaced = True
                new_ret_exprs.append(replaced_expr)
            else:
                # keep the untouched expression itself; appending old_expr here would overwrite it
                new_ret_exprs.append(expr)

        if replaced:
            return True, Return(
                self.idx,
                new_target,
                new_ret_exprs,
                **self.tags,
            )

        return False, self

    def copy(self):
        """
        Return a new Return with the same idx, target and tags. The list of return expressions is shallow-copied.
        """
        return Return(
            self.idx,
            self.target,
            self.ret_exprs[::],
            **self.tags,
        )
class DirtyStatement(Statement):
    """
    Wrapper around the original statement, which is usually not convertible (temporarily).
    """

    __slots__ = ("dirty_stmt",)

    def __init__(self, idx, dirty_stmt, **kwargs):
        """
        :param idx:        The ID of this statement.
        :param dirty_stmt: The wrapped original statement.
        :param kwargs:     Arbitrary tags to attach to the statement.
        """
        super().__init__(idx, **kwargs)

        self.dirty_stmt = dirty_stmt

    def _hash_core(self):
        # note that idx is not part of the hash
        key = (DirtyStatement, self.dirty_stmt)
        return stable_hash(key)

    def __repr__(self):
        wrapped_type = type(self.dirty_stmt)
        return "DirtyStatement (%s)" % (wrapped_type)

    def __str__(self):
        wrapped_text = str(self.dirty_stmt)
        return "[D] %s" % (wrapped_text)

    def copy(self) -> "DirtyStatement":
        """
        Return a new DirtyStatement wrapping the same original statement, with the same idx and tags.
        """
        return DirtyStatement(self.idx, self.dirty_stmt, **self.tags)
class Label(Statement):
    """
    A dummy statement that indicates a label with a name.
    """

    __slots__ = ("name", "ins_addr", "block_idx")

    def __init__(self, idx, name: str, ins_addr: int, block_idx: Optional[int] = None, **kwargs):
        """
        :param idx:       The ID of this statement.
        :param name:      The name of the label.
        :param ins_addr:  The instruction address that the label is attached to.
        :param block_idx: An optional block index stored alongside the address.
        :param kwargs:    Arbitrary tags to attach to the statement.
        """
        super().__init__(idx, **kwargs)

        self.name, self.ins_addr, self.block_idx = name, ins_addr, block_idx

    def likes(self, other: "Label"):
        """
        Any two labels like each other; names and addresses are not compared.
        """
        return isinstance(other, Label)

    def _hash_core(self):
        # note that idx is not part of the hash
        key = (Label, self.name, self.ins_addr, self.block_idx)
        return stable_hash(key)

    def __repr__(self):
        return f"Label {self.name}"

    def __str__(self):
        return f"{self.name}:"

    def copy(self) -> "Label":
        """
        Return a new Label with the same idx, name, address, block index and tags.
        """
        return Label(self.idx, self.name, self.ins_addr, self.block_idx, **self.tags)

View File

@@ -0,0 +1,61 @@
from typing import Dict
class TaggedObject:
    """
    A class that takes arbitrary tags.

    Tags are the extra keyword arguments given to the constructor. They live in a lazily-created dict and can be
    read back as attributes.
    """

    __slots__ = ("idx", "_tags", "_hash")

    def __new__(cls, *args, **kwargs):  # pylint:disable=unused-argument
        """Create a new instance and set `_tags` attribute.

        `__getattr__` is overridden and goes through the `_tags` attribute, so looking anything up on an instance
        whose `_tags` slot has not been filled yet would recurse forever. That is what happens when a
        `TaggedObject` is copied with `copy.deepcopy`, which creates the instance without calling `__init__`.
        Setting `_tags` here, before anything else can touch the instance, prevents the problem.
        """
        instance = super().__new__(cls)
        instance._tags = None
        return instance

    def __init__(self, idx, **kwargs):
        """
        :param idx:    The ID of this object.
        :param kwargs: The tags to attach; when none are given, no tag dict is allocated yet.
        """
        self._tags = None
        self._hash = None
        self.idx = idx

        if kwargs:
            self.initialize_tags(kwargs)

    def initialize_tags(self, tags):
        """
        Replace the current tags with a fresh dict holding the items of `tags`.
        """
        self._tags = dict(tags.items())

    def __getattr__(self, item):
        # only reached when normal attribute lookup fails: try the tags, then raise the regular AttributeError
        tag_map = self.tags
        try:
            return tag_map[item]
        except KeyError:
            return super().__getattribute__(item)

    def __hash__(self):
        # the hash is computed once by the subclass and then cached
        cached = self._hash
        if cached is None:
            cached = self._hash = self._hash_core()
        return cached

    def _hash_core(self):
        raise NotImplementedError()

    @property
    def tags(self) -> Dict:
        """
        The tag dict of this object, created on first access when it is missing or empty.
        """
        current = self._tags
        if not current:
            current = self._tags = {}
        return current

View File

@@ -0,0 +1,110 @@
from typing import Union, Tuple, Optional, TYPE_CHECKING
import struct
try:
import claripy
except ImportError:
claripy = None
try:
import _md5 as md5lib
except ImportError:
import hashlib as md5lib
if TYPE_CHECKING:
from .expression import Expression
# The argument types accepted by get_bits(): ints and AIL expressions, plus claripy.ast.Bits when claripy could be
# imported. "Expression" is spelled as a string because the class is only imported under TYPE_CHECKING.
get_bits_type_params = Union[int, "Expression"]
if claripy:
    get_bits_type_params = Union[int, claripy.ast.Bits, "Expression"]
def get_bits(expr: get_bits_type_params) -> Optional[int]:
    """
    Get the size of an expression in bits.

    :param expr: An AIL expression, a claripy AST, or any other object.
    :return:     `expr.bits` for AIL expressions, `expr.size()` for claripy ASTs, `expr.bits` for any other object
                 that has such an attribute, and None otherwise.
    """
    # delayed import
    from .expression import Expression

    if isinstance(expr, Expression):
        return expr.bits
    # claripy is an optional dependency; without this guard the check below raises AttributeError on None
    elif claripy is not None and isinstance(expr, claripy.ast.Bits):
        return expr.size()
    elif hasattr(expr, "bits"):
        return expr.bits
    else:
        return None
# Pre-compiled struct format that reads four native-order unsigned ints; stable_hash() applies it to an MD5 digest.
# NOTE(review): the format uses native byte order, so the unpacked value may differ between platforms -- confirm
# whether that matters for the callers of stable_hash().
md5_unpacker = struct.Struct("4I")
def stable_hash(t: Tuple) -> int:
    """
    Hash a tuple by serializing it with `_dump_tuple` and taking the MD5 digest of the result.

    :param t: The tuple to hash.
    :return:  The first of the four integers unpacked from the digest.
    """
    digest = md5lib.md5(_dump_tuple(t)).digest()
    first_word, *_ = md5_unpacker.unpack(digest)
    return first_word  # 32 bits
def _dump_tuple(t: Tuple) -> bytes:
cnt = b""
for item in t:
if item is not None:
type_ = type(item)
if type_ in _DUMP_BY_TYPE:
cnt += _DUMP_BY_TYPE[type_](item)
else:
# for TaggedObjects, hash(item) is stable
# other types of items may show up, such as pyvex.expr.CCall and Dirty. they will be removed some day.
cnt += struct.pack("<Q", hash(item) & 0xFFFF_FFFF_FFFF_FFFF)
cnt += b"\xf0"
return cnt
def _dump_str(t: str) -> bytes:
return t.encode("ascii")
def _dump_int(t: int) -> bytes:
prefix = b"" if t >= 0 else b"-"
t = abs(t)
if t <= 0xFFFF:
return prefix + struct.pack("<H", t)
elif t <= 0xFFFF_FFFF:
return prefix + struct.pack("<I", t)
elif t <= 0xFFFF_FFFF_FFFF_FFFF:
return prefix + struct.pack("<Q", t)
else:
cnt = b""
while t > 0:
cnt += _dump_int(t & 0xFFFF_FFFF_FFFF_FFFF)
t >>= 64
return prefix + cnt
def _dump_type(t: type) -> bytes:
return t.__name__.encode("ascii")
# Serializers used by _dump_tuple(), keyed by the exact type of the item. Items of any other type (subclasses
# included, since the lookup is done on type(item)) are serialized through their hash() instead.
_DUMP_BY_TYPE = {
    tuple: _dump_tuple,
    str: _dump_str,
    int: _dump_int,
    type: _dump_type,
}
def is_none_or_likeable(arg1, arg2, is_list=False):
    """
    Returns whether two things are both None or can like each other

    :param arg1:    The first thing to compare.
    :param arg2:    The second thing to compare.
    :param is_list: If True, both arguments are treated as sequences that must have the same length and match
                    item by item.
    :return:        True if the two things match, False otherwise.
    """
    from .expression import Expression

    # as soon as one side is None, the other side must compare equal to it
    if arg1 is None or arg2 is None:
        return True if arg1 == arg2 else False

    if is_list:
        if len(arg1) != len(arg2):
            return False
        return all(is_none_or_likeable(item1, item2) for item1, item2 in zip(arg1, arg2))

    # AIL expressions are compared with likes(); everything else with plain equality
    return arg1.likes(arg2) if isinstance(arg1, Expression) else arg1 == arg2