在多平台 AI 回答监测系统中,“诊断”环节决定了数据的最终质量。采集回来的大段文本,哪些样本是有效的?哪些存在品牌混淆或事实错误?如何高效地调度诊断任务并将结果结构化入库?本文将围绕队列调度、异常标签生成、结构化入库和诊断结果查询四个环节,拆解一条可落地的数据诊断链路设计。
一次完整的 AI 回答监测任务,通常包含采集和诊断两个阶段。采集阶段负责向多个 AI 平台提问并拉回原始回答,诊断阶段负责回答三个问题:
本文聚焦诊断阶段。它的输入是采集阶段存入 COS 的原始回答,输出是带有诊断标签的结构化数据。
采集阶段完成后,调度器扫描 COS 中新增的原始回答文件,为每个文件生成一条诊断任务消息:
{
"diagnosis_id": "diag-uuid-xxxx",
"task_id": "collect-uuid-xxxx",
"platform": "deepseek",
"scene_type": "RECOMMEND",
"target_brand": "品牌A",
"raw_data_url": "cos://bucket/2026-07-01/deepseek/RECOMMEND/collect-uuid-xxxx.json",
"created_at": "2026-07-01T10:00:00Z"
}消息体只携带元信息,不携带原始回答全文。诊断函数根据 raw_data_url 按需从 COS 拉取,避免消息体过大。
诊断任务与采集任务有本质区别:采集耗时取决于 AI 平台响应速度(波动大),诊断耗时取决于 NLP 处理逻辑(相对稳定)。因此队列配置策略不同:
配置项 | 采集队列 | 诊断队列 |
|---|---|---|
消息超时 | 120s | 30s |
消费并发 | 受限于 API 配额 | 可按 CPU 核数弹性扩展 |
重试策略 | 指数退避 | 立即重试 1 次后进 DLQ |
批量消费 | 不建议 | 建议,减少拉取次数 |
诊断任务建议开启批量消费,单次拉取 10-20 条消息,在云函数内串行处理,减少 COS 网络请求的频繁建连开销。
诊断任务可能因云函数超时被队列重投。通过在结果表中对 task_id 设唯一约束实现幂等:
CREATE TABLE diagnosis_result (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL UNIQUE,
-- 其余字段略
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;诊断函数入口处检查 task_id 是否已存在,存在则跳过,避免重复处理。
异常标注是诊断链路中业务价值最高的环节。建议建立三级标签体系,逐层递进。
标签 | 说明 | 示例 |
|---|---|---|
valid | 有效样本,AI 正常返回 | 正常的回答文本 |
invalid_timeout | 采集阶段超时 | 空响应 |
invalid_refuse | AI 拒绝回答 | “我还不会回答这个问题” |
invalid_irrelevant | 回答与问题明显不相关 | 问防晒霜却回答汽车品牌 |
一级标签在诊断流程最前面判断。标记为 invalid_* 的样本不进入后续环节,直接落库并结束诊断。
标签 | 说明 |
|---|---|
mentioned | 品牌在回答中被提及 |
not_mentioned | 品牌未被提及 |
recommended | 品牌被作为推荐对象 |
cited | 品牌相关内容被引用 |
二级标签通过关键词匹配 + 规则判断自动生成。需要特别处理品牌别名合并——如果回答中出现品牌别名但未出现全称,仍应标记为 mentioned。
标签 | 触发条件 | 严重程度 |
|---|---|---|
confusion | 品牌描述中出现竞品特征词 | 高 |
fact_error | 主营业务、成立时间等关键事实错误 | 高 |
omission | 解释文本过短或关键维度缺失 | 中 |
outdated | 使用已过时的信息 | 中 |
vague | 描述过于笼统,无实质信息 | 低 |
clean | 未发现异常 | — |
三级标签的生成逻辑见下一节。
诊断云函数内部按以下顺序执行:
def diagnose(raw_response: dict) -> dict:
text = raw_response["answer"]
brand = raw_response["target_brand"]
# Step 1: 有效性判断
if is_timeout(text):
return {"validity": "invalid_timeout"}
if is_refuse(text):
return {"validity": "invalid_refuse"}
# Step 2: 提及与推荐提取
mention_result = extract_mention(text, brand)
recommend_result = extract_recommendation(text, brand)
# Step 3: 解释段落抽取(如有提及)
explanation = ""
if mention_result["is_mentioned"]:
explanation = extract_explanation(text, brand)
# Step 4: 三级风险标签生成
risk_tags = []
if check_confusion(explanation, brand):
risk_tags.append("confusion")
if check_fact_error(explanation, brand):
risk_tags.append("fact_error")
if check_omission(explanation):
risk_tags.append("omission")
if not risk_tags:
risk_tags.append("clean")
return {
"validity": "valid",
"is_mentioned": mention_result["is_mentioned"],
"is_recommended": recommend_result["is_recommended"],
"recommendation_rank": recommend_result["rank"],
"explanation_text": explanation,
"risk_tags": risk_tags,
"risk_level": "high" if ("confusion" in risk_tags or "fact_error" in risk_tags) else "low"
}几个关键判断函数的实现要点:
check_confusion:检查解释文本中是否出现竞品品牌词。如有,计算竞品词与品牌词的文本距离。距离小于 50 字符时标记为混淆。check_fact_error:将解释文本中的关键实体(成立时间、主营业务、所在地)与品牌基准信息库做比对。不一致则标记。check_omission:解释文本字符数 < 50 或未覆盖预设的关键维度(产品、定位、优势),标记为遗漏。诊断结果拆分为两张表存储:主结果表存二值指标,异常明细表存风险标签。
-- 主诊断结果表
CREATE TABLE diagnosis_result (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL UNIQUE,
platform VARCHAR(30) NOT NULL,
scene_type VARCHAR(30) NOT NULL,
target_brand VARCHAR(100) NOT NULL,
validity VARCHAR(20) NOT NULL COMMENT '有效/无效类型',
is_mentioned TINYINT(1) DEFAULT 0,
is_recommended TINYINT(1) DEFAULT 0,
recommendation_rank INT DEFAULT 0,
has_explanation TINYINT(1) DEFAULT 0 COMMENT '是否有解释段落',
explanation_char_count INT DEFAULT 0,
risk_level VARCHAR(10) DEFAULT NULL COMMENT 'high/low',
raw_data_url VARCHAR(512),
diagnosed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_brand_valid (target_brand, validity),
INDEX idx_risk (risk_level)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 异常标签明细表
CREATE TABLE diagnosis_risk_tag (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL,
tag_type VARCHAR(30) NOT NULL COMMENT 'confusion/fact_error/omission/outdated/vague',
tag_detail TEXT COMMENT '标签触发详情',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_task (task_id),
FOREIGN KEY (task_id) REFERENCES diagnosis_result(task_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;分表设计的好处:主结果表保持轻量,适合按品牌、平台、场景做聚合查询;异常明细表独立存储风险标签的详细信息(如混淆时具体出现了哪个竞品词),方便后续复核和规则迭代。
SELECT
target_brand,
COUNT(*) AS total_samples,
SUM(CASE WHEN is_mentioned = 1 THEN 1 ELSE 0 END) AS mentioned_count,
SUM(CASE WHEN is_recommended = 1 THEN 1 ELSE 0 END) AS recommended_count,
SUM(CASE WHEN risk_level = 'high' THEN 1 ELSE 0 END) AS high_risk_count
FROM diagnosis_result
WHERE validity = 'valid'
AND diagnosed_at >= '2026-07-01'
GROUP BY target_brand;SELECT
d.target_brand,
r.tag_type,
COUNT(*) AS tag_count
FROM diagnosis_risk_tag r
JOIN diagnosis_result d ON r.task_id = d.task_id
WHERE d.diagnosed_at >= '2026-07-01'
GROUP BY d.target_brand, r.tag_type
ORDER BY d.target_brand, tag_count DESC;SELECT
task_id,
platform,
scene_type,
risk_level,
raw_data_url
FROM diagnosis_result
WHERE target_brand = '品牌A'
AND risk_level = 'high'
ORDER BY diagnosed_at DESC;高风险样本列表可直接对接人工复核界面,复核人员点击 raw_data_url 查看原始回答,修正或确认诊断结果。
1. 诊断函数要“快失败”
一级标签(有效性判断)逻辑放在最前面。对于 invalid_timeout 和 invalid_refuse 样本,不做后续的提及提取和解释分析,直接落库返回。这样可以将无效样本的处理耗时控制在毫秒级,大幅提升整体吞吐。
2. 异常标签的阈值需要持续调优
confusion 判断中的文本距离阈值、omission 判断中的字符数阈值,都不是一次设准的。建议在异常明细表的 tag_detail 字段中记录触发时的上下文数据(如实际字符数、匹配到的竞品词),定期分析误标和漏标样本,反向调整规则参数。
3. 人工复核要闭环
高风险标签(confusion、fact_error)的样本应进入人工复核队列。复核结果不应只用于修正单条数据,还应沉淀为规则的训练样本或边界案例,让下一次自动诊断更准确。
诊断链路是 AI 回答监测系统中承上启下的关键环节。上游承接采集原始数据,下游为指标计算提供结构化输入。一条设计良好的诊断链路,需要在三个维度做好平衡:调度的吞吐与可靠性、异常标签的自动化与准确率、存储结构的查询效率与扩展性。
本文介绍的队列调度策略、三级标签体系和分表存储方案,已在多品牌、多平台的日常监测任务中实际运行。开发者可在此基础上,结合自身业务对“诊断”的定义,扩展标签维度或引入轻量级 NLU 模型提升自动标注的准确度。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。