Converting a JSON array to a JSON object using KStream or KSQL

Stack Overflow user
Asked 2020-04-27 13:52:37
Answers: 2 · Views: 918 · Followers: 0 · Votes: 2

I have data arriving in Kafka in the following format.

{"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}

I want it converted to this.

{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}

I tried flattening it with KSQL, but KSQL does not support arrays yet.

I also tried flattening it with Kafka Streams, using the code below.

builder.stream(inputTopic).flatMapValues(Object -> Arrays.asList()).to(outputTopic);

But it produces no output. Any help with this would be greatly appreciated.

2 Answers

Stack Overflow user

Accepted answer

Posted 2020-04-27 17:00:54

KSQL / ksqlDB does support arrays. Here is how to use it to do what you want:

-- Declare the stream
CREATE STREAM TEST1 
    (WHS ARRAY<STRUCT<"action"           VARCHAR
                    , "Update-Date-Time" VARCHAR
                    , "Number"           VARCHAR
                    , "Abbr"             VARCHAR
                    , "Name"             VARCHAR
                    , "Name2"            VARCHAR
                    , "Country-Code"     VARCHAR
                    , "Addr-1"           VARCHAR
                    , "Addr-2"           VARCHAR
                    , "Addr-4"           VARCHAR
                    , "City"             VARCHAR
                    , "State"            VARCHAR>>) 
    WITH (KAFKA_TOPIC ='test1'
         ,VALUE_FORMAT='JSON');

-- Set querying from beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- Query the array         
ksql> SELECT WHS FROM TEST1 EMIT CHANGES LIMIT 1;
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|WHS                                                                                                                                                   |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}] |
Limit Reached
Query terminated
ksql>         

-- Flatten the array
ksql> SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES LIMIT 1;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                                                                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}                                                                                                                                      |
Limit Reached
Query terminated
ksql>

You can write this to another stream (topic):

ksql> CREATE STREAM TEST1_EXPLODE WITH (KAFKA_TOPIC='NEW_TEST1') AS SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES;

 Message
-------------------------------------------
 Created query with ID CSAS_TEST1_EXPLODE_155
-------------------------------------------
ksql> PRINT NEW_TEST1;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"KSQL_COL_0":{"ACTION":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","NUMBER":"0","ABBR":"","NAME":"","NAME2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","STATE":""}}

If you want to flatten the resulting struct, you can do that too:

CREATE STREAM TEST1_FLATTENED AS SELECT  EXPLODE(WHS)->"action"           AS "action"           ,
        EXPLODE(WHS)->"Update-Date-Time" AS "Update-Date-Time" ,
        EXPLODE(WHS)->"Number"           AS "Number"           ,
        EXPLODE(WHS)->"Abbr"             AS "Abbr"             ,
        EXPLODE(WHS)->"Name"             AS "Name"             ,
        EXPLODE(WHS)->"Name2"            AS "Name2"            ,
        EXPLODE(WHS)->"Country-Code"     AS "Country-Code"     ,
        EXPLODE(WHS)->"Addr-1"           AS "Addr-1"           ,
        EXPLODE(WHS)->"Addr-2"           AS "Addr-2"           ,
        EXPLODE(WHS)->"Addr-4"           AS "Addr-4"           ,
        EXPLODE(WHS)->"City"             AS "City"             ,
        EXPLODE(WHS)->"State"            AS "State"
    FROM TEST1 EMIT CHANGES;

Printing the resulting topic shows the flattened record:

ksql> PRINT TEST1_FLATTENED;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":"0","Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","State":""}

Votes: 2

Stack Overflow user

Posted 2020-04-27 16:26:23

Arrays.asList() called with no arguments just creates an empty list, so flatMapValues has nothing to emit for any record.
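
To make the empty-list behavior concrete, here is a minimal sketch in plain Java with no Kafka dependency. The `flatMapValues` helper below is a hypothetical stand-in written for this illustration, not the Kafka Streams API; it only mimics the contract of emitting one output per element of the Iterable returned by the mapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FlatMapValuesDemo {

    // Hypothetical stand-in for flatMapValues: applies the mapper to each
    // input value and forwards every element of the returned Iterable.
    public static <V, R> List<R> flatMapValues(List<V> values, Function<V, Iterable<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (V value : values) {
            for (R element : mapper.apply(value)) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("record-1", "record-2");

        // Mapper ignores its input and returns an empty list: nothing is emitted.
        List<String> dropped = flatMapValues(input, v -> Arrays.<String>asList());
        System.out.println(dropped.size()); // prints 0

        // Mapper returns two elements per input: four outputs in total.
        List<String> kept = flatMapValues(input, v -> Arrays.asList(v + "-a", v + "-b"));
        System.out.println(kept.size()); // prints 4
    }
}
```

The mapper in the question has the same shape as the first case: it never reads its argument, so every input record maps to zero output records.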

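Add the actual logic that extracts the array from the input and turns it into a collection that implements Iterable (for example, ArrayList). Here I use flatMapValues together with Jackson: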

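        // Assumes String keys and a value serde that deserializes records into Jackson JsonNode.
        builder.<String, JsonNode>stream(inputTopic).flatMapValues((ValueMapper<JsonNode, Iterable<JsonNode>>) value -> {
            // "WHS" holds the array; each of its elements is emitted as a separate record.
            ArrayNode arrayNode = (ArrayNode) value.get("WHS");
            return arrayNode::iterator;
        }).to(outputTopic);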

Votes: 2

Original content provided by Stack Overflow. Original link: https://stackoverflow.com/questions/61452495