單節點:
1、解壓kafka壓縮包到安裝目錄(自己指定);
2、進入kafka目錄并執行命令
> bin/zookeeper-server-start.sh config/zookeeper.PRoperties
#如果報錯,修改kafka-run-class.sh,將 -XX:+UseCompressedOops 刪除;
3、執行命令> bin/kafka-start.sh config/server.properties 1 >/dev/null 2>&1 &
4、創建topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --zookeeper localhost:2181
5、發送消息
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
6、接收消息
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
集群搭建:
hdp0 hdp1 hdp2 hdp3
zookeeper配置在0 1 2上,并已啟動。
1、在hdp0上修改配置文件
> vi config/server.properties:
broker.id=1
zookeeper.connect=hdp0:2181,hdp1:2181,hdp2:2181
(注意:log.dir=/tmp/kafka-logs #生產環境中不用tmp目錄)
2、將kafka目錄傳到1 2 3上
3、修改各自的broker.id
4、在4臺主機,kafka目錄下分別執行命令> bin/kafka-start.sh config/server.properties &
5、創建topic
> bin/kafka-topics.sh --create --zookeeper hdp0:2181 --replication-factor 3 --partitions 1 --topic mytopic
> bin/kafka-topics.sh --create --zookeeper hdp0:2181 --replication-factor 3 --partitions 1 --topic mytopic1
> bin/kafka-topics.sh --list --zookeeper hdp0:2181
6、發送消息
hdp1主機:
> bin/kafka-console-producer.sh --broker-list hdp0:9092 --topic mytopic
7、接受消息
hdp0主機:
> bin/kafka-console-consumer.sh --zookeeper hdp1:2181 --from-beginning --topic mytopic
8、查看topic的狀態信息
> bin/kafka-topics.sh --describe --zookeeper hdp0:2181 --topic mytopic
新聞熱點
疑難解答