kafka源码解析之十OffsetManager_亮亮-AC米兰的博客-程序员宝宝

技术标签: kafka  kafka源码解析  

OffsetManager主要提供对offset的保存和读取,kafka管理topic的偏移量有2种方式:1)zookeeper,即把偏移量提交至zk上;2)kafka,即把偏移量提交至kafka内部,主要由offsets.storage参数决定,默认为zookeeper。也就是说如果配置offsets.storage= kafka,则kafka会把这种offsetcommit请求转变为一种Producer,保存至topic为“__consumer_offsets”的log里面。

查看OffsetManager类:
class OffsetManager(val config: OffsetManagerConfig,
                    replicaManager: ReplicaManager,
                    zkClient: ZkClient,
                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup {

  /* offsets and metadata cache */
//通过offsetsCache提供对GroupTopicPartition的查询
  private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
  //把过时的偏移量刷入磁盘,因为这些偏移量长时间没有被更新,意味着消费者可能不再消费了,也就不需要了,因此刷入到磁盘
  scheduler.schedule(name = "offsets-cache-compactor",
                     fun = compact,
                     period = config.offsetsRetentionCheckIntervalMs,
                     unit = TimeUnit.MILLISECONDS)
……
}

主要完成2件事情:
           1)提供对topic偏移量的查询
           2)将偏移量消息刷入到以__consumer_offsets命名的topic的log中

10.1 offsetsCache的更新机制

那么offsetsCache是如何生成的呢?是通过producer端发送消息给leader,然后leader不断更新此偏移量。Leader更新此偏移量分3种情况:
1)当produceRequest.requiredAcks == 0时,即不需要ack,则立刻调用putOffsets更新偏移量
2)当produceRequest.requiredAcks == 1时,即需要立即返回response时,则立刻调用putOffsets更新偏移量
3)当produceRequest.requiredAcks == -1时,即只有此批消息达到最小副本数的时候,通过ProducerRequestPurgatory触发调用putOffsets更新偏移量 (ProducerRequestPurgatory之后的章节会讲)

10.2 compact机制

那么compact是如何工作的呢?


//去除offsetsCache过时的OffsetAndMetadata,并把偏移量刷入磁盘
private def compact() {
  debug("Compacting offsets cache.")
  val startMs = SystemTime.milliseconds
//过滤出长时间没有被更新的offset
  val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)
  debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))
  // delete the stale offsets from the table and generate tombstone messages to remove them from the log
  val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
    val offsetsPartition = partitionFor(groupTopicAndPartition.group)
    trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
    offsetsCache.remove(groupTopicAndPartition)
    val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
      groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
    (offsetsPartition, new Message(bytes = null, key = commitKey))
  }.groupBy{ case (partition, tombstone) => partition }
  // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
  // if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.
  val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
    val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
    partitionOpt.map { partition =>
      val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
      val messages = tombstones.map(_._2).toSeq
      trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
      try {
//把偏移量刷入磁盘,供kafka重启的时候读取,即loadOffsetsFromLog
        partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
        tombstones.size
      }
      catch {
        case t: Throwable =>
          error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size, appendPartition), t)
          // ignore and continue
          0
      }
    }
  }.sum
  debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
}

其实就是把不再有消息发送的topic的偏移量刷入到磁盘,并且leader在重启的时候可以调用loadOffsetsFromLog从磁盘加载偏移量。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wl044090432/article/details/51038600

智能推荐

Java math类的方法__七七的博客-程序员宝宝

