sekai ctf 外卡赛 Discrepancy

luyanpei

题目源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
### IMPORTS ###
from pickle import _Unpickler as py_unpickler
from _pickle import Unpickler as c_unpickler
from pickletools import dis
from io import BytesIO
DEBUG = False



### HELPER FUNCTIONS ###
def py_pickle_wrapper(data: bytes) -> bool:
"""
Wrapper function for Python's pickle.loads.
"""

class SafePyUnpickler(py_unpickler):
def find_class(self, module_name: str, global_name: str):
print("no no no")
exit(1)

try:
SafePyUnpickler(BytesIO(data)).load()
return True
except Exception:
if DEBUG:
print("Failed SafePyUnpickler")
return False

def c_pickle_wrapper(data: bytes) -> bool:
"""
Wrapper function for C's pickle.loads.
"""

class SafeCUnpickler(c_unpickler):
def find_class(self, module_name: str, global_name: str):
print("no no no")
exit(1)

try:
SafeCUnpickler(BytesIO(data)).load()
return True
except Exception:
if DEBUG:
print("Failed SafeCUnpickler")
return False

def pickletools_wrapper(data: bytes) -> bool:
"""
Wrapper function for pickletools.genops.
"""
try:
dis(data)
return True
except Exception:
if DEBUG:
print("Failed genops")
return False

def get_input() -> bytes:
inp = input("Pickle bytes in hexadecimal format: ")
if inp.startswith("0x"):
inp = inp[2:]

b = bytes.fromhex(inp)[:8]
return b



### MAIN ###
if __name__ == "__main__":
# Check 1
print("Check 1")
b1 = get_input()
if py_pickle_wrapper(b1) and c_pickle_wrapper(b1) and not pickletools_wrapper(b1):
print("Passed check 1")
else:
print("Failed check 1")
exit(1)

# Check 2
print("Check 2")
b2 = get_input()
if not py_pickle_wrapper(b2) and c_pickle_wrapper(b2) and pickletools_wrapper(b2):
print("Passed check 2")
else:
print("Failed check 2")
exit(1)

# Check 3
print("Check 3")
b3 = get_input()
if py_pickle_wrapper(b3) and not c_pickle_wrapper(b3) and pickletools_wrapper(b3):
print("Passed check 3")
else:
print("Failed check 3")
exit(1)

# Check 4
print("Check 4")
b4 = get_input()
if not py_pickle_wrapper(b4) and not c_pickle_wrapper(b4) and pickletools_wrapper(b4):
print("Passed check 4")
else:
print("Failed check 4")
exit(1)

# Check 5
print("Check 5")
b5 = get_input()
if not py_pickle_wrapper(b5) and c_pickle_wrapper(b5) and not pickletools_wrapper(b5):
print("Passed check 5")
else:
print("Failed check 5")
exit(1)

# get flag
print("All checks passed")
FLAG = open("flag.txt", "r").read()
print(FLAG)

可以观察到这三个函数

py_pickle_wrapper() 使用 Python 实现的 pickle 解析器 解析字节序列,成功返回 True。
c_pickle_wrapper() 使用 C 实现的 pickle 解析器 解析字节序列,成功返回 True。
pickletools_wrapper() 使用 pickletools.dis 反汇编字节序列,成功返回 True。

给出了三个判定器

  • py_pickle_wrapper(b): 用 纯 Python _Unpickler 反序列化,重写 find_class(一旦调用即退出),成功返回 True,抛异常返回 False。
  • c_pickle_wrapper(b): 用 C 加速 _pickle.Unpickler,同样重写 find_class,规则同上。
  • pickletools_wrapper(b): pickletools.dis(b) 反汇编,若成功返回 True,异常为 False。

程序接受我们构造的有效的 pickle,取前 8 字节bytes.fromhex(inp)[:8]),所以每个测试项只允许用最多 8 字节的 pickle bytes。

然后下面还有5个check

Check 1

条件:py=True, c=True, dis=False

纯 Python unpickler 和 C unpickler 都能运行成功(返回 True),但 pickletools.dis 在静态分析/验证阶段抛异常或报错(返回 False)

Check 2

条件:py=False, c=True, dis=True

C 实现能接受并 .load() 成功;pickletools.dis 能正确反汇编;但纯 Python 实现会在 .load() 阶段抛出异常(返回 False)。

Check 3

条件:py=True, c=False, dis=True

