首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >等等!别着急spawn!—— 对比并发应用中可变状态的处理方式

等等!别着急spawn!—— 对比并发应用中可变状态的处理方式

作者头像
架构师部落
发布2026-06-22 12:35:18
发布2026-06-22 12:35:18
1190
举报

在Rust中,使用多个任务可能是执行非顺序工作最常见的方式。每个任务可以等待不同的I/O事件,异步运行时会在I/O事件发生后立即调度这些任务执行工作。这种方式能“非阻塞地”等待I/O事件发生并实现并发,简单又便捷。但在多个任务间共享可变状态却可能变得十分复杂。

通常,我们会使用Mutex(互斥锁)或通道(channel)来实现状态共享。前者很难用对,后者则很容易失控。作为一种替代方案,我建议默认采用单任务模式,无需在多个任务间共享状态,从而从根本上解决可变状态共享的问题;同时,我们也会分析该模式的权衡取舍,并说明在需要时如何逐步过渡到其他模式。

本文将以一个简单的网络应用为例,逐步采用不同的模式来实现可变状态共享:首先是互斥锁模式,接着是通道模式,然后是两种单任务模式的实现(并分析其利弊)。通过这些案例,我们能更全面地了解各种模式的特点,进而为“默认采用单任务模式”提供合理依据。

目录

  1. 1. 写作背景
  2. 2. 示例问题
  3. 3. 具体实现
    • • 测试用例
    • • 消息解析
    • • 互斥锁(Mutex)模式
    • • 通道(Channels)模式
    • • 单任务统筹模式
      • • 竞争 futures 模式
      • • 手动实现 future 模式
  4. 4. 模式选择
    • • 多任务 vs 单任务
    • • 手动实现 future vs 组合器(Combinators)

1. 写作背景

本文聚焦于I/O密集型应用中的可变状态共享问题。这类应用能从并发中显著获益——任务在等待I/O事件时,不会阻塞程序其他部分的执行。为实现这一点,我们通常会使用异步运行时,本文也将围绕异步运行时展开讨论。

在异步运行时中实现并发最常见的方式,是生成新任务来处理I/O事件。“任务(Task)”是异步运行时提供的基本单元,代表一段待执行的工作。当I/O事件解除了对该任务的阻塞后,运行时会将其调度到当前线程或其他线程上,与其他任务并发执行。

这种便捷性背后存在一定代价:任务要执行的Future必须满足'static生命周期,且可能需要满足Send trait(仅当任务可能被调度到不同线程时才需满足这一要求)。我们可以通过tokio::task::LocalSet等结构避免Send要求(尽管这种方式略显繁琐),但'static生命周期的要求却很难规避。

'static生命周期之所以是必需的,是因为被生成的Future可能在任何后续时间执行,甚至可能超出最初生成它的代码块的作用域。要满足这一要求并实现可变状态共享,有两种常见方式:

  • • 使用具备内部可变性的结构存储状态;
  • • 让不同任务分别拥有状态的不同部分,并通过通道协调任务间的交互。

另一种方案是完全不使用spawn(生成任务),而是在单个任务中运行所有Future。我们可以借助futures::future::selectfutures::future::select_alltokio::select等函数实现这一点,它们不要求Future具备'static生命周期。但需要注意的是,这些Future仍是同时存在的独立工作单元,无法通过&mut引用共享状态——若要共享状态,仍需使用具备内部可变性的结构。

不过,若能将I/O futures与状态修改操作分离,我们或许能让futures不持有共享状态的引用,而是通过一个公共代码块来处理I/O事件。在没有I/O发生的公共路径中,我们可以通过&mut引用直接修改状态。此外,还有一种方法能让I/O结构间共享可变状态。

通常,在async函数中调用.await时,运行时会调度一个与任务关联的Waker(唤醒器),并在未来某个时刻唤醒该任务。编译器会自动跟踪Future的状态:它会生成一个枚举来表示.await的执行点,并保存当前状态。我们发现,这种“任务与Waker绑定”的设计仅为了让任务能在同一执行点恢复运行。因此,我们可以调整代码结构:每次任务被唤醒时,依次推进工作流程,并轮询所有I/O以检测新事件。这样一来,所有I/O都能共享可变状态的引用。如果目前对此理解不深也没关系,后续的具体示例会让这一逻辑更清晰。

接下来,我们将通过一个简单问题,用不同的模式实现I/O并发代码,从而分析每种模式的优缺点。

2. 示例问题

这是一个简化但接近真实的场景,我们添加了一些人为约束以减少冗余代码,从而聚焦核心问题——并发代码中的可变状态共享。

我们需要实现一个服务器:客户端可连接到该服务器,并通过服务器与其他客户端交换消息。

注意:网络传输中的所有数字均采用网络字节序(大端序)编码。

交互流程

  1. 1. 客户端通过TCP套接字连接服务器后,会立即被分配一个ID,服务器会以4字节的响应将该ID返回给客户端。
    • • 示例:客户端ID为031,服务器向客户端返回该4字节ID。
  2. 2. 客户端可通过带外通道(out-of-band channel)交换彼此的ID,然后使用目标客户端的ID构造消息,服务器会将该消息转发给目标客户端。
    • • 消息结构:目标客户端ID(4字节) + 变长数据(以空字符\0结尾)。
    • • 示例:目标ID为031,数据为3262063,则消息为031 + 3262063 + \0
  3. 3. 服务器读取完整消息后,会将空字符结尾的数据部分转发给目标客户端(不包含ID头部)。

