
在Rust中,使用多个任务可能是执行非顺序工作最常见的方式。每个任务可以等待不同的I/O事件,异步运行时会在I/O事件发生后立即调度这些任务执行工作。这种方式能“非阻塞地”等待I/O事件发生并实现并发,简单又便捷。但在多个任务间共享可变状态却可能变得十分复杂。
通常,我们会使用Mutex(互斥锁)或通道(channel)来实现状态共享。前者很难用对,后者则很容易失控。作为一种替代方案,我建议默认采用单任务模式,无需在多个任务间共享状态,从而从根本上解决可变状态共享的问题;同时,我们也会分析该模式的权衡取舍,并说明在需要时如何逐步过渡到其他模式。
本文将以一个简单的网络应用为例,逐步采用不同的模式来实现可变状态共享:首先是互斥锁模式,接着是通道模式,然后是两种单任务模式的实现(并分析其利弊)。通过这些案例,我们能更全面地了解各种模式的特点,进而为“默认采用单任务模式”提供合理依据。
本文聚焦于I/O密集型应用中的可变状态共享问题。这类应用能从并发中显著获益——任务在等待I/O事件时,不会阻塞程序其他部分的执行。为实现这一点,我们通常会使用异步运行时,本文也将围绕异步运行时展开讨论。
在异步运行时中实现并发最常见的方式,是生成新任务来处理I/O事件。“任务(Task)”是异步运行时提供的基本单元,代表一段待执行的工作。当I/O事件解除了对该任务的阻塞后,运行时会将其调度到当前线程或其他线程上,与其他任务并发执行。
这种便捷性背后存在一定代价:任务要执行的Future必须满足'static生命周期,且可能需要满足Send trait(仅当任务可能被调度到不同线程时才需满足这一要求)。我们可以通过tokio::task::LocalSet等结构避免Send要求(尽管这种方式略显繁琐),但'static生命周期的要求却很难规避。
'static生命周期之所以是必需的,是因为被生成的Future可能在任何后续时间执行,甚至可能超出最初生成它的代码块的作用域。要满足这一要求并实现可变状态共享,有两种常见方式:
另一种方案是完全不使用spawn(生成任务),而是在单个任务中运行所有Future。我们可以借助futures::future::select、futures::future::select_all或tokio::select等函数实现这一点,它们不要求Future具备'static生命周期。但需要注意的是,这些Future仍是同时存在的独立工作单元,无法通过&mut引用共享状态——若要共享状态,仍需使用具备内部可变性的结构。
不过,若能将I/O futures与状态修改操作分离,我们或许能让futures不持有共享状态的引用,而是通过一个公共代码块来处理I/O事件。在没有I/O发生的公共路径中,我们可以通过&mut引用直接修改状态。此外,还有一种方法能让I/O结构间共享可变状态。
通常,在async函数中调用.await时,运行时会调度一个与任务关联的Waker(唤醒器),并在未来某个时刻唤醒该任务。编译器会自动跟踪Future的状态:它会生成一个枚举来表示.await的执行点,并保存当前状态。我们发现,这种“任务与Waker绑定”的设计仅为了让任务能在同一执行点恢复运行。因此,我们可以调整代码结构:每次任务被唤醒时,依次推进工作流程,并轮询所有I/O以检测新事件。这样一来,所有I/O都能共享可变状态的引用。如果目前对此理解不深也没关系,后续的具体示例会让这一逻辑更清晰。
接下来,我们将通过一个简单问题,用不同的模式实现I/O并发代码,从而分析每种模式的优缺点。
这是一个简化但接近真实的场景,我们添加了一些人为约束以减少冗余代码,从而聚焦核心问题——并发代码中的可变状态共享。
我们需要实现一个服务器:客户端可连接到该服务器,并通过服务器与其他客户端交换消息。
注意:网络传输中的所有数字均采用网络字节序(大端序)编码。
031,服务器向客户端返回该4字节ID。\0结尾)。031,数据为3262063,则消息为031 + 3262063 + \0。我们需要通过并发处理两种事件:一是来自任意客户端的消息,二是新的TCP连接。实现并发的方式有多种(线程、操作系统原生函数、异步运行时),由于本文聚焦I/O密集型应用,我们选择使用tokio异步运行时。
接下来,我们将逐步实现不同模式的服务器,并分析其特点。
首先,我们编写一个测试用例来明确协议的预期行为。该测试不用于捕获边缘情况,仅用于定义核心功能的预期结果。
服务器的接口定义如下:
struct Server;
impl Server {
pub async fn new() -> Server;
pub async fn handle_connections(self: Arc<Self>) -> tokio::task::JoinHandle<()>;
}提示:后续章节中,
Server::handle_connections的签名会略有调整(改为使用&mut self),但更新测试用例非常简单。此外,每个示例都会提供完整实现(包括对应的测试用例)。
基于上述接口,测试用例实现如下:
#[cfg(test)]
mod tests {
#[tokio::test]
asyncfnit_works() {
const MSG1: &str = "hello, number 2\0";
const MSG2: &str = "hello back, number 1\0";
letserver = Arc::new(Server::new().await);
tokio::spawn(server.handle_connections());
letmut sock1 = TcpStream::connect("127.0.0.1:8080").await.unwrap();
letmut sock2 = TcpStream::connect("127.0.0.1:8080").await.unwrap();
letid1 = sock1.read_u32().await.unwrap();
letid2 = sock2.read_u32().await.unwrap();
sock1.write_u32(id2).await.unwrap();
sock1.write_all(MSG1.as_bytes()).await.unwrap();
sock1.flush().await.unwrap();
letmut buf = [0; MSG1.len()];
sock2.read_exact(&mut buf).await.unwrap();
assert_eq!(str::from_utf8(&buf).unwrap(), MSG1);
sock2.write_u32(id1).await.unwrap();
sock2.write_all(MSG2.as_bytes()).await.unwrap();
sock2.flush().await.unwrap();
letmut buf = [0; MSG2.len()];
sock1.read_exact(&mut buf).await.unwrap();
assert_eq!(str::from_utf8(&buf).unwrap(), MSG2);
}
}测试逻辑简述:
我们使用bytes crate来便捷地处理网络消息。以下函数会接收输入消息的字节流,按协议解析消息:
(<目标ID>, <消息字节流>),并从原缓冲区中消费该消息对应的字节;实现代码如下:
struct ParseError;
fnparse_message(message: &mut BytesMut) ->Result<(u32, Bytes), ParseError> {
if message.len() < 5 {
returnErr(ParseError);
}
letend = message[4..]
.iter()
.position(|&c| c == b'\0')
.ok_or(ParseError)?
+ 5;
letmut data = message.split_to(end);
letdest = data.split_to(4);
letdest = u32::from_be_bytes(dest[..].try_into().unwrap());
Ok((dest, data.freeze()))
}该函数逻辑简单,具体实现细节对后续示例影响不大,此处不展开说明。
首先,我们尝试不使用任何同步机制实现handle_connections方法,看看会出现什么问题:
struct Server {
listener: TcpListener,
connections: HashMap<u32, TcpStream>,
}
implServer {
pubasyncfnnew() -> Server {
letlistener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
Server {
listener,
connections: Default::default(),
}
}
pubasyncfnhandle_connections(&mutself) {
loop {
let (socket, _) = self.listener.accept().await.unwrap();
tokio::spawn(asyncmove {
self.handle_connection(socket).await;
});
}
}
asyncfnhandle_connection(&mutself, mut socket: TcpStream) {
letid = rand::rng().random();
socket.write_u32(id).await.unwrap();
socket.flush().await.unwrap();
self.connections.insert(id, socket);
letmut buffer = BytesMut::new();
loop {
letOk(n) = self
.connections
.get_mut(&id)
.unwrap()
.read_buf(&mut buffer)
.await
else {
self.connections.remove(&id);
break;
};
if n == 0 {
self.connections.remove(&id);
break;
}
letOk((dest, m)) = parse_message(&mut buffer) else {
continue;
};
letSome(socket) = self.connections.get_mut(&dest) else {
continue;
};
ifletErr(e) = socket.write_all(&m).await {
eprintln!("Failed to write to socket {e}");
}
}
}
}注意:本示例代码可在
content/concurrency-patterns/naive目录中找到。
编译器会立即报错:
self被借用为'static生命周期,超出了handle_connection的作用域;self已被移动,后续无法再使用。要解决这些问题,我们需要异步引用计数和内部可变性,因此选择Arc<Mutex<T>>组合。若使用常规的std::sync::Mutex,编译器会拒绝通过——因为我们需要在发送消息时持有锁,所以必须使用Tokio提供的tokio::sync::Mutex。
但仅用互斥锁包装connections(如下所示)仍会存在问题:
struct Server {
listener: TcpListener,
connections: tokio::sync::Mutex<HashMap<u32, TcpStream>>,
}
implServer {
pubasyncfnhandle_connections(self: Arc<Self>) {
loop {
let (socket, _) = self.listener.accept().await.unwrap();
letserver = Arc::clone(&self);
tokio::spawn(asyncmove {
server.handle_connection(socket).await;
});
}
}
asyncfnhandle_connection(&self, mut socket: TcpStream) {
letid: u32 = rand::rng().random();
socket.write_u32(id).await.unwrap();
socket.flush().await.unwrap();
self.connections.lock().await.insert(id, socket);
letmut buffer = BytesMut::new();
loop {
letOk(n) = self
.connections
.lock()
.await
.get_mut(&id)
.unwrap()
.read_buf(&mut buffer)
.await
else {
self.connections.lock().await.remove(&id);
break;
};
if n == 0 {
self.connections.lock().await.remove(&id);
break;
}
letOk((dest, m)) = parse_message(&mut buffer) else {
continue;
};
letmut connections = self.connections.lock().await;
letSome(socket) = connections.get_mut(&dest) else {
continue;
};
ifletErr(e) = socket.write_all(&m).await {
eprintln!("Failed to write to socket {e}");
}
}
}
}注意:本示例代码可在
content/concurrency-patterns/deadlock-mutex目录中找到。
问题:当一个任务持有互斥锁从套接字读取数据时,会阻塞其他所有套接字的读写操作,导致死锁。即使代码能编译通过,测试也会无限挂起。
我们只需共享套接字的“写端”,因此可以将TcpStream拆分为读端(OwnedReadHalf)和写端(OwnedWriteHalf):
struct Server {
listener: TcpListener,
connections: tokio::sync::Mutex<HashMap<u32, OwnedWriteHalf>>,
}
implServer {
pubasyncfnhandle_connections(self: Arc<Self>) {
loop {
let (socket, _) = self.listener.accept().await.unwrap();
letserver = self.clone();
tokio::spawn(asyncmove {
server.handle_connection(socket).await;
});
}
}
asyncfnhandle_connection(&self, socket: TcpStream) {
let (mut read, mut write) = socket.into_split();
letid: u32 = rand::rng().random();
write.write_u32(id).await.unwrap();
write.flush().await.unwrap();
self.connections.lock().await.insert(id, write);
letmut buffer = BytesMut::new();
loop {
letOk(n) = read.read_buf(&mut buffer).awaitelse {
self.connections.lock().await.remove(&id);
break;
};
if n == 0 {
self.connections.lock().await.remove(&id);
break;
}
letOk((dest, m)) = parse_message(&mut buffer) else {
continue;
};
letmut connections = self.connections.lock().await;
letSome(connection) = connections.get_mut(&dest) else {
continue;
};
connection.write_all(&m).await.unwrap();
}
}
}注意:本示例代码可在
content/concurrency-patterns/mutex目录中找到。
此时测试可以通过,但仍存在隐患:当某个套接字的写缓冲区已满时,尝试向该套接字写入数据会阻塞当前任务,导致任务永久持有互斥锁,进而阻止其他所有客户端继续执行。
你可能会尝试用Arc<Mutex<OwnedWriteHalf>>包装每个写端来修复此问题,但这只会增加代码复杂度和潜在风险。由此可见,使用Mutex处理状态共享的复杂度很高——即使是在这个状态简单的应用中,正确使用Mutex也十分困难。
这种问题在状态包含I/O资源时会更突出;即便状态不包含I/O资源,只要存在多个Mutex,就很容易陷入死锁。虽然Mutex可以被正确使用,但对于大多数I/O密集型应用而言,它并非最佳选择,且使用门槛较高。
接下来,我们将介绍另一种常见的并发状态共享方案:通道(Channels)。
通道模式的核心思想是:用单个任务持有状态,其他任务仅负责处理I/O,并通过通道向状态持有任务发送消息,以实现状态的更新和读取。
通道模式有多种变体,本文聚焦于通道对可变状态共享的影响,因此保持实现简洁。具体代码如下:
struct Server {
tx: mpsc::Sender<Message>,
listener: TcpListener,
}
implServer {
pubasyncfnnew() -> Server {
let (tx, rx) = mpsc::channel(100);
letlistener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
tokio::spawn(message_dispatcher(rx));
Server { tx, listener }
}
pubasyncfnhandle_connections(self: Arc<Self>) {
loop {
let (socket, _) = self.listener.accept().await.unwrap();
letserver = self.clone();
tokio::spawn(asyncmove {
server.handle_connection(socket).await;
});
}
}
asyncfnhandle_connection(&self, socket: TcpStream) {
letid: u32 = rand::rng().random();
let (mut read, write) = socket.into_split();
self.tx.send(Message::New(id, write)).await.unwrap();
letmut buffer = BytesMut::new();
loop {
letOk(n) = read.read_buf(&mut buffer).awaitelse {
let_ = self.tx.send(Message::Disconnect(id)).await;
break;
};
if n == 0 {
let_ = self.tx.send(Message::Disconnect(id)).await;
break;
}
letOk((dest, m)) = parse_message(&mut buffer) else {
continue;
};
self.tx.send(Message::Send(dest, m)).await.unwrap();
}
}
}
asyncfnmessage_dispatcher(mut rx: mpsc::Receiver<Message>) {
letmut connections = HashMap::new();
whileletSome(msg) = rx.recv().await {
match msg {
Message::New(id, mut connection) => {
ifletErr(e) = connection.write_u32(id).await {
eprintln!("Failed to write to socket: {e}");
};
connections.insert(id, connection);
}
Message::Send(id, items) => {
letSome(connection) = connections.get_mut(&id) else {
continue;
};
ifletErr(e) = connection.write_all(&items).await {
eprintln!("Failed to write to socket: {e}");
}
}
Message::Disconnect(id) => {
connections.remove(&id);
}
}
}
}
enumMessage {
New(u32, OwnedWriteHalf),
Send(u32, Bytes),
Disconnect(u32),
}注意:本示例代码可在
content/concurrency-patterns/channels目录中找到。
message_dispatcher任务持有connections状态,作为唯一所有者,它无需额外同步即可通过&mut引用修改connections;handle_connection)不直接操作状态,而是通过mpsc::Sender发送Message枚举(包含“新连接”“发送消息”“断开连接”三种指令);message_dispatcher通过mpsc::Receiver接收消息,并根据消息类型更新状态或执行I/O操作。Mutex模式,通道模式更简单,且从根本上避免了死锁风险¹(除非两个任务互相等待对方的消息,导致无法推进工作)。message_dispatcher任务无法处理其他消息,直到该写操作完成。为解决队首阻塞,我们可以让每个套接字的写端由独立任务持有,具体实现如下:
async fncentral_dispatcher(mut rx: mpsc::Receiver<Message>) {
letmut connections = HashMap::new();
whileletSome(msg) = rx.recv().await {
match msg {
Message::New(id, connection) => {
let (tx, rx) = mpsc::channel(100);
lethandle = tokio::spawn(client_dispatcher(connection, rx)).abort_handle();
tx.send(Bytes::from_owner(id.to_be_bytes())).await.unwrap();
connections.insert(id, (tx, handle));
}
Message::Send(id, items) => {
letSome((connection, _)) = connections.get_mut(&id) else {
continue;
};
ifletErr(e) = connection.send(items).await {
eprintln!("Failed to write to socket: {e}");
}
}
Message::Disconnect(id) => {
ifletSome((_, handle)) = connections.remove(&id) {
handle.abort();
}
}
}
}
}
asyncfnclient_dispatcher(mut connection: OwnedWriteHalf, mut rx: mpsc::Receiver<Bytes>) {
loop {
letm = rx.recv().await.unwrap();
ifletErr(e) = connection.write_all(&m).await {
eprintln!("Failed to write to socket: {e}");
break;
}
}
}注意:本示例代码可在
content/concurrency-patterns/channel-per-writer目录中找到。
OwnedWriteHalf)由独立的client_dispatcher任务持有;central_dispatcher通过通道向client_dispatcher发送待写数据,即使某个套接字的写缓冲区已满,central_dispatcher仍能继续处理其他消息。通道的发送缓冲区本质上只是一个额外的缓冲层——若向某个客户端发送大量突发消息,当该通道的缓冲区满时,central_dispatcher仍会被阻塞。
若希望在某个客户端的写缓冲区满时,停止从发送方读取消息(即实现背压),可以添加一个“许可通道”(permission channel):
// central_dispatcher 中的 Message::Send 分支
asyncfncentral_dispatcher(mut rx: mpsc::Receiver<Message>) {
// ...
match msg {
// ...
Message::Send(id, items, permission_token) => {
letSome((connection, _)) = connections.get_mut(&id) else {
continue;
};
tokio::spawn(async {
ifletErr(e) = connection.send(items).await {
eprintln!("Failed to write to socket: {e}");
}
let_ = permission_token.send(());
})
}
// ...
}
}
// handle_connection 中的消息发送逻辑
asyncfnhandle_connection(&self, socket: TcpStream) {
// ...
loop {
letOk(n) = read.read_buf(&mut buffer).awaitelse {
let_ = self.tx.send(Message::Disconnect(id)).await;
break;
};
// ...
let (permission_token, permission_listener) = oneshot::channel();
self.tx.send(Message::Send(dest, m, permission_token)).await.unwrap();
permission_token.recv().await;
}
}handle_connection)发送消息时,会创建一个oneshot通道(permission_token),并等待通道的响应;central_dispatcher将消息发送给目标客户端的写任务后,通过permission_token向发送方发送“许可”;每次状态与I/O的交互都需要新增通道和任务,且必须仔细管理任务的生命周期,避免资源泄漏²。例如,若要向消息发送方返回错误信息(而非静默失败),需要额外添加错误通道,并在多个任务间传递上下文(如发送方ID),代码会变得非常繁琐:
async fncentral_dispatcher(reader_rx: mpsc::Receiver<Message>) {
letmut connections = HashMap::new();
let (write_errors_tx, write_error_rx) = mpsc::channel(100);
letwrite_error_rx = ReceiverStream::new(write_error_rx);
letreader_rx = ReceiverStream::new(reader_rx);
letmut rx = write_error_rx.merge(reader_rx);
whileletSome(msg) = rx.next().await {
match msg {
Message::New(id, connection) => {
let (tx, rx) = mpsc::channel(100);
leterrors_tx = write_errors_tx.clone();
lethandle =
tokio::spawn(client_dispatcher(connection, rx, errors_tx)).abort_handle();
ifletErr(e) = tx.send((Bytes::from_owner(id.to_be_bytes()), None)).await {
eprintln!("Failed to write to socket: {e}");
}
connections.insert(id, (tx, handle));
}
Message::Send(id, items, src) => {
letSome((connection, _)) = connections.get_mut(&id) else {
continue;
};
ifletErr(e) = connection.send((items, Some(src))).await {
eprintln!("Failed to write to socket: {e}");
}
}
Message::Disconnect(id) => {
ifletSome((_, handle)) = connections.remove(&id) {
handle.abort();
}
}
Message::Error(e, src) => {
eprintln!("Failed to write to socket: {e}");
letSome(src) = src else {
continue;
};
letSome((connection, _)) = connections.get(&src) else {
continue;
};
letmut error = BytesMut::new();
error.put_u8(0xFF);
error.put(&format!("Error: {}\0", e).into_bytes()[..]);
let_ = connection.send((error.freeze(), None)).await;
}
}
}
}
asyncfnclient_dispatcher(
mut connection: OwnedWriteHalf,
mut rx: mpsc::Receiver<(Bytes, Option<u32>)>,
errors_tx: mpsc::Sender<Message>,
) {
loop {
let (m, src) = rx.recv().await.unwrap();
ifletErr(e) = connection.write_all(&m).await {
let_ = errors_tx.send(Message::Error(e, src)).await;
break;
}
}
}注意:本示例代码可在
content/concurrency-patterns/channel-with-error目录中找到。
通道模式将I/O与状态完全解耦,导致无法在状态上下文中标定“修改状态的数值来自哪里”。例如,在central_dispatcher中,若要追踪Message::Send的来源,通过调试器查看调用栈时,只能看到通道接收逻辑,无法追溯到具体的发送方(因为Sender可克隆,可能存在于多个地方)。这会极大增加代码的理解和调试难度——毕竟对大多数开发者而言,阅读代码的时间远多于编写代码的时间。
相比之下,Mutex模式的handle_connection函数中,I/O操作与状态使用在同一调用栈中,变量来源清晰可见(如m来自parse_message,buffer来自套接字读取)。
那么,能否在不使用Mutex的前提下,保持I/O数据的局部性?答案是肯定的——采用单任务模式。
单任务模式的核心洞察是:我们需要的是并发(Concurrency),而非并行(Parallelism)。通过在单个任务中处理所有I/O,该任务可以独占状态的所有权,并自由修改状态,无需同步机制。
在异步Rust中,单任务处理多并发事件是一种常见模式(例如,同时等待超时和通道消息),通常通过tokio::select!宏实现。但tokio::select!不适用于“可变数量的futures”场景,因此我们使用futures_concurrency::future::Race(或futures::future::select_all)来实现多个futures的“竞争”。
在单个任务中并发等待以下I/O事件,并同步响应:
首先,我们需要统一所有futures的返回类型,通过Event枚举实现:
enum Event {
Read((u32, Bytes)), // 读取到消息:(发送方ID, 消息字节流)
Connection(TcpStream), // 新连接:TCP套接字
}同时,定义一个带上下文的错误类型(包含I/O错误和产生错误的客户端ID):
type Result<T> = std::result::Result<T, (io::Error, u32)>;接下来,封装ReadSocket和WriteSocket结构,分别处理套接字的读和写:
// 处理套接字读操作
structReadSocket {
buffer: BytesMut, // 读缓冲区
reader: OwnedReadHalf, // 套接字读端
id: u32, // 客户端ID
}
implReadSocket {
fnnew(reader: OwnedReadHalf, id: u32) -> ReadSocket {
ReadSocket {
buffer: BytesMut::new(),
reader,
id,
}
}
asyncfnread(&mutself) ->Result<Event> {
loop {
// 尝试解析缓冲区中的消息,解析成功则返回
ifletOk(read_result) = parse_message(&mutself.buffer) {
returnOk(Event::Read(read_result));
};
// 读取数据到缓冲区(若失败,返回带客户端ID的错误)
self.reader
.read_buf(&mutself.buffer)
.await
.map_err(|e| (e, self.id))?;
}
}
}
// 处理套接字写操作
structWriteSocket {
buffers: VecDeque<Bytes>, // 待发送消息队列
writer: OwnedWriteHalf, // 套接字写端
id: u32, // 客户端ID
}
implWriteSocket {
fnnew(writer: OwnedWriteHalf, id: u32) -> WriteSocket {
WriteSocket {
buffers: VecDeque::new(),
writer,
id,
}
}
// 推进写操作(仅在有数据时执行)
asyncfnadvance_send(&mutself) ->Result<Event> {
loop {
// 若没有待发送数据,挂起当前future(无唤醒条件)
letSome(buffer) = self.buffers.front_mut() else {
std::future::pending::<Infallible>().await;
unreachable!();
};
// 写入数据(若失败,返回带客户端ID的错误)
self.writer
.write_all_buf(buffer)
.await
.map_err(|e| (e, self.id))?;
// 写入成功,移除已发送的缓冲区
self.buffers.pop_front();
}
}
// 调度消息发送(非阻塞,仅将消息加入队列)
fnsend(&mutself, buf: Bytes) {
self.buffers.push_back(buf);
}
}read方法仅需&mut self即可调用,无需共享缓冲区,支持多个读操作并发执行;send方法不阻塞,仅将消息加入待发送队列,避免阻塞单任务的主循环;advance_send方法循环尝试发送队列中的消息:若队列空,则挂起(pending);若有消息,则写入套接字,直到队列空或写入失败。advance_send在写入完成前被丢弃,下次调用时会从上次中断的位置继续(因为Bytes会跟踪已写入的字节数)。定义next_event函数,返回一个future,该future会“竞争”所有I/O futures,返回第一个完成的事件:
fn next_event(&mutself) ->implFuture<Output = Result<Event>> {
// 监听新连接的future
letlisten = self
.listener
.accept()
.map(|stream| Ok(Event::Connection(stream.unwrap().0)));
// 若暂无连接,仅返回“监听新连接”的future
ifself.read_connections.is_empty() {
return listen.boxed();
}
// 所有读操作的future竞争
letreads = self
.read_connections
.values_mut()
.map(|reader| reader.read())
.collect::<Vec<_>>()
.race();
// 所有写操作的future竞争
letwrites = self
.write_connections
.values_mut()
.map(|write| write.advance_send())
.collect::<Vec<_>>()
.race();
// 同时竞争“新连接”“读”“写”三类future
(listen, reads, writes).race().boxed()
}说明:
race方法由Racetrait提供,会公平地返回第一个完成的future的结果³⁴——即使某个future频繁触发事件,也不会一直抢占(若有其他future完成)。
基于next_event,实现handle_connections函数,在单个任务中处理所有I/O事件:
pub asyncfnhandle_connections(&mutself) {
loop {
// 获取下一个完成的事件
letev = self.next_event().await;
letev = match ev {
// 处理错误:移除出错的连接
Err((e, i)) => {
eprintln!("Socket error: {e}");
self.read_connections.remove(&i);
self.write_connections.remove(&i);
continue;
}
Ok(ev) => ev,
};
// 处理不同类型的事件
match ev {
// 处理“读取到消息”事件:转发给目标客户端
Event::Read((dest, items)) => {
letSome(writer) = self.write_connections.get_mut(&dest) else {
continue;
};
writer.send(items);
}
// 处理“新连接”事件:分配ID,初始化读写端
Event::Connection(tcp_stream) => {
letid = rand::rng().random();
let (r, w) = tcp_stream.into_split();
// 向新客户端发送ID
letmut write_sock = WriteSocket::new(w, id);
write_sock.send(Bytes::from_owner(id.to_be_bytes()));
// 注册连接
self.read_connections.insert(id, ReadSocket::new(r, id));
self.write_connections.insert(id, write_sock);
}
}
}
}注意:本示例代码可在
content/concurrency-patterns/race目录中找到。
&mut self直接修改read_connections和write_connections,无需Mutex或通道;WriteSocket和错误处理逻辑即可。例如,让WriteSocket的待发送队列保存“消息+发送方ID”,出错时直接向发送方发送错误消息;bytes crate规避,但在无标准库(no-std)或无内存分配(no-alloc)等受限环境中可能存在不便;map_err(|e| (e, self.id))为错误添加客户端ID,略显繁琐;next_event复杂度随事件增多而上升:新增事件时需统一返回类型(Event枚举),且可能存在内存分配(如Vec存储futures)。在“竞争futures”模式中,race会依次轮询所有futures,并返回第一个完成的结果(伪代码如下):
fn race(self: Pin<&mutself>, ctx: Context) -> Poll<Result<T>> {
if Poll::Ready(result) = self.fut1.poll(ctx) {
return result;
}
if Poll::Ready(result) = self.fut2.poll(ctx) {
return result;
}
// ...
if Poll::Ready(result) = self.futn.poll(ctx) {
return result;
}
Poll::Pending
}若我们手动实现这一轮询逻辑,而非依赖库函数,可获得以下优势:
由于Tokio的Reader和Writer不支持便捷的手动轮询,我们不再拆分套接字,而是直接使用TcpStream:
struct Socket {
write_buffers: VecDeque<Bytes>, // 待发送消息队列
read_buffer: BytesMut, // 读缓冲区
stream: TcpStream, // TCP套接字
waker: Option<Waker>, // 唤醒器(用于无数据时的唤醒)
}
implSocket {
fnnew(stream: TcpStream) -> Socket {
Socket {
write_buffers: VecDeque::new(),
read_buffer: BytesMut::new(),
waker: None,
stream,
}
}
// 调度消息发送(非阻塞,唤醒等待中的写操作)
fnsend(&mutself, buf: Bytes) {
self.write_buffers.push_back(buf);
// 若存在唤醒器,唤醒写操作
letSome(w) = self.waker.take() else {
return;
};
w.wake_by_ref();
}
// 手动轮询写操作
fnpoll_send(&mutself, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// 若暂无待发送数据,保存唤醒器并挂起
letSome(write_buffer) = self.write_buffers.front_mut() else {
self.waker = Some(cx.waker().clone());
return Poll::Pending;
};
loop {
// 等待套接字可写
ready!(self.stream.poll_write_ready(cx))?;
// 尝试写入数据
matchself.stream.try_write(write_buffer) {
Ok(n) => {
write_buffer.advance(n); // 推进已写入字节数
}
Err(e) if e.kind() == ErrorKind::WouldBlock => {
continue; // 若暂时无法写入,继续等待
}
Err(e) => {
return Poll::Ready(Err(e)); // 其他错误,返回
}
}
// 若当前缓冲区已写完,移除该缓冲区
if write_buffer.is_empty() {
self.write_buffers.pop_front();
}
return Poll::Ready(Ok(()));
}
}
// 手动轮询读操作
fnpoll_read(&mutself, cx: &mut Context<'_>) -> Poll<io::Result<&mut BytesMut>> {
loop {
// 等待套接字可读
ready!(self.stream.poll_read_ready(cx))?;
// 尝试读取数据到缓冲区
matchself.stream.try_read_buf(&mutself.read_buffer) {
Ok(0) => {
// 读取到0字节,视为连接重置
return Poll::Ready(Err(io::Error::from(io::ErrorKind::ConnectionReset)));
}
Ok(_) => {} // 读取成功,继续
Err(e) if e.kind() == ErrorKind::WouldBlock => {
continue; // 暂时无法读取,继续等待
}
Err(e) => {
return Poll::Ready(Err(e)); // 其他错误,返回
}
};
// 返回读取到的缓冲区
return Poll::Ready(Ok(&mutself.read_buffer));
}
}
}send时唤醒);poll_read/poll_send仅执行一次I/O操作,避免单个套接字抢占所有资源。为Server添加poll_next函数,作为单任务的主轮询入口:
impl Server {
fnpoll_next(&mutself, cx: &mut Context<'_>) -> Poll<()> {
// 1. 处理新连接
ifletPoll::Ready(Ok((connection, _))) = self.listener.poll_accept(cx) {
letmut socket = Socket::new(connection);
letid: u32 = rand::rng().random();
// 向新客户端发送ID
socket.send(Bytes::from_owner(id.to_be_bytes()));
self.connections.insert(id, socket);
// 唤醒自己,确保后续能处理其他事件
cx.waker().wake_by_ref();
}
// 2. 推进所有套接字的写操作
forconninself.connections.values_mut() {
if conn.poll_send(cx).is_ready() {
// 若写操作完成,唤醒自己,继续处理其他写操作
cx.waker().wake_by_ref();
}
}
// 3. 处理所有套接字的读操作(先收集ID,避免迭代中修改map)
letkeys: Vec<_> = self.connections.keys().copied().collect();
forkin keys {
letbuf = matchself.connections.get_mut(&k).unwrap().poll_read(cx) {
// 读取成功:获取缓冲区
Poll::Ready(Ok(buf)) => buf,
// 读取错误:移除连接
Poll::Ready(Err(e)) => {
self.connections.remove(&k);
println!("Failed to read from {k}: {e}");
continue;
}
// 暂时无法读取:跳过
Poll::Pending => {
continue;
}
};
// 唤醒自己,确保后续能处理解析后的消息
cx.waker().wake_by_ref();
// 解析消息并转发
letOk((dest, b)) = parse_message(buf) else {
continue;
};
letSome(destination) = self.connections.get_mut(&dest) else {
continue;
};
destination.send(b);
}
// 始终返回Pending,让运行时继续调度(服务器永不停止)
Poll::Pending
}
}使用poll_fn将poll_next包装为异步函数,供外部调用:
async fn handle_connections(&mut self) {
poll_fn(move |cx| self.poll_next(cx)).await;
}注意:本示例代码可在
content/concurrency-patterns/hand-rolled目录中找到。
cx.waker().wake_by_ref(),可能导致任务永久挂起(例如,读取部分消息后未唤醒,后续无法继续读取);以上模式并非“非此即彼”,而是可以灵活组合(例如,在单任务中使用通道与其他任务通信)。选择模式时,需基于性能需求、代码复杂度和可维护性综合权衡。
Mutex——其使用门槛高,易引发死锁。推荐:若需使用多任务,建议采用结构化方案(如Actor模型)²⁵,避免任务管理混乱。
async/await语法,可完全避免“唤醒器遗漏”类bug;代码更简洁,易维护。async/await自动管理I/O事件的唤醒,无需手动处理Waker。结论:若多个futures需共享可变状态,优先在单任务中使用组合器和
async/await;仅当基准测试显示性能瓶颈时,才考虑手动实现future或拆分多任务。
Sans-IO是一种与I/O无关的协议实现模型,将协议逻辑抽象为“状态机”,由外部事件驱动状态转换。它与单任务模式的契合度很高——单任务模式中,我们已将I/O事件与状态处理分离,这与Sans-IO的核心思想一致。
若需深入了解Sans-IO,推荐阅读Firezone的相关文章。