纯 Python unpickler 能成功 load;pickletools.dis 能反汇编;而 C 实现(_pickle)在 .load() 时抛异常或无法接受。

Check 4

条件:py=False, c=False, dis=True

pickletools 能反汇编(语法/静态检查通过),但两个运行时 Unpickler 在 .load() 阶段都会失败(返回 False)

Check 5

条件:py=False, c=True, dis=False

C unpickler 能成功;纯 Python unpickler失败;但 pickletools.dis 在静态反汇编/验证阶段也失败(不识别或抛异常)。

然后题目给了一段描述

image-20250820210709002

让我去阅读pickle的源码,但是题目只给了八个有效识别的字节,我们完全可以本地模拟这三个反编译函数的实现从而来爆破出来有效字节

所以我们直接修改题目源码,把原题的三个“判定函数”在本地复现(把 exit(1) 换成 raise),然后在 opcode 集合穷举长度 ≤8 的字节序列来爆破五个 check(直接爆破计算时间过于庞大)。

缩小搜索空间:用 pickletools.opcodes 的 opcode 字节集作为 alphabet(只在 opcode 集中穷举字节),

从 Python 自身的 pickletools 获取

1
2
3
4
from pickletools import opcodes
# 一个简单的映射:opcode name -> byte (hex)
mapping = [(op.name, hex(ord(op.code))) for op in opcodes]
print(mapping)

然后结合ai,得到了理论上更容易命中这些check的opcode列表

1
seed_bytes = [b'\x29', b'\x28', b'\x4e', b'']  # EMPTY_TUPLE / MARK / NONE / empty

一句话结论(AI对这些种子的解释)

这些 seed 是 pickle 协议里单字节就能改变运行时栈状态/语义的常用 opcode(EMPTY_TUPLEMARKNONE),在字节预算极紧(≤8 bytes)下它们能以最小的代价制造出栈状态差异语义边界,而这些正是让 pure-Python_pickle(C)pickletools.dis 三者表现不同的关键。

在穷举时优先固定 seed,然后对剩余 positions 做穷举(比从全字节开始穷举更快)。

记录 dis() 输出与两种 unpickler 的异常并比较,异常信息直接给出定位线索。

若没命中,再尝试加入 PROTO 前缀或 STOP 后缀,或把 seed 扩到其它高价值 opcode(如 MEMOIZEPROTO 等)。

然后来编写脚本进行爆破

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from pickle import _Unpickler as py_unpickler
from _pickle import Unpickler as c_unpickler
from pickletools import genops, dis, opcodes
from io import BytesIO, StringIO
from contextlib import redirect_stdout
from itertools import product
import time

class SafePy(py_unpickler):
def find_class(self, module, name):
raise Exception("GLOBAL blocked")

class SafeC(c_unpickler):
def find_class(self, module, name):
raise Exception("GLOBAL blocked")

def py_ok(b):
try:
SafePy(BytesIO(b)).load()
return True
except Exception:
return False

def c_ok(b):
try:
SafeC(BytesIO(b)).load()
return True
except Exception:
return False

def dis_ok(b):
try:
buf = StringIO()
with redirect_stdout(buf):
dis(b)
return True
except Exception:
return False

def show(label, b):
print(label, b.hex(), "py", py_ok(b), "c", c_ok(b), "dis", dis_ok(b))


opcode_bytes = sorted(set(ord(op.code) for op in opcodes))
max_core_len = 4 # core opcode length to try (start small: 1..3..4)
visit_limit = 10_000_000 # max candidates to attempt (avoid runaway)
pad_to_8 = True
try_stop_end = True
protocols = [2, 3, 4] # PROTO versions to try (prefix 0x80 <p>)
seed_bytes = [b'\x29', b'\x28', b'\x4e', b''] # EMPTY_TUPLE / MARK / NONE / empty
targets = {
"check1": lambda p,c,d: (p and c and (not d)), # py=True, c=True, dis=False
"check2": lambda p,c,d: ((not p) and c and d), # py=False, c=True, dis=True
"check3": lambda p,c,d: (p and (not c) and d), # py=True, c=False, dis=True
"check4": lambda p,c,d: ((not p) and (not c) and d), # py=False, c=False, dis=True
"check5": lambda p,c,d: ((not p) and c and (not d)), # py=False, c=True, dis=False
}

found = {k: None for k in targets}

