亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

Kafka源碼系列教程之刪除topic

2024-07-14 08:42:04
字體:
來源:轉載
供稿:網友

前言

Apache Kafka發源于LinkedIn,于2011年成為Apache的孵化項目,隨后于2012年成為Apache的主要項目之一。Kafka使用Scala和Java進行編寫。Apache Kafka是一個快速、可擴展的、高吞吐、可容錯的分布式發布訂閱消息系統。Kafka具有高吞吐量、內置分區、支持數據副本和容錯的特性,適合在大規模消息處理場景中使用。

本文依然是以kafka0.8.2.2為例講解

一,如何刪除一個topic

刪除一個topic有兩個關鍵點:

1,配置刪除參數

delete.topic.enable這個Broker參數配置為True。

2,執行

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

假如不配置刪除參數為true的話,topic其實并沒有被清除,只是被標記為刪除。此時,估計一般人的做法是刪除topic在Zookeeper的信息和日志,其實這個操作并不會清除kafkaBroker內存的topic數據。所以,此時最佳的策略是配置刪除參數為true然后,重啟kafka。

二,重要的類介紹

1,PartitionStateMachine

該類代表分區的狀態機。決定者分區的當前狀態,和狀態轉移。四種狀態

  • NonExistentPartition
  • NewPartition
  • OnlinePartition
  • OfflinePartition

2,ReplicaManager

負責管理當前機器的所有副本,處理讀寫、刪除等具體動作。

讀寫:寫獲取partition對象,再獲取Replica對象,再獲取Log對象,采用其管理的Segment對象將數據寫入、讀出。

3,ReplicaStateMachine

副本的狀態機。決定者副本的當前狀態和狀態之間的轉移。一個副本總共可以處于一下幾種狀態的一種
NewReplica:Crontroller在分區重分配的時候可以創建一個新的副本。只能接受變為follower的請求。前狀態可以是NonExistentReplica

OnlineReplica:新啟動的分區,能接受變為leader或者follower請求。前狀態可以是NewReplica, OnlineReplica or OfflineReplica

OfflineReplica:死亡的副本處于這種狀態。前狀態可以是NewReplica, OnlineReplica

ReplicaDeletionStarted:分本刪除開始的時候處于這種狀態,前狀態是OfflineReplica

ReplicaDeletionSuccessful:副本刪除成功。前狀態是ReplicaDeletionStarted

ReplicaDeletionIneligible:刪除失敗的時候處于這種狀態。前狀態是ReplicaDeletionStarted

NonExistentReplica:副本成功刪除之后處于這種狀態,前狀態是ReplicaDeletionSuccessful

4,TopicDeletionManager

該類管理著topic刪除的狀態機

1),TopicCommand通過創建/admin/delete_topics/<topic>,來發布topic刪除命令。

2),Controller監聽/admin/delete_topic子節點變動,開始分別刪除topic

3),Controller有個后臺線程負責刪除Topic

三,源碼徹底解析topic的刪除過程

此處會分四個部分:

A),客戶端執行刪除命令作用

B),不配置delete.topic.enable整個流水的源碼

C),配置了delete.topic.enable整個流水的源碼

D),手動刪除zk上topic信息和磁盤數據

1,客戶端執行刪除命令

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

進入kafka-topics.sh我們會看到

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@

進入TopicCommand里面,main方法里面

else if(opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts)

實際內容是

val topics = getTopics(zkClient, opts)if (topics.length == 0) { println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))}topics.foreach { topic => try { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))

在"/admin/delete_topics"目錄下創建了一個topicName的節點。

2,假如不配置delete.topic.enable整個流水是

總共有兩處listener會響應:

A),TopicChangeListener

B),DeleteTopicsListener

使用topic的刪除命令刪除一個topic的話,指揮觸發DeleteTopicListener。

var topicsToBeDeleted = { import JavaConversions._ (children: Buffer[String]).toSet}val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))topicsToBeDeleted --= nonExistentTopicsif(topicsToBeDeleted.size > 0) { info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(",")) // mark topic ineligible for deletion if other state changes are in progress topicsToBeDeleted.foreach { topic => val preferredReplicaElectionInProgress =  controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic) val partitionReassignmentInProgress =  controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic) if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)  controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic)) } // add topic to deletion list  controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)}

由于都會判斷delete.topic.enable是否為true,假如不為true就不會執行,為true就進入執行

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

3,delete.topic.enable配置為true

此處與步驟2的區別,就是那兩個處理函數。

controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)

markTopicIneligibleForDeletion函數的處理為

if(isDeleteTopicEnabled) { val newTopicsToHaltDeletion = topicsToBeDeleted & topics topicsIneligibleForDeletion ++= newTopicsToHaltDeletion if(newTopicsToHaltDeletion.size > 0) info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))}

主要是停止刪除topic,假如存儲以下三種情況

* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic

