Skip to content

Commit

Permalink
ioctlance
Browse files Browse the repository at this point in the history
  • Loading branch information
zeze-zeze committed Nov 9, 2023
0 parents commit 3485a60
Show file tree
Hide file tree
Showing 206 changed files with 9,748 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.vs
x64
Release
__pycache__
39 changes: 39 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
FROM ubuntu:20.04

# install 32-bit support
RUN dpkg --add-architecture i386

ENV TZ=Asia/Taipei

RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y tzdata

# general dependencies
RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y git build-essential python3 python3-pip python3-dev htop vim sudo

# install virtualenvwrapper
#RUN pip install virtualenvwrapper

# angr dependencies
RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y openjdk-8-jdk zlib1g:i386 libtinfo5:i386 libstdc++6:i386 libgcc1:i386 libc6:i386 libssl-dev nasm binutils-multiarch qtdeclarative5-dev libpixman-1-dev libglib2.0-dev debian-archive-keyring debootstrap libtool libreadline-dev cmake libffi-dev libxslt1-dev libxml2-dev
RUN pip install angr==9.2.18 ipython==8.5.0 ipdb==0.13.9

# setup user `ioctlance` with a home directory
RUN useradd -ms /bin/bash ioctlance
USER ioctlance

#ENV VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3
#RUN /bin/bash -c "source /usr/local/bin/virtualenvwrapper.sh && \
# mkvirtualenv ioctlance && \
# pip install angr==9.2.18 ipython==8.5.0 ipdb==0.13.9"

COPY ./analysis /home/ioctlance/analysis/
COPY ./evaluation /home/ioctlance/evaluation/
COPY ./dataset /home/ioctlance/dataset/
USER root

#RUN echo 'export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3' >> /home/ioctlance/.bashrc
#RUN echo 'export WORKON_HOME=$HOME/.virtualenvs' >> /home/ioctlance/.bashrc
#RUN echo 'source /usr/local/bin/virtualenvwrapper.sh && workon ioctlance' >> /home/ioctlance/.bashrc

WORKDIR /home/ioctlance/
CMD ["/bin/bash"]
674 changes: 674 additions & 0 deletions License.txt

Large diffs are not rendered by default.

110 changes: 110 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# IOCTLance
<p align="center">
<img src="asset/ioctlance.png">
</p>

