Kafka入门系列(八) Kafka常用操作

2017-10-19 19:56 阅读 198 views 次 评论 0 条

Kafka入门系列(八) Kafka常用操作

    之前的入门系列文章分别介绍了Kafka的各个方面,包括安装、producer、consumer以及监控等。相信各位看官已为Kafka的使用打下了非常好的基础。

    本文打算称热打铁,继续给大家推荐一些常用的Kafka工具。灵活掌握这些工具的使用无疑将为您的工具箱中添加新的强力武器,助您早日成为Kafka的“大师”:)

1. 到底该怎么删除topic

我见过太多的人给出这样的方法:直接删除底层文件系统中的日志目录以及Zookeeper中的相关节点。我必须实事求是地跟大家说,这种方法是极其不可取的。除了上述这两个地方,Kafka controller中还保存了大量的关于topic的信息——特别是副本状态机和分区状态机,而这还没有考虑元数据缓存的影响。所以笔者在这里给出正宗的topic删除方法:

  1. broker启动前先配置delete.topic.enable=true

  2. 运行:bin/kafka-topics.sh --zookeeper zkHost:2181 --topic topic_name --delete

    这个方法是个异步命令,所以你看到该命令返回了并不代表topic被真正地删除了,其后台的逻辑相当复杂,但这的确是最正确的删除方法。

2. 增加topic分区数

    很多时候我们需要增加topic的分区数以扩展消息的处理能力,提升系统吞吐量。假设我要为名为test的topic设置10个分区,命令如下:

bin/kafka-topics.sh --zookeeper zkHost:2181 --topic test --alter --partitions 10

注意红色的关键字,我们要使用--alter来变更topic的分区数。注意:Kafka目前不支持减少分区数,只能增加分区。

3. 增加/查看/删除topic级参数配置

    Kafka默认提供了超多的参数可供配置。大部分情况下我们为所有topic都设置相同的参数值,但有些情况下,我们需要为不同的topic设置不同的参数。

    假设我要为名为test的topic单独设置最大消息大小,则可以执行下列命令:

bin/kafka-configs.sh --zookeeper zkHost:2181 --alter --entity-name test --entity-type topics --add-config max.message.bytes=200000

    一旦设置成功,我可以使用下列命令查看该topic下的所有自定义配置:

bin/kafka-configs.sh --zookeeper zkHost:2181 --describe --entity-name test --entity-type topics

    最后,执行下列命令删除之前的配置:

bin/kafka-configs.sh --zookeeper zkHost:2181  --alter --entity-name test --entity-type topics --delete-config max.message.bytes

4. preferred leader选举

    随着Kafka集群的不断演进,势必会出现broker崩溃的情况。一旦发生,崩溃的broker上所有leader分区都会被自动地转移到其他broker上,但纵使该broker重启后恢复,Kafka也只当它是个follower重新将其加入集群,因此,该broker无法为客户端服务。

    基于这种情况,Kafka推出了preferred leader选举的机制,既有手动模式也有自动模式。我们这里介绍手动开启preferred leader选举的命令。首先先简要介绍一下什么是preferred leader?假设topic有个分区有3个副本,分别是:2,4,6。那么副本2就是这个分区的preferred leader(最开头的副本)。如果在集群运行的某个时间点发现该分区的leader是4或6,那么通过开启下面的preferred leader选举就可以重新将该分区的leader转移到副本2上。

    okay,介绍完基本概念, 我们来看具体命令:

bin/kafka-preferred-replica-election.sh --zookeeper zkHost:2181

上面的命令会检查Kafka集群中所有topic的所有分区。如果你只想指定特定的分区进行preferred leader选举,则需要先把待选举的分区写入到一个JSON文件中。比如我要指定test的分区0:

{"partitions":   [{"topic": "test", "partition": 0}] }

然后保存为preferred-election.json,之后执行:

bin/kafka-preferred-replica-election.sh --zookeeper zkHost:2181 --path-to-json-file <path>/preferred-election.json