简化假设(仅为演示)

  • • 所有客户端均“行为良好”,不会违反协议(例如,始终发送完整消息,不会在未完成前启动新消息);
  • • 客户端发送的所有消息均符合协议,且消息开头必为4字节ID;
  • • 客户端不会向自身发送消息;
  • • 服务器启动后永不停止;
  • • 随机生成的32位无符号整数(u32)ID不会重复;
  • • 暂不考虑客户端可能收到来自不同客户端的分段消息且无法区分的问题。

3. 具体实现

我们需要通过并发处理两种事件:一是来自任意客户端的消息,二是新的TCP连接。实现并发的方式有多种(线程、操作系统原生函数、异步运行时),由于本文聚焦I/O密集型应用,我们选择使用tokio异步运行时。

接下来,我们将逐步实现不同模式的服务器,并分析其特点。

3.1 测试用例

首先,我们编写一个测试用例来明确协议的预期行为。该测试不用于捕获边缘情况,仅用于定义核心功能的预期结果。

服务器的接口定义如下:

代码语言:javascript
复制
struct Server;

impl Server {
    pub async fn new() -> Server;
    pub async fn handle_connections(self: Arc<Self>) -> tokio::task::JoinHandle<()>;
}

提示:后续章节中,Server::handle_connections的签名会略有调整(改为使用&mut self),但更新测试用例非常简单。此外,每个示例都会提供完整实现(包括对应的测试用例)。

基于上述接口,测试用例实现如下:

代码语言:javascript
复制
#[cfg(test)]
mod tests {
    #[tokio::test]
    asyncfnit_works() {
        const MSG1: &str = "hello, number 2\0";
        const MSG2: &str = "hello back, number 1\0";

        letserver = Arc::new(Server::new().await);
        tokio::spawn(server.handle_connections());

        letmut sock1 = TcpStream::connect("127.0.0.1:8080").await.unwrap();
        letmut sock2 = TcpStream::connect("127.0.0.1:8080").await.unwrap();

        letid1 = sock1.read_u32().await.unwrap();
        letid2 = sock2.read_u32().await.unwrap();

        sock1.write_u32(id2).await.unwrap();
        sock1.write_all(MSG1.as_bytes()).await.unwrap();
        sock1.flush().await.unwrap();

        letmut buf = [0; MSG1.len()];
        sock2.read_exact(&mut buf).await.unwrap();
        assert_eq!(str::from_utf8(&buf).unwrap(), MSG1);

        sock2.write_u32(id1).await.unwrap();
        sock2.write_all(MSG2.as_bytes()).await.unwrap();
        sock2.flush().await.unwrap();

        letmut buf = [0; MSG2.len()];
        sock1.read_exact(&mut buf).await.unwrap();
        assert_eq!(str::from_utf8(&buf).unwrap(), MSG2);
    }
}

测试逻辑简述:

  1. 1. 在后台启动服务器;
  2. 2. 创建两个套接字并连接到服务器,获取各自的ID;
  3. 3. 一个客户端按协议向另一个客户端发送消息;
  4. 4. 验证双方均能收到对方的消息。

3.2 消息解析

我们使用bytes crate来便捷地处理网络消息。以下函数会接收输入消息的字节流,按协议解析消息:

  • • 解析成功:返回(<目标ID>, <消息字节流>),并从原缓冲区中消费该消息对应的字节;
  • • 解析失败:返回错误,且不修改原缓冲区;
  • • 每次仅解析一条消息。

实现代码如下:

代码语言:javascript
复制
struct ParseError;

fnparse_message(message: &mut BytesMut) ->Result<(u32, Bytes), ParseError> {
    if message.len() < 5 {
        returnErr(ParseError);
    }

    letend = message[4..]
        .iter()
        .position(|&c| c == b'\0')
        .ok_or(ParseError)?
        + 5;

    letmut data = message.split_to(end);

    letdest = data.split_to(4);
    letdest = u32::from_be_bytes(dest[..].try_into().unwrap());

    Ok((dest, data.freeze()))
}

该函数逻辑简单,具体实现细节对后续示例影响不大,此处不展开说明。

3.3 互斥锁(Mutex)模式

首先,我们尝试不使用任何同步机制实现handle_connections方法,看看会出现什么问题:

代码语言:javascript
复制
struct Server {
    listener: TcpListener,
    connections: HashMap<u32, TcpStream>,
}

implServer {
    pubasyncfnnew() -> Server {
        letlistener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
        Server {
            listener,
            connections: Default::default(),
        }
    }

    pubasyncfnhandle_connections(&mutself) {
        loop {
            let (socket, _) = self.listener.accept().await.unwrap();

            tokio::spawn(asyncmove {
                self.handle_connection(socket).await;
            });
        }
    }

    asyncfnhandle_connection(&mutself, mut socket: TcpStream) {
        letid = rand::rng().random();
        socket.write_u32(id).await.unwrap();
        socket.flush().await.unwrap();
        self.connections.insert(id, socket);

        letmut buffer = BytesMut::new();

        loop {
            letOk(n) = self
                .connections
                .get_mut(&id)
                .unwrap()
                .read_buf(&mut buffer)
                .await
            else {
                self.connections.remove(&id);
                break;
            };

            if n == 0 {
                self.connections.remove(&id);
                break;
            }

            letOk((dest, m)) = parse_message(&mut buffer) else {
                continue;
            };

            letSome(socket) = self.connections.get_mut(&dest) else {
                continue;
            };

            ifletErr(e) = socket.write_all(&m).await {
                eprintln!("Failed to write to socket {e}");
            }
        }
    }
}

注意:本示例代码可在content/concurrency-patterns/naive目录中找到。

编译器会立即报错:

  • • 第19行:self被借用为'static生命周期,超出了handle_connection的作用域;
  • • 第18行:在循环的前一次迭代中,self已被移动,后续无法再使用。

