kafka集群管理指南(一)
本指南使用的工具为kafkabin目录下相关脚本。添加删除topics
可以使用如下命令进行新增topics:binkafkatopics。shbootstrapserverbrokerhost:portcreatetopicmytopicnamepartitions20replicationfactor3configxy
其中,topic表示主题名称,partitions表示分区数,replicationfactor表示副本数,config表示主题配置,会覆盖默认的配置项。
可以使用下述命令删除topic:binkafkatopics。shbootstrapserverbrokerhost:portdeletetopicmytopicname修改topics配置
可以使用kafkatopics。sh命令进行修改topics。新增partitions
比如你要新增partitions,那么你可以使用如下命令:binkafkatopics。shbootstrapserverbrokerhost:portaltertopicmytopicnamepartitions40新增configsbinkafkaconfigs。shbootstrapserverbrokerhost:portentitytypetopicsentitynamemytopicnamealteraddconfigxy删除configsbinkafkaconfigs。shbootstrapserverbrokerhost:portentitytypetopicsentitynamemytopicnamealterdeleteconfigx优雅关闭kafka服务器
Kafka集群将自动检测任何broker关闭或故障,并为该机器上的分区选举新的领导者。无论服务器发生故障还是为了维护或配置更改而有意关闭,都会发生这种情况。对于后一种情况,Kafka支持一种更优雅的停止服务器的机制,而不仅仅是杀死它。当服务器正常停止时,它将利用两个优化:它将所有日志同步到磁盘,以避免在重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。日志恢复需要时间,因此这会加快有意重新启动的速度。它将在关闭之前将服务器作为领导者的任何分区迁移到其他副本。这将使领导转移更快,并将每个分区不可用的时间最小化到几毫秒。
每当服务器停止而不是硬终止时,同步日志将自动发生,但受控领导迁移需要使用特殊设置:controlled。shutdown。enabletrue
请注意,只有在broker上托管的所有分区都具有副本(即复制因子大于1并且这些副本中至少有一个处于活动状态)时,受控关闭才会成功。这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。集群建数据复制数据跨区域复制
Kafka管理员可以定义跨越单个Kafka集群、数据中心或地理区域边界的数据流。有关详细信息,请参阅异地复制部分。平衡leadership
每当Broker停止或崩溃时,该Broker分区的领导权就会转移到其他副本。当broker重新启动时,它只会是其所有分区的跟随者,这意味着它不会用于客户端读取和写入。
为了避免这种不平衡,Kafka有一个首选副本的概念。如果分区的副本列表是1,5,9,则节点1优先于节点5或9作为领导者,因为它在副本列表中更早。默认情况下,Kafka集群将尝试将领导权恢复到首选副本。也就是要进行如下配置:auto。leader。rebalance。enabletrue
您也可以将其设置为false,但您需要通过运行以下命令手动将领导恢复到恢复的副本:binkafkapreferredreplicaelection。shbootstrapserverbrokerhost:port管理消费者组列出所有消费者组
使用ConsumerGroupCommand工具,我们可以列出、描述或删除消费者组。消费者组可以手动删除,也可以在该组的最后提交的偏移量到期时自动删除。手动删除仅在组没有任何活动成员时才有效。例如,要列出所有主题的所有消费者组:binkafkaconsumergroups。shbootstrapserverlocalhost:9092listtestconsumergroup列出消费者组消费位置binkafkaconsumergroups。shbootstrapserverlocalhost:9092describegroupmygroupTOPICPARTITIONCURRENTOFFSETLOGENDOFFSETLAGCONSUMERIDHOSTCLIENTIDtopic30241019395308154289consumer2e76ea8c35d304299900547eb41f3d3c4127。0。0。1consumer2topic21520678803288282610consumer2e76ea8c35d304299900547eb41f3d3c4127。0。0。1consumer2topic31241018398817157799consumer2e76ea8c35d304299900547eb41f3d3c4127。0。0。1consumer2topic108541448558091665consumer13fc8d6f1581a4472bdf33515b4aee8c1127。0。0。1consumer1topic20460537803290342753consumer13fc8d6f1581a4472bdf33515b4aee8c1127。0。0。1consumer1topic32243655398812155157consumer4117fe4d3c6c141788ee9eb4a3954bee0127。0。0。1consumer4
有许多额外的describe选项可用于提供有关消费者组的更多详细信息:
members:此选项提供消费者组中所有活动成员的列表。binkafkaconsumergroups。shbootstrapserverlocalhost:9092describegroupmygroupmembersCONSUMERIDHOSTCLIENTIDPARTITIONSconsumer13fc8d6f1581a4472bdf33515b4aee8c1127。0。0。1consumer12consumer4117fe4d3c6c141788ee9eb4a3954bee0127。0。0。1consumer41consumer2e76ea8c35d304299900547eb41f3d3c4127。0。0。1consumer23consumer3ecea43e41f01479f8349f9130b75d8ee127。0。0。1consumer30
membersvrbose:除了上面members选项报告的信息之外,此选项还提供分配给每个成员的分区。binkafkaconsumergroups。shbootstrapserverlocalhost:9092describegroupmygroupmembersverboseCONSUMERIDHOSTCLIENTIDPARTITIONSASSIGNMENTconsumer13fc8d6f1581a4472bdf33515b4aee8c1127。0。0。1consumer12topic1(0),topic2(0)consumer4117fe4d3c6c141788ee9eb4a3954bee0127。0。0。1consumer41topic3(2)consumer2e76ea8c35d304299900547eb41f3d3c4127。0。0。1consumer23topic2(1),topic3(0,1)consumer3ecea43e41f01479f8349f9130b75d8ee127。0。0。1consumer30
offsets:这是默认的描述选项,提供与describe选项相同的输出。
state:此选项提供有用的组级信息。binkafkaconsumergroups。shbootstrapserverlocalhost:9092describegroupmygroupstateCOORDINATOR(ID)ASSIGNMENTSTRATEGYSTATEMEMBERSlocalhost:9092(0)rangeStable4
要手动删除一个或多个消费者组,可以使用delete选项:binkafkaconsumergroups。shbootstrapserverlocalhost:9092deletegroupmygroupgroupmyothergroupDeletionofrequestedconsumergroups(mygroup,myothergroup)wassuccessful。重置消费者组的偏移量
要重置消费者组的偏移量,可以使用resetoffsets选项。此选项一次支持一个消费者组。它需要定义以下范围:alltopics或topic。必须选择一个范围,除非您使用fromfile方案。此外,首先确保消费者实例处于非活动状态。有关更多详细信息,请参阅KIP122。
它有3个执行选项:(默认)显示要重置的偏移量。execute:执行resetoffsets进程。export:将结果导出为CSV格式。
resetoffsets还有以下场景可供选择(必须至少选择一个场景):todatetime:将偏移量重置为日期时间的偏移量。格式:YYYYMMDDTHH:mm:SS。ssstoearliest:将偏移量重置为最早的偏移量。tolatest:将偏移量重置为最新偏移量。shiftby:重置偏移量将当前偏移量移动n,其中n可以是正数或负数。fromfile:将偏移量重置为CSV文件中定义的值。tocurrent:将偏移重置为当前偏移。byduration:将偏移量重置为从当前时间戳开始的持续时间偏移量。格式:PnDTnHnMnStooffset:将偏移量重置为特定偏移量。
请注意,超出范围的偏移将被调整到可用的偏移结束。例如,如果offsetend为10,offsetshiftrequest为15,那么实际上会选择offsetat10。
例如,要将消费者组的偏移量重置为最新的偏移量:binkafkaconsumergroups。shbootstrapserverlocalhost:9092resetoffsetsgroupconsumergroup1topictopic1tolatestTOPICPARTITIONNEWOFFSETtopic100
如果您使用旧的highlevel消费者并将组元数据存储在ZooKeeper中(即offsets。storagezookeeper),请传递zookeeper而不是bootstrapserver:binkafkaconsumergroups。shzookeeperlocalhost:2181list扩充kafka集群
将服务器添加到Kafka集群很容易,只需为它们分配一个唯一的brokerID并在新服务器上启动Kafka。然而,这些新服务器不会自动分配任何数据分区,因此除非将分区移动到它们,否则在创建新主题之前它们不会做任何工作。因此,通常当您将机器添加到集群时,您会希望将一些现有数据迁移到这些机器上。
迁移数据的过程是手动启动的,但完全自动化。在幕后,Kafka将添加新服务器作为它正在迁移的分区的跟随者,并允许它完全复制该分区中的现有数据。当新服务器完全复制此分区的内容并加入同步副本时,现有副本之一将删除其分区的数据。
分区重新分配工具可用于在broker之间移动分区。理想的分区分布将确保所有broker的数据负载和分区大小均匀。分区重新分配工具无法自动研究Kafka集群中的数据分布并移动分区以获得均匀的负载分布。因此,管理员必须弄清楚应该移动哪些主题或分区。
分区重新分配工具可以在3种互斥模式下运行:generate:在这种模式下,给定一个主题列表和一个broker列表,该工具生成一个候选重新分配,以将指定主题的所有分区移动到新的broker。此选项仅提供一种方便的方法来生成给定主题和目标代理broker的分区重新分配计划。execute:在这种模式下,该工具根据用户提供的重新分配计划启动分区的重新分配。(使用reassignmentjsonfile选项)。这可以是由管理员手工制作的自定义重新分配计划,也可以使用generate选项提供verify:在此模式下,该工具会验证上次execute期间列出的所有分区的重新分配状态。状态可以是成功完成、失败或进行中自动将数据迁移到新机器
分区重新分配工具可用于将某些主题从当前brokers移至新添加的broker。这在扩展现有集群时通常很有用,因为将整个主题移动到新的一组broker比一次移动一个分区更容易。当用于执行此操作时,用户应提供待移动的brokers的主题列表和新brokers的目标主题列表。然后,该工具将给定主题列表的所有分区均匀分布在新的brokers上。在此过程中,主题的复制因子保持不变。实际上,输入主题列表的所有分区的副本都从旧brokers移动到新添加的brokers。
例如,以下示例将主题foo1,foo2的所有分区移动到新的一组broker5,6。在此移动结束时,主题foo1和foo2的所有分区将仅存在于broker5,6上。
由于该工具接受输入格式为json文件的主题列表,因此您首先需要确定要移动的主题并创建json文件,如下所示:cattopicstomove。json{topics:〔{topic:foo1},{topic:foo2}〕,version:1}
json文件准备好后,使用分区重新分配工具生成候选分配:binkafkareassignpartitions。shbootstrapserverlocalhost:9092topicstomovejsonfiletopicstomove。jsonbrokerlist5,6generateCurrentpartitionreplicaassignment{version:1,partitions:〔{topic:foo1,partition:2,replicas:〔1,2〕},{topic:foo1,partition:0,replicas:〔3,4〕},{topic:foo2,partition:2,replicas:〔1,2〕},{topic:foo2,partition:0,replicas:〔3,4〕},{topic:foo1,partition:1,replicas:〔2,3〕},{topic:foo2,partition:1,replicas:〔2,3〕}〕}Proposedpartitionreassignmentconfiguration{version:1,partitions:〔{topic:foo1,partition:2,replicas:〔5,6〕},{topic:foo1,partition:0,replicas:〔5,6〕},{topic:foo2,partition:2,replicas:〔5,6〕},{topic:foo2,partition:0,replicas:〔5,6〕},{topic:foo1,partition:1,replicas:〔5,6〕},{topic:foo2,partition:1,replicas:〔5,6〕}〕}
该工具生成一个候选分配,它将所有分区从主题foo1,foo2移动到broker5,6。但是请注意,此时分区移动还没有开始,它只是告诉您当前的分配和建议的新分配。如果您想回滚到当前分配,应保存当前分配。新分配应保存在json文件(例如expandclusterreassignment。json)中,以使用execute选项输入到工具中,如下所示:binkafkareassignpartitions。shbootstrapserverlocalhost:9092reassignmentjsonfileexpandclusterreassignment。jsonexecuteCurrentpartitionreplicaassignment{version:1,partitions:〔{topic:foo1,partition:2,replicas:〔1,2〕},{topic:foo1,partition:0,replicas:〔3,4〕},{topic:foo2,partition:2,replicas:〔1,2〕},{topic:foo2,partition:0,replicas:〔3,4〕},{topic:foo1,partition:1,replicas:〔2,3〕},{topic:foo2,partition:1,replicas:〔2,3〕}〕}SavethistouseasthereassignmentjsonfileoptionduringrollbackSuccessfullystartedreassignmentofpartitions{version:1,partitions:〔{topic:foo1,partition:2,replicas:〔5,6〕},{topic:foo1,partition:0,replicas:〔5,6〕},{topic:foo2,partition:2,replicas:〔5,6〕},{topic:foo2,partition:0,replicas:〔5,6〕},{topic:foo1,partition:1,replicas:〔5,6〕},{topic:foo2,partition:1,replicas:〔5,6〕}〕}
最后,该工具可以使用verify选项来检查分区重新分配的状态。请注意,相同的expandclusterreassignment。json(与execute选项一起使用)应该与verify选项一起使用:binkafkareassignpartitions。shbootstrapserverlocalhost:9092reassignmentjsonfileexpandclusterreassignment。jsonverifyStatusofpartitionreassignment:Reassignmentofpartition〔foo1,0〕completedsuccessfullyReassignmentofpartition〔foo1,1〕isinprogressReassignmentofpartition〔foo1,2〕isinprogressReassignmentofpartition〔foo2,0〕completedsuccessfullyReassignmentofpartition〔foo2,1〕completedsuccessfullyReassignmentofpartition〔foo2,2〕completedsuccessfully自定义分区分配和迁移
分区重新分配工具还可用于有选择地将分区的副本移动到一组特定的broker。当以这种方式使用时,假设用户知道重新分配计划并且不需要工具来生成候选重新分配,有效地跳过generate步骤并直接移动到execute步骤
例如,以下示例将主题foo1的分区0移动到代理5,6,将主题foo2的分区1移动到代理2,3:
第一步是在json文件中手工制作自定义重新分配计划:catcustomreassignment。json{version:1,partitions:〔{topic:foo1,partition:0,replicas:〔5,6〕},{topic:foo2,partition:1,replicas:〔2,3〕}〕}
然后,使用带有execute选项的json文件开始重新分配过程:binkafkareassignpartitions。shbootstrapserverlocalhost:9092reassignmentjsonfilecustomreassignment。jsonexecuteCurrentpartitionreplicaassignment{version:1,partitions:〔{topic:foo1,partition:0,replicas:〔1,2〕},{topic:foo2,partition:1,replicas:〔3,4〕}〕}SavethistouseasthereassignmentjsonfileoptionduringrollbackSuccessfullystartedreassignmentofpartitions{version:1,partitions:〔{topic:foo1,partition:0,replicas:〔5,6〕},{topic:foo2,partition:1,replicas:〔2,3〕}〕}
verify选项可与该工具一起使用以检查分区重新分配的状态。请注意,应将相同的customreassignment。json(与execute选项一起使用)与verify选项一起使用:binkafkareassignpartitions。shbootstrapserverlocalhost:9092reassignmentjsonfilecustomreassignment。jsonverifyStatusofpartitionreassignment:Reassignmentofpartition〔foo1,0〕completedsuccessfullyReassignmentofpartition〔foo2,1〕completedsuccessfully