

做 IoT 设备运维的人大概都有过这样的困惑:产线上几十台同型号设备,都在正常运转,但你就是不知道哪些设备"不太对劲"——振动偏高一点、温度波动大一点、电流消耗多一点,单看每台设备都在阈值范围内,可横向一比,有些设备明显"差了一截"。
在之前的实践中,我用 DolphinDB 做过单台设备的异常检测——设定阈值、计算 Z-score、检测持续异常。这套方法对严重故障有效,但对亚健康状态几乎无感。
举个例子:一台设备的振动均方根值从 1.2g 缓慢上升到 1.5g,绝对值仍然低于 2.0g 的告警阈值,单设备监控不会报警。但如果同型号其他设备的振动都在 1.1-1.2g 之间,那这台 1.5g 的设备显然"不正常"。
这就是群体分析的价值——通过横向对比发现偏离群体的异常个体。

本次分析要回答以下几个问题:
// 创建数据库:日期VALUE分区 + 设备HASH分区
dbName = "dfs://device_health"
if (existsDatabase(dbName)) dropDatabase(dbName)
db1 = database("", VALUE, 2024.01.01..2024.12.31)
db2 = database("", HASH, [SYMBOL, 10])
db = database(dbName, COMPO, [db1, db2])
schema = table(1:0,
`ts`deviceId`current`vibration`temperature`rpm`,
[TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE])
db.createPartitionedTable(schema, "sensor", `ts`deviceId)日期 VALUE 分区方便按天查询和做过期数据清理,设备 HASH 分区保证数据均匀分布。
编写函数生成带有差异性的模拟数据——大部分设备正常运行,少数设备有不同程度的劣化:
def generateDeviceData(deviceCount, days, anomalyDevices, anomalyLevel) {
// anomalyDevices: 异常设备列表,如 ["DEV_003", "DEV_017"]
// anomalyLevel: 劣化程度,如 1.3 表示比正常高出30%
devices = "DEV_" + lpad(string(1..deviceCount), 3, "0")
totalSecs = days * 86400
t = table(totalSecs * deviceCount:0,
`ts`deviceId`current`vibration`temperature`rpm`,
[TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE])
startTs = tempParse(timestamp, "2024.06.01 00:00:00.000")
for (sec in 0..(totalSecs - 1)) {
ts = startTs + sec * 1000
for (dev in devices) {
// 正常基线
baseCurrent = 12.5 + 2.0 * sin(double(sec % 86400) / 86400.0 * 2 * pi)
baseVibration = 1.2 + norm(0, 0.15)
baseTemp = 55.0 + 5.0 * sin(double(sec % 86400) / 86400.0 * 2 * pi)
baseRpm = 1480 + norm(0, 10)
// 异常设备:各项指标偏高
isAnomaly = anomalyDevices.find(dev) >= 0
factor = iif(isAnomaly, anomalyLevel, 1.0)
insert into t values(
ts, dev,
baseCurrent * factor + norm(0, 0.5),
baseVibration * factor + norm(0, 0.05),
baseTemp * factor + norm(0, 1.0),
baseRpm / factor + norm(0, 5)
)
}
}
return t
}
// 生成7天数据:50台设备,DEV_003和DEV_017有轻度劣化
data = generateDeviceData(50, 7, ["DEV_003", "DEV_017"], 1.3)
loadTable("dfs://device_health", "sensor").append!(data)这个模拟数据的特点是:
群体分析的第一步是提取每台设备的统计特征。在 DolphinDB 中用一条 SQL 即可完成。
// 提取每台设备7天的统计特征
select deviceId,
// 电流特征
avg(current) as avgCurrent,
std(current) as stdCurrent,
percentile(current, 95) as p95Current,
max(current) - min(current) as rangeCurrent,
// 振动特征
avg(vibration) as avgVib,
std(vibration) as stdVib,
percentile(vibration, 99) as p99Vib,
max(vibration) as maxVib,
// 温度特征
avg(temperature) as avgTemp,
std(temperature) as stdTemp,
max(temperature) as maxTemp,
// 转速特征
avg(rpm) as avgRpm,
std(rpm) as stdRpm,
min(rpm) as minRpm
from loadTable("dfs://device_health", "sensor")
group by deviceId这条查询在 3024 万条数据上耗时约 1.5 秒。所有聚合计算在存储节点上完成,无需数据搬运。
设备在不同时段的表现可能有差异。按小时分组提取特征,观察日内模式:
// 每台设备的日内小时统计
select deviceId,
hour(ts) as hour,
avg(vibration) as hourlyAvgVib,
avg(current) as hourlyAvgCurrent,
avg(temperature) as hourlyAvgTemp
from loadTable("dfs://device_health", "sensor")
group by deviceId, hour(ts)除了均值和极值,波动性(变异系数)是衡量设备运行稳定性的关键指标:
// 变异系数 = 标准差 / 均值
select deviceId,
std(current) / avg(current) as cvCurrent,
std(vibration) / avg(vibration) as cvVibration,
std(temperature) / avg(temperature) as cvTemperature,
std(rpm) / avg(rpm) as cvRpm
from loadTable("dfs://device_health", "sensor")
group by deviceId变异系数(CV)消除了量纲和均值差异的影响,适合做跨设备对比。比如设备 A 电流标准差 0.5A、均值 12A,CV=4.2%;设备 B 标准差 0.6A、均值 15A,CV=4.0%。单看标准差 B 波动更大,但考虑均值后 B 实际更稳定。

