Spring for Apache Kafka
一、Requirements
Apache Kafka Clients 3.3.x
Spring Framework 6.0.x
Minimum Java version: 17
二、Getting-started demo
.Maven dependencies
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
.Spring Boot configuration file
application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.0.161:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
.Topic configuration
TopicConfig.java
@Configuration
public class TopicConfig {

    @Bean
    public KafkaAdmin.NewTopics topicsCreate() {
        return new KafkaAdmin.NewTopics(
                // TopicBuilder.name("defaultBoth")
                //         .build(),
                // TopicBuilder.name("defaultPart")
                //         .replicas(1)
                //         .build(),
                TopicBuilder.name("shafish-dev")
                        .partitions(10)
                        .replicas(2)
                        .build());
    }
}
.Consumer side
ConsumerListener.java
@Component
public class ConsumerListener {

    @KafkaListener(topics = { KafkaProperties.devTopic }, groupId = KafkaProperties.consumerGroupId)
    void listener(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println(message.get());
        }
    }
}
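The listener references a KafkaProperties constants class that the demo never shows (not to be confused with Spring Boot's auto-configuration class of the same name). A minimal sketch of what it presumably contains; the topic names come from elsewhere in this demo, while the group id is an illustrative assumption. Note that @KafkaListener attributes must be compile-time constants, hence the final fields:

KafkaProperties.java
// Assumed constants class used throughout this demo.
public final class KafkaProperties {

    public static final String devTopic = "shafish-dev";           // topic created in TopicConfig
    public static final String devByteTopic = "shafish-byte-dev";  // topic used in the RoutingKafkaTemplate example
    public static final String consumerGroupId = "shafish-group";  // illustrative group id

    private KafkaProperties() {
    }
}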
.Producer side
MessageQuery.java
public record MessageQuery(String message) {
}

MessageController.java
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageQuery query) {
        kafkaTemplate.send(KafkaProperties.devTopic, query.message());
    }
}
Tip
When the broker allows automatic topic creation, the topic name, partition count, and replica count can be configured in a NewTopic;
The console consumer shipped with Kafka can be used to watch message consumption: ./kafka-console-consumer.sh --bootstrap-server 192.168.0.161:9092 --topic shafish-dev --from-beginning
三、Topic configuration
Starting with version 2.7, when the broker allows automatic topic creation, multiple topics can be declared in a single KafkaAdmin.NewTopics bean:
MyTopicConfig.java
@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth") // topic name
                    .build(),
            TopicBuilder.name("defaultPart")
                    .replicas(1) // replica count
                    .build(),
            TopicBuilder.name("defaultRepl")
                    .partitions(3) // partition count
                    // CreateTime or LogAppendTime: controls the timestamp type of this topic's messages
                    .config(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime")
                    .build());
}
四、Sending messages
Abstract
Spring provides the KafkaTemplate wrapper class, which makes it convenient to send messages to a topic.
It also provides a KafkaTemplate subclass, RoutingKafkaTemplate, which selects a different Producer depending on the destination topic.
1.KafkaTemplate
KafkaTemplate provides the following message-sending methods:
KafkaTemplate.java
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}
Quote
Calling the sendDefault methods requires a default topic to be configured in advance (spring:kafka:template:default-topic: shafish-dev);
If the topic was configured with TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG set to CreateTime or LogAppendTime, the corresponding timestamp is recorded on the message;
Since 3.0, sending returns an asynchronous CompletableFuture; the callbacks registered on it run when the send succeeds or fails:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});
To handle send results in one place, implement the ProducerListener interface and put the callback logic in its onSuccess and onError methods (Spring ships a default implementation, LoggingProducerListener);
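A minimal sketch of such a listener and how it is attached to the template; the class name and log messages are illustrative, not part of the original demo:

// Handles every send result of the template in one place.
public class AuditProducerListener implements ProducerListener<String, String> {

    @Override
    public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) {
        System.out.println("sent to " + metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
    }

    @Override
    public void onError(ProducerRecord<String, String> record, RecordMetadata metadata, Exception exception) {
        System.err.println("send failed: " + exception.getMessage());
    }
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
    template.setProducerListener(new AuditProducerListener()); // replaces the default LoggingProducerListener
    return template;
}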
.Basic KafkaTemplate configuration
Different ProducerFactory instances can be passed to the template to configure it for different use cases:
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
// Template for String-valued messages
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

// Template for byte[]-valued messages, overriding the value serializer
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
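The second constructor argument of bytesTemplate is a map of producer config overrides (supported since Spring Kafka 2.5), so both templates can be built from the same factory configuration while using different value serializers.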
.Sending messages asynchronously
public record MessageQuery(String message) {
}

@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageQuery query) {
        kafkaTemplate.send(KafkaProperties.devTopic, query.message());
    }

    @PostMapping("/publishAsync")
    public void publishAsync(@RequestBody MessageQuery query) {
        CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(KafkaProperties.devTopic, query.message());
        send.whenComplete((result, ex) -> {
            if (ex == null) {
                // handleSuccess(data);
                System.out.println("success");
            }
            else {
                // handleFailure(data, record, ex);
                ex.printStackTrace();
            }
        });
    }
}
.Sending messages synchronously (blocking)
public record MessageQuery(String message) {
}

@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody MessageQuery query) {
        kafkaTemplate.send(KafkaProperties.devTopic, query.message());
    }

    @PostMapping("/publishSync")
    public void publishSync(@RequestBody MessageQuery query) {
        try {
            kafkaTemplate.send(KafkaProperties.devTopic, query.message()).get(10, TimeUnit.SECONDS);
            // handleSuccess(data);
            System.out.println("success");
        }
        catch (ExecutionException e) {
            // handleFailure(data, record, e.getCause());
            e.printStackTrace();
        }
        catch (TimeoutException | InterruptedException e) {
            // handleFailure(data, record, e);
            e.printStackTrace();
        }
    }
}
2.RoutingKafkaTemplate
RoutingKafkaTemplate does not support the transactions, execute, flush, or metrics operations; a deeper look at these is left for later.
.Modifying the ProducerFactory
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public ProducerFactory<Object, Object> pf() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * pf uses producerConfig(), which serializes both key and value as String;
     * to carry other payload types, a different value serializer must be configured -- here ByteArraySerializer.
     */
    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {
        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("shafish-byte-dev"), bytesPF); // KafkaProperties.devByteTopic
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }
}
.Message producer
MessageController.java
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    private final RoutingKafkaTemplate routingKafkaTemplate;

    public MessageController(RoutingKafkaTemplate routingKafkaTemplate) {
        this.routingKafkaTemplate = routingKafkaTemplate;
    }

    @PostMapping("/publishByte")
    public void publishByte(@RequestBody MessageQuery query) {
        // For this topic the routing template picks the byte[] producer, so the second argument can be a byte array
        routingKafkaTemplate.send(KafkaProperties.devByteTopic, query.message().getBytes());
        // handleSuccess(data);
        System.out.println("success");
    }
}
.Message consumer
ConsumerListener.java
@Component
public class ConsumerListener {

    // The consumer is configured with StringDeserializer, so the byte[] payload is read back as a String
    @KafkaListener(topics = { "shafish-byte-dev" }, groupId = KafkaProperties.consumerGroupId)
    void byteListener(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println(message.get());
        }
    }
}
3.DefaultKafkaProducerFactory
The ProducerFactory creates the Kafka producer instances; see the KafkaProducerConfig examples above. When transactions are not used, the factory creates a single shared producer by default, so if one client calls flush(), every other client using that producer blocks as well.
Setting the factory's producerPerThread property to true gives each thread its own producer instance, avoiding the flush() blocking problem.
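A minimal sketch of enabling it (per the DefaultKafkaProducerFactory javadoc, when producerPerThread is true, closeThreadBoundProducer() should be called once a thread's producer is no longer needed):

@Bean
public ProducerFactory<String, String> perThreadProducerFactory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfig());
    factory.setProducerPerThread(true); // each thread gets its own producer; flush() no longer blocks other threads
    return factory;
}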
4.ReplyingKafkaTemplate
.Creating the request and reply topics
./kafka-topics.sh --create --bootstrap-server 192.168.0.162:9092 --replication-factor 2 --partitions 10 --topic shafish-dev-requests
./kafka-topics.sh --create --bootstrap-server 192.168.0.162:9092 --replication-factor 2 --partitions 10 --topic shafish-dev-replies
.Producer configuration
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfig());
        // factory.setProducerPerThread(true);
        return factory;
    }

    /**
     * Listener container for the replies; a ConcurrentMessageListenerContainer supports multiple consumer instances
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("shafish-dev-replies"); // KafkaProperties.DEVREPLIES
        container.getContainerProperties().setGroupId("repliesGroup");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
    }
}
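ReplyingKafkaTemplate sets the KafkaHeaders.CORRELATION_ID and KafkaHeaders.REPLY_TOPIC headers on outgoing requests automatically (which is why the header-setting code in the controller below can stay commented out), and the default reply timeout can be adjusted with setDefaultReplyTimeout(Duration).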
.Message producer
@RestController
@RequestMapping("api/v1/message")
public class MessageController {

    @Autowired
    private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;

    @PostMapping("/publishReply")
    public String publishReply(@RequestBody MessageQuery query) throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, String> record = new ProducerRecord<>("shafish-dev-requests", query.message()); // KafkaProperties.DEVREQUEST
        // record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, query.message().getBytes()));
        RequestReplyFuture<String, String, String> requestReplyFuture = replyingKafkaTemplate.sendAndReceive(record);
        SendResult<String, String> sendResult = requestReplyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        System.out.println("send:" + sendResult.getRecordMetadata());
        ConsumerRecord<String, String> reply = requestReplyFuture.get(10, TimeUnit.SECONDS);
        System.out.println("return value:" + reply.value());
        return reply.value();
    }
}
.Message consumption
@Component
public class ConsumerListener {

    // KafkaProperties.DEVREQUEST
    @KafkaListener(topics = "shafish-dev-requests", groupId = KafkaProperties.consumerGroupId)
    @SendTo // the reply goes to the topic named in the request's REPLY_TOPIC header
    public String requestListener(ConsumerRecord<String, String> record) {
        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            System.out.println(message.get());
            // Message<String> build = MessageBuilder.withPayload(record.value().toUpperCase())
            //         //.setHeader(KafkaHeaders.TOPIC, replyTo)
            //         .setHeader(KafkaHeaders.KEY, 42)
            //         //.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            //         .build();
            return "yes";
        }
        return null;
    }
}
五、Receiving messages
Abstract
Kafka messages can be received either by configuring a MessageListenerContainer with a listener, or by using the @KafkaListener annotation.
1.message listener
Eight kinds of message listeners
To receive messages, a listener must be provided to the container; eight listener interfaces are available (a manual-acknowledgment example follows the listing):
// Receive a single record; offsets are committed automatically
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

// Receive a single record; offsets are committed manually via the Acknowledgment
public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// Receive a single record with automatic commits, plus access to the underlying Consumer
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// Receive a single record with manual commits, plus access to the underlying Consumer
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// Batch variants of the four interfaces above
public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
The Consumer argument passed into these methods is not thread-safe.
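A minimal sketch of the manual-acknowledgment variant (it assumes the container's AckMode is set to MANUAL or MANUAL_IMMEDIATE; the class name is illustrative):

// Commits the offset only after the record has been processed successfully.
public class ManualAckListener implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        System.out.println("processing " + data.value());
        acknowledgment.acknowledge(); // manually commit this record's offset
    }
}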
2.message listener containers
Two implementations
Two listener container implementations are provided: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.
KafkaMessageListenerContainer receives all messages of the subscribed topics on a single thread; ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to consume concurrently. A wiring sketch follows the constructor listing below.
KafkaMessageListenerContainer
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
        ContainerProperties containerProperties)

// Takes explicit partition assignments, pinning the container to specific partitions;
// a negative initial offset in a TopicPartitionOffset is relative to the current end of the partition
public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)
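A minimal sketch wiring a container to a listener (the group id is an illustrative assumption; the topic is the demo's shafish-dev):

@Bean
public KafkaMessageListenerContainer<String, String> demoContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProperties = new ContainerProperties("shafish-dev");
    containerProperties.setGroupId("container-demo-group");
    // MessageListener variant: one record at a time, offsets committed automatically
    containerProperties.setMessageListener((MessageListener<String, String>) record -> System.out.println(record.value()));
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}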
Created:
February 28, 2023 00:30:17
Last updated:
June 21, 2024 22:29:28