首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >2021年大数据Spark(五十三):Structured Streaming Deduplication

2021年大数据Spark(五十三):Structured Streaming Deduplication

作者头像
Lansonli
发布2021-10-11 10:19:18
发布2021-10-11 10:19:18
8890
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

Streaming Deduplication

介绍

在实时流式应用中,最典型的应用场景:网站UV统计。

1:实时统计网站UV,比如每日网站UV;

2:统计最近一段时间(比如一个小时)网站UV,可以设置水位Watermark;

Structured Streaming可以使用deduplication对有无Watermark的流式数据进行去重操作:

1.无 Watermark:对重复记录到达的时间没有限制。查询会保留所有的过去记录作为状态用于去重;

2.有 Watermark:对重复记录到达的时间有限制。查询会根据水印删除旧的状态数据;

官方提供示例代码如下:

​​​​​​​需求

对网站用户日志数据,按照userId和eventType去重统计

数据如下:

代码语言:javascript
复制
{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}

{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}

{"eventTime": "2016-01-10 10:01:55","eventType": "browse","userID":"1"}

{"eventTime": "2016-01-10 10:01:55","eventType": "click","userID":"1"}

{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}

{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}

{"eventTime": "2016-01-10 10:02:00","eventType": "click","userID":"1"}

{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}

{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"1"}

{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"1"}

{"eventTime": "2016-01-10 10:01:50","eventType": "browse","userID":"1"}

{"eventTime": "2016-01-10 10:01:50","eventType": "click","userID":"3"}

{"eventTime": "2016-01-10 10:01:51","eventType": "click","userID":"2"}

​​​​​​​代码演示

代码语言:javascript
复制
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}

object StructuredDeduplication {
  def main(args: Array[String]): Unit = {
    // 1. 构建SparkSession会话实例对象,设置属性信息
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 1. 从TCP Socket 读取数据
    val inputTable: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    // 2. 数据处理分析
    val resultTable: DataFrame = inputTable
      .as[String]
      .filter(StringUtils.isNotBlank(_))
      // 样本数据:{“eventTime”: “2016-01-10 10:01:50”,“eventType”: “browse”,“userID”:“1”}
      .select(
        get_json_object($"value", "$.eventTime").as("event_time"),
        get_json_object($"value", "$.eventType").as("event_type"),
        get_json_object($"value", "$.userID").as("user_id")
      )
      // 按照UserId和EventType去重
      .dropDuplicates("user_id", "event_type")
      .groupBy($"user_id", $"event_type")
      .count()

    // 3. 设置Streaming应用输出及启动
    val query: StreamingQuery = resultTable.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}

运行应用结果如下:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/04/23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Streaming Deduplication
    • 介绍
    • ​​​​​​​需求
    • ​​​​​​​代码演示
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档