Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/51688924/
Spring Kafka The class is not in the trusted packages
Asked by alexanoid
In my Spring Boot/Kafka application, before a library update I used the class org.telegram.telegrambots.api.objects.Update to post messages to the Kafka topic. Now I use org.telegram.telegrambots.meta.api.objects.Update. As you can see, they are in different packages.
After application restart I ran into the following issue:
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access00(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
This is my config:
@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }
}
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }
}
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);
        return factory;
    }
}
application.properties
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
How can I solve this issue and have Kafka deserialize the old messages into the new class?
UPDATED
This is my listener:
@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
        //do some logic here
        ack.acknowledge();
    }
}
Accepted answer by Gary Russell
See the documentation.
Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties.
JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys if no header information is present.
JsonDeserializer.VALUE_DEFAULT_TYPE; fallback type for deserialization of values if no header information is present.
JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.
By default, the serializer will add type information to the headers.
See the Boot documentation.
Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
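On the consumer side, the same property style can configure the deserializer. A sketch for this question's classes, assuming the spring.json.* keys listed above are applied through Spring Boot's spring.kafka.consumer.properties:

```properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=org.telegram.telegrambots.meta.api.objects
spring.kafka.consumer.properties.spring.json.value.default.type=org.telegram.telegrambots.meta.api.objects.Update
```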
Or you can add type mapping to the inbound message converter to map the source type to the destination type.
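As a sketch of that type-mapping approach (assuming spring-kafka 2.1+, where DefaultJackson2JavaTypeMapper exposes setIdClassMapping), the old class id recorded in the headers of existing records can be mapped onto the new Update class:

```java
// Sketch: map the old type id (still present in old records' headers)
// to the new Update class, then plug the mapper into the deserializer.
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.addTrustedPackages("org.telegram.telegrambots.meta.api.objects");

Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("org.telegram.telegrambots.api.objects.Update",
        org.telegram.telegrambots.meta.api.objects.Update.class);
typeMapper.setIdClassMapping(mappings);

JsonDeserializer<Update> deserializer = new JsonDeserializer<>(Update.class);
deserializer.setTypeMapper(typeMapper);
```

The deserializer configured this way can then be passed to the DefaultKafkaConsumerFactory, as in the question's updateConsumerFactory bean.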
EDIT
Having said that, what version are you using?
Answered by Iroshan
There are two key points that should be mentioned.
- The Producer and Consumer are two separate projects.
- The message (value) being sent is an object type rather than a primitive type.
The problem is that the produced message class is not available on the consumer side, because these are two separate projects.
To overcome this issue, follow the steps below in the Spring Boot Producer and Consumer applications.
----Producer App -------------
** Producer Configuration Class **
import com.kafka.producer.models.Container;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Container> producerFactory(){
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Container> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}
Note: Container is the custom object to be posted to a Kafka topic.
** Producer Class **
import com.kafka.producer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "final-topic";

    @Autowired
    private KafkaTemplate<String, Container> kafkaTemplate;

    public void sendUserMessage(Container msg) {
        LOGGER.info("\n ===== Producing message in JSON ===== \n" + msg);
        Message<Container> message = MessageBuilder
                .withPayload(msg)
                .setHeader(KafkaHeaders.TOPIC, TOPIC)
                .build();
        this.kafkaTemplate.send(message);
    }
}
** Producer Controller **
import com.kafka.producer.models.Container;
import com.kafka.producer.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/message")
public class MessageController {

    @Autowired
    private Producer producer;

    @PostMapping(value = "/publish")
    public String sendMessageToKafkaTopic(@RequestBody Container containerMsg) {
        this.producer.sendUserMessage(containerMsg);
        return "Successfully Published !!";
    }
}
Note: Messages of type Container will be published to the Kafka topic final-topic as JSON.
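To exercise the endpoint above, a request like the following would publish a message. The port and the JSON field are assumptions for illustration; adjust them to your server configuration and your actual Container model:

```shell
curl -X POST http://localhost:8080/message/publish \
  -H "Content-Type: application/json" \
  -d '{"messageTypes": "test"}'
```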
===============================================================================
-- Consumer App --
** Configuration Class **
import com.kafka.consumer.models.Container;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerOneConfig {

    @Bean
    public ConsumerFactory<String, Container> consumerFactory(){
        JsonDeserializer<Container> deserializer = new JsonDeserializer<>(Container.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Container> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, Container> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Note: Here, instead of using the default JsonDeserializer(), we use a customized JsonDeserializer to consume Container-typed JSON messages from final-topic (the topic name).
** Consumer Service **
import com.kafka.consumer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class ConsumerOne {

    private final Logger LOGGER = LoggerFactory.getLogger(ConsumerOne.class);

    @KafkaListener(topics = "final-topic", groupId = "group_one", containerFactory = "kafkaListenerContainerFactory")
    public void consumeUserMessage(@Payload Container msg, @Headers MessageHeaders headers) throws IOException {
        System.out.println("received data in Consumer One =" + msg.getMessageTypes());
    }
}
Answered by jumping_monkey
jsonDeserializer.addTrustedPackages("*");
solved my issue for spring-kafka-2.2.8.
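For context, a minimal sketch of where that call fits when building the consumer factory. The names mirror the question's setup; consumerConfigs() stands in for your own consumer configuration:

```java
// Trust all packages on the value deserializer; prefer an explicit
// package list in production, since "*" disables the safety check.
JsonDeserializer<Update> jsonDeserializer = new JsonDeserializer<>(Update.class);
jsonDeserializer.addTrustedPackages("*");

ConsumerFactory<String, Update> consumerFactory = new DefaultKafkaConsumerFactory<>(
        consumerConfigs(), new StringDeserializer(), jsonDeserializer);
```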
Answered by Sylhare
For this one there are two ways of doing it, either in your deserializer or in your application.yml.
Either in your deserializer
In the deserializer that you use within your DefaultKafkaConsumerFactory (to create your consumer factory). Let's say you want to make a ConsumerFactory<String, Foo>, with Foo being the model/POJO you want to use for your Kafka messages.
You need to call addTrustedPackages on the JsonDeserializer. I have an example in Kotlin, but the syntax is the same in Java:
val deserializer = JsonDeserializer<Foo>()
deserializer.addTrustedPackages("com.example.entity.Foo") // Adding Foo to our trusted packages

val consumerFactory = DefaultKafkaConsumerFactory(
    consumerConfigs(), // your consumer config
    StringDeserializer(),
    deserializer // Using our newly created deserializer
)
Or in your application.yml
In your application.yml file, following the spring-kafka instructions, we add the Foo class from the com.example.entity package to the trusted packages:
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.example.entity.Foo"
spring.json.trusted.packages accepts an array of packages. You can specify a class package, or use * to trust any package. In that case you don't need to pass your deserializer to DefaultKafkaConsumerFactory(); it is configured entirely through the consumer config.
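For example, the wildcard variant mentioned above would look like this; use it with care, since trusting every package disables the deserialization safety check entirely:

```yaml
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "*"
```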