专栏 DeerFlow 架构分析 · 第 1 篇

写作

DeerFlow 架构概览 —— 运行时存储层次与请求处理流程

DeerFlow 架构概览 —— 运行时存储层次与请求处理流程

项目分层

graph TD
    subgraph deer-flow
        direction TB
        subgraph backend
            direction TB
            H["Harness 层 (deerflow.*)"]
            A1["agents<br/>Agent 系统"]
            A2["runtime<br/>RunManager, Checkpointer, Store"]
            A3["config<br/>配置系统"]
            A4["persistence<br/>ORM models, repositories"]

            A["App 层 (app.*)"]
            G["gateway<br/>FastAPI Gateway"]
            C2["channels<br/>IM 平台集成"]
        end
        F["frontend<br/>Next.js 前端"]
    end

    A -.->|"可以导入"| H
    H -.-x|"绝不能导入"| A

依赖规则:App 可以导入 Harness,但 Harness 绝不能导入 App。


请求处理流程

graph LR
    B[浏览器] --> N[nginx]
    N -->|"路由 API 请求"| GW["Gateway (FastAPI)"]
    N -->|"提供静态文件"| FE["Frontend (Next.js)"]

单次对话请求的完整路径

sequenceDiagram
    participant U as 用户
    participant N as nginx
    participant G as Gateway
    participant RM as RunManager
    participant A as Agent
    participant CP as Checkpointer
    participant ES as EventStore
    participant FE as 前端

    U->>N: POST /api/threads/{tid}/runs/stream
    N->>G: 转发请求
    G->>G: get_run_context(request) 构建依赖
    G->>RM: create_or_reject() 检查冲突 + 创建Run
    RM-->>G: RunRecord
    G->>A: asyncio.create_task(run_agent(ctx, record))
    activate A
    A->>CP: 加载历史 ThreadState
    CP-->>A: 历史状态
    A->>A: agent.astream(input, config) 执行图
    A->>CP: 每步节点完成后自动保存 checkpoint
    A->>FE: StreamBridge → SSE 流式事件
    A->>ES: RunJournal 写入 event_store
    A->>RM: set_status(success/failed) 持久化
    deactivate A

五种存储各司其职

DeerFlow 的运行时数据分散在五个独立的存储中:

#存储存什么隔离单位写入者
1CheckpointerThreadState 版本快照thread_idLangGraph 自动
2Store跨线程 KV 键值对namespace 元组业务代码手动
3RunStoreRun 元数据run_id + thread_idRunManager
4RunEventStore事件流(AI chunk、工具调用等)thread_id + run_id + seqRunJournal
5ThreadMetaStore线程元数据(标题、状态)thread_id + user_idGateway 路由 + worker

存储之间的关系图

graph TD
    FE["前端 (Next.js)"] -->|HTTP 请求| GW["Gateway (FastAPI)"]

    subgraph Gateway
        direction TB
        TMS["ThreadMetaStore<br/>(SQL 或 Store)<br/>线程列表、标题、状态"]
        RM["RunManager + RunStore<br/>Run 状态、token 用量"]
        RES["RunEventStore<br/>(SQL/JSONL/Memory)<br/>事件回放、断线重连"]
    end

    GW -->|"run_agent(ctx, ...)"| LG["LangGraph Agent 执行"]

    subgraph Agent["LangGraph Agent 执行"]
        CP["Checkpointer<br/>(SQLite/Postgres)<br/>对话状态快照<br/>自动保存/恢复"]
        ST["Store<br/>(ThreadMeta 底层)<br/>跨线程共享数据"]
    end

为什么不合并成一个数据库?

  • Checkpointer 由 LangGraph 驱动,表结构由 LangGraph 定义
  • RunStore / RunEventStore / ThreadMetaStore 由 DeerFlow 定义
  • 职责分离:对话状态(可回滚)vs 运行记录(只追加)vs 列表元数据(频繁查询)

依赖注入链路

启动时:lifespan → langgraph_runtime → app.state

sequenceDiagram
    participant P as 进程启动
    participant F as FastAPI lifespan
    participant LR as langgraph_runtime()
    participant AS as app.state
    participant ES as AsyncExitStack

    P->>F: 进程启动
    F->>F: get_app_config()
    F->>LR: async with langgraph_runtime(app, config)

    activate LR
    LR->>ES: async with AsyncExitStack() as stack
    activate ES

    LR->>AS: app.state.checkpointer = make_checkpointer(config)
    LR->>AS: app.state.store = make_store(config)
    LR->>AS: app.state.run_manager = RunManager(store=...)
    LR->>AS: app.state.run_event_store = make_run_event_store(...)

    LR-->>F: yield (进程持续运行)
    Note over F: 在此处理请求...

    F-->>LR: 进程关闭
    ES-->>ES: 后进先出关闭所有资源
    deactivate ES
    deactivate LR

请求时:request → app.state → RunContext

sequenceDiagram
    participant U as 用户请求
    participant R as FastAPI Request
    participant AS as app.state
    participant RC as RunContext
    participant A as run_agent

    U->>R: HTTP 请求到达
    Note over R: request.app 自动持有<br/>FastAPI 实例引用

    R->>AS: get_checkpointer(request)
    AS-->>R: checkpointer 实例

    R->>AS: get_store(request)
    AS-->>R: store 实例

    R->>RC: RunContext(checkpointer=..., store=...)
    RC-->>A: ctx传递给run_agent
    activate A
    A->>A: 使用 checkpointer/store 执行 Agent
    deactivate A

配置热重载边界

类别字段行为
每次请求热重载models、tools、summarization、memory、subagents通过 get_app_config() 检查 mtime 自动重载
启动时冻结checkpointer、database、run_events、sandbox.use持有连接/句柄,重启才生效

get_run_context() 故意让 app_config 走热重载,event_store + run_events_config 走冻结快照——避免”新配置 + 旧实例”的不一致。