首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >多品牌 AI 回答采集任务,如何做队列调度和对比结果入库?

多品牌 AI 回答采集任务,如何做队列调度和对比结果入库?

原创
作者头像
用户12582597
发布2026-07-03 11:31:37
发布2026-07-03 11:31:37
120
举报

竞品分析是企业日常决策中的高频需求。当这种需求从“人工搜索截图”升级为“系统化 AI 回答监测”时,后端面临一个复合型工程问题:如何在同一轮采样中公平地覆盖多个品牌,又如何把零散的问答结果组织成可直接对比的结构化数据?本文将围绕多品牌采集的任务编排、并发控制、对比结果生成和查询接口设计四个环节,拆解一条完整的数据链路。


一、多品牌采集的特殊约束

单品牌采集和多品牌竞品对比,在工程上有三个关键差异:

  1. 公平性要求:同一问题必须尽可能在同一时间窗口内向同一 AI 平台发送,避免因模型更新或联网数据变化导致对比基线不一致。
  2. 任务规模倍增:5 个品牌 × 8 个场景 × 4 个平台 × 3 轮采样 = 480 条独立任务,任务编排复杂度远高于单品牌场景。
  3. 对比结果需要聚合视图:每个品牌的数据独立存储后,还需要一张“对比表”支持品牌间横向查询——谁的提及率更高、谁在推荐场景更占优势。

这三个约束决定了架构上需要分层设计:采集层负责公平调度,存储层负责对比聚合。


二、整体数据链路

链路分五个环节:任务编排、队列调度、并发控制、单品牌入库、对比聚合。下文逐一展开。


三、任务编排:用对比组实现公平采样

多品牌采集的核心是“对比组”概念——同一个问题,对多个品牌在同一轮次内依次发送,形成一个对比组。

