
KSQL fails with CharConversionException: Invalid UTF-32 character when reading the stream

Asked by a Stack Overflow user on 2019-07-02 02:05:03 · 1 answer · 3.3K views · 2 votes

I am streaming some data into Kafka. I can view the data with print 'kdc-01-orders' from beginning;, and it looks like valid JSON. To be sure, I also parsed it as JSON successfully.

So I created a table over this topic, as follows:

create table orders
(order_num varchar, cust_id integer, order_date integer)
with
(kafka_topic='kdc-01-orders', value_format='json', key='order_num');

The table is created successfully. However, when I query it like this:

select * from orders limit 100;

I see many errors in the ksql log. They look like this:

ksql-server_1      | [2019-07-01 21:21:43,803] WARN task [0_0] Skipping record due to deserialization error. topic=[kdc-01-orders] partition=[0] offset=[999] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
ksql-server_1      | org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: kdc-01-orders
ksql-server_1      | Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
ksql-server_1      |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:80)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:67)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
ksql-server_1      |    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:711)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
ksql-server_1      | Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1a9ef2e (above 0x0010ffff) at char #1, byte #7)
ksql-server_1      | Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1a9ef2e (above 0x0010ffff) at char #1, byte #7)
ksql-server_1      |    at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
ksql-server_1      |    at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
ksql-server_1      |    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)
ksql-server_1      |    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2331)
ksql-server_1      |    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:646)
ksql-server_1      |    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
ksql-server_1      |    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
ksql-server_1      |    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
ksql-server_1      |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:80)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:67)
ksql-server_1      |    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
ksql-server_1      |    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:711)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
ksql-server_1      |    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

The character in question, 0x1a9ef2e, is indeed not a valid character, just as the error says. The data was extracted from a database that uses latin1 encoding, but I don't know how to tell KSQL that. The error repeats many times, each time with a different character.
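A likely explanation for errors like this is that the records are not JSON at all but Avro in Confluent's wire format, which begins with a magic byte 0x00 and a 4-byte schema ID. The leading zero bytes make Jackson's charset auto-detection guess UTF-32BE, and the payload bytes then decode to code points above the Unicode maximum. A minimal sketch (the record bytes below are made up to illustrate the mechanism, chosen to reproduce the exact value from the log):

```python
# Hypothetical Avro-framed Kafka record in Confluent's wire format:
# magic byte 0x00, 4-byte big-endian schema ID, then the Avro payload.
record = bytes([
    0x00,                    # magic byte (Confluent wire format)
    0x00, 0x00, 0x00, 0x01,  # schema ID 1, big-endian (assumed)
    0xA9, 0xEF, 0x2E, 0x42,  # start of the Avro payload (illustrative)
])

# Decode the first two 4-byte units the way a UTF-32BE reader would:
units = [int.from_bytes(record[i:i + 4], "big") for i in range(0, 8, 4)]
print([hex(u) for u in units])  # ['0x0', '0x1a9ef2e']

# 0x1a9ef2e is above the Unicode maximum 0x10FFFF -- exactly the
# "Invalid UTF-32 character 0x1a9ef2e ... at char #1, byte #7" in the log.
assert units[1] > 0x10FFFF
```

This also explains why the error shows a different "character" each time: each record carries different payload bytes in positions 4-7.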


1 Answer

Answered by a Stack Overflow user on 2020-09-02 11:14:15

The PRINT command will also output Avro records, making them look similar to JSON. However, PRINT also reports the format of the topic's values (and keys). Check that output to see whether the data is actually JSON or AVRO:

    Key format: KAFKA_STRING
    Value format: AVRO
    rowtime: 12/21/18 23:58:42 PM PST, key: k0, value: {"v0":"hello": "v1": 10}
    ^CTopic printing ceased
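If PRINT reports Value format: AVRO, a table declared with value_format='json' will fail in exactly the way shown in the question. A sketch of the fix, assuming the topic from the question and a Schema Registry configured for the ksqlDB server:

```sql
-- Drop the JSON-declared table and recreate it with the actual format.
-- For Avro the column list can be omitted, since the schema comes from
-- Schema Registry; the key name below is taken from the question.
DROP TABLE orders;

CREATE TABLE orders
  WITH (kafka_topic='kdc-01-orders', value_format='AVRO', key='order_num');
```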

This article goes deeper into converters and serialization: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

1 vote
Original content provided by Stack Overflow: https://stackoverflow.com/questions/56844413