首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PyFlink - Scala如何在表API中转换Scala?

PyFlink - Scala如何在表API中转换Scala?
EN

Stack Overflow用户
提问于 2020-11-06 21:31:28
回答 1查看 475关注 0票数 0

我正在尝试将我的Scala (scala.collection.immutable.map)的Map[String,String]对象输出映射到表API中的一些有效数据类型,即通过Java (java.util.Map),如这里所建议的:Flink Table API & SQL and map types (Scala)。然而,我得到了以下错误。

知道该怎么做吗?如果是,是否有一种将转换为Map[String,Any]类型的(嵌套) Scala对象的方法?

Scala UDF

代码语言:javascript
复制
class dummyMap() extends ScalarFunction {
  def eval() = {
    val whatevermap = Map("key1" -> "val1", "key2" -> "val2")
    whatevermap.asInstanceOf[java.util.Map[java.lang.String,java.lang.String]]
  }
}

水槽

代码语言:javascript
复制
my_sink_ddl = f"""
    create table mySink (
        output_of_dummyMap_udf MAP<STRING,STRING>
    ) with (
        ...
    )
"""

误差

代码语言:javascript
复制
Py4JJavaError: An error occurred while calling o430.execute.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType<java.util.Map>]
TableSink schema:    [output_of_my_scala_udf: Map<String, String>]

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-02 19:12:51

最初的答案来自Wei Zhong。我只是个记者。谢谢小薇!

在这一点上(Flink 1.11),有两种方法正在工作:

registering

  • Outdated:当前: UDF定义中的DataTypeHint + UDF中的SQL覆盖UDF定义中的getResultType +用于UDF注册

的t_env.register_java_function。

Scala UDF

代码语言:javascript
复制
package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class dummyMap extends ScalarFunction {

  // If the udf would be registered by the SQL statement, you need add this typehint
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {

    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  }

  // If the udf would be registered by the method 'register_java_function', you need override this
  // method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    // The type of the return values should be TypeInformation
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
  }
}

Python代码

代码语言:javascript
复制
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# load the scala udf jar file, the path should be modified to yours
# or your can also load the jar file via other approaches
st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")

# register the udf via 
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
        output_of_my_scala_udf ROW<s STRING,t STRING>
    ) with (
        'connector' = 'print'
    )""")

# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64721849

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档