代码语言:javascript
复制
-- 对比任务主表
CREATE TABLE comparison_task (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    task_id VARCHAR(64) NOT NULL UNIQUE COMMENT '对比组ID',
    scene_type VARCHAR(30) NOT NULL COMMENT '场景分类',
    query_text TEXT NOT NULL COMMENT '统一提问文本',
    platform VARCHAR(30) NOT NULL COMMENT '目标AI平台',
    round INT NOT NULL COMMENT '采样轮次',
    brand_list JSON NOT NULL COMMENT '被对比品牌列表',
    status VARCHAR(20) DEFAULT 'pending' COMMENT 'pending/collecting/completed',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

brand_list 字段示例:

代码语言:javascript
复制
["品牌A", "品牌B", "品牌C", "品牌D", "品牌E"]

调度器生成任务时,一个对比组被拆分为多条独立的采集子任务,但共享同一个 comparison_group_id,确保事后可按组聚合对比。

代码语言:javascript
复制
{
  "sub_task_id": "sub-uuid-xxxx",
  "comparison_group_id": "cmp-uuid-yyyy",
  "brand_name": "品牌A",
  "platform": "deepseek",
  "scene_type": "RECOMMEND",
  "query_text": "有哪些值得推荐的云数据库方案?",
  "round": 2,
  "created_at": "2026-07-03T10:00:00Z"
}

四、队列调度与并发控制
4.1 消息投递策略

同一对比组内的子任务,在投递时带上 group_idsequence 字段。消费者拉取后按 group_id 做聚合校验——只有当同一对比组内所有品牌的子任务都完成后,才触发对比结果聚合。

代码语言:javascript
复制
def on_sub_task_complete(sub_task: dict):
    group_id = sub_task["comparison_group_id"]
    redis.sadd(f"cmp_done:{group_id}", sub_task["brand_name"])
    
    expected_brands = redis.get(f"cmp_expected:{group_id}")
    done_brands = redis.smembers(f"cmp_done:{group_id}")
    
    if len(done_brands) == len(expected_brands):
        trigger_comparison_aggregation(group_id)

Redis 集合用于跟踪每个对比组的完成状态。全组完成后自动触发聚合逻辑。

4.2 并发控制:避免单平台限流

多品牌采集时,同一平台的请求量成倍增加。需要在消费者层做并发控制:

代码语言:javascript
复制
class RateLimitedCollector:
    def __init__(self, platform: str):
        self.limiter = TokenBucket(rate=5, capacity=10)  # 每秒最多5次请求
    
    def collect(self, task: dict) -> dict:
        while not self.limiter.consume():
            time.sleep(0.2)
        return call_ai_api(task["platform"], task["query_text"])

令牌桶基于平台维度实例化,同一平台的子任务共享同一个限流器,避免触发 AI 平台的流控策略。

4.3 对比组超时处理

部分子任务可能因网络抖动或 AI 平台限流而失败。对比组不应无限等待,需要设置超时机制:

代码语言:javascript
复制
def check_comparison_timeout(group_id: str, timeout_minutes: int = 30):
    created_at = redis.get(f"cmp_created:{group_id}")
    if (now() - created_at).minutes > timeout_minutes:
        missing = get_missing_brands(group_id)
        mark_group_partial(group_id, missing)  # 标记为部分完成,缺失品牌记录在案
        trigger_comparison_aggregation(group_id)

超时后对比组标记为 partial,后续查询时提示“部分品牌数据缺失”。


五、单品牌结果入库

每个子任务完成后,提及率、推荐率、解释文本等指标写入单品牌结果表:

代码语言:javascript
复制
CREATE TABLE brand_ai_performance (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    comparison_group_id VARCHAR(64) NOT NULL,
    brand_name VARCHAR(100) NOT NULL,
    platform VARCHAR(30) NOT NULL,
    scene_type VARCHAR(30) NOT NULL,
    round INT NOT NULL,
    
    is_mentioned TINYINT(1) DEFAULT 0,
    is_recommended TINYINT(1) DEFAULT 0,
    recommendation_rank INT DEFAULT 0,
    has_explanation TINYINT(1) DEFAULT 0,
    
    is_valid TINYINT(1) DEFAULT 1 COMMENT '样本有效性',
    raw_data_url VARCHAR(512),
    collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_group (comparison_group_id),
    INDEX idx_brand_scene (brand_name, scene_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

comparison_group_id 是贯穿采集和聚合两阶段的核心关联键。


六、对比聚合表:让竞品数据可横向查询

单品牌表适合纵向分析(一个品牌在时间维度上的变化),但竞品对比需要横向视图——在一个查询中同时看到所有品牌在同一条件下的指标。

6.1 聚合表设计

代码语言:javascript
复制
CREATE TABLE comparison_aggregated (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    comparison_group_id VARCHAR(64) NOT NULL,
    platform VARCHAR(30) NOT NULL,
    scene_type VARCHAR(30) NOT NULL,
    query_text TEXT NOT NULL,
    
    brand_name VARCHAR(100) NOT NULL,
    mention_rate DECIMAL(5,2) COMMENT '提及率(%)',
    recommend_rate DECIMAL(5,2) COMMENT '推荐率(%)',
    avg_recommend_rank DECIMAL(3,1) COMMENT '平均推荐位次',
    sample_count INT COMMENT '有效样本数',
    total_rounds INT COMMENT '总采样轮次',
    
    aggregated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_group_platform_scene (comparison_group_id, platform, scene_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6.2 聚合逻辑

触发聚合时,从 brand_ai_performance 表中按 comparison_group_id 拉取所有品牌的子任务结果,逐品牌计算指标:

代码语言:javascript
复制
def aggregate_comparison(group_id: str):
    rows = db.query(
        "SELECT brand_name, is_mentioned, is_recommended, recommendation_rank, is_valid "
        "FROM brand_ai_performance WHERE comparison_group_id = ?",
        group_id
    )
    
    brands = {}
    for row in rows:
        b = row["brand_name"]
        if b not in brands:
            brands[b] = {"mentioned": 0, "recommended": 0, "ranks": [], "valid": 0}
        if row["is_valid"]:
            brands[b]["valid"] += 1
            brands[b]["mentioned"] += row["is_mentioned"]
            brands[b]["recommended"] += row["is_recommended"]
            if row["recommendation_rank"] > 0:
                brands[b]["ranks"].append(row["recommendation_rank"])
    
    for brand_name, stats in brands.items():
        total = stats["valid"]
        mention_rate = round(stats["mentioned"] * 100.0 / total, 2) if total > 0 else 0
        recommend_rate = round(stats["recommended"] * 100.0 / total, 2) if total > 0 else 0
        avg_rank = round(sum(stats["ranks"]) / len(stats["ranks"]), 1) if stats["ranks"] else 0
        
        db.insert("comparison_aggregated", {
            "comparison_group_id": group_id,
            "brand_name": brand_name,
            "mention_rate": mention_rate,
            "recommend_rate": recommend_rate,
            "avg_recommend_rank": avg_rank,
            "sample_count": total,
            "total_rounds": get_total_rounds(group_id)
        })

七、对比查询接口设计

聚合完成后,业务侧需要两类查询接口。

7.1 单场景对比查询

代码语言:javascript
复制
SELECT 
    brand_name,
    mention_rate,
    recommend_rate,
    avg_recommend_rank,
    sample_count
FROM comparison_aggregated
WHERE comparison_group_id = 'cmp-uuid-yyyy'
  AND scene_type = 'RECOMMEND'
ORDER BY recommend_rate DESC;

返回示例:

代码语言:javascript
复制
{
  "group_id": "cmp-uuid-yyyy",
  "platform": "deepseek",
  "scene_type": "RECOMMEND",
  "query": "有哪些值得推荐的云数据库方案?",
  "results": [
    {"brand": "品牌A", "mention_rate": 85.0, "recommend_rate": 66.7, "avg_rank": 1.2},
    {"brand": "品牌B", "mention_rate": 72.0, "recommend_rate": 50.0, "avg_rank": 2.1},
    {"brand": "品牌C", "mention_rate": 45.0, "recommend_rate": 25.0, "avg_rank": 3.0}
  ]
}
7.2 跨场景综合对比

代码语言:javascript
复制
SELECT 
    brand_name,
    AVG(mention_rate) AS avg_mention,
    AVG(recommend_rate) AS avg_recommend,
    COUNT(DISTINCT scene_type) AS scene_coverage
FROM comparison_aggregated
WHERE comparison_group_id = 'cmp-uuid-yyyy'
GROUP BY brand_name
ORDER BY avg_recommend DESC;

跨场景聚合可以揭示更深层的问题:一个品牌可能在推荐场景表现好,但在风险判断场景几乎不被提及,说明其在“口碑查询”环节存在可见度缺口。


八、工程实践要点

1. 对比组的原子性设计

同一对比组内的所有子任务共享相同的 query_textplatformround,仅 brand_name 不同。这是对比公平性的保障——如果品牌 A 的问题文本和品牌 B 不同,对比结果就没有意义。

2. 缺失数据要可见而非隐藏

对比组部分失败时,不要让接口只返回“完成”的品牌。缺失的品牌应显式标记 status: "partial"missing_brands 字段,让调用方明确知道哪些数据不可用,而非误以为缺失品牌表现差。

3. 聚合表与原始表分离

comparison_aggregated 是快照表,brand_ai_performance 是明细表。两者不要合并。当业务方对聚合结果有疑问时,可以从明细表中下钻到每次采样的原始数据,甚至通过 raw_data_url 回溯到 COS 中的原始回答。


九、结语

多品牌 AI 回答采集与对比,本质上是将“竞品分析”从人工经验转化为系统化数据工程。对比组的设计保证了公平性,Redis 跟踪 + 超时兜底保证了完整性,聚合表与明细表分离兼顾了查询效率与数据可追溯性。

这套链路已在消费品牌、企业服务等多个行业的竞品 AI 可见度对比中实际运行。开发者可在此基础上,扩展对比维度(如引用率、解释准确度)或引入时序对比(同一品牌在不同时间段的对比组结果变化)。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、多品牌采集的特殊约束
  • 二、整体数据链路
  • 三、任务编排:用对比组实现公平采样
  • 四、队列调度与并发控制
    • 4.1 消息投递策略
    • 4.2 并发控制:避免单平台限流
    • 4.3 对比组超时处理
  • 五、单品牌结果入库
  • 六、对比聚合表:让竞品数据可横向查询
    • 6.1 聚合表设计
    • 6.2 聚合逻辑
  • 七、对比查询接口设计
    • 7.1 单场景对比查询
    • 7.2 跨场景综合对比
  • 八、工程实践要点
  • 九、结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档