這篇文章主要介紹了spring boot整合kafka過程解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
一、啟動kafka
啟動kafka之前一定要啟動zookeeper,因為要使用kafka必須要使用zookeeper。
windows環境下啟動,直接使用kafka自帶的zookeeper:
E:/kafka_2.12-2.4.0/bin/windows zookeeper-server-start.bat ../../config/zookeeper.properties
接下來啟動kafka
E:/kafka_2.12-2.4.0/bin/windows kafka-server-start.bat ../../config/server.properties
二、spring boot整合kafka項目實例
1.導入的maven
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置文件:
server.port=80#kafka地址,可以有多個spring.kafka.bootstrap-servers=127.0.0.1:9092#------生產者配置文件---------#指定kafka消息體和key的編碼格式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#設置等待acks返回的機制,有三個值# 0:不等待返回的acks(可能會丟數據,因為發送消息沒有了失敗重試機制,但是這是最低延遲)# 1:消息發送給kafka分區中的leader后就返回(如果follower沒有同步完成leader就宕機了,就會丟數據)# -1(默認):等待所有follower同步完消息后再發送(絕對不會丟數據)spring.kafka.producer.acks=-1# 消息累計到batch-size的值后,才會發送消息,默認為16384spring.kafka.producer.batch-size=16384#如果kafka遲遲不發送消息(這里指的是消息沒堆積到指定數量),那么過了這個時間(單位:毫米)開始發送spring.kafka.producer.properties.linger.ms=1#設置緩沖區大小,默認是33554432#這個緩沖區是kafka中兩個線程里的共享變量#這個兩個線程是main和sender,main負責把消息發送到共享變量,sender從共享變量拉數據spring.kafka.producer.buffer-memory=33554432#失敗重試發送的次數spring.kafka.producer.retries=2#------消費者配置文件---------#指定kafka消息體和key的編碼格式spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#指定消費者組的group_idspring.kafka.consumer.group-id=kafka_test#kafka意外宕機時,再次開啟消息消費的策略,共有三種策略#earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費#latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據、#none:當所有分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常spring.kafka.consumer.auto-offset-reset=earliest#自動提交offsetspring.kafka.consumer.enable-auto-commit=true#提交offset時間間隔spring.kafka.consumer.auto-commit-interval=100#消費監聽接口監聽的主題不存在時,默認會報錯因此要關掉這個spring.kafka.listener.missing-topics-fatal=false
2.創建topic
import org.apache.kafka.clients.admin.NewTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * 使用代碼創建的topic * 三個參數意思:topic的名稱;分區數量,新主題的復制因子;如果指定了副本分配,則為-1。 */@Configurationpublic class KafkaTopic { @Bean public NewTopic batchTopic() { return new NewTopic("testTopic", 8, (short) 1); }}
新聞熱點
疑難解答