项目分层
graph TD
subgraph deer-flow
direction TB
subgraph backend
direction TB
H["Harness 层 (deerflow.*)"]
A1["agents<br/>Agent 系统"]
A2["runtime<br/>RunManager, Checkpointer, Store"]
A3["config<br/>配置系统"]
A4["persistence<br/>ORM models, repositories"]
A["App 层 (app.*)"]
G["gateway<br/>FastAPI Gateway"]
C2["channels<br/>IM 平台集成"]
end
F["frontend<br/>Next.js 前端"]
end
A -.->|"可以导入"| H
H -.-x|"绝不能导入"| A
依赖规则:App 可以导入 Harness,但 Harness 绝不能导入 App。
请求处理流程
graph LR
B[浏览器] --> N[nginx]
N -->|"路由 API 请求"| GW["Gateway (FastAPI)"]
N -->|"提供静态文件"| FE["Frontend (Next.js)"]
单次对话请求的完整路径
sequenceDiagram
participant U as 用户
participant N as nginx
participant G as Gateway
participant RM as RunManager
participant A as Agent
participant CP as Checkpointer
participant ES as EventStore
participant FE as 前端
U->>N: POST /api/threads/{tid}/runs/stream
N->>G: 转发请求
G->>G: get_run_context(request) 构建依赖
G->>RM: create_or_reject() 检查冲突 + 创建Run
RM-->>G: RunRecord
G->>A: asyncio.create_task(run_agent(ctx, record))
activate A
A->>CP: 加载历史 ThreadState
CP-->>A: 历史状态
A->>A: agent.astream(input, config) 执行图
A->>CP: 每步节点完成后自动保存 checkpoint
A->>FE: StreamBridge → SSE 流式事件
A->>ES: RunJournal 写入 event_store
A->>RM: set_status(success/failed) 持久化
deactivate A
五种存储各司其职
DeerFlow 的运行时数据分散在五个独立的存储中:
| # | 存储 | 存什么 | 隔离单位 | 写入者 |
|---|---|---|---|---|
| 1 | Checkpointer | ThreadState 版本快照 | thread_id | LangGraph 自动 |
| 2 | Store | 跨线程 KV 键值对 | namespace 元组 | 业务代码手动 |
| 3 | RunStore | Run 元数据 | run_id + thread_id | RunManager |
| 4 | RunEventStore | 事件流(AI chunk、工具调用等) | thread_id + run_id + seq | RunJournal |
| 5 | ThreadMetaStore | 线程元数据(标题、状态) | thread_id + user_id | Gateway 路由 + worker |
存储之间的关系图
graph TD
FE["前端 (Next.js)"] -->|HTTP 请求| GW["Gateway (FastAPI)"]
subgraph Gateway
direction TB
TMS["ThreadMetaStore<br/>(SQL 或 Store)<br/>线程列表、标题、状态"]
RM["RunManager + RunStore<br/>Run 状态、token 用量"]
RES["RunEventStore<br/>(SQL/JSONL/Memory)<br/>事件回放、断线重连"]
end
GW -->|"run_agent(ctx, ...)"| LG["LangGraph Agent 执行"]
subgraph Agent["LangGraph Agent 执行"]
CP["Checkpointer<br/>(SQLite/Postgres)<br/>对话状态快照<br/>自动保存/恢复"]
ST["Store<br/>(ThreadMeta 底层)<br/>跨线程共享数据"]
end
为什么不合并成一个数据库?
- Checkpointer 由 LangGraph 驱动,表结构由 LangGraph 定义
- RunStore / RunEventStore / ThreadMetaStore 由 DeerFlow 定义
- 职责分离:对话状态(可回滚)vs 运行记录(只追加)vs 列表元数据(频繁查询)
依赖注入链路
启动时:lifespan → langgraph_runtime → app.state
sequenceDiagram
participant P as 进程启动
participant F as FastAPI lifespan
participant LR as langgraph_runtime()
participant AS as app.state
participant ES as AsyncExitStack
P->>F: 进程启动
F->>F: get_app_config()
F->>LR: async with langgraph_runtime(app, config)
activate LR
LR->>ES: async with AsyncExitStack() as stack
activate ES
LR->>AS: app.state.checkpointer = make_checkpointer(config)
LR->>AS: app.state.store = make_store(config)
LR->>AS: app.state.run_manager = RunManager(store=...)
LR->>AS: app.state.run_event_store = make_run_event_store(...)
LR-->>F: yield (进程持续运行)
Note over F: 在此处理请求...
F-->>LR: 进程关闭
ES-->>ES: 后进先出关闭所有资源
deactivate ES
deactivate LR
请求时:request → app.state → RunContext
sequenceDiagram
participant U as 用户请求
participant R as FastAPI Request
participant AS as app.state
participant RC as RunContext
participant A as run_agent
U->>R: HTTP 请求到达
Note over R: request.app 自动持有<br/>FastAPI 实例引用
R->>AS: get_checkpointer(request)
AS-->>R: checkpointer 实例
R->>AS: get_store(request)
AS-->>R: store 实例
R->>RC: RunContext(checkpointer=..., store=...)
RC-->>A: ctx传递给run_agent
activate A
A->>A: 使用 checkpointer/store 执行 Agent
deactivate A
配置热重载边界
| 类别 | 字段 | 行为 |
|---|---|---|
| 每次请求热重载 | models、tools、summarization、memory、subagents | 通过 get_app_config() 检查 mtime 自动重载 |
| 启动时冻结 | checkpointer、database、run_events、sandbox.use | 持有连接/句柄,重启才生效 |
get_run_context() 故意让 app_config 走热重载,event_store + run_events_config 走冻结快照——避免”新配置 + 旧实例”的不一致。