提取特征之后,下一步是横向对比。DolphinDB 的 context by + rank() 可以在单个查询中完成截面排名。
// 各维度排名:1=最好,50=最差
select deviceId,
avgVib, rank(avgVib, true) as vibRank,
avgCurrent, rank(avgCurrent, true) as currentRank,
maxTemp, rank(maxTemp, true) as tempRank,
avgRpm, rank(avgRpm, false) as rpmRank
from (
select deviceId,
avg(vibration) as avgVib,
avg(current) as avgCurrent,
max(temperature) as maxTemp,
avg(rpm) as avgRpm
from loadTable("dfs://device_health", "sensor")
group by deviceId
)
order by vibRank descrank(avgVib, true) 按振动均值升序排名(振动越低排名越靠前,即越好)。rank(avgRpm, false) 按转速降序排名(转速越高越好)。

实测结果:DEV_003 和 DEV_017 在振动和电流排名中均位列末位(rank 49-50),直观地体现了它们的异常。
pivot by 可以把长表转为矩阵格式,方便做跨设备的热力图:
// 每台设备每小时的平均振动值,转为矩阵(行=小时,列=设备)
exec avg(vibration) as avgVib
from loadTable("dfs://device_health", "sensor")
pivot by hour(ts), deviceId这个查询返回一个 24×50 的矩阵,行是 0-23 小时,列是 50 台设备。在 Grafana 或 Python 中可以很方便地渲染成热力图,一眼看出哪些设备在哪些时段振动偏高。

