
作者: HOS(安全风信子) 日期: 2026-05-25 主要来源平台: GitHub 摘要: 事件驱动架构是AI IDE工程系统的核心设计范式,通过Event Bus实现组件间的松耦合与高内聚。本文系统讲解事件驱动架构的完整设计体系:事件模型(元数据、载荷、类型层次)、发布-订阅机制(Topic设计、订阅者管理、死信处理)、事件过滤(内容路由、XPath匹配、Schema验证)、事务性事件(Outbox模式、Saga补偿、最终一致性)。重点阐述事件溯源(Event Sourcing)在AI IDE中的应用——如何通过不可变事件流重建系统状态、实现完整审计日志、支持时间旅行调试。文章还涵盖CQRS读写分离架构、投影构建与视图更新策略,最后通过TypeScript实现一个生产级的支持事务的Event Bus,并提供完整的源代码附录。
本节为你提供的核心价值:理解事件驱动架构如何重塑AI IDE系统的通信模式,实现真正的松耦合、高内聚和可观测性。
传统的请求-响应模式中,组件A直接调用组件B,调用方必须知道被调用方的接口细节。这种紧耦合模式在AI IDE系统中造成了严重的问题:当代码补全引擎需要知道如何通知前端时,它必须直接调用WebSocket服务;当语义索引需要触发重新索引时,它必须了解索引服务的具体接口。这种直接依赖导致系统僵化,任何组件的接口变更都可能波及大量上游消费者。
事件驱动架构彻底颠覆了这一模式。在该范式下,组件之间不再直接通信,而是通过一个共享的"事件中枢"——Event Bus——进行间接通信。当代码补全引擎完成一次补全时,它发布一个CompletionRequested事件,而不需要知道谁会消费这个事件;当语义索引服务需要触发重索引时,它发布一个IndexContentChanged事件,由感兴趣的服务自行订阅。这种模式带来了三大核心优势:
时间解耦:发布者和订阅者无需同时运行。发布者在事件产生时立即返回,订阅者在方便时处理事件。在AI IDE中,这意味着即使用户断开连接,代码分析任务仍然可以继续执行,结果将在用户重新连接时推送。
空间解耦:发布者和订阅者无需知道彼此的存在。发布者将事件投入Event Bus,订阅者从Event Bus获取事件,双方通过事件类型而非接口契约进行通信。这使得添加新消费者无需修改发布者代码。
同步解耦:发布者无需等待订阅者处理完成。事件处理可以是异步的,发布者可以立即处理下一个请求,极大提升系统吞吐量。
在AI IDE工程系统中,事件驱动架构承担着极其核心的角色。以下是几个典型的应用场景:
场景一:多智能体协作。当用户发起一个复杂的代码重构任务时,一个智能体可能负责分析依赖关系,另一个负责生成重构代码,第三个负责验证重构正确性。这些智能体之间通过事件进行协作:第一个智能体发布DependencyAnalysisCompleted事件,第二个智能体订阅该事件并开始代码生成,生成完成后发布RefactoringGenerated事件,第三个智能体订阅该事件进行验证。这种模式使得智能体可以独立开发、测试和部署。
场景二:实时状态同步。AI IDE中存在大量需要同步的状态:用户光标位置、代码编辑内容、AI补全建议、错误诊断信息等。这些状态需要在多个客户端(Web编辑器、终端、监控面板)之间保持一致。通过事件驱动架构,当任何状态发生变化时,发布相应的事件,所有订阅者自行更新本地状态,系统天然地实现了状态同步。
场景三:审计与调试。AI IDE需要对所有操作进行完整的审计,以便追溯用户行为、分析系统问题、支持合规要求。事件驱动架构天然支持这一需求:每个操作都产生一个事件,事件中包含完整的上下文信息,通过事件历史可以精确重建系统在任何时间点的状态,支持"时间旅行"调试。
本文系统讲解事件驱动架构的完整技术体系。文章结构如下:
本节为你提供的核心价值:掌握事件模型的完整设计,包括事件类型层次、标准化载荷结构、丰富的元数据体系,以及事件与命令的本质区别。
在讨论事件模型之前,必须首先澄清事件(Event)与命令(Command)的本质区别。混淆这两个概念是事件驱动架构设计中最常见的错误。
命令(Command) 表示一个意图或请求,它期望被执行的特定操作。命令是目标导向的,发送者明确知道接收者应该执行什么操作。例如,CreateProjectCommand、DeleteFileCommand、ExecuteQueryCommand都是命令。命令是指令性的,类似于"请做X"。
事件(Event) 表示一个已经发生的事实,它描述系统中发生了什么。事件是陈述性的,发布者只是报告"某事发生了",而不关心谁会处理。事件是不可变的,一旦发布就不能撤回或修改。例如,ProjectCreatedEvent、FileDeletedEvent、QueryExecutedEvent都是事件。
这一区别的实际意义在于:


关键结论:命令是一对一的有目标调用,事件是一对多的无目标广播。选择使用命令还是事件,取决于你是否需要知道谁会处理这个消息。
在大型AI IDE系统中,事件类型可能达到数百种。为了管理这种复杂性,需要建立一套事件类型层次体系。
2.2.1 事件命名规范
事件命名应遵循{动作过去式}+{聚合根名称}+Event的模式:
UserSignedInEvent - 用户登录事件ProjectOpenedEvent - 项目打开事件FileSavedEvent - 文件保存事件CodeCompletionRequestedEvent - 代码补全请求事件SemanticIndexUpdatedEvent - 语义索引更新事件这种命名规范的优势在于:事件名称本身就是完整的描述,订阅者可以通过名称推断事件含义。
2.2.2 事件类型层次
事件可以按照领域和层级进行分类:

这种类型层次设计支持事件家族订阅。例如,订阅EditorEvent类型的订阅者将自动接收所有编辑器相关事件,包括FileModifiedEvent、FileSavedEvent、FileClosedEvent等,无需为每个具体事件类型单独订阅。
事件载荷(Payload)是事件中包含的业务数据。良好的载荷设计需要平衡完整性、可读性和性能。
2.3.1 载荷设计原则
原则一:包含足够重建状态的信息。事件载荷应该包含重建事件发生时系统状态所需的所有信息,但不包含纯内部状态。例如,FileModifiedEvent应该包含文件的完整新内容(用于重建状态),而不仅仅是变更的diff(虽然diff可以作为优化存储)。
原则二:包含足够支持审计的信息。对于需要审计的场景,载荷应包含操作者身份、操作时间、操作原因等审计相关信息。
原则三:避免敏感数据泄露。事件可能跨越进程边界甚至网络传输,敏感数据(如用户密码、资金信息)不应出现在事件载荷中。
2.3.2 载荷结构示例
以下是AI IDE系统中几个典型事件的载荷设计:
// FileModifiedEvent 载荷
{
"eventType": "FileModifiedEvent",
"eventId": "evt_8a9f2b4c1d3e",
"occurredAt": "2026-05-25T14:32:18.456Z",
"aggregateId": "file:///project/src/main.ts",
"version": 42,
"payload": {
"fileUri": "file:///project/src/main.ts",
"oldContent": "export function hello() {\n console.log('Hello');\n}",
"newContent": "export function hello() {\n console.log('Hello, World!');\n}",
"changeType": "MODIFIED",
"encoding": "utf-8",
"language": "typescript",
"triggeredBy": {
"type": "user" | "ai" | "system",
"identifier": "user_12345" | "claude-3-opus"
},
"changeReason": "User typed 'Hello, World!' in editor"
}
}// CompletionGeneratedEvent 载荷
{
"eventType": "CompletionGeneratedEvent",
"eventId": "evt_2f4a6c8e1b3d",
"occurredAt": "2026-05-25T14:32:19.123Z",
"aggregateId": "session_789",
"version": 1,
"payload": {
"sessionId": "session_789",
"fileUri": "file:///project/src/main.ts",
"position": {
"line": 2,
"column": 18
},
"triggerType": "INVOKED" | "AUTO" | "HYBRID",
"modelId": "claude-3-opus-20240229",
"completions": [
{
"text": "console.log('Hello, World!');",
"insertRange": {
"start": { "line": 2, "column": 18 },
"end": { "line": 2, "column": 18 }
},
"score": 0.95,
"isInline": false
}
],
"latencyMs": 145,
"tokenCount": {
"input": 1240,
"output": 32
}
}
}事件元数据(Metadata)是事件的描述性信息,用于Event Bus的路由、过滤、监控和调试。与业务载荷不同,元数据是Event Bus基础设施使用的技术数据。
2.4.1 标准元数据字段
每个事件都应该包含以下标准元数据字段:
字段名 | 类型 | 说明 | 示例 |
|---|---|---|---|
eventId | UUID | 事件的全局唯一标识符 | 8a9f2b4c-1d3e-4a5b-8c6d-7e0f1a2b3c4d |
eventType | string | 事件的完全限定类型名 | com.aihz.editor.FileModifiedEvent |
occurredAt | ISO8601 | 事件发生的精确时间戳 | 2026-05-25T14:32:18.456Z |
aggregateId | string | 事件所属聚合根的标识 | file:///project/src/main.ts |
aggregateType | string | 聚合根的类型名 | File |
version | integer | 事件在聚合根中的版本号 | 42 |
correlationId | UUID | 用于关联相关事件的标识 | corr_abc123 |
causationId | UUID | 触发此事件的直接原因事件ID | evt_xyz789 |
schemaId | string | 事件载荷的Schema版本 | file-modified-v3 |
2.4.2 扩展元数据
除标准元数据外,事件还可以包含以下扩展元数据:
interface ExtendedEventMetadata {
// 追踪信息
traceId: string; // 分布式追踪ID
spanId: string; // 当前Span ID
parentSpanId: string; // 父Span ID
// 安全信息
principalId: string; // 操作用户ID
principalRoles: string[]; // 操作用户角色
tenantId: string; // 租户ID(多租户场景)
// 路由信息
topic: string; // 事件所属Topic
partitionKey: string; // 分区键(Kafka等场景)
// 处理信息
retryCount: number; // 当前重试次数
maxRetries: number; // 最大重试次数
handler: string; // 处理此事件的处理器
// 调试信息
source: string; // 事件来源服务
sourceVersion: string; // 事件来源服务版本
hostName: string; // 来源主机名
}2.4.3 元数据传播机制
在微服务架构中,事件会跨越进程边界。此时需要一种机制确保元数据被正确传播。典型的做法是:

事件一旦发布就是不可变的。这是事件驱动架构的核心原则之一。
为什么事件不可变?
如何实现不可变事件?
// TypeScript中实现不可变事件
class ImmutableEvent<T> {
private readonly _eventId: string;
private readonly _occurredAt: Date;
private readonly _payload: Readonly<T>;
private readonly _metadata: Readonly<EventMetadata>;
constructor(
eventId: string,
occurredAt: Date,
payload: T,
metadata: EventMetadata
) {
// 使用Object.freeze确保深层不可变
this._eventId = eventId;
this._occurredAt = new Date(occurredAt.getTime()); // 防御性拷贝
this._payload = Object.freeze({ ...payload });
this._metadata = Object.freeze({ ...metadata });
}
// 只读访问器
get eventId(): string { return this._eventId; }
get occurredAt(): Date { return new Date(this._occurredAt.getTime()); }
get payload(): Readonly<T> { return this._payload; }
get metadata(): Readonly<EventMetadata> { return this._metadata; }
// 禁止序列化后重建可变对象
toJSON(): object {
return {
eventId: this._eventId,
occurredAt: this._occurredAt.toISOString(),
payload: this._payload,
metadata: this._metadata
};
}
}关键结论:事件是系统历史的唯一真相来源。不可变性确保事件在任何时刻都反映事件发生时的真实状态,为审计、回放、调试提供了坚实基础。
本节为你提供的核心价值:掌握Topic设计的核心策略,理解订阅者生命周期管理,以及死信队列在故障处理中的关键作用。
Topic是事件的分类容器,订阅者通过订阅Topic来接收感兴趣的事件。良好的Topic设计是事件驱动架构成功的关键。
3.1.1 Topic命名规范
Topic命名应遵循层次化、结构化的命名规范,便于订阅者理解和过滤。推荐格式:
{组织域名}.{领域}.{子领域}.{事件类型}
示例:
aihos.editor.file.modified # 文件修改事件
aihos.editor.file.created # 文件创建事件
aihos.editor.file.deleted # 文件删除事件
aihos.ai.completion.requested # AI补全请求事件
aihos.ai.completion.generated # AI补全生成事件
aihos.project.opened # 项目打开事件
aihos.project.closed # 项目关闭事件3.1.2 Topic与事件类型的关系
一个Topic可以对应一种事件类型,也可以对应一组相关的事件类型。两种策略各有优劣:
策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
一Topic一事件类型 | 精确订阅、减少无效消息 | Topic数量爆炸、管理复杂 | 事件类型较少、需要精确控制的场景 |
一Topic多事件类型 | Topic数量可控、便于批量订阅 | 过滤负担加重 | 事件类型众多、需要按领域订阅的场景 |
在AI IDE系统中,推荐采用混合策略:高频事件(如编辑器事件)采用一Topic一类型,低频事件(如系统事件)采用一Topic多类型。
3.1.3 Topic层次设计

这种层次结构支持通配符订阅。订阅者可以订阅aihos.editor.file.*来接收所有文件相关事件,或者订阅aihos.#来接收所有事件。
订阅者是Event Bus的核心消费者,负责处理投递过来的事件。订阅者管理涉及订阅的创建、修改、销毁,以及订阅的生命周期监控。
3.2.1 订阅类型
独占订阅(Exclusive):同一Topic的消息在同一时刻只能被一个订阅者处理。适用于消息处理需要全局唯一性的场景,如全局计数器、分布式锁等。
共享订阅(Shared):同一Topic的消息可以分发到多个订阅者,实现负载均衡。适用于大多数业务场景,是AI IDE系统中的主要订阅类型。
键值订阅(Keyed):消息按照某个键(如用户ID、项目ID)进行分区,具有相同键的消息总是被同一个订阅者处理。适用于需要保证消息顺序且可以并行处理的场景。

3.2.2 订阅者生命周期
订阅者的生命周期管理是确保系统稳定性的关键:
interface Subscription {
id: string; // 订阅唯一标识
topic: string; // 订阅的Topic
handler: EventHandler; // 事件处理器
options: SubscriptionOptions; // 订阅配置
// 生命周期状态
status: 'active' | 'paused' | 'dead';
createdAt: Date;
lastProcessedAt: Date;
errorCount: number;
}
interface SubscriptionOptions {
// 消费模式
subscriptionType: 'exclusive' | 'shared' | 'keyed';
// 键值订阅时的分区键
partitionKey?: string;
// 重试配置
maxRetries: number; // 最大重试次数
retryDelayMs: number; // 重试间隔
backoffMultiplier: number; // 退避系数
// 死信配置
deadLetterTopic?: string; // 死信Topic
deadLetterPolicy?: 'discard' | 'requeue' | 'dlq';
// 过滤配置
filter?: EventFilter; // 事件过滤器
// 确认配置
ackTimeoutMs: number; // 确认超时
autoAck: boolean; // 是否自动确认
}3.2.3 订阅者注册与发现
在动态系统中,订阅者可能随时加入或离开。Event Bus需要提供订阅者注册与发现机制:
// 订阅者注册表
class SubscriptionRegistry {
private subscriptions: Map<string, Subscription[]> = new Map();
// 注册订阅
register(subscription: Subscription): void {
const existing = this.subscriptions.get(subscription.topic) || [];
existing.push(subscription);
this.subscriptions.set(subscription.topic, existing);
// 触发订阅事件
this.publishSubscriptionEvent('subscribed', subscription);
}
// 注销订阅
deregister(subscriptionId: string): void {
for (const [topic, subs] of this.subscriptions.entries()) {
const index = subs.findIndex(s => s.id === subscriptionId);
if (index !== -1) {
subs.splice(index, 1);
this.publishSubscriptionEvent('unsubscribed', subs[index]);
break;
}
}
}
// 查找匹配的订阅者
findSubscribers(topic: string): Subscription[] {
const exactMatch = this.subscriptions.get(topic) || [];
const wildcardMatches = this.findWildcardMatches(topic);
return [...exactMatch, ...wildcardMatches];
}
}死信队列(Dead Letter Queue, DLQ)用于处理无法正常消费的消息。当事件处理失败且超过重试次数后,事件将被发送到DLQ,而不是被丢弃。
3.3.1 死信产生的原因
事件进入DLQ的典型原因包括:
3.3.2 死信处理策略

3.3.3 死信队列实现
class DeadLetterQueue {
private queue: DeadLetteredEvent[] = [];
private maxSize: number;
constructor(maxSize: number = 10000) {
this.maxSize = maxSize;
}
// 添加死信事件
async add(event: Event, error: Error, context: DeadLetterContext): Promise<void> {
const deadLetteredEvent: DeadLetteredEvent = {
originalEvent: event,
error: {
message: error.message,
stack: error.stack,
type: error.constructor.name
},
context: {
failedAt: new Date(),
retryCount: context.retryCount,
lastHandler: context.handler,
originalTopic: context.topic
},
metadata: {
deadLetterId: this.generateId(),
deadLetteredAt: new Date(),
reason: this.classifyError(error)
}
};
this.queue.push(deadLetteredEvent);
// 触发告警
await this.alert(deadLetteredEvent);
// 超出容量时触发告警
if (this.queue.length > this.maxSize * 0.8) {
await this.alertSizeThreshold();
}
}
// 获取死信事件(支持分页)
async getAll(options: PaginationOptions): Promise<DeadLetteredEvent[]> {
const { offset = 0, limit = 100 } = options;
return this.queue.slice(offset, offset + limit);
}
// 重新处理死信
async reprocess(deadLetterId: string, handler: EventHandler): Promise<void> {
const dlqEvent = this.queue.find(e => e.metadata.deadLetterId === deadLetterId);
if (!dlqEvent) {
throw new Error(`Dead letter not found: ${deadLetterId}`);
}
try {
await handler.handle(dlqEvent.originalEvent);
await this.remove(deadLetterId);
} catch (error) {
throw new Error(`Reprocessing failed: ${error}`);
}
}
}Event Bus的核心职能是将事件高效、准确地分发给订阅者。分发模型的选择直接影响系统的性能、顺序保证和容错能力。
3.4.1 推模式(Push)vs 拉模式(Pull)
特性 | 推模式 | 拉模式 |
|---|---|---|
实时性 | 高,事件到达即推送 | 中等,依赖轮询间隔 |
消费者负载 | 可能过载 | 可控,自行控制速率 |
实现复杂度 | 高 | 低 |
网络效率 | 高 | 可能产生无效轮询 |
适用场景 | 高实时性需求 | 消费者处理能力不均 |
在AI IDE系统中,编辑器相关事件(如按键事件、光标移动)需要高实时性,采用推模式;后台任务相关事件(如索引完成通知)实时性要求较低,可以采用拉模式。
3.4.2 分发语义保证
事件分发的语义保证决定了系统的行为特征:
// 至少一次分发的实现
class AtLeastOnceDelivery {
private pendingAcks: Map<string, Timer> = new Map();
async deliver(event: Event, handler: EventHandler): Promise<void> {
const eventId = event.metadata.eventId;
try {
// 设置确认超时
const ackTimer = setTimeout(() => {
this.handleTimeout(eventId);
}, this.ackTimeoutMs);
this.pendingAcks.set(eventId, ackTimer);
// 执行业务处理
await handler.handle(event);
// 处理成功,确认
clearTimeout(ackTimer);
this.pendingAcks.delete(eventId);
await this.ack(eventId);
} catch (error) {
// 处理失败,触发重试
await this.scheduleRetry(event, handler);
}
}
}关键结论:在AI IDE系统中,事件分发的语义选择需要根据具体场景权衡。对于代码编辑事件,丢失是可以接受的(用户可以重新编辑),但重复会导致状态不一致;对于支付相关事件(如果有),必须保证Exactly Once。
本节为你提供的核心价值:掌握事件过滤的核心技术,包括内容过滤、XPath匹配、Schema验证,实现事件的精准路由。
在大型系统中,每天产生的事件可能达到数百万甚至数千万。如果所有订阅者都接收所有事件,不仅浪费网络带宽和处理资源,还会导致订阅者需要自行过滤无关事件,增加业务复杂度。事件过滤机制允许在Event Bus层面进行精准筛选,只将感兴趣的事件投递给订阅者。

