0%

Tornado 异步处理优化,从 3 秒到 200 毫秒



一、前言

接手了一个用 Tornado 做 API 的老模块,功能正常,但响应时间不太好看——平均 3 秒,高峰期飙到 8 秒。服务本身逻辑不复杂,就是一个接口查几个外部数据源拼装结果返回。

Tornado 本身是异步框架,但"异步"两个字不是装上去就能跑满的。代码里一个 time.sleep()、一个同步 HTTP 请求、一个没有 await 的数据库调用,都能把 I/O 循环卡死。

本文从头过一遍优化过程,每一步都有代码和现象对比,不改架构,只改写法。

二、先看现状

2.1 压测命令

1
2
# 用 wrk 简单压一下
wrk -t4 -c20 -d30s http://localhost:8888/api/v1/order/detail?id=1001

结果:

1
2
3
Requests/sec:     3.2
Latency (avg): 3125.46ms
Latency (max): 8237.12ms

3 秒的平均延迟,每秒只能处理 3 个请求,20 个并发连接就把服务打满了。

2.2 看一眼代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class OrderDetailHandler(tornado.web.RequestHandler):
def get(self):
order_id = self.get_argument("id")

# 查 MySQL
db = MySQLdb.connect(host="...", user="...", passwd="...", db="...")
cursor = db.cursor()
cursor.execute("SELECT * FROM orders WHERE id = %s", (order_id,))
order = cursor.fetchone()
cursor.close()
db.close()

# 调外部 HTTP API 查物流信息
resp = requests.get(f"http://logistics-api/order/{order_id}/track")
logistics = resp.json()

# 拼装结果返回
self.write({"order": order, "logistics": logistics})

一眼望去全是问题:

  • 同步 requests.get() 阻塞 I/O 循环
  • 原生的 MySQLdb 同步操作数据库
  • 没有用 async def,完全没利用 Tornado 的异步能力

一个请求进来,整个事件循环得等它全部跑完才能处理下一个。20 个并发就是 20 个请求排队,一个一个走。

三、第一轮:改异步

3.1 把 Handler 改成 async

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import aiohttp
import aiomysql
from tornado.web import RequestHandler

class OrderDetailHandler(RequestHandler):
async def get(self):
order_id = self.get_argument("id")

# 用 asyncio.gather 并行查两个依赖
order, logistics = await asyncio.gather(
self._get_order(order_id),
self._get_logistics(order_id),
)

self.write({"order": order, "logistics": logistics})

async def _get_order(self, order_id):
pool = await aiomysql.create_pool(host="...", user="...", ...)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM orders WHERE id = %s", (order_id,))
return await cur.fetchone()
pool.close()
await pool.wait_closed()

async def _get_logistics(self, order_id):
async with aiohttp.ClientSession() as session:
async with session.get(f"http://logistics-api/order/{order_id}/track") as resp:
return await resp.json()

关键改动:

改动 做了什么 作用
def getasync def get 用 async/await 替代同步 请求不再阻塞 I/O 循环
requests.getaiohttp 同步 HTTP → 异步 HTTP 网络等待时不阻塞其他请求
MySQLdbaiomysql 同步 MySQL → 异步 MySQL 数据库等待时不阻塞循环
asyncio.gather 并行发起两个无关请求 串行变并行,总耗时 = 最慢那个

3.2 再压测

1
2
3
Requests/sec:     18.5
Latency (avg): 1082.34ms
Latency (max): 2513.81ms

从 3.2 QPS 涨到 18.5,提升约 6 倍。平均延迟从 3 秒降到 1 秒。

但还不够——理论上异步框架处理这种 I/O 密集场景应该在数百毫秒级别。

四、第二轮:找慢在哪

4.1 加耗时日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time

class OrderDetailHandler(RequestHandler):
async def get(self):
order_id = self.get_argument("id")
t0 = time.time()

order, logistics = await asyncio.gather(
self._get_order(order_id),
self._get_logistics(order_id),
)
t1 = time.time()
print(f"db: {order_took:.2f}s, api: {logistics_took:.2f}s")

self.write({"order": order, "logistics": logistics})

加上后看到日志:

1
2
3
db: 0.45s, api: 1.02s
db: 0.52s, api: 0.95s
db: 0.38s, api: 1.12s

外部物流 API 平均耗时 1 秒,占了整个链路的大头。这是上游接口的瓶颈,我们改不了它。