Math用于做数学运算。Math类中的方法全部是静态方法,直接用类名调用即可。方法:public static int abs(int a)获取参数a的绝对值:public static double ceil(double a)向上取整public static double floor (double a)向下取整public static double pow(double a, double b)获取a的b次幂public static long round (double a

vscode使用svn插件_Έι的博客-程序员宝宝_mac vscode svn插件

首先下载svnsvn下载地址下载svn就傻瓜式安装就行如果出现报错 SVN not found. Install it or configure it using the "svn.path"找到安装包重新按一下下面步骤选中安装第一项vscode中搜索插件:svn和tortoisesvn安装在setting.json中配置svn地址"svn.path":"C://Program Files//TortoiseSVN//bin" 之前没改变大多都在这个路径重启v

批量删除outlook里面重复的邮件_wh62592855的博客-程序员宝宝_outlook怎么删除重复邮件

有时候Outlook里有许多封邮件时,但由于误操作或误导入或其他原因,其中大多是重复的邮件。要手动一封一封的删除太费劲了,基本不可能。如果能自动批量的检测并删除就好了。那到底有没有什么好办法能够把重复的邮件批量删除?有,1、用一个工具先检测所有重复的邮件2、把

unity制作登录界面(UGUI知识)_头号理想的博客-程序员宝宝_unity登录界面

我们在平时玩游戏的时候,都会有登录系统,在这里我给大家简单实现一下这个登录系统1.制作背景我们先可以在网页上找几张自己喜欢的背景图片然后把这张照片保存于我们的项目的文件夹中;然后在unity中修改图片的属性然后把Texture Type修改成Sprite(2D and UI) 就可以在把这张图片拖到Image上了,这样背景就做好了。2.UI的创建在unity中我们首先创建两个Te..._1671465600

Vertex Shader几何变换---扭曲_nicolelili1的博客-程序员宝宝_cococs2dx shader 置换扭曲

1、顶点的旋转// Upgrade NOTE: replaced 'mul(UNITY_MATRIX_MVP,*)' with 'UnityObjectToClipPos(*)'Shader "Custom/ls_ten" { SubShader{ pass { CGPROGRAM#pragma vertex vert#pragma fragment frag#in

生信入门(四)——使用DESeq2进行RNA-seq数据分析_柚子味的羊的博客-程序员宝宝_rna-seq的deseq2分析

生信入门(三)——使用DESeq2进行RNA-seq数据分析文章目录生信入门(三)——使用DESeq2进行RNA-seq数据分析一、学习目标二、实验数据1、数据来源2、建模计数数据3、转录本丰度4、salmon定量三、用tximport导入R1、指定文件位置2、将转录本映射到基因今日学习内容DESeq2分析RNA-seq数据一、学习目标直观地评估 RNA-seq 数据的质量对 RNA-seq 数据进行基本的差异分析将 RNA-seq 结果与其他实验数据进行比较从 FASTQ 文件中量化转录表

随便推点

关于renren-wap-ugc-photo-service的架构_wangqiaowqo的博客-程序员宝宝

1、关于renren-wap-ugc-photo-service的架构pom.xml[code="java"] com.xiaonei renren-wap-framework-utils com.xiaonei renren-wap-framework-logic xiaonei-pho...

【面试题】Spring IoC与DI、自动装配与循环依赖_架构攻城之路的博客-程序员宝宝

Spring IoC是Spring最经典设计,自动装配是IoC注入“自动化”的一个简化配置操作。尽管IoC注入帮我们管理了对象之间的依赖关系,但是仍可能发生设计不当而导致了循环依赖问题。强大Spring也提供了一些优雅的解决方案。目录什么是IoCIoC与控制反转如何理解IoC容器IoC注入方式优缺点比较自动装配装配方式循环依赖解决办法什么是IoCSpring IoC容器负责对象的生命周期和对象之间的(依赖)关系。在创建新的Bean时,IoC容器会自动注入新B

sso 服务端配置_使用身份传播配置服务提供商启动的SSO_cuxiong8996的博客-程序员宝宝

关于本系列 这个由三部分组成的系列文章“使用带有WebSphere Liberty的SAML 2.0进行跨域单点登录”介绍了在混合云环境中使用IBMCloud的端到端单点登录(SSO)解决方案。 它说明了如何使用WebSphereLiberty使云上的Java EE标准应用程序能够安全地调用私有网络中公开的服务。 在此解决方案中,您将了解如何配置IBM WebSphere Liberty...

AdapterView 的setOnItemClickListener 与子view setOnclick。__houzhi的博客-程序员宝宝

额,首先介绍一下adapterview吧。AbsListView ,AbsSpinner,AdapterViewAnimator 都是它的子类,所以啥ListView,GridView,Spinner 啥的都是其子类。这样在用到这些类的时候,都会遇到下面这些问题和疑惑吧。其实有两种方式可以实现AdapterView 的item onclick监听。一种是setOnItemClickLi

爬山算法_楚兴的博客-程序员宝宝_爬山算法

爬山算法是一种局部择优的方法,采用启发式方法,是对深度优先搜索的一种改进,它利用反馈信息帮助生成解的决策。 属于人工智能算法的一种。算法描述从当前的节点开始,和周围的邻居节点的值进行比较。 如果当前节点是最大的,那么返回当前节点,作为最大值(既山峰最高点);反之就用最高的邻居节点来,替换当前节点,从而实现向山峰的高处攀爬的目的。如此循环直到达到最高点。function HillClimbing(pr

解决Django出现OverflowError: Python int too large to convert to C long_Black leaves的博客-程序员宝宝

File "C:\Users\huaixiao\AppData\Local\Programs\Python\Python37-32\lib\site-packages\django\db\backends\utils.py", line 84, in _execute return self.cursor.execute(sql, params) File "C:\Users\huaixiao\AppData\Local\Programs\Python\Python37-32\lib\sit..

推荐文章

热门文章

相关标签