首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >纯 PHP 实现的异步高性能 RabbitMQ 3.0 客户端发布!

纯 PHP 实现的异步高性能 RabbitMQ 3.0 客户端发布!

作者头像
Tinywan
发布2026-07-01 15:17:42
发布2026-07-01 15:17:42
280
举报
文章被收录于专栏:开源技术小栈开源技术小栈

✨ 简介

适配 Workerman/webmanAMQP 组件包

核心特性

  • ✅ 支持基于 AMQP 协议工具实现 AMQP-Server
  • ✅ 支持 5 种消费模式:简单队列、workQueue、routing、pub/sub、exchange
  • ✅ 支持延迟队列(RabbitMQ 须安装插件)
  • ✅ 支持连接池、通道池,Builder 支持影子模式(并发补偿)
  • ✅ 3.0 版本相比之前版本,更符合 AMQP 协议约定,更合理的架构设计和使用逻辑
    • 使用 ConnectionManagement 多连接管理器管理 ConnectionClient),合理复用机制及并发使用能力
    • 使用 Channel-Pool 管理 Channel,合理的复用和并发机制
    • 提供 AMQP 协议包,可供开发者自定义实现 AMQP-ClientAMQP-Server,并提供 AMQP-Frame 协议帧工具

🏗️ 架构概念

代码语言:javascript
复制
    ┌───────────┐
    | Builder A | ──┐
    └───────────┘   |                                          | ┌───────────┐
                    |                                          | | Channel 1 |
                    |                                          | └───────────┘
    ┌───────────┐   └─> ┌──────────────────┐                   | ┌───────────┐
    | Builder A | ────> | Connections Pool | ── connection ──> | | Channel 2 |
    └───────────┘   ┌─> └──────────────────┘   min ... MAX     | └───────────┘
                    |         <static>          <context>      | ┌───────────┐
                    |                                          | | Channel 3 |
    ┌───────────┐   |                                          | └───────────┘
    | Builder C | ──┘                                                 ...
    └───────────┘                                                 channel-max

🚀 快速开始

系统要求

  • PHP >= 8.1
  • webman-framework >= 2.0 或 workerman >= 5.1
  • rabbitmq-server >= 3.10

安装

代码语言:javascript
复制
composer require workbunny/webman-rabbitmq

⚙️ 配置

基础配置 app.php

代码语言:javascript
复制
<?php declare(strict_types=1);

return [
    'enable' => true,
    // 日志 LoggerInterface | LoggerInterface::class
    'logger' => null,
];

连接配置 connections.php

代码语言:javascript
复制
<?php declare(strict_types=1);

useWorkbunny\WebmanRabbitMQ\Clients\AbstractClient;
useWorkbunny\WebmanRabbitMQ\Connections\Connection;

return [
    'default' => [
        'connection'       => Connection::class,
        // 连接池,用于支撑影子模式
        'connections_pool' => [
            'min_connections'       => 1,
            'max_connections'       => 20,
            'idle_timeout'          => 60,
            'wait_timeout'          => 10,
        ],
        'config' => [
            'debug'              => false,
            'host'               => '127.0.0.1',
            'vhost'              => '/',
            'port'               => 5672,
            'username'           => 'guest',
            'password'           => 'guest',
            'mechanism'          => 'AMQPLAIN',
            'timeout'            => 10,
            // 重启间隔
            'restart_interval'   => 5,
            // 通道池
            'channels_pool'      => [
                'idle_timeout'     => 60,
                'wait_timeout'     => 10,
            ],
            'client_properties' => [
                'name'     => 'workbunny/webman-rabbitmq',
                'version'  => InstalledVersions::getVersion('workbunny/webman-rabbitmq'),
            ],
            // 心跳回调 callable
            'heartbeat_callback' => function () {
            },

            // see https://www.workerman.net/doc/workerman/async-tcp-connection/construct.html
//            'context' => [
//                'ssl' => [
//                    'verify_peer'      => false,
//                    'verify_peer_name' => false,
//                ],
//            ]
        ],
    ],
];

🛠️ 命令行工具

代码语言:javascript
复制
# 构建 Builder
php webman workbunny:rabbitmq-builder -h

# 移除/关闭 Builder
php webman workbunny:rabbitmq-remove -h

# 查看 Builder 列表
php webman workbunny:rabbitmq-list -h

⏰ 延迟队列

⚠️ 延迟队列需要为 RabbitMQ 安装 rabbitmq_delayed_message_exchange 插件