4.2 加缓存

对这种查了也不怎么变的数据,直接上本地缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from functools import lru_cache
import json

@lru_cache(maxsize=128)
def get_logistics_cache(order_id: int) -> dict:
# 这里只是缓存键的定义,实际数据在第一次请求时写入
pass

class OrderDetailHandler(RequestHandler):
async def get(self):
order_id = self.get_argument("id")

# 缓存命中直接返回,不调外部 API
cache_key = f"logistics_{order_id}"
cached = self.application.cache.get(cache_key)
if cached:
logistics = cached
else:
logistics = await self._get_logistics(order_id)
self.application.cache.set(cache_key, logistics, expire=60)

order = await self._get_order(order_id)
self.write({"order": order, "logistics": logistics})

4.3 同步改异步还不够——连接池复用

之前的 _get_order 每次请求都 create_pool,这是极其昂贵的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 改为应用启动时创建连接池,handler 直接拿
class Application(tornado.web.Application):
def __init__(self):
self.db_pool = None
# ...

async def startup(self):
self.db_pool = await aiomysql.create_pool(
host="...", port=3306,
user="...", password="...",
db="...", maxsize=10,
)

# handler 里
class OrderDetailHandler(RequestHandler):
async def _get_order(self, order_id):
pool = self.application.db_pool # 直接拿,不新建
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM orders WHERE id = %s", (order_id,))
return await cur.fetchone()

五、第三轮:还有几个隐藏坑

5.1 time.sleep

排查发现一些工具函数里用了 time.sleep(0.1) 做重试等待:

1
2
3
4
5
6
7
# 错误示例——会阻塞整个事件循环
def retry_fetch(url, retries=3):
for i in range(retries):
try:
return requests.get(url)
except:
time.sleep(0.5) # ← 这一行阻塞了所有协程

改为:

1
2
3
4
5
6
7
8
async def retry_fetch(url, retries=3):
for i in range(retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
except:
await asyncio.sleep(0.5) # ← 异步 sleep,不阻塞

5.2 文件操作

Tornado 的 I/O 循环管不了文件读写。如果 Handler 里需要读大文件:

1
2
3
4
5
6
7
8
9
10
11
# 同步读——阻塞
with open("template.html") as f:
content = f.read()

# 改到线程池执行
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)

async def async_read_file(path):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(executor, lambda: open(path).read())

5.3 日志写入

日志库如果配置了同步 FileHandler,高并发下 write 锁也会拖慢:

1
2
3
4
5
6
7
8
9
10
# 建议用异步日志,或者单独起一个日志线程
# 简单方案:消息先放队列,日志线程异步刷盘
import queue
log_queue = queue.Queue()

def log_worker():
while True:
msg = log_queue.get()
with open("app.log", "a") as f:
f.write(msg + "\n")

六、最终压测

做完以上优化后:

1
2
3
Requests/sec:     152.4
Latency (avg): 131.24ms
Latency (max): 412.58ms
阶段 QPS 平均延迟 最大延迟
优化前 3.2 3125ms 8237ms
第一轮(异步化) 18.5 1082ms 2513ms
第二轮(连接池+缓存) 87.3 228ms 834ms
第三轮(消除隐藏阻塞) 152.4 131ms 412ms

从 3 QPS 到 150 QPS,50 倍的提升,没有改一行业务逻辑。

七、总结

问题 现象 修复
同步 HTTP 请求 请求阻塞 I/O 循环 替换为 aiohttp
同步数据库操作 数据库调用阻塞事件循环 替换为 aiomysql / asyncpg
每次请求新建连接池 频繁创建销毁连接 应用启动时创建,Handler 复用
外部 API 慢 串行等待上游响应 asyncio.gather 并行 + 本地缓存
time.sleep 不管同步/异步都阻塞 替换为 asyncio.sleep
文件读写 Handler 内同步读大文件 run_in_executor 抛到线程池
日志刷盘 高并发下 write 锁阻塞 异步队列写日志

Tornado 异步优化的核心原则就一条:别让 I/O 循环等你

任何让 Python 干等外部资源的地方——网络、磁盘、数据库、sleep——都要用异步版本或者扔到线程池。只要有一个同步调用卡在关键路径上,前面的 async 就白写了。

跑一下 wrk,看看你的服务在并发下是不是真的"异步"了。