enqueueTopicsForDeletion主要作用是更新刪除topic的集合,并激活TopicDeleteThread

def enqueueTopicsForDeletion(topics: Set[String]) { if(isDeleteTopicEnabled) { topicsToBeDeleted ++= topics partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic) resumeTopicDeletionThread() }}

在刪除線程DeleteTopicsThread的doWork方法中

topicsQueuedForDeletion.foreach { topic =>// if all replicas are marked as deleted successfully, then topic deletion is done if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) { // clear up all state for this topic from controller cache and zookeeper completeDeleteTopic(topic) info("Deletion of topic %s successfully completed".format(topic)) }

進入completeDeleteTopic方法中

// deregister partition change listener on the deleted topic. This is to prevent the partition change listener// firing before the new topic listener when a deleted topic gets auto createdpartitionStateMachine.deregisterPartitionChangeListener(topic)val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)// controller will remove this replica from the state machine as well as its partition assignment cachereplicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)// move respective partition to OfflinePartition and NonExistentPartition statepartitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)topicsToBeDeleted -= topicpartitionsToBeDeleted.retain(_.topic != topic)controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))controllerContext.removeTopic(topic)

主要作用是解除掉監控分區變動的listener,刪除Zookeeper具體節點信息,刪除磁盤數據,更新內存數據結構,比如從副本狀態機里面移除分區的具體信息。

其實,最終要的是我們的副本磁盤數據是如何刪除的。我們重點介紹這個部分。

首次清除的話,在刪除線程DeleteTopicsThread的doWork方法中

{ // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion // or there is at least one failed replica (which means topic deletion should be retried). if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) { // mark topic for deletion retry markTopicForDeletionRetry(topic) }

進入markTopicForDeletionRetry

val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)info("Retrying delete topic for topic %s since replicas %s were not successfully deleted" .format(topic, failedReplicas.mkString(",")))controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)

在ReplicaStateMachine的handleStateChanges方法中,調用了handleStateChange,處理OfflineReplica

// send stop replica command to the replica so that it stops fetching from the leaderbrokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)

接著在handleStateChanges中

brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)

給副本數據存儲節點發送StopReplicaKey副本指令,并開始刪除數據

stopReplicaRequestMap foreach { case(broker, replicaInfoList) => val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet debug("The stop replica request (delete = true) sent to broker %d is %s" .format(broker, stopReplicaWithDelete.mkString(","))) debug("The stop replica request (delete = false) sent to broker %d is %s" .format(broker, stopReplicaWithoutDelete.mkString(","))) replicaInfoList.foreach { r => val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,  Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) controller.sendRequest(broker, stopReplicaRequest, r.callback) }}stopReplicaRequestMap.clear()

Broker的KafkaApis的Handle方法在接受到指令后

case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)

接著是在stopReplicas方法中

