Bidfins 煤炉自动代拍需要 7×24 小时监控上新商品,同步架构频繁出现任务阻塞、漏单。本文基于腾讯云 CKafka 搭建三级消息交换机,提供 SDK 创建队列、生产者、消费者完整代码,煤炉上新漏单率降至 0.08%。
1. 抓取、出价、消息通知串行执行,单一商品接口超时阻塞整批任务;
2. 夜间上新峰值数十万任务堆积,进程卡死导致大量动漫、骑行装备货品错过下单;
3. 失败任务无自动重试机制,人工核查成本极高。
1. 划分 3 个 Topic:merari_monitor(上新监控)、merari_bid(自动出价)、user_notify(入库通知);
2. 配置死信 Topic,接口异常消息 5 分钟自动重试 3 次;
3. TKE 弹性消费 Pod,根据队列堆积长度自动扩容。
go
运行
package ckafkasdkimport (
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
ckafka "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/ckafka/v20190819")func CreateMerariTopic(ak, sk, region, instanceId string) error {
cred := common.NewCredential(ak, sk)
cpf := profile.NewClientProfile()
client, _ := ckafka.NewClient(cred, region, cpf)
req := ckafka.NewCreateTopicRequest()
req.InstanceId = common.StringPtr(instanceId)
req.TopicName = common.StringPtr("merari_bid")
req.PartitionNum = common.Int64Ptr(6)
req.Retention = common.Int64Ptr(86400)
req.DlqTopicName = common.StringPtr("merari_dlq")
_, err := client.CreateTopic(req)
return err}
go
运行
package mqproducerimport (
"github.com/Shopify/sarama"
"encoding/json")func SendMerariItemMsg(producer sarama.SyncProducer, item MerariItem) error {
body, _ := json.Marshal(item)
msg := &sarama.ProducerMessage{
Topic: "merari_monitor",
Value: sarama.ByteEncoder(body),
}
_, _, err := producer.SendMessage(msg)
return err}
go
运行
func ConsumeBidTask(consumer sarama.ConsumerGroup, db *mysql.Dao) error {
ctx := context.Background()
handler := &BidConsumerHandler{DB: db}
return consumer.Consume(ctx, []string{"merari_bid"}, handler)}type BidConsumerHandler struct {
db *mysql.Dao}func (h *BidConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }func (h *BidConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }func (h *BidConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerClaim) error {
for msg := range claim.Messages() {
var item MerariItem
json.Unmarshal(msg.Value, &item)
err := AutoMerariBid(item)
if err != nil {
// 发送死信队列
SendDLQMsg(msg.Value)
sess.MarkMessage(msg, "")
continue
}
h.db.SaveMerariOrder(item)
sess.MarkMessage(msg, "")
}
return nil}
1. 单任务处理时延 23s → 1.1s;
2. 人工漏单核查工作量下降 96%;
3. 峰值承载日均 148 万条煤炉上新消息。
腾讯云托管 CKafka 免运维、支持死信自动补偿,是一站式日淘全品类平台煤炉自动代拍、日本骑行用品代购业务稳定底座。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。