I've just started using ksql. When I print a topic from the beginning, I get data in the following format:
rowtime: 4/12/20, 9:00:05 AM MDT, key: {"messageId":null}, value: {"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}
However, all of the KSQL examples have data in the following format:
{"ROWTIME":1537436551210,"ROWKEY":"3375","rating_id":3375,"user_id":2,"stars":3,"route_id":6972,"rating_time":1537436551210,"channel":"web","message":"airport refurb looks great, will fly outta here more!"}
So I'm not able to perform any operations. The format of my topic is reported as:
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
That is what is reported for my topic. How can I change my data into the format the examples use?
Thanks
Posted on 2020-06-03 23:39:20
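ksqlDB does not yet support JSON message keys (see the tracking GitHub issue).
However, you can still access the data in both the key and the value. A JSON key is, after all, just a string!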
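As a rough sketch of what "just a string" means in practice: if the key is declared as a STRING column named ROWKEY (as in the CREATE STREAM statement in this answer, with the stream name `stuff`), the built-in EXTRACTJSONFIELD function should be able to pull individual fields out of it. The alias `message_id` is only an illustrative name.

```sql
-- Sketch only: assumes a stream named `stuff` whose key is declared
-- as `ROWKEY STRING KEY`, so the JSON key arrives as plain text.
SELECT
  ROWKEY,                                                -- raw key, e.g. {"messageId":null}
  EXTRACTJSONFIELD(ROWKEY, '$.messageId') AS message_id  -- field parsed out of the key string
FROM stuff
EMIT CHANGES;
```

For the sample record in the question, `message_id` would simply be null, since the key's `messageId` is null.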
Your value, when reformatted, looks like this:
{
  "WHS":[
    {
      "Character Set":"UTF-8",
      "action":"finished",
      "Update-Date-Time":"2020-04-11 09:00:02:25",
      "Number":0,
      "Abbr":"",
      "Name":"",
      "Name2":"",
      "Country-Code":"",
      "Addr-1":"",
      "Addr-2":"",
      "Addr-3":"",
      "Addr-4":"",
      "City":"",
      "State":""
    }
  ]
}
Assuming all rows share a common format, ksqlDB can handle this easily.
To import your stream, you should be able to run something like this:
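-- assuming v0.9 of ksqlDB
create stream stuff
  (
    ROWKEY STRING KEY,
    WHS ARRAY<
      STRUCT<
        `Character Set` STRING,
        action STRING,
        `Update-Date-Time` STRING,
        Number STRING,
        ... etc
      >
    >
  )
  WITH (kafka_topic='?', value_format='JSON');
The value column WHS is an array of structs (which will only ever contain one element), and the struct defines all the fields you need to access. Note that some field names must be quoted with backticks because they contain characters that are otherwise invalid, such as spaces and dashes.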
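Once the stream is declared, the nested fields can be read with the array index and the `->` struct dereference operator. The query below is a minimal sketch rather than a tested statement: it assumes the stream is named `stuff` as above, the output aliases are made up for illustration, and the array index depends on your ksqlDB version (recent releases index arrays from 1, while older KSQL releases indexed from 0).

```sql
-- Sketch only: assumes the stream `stuff` declared by the CREATE STREAM statement.
-- Assumption: one-based array indexing; on an older release that indexes
-- from zero, use WHS[0] instead of WHS[1].
SELECT
  WHS[1]->`Character Set`    AS character_set,    -- quoted: the name contains a space
  WHS[1]->action             AS action,
  WHS[1]->`Update-Date-Time` AS update_date_time  -- quoted: the name contains dashes
FROM stuff
EMIT CHANGES;
```

Each output row is then flat, one column per selected field, which is closer to the shape used in the KSQL examples.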
https://stackoverflow.com/questions/61180715