一、前言
接手了一个用 Tornado 做 API 的老模块,功能正常,但响应时间不太好看——平均 3 秒,高峰期飙到 8 秒。服务本身逻辑不复杂,就是一个接口查几个外部数据源拼装结果返回。
Tornado 本身是异步框架,但"异步"两个字不是装上去就能跑满的。代码里一个 time.sleep()、一个同步 HTTP 请求、一个没有 await 的数据库调用,都能把 I/O 循环卡死。
本文从头过一遍优化过程,每一步都有代码和现象对比,不改架构,只改写法。
二、先看现状
2.1 压测命令
1 2
| # 用 wrk 简单压一下 wrk -t4 -c20 -d30s http://localhost:8888/api/v1/order/detail?id=1001
|
结果:
1 2 3
| Requests/sec: 3.2 Latency (avg): 3125.46ms Latency (max): 8237.12ms
|
3 秒的平均延迟,每秒只能处理 3 个请求,20 个并发连接就把服务打满了。
2.2 看一眼代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class OrderDetailHandler(tornado.web.RequestHandler): def get(self): order_id = self.get_argument("id") db = MySQLdb.connect(host="...", user="...", passwd="...", db="...") cursor = db.cursor() cursor.execute("SELECT * FROM orders WHERE id = %s", (order_id,)) order = cursor.fetchone() cursor.close() db.close() resp = requests.get(f"http://logistics-api/order/{order_id}/track") logistics = resp.json() self.write({"order": order, "logistics": logistics})
|
一眼望去全是问题:
- 同步
requests.get() 阻塞 I/O 循环
- 原生的
MySQLdb 同步操作数据库
- 没有用
async def,完全没利用 Tornado 的异步能力
一个请求进来,整个事件循环得等它全部跑完才能处理下一个。20 个并发就是 20 个请求排队,一个一个走。
三、第一轮:改异步
3.1 把 Handler 改成 async
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import aiohttp import aiomysql from tornado.web import RequestHandler
class OrderDetailHandler(RequestHandler): async def get(self): order_id = self.get_argument("id") order, logistics = await asyncio.gather( self._get_order(order_id), self._get_logistics(order_id), ) self.write({"order": order, "logistics": logistics}) async def _get_order(self, order_id): pool = await aiomysql.create_pool(host="...", user="...", ...) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT * FROM orders WHERE id = %s", (order_id,)) return await cur.fetchone() pool.close() await pool.wait_closed() async def _get_logistics(self, order_id): async with aiohttp.ClientSession() as session: async with session.get(f"http://logistics-api/order/{order_id}/track") as resp: return await resp.json()
|
关键改动:
| 改动 |
做了什么 |
作用 |
def get → async def get |
用 async/await 替代同步 |
请求不再阻塞 I/O 循环 |
requests.get → aiohttp |
同步 HTTP → 异步 HTTP |
网络等待时不阻塞其他请求 |
MySQLdb → aiomysql |
同步 MySQL → 异步 MySQL |
数据库等待时不阻塞循环 |
asyncio.gather |
并行发起两个无关请求 |
串行变并行,总耗时 = 最慢那个 |
3.2 再压测
1 2 3
| Requests/sec: 18.5 Latency (avg): 1082.34ms Latency (max): 2513.81ms
|
从 3.2 QPS 涨到 18.5,提升约 6 倍。平均延迟从 3 秒降到 1 秒。
但还不够——理论上异步框架处理这种 I/O 密集场景应该在数百毫秒级别。
四、第二轮:找慢在哪
4.1 加耗时日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import time
class OrderDetailHandler(RequestHandler): async def get(self): order_id = self.get_argument("id") t0 = time.time() order, logistics = await asyncio.gather( self._get_order(order_id), self._get_logistics(order_id), ) t1 = time.time() print(f"db: {order_took:.2f}s, api: {logistics_took:.2f}s") self.write({"order": order, "logistics": logistics})
|
加上后看到日志:
1 2 3
| db: 0.45s, api: 1.02s db: 0.52s, api: 0.95s db: 0.38s, api: 1.12s
|
外部物流 API 平均耗时 1 秒,占了整个链路的大头。这是上游接口的瓶颈,我们改不了它。
4.2 加缓存
对这种查了也不怎么变的数据,直接上本地缓存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| from functools import lru_cache import json
@lru_cache(maxsize=128) def get_logistics_cache(order_id: int) -> dict: pass
class OrderDetailHandler(RequestHandler): async def get(self): order_id = self.get_argument("id") cache_key = f"logistics_{order_id}" cached = self.application.cache.get(cache_key) if cached: logistics = cached else: logistics = await self._get_logistics(order_id) self.application.cache.set(cache_key, logistics, expire=60) order = await self._get_order(order_id) self.write({"order": order, "logistics": logistics})
|
4.3 同步改异步还不够——连接池复用
之前的 _get_order 每次请求都 create_pool,这是极其昂贵的操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class Application(tornado.web.Application): def __init__(self): self.db_pool = None async def startup(self): self.db_pool = await aiomysql.create_pool( host="...", port=3306, user="...", password="...", db="...", maxsize=10, )
class OrderDetailHandler(RequestHandler): async def _get_order(self, order_id): pool = self.application.db_pool async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT * FROM orders WHERE id = %s", (order_id,)) return await cur.fetchone()
|
五、第三轮:还有几个隐藏坑
5.1 time.sleep
排查发现一些工具函数里用了 time.sleep(0.1) 做重试等待:
1 2 3 4 5 6 7
| def retry_fetch(url, retries=3): for i in range(retries): try: return requests.get(url) except: time.sleep(0.5)
|
改为:
1 2 3 4 5 6 7 8
| async def retry_fetch(url, retries=3): for i in range(retries): try: async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.json() except: await asyncio.sleep(0.5)
|
5.2 文件操作
Tornado 的 I/O 循环管不了文件读写。如果 Handler 里需要读大文件:
1 2 3 4 5 6 7 8 9 10 11
| with open("template.html") as f: content = f.read()
from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4)
async def async_read_file(path): loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, lambda: open(path).read())
|
5.3 日志写入
日志库如果配置了同步 FileHandler,高并发下 write 锁也会拖慢:
1 2 3 4 5 6 7 8 9 10
|
import queue log_queue = queue.Queue()
def log_worker(): while True: msg = log_queue.get() with open("app.log", "a") as f: f.write(msg + "\n")
|
六、最终压测
做完以上优化后:
1 2 3
| Requests/sec: 152.4 Latency (avg): 131.24ms Latency (max): 412.58ms
|
| 阶段 |
QPS |
平均延迟 |
最大延迟 |
| 优化前 |
3.2 |
3125ms |
8237ms |
| 第一轮(异步化) |
18.5 |
1082ms |
2513ms |
| 第二轮(连接池+缓存) |
87.3 |
228ms |
834ms |
| 第三轮(消除隐藏阻塞) |
152.4 |
131ms |
412ms |
从 3 QPS 到 150 QPS,50 倍的提升,没有改一行业务逻辑。
七、总结
| 问题 |
现象 |
修复 |
| 同步 HTTP 请求 |
请求阻塞 I/O 循环 |
替换为 aiohttp |
| 同步数据库操作 |
数据库调用阻塞事件循环 |
替换为 aiomysql / asyncpg |
| 每次请求新建连接池 |
频繁创建销毁连接 |
应用启动时创建,Handler 复用 |
| 外部 API 慢 |
串行等待上游响应 |
asyncio.gather 并行 + 本地缓存 |
time.sleep |
不管同步/异步都阻塞 |
替换为 asyncio.sleep |
| 文件读写 |
Handler 内同步读大文件 |
run_in_executor 抛到线程池 |
| 日志刷盘 |
高并发下 write 锁阻塞 |
异步队列写日志 |
Tornado 异步优化的核心原则就一条:别让 I/O 循环等你。
任何让 Python 干等外部资源的地方——网络、磁盘、数据库、sleep——都要用异步版本或者扔到线程池。只要有一个同步调用卡在关键路径上,前面的 async 就白写了。
跑一下 wrk,看看你的服务在并发下是不是真的"异步"了。