如何在异步环境中安全地实现线程与协程并发下的 HTTP 缓存类

本文详解如何基于 `aiohttp` 和 `asyncio` 构建线程安全、协程安全的单例 http 缓存类,重点解决并发请求同一 url 时的重复拉取问题,并优化时间精度与资源竞争控制。

在使用 aiohttp 构建异步 HTTP 缓存服务(如配合 Tornado 或 FastAPI)时,一个常见误区是认为“只要用了 async/await 就天然线程安全”。实际上,Python 的 GIL 仅保障纯 Python 字节码层面的线程互斥,但无法阻止 asyncio 任务在单线程内并发修改共享状态(如 dict)所引发的逻辑竞态——尤其是当多个协程同时检测到缓存过期并触发 _fetch_update(url) 时,可能造成多次重复请求,浪费资源且增加服务压力。

✅ 核心问题:不是“数据损坏”,而是“逻辑冗余”

原代码中 self._cache[url] = {...} 虽为原子操作(CPython 中 dict 赋值是线程安全的),但其前置判断 url not in self._cache or ... 与后续写入之间存在时间窗口。若两个协程几乎同时执行该判断,均得出“需更新”结论,则会并发执行两次 session.get(),导致:

  • 同一 URL 被重复请求;
  • 后完成的响应覆盖先完成的(无一致性保证);
  • 错误日志混乱,难以调试。

这不是内存损坏,却是典型的 “check-then-act” 竞态(TOCTOU),必须通过同步机制消除。

✅ 正确解法:按 URL 细粒度协同锁(Per-URL Coordinated Fetching)

我们不采用全局 asyncio.Lock()(会串行化所有 URL 请求,严重损害并发性能),而是为每个待请求的 URL 动态维护一个 asyncio.Event,实现按 URL 粒度的协同等待

import asyncio
import logging
import aiohttp
import time

DEFAULT_TIMEOUT = 20
HTTP_READ_TIMEOUT = 1

class HTTPRequestCache:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._cache = {}
            cls._instance._time_out = DEFAULT_TIMEOUT
            cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
            cls._instance._fetching_now = {}  # {url: asyncio.Event()}
            cls._instance._lock = asyncio.Lock()  # 仅用于保护 _fetching_now 字典本身
        return cls._instance

    async def _fetch_update(self, url):
        # Step 1: 获取对 _fetching_now 的独占访问权,检查/注册当前 URL 的 fetch 状态
        async with self._lock:
            if url in self._fetching_now:
                # 另一协程已在处理该 URL → 等待其完成
                event = self._fetching_now[url]
                await event.wait()
                # 检查是否已成功缓存(避免重复等待后仍去请求)
                if url in self._cache and self._cache[url]["cached_at"] >= time.monotonic() - self._time_out:
                    return
            else:
                # 首次标记该 URL 正在被获取
                self._fetching_now[url] = asyncio.Event()

        # Step 2: 执行实际 HTTP 请求(此时无锁,允许多 URL 并发)
        try:
            async with aiohttp.ClientSession() as session:
                logging.info(f"Fetching {url}")
                async with session.get(url, timeout=self._http_read_timeout) as resp:
                    resp.raise_for_status()
                    data = await resp.json()
                    cached_at = time.monotonic()  # ✅ 使用 monotonic 时间,避免系统时钟跳变影响
                    self._cache[url] = {
                        "cached_at": cached_at,
                        "config": data,
                        "errors": 0
                    }
                    logging.info(f"Updated cache for {url}")
        except aiohttp.ClientError as e:
            logging.error(f"Failed to fetch {url}: {e}")
        finally:
            # Step 3: 清理状态,通知所有等待者
            async with self._lock:
                if url in self._fetching_now:
                    self._fetching_now[url].set()
                    del self._fetching_now[url]

    async def get(self, url):
        # 使用 monotonic 时间进行过期判断,更可靠
        now = time.monotonic()
        if (url not in self._cache 
            or self._cache[url]["cached_at"] < now - self._time_out):
            await self._fetch_update(url)
        return self._cache.get(url, {}).get("config")

? 关键设计说明

  • _lock 仅保护 _fetching_now 字典读写:因 dict 操作本身非原子(如 in + [] 组合),需锁保护其结构一致性。
  • asyncio.Event 实现“等待即订阅”:协程发现 URL 正在被获取时,直接 await event.wait(),无需轮询;完成时 event.set() 唤醒全部等待者。
  • time.monotonic() 替代 time.time():确保超时计算不受系统时间回拨(如 NTP 校准、DST 切换)干扰,符合高可靠性场景要求。
  • 无全局阻塞:不同 URL 的请求完全并发,仅相同 URL 的请求被智能协调,兼顾性能与正确性。

⚠️ 注意事项与扩展建议

  • Tornado 兼容性:Tornado 6+ 原生支持 async/await,可直接将 HTTPRequestCache 实例挂载为应用级单例(如 app.settings['cache']),无需额外线程适配。
  • 错误重试策略:当前示例未集成指数退避或错误计数(如 MAX_ERRORS)。如需增强鲁棒性,可在 except 块中增加 self._cache[url]["errors"] += 1,并在 get() 中根据错误次数决定是否跳过缓存或降级返回。
  • 内存清理:长期运行需添加 LRU 或 TTL 驱逐逻辑(如定期扫描 cached_at 过期项),防止内存泄漏。
  • 多进程场景:若部署于多进程(如 Gunicorn + --workers),单例失效,需改用 Redis 等外部缓存。

此方案在保持异步高性能的同时,彻底消除了缓存更新的逻辑竞态,是构建生产级异步 HTTP 客户端缓存的推荐实践。