要解决这些问题,我们需要异步引用计数内部可变性,因此选择Arc<Mutex<T>>组合。若使用常规的std::sync::Mutex,编译器会拒绝通过——因为我们需要在发送消息时持有锁,所以必须使用Tokio提供的tokio::sync::Mutex

但仅用互斥锁包装connections(如下所示)仍会存在问题:

代码语言:javascript
复制
struct Server {
    listener: TcpListener,
    connections: tokio::sync::Mutex<HashMap<u32, TcpStream>>,
}

implServer {
    pubasyncfnhandle_connections(self: Arc<Self>) {
        loop {
            let (socket, _) = self.listener.accept().await.unwrap();
            letserver = Arc::clone(&self);

            tokio::spawn(asyncmove {
                server.handle_connection(socket).await;
            });
        }
    }

    asyncfnhandle_connection(&self, mut socket: TcpStream) {
        letid: u32 = rand::rng().random();
        socket.write_u32(id).await.unwrap();
        socket.flush().await.unwrap();
        self.connections.lock().await.insert(id, socket);

        letmut buffer = BytesMut::new();

        loop {
            letOk(n) = self
                .connections
                .lock()
                .await
                .get_mut(&id)
                .unwrap()
                .read_buf(&mut buffer)
                .await
            else {
                self.connections.lock().await.remove(&id);
                break;
            };

            if n == 0 {
                self.connections.lock().await.remove(&id);
                break;
            }

            letOk((dest, m)) = parse_message(&mut buffer) else {
                continue;
            };

            letmut connections = self.connections.lock().await;

            letSome(socket) = connections.get_mut(&dest) else {
                continue;
            };

            ifletErr(e) = socket.write_all(&m).await {
                eprintln!("Failed to write to socket {e}");
            }
        }
    }
}

注意:本示例代码可在content/concurrency-patterns/deadlock-mutex目录中找到。

问题:当一个任务持有互斥锁从套接字读取数据时,会阻塞其他所有套接字的读写操作,导致死锁。即使代码能编译通过,测试也会无限挂起。

修复方案:拆分套接字的读写端

我们只需共享套接字的“写端”,因此可以将TcpStream拆分为读端(OwnedReadHalf)和写端(OwnedWriteHalf):

代码语言:javascript
复制
struct Server {
    listener: TcpListener,
    connections: tokio::sync::Mutex<HashMap<u32, OwnedWriteHalf>>,
}

implServer {
    pubasyncfnhandle_connections(self: Arc<Self>) {
        loop {
            let (socket, _) = self.listener.accept().await.unwrap();
            letserver = self.clone();

            tokio::spawn(asyncmove {
                server.handle_connection(socket).await;
            });
        }
    }

    asyncfnhandle_connection(&self, socket: TcpStream) {
        let (mut read, mut write) = socket.into_split();
        letid: u32 = rand::rng().random();
        write.write_u32(id).await.unwrap();
        write.flush().await.unwrap();
        self.connections.lock().await.insert(id, write);

        letmut buffer = BytesMut::new();

        loop {
            letOk(n) = read.read_buf(&mut buffer).awaitelse {
                self.connections.lock().await.remove(&id);
                break;
            };

            if n == 0 {
                self.connections.lock().await.remove(&id);
                break;
            }

            letOk((dest, m)) = parse_message(&mut buffer) else {
                continue;
            };

            letmut connections = self.connections.lock().await;
            letSome(connection) = connections.get_mut(&dest) else {
                continue;
            };

            connection.write_all(&m).await.unwrap();
        }
    }
}

注意:本示例代码可在content/concurrency-patterns/mutex目录中找到。

此时测试可以通过,但仍存在隐患:当某个套接字的写缓冲区已满时,尝试向该套接字写入数据会阻塞当前任务,导致任务永久持有互斥锁,进而阻止其他所有客户端继续执行。

你可能会尝试用Arc<Mutex<OwnedWriteHalf>>包装每个写端来修复此问题,但这只会增加代码复杂度和潜在风险。由此可见,使用Mutex处理状态共享的复杂度很高——即使是在这个状态简单的应用中,正确使用Mutex也十分困难。

这种问题在状态包含I/O资源时会更突出;即便状态不包含I/O资源,只要存在多个Mutex,就很容易陷入死锁。虽然Mutex可以被正确使用,但对于大多数I/O密集型应用而言,它并非最佳选择,且使用门槛较高。

接下来,我们将介绍另一种常见的并发状态共享方案:通道(Channels)。

3.4 通道(Channels)模式

通道模式的核心思想是:用单个任务持有状态,其他任务仅负责处理I/O,并通过通道向状态持有任务发送消息,以实现状态的更新和读取。

通道模式有多种变体,本文聚焦于通道对可变状态共享的影响,因此保持实现简洁。具体代码如下:

代码语言:javascript
复制
struct Server {
    tx: mpsc::Sender<Message>,
    listener: TcpListener,
}

implServer {
    pubasyncfnnew() -> Server {
        let (tx, rx) = mpsc::channel(100);
        letlistener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
        tokio::spawn(message_dispatcher(rx));
        Server { tx, listener }
    }

    pubasyncfnhandle_connections(self: Arc<Self>) {
        loop {
            let (socket, _) = self.listener.accept().await.unwrap();
            letserver = self.clone();

            tokio::spawn(asyncmove {
                server.handle_connection(socket).await;
            });
        }
    }

    asyncfnhandle_connection(&self, socket: TcpStream) {
        letid: u32 = rand::rng().random();
        let (mut read, write) = socket.into_split();
        self.tx.send(Message::New(id, write)).await.unwrap();

        letmut buffer = BytesMut::new();

        loop {
            letOk(n) = read.read_buf(&mut buffer).awaitelse {
                let_ = self.tx.send(Message::Disconnect(id)).await;
                break;
            };

            if n == 0 {
                let_ = self.tx.send(Message::Disconnect(id)).await;
                break;
            }

            letOk((dest, m)) = parse_message(&mut buffer) else {
                continue;
            };

            self.tx.send(Message::Send(dest, m)).await.unwrap();
        }
    }
}