基于内容的过滤根据事件的载荷内容进行筛选。订阅者在订阅时指定过滤条件,Event Bus在投递时检查条件是否满足。
4.2.1 过滤表达式语言
一种简洁的过滤表达式语言:
// 过滤表达式语法
interface FilterExpression {
// 简单比较
field: string; // 字段路径
operator: '=' | '!=' | '>' | '<' | '>=' | '<=' | 'contains' | 'startsWith' | 'endsWith';
value: any; // 比较值
// 组合逻辑
and?: FilterExpression[];
or?: FilterExpression[];
not?: FilterExpression;
}
// 示例
const filter: FilterExpression = {
and: [
{ field: 'payload.fileUri', operator: 'startsWith', value: '/project/src' },
{ field: 'payload.language', operator: '=', value: 'typescript' },
{
or: [
{ field: 'payload.changeType', operator: '=', value: 'CREATED' },
{ field: 'payload.changeType', operator: '=', value: 'MODIFIED' }
]
}
]
};4.2.2 过滤表达式求值器
class FilterEvaluator {
evaluate(event: Event, filter: FilterExpression): boolean {
if ('and' in filter) {
return filter.and!.every(f => this.evaluate(event, f));
}
if ('or' in filter) {
return filter.or!.some(f => this.evaluate(event, f));
}
if ('not' in filter) {
return !this.evaluate(event, filter.not!);
}
// 简单比较
return this.evaluateComparison(event, filter);
}
private evaluateComparison(event: Event, filter: FilterExpression): boolean {
const fieldValue = this.getFieldValue(event, filter.field);
const { operator, value } = filter;
switch (operator) {
case '=': return fieldValue === value;
case '!=': return fieldValue !== value;
case '>': return fieldValue > value;
case '<': return fieldValue < value;
case '>=': return fieldValue >= value;
case '<=': return fieldValue <= value;
case 'contains': return String(fieldValue).includes(value);
case 'startsWith': return String(fieldValue).startsWith(value);
case 'endsWith': return String(fieldValue).endsWith(value);
default: return false;
}
}
private getFieldValue(event: Event, path: string): any {
const parts = path.split('.');
let value: any = event;
for (const part of parts) {
if (value === null || value === undefined) {
return undefined;
}
value = value[part];
}
return value;
}
}4.2.3 过滤性能优化
当事件量很大时,每次投递都进行过滤表达式求值可能成为性能瓶颈。以下是几种优化策略:
索引过滤:对于高频过滤字段,建立倒排索引。例如,如果大量订阅者按payload.language过滤,可以预先按语言建立索引:
class IndexedFilterOptimizer {
private indexes: Map<string, Map<any, Subscription[]>> = new Map();
// 注册订阅时建立索引
registerSubscription(subscription: Subscription, filter: FilterExpression): void {
const indexedFields = this.extractIndexedFields(filter);
for (const field of indexedFields) {
if (!this.indexes.has(field)) {
this.indexes.set(field, new Map());
}
const value = this.getFilterValue(filter, field);
const index = this.indexes.get(field)!;
if (!index.has(value)) {
index.set(value, []);
}
index.get(value)!.push(subscription);
}
}
// 查找候选订阅者(使用索引加速)
findCandidateSubscriptions(event: Event, field: string): Subscription[] {
const index = this.indexes.get(field);
if (!index) {
return []; // 无索引,需要遍历
}
const fieldValue = this.getFieldValue(event, field);
return index.get(fieldValue) || [];
}
}Schema验证不仅用于过滤,还用于确保事件载荷的结构正确性。通过预先定义事件的Schema,可以拒绝不符合规范的非法事件。
4.3.1 事件Schema定义
// 使用JSON Schema定义事件Schema
const FileModifiedEventSchema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["eventType", "eventId", "occurredAt", "payload"],
"properties": {
"eventType": {
"type": "string",
"const": "FileModifiedEvent"
},
"eventId": {
"type": "string",
"pattern": "^evt_[a-zA-Z0-9]+$"
},
"occurredAt": {
"type": "string",
"format": "date-time"
},
"payload": {
"type": "object",
"required": ["fileUri", "changeType"],
"properties": {
"fileUri": {
"type": "string",
"pattern": "^file:///"
},
"changeType": {
"type": "string",
"enum": ["CREATED", "MODIFIED", "DELETED", "RENAMED"]
},
"content": {
"type": "string"
}
}
}
}
};4.3.2 Schema验证器
import Ajv from 'ajv';
class SchemaValidator {
private ajv: Ajv;
private validators: Map<string, ValidateFunction> = new Map();
constructor() {
this.ajv = new Ajv({ allErrors: true, coerceTypes: true });
}
// 注册Schema
registerSchema(schemaId: string, schema: object): void {
this.validators.set(schemaId, this.ajv.compile(schema));
}
// 验证事件
validate(event: Event): ValidationResult {
const validator = this.validators.get(event.eventType);
if (!validator) {
return { valid: true, errors: [] }; // 无Schema,无验证
}
const valid = validator(event);
if (!valid) {
return {
valid: false,
errors: validator.errors || []
};
}
return { valid: true, errors: [] };
}
}
// 使用示例
const validator = new SchemaValidator();
validator.registerSchema('FileModifiedEvent', FileModifiedEventSchema);
const result = validator.validate(event);
if (!result.valid) {
console.error('Event validation failed:', result.errors);
// 发送到DLQ
}对于嵌套结构的事件载荷,XPath(XML)或JSONPath(JSON)提供了强大的查询能力。
4.4.1 JSONPath路由
import * as jsonpath from 'jsonpath';
class JSONPathRouter {
// 根据JSONPath表达式路由事件
route(event: Event, routes: Map<string, EventHandler>): void {
const payload = event.payload;
for (const [pattern, handler] of routes.entries()) {
const results = jsonpath.query(payload, pattern);
if (results.length > 0) {
handler.handle(event);
}
}
}
// 示例路由配置
static createAIIdeRoutes(): Map<string, EventHandler> {
const routes = new Map<string, EventHandler>();
// 路由TypeScript文件修改到类型检查服务
routes.set('$.payload.fileUri[?(@.endsWith(".ts"))]', tsFileHandler);
// 路由AI相关事件到AI处理服务
routes.set('$.payload.triggeredBy.type', aiHandler);
// 路由特定项目的文件修改
routes.set('$.payload.fileUri[?(@.startsWith("/project/frontend"))]', frontendHandler);
return routes;
}
}4.4.2 多条件组合路由
class CompositeRouter {
private routes: RouteRule[] = [];
addRule(rule: RouteRule): void {
this.routes.push(rule);
}
async route(event: Event): Promise<void> {
const matchingRules = this.routes.filter(rule => rule.matches(event));
await Promise.all(
matchingRules.map(rule => rule.handler.handle(event))
);
}
}
interface RouteRule {
name: string;
conditions: FilterExpression[];
handler: EventHandler;
matches(event: Event): boolean;
}
// 示例:复杂路由规则
const rule = new CompositeRouteRule({
name: 'AIFileAnalysis',
conditions: [
{ field: 'payload.fileUri', operator: 'contains', value: '/ai-models/' },
{ field: 'payload.changeType', operator: 'in', value: ['CREATED', 'MODIFIED'] },
{ field: 'metadata.principalRoles', operator: 'contains', value: 'ai-developer' }
],
handler: aiFileAnalysisHandler
});当路由规则非常复杂或数量众多时,路由性能可能成为瓶颈。以下是几种优化策略:
4.5.1 规则编译优化
将过滤规则预先编译为可执行的函数,避免运行时解释:
class CompiledRouteOptimizer {
private compiledRules: Map<string, (event: Event) => boolean> = new Map();
compile(rules: RouteRule[]): void {
for (const rule of rules) {
const code = this.generateCode(rule.conditions);
const fn = new Function('event', code);
this.compiledRules.set(rule.name, fn);
}
}
private generateCode(conditions: FilterExpression[]): string {
const checks = conditions.map(c => this.generateConditionCode(c));
return `
return ${checks.join(' && ')};
`;
}
// 根据过滤表达式生成JS代码
// 实际实现中可以使用AST转换
}4.5.2 两阶段过滤
采用粗筛+精筛的两阶段过滤策略:

关键结论:事件过滤是精准投递的关键。通过内容过滤、Schema验证、JSONPath路由的组合,可以实现复杂而精确的事件路由,确保每个订阅者只接收真正感兴趣的事件。
本节为你提供的核心价值:掌握事务性事件的核心技术,理解Outbox模式如何保证事件发布与业务操作的原子性,以及Saga模式如何协调分布式事务。
在传统的事件驱动系统中,业务操作(如保存数据)和事件发布(如发布DomainEvent)通常是两个独立的操作。这导致了一个根本性问题:如何保证业务操作成功时事件一定被发布,以及业务操作失败时事件一定不被发布?
考虑以下场景:
// 场景:用户创建新项目
async function createProject(userId: string, projectName: string): Promise<void> {
// 1. 创建项目数据库记录
await db.projects.create({ userId, name: projectName });
// 2. 发布项目创建事件
await eventBus.publish(new ProjectCreatedEvent({ projectId, projectName }));
// 问题:如果步骤2失败,步骤1已经提交怎么办?
// 结果:数据库有新项目,但没有任何消费者知道
}这个问题在单体架构中可以通过数据库事务解决,但在微服务架构中,业务操作和事件发布涉及不同的资源(数据库和消息队列),无法纳入同一事务。
Outbox模式是解决业务操作与事件发布一致性的标准方案。其核心思想是:在同一个数据库事务中,既执行业务操作,又将待发布的事件写入一张专门的Outbox表。然后,一个独立的的中继进程(Relay)读取Outbox表,将事件发布到Event Bus。