安装插件

  1. 进入 RabbitMQ 的 plugins 目录下执行命令下载插件(以 RabbitMQ 3.10.2 举例):
代码语言:javascript
复制
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
  1. 执行安装命令:
代码语言:javascript
复制
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

使用延迟队列

代码语言:javascript
复制
use function Workbunny\WebmanRabbitMQ\publish;

publish(new TestBuilder(), 'abc', headers: [
    'x-delay' => 10000, // 延迟 10 秒
]); // return bool

⚠️ 注意:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常

替代方案

  • 不少第三方厂商不支持安装延迟队列插件
  • 当不支持安装延迟队列时,可以通过优先级队列 + REQUEUE 实现
    • Builder 支持通过 REQUEUE 标记进行消息重入队尾
    • 通过自定义 header 中的时间标记,和逻辑判断,当满足时间条件时则执行,不满足条件则通过 REQUEUE 将数据自动推回队尾
    • 为了减少数据延迟问题,使用优先级标识将时间较近的消息优先级定义高一些,而时间较长的数据优先级定义低一些
    • 队列通常支持 0-9 的优先级,合理分配时间段和优先级的匹配关系

📤 生产消息

⚠️ 注意

  • 向延迟队列发布普通消息会抛出 WebmanRabbitMQPublishException 异常
  • 首先使用命令行工具或者手动创建对应的 Builder,以下以 Workbunny\Tests\TestBuilders\TestPublishBuilder 举例

方式一:快捷发送

代码语言:javascript
复制
use function Workbunny\WebmanRabbitMQ\publish;
use Workbunny\Tests\TestBuilders\TestPublishBuilder;

publish(new TestPublishBuilder(), 'abc'); // return bool

方式二:Builder 发送

代码语言:javascript
复制
use Workbunny\Tests\TestBuilders\TestPublishBuilder;
use Workbunny\WebmanRabbitMQ\BuilderConfig;
use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface;

$builder = new TestPublishBuilder();
$body = 'abc';

return $builder->action(function (ConnectionInterface $connection) use ($builder, $body) {
    $config = new BuilderConfig($builder->getBuilderConfig()());
    $config->setBody($body);
    $builder->publish($connection, $config);
});

方式三:原生发送

需要自行指定 exchange 等参数

代码语言:javascript
复制
use Workbunny\WebmanRabbitMQ\BuilderConfig;
useWorkbunny\WebmanRabbitMQ\Connection\ConnectionInterface;
useWorkbunny\WebmanRabbitMQ\ConnectionsManagement;

$config = new \Workbunny\WebmanRabbitMQ\BuilderConfig();
$config->setExchange('your_exchange');
$config->setRoutingKey('your_routing_key');
$config->setQueue('your_queue');
$config->setBody('abc');
$config->setMandatory(true);
$config->setImmediate(false);

// 使用 your_connection 配置连接发送
return ConnectionsManagement::connection(function (ConnectionInterface $connection) use ($config) {
    $connection->channel()->publish(
        $config->getBody(),
        $config->getHeaders(),
        $config->getExchange(),
        $config->getRoutingKey(),
        $config->getMandatory(),
        $config->getImmediate()
    );
}, 'your_connection');

📥 消费消息

⚠️ 注意:首先使用命令行工具或者手动创建对应的 Builder,以下以 Workbunny\Tests\TestBuilders\TestConsumeBuilder 举例

方式一:快捷消费

  1. 修改生成的 Builder 文件,将 handler() 方法逻辑添加消费逻辑
  2. 启动构建好的 Builder 自定义进程即是启动消费

方式二:Builder 消费

代码语言:javascript
复制
use Workbunny\Tests\TestBuilders\TestConsumeBuilder;
use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface;

$builder = new TestConsumeBuilder();
$builder->action(function (ConnectionInterface $connection) use ($builder) {
    $builder->consume($connection, $builder->getBuilderConfig());
});

⚠️ 注意:需要保持该进程常驻

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源技术小栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ✨ 简介
    • 核心特性
  • 🏗️ 架构概念
  • 🚀 快速开始
    • 系统要求
    • 安装
  • ⚙️ 配置
    • 基础配置 app.php
    • 连接配置 connections.php
  • 🛠️ 命令行工具
  • ⏰ 延迟队列
    • 安装插件
    • 使用延迟队列
    • 替代方案
  • 📤 生产消息
    • 方式一:快捷发送
    • 方式二:Builder 发送
    • 方式三:原生发送
  • 📥 消费消息
    • 方式一:快捷消费
    • 方式二:Builder 消费
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档