asyncfnmessage_dispatcher(mut rx: mpsc::Receiver<Message>) {
    letmut connections = HashMap::new();
    whileletSome(msg) = rx.recv().await {
        match msg {
            Message::New(id, mut connection) => {
                ifletErr(e) = connection.write_u32(id).await {
                    eprintln!("Failed to write to socket: {e}");
                };
                connections.insert(id, connection);
            }
            Message::Send(id, items) => {
                letSome(connection) = connections.get_mut(&id) else {
                    continue;
                };
                ifletErr(e) = connection.write_all(&items).await {
                    eprintln!("Failed to write to socket: {e}");
                }
            }
            Message::Disconnect(id) => {
                connections.remove(&id);
            }
        }
    }
}

enumMessage {
    New(u32, OwnedWriteHalf),
    Send(u32, Bytes),
    Disconnect(u32),
}

注意:本示例代码可在content/concurrency-patterns/channels目录中找到。

核心逻辑
  • • 单个message_dispatcher任务持有connections状态,作为唯一所有者,它无需额外同步即可通过&mut引用修改connections
  • • 处理I/O的任务(如handle_connection)不直接操作状态,而是通过mpsc::Sender发送Message枚举(包含“新连接”“发送消息”“断开连接”三种指令);
  • message_dispatcher通过mpsc::Receiver接收消息,并根据消息类型更新状态或执行I/O操作。
优势与问题
  • 优势:相比Mutex模式,通道模式更简单,且从根本上避免了死锁风险¹(除非两个任务互相等待对方的消息,导致无法推进工作)。
  • 问题:存在“队首阻塞(Head-of-the-Line Blocking)”——向某个套接字写入数据时,若写操作阻塞,会导致message_dispatcher任务无法处理其他消息,直到该写操作完成。

优化方案:为每个写端分配独立任务

为解决队首阻塞,我们可以让每个套接字的写端由独立任务持有,具体实现如下:

代码语言:javascript
复制
async fncentral_dispatcher(mut rx: mpsc::Receiver<Message>) {
    letmut connections = HashMap::new();
    whileletSome(msg) = rx.recv().await {
        match msg {
            Message::New(id, connection) => {
                let (tx, rx) = mpsc::channel(100);

                lethandle = tokio::spawn(client_dispatcher(connection, rx)).abort_handle();
                tx.send(Bytes::from_owner(id.to_be_bytes())).await.unwrap();
                connections.insert(id, (tx, handle));
            }
            Message::Send(id, items) => {
                letSome((connection, _)) = connections.get_mut(&id) else {
                    continue;
                };
                ifletErr(e) = connection.send(items).await {
                    eprintln!("Failed to write to socket: {e}");
                }
            }
            Message::Disconnect(id) => {
                ifletSome((_, handle)) = connections.remove(&id) {
                    handle.abort();
                }
            }
        }
    }
}

asyncfnclient_dispatcher(mut connection: OwnedWriteHalf, mut rx: mpsc::Receiver<Bytes>) {
    loop {
        letm = rx.recv().await.unwrap();
        ifletErr(e) = connection.write_all(&m).await {
            eprintln!("Failed to write to socket: {e}");
            break;
        }
    }
}

注意:本示例代码可在content/concurrency-patterns/channel-per-writer目录中找到。

优化点
  • • 每个套接字的写端(OwnedWriteHalf)由独立的client_dispatcher任务持有;
  • central_dispatcher通过通道向client_dispatcher发送待写数据,即使某个套接字的写缓冲区已满,central_dispatcher仍能继续处理其他消息。
残留问题

通道的发送缓冲区本质上只是一个额外的缓冲层——若向某个客户端发送大量突发消息,当该通道的缓冲区满时,central_dispatcher仍会被阻塞。

进一步优化:支持背压(Backpressure)

若希望在某个客户端的写缓冲区满时,停止从发送方读取消息(即实现背压),可以添加一个“许可通道”(permission channel):

代码语言:javascript
复制
// central_dispatcher 中的 Message::Send 分支
asyncfncentral_dispatcher(mut rx: mpsc::Receiver<Message>) {
    // ...
    match msg {
        // ...
        Message::Send(id, items, permission_token) => {
            letSome((connection, _)) = connections.get_mut(&id) else {
                continue;
            };

            tokio::spawn(async {
                ifletErr(e) = connection.send(items).await {
                    eprintln!("Failed to write to socket: {e}");
                }

                let_ = permission_token.send(());
            })
        }
        // ...
    }
}

// handle_connection 中的消息发送逻辑
asyncfnhandle_connection(&self, socket: TcpStream) {
    // ...
    loop {
        letOk(n) = read.read_buf(&mut buffer).awaitelse {
            let_ = self.tx.send(Message::Disconnect(id)).await;
            break;
        };

        // ...

        let (permission_token, permission_listener) = oneshot::channel();
        self.tx.send(Message::Send(dest, m, permission_token)).await.unwrap();
        permission_token.recv().await;
    }
}
逻辑说明
  • • 发送方(handle_connection)发送消息时,会创建一个oneshot通道(permission_token),并等待通道的响应;
  • central_dispatcher将消息发送给目标客户端的写任务后,通过permission_token向发送方发送“许可”;
  • • 发送方收到许可后,才会继续读取下一条消息,从而实现对发送方的背压控制。
问题:代码复杂度飙升

