如何在小组中找出事件之间的时间?
例如,我有流源(Kafka),我可以从中获得许多列。这个流被读入spark、预处理、清理,只有这四列被保留:"ClientTimestamp“、"sensor_type”、"activity“、"User_detail”。
现在,我要计算每个用户存在临界活动的总时间。。
Clientimestamp Sensor_type activity User_detail
4/11/2021 10:00:00 ultrasonic critical user_A
4/11/2021 10:00:00 ultrasonic normal user_B
4/11/2021 10:03:00 ultrasonic normal user_A
4/11/2021 10:05:00 ultrasonic critical user_B
4/11/2021 10:06:00 ultrasonic critical user_A
4/11/2021 10:07:00 ultrasonic critical user_A
4/11/2021 10:08:00 ultrasonic critical user_B
4/11/2021 10:09:00 ultrasonic critical user_B因此,对于user_A,所有临界活动之间的总时间是通过找出两个临界事件之间的差异来计算的,并对这些差异进行总结。
(10:00:00 - 10:06:00)+(10:06:00 - 10:07:00)
therefore for userA critical activity lasted for total minute of (5+1)= 6 minutes.同样,对于user_B来说,
(10:05:00 - 10:08:00)+ (10:08:00-10:09:00)
userB critical activity lasted for total minute of (3+1) = 4 minute对于每个窗口,我想调用一个将计算总时间的自定义函数。如何在按窗口分组的组上应用函数?
df = df.withWatermark("clientTimestamp", "10 minutes")\
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity'))
.apply(calculate_time)发布于 2021-04-12 07:32:57
看起来,这可以通过取窗口内每个User_detail的最大和最小时间之间的差异来解决。此外,可以将活动上的筛选器应用于忽略“正常”行。
我看不出为什么这里需要应用自定义函数(如"calculate_time“)。请注意,我并不完全熟悉Python语法,但您的代码如下所示:
df = df \
.filter(df.activity == "critical") \
.withWatermark("clientTimestamp", "10 minutes") \
.groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
.agg((max("clientTimestamp") - min("clientTimestamp")).alias("time_difference"))https://stackoverflow.com/questions/67042787
复制相似问题