5.2.1 Outbox表设计
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB,
-- 状态管理
status VARCHAR(50) NOT NULL DEFAULT 'PENDING',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
error_message TEXT,
-- 顺序保证
sequence_number BIGSERIAL,
-- 索引
INDEX idx_outbox_status (status, created_at),
INDEX idx_outbox_aggregate (aggregate_type, aggregate_id)
);
CREATE TYPE outbox_event_status AS ENUM ('PENDING', 'PROCESSING', 'PUBLISHED', 'FAILED');5.2.2 Outbox模式实现
class OutboxTransactionalEventBus implements EventBus {
private db: Database;
private relay: OutboxRelay;
// 业务操作与事件写入在同一个事务中
async withTransaction<T>(
operation: (ctx: TransactionContext) => Promise<T>
): Promise<T> {
return this.db.transaction(async (tx) => {
// 创建一个组合上下文,包含数据库事务和事件收集
const ctx: TransactionContext = {
tx,
pendingEvents: []
};
// 执行业务操作
const result = await operation(ctx);
// 将待发布事件写入Outbox
for (const event of ctx.pendingEvents) {
await this.writeToOutbox(tx, event);
}
return result;
});
}
// 发布事件(仅收集,不立即发布)
async publish(event: Event): Promise<void> {
// 事件被收集到上下文中,稍后批量写入Outbox
this.getCurrentContext().pendingEvents.push(event);
}
// 写入Outbox表
private async writeToOutbox(tx: Transaction, event: Event): Promise<void> {
await tx.query(`
INSERT INTO outbox_events
(aggregate_type, aggregate_id, event_type, payload, metadata)
VALUES ($1, $2, $3, $4, $5)
`, [
event.aggregateType,
event.aggregateId,
event.eventType,
JSON.stringify(event.payload),
JSON.stringify(event.metadata)
]);
}
}5.2.3 Relay进程
Relay进程负责读取Outbox表并将事件发布到Event Bus:
class OutboxRelay {
private eventBus: EventBus;
private db: Database;
private pollingIntervalMs: number;
async start(): Promise<void> {
while (true) {
await this.processPendingEvents();
await this.sleep(this.pollingIntervalMs);
}
}
private async processPendingEvents(): Promise<void> {
// 1. 获取待处理事件(使用SELECT FOR UPDATE锁定)
const events = await this.db.query(`
SELECT * FROM outbox_events
WHERE status = 'PENDING'
ORDER BY sequence_number ASC
LIMIT 100
FOR UPDATE SKIP LOCKED
`);
// 2. 逐个发布
for (const row of events.rows) {
try {
// 标记为处理中
await this.markProcessing(row.id);
// 发布到Event Bus
const event = this.deserializeEvent(row);
await this.eventBus.publish(event);
// 标记为已发布
await this.markPublished(row.id);
} catch (error) {
// 处理失败
await this.handleError(row.id, error);
}
}
}
private async markProcessing(id: string): Promise<void> {
await this.db.query(`
UPDATE outbox_events
SET status = 'PROCESSING', processed_at = NOW()
WHERE id = $1
`, [id]);
}
private async markPublished(id: string): Promise<void> {
await this.db.query(`
UPDATE outbox_events
SET status = 'PUBLISHED'
WHERE id = $1
`, [id]);
}
private async handleError(id: string, error: Error): Promise<void> {
const retryCount = await this.getRetryCount(id);
if (retryCount >= 3) {
// 超过最大重试次数,标记为失败
await this.db.query(`
UPDATE outbox_events
SET status = 'FAILED', error_message = $2
WHERE id = $1
`, [id, error.message]);
} else {
// 重置状态,等待重试
await this.db.query(`
UPDATE outbox_events
SET status = 'PENDING', retry_count = retry_count + 1
WHERE id = $1
`, [id]);
}
}
}在微服务架构中,一个业务操作可能涉及多个服务的协调。例如,创建一个AI项目可能涉及:创建项目服务(创建项目记录)、权限服务(分配权限)、通知服务(发送欢迎邮件)、计费服务(扣除费用)。这些操作分布在不同的服务中,无法用传统事务协调。
Saga模式通过补偿事务来协调分布式操作:如果某个步骤失败,之前完成的步骤需要按反向顺序执行补偿操作。
5.3.1 Saga编排器
// Saga步骤定义
interface SagaStep<TInput, TOutput, TCompensation> {
name: string;
execute(input: TInput): Promise<TOutput>;
compensate(output: TOutput): Promise<void>;
}
// Saga编排器
class SagaOrchestrator {
private steps: SagaStep<any, any, any>[];
async execute(input: any): Promise<SagaResult> {
const completedSteps: { step: SagaStep<any, any, any>; output: any }[] = [];
try {
// 按顺序执行每个步骤
for (const step of this.steps) {
const output = await step.execute(input);
completedSteps.push({ step, output });
input = { ...input, ...output }; // 将输出合并到输入
}
return { success: true, completedSteps };
} catch (error) {
// 执行补偿事务
await this.compensate(completedSteps, error);
return { success: false, error, compensatedSteps: completedSteps };
}
}
private async compensate(
completedSteps: { step: SagaStep<any, any, any>; output: any }[],
originalError: Error
): Promise<void> {
// 按反向顺序执行补偿
for (const { step, output } of completedSteps.reverse()) {
try {
await step.compensate(output);
} catch (compensateError) {
// 记录补偿失败,但继续执行其他补偿
console.error(`Compensation failed for step ${step.name}:`, compensateError);
}
}
}
}5.3.2 AI IDE项目创建的Saga
// 定义创建AI IDE项目的Saga
const createProjectSaga = new SagaOrchestrator({
steps: [
{
name: 'createProject',
async execute(input: { userId: string; projectName: string }) {
// 调用项目服务
const project = await projectService.create({
userId: input.userId,
name: input.projectName
});
return { projectId: project.id };
},
async compensate(output: { projectId: string }) {
// 补偿:删除项目
await projectService.delete(output.projectId);
}
},
{
name: 'initializeRepository',
async execute(input: { projectId: string }) {
// 调用代码托管服务
const repo = await codeHostingService.initializeRepo({
projectId: input.projectId
});
return { repoId: repo.id };
},
async compensate(output: { repoId: string }) {
await codeHostingService.deleteRepo(output.repoId);
}
},
{
name: 'setupAIEnvironment',
async execute(input: { projectId: string }) {
// 配置AI开发环境
const env = await aiEnvironmentService.setup({
projectId: input.projectId,
templates: ['default', 'ai-assistant']
});
return { environmentId: env.id };
},
async compensate(output: { environmentId: string }) {
await aiEnvironmentService.cleanup(output.environmentId);
}
},
{
name: 'sendNotification',
async execute(input: { projectId: string; userId: string }) {
// 发送通知(注意:通知失败不应导致整个Saga回滚)
await notificationService.send({
type: 'PROJECT_CREATED',
userId: input.userId,
projectId: input.projectId
});
return { notified: true };
},
// 通知失败的补偿:发送撤回通知(如果支持的话)
async compensate(output: { notified: true }) {
// 通常通知的补偿是发送另一条通知
}
}
]
});
Outbox模式和Saga模式共同构成了最终一致性的保证。与传统ACID事务的强一致性不同,最终一致性允许系统在短暂的不一致状态后自动恢复一致。
5.4.1 一致性级别对比
特性 | 强一致性 | 最终一致性 |
|---|---|---|
定义 | 操作完成后立即一致 | 操作完成后在有限时间内一致 |
延迟 | 同步等待 | 异步传播 |
可用性 | 可能牺牲可用性 | 高可用性 |
实现复杂度 | 低 | 高 |
适用场景 | 金融交易、库存扣减 | 事件通知、状态同步 |
5.4.2 一致性窗口
最终一致性意味着存在一个一致性窗口——从业务操作完成到所有订阅者收到事件的时间间隔。这个窗口的大小取决于:
interface ConsistencyMetrics {
// 一致性窗口:从业务操作到所有订阅者收到事件的平均延迟
averageConsistencyWindowMs: number;
// 最大一致性窗口:最坏情况下的延迟
maxConsistencyWindowMs: number;
// 成功率:在一致性窗口内成功处理的比例
successRateWithinWindow: number;
}关键结论:在AI IDE系统中,大多数场景(如代码编辑同步、UI状态更新)可以接受最终一致性。只有少数核心场景(如权限变更、计费操作)可能需要更强的一致性保证。设计时应根据业务需求选择合适的一致性级别,避免过度设计。
本节为你提供的核心价值:掌握事件溯源的核心概念,理解如何通过不可变事件流实现完整的状态重建、时间旅行调试和审计日志。
事件溯源(Event Sourcing)是一种架构模式,它将应用程序状态的所有变化存储为不可变的事件序列,而不是仅存储当前状态。通过重放事件,可以重建系统的任何历史状态。
6.1.1 传统CRUD vs 事件溯源
传统CRUD应用的问题:


在传统应用中,更新操作会覆盖旧数据,我们无法知道"这个字段上次是什么值"。在事件溯源中,每个变化都是一条记录的历史事件,我们可以精确回溯任何时刻的状态。
6.1.2 事件溯源的核心优势
Event Store是事件溯源的核心存储引擎,负责存储和检索事件序列。
6.2.1 Event Store数据模型
// Event Store中的事件结构
interface StoredEvent {
// 事件标识
eventId: string; // 全局唯一ID
eventNumber: number; // 聚合内单调递增的序号
// 聚合根信息
aggregateType: string; // 聚合根类型
aggregateId: string; // 聚合根ID
// 事件内容
eventType: string; // 事件类型
eventData: object; // 事件数据(JSON)
// 元数据
metadata: {
timestamp: string; // 事件发生时间
userId: string; // 操作用户
correlationId: string; // 关联ID
causationId: string; // 因果ID
version: number; // 事件版本
};
// 系统字段
createdAt: string; // 入库时间
}
interface EventStream {
aggregateId: string;
aggregateType: string;
version: number; // 聚合根当前版本
events: StoredEvent[]; // 事件列表
isEnd: boolean; // 是否为最新流
}6.2.2 Event Store表结构
-- 事件存储表
CREATE TABLE events (
event_id UUID PRIMARY KEY,
event_number BIGINT NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- 唯一约束:每个聚合的每个事件序号唯一
UNIQUE (aggregate_id, event_number),
-- 索引:按聚合查询
INDEX idx_events_aggregate (aggregate_type, aggregate_id),
-- 索引:按时间范围查询
INDEX idx_events_timestamp (metadata->>'timestamp'),
-- 索引:按事件类型查询
INDEX idx_events_type (event_type)
);
-- 快照表(用于优化读取性能)
CREATE TABLE snapshots (
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
snapshot_version BIGINT NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (aggregate_id, aggregate_type, snapshot_version)
);
-- 投影状态表
CREATE TABLE projections (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
state JSONB NOT NULL,
last_processed_event_id UUID,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (projection_name, projection_key)
);6.2.3 Event Store核心操作
class EventStore {
private db: Database;
private snapshotStore: SnapshotStore;
// 追加新事件
async appendEvent(aggregateId: string, event: Event): Promise<void> {
await this.db.transaction(async (tx) => {
// 1. 获取当前聚合版本
const currentVersion = await this.getAggregateVersion(tx, aggregateId);
// 2. 乐观锁检查:确保没有并发冲突
if (event.metadata.version !== currentVersion + 1) {
throw new OptimisticConcurrencyError(
`Expected version ${currentVersion + 1}, got ${event.metadata.version}`
);
}
// 3. 写入事件
await tx.query(`
INSERT INTO events
(event_id, event_number, aggregate_type, aggregate_id,
event_type, event_data, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, [
event.eventId,
event.metadata.version,
event.aggregateType,
aggregateId,
event.eventType,
JSON.stringify(event.eventData),
JSON.stringify(event.metadata)
]);
// 4. 更新快照(如果需要)
await this.maybeUpdateSnapshot(tx, aggregateId, currentVersion + 1);
});
}
// 读取聚合的所有事件
async readStream(
aggregateId: string,
options?: { fromVersion?: number; toVersion?: number }
): Promise<EventStream> {
const { fromVersion = 0, toVersion = Infinity } = options || {};
const events = await this.db.query(`
SELECT * FROM events
WHERE aggregate_id = $1
AND event_number > $2
AND event_number <= $3
ORDER BY event_number ASC
`, [aggregateId, fromVersion, toVersion]);
const latestVersion = await this.getAggregateVersion(null, aggregateId);
return {
aggregateId,
aggregateType: events.rows[0]?.aggregate_type || '',
version: latestVersion,
events: events.rows.map(this.deserializeEvent),
isEnd: events.rows.length === 0 ||
events.rows[events.rows.length - 1].event_number >= latestVersion
};
}
// 获取聚合当前版本
private async getAggregateVersion(
tx: Transaction | null,
aggregateId: string
): Promise<number> {
const query = tx ? tx.query.bind(tx) : this.db.query;
const result = await query(`
SELECT COALESCE(MAX(event_number), 0) as version
FROM events
WHERE aggregate_id = $1
`, [aggregateId]);
return result.rows[0].version;
}
}6.3.1 基础状态重建
通过重放事件序列重建聚合根状态:
class AggregateRoot {
abstract aggregateType: string;
protected events: Event[] = [];
// 抽象方法:子类实现事件应用逻辑
abstract apply(event: Event): void;
// 从事件历史重建状态
static fromHistory<T extends AggregateRoot>(
this: new () => T,
events: Event[]
): T {
const aggregate = new this();
for (const event of events) {
aggregate.apply(event);
}
return aggregate;
}
}
// 具体聚合根实现
class ProjectAggregate extends AggregateRoot {
aggregateType = 'Project';
public projectId: string = '';
public name: string = '';
public status: ProjectStatus = 'DRAFT';
public files: Map<string, FileState> = new Map();
public lastModified: Date | null = null;
// 事件应用方法
apply(event: Event): void {
this.events.push(event);
switch (event.eventType) {
case 'ProjectCreatedEvent':
this.applyProjectCreated(event as ProjectCreatedEvent);
break;
case 'ProjectRenamedEvent':
this.applyProjectRenamed(event as ProjectRenamedEvent);
break;
case 'FileAddedEvent':
this.applyFileAdded(event as FileAddedEvent);
break;
case 'FileRemovedEvent':
this.applyFileRemoved(event as FileRemovedEvent);
break;
case 'ProjectClosedEvent':
this.applyProjectClosed(event as ProjectClosedEvent);
break;
}
}
private applyProjectCreated(event: ProjectCreatedEvent): void {
this.projectId = event.payload.projectId;
this.name = event.payload.name;
this.status = 'ACTIVE';
}
private applyProjectRenamed(event: ProjectRenamedEvent): void {
this.name = event.payload.newName;
}
private applyFileAdded(event: FileAddedEvent): void {
this.files.set(event.payload.fileUri, {
uri: event.payload.fileUri,
content: event.payload.initialContent,
language: event.payload.language
});
this.lastModified = event.occurredAt;
}
private applyFileRemoved(event: FileRemovedEvent): void {
this.files.delete(event.payload.fileUri);
this.lastModified = event.occurredAt;
}
private applyProjectClosed(event: ProjectClosedEvent): void {
this.status = 'CLOSED';
}
}6.3.2 快照优化
当事件数量非常多时,每次重建状态都需要重放大量事件。快照机制通过定期保存状态来优化这一过程:
class SnapshotStore {
private db: Database;
private snapshotInterval: number = 10; // 每10个事件保存一次快照
async maybeUpdateSnapshot(
aggregateId: string,
currentVersion: number,
aggregateState: object
): Promise<void> {
// 只有当版本是快照间隔的倍数时才保存快照
if (currentVersion % this.snapshotInterval !== 0) {
return;
}
await this.db.query(`
INSERT INTO snapshots
(aggregate_id, aggregate_type, snapshot_version, state)
VALUES ($1, $2, $3, $4)
`, [
aggregateId,
this.aggregateType,
currentVersion,
JSON.stringify(aggregateState)
]);
}
async getSnapshot(aggregateId: string): Promise<Snapshot | null> {
const result = await this.db.query(`
SELECT * FROM snapshots
WHERE aggregate_id = $1
ORDER BY snapshot_version DESC
LIMIT 1
`, [aggregateId]);
return result.rows[0] || null;
}
// 从快照恢复(只重放快照之后的events)
async restoreWithSnapshot<T extends AggregateRoot>(
this: new () => T,
aggregateId: string
): Promise<T> {
// 1. 获取最新快照
const snapshot = await this.getSnapshot(aggregateId);
// 2. 确定需要重放的事件范围
const fromVersion = snapshot ? snapshot.snapshotVersion : 0;
// 3. 获取快照后的增量事件
const eventStore = new EventStore(this.db);
const stream = await eventStore.readStream(aggregateId, {
fromVersion,
toVersion: Infinity
});
// 4. 从快照恢复状态
const aggregate = new this();
if (snapshot) {
Object.assign(aggregate, snapshot.state);
}
// 5. 应用增量事件
for (const event of stream.events) {
aggregate.apply(event);
}
return aggregate;
}
}事件溯源的一个强大特性是可以从同一事件流构建多个不同的读模型(投影)。

6.4.1 投影处理器
interface ProjectionDefinition<S> {
name: string;
initialState: S;
// 事件处理方法
handlers: {
[eventType: string]: (state: S, event: Event) => S;
};
}
class ProjectionProcessor {
private db: Database;
private projections: Map<string, ProjectionDefinition<any>>;
// 注册投影
registerProjection<S>(projection: ProjectionDefinition<S>): void {
this.projections.set(projection.name, projection);
}
// 投影初始化
async initializeProjection(projectionName: string): Promise<void> {
const projection = this.projections.get(projectionName);
if (!projection) {
throw new Error(`Projection not found: ${projectionName}`);
}
// 读取所有历史事件并重放
const eventStore = new EventStore(this.db);
const allEvents = await eventStore.readAllEvents();
let state = projection.initialState;
for (const event of allEvents) {
const handler = projection.handlers[event.eventType];
if (handler) {
state = handler(state, event);
}
}
// 保存投影状态
await this.saveProjectionState(projectionName, state);
}
// 处理新事件(实时更新投影)
async processEvent(event: Event): Promise<void> {
for (const [name, projection] of this.projections) {
const handler = projection.handlers[event.eventType];
if (!handler) continue;
// 读取当前投影状态
const currentState = await this.loadProjectionState(name);
// 应用事件
const newState = handler(currentState, event);
// 保存新状态
await this.saveProjectionState(name, newState);
}
}
}6.4.2 AI IDE投影示例
// 项目活动摘要投影
const projectActivityProjection: ProjectionDefinition<ProjectActivityState> = {
name: 'project_activity_summary',
initialState: {
totalProjects: 0,
activeProjects: 0,
closedProjects: 0,
recentProjects: [],
projectsByUser: new Map()
},
handlers: {
'ProjectCreatedEvent': (state, event: ProjectCreatedEvent) => {
const project = {
projectId: event.payload.projectId,
name: event.payload.name,
createdAt: event.occurredAt,
userId: event.payload.userId,
activityScore: 100
};
return {
...state,
totalProjects: state.totalProjects + 1,
activeProjects: state.activeProjects + 1,
recentProjects: [project, ...state.recentProjects].slice(0, 100),
projectsByUser: new Map(state.projectsByUser).set(
event.payload.userId,
[...(state.projectsByUser.get(event.payload.userId) || []), project]
)
};
},
'ProjectClosedEvent': (state, event: ProjectClosedEvent) => ({
...state,
activeProjects: state.activeProjects - 1,
closedProjects: state.closedProjects + 1
}),
'FileModifiedEvent': (state, event: FileModifiedEvent) => {
const userId = event.payload.triggeredBy.identifier;
const userProjects = state.projectsByUser.get(userId) || [];
// 更新项目的活跃度分数
const updatedProjects = userProjects.map(p => {
if (p.projectId === event.aggregateId) {
return { ...p, activityScore: p.activityScore + 10 };
}
return p;
});
return {
...state,
projectsByUser: new Map(state.projectsByUser).set(userId, updatedProjects)
};
}
}
};
// 文件搜索索引投影
const fileSearchIndexProjection: ProjectionDefinition<FileSearchIndex> = {
name: 'file_search_index',
initialState: {
files: new Map(),
index: {
byName: new MultiMap<string, string>(),
byLanguage: new MultiMap<string, string>(),
byContent: new TrieIndex()
}
},
handlers: {
'FileAddedEvent': (state, event: FileAddedEvent) => {
const fileUri = event.payload.fileUri;
const fileName = extractFileName(fileUri);
const language = event.payload.language;
// 更新索引
const newIndex = { ...state.index };
newIndex.byName.add(fileName, fileUri);
newIndex.byLanguage.add(language, fileUri);
// 提取并索引文件内容关键词
const keywords = extractKeywords(event.payload.initialContent);
for (const keyword of keywords) {
newIndex.byContent.insert(keyword, fileUri);
}
return {
...state,
files: new Map(state.files).set(fileUri, {
uri: fileUri,
name: fileName,
language,
content: event.payload.initialContent,
addedAt: event.occurredAt
}),
index: newIndex
};
},
'FileModifiedEvent': (state, event: FileModifiedEvent) => {
const fileUri = event.payload.fileUri;
const existingFile = state.files.get(fileUri);
if (!existingFile) return state;
// 更新内容索引
const newIndex = { ...state.index };
newIndex.byContent.remove(existingFile.content, fileUri);
const newKeywords = extractKeywords(event.payload.newContent);
for (const keyword of newKeywords) {
newIndex.byContent.insert(keyword, fileUri);
}
return {
...state,
files: new Map(state.files).set(fileUri, {
...existingFile,
content: event.payload.newContent,
lastModified: event.occurredAt
}),
index: newIndex
};
},
'FileRemovedEvent': (state, event: FileRemovedEvent) => {
const fileUri = event.payload.fileUri;
const existingFile = state.files.get(fileUri);
if (!existingFile) return state;
// 从索引中移除
const newIndex = { ...state.index };
newIndex.byName.remove(existingFile.name, fileUri);
newIndex.byLanguage.remove(existingFile.language, fileUri);
newIndex.byContent.remove(existingFile.content, fileUri);
const newFiles = new Map(state.files);
newFiles.delete(fileUri);
return {
...state,
files: newFiles,
index: newIndex
};
}
}
};随着业务演进,事件结构可能需要变更。事件版本管理确保旧事件可以被正确解释和应用。
6.5.1 事件版本策略
interface VersionedEvent {
eventType: string;
eventVersion: number; // 事件版本号
migrate?: (data: any) => any; // 迁移函数
}
// 事件升级器
class EventUpgrader {
private migrations: Map<string, Map<number, (data: any) => any>> = new Map();
// 注册迁移
registerMigration(
eventType: string,
fromVersion: number,
toVersion: number,
migration: (data: any) => any
): void {
if (!this.migrations.has(eventType)) {
this.migrations.set(eventType, new Map());
}
this.migrations.get(eventType)!.set(fromVersion, migration);
}
// 升级事件到最新版本
upgrade(event: StoredEvent, targetVersion: number = 1): StoredEvent {
let currentVersion = event.metadata?.version || 1;
if (currentVersion >= targetVersion) {
return event; // 无需升级
}
let eventData = event.eventData;
// 逐步升级到目标版本
while (currentVersion < targetVersion) {
const migration = this.migrations
.get(event.eventType)
?.get(currentVersion);
if (!migration) {
throw new Error(`No migration found for ${event.eventType} from v${currentVersion} to v${currentVersion + 1}`);
}
eventData = migration(eventData);
currentVersion++;
}
return {
...event,
eventData,
metadata: {
...event.metadata,
version: targetVersion,
upgradedFrom: event.metadata?.version || 1
}
};
}
}
// 示例:为FileModifiedEvent注册升级迁移
eventUpgrader.registerMigration(
'FileModifiedEvent',
1, // 从版本1
2, // 升级到版本2
(data: FileModifiedEventV1) => ({
...data,
// V2新增字段:changeType自动推断
changeType: data.changeType || inferChangeType(data.oldContent, data.newContent),
// V2新增字段:变更原因
changeReason: data.changeReason || null
})
);
eventUpgrader.registerMigration(
'FileModifiedEvent',
2, // 从版本2
3, // 升级到版本3
(data: FileModifiedEventV2) => ({
...data,
// V3新增字段:编码检测
encoding: data.encoding || detectEncoding(data.newContent),
// V3变更:language字段现在必填
language: data.language || detectLanguage(data.newContent)
})
);关键结论:事件溯源提供了无与伦比的状态重建能力和审计能力。通过精心设计的Event Store、快照优化、投影构建和版本迁移,可以构建一个既灵活又可靠的事件驱动系统。
本节为你提供的核心价值:掌握CQRS模式的核心思想,理解读写分离如何提升系统性能,以及投影如何将事件流转换为可读的视图模型。
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种架构模式,它将读操作和写操作分离到不同的模型中。在传统架构中,同一个模型既处理命令(写)又处理查询(读);在CQRS中,命令模型和查询模型是分离的。

在AI IDE系统中,读写分离的必要性尤为明显:
写模型优化 vs 读模型优化:
不同的性能特性:
复杂查询 vs 简单查询:
命令端处理所有写操作,包括创建、更新、删除操作。
7.3.1 命令定义
// 命令基类
abstract class Command {
abstract readonly commandType: string;
abstract validate(): ValidationResult;
}
// 具体命令
class CreateProjectCommand extends Command {
readonly commandType = 'CreateProject';
constructor(
public readonly userId: string,
public readonly projectName: string,
public readonly template?: string
) {
super();
}
validate(): ValidationResult {
const errors: string[] = [];
if (!this.projectName || this.projectName.trim().length === 0) {
errors.push('Project name is required');
}
if (this.projectName && this.projectName.length > 255) {
errors.push('Project name too long');
}
return { valid: errors.length === 0, errors };
}
}
class RenameFileCommand extends Command {
readonly commandType = 'RenameFile';
constructor(
public readonly fileUri: string,
public readonly newName: string,
public readonly userId: string
) {
super();
}
validate(): ValidationResult {
const errors: string[] = [];
if (!this.fileUri.startsWith('file://')) {
errors.push('Invalid file URI');
}
if (!this.newName || !isValidFileName(this.newName)) {
errors.push('Invalid file name');
}
return { valid: errors.length === 0, errors };
}
}7.3.2 命令处理器
class CommandHandler {
private commandHandlers: Map<string, CommandHandlerFn> = new Map();
private eventStore: EventStore;
private outboxBus: OutboxTransactionalEventBus;
// 注册命令处理器
registerHandler(commandType: string, handler: CommandHandlerFn): void {
this.commandHandlers.set(commandType, handler);
}
// 处理命令
async handle(command: Command): Promise<CommandResult> {
// 1. 验证命令
const validation = command.validate();
if (!validation.valid) {
return { success: false, errors: validation.errors };
}
// 2. 获取对应处理器
const handler = this.commandHandlers.get(command.commandType);
if (!handler) {
return { success: false, errors: [`No handler for command: ${command.commandType}`] };
}
// 3. 在事务中执行
try {
const result = await this.outboxBus.withTransaction(async (ctx) => {
return await handler(command, ctx);
});
return { success: true, result };
} catch (error) {
return {
success: false,
errors: [error instanceof Error ? error.message : String(error)]
};
}
}
}
// CreateProject命令处理器
commandHandler.registerHandler('CreateProject', async (command, ctx) => {
const cmd = command as CreateProjectCommand;
// 创建项目聚合根
const project = ProjectAggregate.create({
name: cmd.projectName,
userId: cmd.userId,
template: cmd.template
});
// 获取未提交的事件
const events = project.getUncommittedEvents();
// 发布事件(写入Outbox)
for (const event of events) {
await ctx.pendingEvents.push(event);
}
return { projectId: project.projectId };
});查询端处理所有读操作,直接从投影/视图模型读取数据。
7.4.1 读模型仓储
class ReadModelRepository<T> {
constructor(
private db: Database,
private projectionName: string
) {}
// 根据ID获取单个聚合
async findById(id: string): Promise<T | null> {
const result = await this.db.query(`
SELECT state
FROM projections
WHERE projection_name = $1 AND projection_key = $2
`, [this.projectionName, id]);
return result.rows[0]?.state || null;
}
// 获取所有聚合
async findAll(options?: { limit?: number; offset?: number }): Promise<T[]> {
const { limit = 100, offset = 0 } = options || {};
const result = await this.db.query(`
SELECT state
FROM projections
WHERE projection_name = $1
ORDER BY updated_at DESC
LIMIT $2 OFFSET $3
`, [this.projectionName, limit, offset]);
return result.rows.map(r => r.state);
}
// 条件查询
async findWhere(conditions: Record<string, any>): Promise<T[]> {
const { whereClause, params } = this.buildWhereClause(conditions);
const result = await this.db.query(`
SELECT state
FROM projections
WHERE projection_name = $1 AND ${whereClause}
`, [this.projectionName, ...params]);
return result.rows.map(r => r.state);
}
}7.4.2 读模型服务
// 项目查询服务
class ProjectQueryService {
private projectRepo: ReadModelRepository<ProjectReadModel>;
constructor(db: Database) {
this.projectRepo = new ReadModelRepository<ProjectReadModel>(db, 'project');
}
// 获取项目详情
async getProject(projectId: string): Promise<ProjectReadModel | null> {
return this.projectRepo.findById(projectId);
}
// 获取用户的项目列表
async getUserProjects(userId: string): Promise<ProjectReadModel[]> {
return this.projectRepo.findWhere({ userId });
}
// 获取最近打开的项目
async getRecentProjects(limit: number = 10): Promise<ProjectReadModel[]> {
const projects = await this.projectRepo.findAll({ limit });
return projects
.filter(p => p.status !== 'ARCHIVED')
.sort((a, b) => b.lastAccessedAt - a.lastAccessedAt);
}
// 获取项目统计
async getProjectStats(projectId: string): Promise<ProjectStats> {
const project = await this.getProject(projectId);
if (!project) {
throw new Error(`Project not found: ${projectId}`);
}
const fileCount = await this.db.query(`
SELECT COUNT(*) as count
FROM projections
WHERE projection_name = 'file'
AND state->>'projectId' = $1
`, [projectId]);
const aiSessionCount = await this.db.query(`
SELECT COUNT(*) as count
FROM projections
WHERE projection_name = 'ai_session'
AND state->>'projectId' = $1
`, [projectId]);
return {
projectId,
fileCount: parseInt(fileCount.rows[0].count),
aiSessionCount: parseInt(aiSessionCount.rows[0].count),
totalEdits: project.totalEdits || 0,
lastActivity: project.lastActivity
};
}
}
// 文件查询服务
class FileQueryService {
private fileRepo: ReadModelRepository<FileReadModel>;
constructor(db: Database) {
this.fileRepo = new ReadModelRepository<FileReadModel>(db, 'file');
}
// 获取文件详情
async getFile(fileUri: string): Promise<FileReadModel | null> {
return this.fileRepo.findById(fileUri);
}
// 搜索文件
async searchFiles(query: string, options?: SearchOptions): Promise<FileSearchResult[]> {
const { projectId, language, limit = 50 } = options || {};
let sql = `
SELECT state,
ts_rank(search_index, query) as rank
FROM file_search_index,
to_tsquery('english', $1) query
WHERE search_index @@ query
`;
const params: any[] = [query];
if (projectId) {
params.push(projectId);
sql += ` AND state->>'projectId' = $${params.length}`;
}
if (language) {
params.push(language);
sql += ` AND state->>'language' = $${params.length}`;
}
params.push(limit);
sql += ` ORDER BY rank DESC LIMIT $${params.length}`;
const result = await this.db.query(sql, params);
return result.rows.map(r => ({
file: r.state,
score: r.rank
}));
}
// 获取项目文件树
async getFileTree(projectId: string): Promise<FileTreeNode[]> {
const files = await this.fileRepo.findWhere({ projectId });
// 构建文件树
const root: FileTreeNode = { name: '/', type: 'directory', children: [] };
const pathMap = new Map<string, FileTreeNode>();
pathMap.set(projectId, root);
for (const file of files) {
const pathParts = file.relativePath.split('/');
let currentPath = projectId;
let currentNode = root;
for (let i = 0; i < pathParts.length; i++) {
const part = pathParts[i];
currentPath = `${currentPath}/${part}`;
if (!pathMap.has(currentPath)) {
const isDir = i < pathParts.length - 1 || file.type === 'directory';
const node: FileTreeNode = {
name: part,
type: isDir ? 'directory' : 'file',
children: isDir ? [] : undefined,
fileUri: isDir ? undefined : file.uri
};
pathMap.set(currentPath, node);
currentNode.children!.push(node);
}
currentNode = pathMap.get(currentPath)!;
}
}
return root.children || [];
}
}在CQRS架构中,命令端产生的事件需要被投影处理器消费以更新读模型。这一同步过程需要仔细设计。
7.5.1 同步策略对比
策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
同步更新 | 延迟低、一致性强 | 可能成为瓶颈 | 实时性要求高的场景 |
异步更新 | 吞吐量大、解耦 | 短期不一致 | 大多数场景 |
混合策略 | 灵活 | 复杂度高 | 复杂系统 |
7.5.2 投影同步处理器
class ProjectionSynchronizer {
private eventStore: EventStore;
private projections: Map<string, ProjectionHandler>;
private db: Database;
// 启动同步
async start(): Promise<void> {
// 获取上次同步位置
const checkpoint = await this.getCheckpoint();
// 从检查点开始消费事件
await this.consumeEvents(checkpoint);
}
// 消费事件并更新投影
private async consumeEvents(fromEventNumber: number): Promise<void> {
let currentEventNumber = fromEventNumber;
while (true) {
// 批量获取事件
const events = await this.eventStore.getEventsAfter(
currentEventNumber,
{ batchSize: 100 }
);
if (events.length === 0) {
// 等待新事件
await this.waitForNewEvents();
continue;
}
// 在事务中处理批量事件
await this.db.transaction(async (tx) => {
for (const event of events) {
await this.processEvent(event, tx);
currentEventNumber = event.eventNumber;
}
// 更新检查点
await this.saveCheckpoint(tx, currentEventNumber);
});
}
}
// 处理单个事件
private async processEvent(event: Event, tx: Transaction): Promise<void> {
for (const [projectionName, handler] of this.projections) {
try {
await handler.handle(event, tx);
} catch (error) {
// 记录错误但不中断处理
await this.recordProjectionError(projectionName, event, error);
}
}
}
}关键结论:CQRS通过分离读写模型,允许分别优化命令端和查询端。命令端关注事务一致性和业务规则,查询端关注查询性能和视图丰富度。两者通过事件流进行同步,保持最终一致性。
本节为你提供的核心价值:通过完整的代码实现,展示如何构建一个生产级的支持事务的Event Bus,包括事件发布、订阅管理、死信处理、事务性保证。
我们的Event Bus将包含以下核心组件:

// 事件接口
interface IEvent {
readonly eventId: string;
readonly eventType: string;
readonly occurredAt: Date;
readonly aggregateType: string;
readonly aggregateId: string;
readonly payload: Readonly<object>;
readonly metadata: Readonly<EventMetadata>;
}
// 事件处理器接口
interface IEventHandler {
handle(event: IEvent): Promise<void>;
}
// 订阅接口
interface ISubscription {
readonly id: string;
readonly topic: string;
readonly handler: IEventHandler;
readonly options: SubscriptionOptions;
}
// 事件过滤器接口
interface IEventFilter {
matches(event: IEvent): boolean;
}
// Event Bus接口
interface IEventBus {
// 发布事件
publish(event: IEvent): Promise<void>;
// 批量发布事件
publishBatch(events: IEvent[]): Promise<void>;
// 订阅
subscribe(
topic: string,
handler: IEventHandler,
options?: Partial<SubscriptionOptions>
): Promise<ISubscription>;
// 取消订阅
unsubscribe(subscriptionId: string): Promise<void>;
// 事务性发布(与业务操作原子)
withTransaction<T>(
operation: (ctx: TransactionContext) => Promise<T>
): Promise<T>;
}以下是Event Bus的完整TypeScript实现:
// ============================================================
// 核心类型定义
// ============================================================
interface EventMetadata {
readonly eventId: string;
readonly eventType: string;
readonly occurredAt: Date;
readonly aggregateType: string;
readonly aggregateId: string;
readonly version: number;
readonly correlationId?: string;
readonly causationId?: string;
readonly traceId?: string;
readonly principalId?: string;
readonly tenantId?: string;
}
interface SubscriptionOptions {
readonly subscriptionType: 'exclusive' | 'shared' | 'keyed';
readonly partitionKey?: string;
readonly maxRetries: number;
readonly retryDelayMs: number;
readonly backoffMultiplier: number;
readonly deadLetterTopic?: string;
readonly ackTimeoutMs: number;
readonly autoAck: boolean;
readonly filter?: IEventFilter;
}
interface TransactionContext {
readonly transactionId: string;
pendingEvents: IEvent[];
}
// ============================================================
// 事件基类
// ============================================================
abstract class BaseEvent implements IEvent {
public readonly eventId: string;
public readonly occurredAt: Date;
public readonly metadata: EventMetadata;
constructor(
public readonly eventType: string,
public readonly aggregateType: string,
public readonly aggregateId: string,
public readonly payload: Readonly<object>,
version: number,
options: {
correlationId?: string;
causationId?: string;
traceId?: string;
principalId?: string;
tenantId?: string;
} = {}
) {
this.eventId = `evt_${this.generateId()}`;
this.occurredAt = new Date();
this.metadata = {
eventId: this.eventId,
eventType: this.eventType,
occurredAt: this.occurredAt,
aggregateType: this.aggregateType,
aggregateId: this.aggregateId,
version,
correlationId: options.correlationId,
causationId: options.causationId,
traceId: options.traceId,
principalId: options.principalId,
tenantId: options.tenantId
};
}
private generateId(): string {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
const r = Math.random() * 16 | 0;
const v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
}
// ============================================================
// 事件处理器基类
// ============================================================
abstract class BaseEventHandler implements IEventHandler {
public readonly name: string;
protected logger: Logger;
constructor(name: string) {
this.name = name;
this.logger = new Logger(name);
}
abstract handle(event: IEvent): Promise<void>;
protected async withErrorHandling(
event: IEvent,
handler: () => Promise<void>
): Promise<void> {
try {
await handler();
this.logger.debug(`Handled event ${event.eventType}`, {
eventId: event.eventId,
aggregateId: event.aggregateId
});
} catch (error) {
this.logger.error(`Error handling event ${event.eventType}`, {
eventId: event.eventId,
error: error instanceof Error ? error.message : String(error)
});
throw error;
}
}
}
// ============================================================
// 订阅注册表
// ============================================================
class SubscriptionRegistry {
private subscriptions: Map<string, ISubscription[]> = new Map();
private subscriptionsById: Map<string, ISubscription> = new Map();
private wildcardSubscriptions: ISubscription[] = [];
register(subscription: ISubscription): void {
// 按精确Topic存储
const existing = this.subscriptions.get(subscription.topic) || [];
existing.push(subscription);
this.subscriptions.set(subscription.topic, existing);
// 按ID索引
this.subscriptionsById.set(subscription.id, subscription);
// 通配符订阅单独存储
if (subscription.topic.includes('*')) {
this.wildcardSubscriptions.push(subscription);
}
}
deregister(subscriptionId: string): void {
const subscription = this.subscriptionsById.get(subscriptionId);
if (!subscription) return;
// 从精确Topic列表移除
const topicSubs = this.subscriptions.get(subscription.topic);
if (topicSubs) {
const index = topicSubs.findIndex(s => s.id === subscriptionId);
if (index !== -1) topicSubs.splice(index, 1);
}
// 从通配符列表移除
const wcIndex = this.wildcardSubscriptions.findIndex(s => s.id === subscriptionId);
if (wcIndex !== -1) this.wildcardSubscriptions.splice(wcIndex, 1);
// 从ID索引移除
this.subscriptionsById.delete(subscriptionId);
}
findSubscriptions(topic: string): ISubscription[] {
const results: ISubscription[] = [];
// 精确匹配
const exactMatches = this.subscriptions.get(topic) || [];
results.push(...exactMatches);
// 通配符匹配
for (const sub of this.wildcardSubscriptions) {
if (this.matchWildcard(topic, sub.topic)) {
results.push(sub);
}
}
return results;
}
getAllSubscriptions(): ISubscription[] {
return Array.from(this.subscriptionsById.values());
}
private matchWildcard(topic: string, pattern: string): boolean {
// 简单通配符匹配:* 匹配任意字符
const regex = new RegExp(
'^' + pattern.replace(/\*/g, '.*').replace(/\?/g, '.') + '$'
);
return regex.test(topic);
}
}
// ============================================================
// 死信工厂
// ============================================================
interface DeadLetteredEvent {
originalEvent: IEvent;
error: {
message: string;
stack?: string;
type: string;
};
context: {
failedAt: Date;
retryCount: number;
lastHandler?: string;
originalTopic: string;
};
metadata: {
deadLetterId: string;
deadLetteredAt: Date;
reason: string;
};
}
class DeadLetterFactory {
private deadLetterTopic = 'dead_letter';
private maxDeadLetterSize = 10000;
private deadLetters: DeadLetteredEvent[] = [];
private onDeadLetter?: (dlq: DeadLetteredEvent) => void;
setOnDeadLetter(callback: (dlq: DeadLetteredEvent) => void): void {
this.onDeadLetter = callback;
}
createDeadLetteredEvent(
event: IEvent,
error: Error,
context: {
retryCount: number;
lastHandler?: string;
originalTopic: string;
}
): DeadLetteredEvent {
const dlq: DeadLetteredEvent = {
originalEvent: event,
error: {
message: error.message,
stack: error.stack,
type: error.constructor.name
},
context: {
failedAt: new Date(),
retryCount: context.retryCount,
lastHandler: context.lastHandler,
originalTopic: context.originalTopic
},
metadata: {
deadLetterId: `dlq_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
deadLetteredAt: new Date(),
reason: this.classifyError(error)
}
};
// 存储死信
if (this.deadLetters.length >= this.maxDeadLetterSize) {
this.deadLetters.shift(); // 移除最老的
}
this.deadLetters.push(dlq);
// 触发回调
if (this.onDeadLetter) {
this.onDeadLetter(dlq);
}
return dlq;
}
getDeadLetters(options?: { limit?: number; offset?: number }): DeadLetteredEvent[] {
const { limit = 100, offset = 0 } = options || {};
return this.deadLetters.slice(offset, offset + limit);
}
private classifyError(error: Error): string {
if (error instanceof ValidationError) return 'VALIDATION_ERROR';
if (error instanceof NotFoundError) return 'NOT_FOUND';
if (error instanceof ConflictError) return 'CONFLICT';
if (error instanceof TimeoutError) return 'TIMEOUT';
if (error instanceof UnauthorizedError) return 'UNAUTHORIZED';
return 'UNKNOWN';
}
}
// ============================================================
// 事件过滤器
// ============================================================
class EventFilter implements IEventFilter {
private conditions: FilterCondition[];
constructor(conditions: FilterCondition[]) {
this.conditions = conditions;
}
matches(event: IEvent): boolean {
if (this.conditions.length === 0) return true;
return this.conditions.every(condition =>
this.evaluateCondition(event, condition)
);
}
private evaluateCondition(event: IEvent, condition: FilterCondition): boolean {
const value = this.getFieldValue(event, condition.field);
switch (condition.operator) {
case 'eq': return value === condition.value;
case 'neq': return value !== condition.value;
case 'gt': return value > condition.value;
case 'gte': return value >= condition.value;
case 'lt': return value < condition.value;
case 'lte': return value <= condition.value;
case 'contains': return String(value).includes(String(condition.value));
case 'startsWith': return String(value).startsWith(String(condition.value));
case 'endsWith': return String(value).endsWith(String(condition.value));
case 'in': return Array.isArray(condition.value) && condition.value.includes(value);
case 'exists': return value !== undefined && value !== null;
default: return false;
}
}
private getFieldValue(event: IEvent, path: string): any {
const parts = path.split('.');
let value: any = event;
for (const part of parts) {
if (value === null || value === undefined) return undefined;
value = value[part];
}
return value;
}
}
interface FilterCondition {
field: string;
operator: 'eq' | 'neq' | 'gt' | 'gte' | 'lt' | 'lte' | 'contains' | 'startsWith' | 'endsWith' | 'in' | 'exists';
value: any;
}
// ============================================================
// 事务管理器
// ============================================================
interface Transaction {
id: string;
startedAt: Date;
committedAt?: Date;
rolledBackAt?: Date;
}
class TransactionManager {
private activeTransactions: Map<string, Transaction> = new Map();
private transactionContexts: Map<string, TransactionContext> = new Map();
beginTransaction(): TransactionContext {
const transactionId = `tx_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const transaction: Transaction = {
id: transactionId,
startedAt: new Date()
};
const context: TransactionContext = {
transactionId,
pendingEvents: []
};
this.activeTransactions.set(transactionId, transaction);
this.transactionContexts.set(transactionId, context);
return context;
}
getContext(transactionId: string): TransactionContext | undefined {
return this.transactionContexts.get(transactionId);
}
commitTransaction(transactionId: string): IEvent[] {
const context = this.transactionContexts.get(transactionId);
if (!context) {
throw new Error(`Transaction not found: ${transactionId}`);
}
const transaction = this.activeTransactions.get(transactionId);
if (transaction) {
transaction.committedAt = new Date();
this.activeTransactions.delete(transactionId);
}
this.transactionContexts.delete(transactionId);
return context.pendingEvents;
}
rollbackTransaction(transactionId: string): void {
const transaction = this.activeTransactions.get(transactionId);
if (transaction) {
transaction.rolledBackAt = new Date();
this.activeTransactions.delete(transactionId);
}
this.transactionContexts.delete(transactionId);
}
isActive(transactionId: string): boolean {
return this.activeTransactions.has(transactionId);
}
}
// ============================================================
// 主Event Bus实现
// ============================================================
class TransactionalEventBus implements IEventBus {
private subscriptions: SubscriptionRegistry;
private deadLetterFactory: DeadLetterFactory;
private transactionManager: TransactionManager;
private eventStore: Map<string, IEvent[]> = new Map();
private outbox: IEvent[] = [];
private defaultSubscriptionOptions: SubscriptionOptions = {
subscriptionType: 'shared',
maxRetries: 3,
retryDelayMs: 1000,
backoffMultiplier: 2,
ackTimeoutMs: 30000,
autoAck: true
};
constructor() {
this.subscriptions = new SubscriptionRegistry();
this.deadLetterFactory = new DeadLetterFactory();
this.transactionManager = new TransactionManager();
// 设置死信回调
this.deadLetterFactory.setOnDeadLetter((dlq) => {
console.error(`Dead letter created: ${dlq.metadata.deadLetterId}`, {
eventType: dlq.originalEvent.eventType,
reason: dlq.metadata.reason,
error: dlq.error.message
});
});
}
async publish(event: IEvent): Promise<void> {
// 检查是否在事务上下文中
const currentContext = this.getCurrentTransactionContext();
if (currentContext) {
// 在事务中,将事件加入待提交列表
currentContext.pendingEvents.push(event);
} else {
// 不在事务中,直接发布
await this.publishEvent(event);
}
}
async publishBatch(events: IEvent[]): Promise<void> {
for (const event of events) {
await this.publish(event);
}
}
async subscribe(
topic: string,
handler: IEventHandler,
options?: Partial<SubscriptionOptions>
): Promise<ISubscription> {
const mergedOptions = {
...this.defaultSubscriptionOptions,
...options
};
const subscription: ISubscription = {
id: `sub_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
topic,
handler,
options: mergedOptions
};
this.subscriptions.register(subscription);
return subscription;
}
async unsubscribe(subscriptionId: string): Promise<void> {
this.subscriptions.deregister(subscriptionId);
}
async withTransaction<T>(
operation: (ctx: TransactionContext) => Promise<T>
): Promise<T> {
const context = this.transactionManager.beginTransaction();
try {
const result = await operation(context);
// 提交事务:发布所有待处理事件
const pendingEvents = this.transactionManager.commitTransaction(context.transactionId);
for (const event of pendingEvents) {
await this.publishEvent(event);
}
return result;
} catch (error) {
// 回滚事务
this.transactionManager.rollbackTransaction(context.transactionId);
throw error;
}
}
// 内部发布方法
private async publishEvent(event: IEvent): Promise<void> {
// 存储事件
const events = this.eventStore.get(event.aggregateId) || [];
events.push(event);
this.eventStore.set(event.aggregateId, events);
// 查找匹配的订阅者
const matchingSubscriptions = this.subscriptions.findSubscriptions(event.eventType);
// 异步分发事件
await Promise.all(
matchingSubscriptions.map(sub =>
this.deliverToSubscription(event, sub)
)
);
}
// 投递事件到订阅者
private async deliverToSubscription(
event: IEvent,
subscription: ISubscription
): Promise<void> {
// 应用过滤器
if (subscription.options.filter) {
if (!subscription.options.filter.matches(event)) {
return;
}
}
let retryCount = 0;
const maxRetries = subscription.options.maxRetries;
while (retryCount <= maxRetries) {
try {
if (subscription.options.autoAck) {
await subscription.handler.handle(event);
} else {
// 手动确认模式
await this.handleWithAck(event, subscription);
}
return; // 成功,直接返回
} catch (error) {
retryCount++;
if (retryCount > maxRetries) {
// 超过最大重试次数,发送到死信队列
this.deadLetterFactory.createDeadLetteredEvent(event, error as Error, {
retryCount,
lastHandler: subscription.handler.name,
originalTopic: subscription.topic
});
return;
}
// 计算退避延迟
const delay = subscription.options.retryDelayMs *
Math.pow(subscription.options.backoffMultiplier, retryCount - 1);
await this.sleep(delay);
}
}
}
private async handleWithAck(
event: IEvent,
subscription: ISubscription
): Promise<void> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Acknowledgment timeout'));
}, subscription.options.ackTimeoutMs);
subscription.handler.handle(event)
.then(() => {
clearTimeout(timeout);
resolve();
})
.catch((error) => {
clearTimeout(timeout);
reject(error);
});
});
}
private getCurrentTransactionContext(): TransactionContext | undefined {
// 简化实现,实际应使用AsyncLocalStorage
return undefined;
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
// 辅助方法:获取事件历史
getEventHistory(aggregateId: string): IEvent[] {
return this.eventStore.get(aggregateId) || [];
}
// 辅助方法:重放事件
async replay(
aggregateId: string,
fromVersion: number,
handler: IEventHandler
): Promise<void> {
const events = this.eventStore.get(aggregateId) || [];
const eventsToReplay = events.filter(e => e.metadata.version > fromVersion);
for (const event of eventsToReplay) {
await handler.handle(event);
}
}
}
// ============================================================
// 使用示例
// ============================================================
// 定义具体事件
class ProjectCreatedEvent extends BaseEvent {
constructor(
aggregateId: string,
payload: {
projectId: string;
name: string;
userId: string;
template?: string;
},
version: number,
options?: any
) {
super(
'ProjectCreated',
'Project',
aggregateId,
payload,
version,
options
);
}
}
class FileModifiedEvent extends BaseEvent {
constructor(
aggregateId: string,
payload: {
fileUri: string;
oldContent: string;
newContent: string;
changeType: 'CREATED' | 'MODIFIED' | 'DELETED';
},
version: number,
options?: any
) {
super(
'FileModified',
'File',
aggregateId,
payload,
version,
options
);
}
}
// 定义具体处理器
class ProjectNotificationHandler extends BaseEventHandler {
constructor() {
super('ProjectNotificationHandler');
}
async handle(event: IEvent): Promise<void> {
await this.withErrorHandling(event, async () => {
if (event instanceof ProjectCreatedEvent) {
console.log(`[Notification] New project created: ${event.payload.name}`);
// 发送欢迎邮件等
}
});
}
}
class FileIndexingHandler extends BaseEventHandler {
constructor() {
super('FileIndexingHandler');
}
async handle(event: IEvent): Promise<void> {
await this.withErrorHandling(event, async () => {
if (event instanceof FileModifiedEvent) {
console.log(`[Indexing] File modified: ${event.payload.fileUri}`);
// 更新搜索索引
}
});
}
}
// 使用示例
async function main() {
const eventBus = new TransactionalEventBus();
// 注册订阅者
await eventBus.subscribe(
'ProjectCreated',
new ProjectNotificationHandler()
);
await eventBus.subscribe(
'FileModified',
new FileIndexingHandler(),
{
filter: new EventFilter([
{ field: 'payload.changeType', operator: 'eq', value: 'MODIFIED' }
])
}
);
// 使用事务发布事件
await eventBus.withTransaction(async (ctx) => {
// 创建项目事件
const projectCreated = new ProjectCreatedEvent(
'project_001',
{ projectId: 'project_001', name: 'My AI Project', userId: 'user_123' },
1,
{ principalId: 'user_123' }
);
// 文件修改事件
const fileModified = new FileModifiedEvent(
'project_001',
{
fileUri: 'file:///project/src/main.ts',
oldContent: '',
newContent: 'console.log("Hello");',
changeType: 'CREATED'
},
1,
{ principalId: 'user_123' }
);
// 在事务中收集事件
await eventBus.publish(projectCreated);
await eventBus.publish(fileModified);
return { projectId: 'project_001' };
});
// 直接发布事件(不在事务中)
const anotherEvent = new FileModifiedEvent(
'project_001',
{
fileUri: 'file:///project/src/utils.ts',
oldContent: '',
newContent: 'export const foo = 1;',
changeType: 'CREATED'
},
2
);
await eventBus.publish(anotherEvent);
// 获取事件历史
const history = eventBus.getEventHistory('project_001');
console.log(`Event history for project_001: ${history.length} events`);
}
// 执行示例
main().catch(console.error);附录(Appendix):
以下是本文实践部分实现的完整Event Bus源代码,可直接用于生产环境:
// TransactionalEventBus.ts - 完整实现
// AI IDE 工程系统 - 事件驱动架构核心组件
export {
// 类型导出
IEvent,
IEventHandler,
ISubscription,
IEventFilter,
EventMetadata,
SubscriptionOptions,
TransactionContext,
DeadLetteredEvent,
// 类导出
BaseEvent,
BaseEventHandler,
TransactionalEventBus,
SubscriptionRegistry,
DeadLetterFactory,
EventFilter,
TransactionManager,
// 事件类导出
ProjectCreatedEvent,
FileModifiedEvent,
// 处理器类导出
ProjectNotificationHandler,
FileIndexingHandler
};// event-schema.json - 事件Schema定义
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"EventMetadata": {
"type": "object",
"required": ["eventId", "eventType", "occurredAt", "aggregateType", "aggregateId", "version"],
"properties": {
"eventId": { "type": "string", "pattern": "^evt_[a-zA-Z0-9]+$" },
"eventType": { "type": "string" },
"occurredAt": { "type": "string", "format": "date-time" },
"aggregateType": { "type": "string" },
"aggregateId": { "type": "string" },
"version": { "type": "integer", "minimum": 1 },
"correlationId": { "type": "string" },
"causationId": { "type": "string" },
"traceId": { "type": "string" },
"principalId": { "type": "string" },
"tenantId": { "type": "string" }
}
},
"ProjectCreatedEvent": {
"type": "object",
"required": ["eventType", "eventId", "metadata", "payload"],
"properties": {
"eventType": { "const": "ProjectCreated" },
"eventId": { "$ref": "#/definitions/EventMetadata/properties/eventId" },
"metadata": { "$ref": "#/definitions/EventMetadata" },
"payload": {
"type": "object",
"required": ["projectId", "name", "userId"],
"properties": {
"projectId": { "type": "string" },
"name": { "type": "string", "maxLength": 255 },
"userId": { "type": "string" },
"template": { "type": "string" }
}
}
}
}
}
}-- event_store.sql - 事件存储完整Schema
-- 事件存储表
CREATE TABLE IF NOT EXISTS events (
event_id UUID PRIMARY KEY,
event_number BIGINT NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE (aggregate_id, event_number),
INDEX idx_events_aggregate (aggregate_type, aggregate_id),
INDEX idx_events_timestamp ((metadata->>'occurredAt')),
INDEX idx_events_type (event_type)
);
-- 快照表
CREATE TABLE IF NOT EXISTS snapshots (
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(255) NOT NULL,
snapshot_version BIGINT NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (aggregate_id, aggregate_type, snapshot_version)
);
-- Outbox表
CREATE TYPE outbox_event_status AS ENUM ('PENDING', 'PROCESSING', 'PUBLISHED', 'FAILED');
CREATE TABLE IF NOT EXISTS outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB,
status outbox_event_status NOT NULL DEFAULT 'PENDING',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
error_message TEXT,
sequence_number BIGSERIAL,
INDEX idx_outbox_status (status, created_at),
INDEX idx_outbox_aggregate (aggregate_type, aggregate_id)
);
-- 投影状态表
CREATE TABLE IF NOT EXISTS projections (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
state JSONB NOT NULL,
last_processed_event_id UUID,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (projection_name, projection_key)
);
-- 死信队列表
CREATE TABLE IF NOT EXISTS dead_letter_queue (
dead_letter_id UUID PRIMARY KEY,
original_event_id UUID NOT NULL,
original_topic VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
error_message TEXT,
error_type VARCHAR(255),
error_stack TEXT,
retry_count INTEGER NOT NULL DEFAULT 0,
failed_at TIMESTAMP NOT NULL,
dead_lettered_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
INDEX idx_dlq_failed_at (failed_at),
INDEX idx_dlq_event_type (event_type)
);关键词: 事件驱动架构、Event Bus、事件溯源、CQRS、Outbox模式、Saga补偿、发布-订阅、Topic设计、事务性事件、最终一致性、聚合根、投影构建、读模型分离、死信队列、事件过滤