{ controllerEpoch = stopReplicaRequest.controllerEpoch // First stop fetchers for all partitions, then stop the corresponding replicas replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition))) for(topicAndPartition <- stopReplicaRequest.partitions){ val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions) responseMap.put(topicAndPartition, errorCode) } (responseMap, ErrorMapping.NoError)}

進一步進入stopReplica方法,正式進入日志刪除

getPartition(topic, partitionId) match { case Some(partition) => if(deletePartition) {  val removedPartition = allPartitions.remove((topic, partitionId))  if (removedPartition != null)  removedPartition.delete() // this will delete the local log }

以上就是kafka的整個日志刪除流水。

4,手動刪除zk上topic信息和磁盤數據

TopicChangeListener會監聽處理,但是處理很簡單,只是更新了

val deletedTopics = controllerContext.allTopics -- currentChildrencontrollerContext.allTopics = currentChildrenval addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>

四,總結

Kafka的topic的刪除過程,實際上就是基于Zookeeper做了一個訂閱發布系統。Zookeeper的客戶端創建一個節點/admin/delete_topics/<topic>,由kafka Controller監聽到事件之后正式觸發topic的刪除:解除Partition變更監聽的listener,清除內存數據結構,刪除副本數據,刪除topic的相關Zookeeper節點。

delete.topic.enable配置該參數為false的情況下執行了topic的刪除命令,實際上未做任何動作。我們此時要徹底刪除topic建議修改該參數為true,重啟kafka,這樣topic信息會被徹底刪除,已經測試。

一般流行的做法是手動刪除Zookeeper的topic相關信息及磁盤數據但是這樣的話會造成部分內存數據未清除。至于是否會有隱患,未測試。

好了,以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VeVb武林網的支持。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
性欧美暴力猛交69hd| 97在线精品视频| 欧美另类69精品久久久久9999| 爱福利视频一区| 夜夜躁日日躁狠狠久久88av| 亚洲精品久久久久中文字幕二区| 国产精品丝袜久久久久久高清| 亚洲aaa激情| 亚洲免费影视第一页| 国产精品一区二区av影院萌芽| 97久久久久久| 北条麻妃一区二区三区中文字幕| 国产精品小说在线| 亚洲www视频| 精品成人在线视频| 美女999久久久精品视频| 欧美丝袜第一区| 7m第一福利500精品视频| 国产精品视频免费在线观看| 久久精品国产一区二区电影| 精品成人久久av| 国产一区欧美二区三区| 精品欧美国产一区二区三区| 国产精品久久久久影院日本| 国产97在线|日韩| 亚洲无av在线中文字幕| 91久久久久久久| 蜜臀久久99精品久久久久久宅男| 国产日韩欧美自拍| 日本不卡高字幕在线2019| 中文字幕日韩高清| 91精品国产高清久久久久久| 91精品一区二区| 久久久久国产精品免费| 国产成人啪精品视频免费网| 久久久久久亚洲精品不卡| 日本国产高清不卡| 欧美一级在线播放| 久久精品欧美视频| 亚洲欧美中文字幕| 国产精品视频yy9099| 日韩在线中文字| 一本一道久久a久久精品逆3p| 成人国产精品一区二区| 欧美激情在线一区| 国产日韩视频在线观看| 亚洲欧美日韩天堂一区二区| 成人午夜在线观看| 久久影视电视剧凤归四时歌| 国产美女久久久| 中文字幕日韩专区| 2020久久国产精品| 在线亚洲国产精品网| 亚洲人成毛片在线播放| 国产日韩在线观看av| 欧美一级在线播放| 欧美成人精品影院| 亚洲国产精品嫩草影院久久| 欧美国产日本高清在线| 欧美精品生活片| 日韩中文字幕免费| 97国产suv精品一区二区62| 久久99国产精品自在自在app| 欧美精品日韩www.p站| 国产精品免费久久久久久| 日韩欧美成人精品| 91久久国产精品91久久性色| 日韩美女在线播放| 国产精品福利在线观看| 欧美电影在线播放| 亚洲精品美女在线| 亚洲国产天堂久久综合| 国产精品小说在线| 欧美日韩性视频在线| 成人免费网站在线观看| 久久久99免费视频| 日韩大片免费观看视频播放| 日韩一区二区久久久| 尤物精品国产第一福利三区| 在线成人激情视频| 日韩视频中文字幕| 国产成人综合一区二区三区| 国产精品久久久久久久一区探花| 国产精自产拍久久久久久| 日本精品免费观看| 久久久久久久香蕉网| 中文字幕免费精品一区高清| 97在线视频免费看| 伊人久久大香线蕉av一区二区| 国产69久久精品成人| www国产亚洲精品久久网站| 日韩av片永久免费网站| 亚洲成av人影院在线观看| 91高清视频免费| 欧美日韩国产123| 亚洲成人国产精品| 午夜精品三级视频福利| 国产精品久久久91| 亚洲国产天堂久久国产91| 亚洲在线视频观看| 亚洲国产日韩精品在线| 最近2019年手机中文字幕| 日本中文字幕久久看| 久久福利视频导航| 亚洲成人黄色在线观看| 久久久亚洲精选| 欧美性猛交xxxxx水多| 久久视频在线观看免费| 中文字幕久久久| 亚洲成av人影院在线观看| 亚洲国产精品字幕| 久久99久久99精品免观看粉嫩| 亚洲美女精品久久| 黄网站色欧美视频| 成人激情黄色网| 久久久精品免费视频| 国模极品一区二区三区| 亚洲成人久久久久| 日韩中文字在线| 97在线观看免费高清| 国产精品极品在线| 亚洲国产精品国自产拍av秋霞| 亚洲高清久久网| 欧美xxxx18国产| 国产日韩欧美在线视频观看| 亚洲国产精品悠悠久久琪琪| 日韩亚洲国产中文字幕| 亚洲大胆人体在线| 欧美激情视频一区二区| 青青久久av北条麻妃海外网| 日本欧美爱爱爱| 亚洲色图综合久久| 国产深夜精品福利| 亚洲综合大片69999| 一区二区三区天堂av| 超在线视频97| 久久精品视频在线| 久久免费福利视频| 亚洲欧美激情另类校园| 成人亚洲激情网| 日本高清不卡在线| 青青草精品毛片| 一区二区三区四区精品| 日韩激情av在线免费观看| 成人深夜直播免费观看| 欧美裸体xxxx| 久久久久久中文| 中文字幕久久久av一区| 2019中文字幕免费视频| 国产精品视频不卡| 91探花福利精品国产自产在线| 亚洲第一福利在线观看| 亚洲在线一区二区| 国产视频福利一区| 成人av在线亚洲| 68精品国产免费久久久久久婷婷| 2019亚洲男人天堂| 欧美小视频在线| 久久国产精品电影| 亚洲色无码播放| 欧美国产视频日韩| 成人午夜在线视频一区| 韩国一区二区电影| 一区二区三区回区在观看免费视频|