## Description
Presented at [CODE BLUE 2023](https://codeblue.jp/2023/en/), this project titled [Enhanced Vulnerability Hunting in WDM Drivers with Symbolic Execution and Taint Analysis](https://drive.google.com/file/d/1lEegyJ1SBB_lDts6F3W3JPySucM3nugR/view?usp=sharing) introduces IOCTLance, a tool that enhances its capacity to detect various vulnerability types in Windows Driver Model (WDM) drivers. In a comprehensive evaluation involving 104 known vulnerable WDM drivers and 328 unknown ones, IOCTLance successfully unveiled 117 previously unidentified vulnerabilities within 26 distinct drivers. As a result, 41 CVEs were reported, encompassing 25 cases of denial of service, 5 instances of insufficient access control, and 11 examples of elevation of privilege.

## Features
### Target Vulnerability Types
- map physical memory
- controllable process handle
- buffer overflow
- null pointer dereference
- read/write controllable address
- arbitrary shellcode execution
- arbitrary wrmsr
- arbitrary out
- dangerous file operation


### Optional Customizations
- length limit
- loop bound
- total timeout
- IoControlCode timeout
- recursion
- symbolize data section


## Build
### Docker (Recommand)
```
docker build .
```

### Local
```
dpkg --add-architecture i386
apt-get update
apt-get install git build-essential python3 python3-pip python3-dev htop vim sudo \
openjdk-8-jdk zlib1g:i386 libtinfo5:i386 libstdc++6:i386 libgcc1:i386 \
libc6:i386 libssl-dev nasm binutils-multiarch qtdeclarative5-dev libpixman-1-dev \
libglib2.0-dev debian-archive-keyring debootstrap libtool libreadline-dev cmake \
libffi-dev libxslt1-dev libxml2-dev
pip install angr==9.2.18 ipython==8.5.0 ipdb==0.13.9
```

## Analysis
```
# python3 analysis/ioctlance.py -h
usage: ioctlance.py [-h] [-i IOCTLCODE] [-T TOTAL_TIMEOUT] [-t TIMEOUT] [-l LENGTH] [-b BOUND]
[-g GLOBAL_VAR] [-a ADDRESS] [-e EXCLUDE] [-o] [-r] [-c] [-d]
path
positional arguments:
path dir (including subdirectory) or file path to the driver(s) to analyze
optional arguments:
-h, --help show this help message and exit
-i IOCTLCODE, --ioctlcode IOCTLCODE
analyze specified IoControlCode (e.g. 22201c)
-T TOTAL_TIMEOUT, --total_timeout TOTAL_TIMEOUT
total timeout for the whole symbolic execution (default 1200, 0 to unlimited)
-t TIMEOUT, --timeout TIMEOUT
timeout for analyze each IoControlCode (default 40, 0 to unlimited)
-l LENGTH, --length LENGTH
the limit of number of instructions for technique LengthLimiter (default 0, 0
to unlimited)
-b BOUND, --bound BOUND
the bound for technique LoopSeer (default 0, 0 to unlimited)
-g GLOBAL_VAR, --global_var GLOBAL_VAR
symbolize how many bytes in .data section (default 0 hex)
-a ADDRESS, --address ADDRESS
address of ioctl handler to directly start hunting with blank state (e.g.
140005c20)
-e EXCLUDE, --exclude EXCLUDE
exclude function address split with , (e.g. 140005c20,140006c20)
-o, --overwrite overwrite x.sys.json if x.sys has been analyzed (default False)
-r, --recursion do not kill state if detecting recursion (default False)
-c, --complete get complete base state (default False)
-d, --debug print debug info while analyzing (default False)
```


## Evaluation
```
# python3 evaluation/statistics.py -h
usage: statistics.py [-h] [-w] path
positional arguments:
path target dir or file path
optional arguments:
-h, --help show this help message and exit
-w, --wdm copy the wdm drivers into <path>/wdm
```


## Test
1. Compile the testing examples in [test](./test) to generate testing driver files.
2. Run IOCTLance against the drvier files.


## Reference
- [ucsb-seclab/popkorn-artifact](https://github.com/ucsb-seclab/popkorn-artifact)
- [eclypsium/Screwed-Drivers](https://github.com/eclypsium/Screwed-Drivers)
- [koutto/ioctlbf](https://github.com/koutto/ioctlbf)
- [Living Off The Land Drivers](https://www.loldrivers.io/)
156 changes: 156 additions & 0 deletions analysis/breakpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import claripy
import angr
import globals
import utils
import ipdb

def b_mem_write_ioctl_handler(state):
# Store the address of ioctl handler when writing into the memory.
ioctl_handler_addr = state.solver.eval(state.inspect.mem_write_expr)
globals.ioctl_handler = int(ioctl_handler_addr)
state.globals['ioctl_handler'] = globals.ioctl_handler
globals.simgr.move(from_stash='deadended', to_stash='_Drop')

def b_mem_write_DriverStartIo(state):
# Store the address of DriverStartIo when writing into the memory.
DriverStartIo_addr = state.solver.eval(state.inspect.mem_write_expr)
globals.DriverStartIo = int(DriverStartIo_addr)
globals.basic_info['DriverStartIo'] = hex(globals.DriverStartIo)
utils.print_info(f'DriverStartIo: {hex(globals.DriverStartIo)}')

def b_mem_read(state):
utils.print_debug(f'mem_read {state}, {state.inspect.mem_read_address}, {state.inspect.mem_read_expr}, {state.inspect.mem_read_length}, {state.inspect.mem_read_condition}')

# Iterate all target buffers.
for target in globals.NPD_TARGETS:
if target in str(state.inspect.mem_read_address):
asts = [i for i in state.inspect.mem_read_address.recursive_children_asts]
target_base = asts[0] if len(asts) > 1 else state.inspect.mem_read_address
vars = state.inspect.mem_read_address.variables

if str(target_base) not in state.globals['tainted_ProbeForRead'] and str(target_base) not in state.globals['tainted_ProbeForWrite'] and len(vars) == 1:
# Add constraints to test whether the pointer is null or not.
tmp_state = state.copy()
if target == 'SystemBuffer':
if '*' in str(state.inspect.mem_read_address):
# If SystemBuffer is a pointer, check whether it is controllable.
tmp_state.solver.add(tmp_state.inspect.mem_read_address == 0x87)
if tmp_state.satisfiable() and not str(target_base) in state.globals['tainted_MmIsAddressValid']:
utils.print_vuln('read/write controllable address', 'read', state, {}, {'read from': str(state.inspect.mem_read_address)})
else:
# If SystemBuffer is not a pointer, check whether it can be null.
tmp_state.solver.add(globals.SystemBuffer == 0)
tmp_state.solver.add(globals.InputBufferLength == 0)
tmp_state.solver.add(globals.OutputBufferLength == 0)
if tmp_state.satisfiable() and str(target_base) not in state.globals['tainted_MmIsAddressValid']:
utils.print_vuln('null pointer dereference - input buffer', 'read input buffer', state, {}, {'read from': str(state.inspect.mem_read_address)})
elif target == 'Type3InputBuffer' or target == 'UserBuffer':
# If Type3InputBuffer or UserBuffer is a pointer, check whether it is controllable.
if target == 'Type3InputBuffer':
tmp_state.solver.add(globals.Type3InputBuffer == 0x87)
elif target == 'UserBuffer':
tmp_state.solver.add(globals.UserBuffer == 0x87)

if tmp_state.satisfiable() and not str(target_base) in state.globals['tainted_MmIsAddressValid']:
utils.print_vuln('read/write controllable address', 'read', state, {}, {'read from': str(state.inspect.mem_read_address)})
else:
# Only detect the allocated memory in case of false positive.
if '+' in str(tmp_state.inspect.mem_read_address):
return
tmp_state.solver.add(tmp_state.inspect.mem_read_address == 0)
if tmp_state.satisfiable():
utils.print_vuln('null pointer dereference - allocated memory', 'read allocated memory', state, {}, {'read from': str(state.inspect.mem_read_address)})

# We symbolize the address of the tainted buffer because we want to detect the vulnerability when the driver reads/writes to/from the buffer.
if utils.tainted_buffer(target_base) and str(target_base) not in state.globals:
tmp_state = state.copy()
tmp_state.solver.add(target_base == globals.FIRST_ADDR)
if not tmp_state.satisfiable():
break

state.globals[str(target_base)] = True
mem = claripy.BVS(f'*{str(target_base)}', 8 * 0x200).reversed
addr = utils.next_base_addr()
state.solver.add(target_base == addr)
state.memory.store(addr, mem, 0x200, disable_actions=True, inspect=False)

def b_mem_write(state):
utils.print_debug(f'mem_write {state}, {state.inspect.mem_write_address}, {state.inspect.mem_write_expr}, {state.inspect.mem_write_length}, {state.inspect.mem_write_condition}')

# Iterate all target buffers.
for target in globals.NPD_TARGETS:
if target in str(state.inspect.mem_write_address):
asts = [i for i in state.inspect.mem_write_address.recursive_children_asts]
target_base = asts[0] if len(asts) > 1 else state.inspect.mem_write_address
vars = state.inspect.mem_write_address.variables

if str(target_base) not in state.globals['tainted_ProbeForRead'] and str(target_base) not in state.globals['tainted_ProbeForWrite'] and len(vars) == 1:
# Add constraints to test whether the pointer is null or not.
tmp_state = state.copy()
if target == 'SystemBuffer':
if '*' in str(state.inspect.mem_write_address):
# If SystemBuffer is a pointer, check whether it is controllable.
tmp_state.solver.add(tmp_state.inspect.mem_write_address == 0x87)
if tmp_state.satisfiable():
utils.print_vuln('read/write controllable address', 'write', state, {}, {'write to': str(state.inspect.mem_write_address)})
else:
# If SystemBuffer is not a pointer, check whether it can be null.
tmp_state.solver.add(globals.SystemBuffer == 0)
tmp_state.solver.add(globals.InputBufferLength == 0)
tmp_state.solver.add(globals.OutputBufferLength == 0)
if tmp_state.satisfiable() and str(target_base) not in state.globals['tainted_MmIsAddressValid']:
utils.print_vuln('null pointer dereference - input buffer', 'write input buffer', state, {}, {'write to': str(state.inspect.mem_write_address)})
elif target == 'Type3InputBuffer' or target == 'UserBuffer':
# If Type3InputBuffer or UserBuffer is a pointer, check whether it is controllable.
if target == 'Type3InputBuffer':
tmp_state.solver.add(globals.Type3InputBuffer == 0x87)
elif target == 'UserBuffer':
tmp_state.solver.add(globals.UserBuffer == 0x87)

if tmp_state.satisfiable():
utils.print_vuln('read/write controllable address', 'write', state, {}, {'write to': str(state.inspect.mem_write_address)})
else:
# Only detect the allocated memory in case of false positive.
if '+' in str(tmp_state.inspect.mem_write_address):
return
tmp_state.solver.add(tmp_state.inspect.mem_write_address == 0)
if tmp_state.satisfiable():
utils.print_vuln('null pointer dereference - allocated memory', 'write allocated memory', state, {}, {'write to': str(state.inspect.mem_write_address)})

# We symbolize the address of the tainted buffer because we want to detect the vulnerability when the driver reads/writes to/from the buffer.
if utils.tainted_buffer(target_base) and str(target_base) not in state.globals:
tmp_state = state.copy()
tmp_state.solver.add(target_base == globals.FIRST_ADDR)
if not tmp_state.satisfiable():
break

state.globals[str(target_base)] = True
mem = claripy.BVS(f'*{str(target_base)}', 8 * 0x200).reversed
addr = utils.next_base_addr()
state.solver.add(target_base == addr)
state.memory.store(addr, mem, 0x200, disable_actions=True, inspect=False)

def b_address_concretization_before(state):
utils.print_debug(f'address_concretization_before_hook: {state}\n\taddress_concretization_strategy: {state.inspect.address_concretization_strategy}\n\taddress_concretization_action: {state.inspect.address_concretization_action}\n\taddress_concretization_memory: {state.inspect.address_concretization_memory}\n\taddress_concretization_expr: {state.inspect.address_concretization_expr}\n\taddress_concretization_add_constraints: {state.inspect.address_concretization_add_constraints}\n\taddress_concretization_result: {state.inspect.address_concretization_result}\n')

def b_address_concretization_after(state):
utils.print_debug(f'address_concretization_after_hook: {state}\n\taddress_concretization_strategy: {state.inspect.address_concretization_strategy}\n\taddress_concretization_action: {state.inspect.address_concretization_action}\n\taddress_concretization_memory: {state.inspect.address_concretization_memory}\n\taddress_concretization_expr: {state.inspect.address_concretization_expr}\n\taddress_concretization_add_constraints: {state.inspect.address_concretization_add_constraints}\n\taddress_concretization_result: {state.inspect.address_concretization_result}\n')

def b_call(state):
ret_addr = state.solver.eval(state.memory.load(state.regs.rsp, state.arch.bytes, endness=state.arch.memory_endness))
utils.print_debug(f'call: state: {state}, ret_addr: {hex(ret_addr)}, function addr: {state.inspect.function_address})')

# Check if the function address to call is tainted.
if utils.tainted_buffer(state.inspect.function_address):
state.regs.rip = 0x1337
utils.print_vuln('arbitrary shellcode execution', '', state, {}, {'function address': str(state.inspect.function_address), 'return address': hex(ret_addr)})

# If the number of function address evaluated is more than 1, skip the call.
if len(state.solver.eval_upto(state.inspect.function_address, 2)) > 1:
tmp_state = state.copy()
tmp_state.regs.rip = globals.DO_NOTHING
globals.simgr.deferred.append(tmp_state)
return angr.SIM_PROCEDURES['stubs']['ReturnUnconstrained']().execute(state)

def b_dirty(state):
utils.print_debug(f'dirty: state: {state}, dirty name: {state.inspect.dirty_name}, dirty handler: {state.inspect.dirty_handler}, dirty args: {state.inspect.dirty_args}, dirty result: {state.inspect.dirty_result})')
33 changes: 33 additions & 0 deletions analysis/globals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
irp_addr = 0x1337000
irsp_addr = 0x6000000
mycc = None

phase = 1
DriverStartIo = 0
ioctl_handler = 0
DO_NOTHING = 0
INIT_FIRST_ADDR = 0x444f0000
FIRST_ADDR = 0x444f0000

NPD_TARGETS = ['SystemBuffer', 'Type3InputBuffer', 'UserBuffer', 'ExAllocatePool_0x', 'ExAllocatePool2_0x', 'ExAllocatePool3_0x', 'ExAllocatePoolWithTag_0x', 'MmAllocateNonCachedMemory_0x', 'MmAllocateContiguousMemorySpecifyCache_0x']
SystemBuffer = None
Type3InputBuffer = None
UserBuffer = None
InputBufferLength = None
OutputBufferLength = None
IoControlCode = None

args = None

DOS_DEVICES = ['\\DosDevices\\'.encode('utf-16le'), '\\??\\'.encode('utf-16le')]

proj = None
cfg = None
simgr = None

eval_upto = 3
vulns_unique = set()
driver_info = {}
basic_info = {}
vulns_info = []
error_msgs = []
Loading

0 comments on commit 3485a60

Please sign in to comment.