def candidates():
cnt = 0
for core_len in range(0, max_core_len+1):
for core in product(opcode_bytes, repeat=core_len):
coreb = bytes(core)
base_variants = []
if try_stop_end:
base_variants.append(coreb + b'\x2e') # STOP '.'
base_variants.append(coreb)
for p in protocols:
hdr = bytes([0x80, p])
if try_stop_end:
base_variants.append(hdr + coreb + b'\x2e')
base_variants.append(hdr + coreb)
for seed in seed_bytes:
if try_stop_end:
base_variants.append(seed + coreb + b'\x2e')
base_variants.append(seed + coreb)
seen = set()
for s in base_variants:
if s in seen:
continue
seen.add(s)

if pad_to_8:
if len(s) > 8:
continue
s_padded = s + b'\x00' * (8 - len(s))
else:
s_padded = s


try:

list(genops(s_padded))
except Exception:
continue

yield s_padded
cnt += 1
if cnt >= visit_limit:
return

start = time.time()
tested = 0
last_report = start
report_interval = 5.0 # seconds

for b in candidates():
tested += 1
p, c, d = py_ok(b), c_ok(b), dis_ok(b)
for name, cond in targets.items():
if found[name] is None and cond(p, c, d):
found[name] = b
print(f"[+] Found {name}: {b.hex()} (py={p}, c={c}, dis={d})")
now = time.time()
if now - last_report > report_interval:
print(f"[{time.strftime('%H:%M:%S')}] tested={tested}, elapsed={round(now-start,2)}s, found={sum(1 for v in found.values() if v)}")
last_report = now
if all(found.values()):
break

print("Done in", round(time.time()-start,2), "sec, tested:", tested)
for name in targets:
print(name, "=>", None if found[name] is None else found[name].hex())

构建 base_variants:对 coreb 产生多个变体

  • 直接 coreb,或 coreb + STOP
  • 在前面加入 PROTO 头(0x80, ver),再组合 coreb 与可选 STOP。
  • 在前面加入 seedEMPTY_TUPLE/MARK/NONE/空)再组合 coreb 与可选 STOP。

image-20250820213100005

找到五个check,编写交互脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from pwn import *
context(log_level='debug')

a= remote(
"discrepancy.chals.sekai.team",
1337,
ssl=True,
sni=True,
typ="tcp"
)
a.recvuntil("Pickle bytes in hexadecimal format:")
a.sendline("0x29292e0000000000")
a.recvuntil("Pickle bytes in hexadecimal format:")
a.sendline("0x2928652e00000000")
a.recvuntil("Pickle bytes in hexadecimal format:")
a.sendline("0x2929622e00000000")
a.recvuntil("Pickle bytes in hexadecimal format:")
a.sendline("0x282e000000000000")
a.recvuntil("Pickle bytes in hexadecimal format:")
a.sendline("0x292865292e000000")
a.interactive()
# check1:0x80024e4e2e000000
# check2:0x2928902e
# check3:0x80059505000000000000004B012E 0x2E
# check4:0x8002824e2e000000
# check5:0x80044B01710568052E
# check1 => 29292e0000000000
# check2 => 2928652e00000000
# check3 => 2929622e00000000
# check4 => 282e000000000000
# check5 => 292865292e000000

然后就get flag

image-20250820213201392

总结:看到这个题目,其实以前并没有接触过pickle,在本地测试了这三个的反汇编结果之后,打算试一试这道题目,这道题目实际上dis 比 unpickle 严格,所以通过unpickle简单,但是通过dis就有点难度。一开始自己手动能够构造出check1

EMPTY_TUPLE 29
EMPTY_TUPLE 29
STOP 2e

dis() 因为语义STOP 后栈不空 报错,不通过dis()。然后后面就想着既然只有8个字节,本地直接模拟爆破,第一次爆了两个小时只有check1,check2和check4。结合ai不断修改opcode的范围,灵光乍现根据已经爆破出来的来设置种子减少穷举范围(跟yzb打完游戏之后的灵感,果然,多跟yzb打游戏),最后才搞出来check3和check5。

  • Title: sekai ctf 外卡赛 Discrepancy
  • Author: luyanpei
  • Created at : 2025-08-19 10:30:05
  • Updated at : 2025-08-24 10:18:15
  • Link: https://redefine.ohevan.com/posts/572.html
  • License: All Rights Reserved © luyanpei