每次状态与I/O的交互都需要新增通道和任务,且必须仔细管理任务的生命周期,避免资源泄漏²。例如,若要向消息发送方返回错误信息(而非静默失败),需要额外添加错误通道,并在多个任务间传递上下文(如发送方ID),代码会变得非常繁琐:

代码语言:javascript
复制
async fncentral_dispatcher(reader_rx: mpsc::Receiver<Message>) {
    letmut connections = HashMap::new();
    let (write_errors_tx, write_error_rx) = mpsc::channel(100);
    letwrite_error_rx = ReceiverStream::new(write_error_rx);
    letreader_rx = ReceiverStream::new(reader_rx);
    letmut rx = write_error_rx.merge(reader_rx);
    whileletSome(msg) = rx.next().await {
        match msg {
            Message::New(id, connection) => {
                let (tx, rx) = mpsc::channel(100);

                leterrors_tx = write_errors_tx.clone();
                lethandle =
                    tokio::spawn(client_dispatcher(connection, rx, errors_tx)).abort_handle();
                ifletErr(e) = tx.send((Bytes::from_owner(id.to_be_bytes()), None)).await {
                    eprintln!("Failed to write to socket: {e}");
                }
                connections.insert(id, (tx, handle));
            }
            Message::Send(id, items, src) => {
                letSome((connection, _)) = connections.get_mut(&id) else {
                    continue;
                };
                ifletErr(e) = connection.send((items, Some(src))).await {
                    eprintln!("Failed to write to socket: {e}");
                }
            }
            Message::Disconnect(id) => {
                ifletSome((_, handle)) = connections.remove(&id) {
                    handle.abort();
                }
            }
            Message::Error(e, src) => {
                eprintln!("Failed to write to socket: {e}");
                letSome(src) = src else {
                    continue;
                };

                letSome((connection, _)) = connections.get(&src) else {
                    continue;
                };

                letmut error = BytesMut::new();
                error.put_u8(0xFF);
                error.put(&format!("Error: {}\0", e).into_bytes()[..]);

                let_ = connection.send((error.freeze(), None)).await;
            }
        }
    }
}

asyncfnclient_dispatcher(
    mut connection: OwnedWriteHalf,
    mut rx: mpsc::Receiver<(Bytes, Option<u32>)>,
    errors_tx: mpsc::Sender<Message>,
) {
    loop {
        let (m, src) = rx.recv().await.unwrap();
        ifletErr(e) = connection.write_all(&m).await {
            let_ = errors_tx.send(Message::Error(e, src)).await;
            break;
        }
    }
}

注意:本示例代码可在content/concurrency-patterns/channel-with-error目录中找到。

最大痛点:调试与追踪困难

通道模式将I/O与状态完全解耦,导致无法在状态上下文中标定“修改状态的数值来自哪里”。例如,在central_dispatcher中,若要追踪Message::Send的来源,通过调试器查看调用栈时,只能看到通道接收逻辑,无法追溯到具体的发送方(因为Sender可克隆,可能存在于多个地方)。这会极大增加代码的理解和调试难度——毕竟对大多数开发者而言,阅读代码的时间远多于编写代码的时间。

相比之下,Mutex模式的handle_connection函数中,I/O操作与状态使用在同一调用栈中,变量来源清晰可见(如m来自parse_messagebuffer来自套接字读取)。

那么,能否在不使用Mutex的前提下,保持I/O数据的局部性?答案是肯定的——采用单任务模式

3.5 单任务统筹模式

单任务模式的核心洞察是:我们需要的是并发(Concurrency),而非并行(Parallelism)。通过在单个任务中处理所有I/O,该任务可以独占状态的所有权,并自由修改状态,无需同步机制。

在异步Rust中,单任务处理多并发事件是一种常见模式(例如,同时等待超时和通道消息),通常通过tokio::select!宏实现。但tokio::select!不适用于“可变数量的futures”场景,因此我们使用futures_concurrency::future::Race(或futures::future::select_all)来实现多个futures的“竞争”。

核心目标

在单个任务中并发等待以下I/O事件,并同步响应:

  1. 1. 新的TCP连接;
  2. 2. 来自任意套接字的新消息;
  3. 3. 向客户端转发数据。
3.5.1 竞争 futures 模式

首先,我们需要统一所有futures的返回类型,通过Event枚举实现:

代码语言:javascript
复制
enum Event {
    Read((u32, Bytes)),  // 读取到消息:(发送方ID, 消息字节流)
    Connection(TcpStream),  // 新连接:TCP套接字
}

同时,定义一个带上下文的错误类型(包含I/O错误和产生错误的客户端ID):

代码语言:javascript
复制
type Result<T> = std::result::Result<T, (io::Error, u32)>;

接下来,封装ReadSocketWriteSocket结构,分别处理套接字的读和写:

代码语言:javascript
复制
// 处理套接字读操作
structReadSocket {
    buffer: BytesMut,    // 读缓冲区
    reader: OwnedReadHalf,  // 套接字读端
    id: u32,             // 客户端ID
}

implReadSocket {
    fnnew(reader: OwnedReadHalf, id: u32) -> ReadSocket {
        ReadSocket {
            buffer: BytesMut::new(),
            reader,
            id,
        }
    }

    asyncfnread(&mutself) ->Result<Event> {
        loop {
            // 尝试解析缓冲区中的消息,解析成功则返回
            ifletOk(read_result) = parse_message(&mutself.buffer) {
                returnOk(Event::Read(read_result));
            };

            // 读取数据到缓冲区(若失败,返回带客户端ID的错误)
            self.reader
                .read_buf(&mutself.buffer)
                .await
                .map_err(|e| (e, self.id))?;
        }
    }
}