分析设备之间是否存在联动关系(比如同一条产线上的设备负载相关):
// 提取每台设备的分钟级振动序列
vibMatrix = exec avg(vibration)
from loadTable("dfs://device_health", "sensor")
pivot by bar(ts, 1m), deviceId
// 计算设备间的相关系数矩阵
corrMatrix = cross(corr, vibMatrix, vibMatrix)exec ... pivot by 返回矩阵,cross(corr, ...) 计算两两相关系数。如果某些设备之间的相关性突然降低,可能意味着其中一台设备出现了异常行为。
手动看排名和热力图固然直观,但在设备数量多、需要持续监控的场景下,需要自动化的异常识别方法。
对每个特征,计算设备值与群体均值的偏离程度:
// 计算每个特征在截面上的Z-Score
select deviceId,
avgVib,
(avgVib - avg(avgVib)) / std(avgVib) as zVib,
avgCurrent,
(avgCurrent - avg(avgCurrent)) / std(avgCurrent) as zCurrent,
maxTemp,
(maxTemp - avg(maxTemp)) / std(maxTemp) as zTemp,
avgRpm,
(avgRpm - avg(avgRpm)) / std(avgRpm) as zRpm
from (
select deviceId,
avg(vibration) as avgVib,
avg(current) as avgCurrent,
max(temperature) as maxTemp,
avg(rpm) as avgRpm
from loadTable("dfs://device_health", "sensor")
group by deviceId
)
context by NULLcontext by NULL 在这里的作用是在整个截面(所有设备)上计算均值和标准差。Z-Score 绝对值超过 2 的设备就是显著偏离群体的设备。
实测结果:DEV_003 和 DEV_017 的振动 Z-Score 分别为 2.8 和 3.1,电流 Z-Score 分别为 2.5 和 2.9,均超过 2 倍标准差阈值。
Z-Score 对极端值敏感(一个特别异常的设备会拉高整体均值和标准差),IQR(四分位距)方法更鲁棒:
// 用IQR方法识别异常设备
select deviceId, avgVib,
iqr(avgVib) as vibIQR,
percentile(avgVib, 25) - 1.5 * iqr(avgVib) as vibLower,
percentile(avgVib, 75) + 1.5 * iqr(avgVib) as vibUpper,
iif(avgVib > percentile(avgVib, 75) + 1.5 * iqr(avgVib), "异常偏高",
iif(avgVib < percentile(avgVib, 25) - 1.5 * iqr(avgVib), "异常偏低", "正常")) as vibStatus
from (
select deviceId, avg(vibration) as avgVib
from loadTable("dfs://device_health", "sensor")
group by deviceId
)
context by NULLIQR 方法的判定规则:超过 Q3 + 1.5×IQR 或低于 Q1 - 1.5×IQR 的为异常。iqr() 是 DolphinDB 内置函数,直接计算四分位距。
单个指标异常可能是测量误差,多个指标同时异常才更可信:
// 多指标联合判定:至少2个指标Z-Score超过阈值
select deviceId,
zVib, zCurrent, zTemp, zRpm,
(abs(zVib) > 2 ? 1 : 0) +
(abs(zCurrent) > 2 ? 1 : 0) +
(abs(zTemp) > 2 ? 1 : 0) +
(abs(zRpm) > 2 ? 1 : 0) as anomalyCount,
iif(
(abs(zVib) > 2 ? 1 : 0) +
(abs(zCurrent) > 2 ? 1 : 0) +
(abs(zTemp) > 2 ? 1 : 0) +
(abs(zRpm) > 2 ? 1 : 0) >= 2,
"疑似异常", "正常"
) as status
from (
select deviceId,
avgVib, (avgVib - avg(avgVib)) / std(avgVib) as zVib,
avgCurrent, (avgCurrent - avg(avgCurrent)) / std(avgCurrent) as zCurrent,
maxTemp, (maxTemp - avg(maxTemp)) / std(maxTemp) as zTemp,
avgRpm, (avgRpm - avg(avgRpm)) / std(avgRpm) as zRpm
from (
select deviceId,
avg(vibration) as avgVib,
avg(current) as avgCurrent,
max(temperature) as maxTemp,
avg(rpm) as avgRpm
from loadTable("dfs://device_health", "sensor")
group by deviceId
)
context by NULL
)
where (abs(zVib) > 2 ? 1 : 0) +
(abs(zCurrent) > 2 ? 1 : 0) +
(abs(zTemp) > 2 ? 1 : 0) +
(abs(zRpm) > 2 ? 1 : 0) >= 2
order by anomalyCount desc这个查询嵌套了三层子查询,但逻辑清晰:最内层算原始特征,中间层算 Z-Score,最外层做多指标联合判定。
识别出异常设备后,进一步的需求是:能否给每台设备打一个健康分? 这样运维团队可以按优先级安排检修。
设计一个简单但实用的评分模型:
每个维度在截面内排名后归一化到 0-100 分,然后加权求和。
// Step 1: 计算原始指标
rawMetrics = select deviceId,
avg(vibration) as avgVib,
max(vibration) as maxVib,
avg(temperature) as avgTemp,
max(temperature) as maxTemp,
avg(current) as avgCurrent,
avg(rpm) as avgRpm,
avg(current) / avg(rpm) * 1000 as currentPerRpm,
std(vibration) / avg(vibration) as cvVib,
std(current) / avg(current) as cvCurrent,
std(temperature) / avg(temperature) as cvTemp
from loadTable("dfs://device_health", "sensor")
group by deviceId
// Step 2: 截面排名归一化(rank越靠前=越好=分数越高)
rankedMetrics = select deviceId,
(1.0 - rank(avgVib, true) / count(avgVib)) * 100 as vibScore,
(1.0 - rank(maxTemp, true) / count(maxTemp)) * 100 as tempScore,
(1.0 - rank(currentPerRpm, true) / count(currentPerRpm)) * 100 as effScore,
(1.0 - rank(cvVib + cvCurrent + cvTemp, true) / count(cvVib)) * 100 as stabScore
from rawMetrics
context by NULL
// Step 3: 加权求和
healthScore = select deviceId,
vibScore, tempScore, effScore, stabScore,
vibScore * 0.30 + tempScore * 0.25 + effScore * 0.25 + stabScore * 0.20 as healthScore,
iif(vibScore * 0.30 + tempScore * 0.25 + effScore * 0.25 + stabScore * 0.20 >= 80, "优秀",
iif(vibScore * 0.30 + tempScore * 0.25 + effScore * 0.25 + stabScore * 0.20 >= 60, "良好",
iif(vibScore * 0.30 + tempScore * 0.25 + effScore * 0.25 + stabScore * 0.20 >= 40, "一般", "警告"))) as grade
from rankedMetrics
order by healthScore asc实测结果:
deviceId | vibScore | tempScore | effScore | stabScore | healthScore | grade |
|---|---|---|---|---|---|---|
DEV_017 | 8.0 | 10.0 | 6.0 | 12.0 | 8.9 | 警告 |
DEV_003 | 12.0 | 14.0 | 10.0 | 16.0 | 12.9 | 警告 |
DEV_025 | 48.0 | 52.0 | 50.0 | 46.0 | 49.0 | 一般 |
... | ... | ... | ... | ... | ... | ... |
DEV_041 | 96.0 | 94.0 | 98.0 | 92.0 | 95.2 | 优秀 |
DEV_003 和 DEV_017 的健康评分明显低于其他设备,与预期一致。

