前言
在上一篇中講述如何搭建kafka集群,本篇則講述如何簡單的使用 kafka 。不過在使用kafka的時候,還是應該簡單的了解下kafka。
Kafka的介紹
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。
Kafka 有如下特性:
kafka的術語
kafka核心Api
kafka有四個核心API
示例圖如下:
kafka 應用場景
以上介紹參考kafka官方文檔。
開發準備
如果我們要開發一個kafka的程序,應該做些什么呢?
首先,在搭建好kafka環境之后,我們要考慮的是我們是生產者還是消費者,也就是消息的發送者還是接受者。
不過在本篇中,生產者和消費者都會進行開發和講解。
在大致的了解kafka之后,我們來開發第一個程序。
這里用的開發語言是Java,構建工具Maven。
Maven的依賴如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency>
Kafka Producer
在開發生產的時候,先簡單的介紹下kafka各種配置說明:
...
還有更多配置,可以去查看官方文檔,這里就不在說明了。
那么我們kafka 的producer配置如下:
Properties props = new Properties(); props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
kafka的配置添加之后,我們便開始生產數據,生產數據代碼只需如下就行:
producer.send(new ProducerRecord<String, String>(topic,key,value));
在寫好生產者程序之后,那我們先來生產吧!
我這里發送的消息為:
String messageStr="你好,這是第"+messageNo+"條數據";
并且只發送1000條就退出,結果如下:
可以看到信息成功的打印了。
如果不想用程序進行驗證程序是否發送成功,以及消息發送的準確性,可以在kafka服務器上使用命令查看。
Kafka Consumer
kafka消費這塊應該來說是重點,畢竟大部分的時候,我們主要使用的是將數據進行消費。
kafka消費的配置如下:
那么我們kafka 的consumer配置如下:
Properties props = new Properties(); props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records", 1000); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
由于我這是設置的自動提交,所以消費代碼如下:
我們需要先訂閱一個topic,也就是指定消費哪一個topic。
consumer.subscribe(Arrays.asList(topic));
訂閱之后,我們再從kafka中拉取數據:
ConsumerRecords<String, String> msgList=consumer.poll(1000);
一般來說進行消費會使用監聽,這里我們就用for(;;)來進行監聽, 并且設置消費1000條就退出!
結果如下:
可以看到我們這里已經成功消費了生產的數據了。
代碼
那么生產者和消費者的代碼如下:
生產者:
import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;/** * * Title: KafkaProducerTest* Description: * kafka 生產者demo* Version:1.0.0 * @author pancm* @date 2018年1月26日 */public class KafkaProducerTest implements Runnable { private final KafkaProducer<String, String> producer; private final String topic; public KafkaProducerTest(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); this.producer = new KafkaProducer<String, String>(props); this.topic = topicName; } @Override public void run() { int messageNo = 1; try { for(;;) { String messageStr="你好,這是第"+messageNo+"條數據"; producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr)); //生產了100條就打印 if(messageNo%100==0){ System.out.println("發送的信息:" + messageStr); } //生產1000條就退出 if(messageNo%1000==0){ System.out.println("成功發送了"+messageNo+"條"); break; } messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST"); Thread thread = new Thread(test); thread.start(); }}
消費者:
import java.util.Arrays;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;/** * * Title: KafkaConsumerTest* Description: * kafka消費者 demo* Version:1.0.0 * @author pancm* @date 2018年1月26日 */public class KafkaConsumerTest implements Runnable { private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUPID = "groupA"; public KafkaConsumerTest(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------開始消費---------"); try { for (;;) { msgList = consumer.poll(1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { //消費100條就打印 ,但打印的數據不一定是這個規律的 if(messageNo%100==0){ System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); } //當消費了1000條就退出 if(messageNo%1000==0){ break; } messageNo++; } }else{ Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST"); Thread thread1 = new Thread(test1); thread1.start(); }}
注: master、slave1、slave2 是因為我在自己的環境做了關系映射,這個可以換成服務器的IP。
當然項目我放在Github上了,有興趣的可以看看。 https://github.com/xuwujing/kafka
總結
簡單的開發一個kafka的程序需要以下步驟:
kafka介紹參考官方文檔:http://kafka.apache.org/intro
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VeVb武林網的支持。
新聞熱點
疑難解答
圖片精選