Contents:
Install Java
Install ZooKeeper
  1. Deployment and configuration
  2. Configuration notes
    2.1 The myid file and server.myid
    2.2 zoo.cfg
    2.3 log4j.properties
    2.4 zkEnv.sh and zkServer.sh
  3. Parameter notes
  4. Start and test
Install Kafka
  1. Deployment and configuration
  2. Start the brokers
  3. Kafka test
  4. Log notes
Managing with supervisor
  1. Managing zookeeper
  2. Managing kafka
Development
Install Java

mkdir /usr/local/java
cp jdk-8u20-linux-x64.tar.gz /usr/local/java
cd /usr/local/java
tar zxvf jdk-8u20-linux-x64.tar.gz
vim /etc/profile
JAVA_HOME=/usr/local/java/jdk1.8.0_20
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$JAVA_HOME/lib/dt.jar
PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
export JAVA_HOME JRE_HOME PATH CLASS_PATH

source /etc/profile
java -version
Install ZooKeeper

mkdir /usr/local/zookeeper-cluster
cp zookeeper-3.5.2-alpha.tar.gz /usr/local/zookeeper-cluster/
cd /usr/local/zookeeper-cluster
tar zxvf zookeeper-3.5.2-alpha.tar.gz
cd /usr/local/zookeeper-cluster/zookeeper-3.5.2-alpha
mv conf/zoo_sample.cfg conf/zoo.cfg
mkdir data
mkdir datalog
vim conf/zoo.cfg
clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node1/data
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node1/datalog
syncLimit=5
initLimit=10
tickTime=2000
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889

Rename the directory to node1 and copy it twice more:

cd /usr/local/zookeeper-cluster
mv zookeeper-3.5.2-alpha/ zookeeper-3.5.2-node1
cp -R zookeeper-3.5.2-node1 zookeeper-3.5.2-node2
cp -R zookeeper-3.5.2-node1 zookeeper-3.5.2-node3

node2's conf/zoo.cfg:
clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node2/data
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node2/datalog
syncLimit=5
initLimit=10
tickTime=2000
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889

node3's conf/zoo.cfg:
clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node3/data
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-3.5.2-node3/datalog
syncLimit=5
initLimit=10
tickTime=2000
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889

Write the myid files:
#node1
echo "1" > zookeeper-3.5.2-node1/data/myid
#node2
echo "2" > zookeeper-3.5.2-node2/data/myid
#node3
echo "3" > zookeeper-3.5.2-node3/data/myid

myid is the file stored in the snapshot (data) directory that identifies this particular server; it is the key marker the ZooKeeper ensemble uses to tell its members apart, and its value must match the N in the corresponding server.N line of zoo.cfg.
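If you want to double-check that every node's myid lines up with a server.N entry, a short Python sketch like the one below can do it. This is only an illustrative helper (the paths assume this tutorial's layout), not part of the original steps:

# check_myid.py - verify that each node's data/myid matches a server.<id> line in its zoo.cfg
import os

BASE = "/usr/local/zookeeper-cluster"
NODES = ["zookeeper-3.5.2-node1", "zookeeper-3.5.2-node2", "zookeeper-3.5.2-node3"]

for node in NODES:
    with open(os.path.join(BASE, node, "data", "myid")) as f:
        myid = f.read().strip()
    with open(os.path.join(BASE, node, "conf", "zoo.cfg")) as f:
        cfg = f.read()
    # The ensemble only forms correctly if server.<myid> is declared in zoo.cfg.
    print(node, "myid =", myid, "declared:", ("server.%s=" % myid) in cfg)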
zoo.cfg is the ZooKeeper configuration file, located in the conf directory.
log4j.properties, also in the conf directory, configures ZooKeeper's log output; like most Java programs, ZooKeeper uses log4j to manage its logging.
zkServer.sh is the main management script; zkEnv.sh is the main configuration script, which sets up the environment variables when the ZooKeeper ensemble starts.
Start
zookeeper-3.5.2-node1/bin/zkServer.sh start
zookeeper-3.5.2-node2/bin/zkServer.sh start
zookeeper-3.5.2-node3/bin/zkServer.sh start

Test
[root@paasagento zookeeper-cluster]# zookeeper-3.5.2-node1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-3.5.2-node1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

Connect

zookeeper-3.5.2-node1/bin/zkCli.sh -server 127.0.0.1:2181
ls /
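For a programmatic check, the ensemble can also be queried from Python with the kazoo client (a minimal sketch; it assumes the kazoo package is installed, which is outside this tutorial):

# zk_check.py - connect to the ensemble and list the root znode, mirroring "ls /" in zkCli.sh
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
zk.start()                      # blocks until a session is established
print(zk.get_children("/"))     # equivalent to "ls /" in zkCli.sh
zk.stop()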
Install Kafka

mkdir /usr/local/kafka
cp kafka_2.11-0.10.1.1.tgz /usr/local/kafka/
cd /usr/local/kafka
tar zxvf kafka_2.11-0.10.1.1.tgz
cd kafka_2.11-0.10.1.1
vim config/server.properties
broker.id=1    # Unique ID of this broker in the cluster, the same idea as ZooKeeper's myid; must be a positive integer. If the server's IP changes but broker.id stays the same, consumers are not affected.
port=9092    # Port this Kafka broker listens on; the default is 9092.
host.name=192.168.1.172    # Broker host address. If set, the broker binds to this address; if unset, it binds to all interfaces and publishes one of them to ZooKeeper. Usually left unset.
num.network.threads=3    # Maximum number of threads the broker uses to handle network requests, normally the number of CPU cores.
num.io.threads=8    # Number of threads the broker uses for disk I/O, normally twice the number of CPU cores.
socket.send.buffer.bytes=102400    # Send buffer size; data is buffered and sent once the buffer reaches a certain size, which improves performance.
socket.receive.buffer.bytes=102400    # Receive buffer size; data is written out to disk once a certain amount has accumulated.
socket.request.max.bytes=104857600    # Maximum size of a request sent to or fetched from Kafka; must not exceed the JVM heap size.
log.dirs=/tmp/kafka-logs_1    # Where Kafka stores its data. Separate multiple paths with commas; spreading the directories across disks improves read/write performance, e.g. /data/kafka-logs-1,/data/kafka-logs-2
num.partitions=3    # Default number of partitions per topic; overridden by the value given when the topic is created.
num.recovery.threads.per.data.dir=1    # Number of threads per data directory used for log recovery at startup; the default is 1.
log.retention.hours=168    # Default maximum retention time for messages: 168 hours (7 days).
log.segment.bytes=1073741824    # A topic partition is stored as a set of segment files; this controls the size of each segment and is overridden by the value given at topic creation.
log.retention.check.interval.ms=300000    # Check every 300000 ms whether segments have exceeded the retention time (log.retention.hours=168) and delete any that have expired.
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000    # ZooKeeper connection timeout.

mv config/server.properties config/server1.properties
cp -R config/server1.properties config/server2.properties
cp -R config/server1.properties config/server3.properties
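Since server2.properties and server3.properties differ from server1.properties only in broker.id, port and log.dirs (shown below), they could also be patched with a short script instead of edited by hand. This is only an illustrative sketch run from the kafka_2.11-0.10.1.1 directory, not part of the original steps:

# make_broker_configs.py - derive the per-broker property files from server1.properties
overrides = {
    "server2.properties": {"broker.id": "2", "port": "9093", "log.dirs": "/tmp/kafka-logs_2"},
    "server3.properties": {"broker.id": "3", "port": "9094", "log.dirs": "/tmp/kafka-logs_3"},
}

with open("config/server1.properties") as f:
    base = f.readlines()

for fname, patch in overrides.items():
    out = []
    for line in base:
        key = line.split("=", 1)[0].strip()
        # rewrite only the three per-broker keys, keep every other line as-is
        out.append("%s=%s\n" % (key, patch[key]) if key in patch else line)
    with open("config/" + fname, "w") as f:
        f.writelines(out)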
server2.properties (broker 2):

broker.id=2
port=9093
host.name=192.168.1.172
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs_2
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
server3.properties (broker 3):

broker.id=3
port=9094
host.name=192.168.1.172
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs_3
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000

Start the three brokers:

bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &
[root@paasagento kafka_1]# jobs
[1]   Running    bin/kafka-server-start.sh config/server1.properties &
[2]-  Running    bin/kafka-server-start.sh config/server2.properties &
[3]+  Running    bin/kafka-server-start.sh config/server3.properties &

Check that each broker has registered itself in ZooKeeper:

bin/zkCli.sh -server 192.168.1.172:2182
[zk: 192.168.1.172:2182(CONNECTED) 8] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, zookeeper]
[zk: 192.168.1.172:2182(CONNECTED) 5] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1484654956028","endpoints":["PLAINTEXT://192.168.1.172:9092"],"host":"192.168.1.172","version":3,"port":9092}
[zk: 192.168.1.172:2182(CONNECTED) 6] get /brokers/ids/2
{"jmx_port":-1,"timestamp":"1484655055260","endpoints":["PLAINTEXT://192.168.1.172:9093"],"host":"192.168.1.172","version":3,"port":9093}
[zk: 192.168.1.172:2182(CONNECTED) 7] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1484655071043","endpoints":["PLAINTEXT://192.168.1.172:9094"],"host":"192.168.1.172","version":3,"port":9094}

Create a topic with 3 partitions and 3 replicas:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test_topic
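For reference, the topic could also be created programmatically with kafka-python's admin client (a hedged sketch; it assumes kafka-python 1.4.4 or newer, which provides KafkaAdminClient):

# create_topic.py - programmatic equivalent of the kafka-topics.sh --create command above
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="192.168.1.172:9092,192.168.1.172:9093,192.168.1.172:9094")
topic = NewTopic(name="test_topic", num_partitions=3, replication_factor=3)
admin.create_topics([topic])    # raises TopicAlreadyExistsError if the topic already exists
admin.close()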
[root@paasagento kafka_1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test_topic
[root@paasagento kafka_1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
Topic:test_topic    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test_topic    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: test_topic    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: test_topic    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2

Produce messages:

bin/kafka-console-producer.sh --broker-list 192.168.1.172:9092,192.168.1.172:9093,192.168.1.172:9094 --topic test_topic

Consume messages:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic
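The console producer and consumer have straightforward Python equivalents in kafka-python (a minimal sketch; assumes the kafka-python package is installed):

# produce_consume.py - send one test message to test_topic and read messages back
from kafka import KafkaProducer, KafkaConsumer

brokers = "192.168.1.172:9092,192.168.1.172:9093,192.168.1.172:9094"

producer = KafkaProducer(bootstrap_servers=brokers)
producer.send("test_topic", b"hello from python")
producer.flush()                 # make sure the message is really sent before moving on

consumer = KafkaConsumer("test_topic",
                         bootstrap_servers=brokers,
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)   # stop iterating after 5s without messages
for msg in consumer:
    print(msg.partition, msg.offset, msg.value)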
Check the topic in ZooKeeper:

bin/zkCli.sh -server 192.168.1.172:2182
[zk: 192.168.1.172:2182(CONNECTED) 2] get /brokers/topics/test_topic
{"version":1,"partitions":{"2":[3,1,2],"1":[2,3,1],"0":[1,2,3]}}

Log files:

server.log is Kafka's main runtime log. state-change.log records state transitions: Kafka keeps its state in ZooKeeper, so leadership may switch between brokers, and those switches are logged here. controller.log belongs to the controller: Kafka elects one broker as the "controller", which, when a broker goes down, picks a new leader from the remaining replicas of each affected partition, letting Kafka manage leader/follower relationships for all partitions efficiently in bulk. If the controller itself goes down, one of the surviving brokers takes over as the new controller.
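The same metadata can be read programmatically with kazoo; /brokers/topics/<topic> and /controller are znodes Kafka itself maintains (a sketch, same assumptions as the earlier kazoo example):

# kafka_metadata.py - read the topic's replica assignment and the current controller from ZooKeeper
import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="192.168.1.172:2181,192.168.1.172:2182,192.168.1.172:2183")
zk.start()

data, _ = zk.get("/brokers/topics/test_topic")
print("partition assignment:", json.loads(data.decode())["partitions"])

data, _ = zk.get("/controller")
print("current controller broker:", json.loads(data.decode())["brokerid"])

zk.stop()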
Managing zookeeper with supervisor

vim bin/zkEnv.sh and add the environment path at the top:
JAVA_HOME=/usr/local/java/jdk1.8.0_20
export JAVA_HOME

vim /etc/supervisor/zookeeper.conf
[program:zookeeper]
command=/usr/local/zookeeper-3.5.2-alpha/bin/zkServer.sh start-foreground
autostart=true
autorestart=true
startsecs=10
stdout_logfile=/var/log/zookeeper.log
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_capture_maxbytes=1MB
stderr_logfile=/var/log/zookeeper.log
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_capture_maxbytes=1MB

supervisorctl reload
bin/zkServer.sh status
Managing kafka with supervisor

vim bin/kafka-run-class.sh and add the environment path at the top:
JAVA_HOME=/usr/local/java/jdk1.8.0_20
export JAVA_HOME

vim /etc/supervisor/kafka.conf
[program:kafka]
command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
user=root
autostart=true
autorestart=true
startsecs=10
stdout_logfile=/var/log/kafka.log
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_capture_maxbytes=1MB
stderr_logfile=/var/log/kafka.log
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_capture_maxbytes=1MB

supervisorctl reload
Development

Stay tuned for Chapter 2 on Python development and Chapter 3 on C# development.