// 处理套接字写操作
structWriteSocket {
    buffers: VecDeque<Bytes>,  // 待发送消息队列
    writer: OwnedWriteHalf,    // 套接字写端
    id: u32,                   // 客户端ID
}

implWriteSocket {
    fnnew(writer: OwnedWriteHalf, id: u32) -> WriteSocket {
        WriteSocket {
            buffers: VecDeque::new(),
            writer,
            id,
        }
    }

    // 推进写操作(仅在有数据时执行)
    asyncfnadvance_send(&mutself) ->Result<Event> {
        loop {
            // 若没有待发送数据,挂起当前future(无唤醒条件)
            letSome(buffer) = self.buffers.front_mut() else {
                std::future::pending::<Infallible>().await;
                unreachable!();
            };

            // 写入数据(若失败,返回带客户端ID的错误)
            self.writer
                .write_all_buf(buffer)
                .await
                .map_err(|e| (e, self.id))?;

            // 写入成功,移除已发送的缓冲区
            self.buffers.pop_front();
        }
    }

    // 调度消息发送(非阻塞,仅将消息加入队列)
    fnsend(&mutself, buf: Bytes) {
        self.buffers.push_back(buf);
    }
}
关键设计说明
  • ReadSocket:持有自己的读缓冲区,read方法仅需&mut self即可调用,无需共享缓冲区,支持多个读操作并发执行;
  • WriteSocket
    • send方法不阻塞,仅将消息加入待发送队列,避免阻塞单任务的主循环;
    • advance_send方法循环尝试发送队列中的消息:若队列空,则挂起(pending);若有消息,则写入套接字,直到队列空或写入失败。
    • • 若advance_send在写入完成前被丢弃,下次调用时会从上次中断的位置继续(因为Bytes会跟踪已写入的字节数)。
实现“下一个事件”的获取逻辑

定义next_event函数,返回一个future,该future会“竞争”所有I/O futures,返回第一个完成的事件:

代码语言:javascript
复制
fn next_event(&mutself) ->implFuture<Output = Result<Event>> {
    // 监听新连接的future
    letlisten = self
        .listener
        .accept()
        .map(|stream| Ok(Event::Connection(stream.unwrap().0)));

    // 若暂无连接,仅返回“监听新连接”的future
    ifself.read_connections.is_empty() {
        return listen.boxed();
    }

    // 所有读操作的future竞争
    letreads = self
        .read_connections
        .values_mut()
        .map(|reader| reader.read())
        .collect::<Vec<_>>()
        .race();

    // 所有写操作的future竞争
    letwrites = self
        .write_connections
        .values_mut()
        .map(|write| write.advance_send())
        .collect::<Vec<_>>()
        .race();

    // 同时竞争“新连接”“读”“写”三类future
    (listen, reads, writes).race().boxed()
}

说明race方法由Race trait提供,会公平地返回第一个完成的future的结果³⁴——即使某个future频繁触发事件,也不会一直抢占(若有其他future完成)。

单任务的主循环:处理所有事件

基于next_event,实现handle_connections函数,在单个任务中处理所有I/O事件:

代码语言:javascript
复制
pub asyncfnhandle_connections(&mutself) {
    loop {
        // 获取下一个完成的事件
        letev = self.next_event().await;
        letev = match ev {
            // 处理错误:移除出错的连接
            Err((e, i)) => {
                eprintln!("Socket error: {e}");
                self.read_connections.remove(&i);
                self.write_connections.remove(&i);
                continue;
            }
            Ok(ev) => ev,
        };

        // 处理不同类型的事件
        match ev {
            // 处理“读取到消息”事件:转发给目标客户端
            Event::Read((dest, items)) => {
                letSome(writer) = self.write_connections.get_mut(&dest) else {
                    continue;
                };
                writer.send(items);
            }
            // 处理“新连接”事件:分配ID,初始化读写端
            Event::Connection(tcp_stream) => {
                letid = rand::rng().random();
                let (r, w) = tcp_stream.into_split();

                // 向新客户端发送ID
                letmut write_sock = WriteSocket::new(w, id);
                write_sock.send(Bytes::from_owner(id.to_be_bytes()));

                // 注册连接
                self.read_connections.insert(id, ReadSocket::new(r, id));
                self.write_connections.insert(id, write_sock);
            }
        }
    }
}

注意:本示例代码可在content/concurrency-patterns/race目录中找到。

优势
  1. 1. 无同步开销:单任务独占状态,通过&mut self直接修改read_connectionswrite_connections,无需Mutex或通道;
  2. 2. 错误处理简洁:若需向发送方返回错误,无需新增通道,只需修改WriteSocket和错误处理逻辑即可。例如,让WriteSocket的待发送队列保存“消息+发送方ID”,出错时直接向发送方发送错误消息;
  3. 3. 背压控制灵活:可轻松实现全局背压(如统计所有套接字的待发送缓冲区总大小,超过阈值时暂停读操作)。
局限性
  1. 1. I/O间无法共享可变状态:多数场景可通过bytes crate规避,但在无标准库(no-std)或无内存分配(no-alloc)等受限环境中可能存在不便;
  2. 2. 错误需携带上下文:需通过map_err(|e| (e, self.id))为错误添加客户端ID,略显繁琐;
  3. 3. next_event复杂度随事件增多而上升:新增事件时需统一返回类型(Event枚举),且可能存在内存分配(如Vec存储futures)。
3.5.2 手动实现 future 模式

在“竞争futures”模式中,race会依次轮询所有futures,并返回第一个完成的结果(伪代码如下):

