How do I create a new DataFrame based on an existing DataFrame?

Stack Overflow user
Asked 2021-12-02 23:17:56
Answers: 3 · Views: 79 · Followers: 0 · Votes: 0

I have a CSV file named dbname1.table1.csv:

| target             | source        | source_table                               | relation_type |
|--------------------|---------------|--------------------------------------------|---------------|
| avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust | direct        |
| avg_ensure_sum_12m | protocol_dttm | custom_cib_ml_stg.p_overall_part_tend_cust | direct        |
| avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust | indirect      |

The same table in raw CSV form:

target,source,source_table,relation_type
avg_ensure_sum_12m,inn_num,custom_cib_ml_stg.p_overall_part_tend_cust,direct
avg_ensure_sum_12m,protocol_dttm,custom_cib_ml_stg.p_overall_part_tend_cust,direct
avg_ensure_sum_12m,inn_num,custom_cib_ml_stg.p_overall_part_tend_cust,indirect

Then I read it into a DataFrame:

 val dfDL = spark.read.option("delimiter", ",")
                     .option("header", true)
                     .csv(file.getPath.toUri.getPath)

Now I need to create a new DataFrame based on dfDL.

The new DataFrame should have the following structure:

case class DataLink(schema_from: String,
                    table_from: String,
                    column_from: String,
                    link_type: String,
                    schema_to: String,
                    table_to: String,
                    column_to: String)

The values for the new DataFrame's fields come from the CSV file:

pseudocode:
schema_from = source_table.split("\\.")(0) // Example: custom_cib_ml_stg
table_from  = source_table.split("\\.")(1) // Example: p_overall_part_tend_cust
column_from = source                       // Example: inn_num
link_type   = relation_type                // Example: direct
schema_to   = "dbname1.table1.csv".split("\\.")(0) // Example: dbname1
table_to    = "dbname1.table1.csv".split("\\.")(1) // Example: table1
column_to   = target                               // Example: avg_ensure_sum_12m

I need to create this new DataFrame, but I can't work out how to do it on my own.

Later I will need this DataFrame to produce a JSON file. Example JSON:

[{"schema_from":"custom_cib_ml36_stg",
"table_from":"p_overall_part_tend_cust",
"column_from":"inn_num",
"link_type":"direct",
"schema_to":"dbname1",
"table_to":"table1",
"column_to":"avg_ensure_sum_12m"
},
{"schema_from":"custom_cib_ml36_stg",
"table_from":"p_overall_part_tend_cust",
"column_from":"protocol_dttm",
"link_type":"direct",
"schema_to":"dbname1",
"table_to":"table1",
"column_to":"avg_ensure_sum_12m"
}]

I don't like my current implementation:

def readDLFromHDFS(file: LocatedFileStatus): Array[DataLink] = {

    val arrTableName        = file.getPath.getName.split("\\.")
    val (schemaTo, tableTo) = (arrTableName(0), arrTableName(1))

    val dfDL = spark.read.option("delimiter", ",")
                         .option("header", true)
                         .csv(file.getPath.toUri.getPath)

    //val sourceTable = dfDL.select("source_table").collect().map(value => value.toString().split("."))

    dfDL.collect.map(row => DataLink(row.getString(2).split("\\.")(0),
                                     row.getString(2).split("\\.")(1),
                                     row.getString(1),
                                     row.getString(3),
                                     schemaTo,
                                     tableTo,
                                     row.getString(0)))
  }

  def toJSON(dataLinks: Array[DataLink]): Option[JValue] =
    dataLinks.map(Extraction.decompose).reduceOption(_ ++ _)

}
3 Answers

Stack Overflow user

Accepted answer

Posted 2021-12-02 23:48:30

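You definitely don't want to collect: it pulls every row onto the driver, which defeats the point of using Spark here. As always, you have plenty of options. You could use RDDs, but I don't see any need to switch between APIs here. You just want to apply custom logic to some columns and end up with a DataFrame that contains only the resulting columns.

First, define the function you want to apply as a UDF:
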
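// split takes a regex, so the dot must be escaped as "\\."
def convert(target: String, source: String, source_table: String, relation_type: String): DataLink =
  DataLink(source_table.split("\\.")(0),
           source_table.split("\\.")(1),
           source,
           relation_type,
           "dbname1.table1.csv".split("\\.")(0),
           "dbname1.table1.csv".split("\\.")(1),
           target)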
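
A side note on the split calls: String.split takes a regular expression, so an unescaped "." matches every character and the returned array is empty. The plain-Scala sketch below (no Spark needed) illustrates the difference. The schemaAndTable helper and the SplitOnDot object are hypothetical names introduced only for this illustration.

```scala
object SplitOnDot {
  // Hypothetical helper: split "schema.table" or "schema.table.ext"
  // into its first two dot-separated parts.
  def schemaAndTable(qualified: String): (String, String) = {
    val parts = qualified.split("\\.") // escaped: matches a literal dot
    require(parts.length >= 2, s"expected at least 'schema.table', got: $qualified")
    (parts(0), parts(1))
  }

  def main(args: Array[String]): Unit = {
    // Unescaped "." is the regex "any character": every character becomes
    // a delimiter, only empty strings remain, and split drops them all.
    println("dbname1.table1.csv".split(".").length)

    println(schemaAndTable("dbname1.table1.csv"))
    println(schemaAndTable("custom_cib_ml_stg.p_overall_part_tend_cust"))
  }
}
```

Indexing the result of the unescaped call with (0) would throw an ArrayIndexOutOfBoundsException, which is why the escaped form is the one to use for both the file name and source_table.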

Then apply this function to all the relevant columns (make sure to wrap it in udf so that it becomes a Spark function rather than a plain Scala function) and select the result:

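val convertUdf = udf(convert _) // needs org.apache.spark.sql.functions.udf and spark.implicits._ for $
df.select(convertUdf($"target", $"source", $"source_table", $"relation_type"))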

If you want the result as a DataFrame with 7 separate columns, you can skip the UDF and use built-in functions:

df.select(
  split(col("source_table"), "\\.").getItem(0),
  split(col("source_table"), "\\.").getItem(1),
  col("source"),
  col("relation_type"),
  lit("dbname1"),
  lit("table1"),
  col("target")
)

You can also append .as("column_name") to each of these 7 columns so that they match the DataLink field names.
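
Putting the aliases together, a minimal sketch of the built-in-functions approach might look like the following. It assumes a local SparkSession and an in-memory stand-in for the CSV; the toLinks helper and the DataLinkColumns object are hypothetical names, and the file name is passed in rather than hard-coded.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, lit, split}

// Same shape as the DataLink case class from the question.
case class DataLink(schema_from: String, table_from: String, column_from: String,
                    link_type: String, schema_to: String, table_to: String,
                    column_to: String)

object DataLinkColumns {
  // Hypothetical helper: derive the seven columns with built-in functions only.
  def toLinks(df: DataFrame, fileName: String): Dataset[DataLink] = {
    import df.sparkSession.implicits._
    val nameParts = fileName.split("\\.")             // e.g. dbname1, table1, csv
    val src       = split(col("source_table"), "\\.") // Spark's split also takes a regex
    df.select(
      src.getItem(0).as("schema_from"),
      src.getItem(1).as("table_from"),
      col("source").as("column_from"),
      col("relation_type").as("link_type"),
      lit(nameParts(0)).as("schema_to"),
      lit(nameParts(1)).as("table_to"),
      col("target").as("column_to")
    ).as[DataLink]
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; a real job would reuse its own session.
    val spark = SparkSession.builder().master("local[*]").appName("datalink-sketch").getOrCreate()
    import spark.implicits._

    // In-memory stand-in for the CSV from the question.
    val df = Seq(
      ("avg_ensure_sum_12m", "inn_num", "custom_cib_ml_stg.p_overall_part_tend_cust", "direct")
    ).toDF("target", "source", "source_table", "relation_type")

    val links = toLinks(df, "dbname1.table1.csv")
    links.show(truncate = false)
    links.toJSON.show(truncate = false) // one JSON object per row
    spark.stop()
  }
}
```

Because the aliases match the case class fields, .as[DataLink] gives a typed Dataset without a UDF. Note that toJSON produces one JSON object per row, not a single JSON array like the example in the question.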

Votes: 1

Stack Overflow user

Posted 2021-12-02 23:59:56

You can work with a Dataset directly:

import spark.implicits._

case class DataLink(schema_from: String,
                    table_from: String,
                    column_from: String,
                    link_type: String,
                    schema_to: String,
                    table_to: String,
                    column_to: String)

val filename = "dbname1.table1.csv"
val df = spark.read.option("header","true").csv("test.csv")
df.show(false)
+------------------+-------------+------------------------------------------+-------------+
|target            |source       |source_table                              |relation_type|
+------------------+-------------+------------------------------------------+-------------+
|avg_ensure_sum_12m|inn_num      |custom_cib_ml_stg.p_overall_part_tend_cust|direct       |
|avg_ensure_sum_12m|protocol_dttm|custom_cib_ml_stg.p_overall_part_tend_cust|direct       |
|avg_ensure_sum_12m|inn_num      |custom_cib_ml_stg.p_overall_part_tend_cust|indirect     |
+------------------+-------------+------------------------------------------+-------------+

df.createOrReplaceTempView("table")

val df2 = spark.sql(s"""
select split(source_table, '[.]')[0] as schema_from
     , split(source_table, '[.]')[1] as table_from
     , source                        as column_from
     , relation_type                 as link_type
     , split('${filename}', '[.]')[0] as schema_to
     , split('${filename}', '[.]')[1] as table_to
     , target                        as column_to
  from table
""").as[DataLink]

df2.show()

+-----------------+--------------------+-------------+---------+---------+--------+------------------+
|      schema_from|          table_from|  column_from|link_type|schema_to|table_to|         column_to|
+-----------------+--------------------+-------------+---------+---------+--------+------------------+
|custom_cib_ml_stg|p_overall_part_te...|      inn_num|   direct|  dbname1|  table1|avg_ensure_sum_12m|
|custom_cib_ml_stg|p_overall_part_te...|protocol_dttm|   direct|  dbname1|  table1|avg_ensure_sum_12m|
|custom_cib_ml_stg|p_overall_part_te...|      inn_num| indirect|  dbname1|  table1|avg_ensure_sum_12m|
+-----------------+--------------------+-------------+---------+---------+--------+------------------+
Votes: 1

Stack Overflow user

Posted 2021-12-05 21:05:03

My progress so far: I can now create the new DataFrame, but it contains only one column.

val dfDL = spark.read.option("delimiter", ",")
                     .option("header", true)
                     .csv(file.getPath.toUri.getPath)

val convertCase = (target: String, source: String, source_table: String, relation_type: String) =>
                    DataLink(
                      source_table.split("\\.")(0),
                      source_table.split("\\.")(1),
                      source,
                      relation_type,
                      schemaTo,
                      tableTo,
                      target,
                    )


val udfConvert = udf(convertCase)

val dfForJson  = dfDL.select(udfConvert(col("target"),
                                        col("source"),
                                        col("source_table"),
                                        col("relation_type")))
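
A likely reason, assuming standard Spark behaviour: a UDF that returns a case class produces a single struct-typed column. Aliasing that column and then selecting "alias.*" expands the struct into its fields. The sketch below uses a local SparkSession and in-memory data in place of the CSV; the alias link, the flatten helper, and the ExpandStructColumn object are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Same shape as the DataLink case class from the question.
case class DataLink(schema_from: String, table_from: String, column_from: String,
                    link_type: String, schema_to: String, table_to: String,
                    column_to: String)

object ExpandStructColumn {
  // Hypothetical helper: apply the UDF, then flatten the struct it returns.
  def flatten(dfDL: DataFrame, schemaTo: String, tableTo: String): DataFrame = {
    val udfConvert = udf { (target: String, source: String, sourceTable: String, relationType: String) =>
      val parts = sourceTable.split("\\.")
      DataLink(parts(0), parts(1), source, relationType, schemaTo, tableTo, target)
    }
    dfDL
      .select(udfConvert(col("target"), col("source"),
                         col("source_table"), col("relation_type")).as("link"))
      .select("link.*") // one struct column becomes seven top-level columns
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("struct-sketch").getOrCreate()
    import spark.implicits._

    val dfDL = Seq(
      ("avg_ensure_sum_12m", "inn_num", "custom_cib_ml_stg.p_overall_part_tend_cust", "direct")
    ).toDF("target", "source", "source_table", "relation_type")

    flatten(dfDL, "dbname1", "table1").show(truncate = false)
    spark.stop()
  }
}
```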
Votes: 0

Original page content provided by Stack Overflow; translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link: https://stackoverflow.com/questions/70207656