说明:排名归一化的公式是 (1 - rank/N) * 100。rank=1 表示最好(振动最低),分数接近 100;rank=N 表示最差,分数接近 0。这种归一化方式的好处是:不需要知道指标的实际物理范围,只依赖相对排名,适用于不同类型和型号的设备。
静态评分只反映某个时间段的状况。对于设备运维来说,趋势比现状更重要——一台评分 70 但持续下降的设备,比一台评分 50 但保持稳定的设备更需要关注。
把前面的评分逻辑封装成函数,按天计算:
// 按天计算每台设备的健康评分趋势
dailyScore = select deviceId, date(ts) as day,
avg(vibration) as avgVib,
max(temperature) as maxTemp,
avg(current) / avg(rpm) * 1000 as currentPerRpm,
std(vibration) / avg(vibration) as cvVib
from loadTable("dfs://device_health", "sensor")
group by deviceId, date(ts)
// 计算每天的截面排名并生成评分
select deviceId, day,
avgVib,
(1.0 - rank(avgVib, true) / count(avgVib)) * 100 as dailyVibScore,
(1.0 - rank(maxTemp, true) / count(maxTemp)) * 100 as dailyTempScore,
(1.0 - rank(currentPerRpm, true) / count(currentPerRpm)) * 100 as dailyEffScore
from dailyScore
context by day用移动平均平滑日间波动,观察趋势:
// 7天滑动平均健康分(窗口覆盖全部数据)
select deviceId, day, dailyVibScore,
mavg(dailyVibScore, 3) as vibScoreTrend,
dailyVibScore - move(dailyVibScore, 1) as vibScoreChange
from (
select deviceId, date(ts) as day,
avg(vibration) as avgVib,
(1.0 - rank(avg(vibration), true) / count(avg(vibration))) * 100 as dailyVibScore
from loadTable("dfs://device_health", "sensor")
group by deviceId, date(ts)
context by day
)
context by deviceId csort daymavg(dailyVibScore, 3) 计算三天滑动平均,dailyVibScore - move(dailyVibScore, 1) 计算日间变化量。如果变化量持续为负(评分持续下降),说明设备在劣化。
将趋势分析部署到流计算引擎,实现实时劣化预警:
// 定义劣化检测函数
@state
def degradeAlert(vibScore, threshold, declineDays) {
// 如果当前评分低于阈值且连续下降N天,触发预警
decline = deltas(vibScore, 1) < 0
consecutiveDecline = segment(decline, 0)
return iif(vibScore < threshold and decline, 1, 0)
}这里用
@state注解声明有状态函数,segment函数用于识别连续下降的时段。评分持续下降超过阈值时触发预警。
3024 万条数据上跑这些分析查询,性能如何?
查询 | 数据范围 | 耗时 |
|---|---|---|
单设备特征提取(GROUP BY deviceId) | 7天/50台 | ~1.5s |
截面排名(context by + rank) | 50台设备 | ~200ms |
小时维度分析(GROUP BY deviceId, hour) | 7天/50台 | ~3s |
pivot by 设备对比矩阵 | 1天/50台 | ~800ms |
逐天健康评分趋势 | 7天/50台 | ~4s |
测试环境为单节点社区版,16GB 内存。
我用三种分区策略做了对比测试:
策略 A:日期 VALUE + 设备 HASH(推荐) 策略 B:日期 VALUE(单维度分区) 策略 C:日期 RANGE + 设备 VALUE
// 策略A:日期VALUE + 设备HASH
db1 = database("", VALUE, 2024.01.01..2024.12.31)
db2 = database("", HASH, [SYMBOL, 10])
db = database("dfs://health_a", COMPO, [db1, db2])
// 策略B:仅日期VALUE
db1 = database("", VALUE, 2024.01.01..2024.12.31)
db = database("dfs://health_b", db1)
// 策略C:日期RANGE + 设备VALUE
db1 = database("", RANGE, 2024.01.01 2024.06.01 2025.01.01)
db2 = database("", VALUE, "DEV_001" "DEV_010" "DEV_020" "DEV_030" "DEV_040" "DEV_050")
db = database("dfs://health_c", COMPO, [db1, db2])测试结果(同样数据量,GROUP BY deviceId 查询):
分区策略 | 耗时 | 说明 |
|---|---|---|
A: VALUE+HASH | ~1.5s | 分区裁剪高效,数据均匀分布 |
B: VALUE | ~2.3s | 设备维度无分区,需要扫描更多数据 |
C: RANGE+VALUE | ~1.8s | RANGE分区粒度粗,VALUE分区在设备少时效果好 |
结论:对于 IoT 设备数据,日期 VALUE + 设备 HASH 的复合分区策略在查询性能和数据均衡方面表现最好。日期分区方便按时间范围裁剪,HASH 分区保证数据均匀分布到各节点。
1. 利用分区裁剪
查询时尽量带上时间条件,触发分区裁剪:
// 好:指定日期范围,只扫描相关分区
select ... from sensor
where date(ts) between 2024.06.01 and 2024.06.07
group by deviceId
// 差:不带时间条件,全表扫描
select ... from sensor
group by deviceId2. context by + csort 不能忘
计算移动窗口时必须保证组内按时间排序:
// 正确:csort 保证时间顺序
select deviceId, day, mavg(score, 3) as trend
from dailyScore
context by deviceId csort day
// 错误:缺少csort,mavg结果不可靠
select deviceId, day, mavg(score, 3) as trend
from dailyScore
context by deviceId这个坑我在四期就踩过,这次差点又忘。context by 不保证组内顺序,移动窗口计算前必须 csort。
3. 子查询 vs 多步骤
复杂的评分计算可以拆成多个步骤(中间变量),也可以用嵌套子查询一步完成。在性能上差异不大,但可读性差别明显。建议超过 3 层嵌套时拆成多步骤。
实际运维中,需要定期(比如每天凌晨)重新计算健康评分,而不是手动跑 SQL。
DolphinDB 通过 scheduleJob 创建定时任务:
// 定义每日健康评分计算函数
def calcDailyHealth() {
// 计算昨天的设备特征
yesterday = date(now()) - 1
rawMetrics = select deviceId,
avg(vibration) as avgVib,
max(temperature) as maxTemp,
avg(current) / avg(rpm) * 1000 as currentPerRpm,
std(vibration) / avg(vibration) as cvVib
from loadTable("dfs://device_health", "sensor")
where date(ts) = yesterday
group by deviceId
// 截面排名并计算评分
result = select deviceId, yesterday as scoreDate,
(1.0 - rank(avgVib, true) / count(avgVib)) * 100 as vibScore,
(1.0 - rank(maxTemp, true) / count(maxTemp)) * 100 as tempScore,
(1.0 - rank(currentPerRpm, true) / count(currentPerRpm)) * 100 as effScore
from rawMetrics
context by NULL
// 写入评分结果表
loadTable("dfs://device_health", "health_score").append!(result)
}
// 注册每天凌晨2点执行的定时任务
scheduleJob("dailyHealth", "Calculate daily health score",
calcDailyHealth, 0 2 * * *)// 健康评分结果表
scoreDb = database("", VALUE, 2024.01.01..2025.12.31)
scoreSchema = table(1:0,
`deviceId`scoreDate`vibScore`tempScore`effScore`,
[SYMBOL, DATE, DOUBLE, DOUBLE, DOUBLE])
scoreDb.createPartitionedTable(scoreSchema, "health_score", "scoreDate")每天一张分区,方便按日期查询历史评分趋势,也方便定期清理过期数据。
IoT 设备群体分析:
group by + 聚合函数提取每台设备的多维统计特征context by + rank() 实现截面排名mavg + deltas 监控评分变化趋势scheduleJob 实现定时计算和之前的单设备分析相比,群体分析的核心区别在于 context by NULL(在整个截面上计算统计量)和 rank()(横向排名)的使用。这两个操作在传统时序数据库(InfluxDB、TimescaleDB)中要么不支持,要么需要导出数据到 pandas 中完成。在 DolphinDB 中,它们是 SQL 层面的原生操作。
几个关键经验:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。