
适配 Workerman/webman 的 AMQP 组件包
AMQP 协议工具实现 AMQP-ServerBuilder 支持影子模式(并发补偿)AMQP 协议约定,更合理的架构设计和使用逻辑ConnectionManagement 多连接管理器管理 Connection(Client),合理复用机制及并发使用能力Channel-Pool 管理 Channel,合理的复用和并发机制AMQP 协议包,可供开发者自定义实现 AMQP-Client 或 AMQP-Server,并提供 AMQP-Frame 协议帧工具 ┌───────────┐
| Builder A | ──┐
└───────────┘ | | ┌───────────┐
| | | Channel 1 |
| | └───────────┘
┌───────────┐ └─> ┌──────────────────┐ | ┌───────────┐
| Builder A | ────> | Connections Pool | ── connection ──> | | Channel 2 |
└───────────┘ ┌─> └──────────────────┘ min ... MAX | └───────────┘
| <static> <context> | ┌───────────┐
| | | Channel 3 |
┌───────────┐ | | └───────────┘
| Builder C | ──┘ ...
└───────────┘ channel-max
composer require workbunny/webman-rabbitmq
app.php<?php declare(strict_types=1);
return [
'enable' => true,
// 日志 LoggerInterface | LoggerInterface::class
'logger' => null,
];
connections.php<?php declare(strict_types=1);
useWorkbunny\WebmanRabbitMQ\Clients\AbstractClient;
useWorkbunny\WebmanRabbitMQ\Connections\Connection;
return [
'default' => [
'connection' => Connection::class,
// 连接池,用于支撑影子模式
'connections_pool' => [
'min_connections' => 1,
'max_connections' => 20,
'idle_timeout' => 60,
'wait_timeout' => 10,
],
'config' => [
'debug' => false,
'host' => '127.0.0.1',
'vhost' => '/',
'port' => 5672,
'username' => 'guest',
'password' => 'guest',
'mechanism' => 'AMQPLAIN',
'timeout' => 10,
// 重启间隔
'restart_interval' => 5,
// 通道池
'channels_pool' => [
'idle_timeout' => 60,
'wait_timeout' => 10,
],
'client_properties' => [
'name' => 'workbunny/webman-rabbitmq',
'version' => InstalledVersions::getVersion('workbunny/webman-rabbitmq'),
],
// 心跳回调 callable
'heartbeat_callback' => function () {
},
// see https://www.workerman.net/doc/workerman/async-tcp-connection/construct.html
// 'context' => [
// 'ssl' => [
// 'verify_peer' => false,
// 'verify_peer_name' => false,
// ],
// ]
],
],
];
# 构建 Builder
php webman workbunny:rabbitmq-builder -h
# 移除/关闭 Builder
php webman workbunny:rabbitmq-remove -h
# 查看 Builder 列表
php webman workbunny:rabbitmq-list -h
⚠️ 延迟队列需要为 RabbitMQ 安装 rabbitmq_delayed_message_exchange 插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
use function Workbunny\WebmanRabbitMQ\publish;
publish(new TestBuilder(), 'abc', headers: [
'x-delay' => 10000, // 延迟 10 秒
]); // return bool
⚠️ 注意:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常
REQUEUE 实现Builder 支持通过 REQUEUE 标记进行消息重入队尾header 中的时间标记,和逻辑判断,当满足时间条件时则执行,不满足条件则通过 REQUEUE 将数据自动推回队尾0-9 的优先级,合理分配时间段和优先级的匹配关系“⚠️ 注意:
WebmanRabbitMQPublishException 异常Builder,以下以 Workbunny\Tests\TestBuilders\TestPublishBuilder 举例use function Workbunny\WebmanRabbitMQ\publish;
use Workbunny\Tests\TestBuilders\TestPublishBuilder;
publish(new TestPublishBuilder(), 'abc'); // return bool
use Workbunny\Tests\TestBuilders\TestPublishBuilder;
use Workbunny\WebmanRabbitMQ\BuilderConfig;
use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface;
$builder = new TestPublishBuilder();
$body = 'abc';
return $builder->action(function (ConnectionInterface $connection) use ($builder, $body) {
$config = new BuilderConfig($builder->getBuilderConfig()());
$config->setBody($body);
$builder->publish($connection, $config);
});
需要自行指定 exchange 等参数
use Workbunny\WebmanRabbitMQ\BuilderConfig;
useWorkbunny\WebmanRabbitMQ\Connection\ConnectionInterface;
useWorkbunny\WebmanRabbitMQ\ConnectionsManagement;
$config = new \Workbunny\WebmanRabbitMQ\BuilderConfig();
$config->setExchange('your_exchange');
$config->setRoutingKey('your_routing_key');
$config->setQueue('your_queue');
$config->setBody('abc');
$config->setMandatory(true);
$config->setImmediate(false);
// 使用 your_connection 配置连接发送
return ConnectionsManagement::connection(function (ConnectionInterface $connection) use ($config) {
$connection->channel()->publish(
$config->getBody(),
$config->getHeaders(),
$config->getExchange(),
$config->getRoutingKey(),
$config->getMandatory(),
$config->getImmediate()
);
}, 'your_connection');
“⚠️ 注意:首先使用命令行工具或者手动创建对应的
Builder,以下以Workbunny\Tests\TestBuilders\TestConsumeBuilder举例
Builder 文件,将 handler() 方法逻辑添加消费逻辑Builder 自定义进程即是启动消费use Workbunny\Tests\TestBuilders\TestConsumeBuilder;
use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface;
$builder = new TestConsumeBuilder();
$builder->action(function (ConnectionInterface $connection) use ($builder) {
$builder->consume($connection, $builder->getBuilderConfig());
});
⚠️ 注意:需要保持该进程常驻