加入收藏 | 设为首页 | 会员中心 | 我要投稿 辽源站长网 (https://www.0437zz.com/)- 云专线、云连接、智能数据、边缘计算、数据安全!
当前位置: 首页 > 服务器 > 搭建环境 > Windows > 正文

Kafka源码分析及图解原理之Broker端

发布时间:2019-09-20 13:23:16 所属栏目:Windows 来源:IT技术分享
导读:首先从kafka如何创建一个topic来开始: kafka-topics--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topictest 其中有这么几个参数: --zookeeper:zookeeper的地址 --replication-factor:副本因子 --partitions:分区个数(默认
副标题[/!--empirenews.page--]

 Kafka源码分析及图解原理之Broker端

首先从kafka如何创建一个topic来开始:

  1. kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

其中有这么几个参数:

  • --zookeeper:zookeeper的地址
  • --replication-factor:副本因子
  • --partitions:分区个数(默认是1)
  • --topic:topic名称

二.什么是分区

一个topic可以有多个分区,每个分区的消息都是不同的。 虽然分区可以提供更高的吞吐量,但是分区不是越多越好。一般分区数不要超过kafka集群的机器数量。分区越多占用的内存和文件句柄。 一般分区设置为3-10个。比如现在集群有3个机器,要创建一个名为test的topic,分区数为2,那么如图:

Kafka源码分析及图解原理之Broker端

partiton都是有序切顺序不可变的记录集,并且不断追加到log文件,partition中的每一个消息都回分配一个id,也就是offset(偏移量),offset用来标记分区的一条记录 ,这里就用官网的图了,我画的不好:

Kafka源码分析及图解原理之Broker端

2.1 producer端和分区关系

就图上的情况,producer端会把mq给哪个分区呢?这也是上一节我们提到的一个参数partitioner.class。 默认分区器的处理是:有key则用murmur2算法计算key的哈希值,对总分区取模算出分区号,无key则轮询。(org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition)。当然了我们也可以自定义分区策略,只要实现org.apache.kafka.clients.producer.Partitioner接口即可:

  1. /** 
  2.  * Compute the partition for the given record. 
  3.  * 
  4.  * @param topic The topic name 
  5.  * @param key The key to partition on (or null if no key) 
  6.  * @param keyBytes serialized key to partition on (or null if no key) 
  7.  * @param value The value to partition on or null 
  8.  * @param valueBytes serialized value to partition on or null 
  9.  * @param cluster The current cluster metadata 
  10.  */ 
  11.  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 
  12.  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 
  13.  int numPartitions = partitions.size(); 
  14.  if (keyBytes == null) { 
  15.  int nextValue = nextValue(topic); 
  16.  List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); 
  17.  if (availablePartitions.size() > 0) { 
  18.  int part = Utils.toPositive(nextValue) % availablePartitions.size(); 
  19.  return availablePartitions.get(part).partition(); 
  20.  } else { 
  21.  // no partitions are available, give a non-available partition 
  22.  return Utils.toPositive(nextValue) % numPartitions; 
  23.  } 
  24.  } else { 
  25.  // hash the keyBytes to choose a partition 
  26.  return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 
  27.  } 
  28.  } 

2.2 consumer端和分区关系

先来看下官网对于消费组的定义:Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group.

翻译:消费者使用一个消费者组名来标记自己,一个topic的消息会被发送到订阅它的消费者组的 一个 消费者实例上。

consumer group是用于实现高伸缩性,高容错性的consumer机制。如果有consumer挂了或者新增一个consumer,consumer group会进行重平衡(rebalance),重平衡机制会在consumer篇具体讲解,本节不讲。那么按照上面的图继续画消费者端:

Kafka源码分析及图解原理之Broker端

这里是最好的情况,2个partition对应1个group中的2个consumer。那么思考,如果一个消费组的消费者大于分区数呢?或者小于分区数呢?

如果一个消费组的消费者大于分区数,那么相当于多余的消费者是一种浪费,多余的消费者将无法消费消息。

(编辑:辽源站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读