代码语言:javascript
复制
fn race(self: Pin<&mutself>, ctx: Context) -> Poll<Result<T>> {
    if Poll::Ready(result) = self.fut1.poll(ctx) {
        return result;
    }

    if Poll::Ready(result) = self.fut2.poll(ctx) {
        return result;
    }

    // ...

    if Poll::Ready(result) = self.futn.poll(ctx) {
        return result;
    }

    Poll::Pending
}

若我们手动实现这一轮询逻辑,而非依赖库函数,可获得以下优势:

  • • 无需统一futures的返回类型;
  • • 在轮询处直接处理结果,无需为错误添加人工上下文;
  • • 可向futures传递可变引用(引用仅在轮询调用期间有效,不会被长期持有)。
步骤1:重新封装 Socket 结构

由于Tokio的ReaderWriter不支持便捷的手动轮询,我们不再拆分套接字,而是直接使用TcpStream

代码语言:javascript
复制
struct Socket {
    write_buffers: VecDeque<Bytes>,  // 待发送消息队列
    read_buffer: BytesMut,           // 读缓冲区
    stream: TcpStream,               // TCP套接字
    waker: Option<Waker>,            // 唤醒器(用于无数据时的唤醒)
}

implSocket {
    fnnew(stream: TcpStream) -> Socket {
        Socket {
            write_buffers: VecDeque::new(),
            read_buffer: BytesMut::new(),
            waker: None,
            stream,
        }
    }

    // 调度消息发送(非阻塞,唤醒等待中的写操作)
    fnsend(&mutself, buf: Bytes) {
        self.write_buffers.push_back(buf);
        // 若存在唤醒器,唤醒写操作
        letSome(w) = self.waker.take() else {
            return;
        };
        w.wake_by_ref();
    }

    // 手动轮询写操作
    fnpoll_send(&mutself, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // 若暂无待发送数据,保存唤醒器并挂起
        letSome(write_buffer) = self.write_buffers.front_mut() else {
            self.waker = Some(cx.waker().clone());
            return Poll::Pending;
        };

        loop {
            // 等待套接字可写
            ready!(self.stream.poll_write_ready(cx))?;

            // 尝试写入数据
            matchself.stream.try_write(write_buffer) {
                Ok(n) => {
                    write_buffer.advance(n);  // 推进已写入字节数
                }
                Err(e) if e.kind() == ErrorKind::WouldBlock => {
                    continue;  // 若暂时无法写入,继续等待
                }
                Err(e) => {
                    return Poll::Ready(Err(e));  // 其他错误,返回
                }
            }

            // 若当前缓冲区已写完,移除该缓冲区
            if write_buffer.is_empty() {
                self.write_buffers.pop_front();
            }

            return Poll::Ready(Ok(()));
        }
    }

    // 手动轮询读操作
    fnpoll_read(&mutself, cx: &mut Context<'_>) -> Poll<io::Result<&mut BytesMut>> {
        loop {
            // 等待套接字可读
            ready!(self.stream.poll_read_ready(cx))?;

            // 尝试读取数据到缓冲区
            matchself.stream.try_read_buf(&mutself.read_buffer) {
                Ok(0) => {
                    // 读取到0字节,视为连接重置
                    return Poll::Ready(Err(io::Error::from(io::ErrorKind::ConnectionReset)));
                }
                Ok(_) => {}  // 读取成功,继续
                Err(e) if e.kind() == ErrorKind::WouldBlock => {
                    continue;  // 暂时无法读取,继续等待
                }
                Err(e) => {
                    return Poll::Ready(Err(e));  // 其他错误,返回
                }
            };

            // 返回读取到的缓冲区
            return Poll::Ready(Ok(&mutself.read_buffer));
        }
    }
}
关键设计说明
  • poll_read:循环等待套接字可读,读取数据到缓冲区,返回缓冲区或错误;若暂时无法读取,继续等待;
  • poll_send:循环等待套接字可写,发送队列中的消息;若队列空,保存唤醒器(后续调用send时唤醒);
  • 公平性:每个poll_read/poll_send仅执行一次I/O操作,避免单个套接字抢占所有资源。
步骤2:实现服务器的主轮询逻辑

Server添加poll_next函数,作为单任务的主轮询入口:

代码语言:javascript
复制
impl Server {
    fnpoll_next(&mutself, cx: &mut Context<'_>) -> Poll<()> {
        // 1. 处理新连接
        ifletPoll::Ready(Ok((connection, _))) = self.listener.poll_accept(cx) {
            letmut socket = Socket::new(connection);
            letid: u32 = rand::rng().random();
            // 向新客户端发送ID
            socket.send(Bytes::from_owner(id.to_be_bytes()));
            self.connections.insert(id, socket);
            // 唤醒自己,确保后续能处理其他事件
            cx.waker().wake_by_ref();
        }

        // 2. 推进所有套接字的写操作
        forconninself.connections.values_mut() {
            if conn.poll_send(cx).is_ready() {
                // 若写操作完成,唤醒自己,继续处理其他写操作
                cx.waker().wake_by_ref();
            }
        }

        // 3. 处理所有套接字的读操作(先收集ID,避免迭代中修改map)
        letkeys: Vec<_> = self.connections.keys().copied().collect();
        forkin keys {
            letbuf = matchself.connections.get_mut(&k).unwrap().poll_read(cx) {
                // 读取成功:获取缓冲区
                Poll::Ready(Ok(buf)) => buf,
                // 读取错误:移除连接
                Poll::Ready(Err(e)) => {
                    self.connections.remove(&k);
                    println!("Failed to read from {k}: {e}");
                    continue;
                }
                // 暂时无法读取:跳过
                Poll::Pending => {
                    continue;
                }
            };

            // 唤醒自己,确保后续能处理解析后的消息
            cx.waker().wake_by_ref();

            // 解析消息并转发
            letOk((dest, b)) = parse_message(buf) else {
                continue;
            };
            letSome(destination) = self.connections.get_mut(&dest) else {
                continue;
            };
            destination.send(b);
        }

        // 始终返回Pending,让运行时继续调度(服务器永不停止)
        Poll::Pending
    }
}
步骤3:暴露异步接口