5. 监控consumer group

    Kafka提供了kafka-consumer-groups.sh脚本用于管理consumer group,特别是可以监控consumer group的消费进度,这点对于我们实时了解consumer的工作情况至关重要。

    由于目前存在新旧两个版本的consumer,所以本套命令自然也有两个版本。

5.1 列出当前集群中所有消费组

新版本consumer:bin/kafka-consumer-groups.sh --bootstrap-server broker01:9092 --list 

旧版本consumer:bin/kafka-consumer-groups.sh --zookeeper zkHost:2181 --list

5.2 查看组名为test的消费进度

新版本consumer:bin/kafka-consumer-groups.sh --bootstrap-server broker01:9092 --describe --group test

旧版本consumer:bin/kafka-consumer-groups.sh --zookeeper zkHost:2181 --describe test

6. 数据迁移到指定分区

    随着Kafka集群规模的不断扩大,新的broker会被持续地加入到集群中,但Kafka默认不会自动地把现有topic的分区搬移到新broker上。用户需要自己手动完成整个操作才能充分利用新增的broker。那么,具体应该怎么做呢?

    假定我们有一个topic:test,它有两个分区0,1,replication factor是2,即每个分区总2个副本。现副本分布情况如下:

Topic:test      PartitionCount:2        ReplicationFactor:2     Configs:

        Topic: test     Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1

        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2

    倘若我们打算把该topic完全搬移到broker 0以外的所有其他broker上,那么应当如何操作呢?

    首先,我们先要构建一个JSON文件表明我们要搬移的topic集合。本例中就只有一个topic:test

{"topics": [{"topic": "test"}], "version": 1}

然后将其保存成一个JSON文件:topics-to-move.json,之后运行:

bin/kafka-reassign-partitions.sh --zookeeper zkHost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

上述命令表明我们希望把topics-to-move.json中的所有topic都搬移到broker 1和broker 2上。该命令输出结果为:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[1,2]},{"topic":"test","partition":0,"replicas":[0,1]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test","partition":1,"replicas":[1,2]},{"topic":"test","partition":0,"replicas":[2,1]}]}

    由此可见,该工具已经为我们生成了新的迁移计划,就在Proposed partition reassignment configuration部分。现在我们将生成的计划保存进plan-json-file文件中。

    有了执行计划,我们开始执行真正的迁移操作:

bin/kafka-reassign-partitions.sh --zookeeper zkHost:2181 --reassignment-json-file plan-json-file --execute

命令执行成功后,再次查看该topic,结果如下:

Topic:test      PartitionCount:2        ReplicationFactor:2     Configs:

        Topic: test     Partition: 0    Leader: 2       Replicas: 2,1   Isr: 1,2

        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2

    很显然,所有的分区都已经被迁移到broker 1和broker 2了,即实现了数据的完全迁移。特别说明一下,这个脚本在broker维护时特别有用——比如你若想停掉broker 0一段时间便可以先将其上的所有分区都迁移出去,待维护完毕再迁移回来。

7. 增加备份因子replication factor

    很多时候,我们需要调整新的备份因子以提供更大的可靠性和高可用性。因此你可以使用下面的步骤来增加replication factor。

    还是以上面的例子来说明,假设我想将test的分区0的replication factor从2增加到3,那么方法如下:

  • 创建一个JSON文件,内容是{"version":1, "partitions":[{"topic":"test", "partition":0, "replicas":[0,1,2]}]}。假设文件名是incease-rf.json

  • 运行命令: bin/kafka-reassign-partitions.sh --zookeeper zkHost:2181 --reassignment-json-file increase-rf.json --execute

    运行只有查看topic会发现分区0的replication factor已经变成了3,而分区1依然还只是2个副本:

Topic:test      PartitionCount:2        ReplicationFactor:2     Configs:

        Topic: test     Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 1,2,0

        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2

okay,写到这里基本上就差不多了。这7个日常命令基本上涵盖了Kafka最常见的一些运维操作,希望对广大读者有所助益。


版权声明:本文版权由木秀林网所有,转载请保留链接:Kafka入门系列(八) Kafka常用操作
分类:Kafka解析 标签:

发表评论


表情