XX-Mini源码与原理分析
XX-Mini源码取自XX-Net项目1,精简web UI、php_proxy 以及 x_tunnel 等功能,只保留 gae_proxy 以及自动扫描IP功能。然而,它只是个客户端,服务端还是得用XX-Net,实际上XX-Net的可读性更好。
从早期的GAE proxy、WallProxy、GoAgent到XX-Net等,这些基于google GAE平台的翻墙方案2,有其可取之处。本篇旨在探索源代码和原理,向开路者致敬。这里只分析代码,未经实际安装测试现在的可用性和速度。
文档参考
- HTTP TUNNEL
- Tunneling TCP based protocols through Web proxy server
- 《HTTP权威指南》第6章-代理
- App Engine Python:Outbound Requests
- tor-dev: A simple HTTP transport and big ideas
- GoAgent的安全风险
- Tor wiki: GoAgent
原理
HTTPS流量的走向
- 浏览器发送HTTP CONNECT请求到XX-Mini
- XX-Mini返回浏览器连接建立的回应,然后浏览器和XX-Mini进行SSL的会话。因为浏览器要事先导入XX-Mini的Cert,所以XX-Mini以MITM的方式伪装成目标站点与浏览器完成SSL上的HTTP数据传输。
- XX-Mini通过将浏览器的HTTP代理请求,对头部Header和body压缩(未使用Content-Encoding的body)后打包
- 发送HTTPS POST请求给Google App Engine,POST DATA即是打包好的HTTP CONNECT数据包。
- Google App Engine解出POST DATA,取出其中HTTP CONNECT数据,拿到要连接的URL和HTTP Headers
- 使用App Engine提供的urlfetch方法发送HTTP请求(猜测使用这个方法时,在google那边也是发起HTTP代理请求)获取HTTP/HTTPS响应结果
- 再把这个HTTP响应整个打包,HTTPS响应返回XX-Mini
- XX-Mini取出打包HTTPS body中的真正响应,走SSL返回给浏览器。这时浏览器认为是和XX-Mini完成了一次HTTP CONNECT的转发并和目标服务器完成了HTTPS也即SSL上的HTTP交互。
HTTP流理走向
- 浏览器发送HTTP代理请求到XX-Mini
- XX-Mini将该请求打包,通过HTTPS连接发送至GAE上的XX-Net
- XX-Net服务端将浏览器的HTTP代理请求拿出,取出HTTP头部的PATH(URL绝对地址),通过urlfetch发送请求到目标服务器
- XX-Net服务端收到回应,将HTTP回应打包后通过HTTPS body回复给XX-Mini
- XX-Mini客户端取出藏在HTTPS中的目标服务器的回应,回复给浏览器
安全性
- GoAgent类程序列在启动时会尝试自动往系统的可信根证书中导入一个的根证书用于本地端程序的MITM方式运作,但由于这个证书的私钥是公开的,在访问国内网站即不走GAE代理时,任何人都可以利用这个私钥来伪造国内网站的证书进行HTTPS中间人攻击。
- GAE的urlfetch请求的Headers里,User-Agent里包含GAE的appid,可能会被目标网站追踪。
User-Agent: AppEngine-Google; http://code.google.com/appengine; appid: xxx
Google App Engine的限制
- Sockets are available only for paid apps.
- You cannot create a listen socket; you can only create outbound sockets.
目录结构
除了两个库pyasn1和socks的代码,工程总共6107行Python代码,其中有很多代码是扫描IP、多IP自动选择、多socket自动选择的连接管理的功能。XX-Mini只是客户端的代码,并未包含服务端代码,服务端代码可参考XX-Net/code/default/gae_proxy/server/
- proxy.py,程序入口
- proxy.ini,配置文件
- lib/xlog.py,重复造轮子,logging模型可完成同样功能
- lib/scan_ip_log.py,重复造轮子
- lib/simple_http_server.py,重复造轮子,HTTP Server
- lib/simple_http_client.py,重复造轮子
- lib/openssl_wrap.py,重复造轮子?没有代码在使用它
- lib/socks.py,SOCKS协议的第三方实现SocksiPy
- lib/ca_util.py,Cert的管理
- lib/check_ip.py,扫描可用IP的功能
- lib/appids_mangager.py,维护着一个可用的appid列表
- lib/ip_utils.py,解释ip_range.txt文件内容的工具
- lib/google_ip_range.py,读取ip_range.txt文件
- lib/google_ip.py,扫描可用IP的功能
- lib/check_local_network.py,发HTTPS请求到baidu检查当前网络
- lib/connect_control.py,连接控制,失败太多会阻止建立连接一段时间
- lib/connect_manager.py,socket的连接池,记录socket及其握手时间
配置文件
- proxy.ini程序运行需要的配置
- ip_range.txt或data/ip_range.txt,提供用于扫描google ip的ip范围列表
- data/good_ip.txt,存储扫描得到的可用google ip。
- data/CA.crt根证书,data/cert/*.crt。CA.crt需要import要系统中,linux的认会调用certutil命令导入,Mac系统会调用security命令导入。
- caret.pem,用于扫描google IP或者
- data/proxy.pac,生成的本地PAC文件
PAC功能
地址 http://gae_host:8086/proxy.pac
pac_daemon = simple_http_server.HTTPServer((config.PAC_IP, config.PAC_PORT), PACServerHandler)
pac_thread = threading.Thread(target=pac_daemon.serve_forever)
pac_thread.setDaemon(True)
pac_thread.start()
生成PAC
从https://easylist-downloads.adblockplus.org/easylistchina.txt下载广告屏弊规则,从https://github.com/gfwlist/gfwlist下载需要翻墙的规则,并生成一份PAC到data/proxy.pac。对于adblock的匹配,转到gae_host:8086去处理,对于gfwlist的,则转到gae_host:8087去处理。
class PACServerHandler(simple_http_server.HttpServerHandler):
def do_CONNECT(self):
self.wfile.write(b'HTTP/1.1 403\r\nConnection: close\r\n\r\n')
8086端口除了服务proxy.pac文件,还作为代理黑洞的入口用来去广告
扫描可用的google IP
如何探测一个ip是不是可用的google IP
代码check_ip.py:test_gae_ip()。通过发起https://{appid}.appspot.com/_gh/请求到要检查的IP,通过回应是否是202来确认是不是google 的可用IP。
ssl_sock = connect_ssl(ip, timeout=max_timeout, openssl_context=openssl_context)
request_data = 'GET /_gh/ HTTP/1.1\r\nHost: %s.appspot.com\r\n\r\n' % appid
ssl_sock.send(request_data.encode())
response = httplib.HTTPResponse(ssl_sock, buffering=True)
proxy.ini里的下面这段配置只是为了在本地扫描IP用的SOCKS代理配置,这样建立连接时可以走代理3,并不是goAgent运作的功能配置需要。
[proxy]
enable = 0
host = 127.0.0.1
port = 8888
user =
passwd =
如下代码说明用于扫描google ip的proxy只能是SOCKS代理
import socks
def connect_ssl(ip, port=443):
if config.PROXY_ENABLE:
sock = socks.socksocket(socket.AF_INET)
else:
sock = socket.socket(socket.AF_INET)
可用的google IP列表维护
主要功能在代码文件google_ip_range.py/google_ip.py。程序会开线程出来持续扫描可用的一组google ip,并且按握手时间进行排序
- 配置
[google_ip]
auto_adjust_scan_ip_thread_num = 1 #是否自动调整扫描线程数
max_scan_ip_thread_num = 50 #最大多少个扫描线程
max_good_ip_num = 3000 #只保存最快的前3000个
ip_connect_interval = 8
max_links_per_ip = 1
- 扫描IP范围由ip_range.txt或者data/ip_range.txt(优先)提供列表。功能通过google_ip_range.py读配置文件中读取全部IP范围
1.9.22.0-1.9.22.255
1.9.1-2.0-255
168.111.1.
168.10.11.0/8
11.16.198.102
- 维护一份按握手时间短、失败率少的可用IP列表,保存到data/good_ip.txt中,启动时会加载。
ip_dict = sorted(self.ip_dict.items(), key=lambda x: (x[1]['handshake_time'] + x[1]['fail_times'] * 1000))
with open(self.good_ip_file, "w") as fd:
for ip, property in ip_dict:
fd.write( "%s %s %s %d %d\n" % (ip,
property['domain'],
property['server'],
property['handshake_time'],
property['fail_times']) )
- 跟据第100个可用IP的握手时间和失败率自动调整扫描功能的线程数
i = self.gws_ip_list[99]
handshake_time = self.ip_dict[i][‘handshake_time'] + self.ip_dict[i][‘fail_times'] * 1000
self.scan_ip_thread_num = int( (handshake_time - 200)/2 * self.max_scan_ip_thread_num/50 )
# 启动够那么多个线程
new_thread_num = self.scan_ip_thread_num -self.scan_thread_count
for i in range(0, new_thread_num):
self.scan_thread_count += 1
p = threading.Thread(target = self.scan_ip_worker)
- 多个线程持续随机选择IP进行扫描,如果是google ip就加到可用IP列表里
def scan_ip_worker(self):
ip = self.ip_range.get_ip()
result = check_ip.test_gae_ip(ip)
if result:
self.add_ip(ip, result.handshake_time, result.domain, "gws")
self.remove_slowest_ip()
- 从可用ip里面取出一个ip的算法。比如,在几秒钟之前才用过的,先不要使用,使用次数超过一定值的,先不要使用。
if time_now - get_time < self.ip_connect_interval:
self.gws_ip_pointer += 1
continue
if self.ip_dict[ip]['links'] >=config.max_links_per_ip:
self.gws_ip_pointer += 1
continue
APPID可用列表维护
appids_mangager.py:APPID_mangaer维护着一个可用的appid列表,列表初始来源是如下配置,public_appid和appid只用其一(优先后者),前者的列表长得可怕,难怪google的IP都要被封了。
[gae]
public_appid = xxnet-1|xxnet-2|xxnet-3
appid =
password =
appid列表在运行中如果出错,会用如下方法报告可用性,然后就会从可用appid列表中删除,可惜只能写log,不能反馈回配置文件里。
def report_out_of_quota(self, appid)
def report_not_exist(self, appid, ip)
SSL连接池维护
connect_manager.py维护一组连接池:新的SSL连接(扫描到但没用到的)、已用的连接SSL池。这样主要是为了能够在需要连接到GAE Proxy时可以迅速拿到一条连接,同时还能复用之前用过的连接。
配置参数如下
[connect_manager]
https_max_connect_thread = 10
https_connection_pool_min = 5
https_connection_pool_max = 50
; keep connection pool until no active request timeout
keep_active_timeout = 600
; the time that HTTPS connection can reuse after last transmit, drop it or send new request to keep it alive
https_keep_alive = 55
队列
队列类型使用线程的Lock进行加锁,有可用的socket再通知回调get():
self.pool_lock = threading.Lock()
self.not_empty = threading.Condition(self.pool_lock)
def get(self, block=True, timeout=None):
end_time = time.time() + timeout
if not self.qsize():
self.not_empty.wait(remaining)
if not self.qsize():
return None
return self._get()
def put(self, item):
handshake_time, sock = item
self.not_empty.acquire()
try:
self.pool[sock] = handshake_time
self.not_empty.notify()
finally:
self.not_empty.release()
取出socket,跟据队列中所有socket握手时间长短优先取出使用
def _get(self):
for sock in self.pool:
time = self.pool[sock]
if time < fastest_time or not fastest_sock:
fastest_time = time
fastest_sock = sock
return (fastest_time, fastest_sock)
SSL握手时间(handshake time)
握手时间是从TCP socket建立成功到SSL握手完成所花的时间
ssl_sock = SSLConnection(self.openssl_context,
sock, ip, google_ip.ssl_closed)
ssl_sock.set_connect_state()
ssl_sock.connect(ip_port)
time_connected = time.time()
ssl_sock.do_handshake()
time_handshaked = time.time()
connect_time = int((time_connected - time_begin) * 1000)
handshake_time = int((time_handshaked - time_connected) * 1000)
后台线程
一个持续创建新连接(到随机的gae可用IP)的线程
def create_connection_daemon(self):
while connect_control.keep_running:
time.sleep(0.1)
if self.thread_num > self.max_thread_num:
continue
if self.new_conn_pool.qsize() > target_conn_num:
time.sleep(1)
continue
p = threading.Thread(target=self._connect_thread)
p.start()
一个是socket连接超时回收的线程,对所有离上次响应时间超过600秒的(keep_active_timeout配置项设置)的连接直接回收销毁,超过55秒(https_keep_alive配置项配置)的进行一次HTTPS HEAD探测,如果没响应了也回收,有响应则可以留给别的连接进行复用。
def keep_alive_thread(self):
while self.keep_alive:
# 超过200秒的socket进行处理
for ssl_sock in to_keep_live_list:
inactive_time = time.time() - ssl_sock.last_use_time
#超过600秒回收
if inactive_time > self.keep_alive:
ssl_sock.close()
else:
if self.head_request(ssl_sock):
# 可重用
else:
ssl_sock.close()
除此之后还会在获取连接获取不到是手动启下创建连接的线程
核心功能:HTTP代理
[listen]
ip = 127.0.0.1
port = 8087
核心功能在proxy_handler.py:GAEProxyHandler。前面通过pac_server,将需要走代理的HTTP请求转给了127.0.0.1:8087,相当于是在客户端开了个代理,但此代理需要将真正的HTTP请求转发到google GAE,也就是墙外的服务端,由它来进行最后一次转发到目标服务器请求HTTP资源。
proxy_daemon = simple_http_server.HTTPServer((config.LISTEN_IP, config.LISTEN_PORT), proxy_handler.GAEProxyHandler)
proxy_thread =threading.Thread(target=proxy_daemon.serve_forever)
proxy_thread.setDaemon(True)
proxy_thread.start()
非CONNECT请求的处理
GAEProxyHandler:do_AGENT方法实现。要注意的是,由HTTP Server读取完headers就回调到GAEProxyHandler了,所以payload部分还要继续读取。
HTTP代理请求的PATH字段是完整的目标URL地址,所以有如下代码进行处理
if self.path[0] == '/' and host:
self.path = 'http://%s%s' % (host, self.path)
elif not host and '://' in self.path:
host = urlparse.urlparse(self.path).netloc
- 封装加包好HTTP代理请求。如下可见,HTTP头部被zlib压缩后和body拼在一起。
kwargs = {}
kwargs['password'] = config.GAE_PASSWORD
kwargs['maxsize'] = config.AUTORANGE_MAXSIZE
kwargs['timeout'] = '19'
header_payload = '%s %s HTTP/1.1\r\n' % (method, url)
header_payload += ''.join('%s: %s\r\n' % (k, v) for k, v in headers.items() if k not in skip_headers)
header_payload += ''.join('X-URLFETCH-%s: %s\r\n' % (k, v) for k, v in kwargs.items() if v)
header_payload = deflate(payload)
packed_body = '%s%s%s' % (struct.pack('!h', len(payload)), header_payload, body)- 将打包好的HTTP代理请求包(packed_body),放到HTTPS POST body里发送到在GAE上的代理https://{appid}.aapspot.com/_gh/。因为是真正的HTTP数据包头部也做了压缩后整个打包成CONTENT发送给了GAE proxy,暴露给墙的一个普通的HTTPS的连接。
headers = {}
headers['Content-Length'] = str(len(body))
headers['Host'] = ssl_sock.appid + ".appspot.com"
ssl_sock = https_manager.get_ssl_connection()
ssl_sock.appid = appid_manager.get_appid()
request_data = 'POST /_gh/ HTTP/1.1\r\n'
request_data += ''.join('%s: %s\r\n' % (k, v)
for k, v in headers.items() if k not in skip_headers)
request_data += '\r\n'
ssl_sock.send(request_data.encode())
sock.send(packed_body)- 收到代理服务回返回的数据包进行解码。其中要判断很多HTTP Code返回码,以能够更好的提醒到浏览器(返回HTML提醒的页面)或者记录日志。
# end reading the headers
headers_length, = struct.unpack('!h', data)
data = response.read(headers_length)
raw_response_line, headers_data = inflate(data).split('\r\n', 1)
data = response.read(config.AUTORANGE_BUFSIZE)- 返回给浏览器一个HTTP响应
//清理HEADER
response_headers = {}
for key, value in response.getheaders():
if key in skip_headers:
continue
response_headers[key] = value
//读取payload
while True:
data = response.read(config.AUTORANGE_BUFSIZE)
…
//发送HTTP响应到浏览器
wfile.write("HTTP/1.1 %d %s\r\n" % (response.status, response.reason))
for key in response_headers:
value = response_headers[key]
send_header(wfile, key, value)
wfile.write("\r\n")
ret = wfile.write(data)
CONNECT请求(HTTPS)
GAEProxyHandler:do_CONNECT_AGENT方法实现。原理是CONNECT握手完成后,由本地XX-Mini用自制的证书4,跟浏览器作HTTPS请求的交互。XX-Mini得到HTTPS中的HTTP请求,转发到GAE Proxy,后面的流程就跟上面的非HTTPS请求一样了,只是从浏览器到XX-Mini这一段连接是不是SSL加密的区别而已。整个流程是三段HTTPS连接。
- HTTP CONNECT握手。XX-Mini直接返回OK
self.wfile.write(b'HTTP/1.1 200 OK\r\n\r\n')
- 和浏览器建立SSL连接,再进行后续的代理请求转发
# 为host生成一个证书
certfile = CertUtil.get_cert(host)
ssl_sock = ssl.wrap_socket(self.connection, keyfile=certfile, certfile=certfile, server_side=True)
self.connection = ssl_sock
self.parse_request()
- 把浏览器HTTPS里的请求转为HTTP代理请求,要把相对路经转成绝对路经的代理请求,以方便GAE Proxy进行转发
if self.path[0] == '/' and host:
#不是个合法的HTTP代理请求(https),但GAE Proxy能支持
self.path = 'https://%s%s' %
(self.headers['Host'], self.path)
elif not host and '://' in self.path:
host = urlparse.urlparse(self.path).netloc
- 转发代理请求到GAE Proxy,跟前面非CONNECT请求的处理是完全一样的流程了。
HTTP代理,GAE Proxy部分
代码在XX-Net:gae.py。GAE有一些上传下载文件的限制。如果下载超过10M,那么会使用Content-Range Header进行客户端和GAE Proxy进行交互,这种情况两边的代码都比较复杂,这里不打算分析代码了:
# How to Download file large then 10M?
# HTTP protocol support range fetch.
# If server return header include "accept-ranges", then client can request special range
# by put Content-Range in request header.
#
# GAE server will return 206 status code if file is too large and server support range fetch.
# Then GAE_proxy local client will switch to range fetch mode.
XX-Mini客户端与XX-Net服务端进行HTTPS通信,把真正的请求和回应压缩、加密后放到这些通信的HTTP BODY中进行传输。协议有写在gae_handler.py文件头部:
GoAgent local-server protocol 3.2
request:
POST /_gh/ HTTP/1.1
HOST: appid.appspot.com
content-length: xxx
http content:
{
pack_req_head_len: 2 bytes,
pack_req_head : deflate{
original request line,
original request headers,
X-URLFETCH-kwargs HEADS, {
password,
maxsize, defined in config AUTO RANGE MAX SIZE
timeout, request timeout for GAE urlfetch.
}
}
body
}
response:
200 OK
http-Heads:
Content-type: image/gif
http-content:{
response_head{
data_len: 2 bytes,
data: deflate{
HTTP/1.1 status, status_code
headers
content = error_message, if GAE server fail
}
}
body
}
XX-Net解码XX-Mini上传的数据包的BODY部分。注意到的是,XX-Net是支持数据包的rc4加密的,但XX-Mini没有使用这个扩展特性。
def application(environ, start_response):
wsgi_input = environ['wsgi.input']
input_data = wsgi_input.read(int(environ.get('CONTENT_LENGTH', '0')))
if 'rc4' in options:
input_data = RC4Cipher(__password__).encrypt(input_data)
payload_length, = struct.unpack('!h', input_data[:2])
payload = inflate(input_data[2:2+payload_length])
body = input_data[2+payload_length:]
raw_response_line, payload = payload.split('\r\n', 1)
method, url = raw_response_line.split()[:2]
for line in payload.splitlines():
key, value = line.split(':', 1)
headers[key.title()] = value.strip()
转发代理请求,这里用到了google的库。注意,因为XX-Mini上传上来的url始终是个完整的URL,所以这里直接使用并没有什么问题。
from google.appengine.api import urlfetch
fetchmethod = getattr(urlfetch, method, None)
response = urlfetch.fetch(url,
body, fetchmethod, headers,
allow_truncated=allow_truncated,
follow_redirects=False,
deadline=timeout,
validate_certificate=validate_certificate)
下发返回结果给XX-Mini客户端,协议和上传协议差不多(注意,这里的协议都是在XX-Mini客户端到XX-Net服务端的SSL连接之下的)。把整个Response塞到了Body里,content-type设置成了image/gif。
这整个Response里,HTTP Header同样使用了压缩,其body在客户端Accept-Encoding接受的请况下、在本来目标服力器返回没有使用压缩的情况下添加了gzip/defalte压缩。
if status_code == 200 and 'content-encoding' not in response_headers
# 内容压缩
if 'deflate' in accept_encoding:
response_headers['Content-Encoding'] = 'deflate'
data = deflate(data)
# 在SSL上给XX-Mini回应
start_response('200 OK', [('Content-Type', 'image/gif')])
# 真正的回应包,头部和内容都尽可能做压缩
yield format_response(status_code, response_headers, '')
yield data
def format_response(status, headers, content):
data = 'HTTP/1.1 %d %s\r\n%s\r\n\r\n%s' % (status,
httplib.responses.get(status, 'Unknown'),
'\r\n'.join('%s: %s' % (k.title(), v) for
k, v in headers.items()), content)
data = deflate(data)
return struct.pack('!h', len(data)) + data
代码优化
因为代码是很多人写的,也渊远流长,所以代码混乱也可以理解,不过,重构或者重写一下,对熟悉本项目的Python开发者来说应该不难。有些功能没必要造轮子的,不如把精力分出来把核心功能(IP扫描、连接管理、HTTPS代理)做更好。
- 建立SSL连接、证书校验与发起HTTPS GET/HEAD请求的代码,可以抽出来做成一个模块,提供给check_ip.py、connect_manager.py、check_local_network.py等。实现可以统一用socket、httplib或者requests其中之一来实现,别不统一。如果用requests这种内置可以设置代理的库,应该连socks.py这个SOCKS库依赖也可以去掉了。
- socket连接测试或者ip连接测试的HTTPS GET/HEAD相关方法应该作得尽可能简洁,每一种类型就一个函数调用作入口,参数是目标url。不要copy代码。
- 连接池,用多个线程去跑,还要维护线程数挺累人的?总之这里的代码没必要这么复杂,用线程池或者干脆别用线程得了。
- 有很多重复的代码,比如发送HTTP请求与HTTP请求头的解释、HTTP数据包的构造。加外simple_http_client.py/simple_http_server.py实现了标准库里http.client和http.server几乎完全类似的接口,不知道为什么不通过继承实现。
如何优化PAC的代码
PAC生成代码非常混乱,应该要像ShadowsocksX那样直接使用部分AdblockPlus的部分JS代码(用模板文件简单的替换两种URL匹配列表)。这样对AdblockList的使用就更简单,解决转换成自定义格式匹配的烦恼,也不用写这么多正则转换和文件拼接。
对GAE Proxy方式的精简
如果把客户端改成只混淆数据包(类似GFW.press),然后就把HTTP代理请求发送到GAE Proxy,解混淆数据包后,使用google平台的urlfetch.fetch函数,很方便就完成了代理请求,这么做应该代码很简单,客户端和服务端都是一个python文件就搞定了。