使用poll_fnpoll_next包装为异步函数,供外部调用:

代码语言:javascript
复制
async fn handle_connections(&mut self) {
    poll_fn(move |cx| self.poll_next(cx)).await;
}

注意:本示例代码可在content/concurrency-patterns/hand-rolled目录中找到。

优势
  1. 1. 错误处理更自然:无需为错误添加人工上下文,可在轮询处直接处理;
  2. 2. 支持I/O间共享状态:例如,可在多个套接字间共享一个缓冲区(适用于UDP等协议);
  3. 3. 公平性可控:可自定义轮询顺序和频率,优化性能。
风险
  1. 1. 唤醒器遗漏:若某个分支未调用cx.waker().wake_by_ref(),可能导致任务永久挂起(例如,读取部分消息后未唤醒,后续无法继续读取);
  2. 2. 公平性需手动维护:若某个套接字频繁触发事件,可能导致其他套接字被饿死。

4. 模式选择

以上模式并非“非此即彼”,而是可以灵活组合(例如,在单任务中使用通道与其他任务通信)。选择模式时,需基于性能需求、代码复杂度和可维护性综合权衡。

4.1 多任务 vs 单任务

  • 不推荐优先使用 Mutex:除非处于性能或内存受限环境(如no-std),或无异步运行时(需在多线程间共享状态),否则应避免使用Mutex——其使用门槛高,易引发死锁。
  • 优先选择单任务模式
    • • 优势:无需同步机制,状态访问直接;背压控制灵活;错误处理清晰;调试时变量来源可追溯;无任务生命周期管理风险。
    • • 适用场景:大多数I/O密集型应用,尤其是状态共享需求简单、事件频率不高的场景。
  • 多任务模式的适用场景
    • • 性能瓶颈:当某个I/O事件频率极高,单任务处理速度无法跟上,导致其他事件被阻塞时,需将该事件拆分到独立任务(需通过基准测试验证);
    • • 并行调度:需利用多线程并行处理任务(但会损失数据局部性,且需谨慎处理背压);
    • • 无状态隔离:任务间无需共享可变状态,可作为独立工作单元(如处理无状态请求)。

推荐:若需使用多任务,建议采用结构化方案(如Actor模型)²⁵,避免任务管理混乱。

4.2 手动实现 future vs 组合器(Combinators)

  • 优先使用组合器(如 Race/select_all)
    • • 优势:基于async/await语法,可完全避免“唤醒器遗漏”类bug;代码更简洁,易维护。
    • • 原理:类似RAII(资源获取即初始化)自动管理资源,async/await自动管理I/O事件的唤醒,无需手动处理Waker
  • 手动实现 future 的适用场景
    • • 需在I/O间共享状态(如共享缓冲区);
    • • 需自定义公平性逻辑(如调整轮询优先级);
    • • 需避免组合器带来的额外开销(如枚举包装、内存分配)。

结论:若多个futures需共享可变状态,优先在单任务中使用组合器和async/await;仅当基准测试显示性能瓶颈时,才考虑手动实现future或拆分多任务。

补充:Sans-IO 模式

Sans-IO是一种与I/O无关的协议实现模型,将协议逻辑抽象为“状态机”,由外部事件驱动状态转换。它与单任务模式的契合度很高——单任务模式中,我们已将I/O事件与状态处理分离,这与Sans-IO的核心思想一致。

  • • Sans-IO的优势:可组合性强、易测试(无需真实I/O);
  • • 单任务模式的优势:简化Sans-IO状态机的共享(单任务独占状态,无需同步)。

若需深入了解Sans-IO,推荐阅读Firezone的相关文章。

参考链接

  1. 1. 死锁仍可能发生:若两个任务互相等待对方的消息,会导致无法推进工作。↩
  2. 2. Tokio Actor模型实现:https://draft.ryhl.io/blog/actors-with-tokio/↩
  3. 3. futures-concurrency 的 Race 公平性讨论:https://github.com/yoshuawuyts/futures-concurrency/issues/85↩
  4. 4. futures-concurrency 的 Race 公平性实现:https://github.com/yoshuawuyts/futures-concurrency/pull/104↩
  5. 5. 用Actor替代任务:https://blog.yoshuawuyts.com/replacing-tasks-with-actors/↩
  6. 6. 树形结构化并发:https://blog.yoshuawuyts.com/tree-structured-concurrency/↩
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构师部落 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 1. 写作背景
  • 2. 示例问题
    • 交互流程
    • 简化假设(仅为演示)
  • 3. 具体实现
    • 3.1 测试用例
    • 3.2 消息解析
    • 3.3 互斥锁(Mutex)模式
    • 修复方案:拆分套接字的读写端
    • 3.4 通道(Channels)模式
      • 核心逻辑
      • 优势与问题
    • 优化方案:为每个写端分配独立任务
      • 优化点
      • 残留问题
    • 进一步优化:支持背压(Backpressure)
      • 逻辑说明
      • 问题:代码复杂度飙升
      • 最大痛点:调试与追踪困难
    • 3.5 单任务统筹模式
      • 核心目标
      • 3.5.1 竞争 futures 模式
      • 关键设计说明
      • 优势
      • 局限性
      • 3.5.2 手动实现 future 模式
      • 优势
      • 风险
  • 4. 模式选择
    • 4.1 多任务 vs 单任务
    • 4.2 手动实现 future vs 组合器(Combinators)
    • 补充:Sans-IO 模式
  • 参考链接
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档