首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring函数不使用Avro反序列化器

Spring函数不使用Avro反序列化器
EN

Stack Overflow用户
提问于 2022-09-06 20:25:42
回答 1查看 150关注 0票数 0

我有一个应用程序,我正在尝试利用功能接口。我得到了使用String的概念的证明,但是现在我正试图处理Avro生成的消息,我无法让反序列化工作。我看过类似的问题,但是他们要么没有和Avro一起工作,要么让Avro开始工作,而我尝试使用他们的配置失败了。

下面是代码和配置。我尝试过各种不同的yml条目,但没有任何结果,甚至尝试创建自己的反序列化器。

代码语言:javascript
复制
public class StreamFunctions {

    @Bean
    public Function<KStream<String, MyAvroInputClass>, KStream<String, MyAvroOutputClass>> myConsumer() {
        return input -> {
            log.error("Received event");
            // ignore the message
            log.error("Ignoring event");
            return input.map((k, v) -> new KeyValue<>(null, null));
        };
    }

}

Application.yml

代码语言:javascript
复制
---
spring:
  application:
    name: functional-streams
  main:
    banner-mode: off
  cloud:
    function:
      definition: myConsumer
    stream:
      default-binder: kafka
      bindings:
        myConsumer-in-0:
          destination: input-topic
          content-type: application/*+avro
          consumer:
            use-native-decoding: true
            auto-startup: true
        myConsumer-out-0:
          content-type: application/*+avro
          destination: output-topic
          producer:
            use-native-encoding: true
      kafka:
        streams:
          binder:
            applicationId: ${info.app.name}
            configuration:
              commit.interval.ms: 100
              schema.registry.url: http://localhost:9091
        binder:
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:9092
          consumer-properties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            schema.registry.url: http://localhost:9092
            specific.avro.reader: true
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class:  io.confluent.kafka.serializers.KafkaAvroDeserializer
          configuration:
            value:
              deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring:
              deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

分级文件

代码语言:javascript
复制
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-aop")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation("org.springframework.boot:spring-boot-starter-log4j2")

    implementation("org.apache.avro:avro:1.11.0")
    implementation("io.confluent:kafka-avro-serializer:7.1.1")

    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.2.4")
    // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka-streams
    implementation ("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.2.4")
    // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-schema
    testImplementation("org.springframework.cloud:spring-cloud-stream-schema:2.2.1.RELEASE")
//    implementation("org.springframework.cloud:spring-cloud-schema-registry-client")// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-function-context
    implementation("org.springframework.cloud:spring-cloud-function-context:3.2.6")

    implementation("com.fasterxml.jackson.core:jackson-core:2.13.3")
    implementation("com.fasterxml.jackson.core:jackson-annotations")
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
    implementation("com.fasterxml.jackson.core:jackson-databind")
    implementation("org.springdoc:springdoc-openapi-ui:1.6.11")

错误结果:

代码语言:javascript
复制
2022-09-06 15:09:39,987 [ERROR] [core-cls-sync-advertising-df1651cc-0ab8-47b5-b7b9-fd307b6a6d8e-StreamThread-1] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: inbound-topic, partition: 0, offset: 3 {}
org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 6, 0, 0, 0, 0, 2, 8, 74, 111, 104, 110, 2, 6, 68, 111, 101, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]] from topic [inbound-topic]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.8.jar:2.8.8]

我想是我的yml文件出了问题,但我不知道我做错了什么

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-09-06 22:48:01

我认为这与一些配置问题有关。请将您的配置与此示例应用程序进行比较:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-avro

看看配置这里

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73627437

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档