我使用的是Publish Kafka处理器的下一个配置:

编辑kafka和zookeeper配置:
zookeeper.properties
authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000zookeeper_jaas.conf
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="zookeeper"
user_admin="admin-secret";
};server.properties
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
auto.create.topics.enable=false
listeners=SASL_PLAINTEXT://172.23.199.20:9092
advertised.listeners=SASL_PLAINTEXT://172.23.199.20:9092
zookeeper.set.acl=true
super.users=User:adminkafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};身份验证工作正常。
启用授权
添加管理员:
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret]' --entity-type users --entity-name admin添加用户:
./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456]' --entity-type users --entity-name pkalita添加权限:
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:admin --producer --topic test./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:pkalita --producer --topic test完成这些操作后,processor PublishKafka可以很好地处理主体管理员,但如果选择用户pkalita,则会抛出异常:
org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed

仅当设置为Use transactions - false时,处理器才能工作
我做错了什么?
更新:我尝试使用spring Kafka producer和用户pkalita发送消息-该消息已成功发布到主题上
发布于 2021-01-14 19:24:34
设置ACL时需要指定事务ID。
从docs
必须授权事务性生产者使用的主体
对已配置的
transactional.id执行描述和写入操作。
bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf \
--add --allow-principal User:Alice \
--producer --topic test-topic --transactional-id test-txn您还可以使用--transactional-id *来允许任何事务ID。
https://stackoverflow.com/questions/65716171
复制相似问题