Hadoop HDFS HA State Transition Source Code

Tags: hadoop

NameNode startup: entering the Standby state

// NameNode
protected NameNode(Configuration conf, NamenodeRole role) throws IOException {
  // ...
  // HA is enabled when the nameservice is configured with several NameNodes
  // (dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*, etc.)
  String nsId = getNameServiceId(conf);
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  // getStartupOption() parses dfs.namenode.startup; the default is REGULAR
  // With HA enabled and no upgrade in progress, start in STANDBY_STATE; otherwise in ACTIVE_STATE
  state = createHAState(getStartupOption(conf));
  // ...
  state.enterState(haContext);
}
StandbyState public void enterState(HAContext context)
NameNodeHAContext public void startStandbyServices()
FSNamesystem void startStandbyServices(final Configuration conf) {
    editLogTailer = new EditLogTailer(this, conf);
    editLogTailer.start();
    if (standbyShouldCheckpoint) {
      standbyCheckpointer = new StandbyCheckpointer(conf, this);
      standbyCheckpointer.start();
    }
}
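Below is a minimal, self-contained sketch of the startup decision described above, written in plain Java with no Hadoop dependency. A `Map` stands in for `Configuration`, and the `StartupOption` enum is a hypothetical subset of the real one. Only two things come from the text: the rule that an HA-enabled NameNode that is not upgrading starts as STANDBY, and REGULAR as the default of dfs.namenode.startup.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of the createHAState() decision; not the Hadoop source.
class HaStartupSketch {
    enum HaState { ACTIVE, STANDBY }

    // Assumed subset of the real StartupOption values.
    enum StartupOption { REGULAR, UPGRADE, ROLLBACK }

    // Reads dfs.namenode.startup, defaulting to REGULAR (a Map stands in for Configuration).
    static StartupOption getStartupOption(Map<String, String> conf) {
        String value = conf.getOrDefault("dfs.namenode.startup", StartupOption.REGULAR.name());
        return StartupOption.valueOf(value.toUpperCase(Locale.ROOT));
    }

    // HA enabled and not upgrading -> STANDBY; everything else -> ACTIVE.
    static HaState createHAState(boolean haEnabled, StartupOption opt) {
        if (!haEnabled || opt == StartupOption.UPGRADE) {
            return HaState.ACTIVE;
        }
        return HaState.STANDBY;
    }
}
```

With HA enabled and an empty configuration, this returns STANDBY. A freshly started HA NameNode therefore only runs the standby services (edit log tailing, optional checkpointing) until something promotes it.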

ZKFC: monitoring the NameNode and switching its HA state

The first part of the code lives in ZKFailoverController.run() and is entirely generic, i.e. not specific to HDFS.

Starting the HDFS ZKFC service

The HDFS ZKFC process: class DFSZKFailoverController extends ZKFailoverController

DFSZKFailoverController main()
ZKFailoverController run()
ZKFailoverController doRun()

private int doRun(String[] args) {
    // ha.zookeeper.quorum: the ZooKeeper ensemble
    // initZK() builds the elector from the configuration:
    // elector = new ActiveStandbyElector(zkQuorum, zkTimeout, getParentZnode(), zkAcls, zkAuths, new ElectorCallbacks());
    initZK();

    initRPC();   // create the ZKFCRpcServer; its ProtocolPB is ZKFCProtocolPB
    initHM();    // create and start the HealthMonitor
    startRPC();  // start the ZKFCRpcServer
    try {
      mainLoop();  // park the current thread in wait() until a fatal error is reported
    } finally {
      rpcServer.stopAndJoin();

      elector.quitElection(true);
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
}
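doRun() parks the main thread in mainLoop() while the HealthMonitor and elector threads do the real work. The finally block tears everything down once mainLoop() returns or throws. The sketch below shows that wait/notify shape in isolation. The class is hypothetical, and the strings only label the shutdown steps. Unlike the real mainLoop(), which as far as I know throws once a fatal error is recorded, this sketch returns the message so the example stays easy to check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the doRun()/mainLoop() pattern; not the Hadoop source.
class ZkfcLoopSketch {
    private String fatalError;                           // set once by fatalError()
    final List<String> shutdownSteps = new ArrayList<>();

    // May be called from any thread (e.g. an elector callback) to stop the controller.
    synchronized void fatalError(String err) {
        fatalError = err;
        notifyAll();
    }

    // Parks the calling thread until fatalError() has been called; returns the error.
    private synchronized String mainLoop() throws InterruptedException {
        while (fatalError == null) {
            wait();
        }
        return fatalError;
    }

    String doRun() throws InterruptedException {
        try {
            return mainLoop();
        } finally {
            // Same order as the finally block in doRun(): RPC server, election, health monitor.
            shutdownSteps.add("rpcServer.stopAndJoin");
            shutdownSteps.add("elector.quitElection");
            shutdownSteps.add("healthMonitor.shutdown");
        }
    }
}
```

The `while` loop around `wait()` guards against spurious wakeups, and `notifyAll()` is called while holding the same monitor, which is the standard pairing for this pattern.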

initZK(): establishing the ZK connection and the ZK event watcher

ZKFailoverController private void initZK()
ActiveStandbyElector public ActiveStandbyElector()
ActiveStandbyElector private void createConnection() // creates the ZK client zkClient

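While connecting to ZK, the elector also registers a ZK event watcher. Its main job comes later: when the ZK lock node changes, it gives up the active role and runs the election again.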

  /**
   * Get a new zookeeper client instance. protected so that test class can
   * inherit and pass in a mock object for zookeeper
   *
   * @return new zookeeper client instance
   * @throws IOException
   * @throws KeeperException zookeeper connectionloss exception
   */
  protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
      KeeperException {
    // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
    // may trigger the Connected event immediately. So, if we register the
    // watcher after constructing ZooKeeper, we may miss that event. Instead,
    // we construct the watcher first, and have it block any events it receives
    // before we can set its ZooKeeper reference.
    watcher = new WatcherWithClientRef();
    ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
    watcher.setZooKeeperRef(zk);

    // Wait for the asynchronous success/failure. This may throw an exception
    // if we don't connect within the session timeout.
    watcher.waitForZKConnectionEvent(zkSessionTimeout);

    for (ZKAuthInfo auth : zkAuthInfo) {
      zk.addAuthInfo(auth.getScheme(), auth.getAuth());
    }
    return zk;
  }
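The comment in getNewZooKeeper() describes a small concurrency pattern. The watcher is created before the client, and any event that arrives early is held until the client reference has been set. Here is a minimal sketch of that idea using `CountDownLatch`. It assumes a hypothetical `FakeZkClient` that fires a connection event from its constructor, as the real ZooKeeper constructor may. It illustrates the pattern only and is not Hadoop's WatcherWithClientRef.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical watcher that buffers events until its client reference is known.
class WatcherWithRefSketch {
    private final CountDownLatch hasSetClient = new CountDownLatch(1);
    private final CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    private volatile Object client;
    // Events forwarded to the "real" handler, tagged with the client seen at that time.
    final BlockingQueue<String> forwarded = new LinkedBlockingQueue<>();

    void setClientRef(Object c) {
        client = c;
        hasSetClient.countDown();
    }

    // Runs on the client's event thread; may be called before setClientRef().
    void process(String event) throws InterruptedException {
        hasReceivedEvent.countDown();
        hasSetClient.await();            // hold the event until the reference exists
        forwarded.add(event + "@" + client);
    }

    boolean waitForConnectionEvent(long timeoutMs) throws InterruptedException {
        return hasReceivedEvent.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}

// Hypothetical client whose constructor delivers an event from another thread immediately.
class FakeZkClient {
    private final String name;

    FakeZkClient(String name, WatcherWithRefSketch watcher) {
        this.name = name;
        new Thread(() -> {
            try {
                watcher.process("SyncConnected");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    @Override
    public String toString() { return name; }
}
```

Because `process()` blocks on `hasSetClient`, the early event is never forwarded with a `null` client, no matter how the two threads interleave.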

class WatcherWithClientRef implements Watcher : public void process(WatchedEvent event)

// ActiveStandbyElector 

  /**
   * interface implementation of Zookeeper watch events (connection and node),
   * proxied by {@link WatcherWithClientRef}.
   */
  synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    // ...
    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        // if the listener was asked to move to safe state then it needs to
        // be undone
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED &&
            wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");

        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }

      return;
    }

    // a watch on lock path in zookeeper has fired. so something has changed on
    // the lock. ideally we should check that the path is the same as the lock
    // path but trusting zookeeper for now
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        monitorActiveStatus();
      }

      return;
    }

    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }

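So far this covers only the ZK watcher mechanism itself. How each event is handled is explained further below, together with the election logic.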
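The connection-state branch of processWatchEvent() is essentially a small state machine. The sketch below models just that branch and records the resulting actions as strings instead of executing them. The enums are stand-ins for ZooKeeper's `KeeperState` and the elector's `ConnectionState`. Using `TERMINATED` as the initial connection state is an assumption about how the elector initializes `zkConnectionState`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the connection-state branch of processWatchEvent(); not the Hadoop source.
class ConnectionEventSketch {
    // Stand-in for ZooKeeper's Watcher.Event.KeeperState (subset).
    enum KeeperState { SyncConnected, Disconnected, Expired, SaslAuthenticated }

    // Assumed elector-side connection states; TERMINATED as the initial value is an assumption.
    enum ConnectionState { TERMINATED, DISCONNECTED, CONNECTED }

    ConnectionState zkConnectionState = ConnectionState.TERMINATED;
    boolean wantToBeInElection = true;
    final List<String> actions = new ArrayList<>();  // actions that would have been taken

    void onConnectionEvent(KeeperState s) {
        switch (s) {
            case SyncConnected: {
                ConnectionState prev = zkConnectionState;
                zkConnectionState = ConnectionState.CONNECTED;
                // only a reconnect after a disconnect needs to re-check the lock
                if (prev == ConnectionState.DISCONNECTED && wantToBeInElection) {
                    actions.add("monitorActiveStatus");
                }
                break;
            }
            case Disconnected:
                zkConnectionState = ConnectionState.DISCONNECTED;
                actions.add("enterNeutralMode");
                break;
            case Expired:
                actions.add("enterNeutralMode");
                actions.add("reJoinElection");
                break;
            case SaslAuthenticated:
                break;  // informational only
        }
    }
}
```

The first SyncConnected after startup triggers nothing. A SyncConnected that follows a Disconnected re-checks the lock, because the node may have lost or kept leadership while it was cut off.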

Starting the HealthMonitor service

initHM() creates a HealthMonitor object.
The HealthMonitor owns a daemon thread, MonitorDaemon.
The daemon thread keeps trying to connect to the NameNode RPC server over RPC; once connected, it starts doHealthChecks().

// HealthMonitor.MonitorDaemon
    @Override
    public void run() {
      while (shouldRun) {
        try {
          loopUntilConnected(); // keep trying to connect to the NN over RPC and obtain a proxy
          doHealthChecks();
        } catch (InterruptedException ie) {
          // ...
        }
      }
    }
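loopUntilConnected() is a plain connect-and-retry loop. The sketch below assumes that a failed connection attempt surfaces as an `IOException` and is retried after a fixed sleep. `NnConnector`, `ConnectLoopSketch` and the retry interval are hypothetical names, not HealthMonitor's actual fields.

```java
import java.io.IOException;

// Hypothetical connection factory; stands in for creating the HAServiceProtocol proxy.
interface NnConnector<P> {
    P connect() throws IOException;
}

// Hypothetical sketch of a connect-retry loop; not the Hadoop source.
class ConnectLoopSketch<P> {
    private final NnConnector<P> connector;
    private final long retryIntervalMs;
    int attempts = 0;

    ConnectLoopSketch(NnConnector<P> connector, long retryIntervalMs) {
        this.connector = connector;
        this.retryIntervalMs = retryIntervalMs;
    }

    // Blocks until one connection attempt succeeds, then returns the proxy.
    P loopUntilConnected() throws InterruptedException {
        P proxy = tryConnect();
        while (proxy == null) {
            // assumption: the real HealthMonitor reports SERVICE_NOT_RESPONDING here
            Thread.sleep(retryIntervalMs);
            proxy = tryConnect();
        }
        return proxy;
    }

    private P tryConnect() {
        attempts++;
        try {
            return connector.connect();
        } catch (IOException e) {
            return null;   // treat any I/O failure as "not connected yet"
        }
    }
}
```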

// loopUntilConnected() connects to the NN over RPC:
// HealthMonitor HAServiceProtocol createProxy()
// HAServiceTarget HAServiceProtocol getProxy()

// Obtaining the RPC proxy to the NN
  public HAServiceProtocolClientSideTranslatorPB(
      InetSocketAddress addr, Configuration conf,
      SocketFactory socketFactory, int timeout) throws IOException {
    RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
        ProtobufRpcEngine.class);
    rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
        RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
        UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
  }
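RPC.getProxy() returns an object that implements the protocol interface but forwards every call to the remote server. As far as I know, Hadoop's RPC engines build this client object with the JDK's `java.lang.reflect.Proxy`. The sketch below shows only that forwarding mechanism. The "wire" is a plain function, and `HaServiceSketchProtocol` is a hypothetical two-method interface, not Hadoop's `HAServiceProtocol`. Serialization (protobuf here), connection management and retries are all left out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical protocol interface used only for this sketch.
interface HaServiceSketchProtocol {
    String getServiceState();
    void monitorHealth();
}

// Hypothetical sketch of a client-side RPC proxy; not Hadoop's RPC.getProxy().
class RpcProxySketch {
    // Every call becomes a "Protocol.method" request handed to the transport function;
    // the transport's return value becomes the call's result (ignored for void methods).
    // Methods inherited from Object (toString, equals, ...) would be routed here too.
    static <T> T getProxy(Class<T> protocol, Function<String, Object> transport) {
        InvocationHandler handler = (proxy, method, args) ->
                transport.apply(protocol.getSimpleName() + "." + method.getName());
        return protocol.cast(Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler));
    }
}
```

In the real code path, HAServiceProtocolClientSideTranslatorPB wraps this kind of proxy and converts between the Java-level HAServiceProtocol calls and their protobuf request/response messages.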

NameNodeRpcServer

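  • class NameNodeRpcServer implements NamenodeProtocols; NamenodeProtocols bundles the protocols used to communicate with the NameNode.
  • When NameNodeRpcServer starts, it registers support for each of these protocols, for example HAServiceProtocol: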

DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, serviceRpcServer);

doHealthChecks()

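doHealthChecks() checks the service status in a loop. Once the service is HEALTHY, the ZKFC joins the election for the active service. The election works by racing the other ZKFC to create an ephemeral znode in ZooKeeper.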

  // Poll the service status
  private void doHealthChecks() throws InterruptedException {
    while (shouldRun) {
      HAServiceStatus status = null;
      boolean healthy = false;
      try {
        status = proxy.getServiceStatus();
        proxy.monitorHealth();
        healthy = true;
      } catch (Throwable t) {
        ...
      }

      if (status != null) {
        setLastServiceStatus(status);
      }
      if (healthy) {
        enterState(State.SERVICE_HEALTHY);
      }

      Thread.sleep(checkIntervalMillis);
    }
  }
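The `...` in the catch block hides how failures are classified. The sketch below rests on my reading of HealthMonitor, which this excerpt does not show. It assumes that a failed health check and a transport error lead to different states (SERVICE_UNHEALTHY versus SERVICE_NOT_RESPONDING). It also assumes that callbacks fire only when the state actually changes, so the election is not re-checked on every successful poll. `HealthCheckFailed` and `Check` are hypothetical types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of HealthMonitor-style state tracking; not the Hadoop source.
class HealthStateSketch {
    enum State { INITIALIZING, SERVICE_NOT_RESPONDING, SERVICE_HEALTHY, SERVICE_UNHEALTHY }

    // Hypothetical: the service answered but reported itself unhealthy.
    static class HealthCheckFailed extends Exception {
        HealthCheckFailed(String msg) { super(msg); }
    }

    // One health probe, e.g. a wrapper around proxy.monitorHealth().
    interface Check {
        void run() throws Exception;
    }

    private State state = State.INITIALIZING;
    private final List<Consumer<State>> callbacks = new ArrayList<>();

    void addCallback(Consumer<State> cb) { callbacks.add(cb); }

    // One iteration of the polling loop (the real loop also sleeps checkIntervalMillis).
    void checkOnce(Check monitorHealth) {
        try {
            monitorHealth.run();
            enterState(State.SERVICE_HEALTHY);
        } catch (HealthCheckFailed e) {
            enterState(State.SERVICE_UNHEALTHY);       // reachable but unhealthy
        } catch (Exception e) {
            enterState(State.SERVICE_NOT_RESPONDING);  // e.g. RPC/transport failure
        }
    }

    // Callbacks are notified only on an actual state change.
    private synchronized void enterState(State newState) {
        if (newState != state) {
            state = newState;
            for (Consumer<State> cb : callbacks) {
                cb.accept(newState);
            }
        }
    }
}
```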

STACK

HealthMonitor enterState(State.SERVICE_HEALTHY);
HealthCallbacks public void enteredState(HealthMonitor.State newState)
ZKFailoverController private void recheckElectability()

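recheckElectability() packages the data used in the election and joins the election through the elector.

HAServiceTarget localTarget
targetToData(localTarget): wraps the NN's host, port, ZKFC port and similar information in a protobuf message (ActiveNodeInfo) and serializes it to a byte array.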

    // recheckElectability()
    // in state SERVICE_HEALTHY, add the local node to the election
    case SERVICE_HEALTHY:
      elector.joinElection(targetToData(localTarget));
      if (quitElectionOnBadState) {
        quitElectionOnBadState = false;
      }
      break;
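targetToData() only has to turn a few identifying fields into bytes that can be stored in ZooKeeper and compared later. The real code uses the protobuf message ActiveNodeInfo. The sketch below swaps in `DataOutputStream` so that it runs without protobuf. Its field list (nameservice ID, NameNode ID, host, RPC port, ZKFC port) is my assumption about what ActiveNodeInfo carries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for ActiveNodeInfo; DataOutputStream replaces protobuf.
class ActiveNodeInfoSketch {
    final String nameserviceId;
    final String namenodeId;
    final String hostname;
    final int port;
    final int zkfcPort;

    ActiveNodeInfoSketch(String nameserviceId, String namenodeId, String hostname,
                         int port, int zkfcPort) {
        this.nameserviceId = nameserviceId;
        this.namenodeId = namenodeId;
        this.hostname = hostname;
        this.port = port;
        this.zkfcPort = zkfcPort;
    }

    // Serialize to the byte[] that would be written as the znode's data.
    byte[] toBytes() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(nameserviceId);
            out.writeUTF(namenodeId);
            out.writeUTF(hostname);
            out.writeInt(port);
            out.writeInt(zkfcPort);
        }
        return bytes.toByteArray();
    }

    // Parse bytes read back from ZooKeeper (same field order as toBytes()).
    static ActiveNodeInfoSketch fromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new ActiveNodeInfoSketch(in.readUTF(), in.readUTF(), in.readUTF(),
                    in.readInt(), in.readInt());
        }
    }
}
```

Byte-for-byte equality of this data matters later. When becoming active, the elector compares the breadcrumb data it finds in ZooKeeper with its own appData to decide whether the old active must be fenced.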

ActiveStandbyElector and ActiveStandbyElectorCallback

  • ActiveStandbyElector: the HA election controller; it maintains the ZK client, the election state, appData, etc.
  • ActiveStandbyElectorCallback: the actions a concrete HA implementation must perform, invoked as callbacks, e.g. elector.becomeActive() -> callback.becomeActive()

ActiveStandbyElector : joinElection()
ActiveStandbyElector : joinElectionInternal()
ActiveStandbyElector : createLockNodeAsync()

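Eventually zkClient is called to create a znode whose data is the serialized ActiveNodeInfo message.

zkLockFilePath = ${ha.zookeeper.parent-znode}/${nameServiceId}/ActiveStandbyElectorLock, e.g. /hadoop-ha/pasc/ActiveStandbyElectorLock
The call is asynchronous: the write to ZooKeeper completes later, in a callback.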

zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,this, zkClient);

Asynchronous callbacks for the zkClient create operation

The fifth argument of the call above is the callback object for the asynchronous ZooKeeper operation. class ActiveStandbyElector implements StatCallback, StringCallback, so the elector itself implements the ZooKeeper callback methods:

  • create-node callback: void processResult(int rc, String path, Object ctx, String name)
  • exists-node callback: void processResult(int rc, String path, Object ctx, Stat stat)
  /**
   * interface implementation of Zookeeper callback for create
   */
  @Override
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    // znode created successfully: try becomeActive()
    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      if (becomeActive()) {
        monitorActiveStatus();
      } else {
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }

    // znode already exists: normally becomeStandby() (see the retry caveat below)
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        // znode exists and we did not retry the operation. so a different
        // instance has created it. become standby and monitor lock.
        becomeStandby();
      }
      // if we had retried then the znode could have been created by our first
      // attempt to the server (that we lost) and this node exists response is
      // for the second attempt. verify this case via ephemeral node owner. this
      // will happen on the callback for monitoring the lock.
      monitorActiveStatus();
      return;
    }
    ...
  }
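The election boils down to three rules. Whoever creates the ephemeral lock znode first becomes active. Everyone else gets NodeExists and becomes standby. When the owner's session ends, the znode disappears and the others try again. The single-process sketch below models those semantics. A `ConcurrentHashMap` stands in for ZooKeeper and `putIfAbsent` for the atomic create. The session IDs and class names are hypothetical, and no real watches or network calls are involved.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for ZooKeeper ephemeral znodes: path -> owning session id.
class EphemeralLockSketch {
    enum Result { OK, NODE_EXISTS }

    private final ConcurrentMap<String, Long> nodes = new ConcurrentHashMap<>();

    // Atomic "create if absent", like create(path, data, acl, EPHEMERAL).
    Result create(String path, long sessionId) {
        return nodes.putIfAbsent(path, sessionId) == null ? Result.OK : Result.NODE_EXISTS;
    }

    // Ending a session deletes every ephemeral node it owns.
    void closeSession(long sessionId) {
        nodes.values().removeIf(owner -> owner == sessionId);
    }
}

// Hypothetical elector: maps the create() result to ACTIVE / STANDBY like the callback above.
class ElectorSketch {
    enum State { INIT, ACTIVE, STANDBY }

    final long sessionId;
    State state = State.INIT;

    ElectorSketch(long sessionId) { this.sessionId = sessionId; }

    void joinElection(EphemeralLockSketch zk, String lockPath) {
        state = zk.create(lockPath, sessionId) == EphemeralLockSketch.Result.OK
                ? State.ACTIVE : State.STANDBY;
    }
}
```

In the real elector, the standby does not poll. Its rejoin is triggered by the NodeDeleted event that processWatchEvent() handles via joinElectionInternal().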

  private boolean becomeActive() {
    assert wantToBeInElection;
    if (state == State.ACTIVE) {
      // already active
      return true;
    }
    try {
      // Fence the old active: read the data under zkBreadCrumbPath from ZK; if it differs
      // from our own appData, call appClient.fenceOldActive(data) to fence that node
      Stat oldBreadcrumbStat = fenceOldActive();
      // write our own appData into the zkBreadCrumbPath znode
      writeBreadCrumbNode(oldBreadcrumbStat);

      LOG.debug("Becoming active for " + this);
      // hand control back to ZKFailoverController through the callback: this leaves the
      // generic election framework and runs the service-specific action
      appClient.becomeActive();
      state = State.ACTIVE;
      return true;
    } catch (Exception e) {
      ...
    }
  }
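The breadcrumb znode records who was active last. Unlike the lock, it is (as far as I know) persistent, so a new active can tell whether a previous active might still be running. The sketch below models the decision made in fenceOldActive() and writeBreadCrumbNode(). It assumes a pluggable fencer that returns false when fencing fails. `BreadcrumbSketch` is hypothetical, and the real code reads and writes the znode through ZooKeeper with version checks.

```java
import java.util.Arrays;
import java.util.function.Predicate;

// Hypothetical sketch of the breadcrumb check in becomeActive(); not the Hadoop source.
class BreadcrumbSketch {
    private byte[] breadcrumb;                 // null: no breadcrumb node exists
    private final Predicate<byte[]> fencer;    // returns true if the old active was fenced

    BreadcrumbSketch(Predicate<byte[]> fencer) { this.fencer = fencer; }

    // Returns true if the caller (identified by appData) may become active.
    boolean becomeActive(byte[] appData) {
        byte[] old = breadcrumb;
        // fence only when someone else left a breadcrumb
        if (old != null && !Arrays.equals(old, appData) && !fencer.test(old)) {
            return false;                      // could not fence the previous active
        }
        breadcrumb = appData.clone();          // corresponds to writeBreadCrumbNode()
        return true;
    }

    // A node that steps down cleanly could remove its own breadcrumb so nobody fences it.
    void deleteOwnBreadcrumb(byte[] appData) {
        if (breadcrumb != null && Arrays.equals(breadcrumb, appData)) {
            breadcrumb = null;
        }
    }
}
```

Failing closed matters here. If the old active cannot be fenced, the sketch refuses to take over, so two NameNodes never both believe they are active.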

ZK event watcher handling and the exists-node callback

The ZK event watcher was introduced above, where the ZK connection is set up. This section follows how its events are handled.

ActiveStandbyElector private void monitorActiveStatus()
ActiveStandbyElector private void monitorLockNodeAsync()
ActiveStandbyElector zkClient.exists(zkLockFilePath, watcher, this, zkClient);

// ActiveStandbyElector
  /**
   * interface implementation of Zookeeper callback for monitor (exists)
   */
  @Override
  public synchronized void processResult(int rc, String path, Object ctx,
      Stat stat) {
    if (isStaleClient(ctx)) return;
    monitorLockNodePending = false;

    assert wantToBeInElection :
        "Got a StatNode result after quitting election";

    LOG.debug("StatNode result: " + rc + " for path: " + path
        + " connectionState: " + zkConnectionState + " for " + this);

    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // the following owner check completes verification in case the lock znode
      // creation was retried
      if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
        // we own the lock znode. so we are the leader
        if (!becomeActive()) {
          reJoinElectionAfterFailureToBecomeActive();
        }
      } else {
        // we dont own the lock znode. so we are a standby.
        becomeStandby();
      }
      // the watch set by us will notify about changes
      return;
    }

    if (isNodeDoesNotExist(code)) {
      // the lock znode disappeared before we started monitoring it
      enterNeutralMode();
      joinElectionInternal();
      return;
    }

    String errorMessage = "Received stat error from Zookeeper. code:"
        + code.toString();
    LOG.debug(errorMessage);

    if (shouldRetry(code)) {
      if (statRetryCount < NUM_RETRIES) {
        ++statRetryCount;
        monitorLockNodeAsync();
        return;
      }
      errorMessage = errorMessage
          + ". Not retrying further znode monitoring connection errors.";
    } else if (isSessionExpired(code)) {
      // This isn't fatal - the client Watcher will re-join the election
      LOG.warn("Lock monitoring failed because session was lost");
      return;
    }

    fatalError(errorMessage);
  }
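The exists() callback above mixes three concerns: ownership (ephemeralOwner versus our own session ID), a lock that has vanished, and retryable errors. The sketch below isolates that decision as a small, testable function. The result codes are a hypothetical subset of ZooKeeper's, and NUM_RETRIES = 3 is an assumed value. I also assume that shouldRetry() accepts connection loss and operation timeout.

```java
// Hypothetical sketch of the decision logic in the exists() callback; not the Hadoop source.
class StatResultSketch {
    enum Code { OK, NONODE, CONNECTIONLOSS, OPERATIONTIMEOUT, SESSIONEXPIRED, OTHER }

    enum Action { BECOME_ACTIVE, BECOME_STANDBY, NEUTRAL_AND_REJOIN, RETRY_MONITOR, WAIT_FOR_WATCHER, FATAL }

    static final int NUM_RETRIES = 3;   // assumed value
    private int statRetryCount = 0;

    Action onStatResult(Code code, long ephemeralOwner, long mySessionId) {
        switch (code) {
            case OK:
                // whoever owns the ephemeral lock znode is the leader
                return ephemeralOwner == mySessionId ? Action.BECOME_ACTIVE : Action.BECOME_STANDBY;
            case NONODE:
                // the lock vanished before we could watch it: neutral mode, then rejoin
                return Action.NEUTRAL_AND_REJOIN;
            case CONNECTIONLOSS:
            case OPERATIONTIMEOUT:
                if (statRetryCount < NUM_RETRIES) {
                    statRetryCount++;
                    return Action.RETRY_MONITOR;
                }
                return Action.FATAL;
            case SESSIONEXPIRED:
                // not fatal: the connection watcher handles Expired and rejoins
                return Action.WAIT_FOR_WATCHER;
            default:
                return Action.FATAL;
        }
    }
}
```

The ownership check is what makes a retried create() safe. If our first, "lost" create actually succeeded, the NodeExists reply to the retry is resolved here by finding our own session ID as the owner.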

ElectorCallbacks: passing the election result back to ZKFailoverController

// ElectorCallbacks forwards the appClient calls to ZKFailoverController
  /**
   * Callbacks from elector
   */
  class ElectorCallbacks implements ActiveStandbyElectorCallback {
    @Override
    public void becomeActive() throws ServiceFailedException {
      ZKFailoverController.this.becomeActive();
    }

    @Override
    public void becomeStandby() {
      ZKFailoverController.this.becomeStandby();
    }

    // ... remaining callbacks (e.g. fenceOldActive()) omitted
  }

  // ZKFailoverController
  private synchronized void becomeActive() throws ServiceFailedException {
    LOG.info("Trying to make " + localTarget + " active...");
    try {
      HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
          conf, FailoverController.getRpcTimeoutToNewActive(conf)),
          createReqInfo());
      String msg = "Successfully transitioned " + localTarget +
          " to active state";
      LOG.info(msg);
      serviceState = HAServiceState.ACTIVE;
      recordActiveAttempt(new ActiveAttemptRecord(true, msg));

    } catch (Throwable t) {
      ...
    }
  }
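ElectorCallbacks is a non-static inner class, so each callback forwards to the enclosing ZKFailoverController instance through `ZKFailoverController.this`. The sketch below strips that delegation pattern down to its core. `ControllerSketch`, `ElectorCallbackSketch` and the recorded strings are hypothetical, and a log entry replaces the RPC to the NameNode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface, loosely modeled on ActiveStandbyElectorCallback.
interface ElectorCallbackSketch {
    void becomeActive() throws Exception;
    void becomeStandby();
}

// Hypothetical controller; the NameNode RPCs are replaced by log entries.
class ControllerSketch {
    String serviceState = "INITIALIZING";
    final List<String> rpcLog = new ArrayList<>();
    private final ElectorCallbackSketch callbacks = new ElectorCallbacks();

    // The object that would be handed to the elector at construction time.
    ElectorCallbackSketch callbacks() { return callbacks; }

    private synchronized void becomeActive() {
        rpcLog.add("transitionToActive");   // stands in for the transitionToActive RPC
        serviceState = "ACTIVE";
    }

    private synchronized void becomeStandby() {
        rpcLog.add("transitionToStandby");  // stands in for the transitionToStandby RPC
        serviceState = "STANDBY";
    }

    // Non-static inner class: each method forwards to the enclosing controller instance.
    class ElectorCallbacks implements ElectorCallbackSketch {
        @Override
        public void becomeActive() { ControllerSketch.this.becomeActive(); }

        @Override
        public void becomeStandby() { ControllerSketch.this.becomeStandby(); }
    }
}
```

This keeps the elector generic. It only knows the callback interface, while the private, service-specific methods stay on the controller.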

HAServiceProtocolHelper

Changes the state of localTarget through an RPC call.

HAServiceProtocolHelper public static void transitionToActive() // TODO: a request wrapper here determines the RPC protocol code (still to be examined)
NameNodeRpcServer public synchronized void transitionToActive()
NameNode synchronized void transitionToActive()
StandbyState then switches the NN state

HA state transition code

StandbyState.java: the transition from STANDBY_STATE to ACTIVE_STATE is performed by setStateInternal().
ActiveState.java: its state transitions use essentially the same code.

  // StandbyState
  @Override
  public void setState(HAContext context, HAState s) throws ServiceFailedException {
    if (s == NameNode.ACTIVE_STATE) {
      setStateInternal(context, s);
      return;
    }
    super.setState(context, s);
  }

  // HAState (base class of StandbyState and ActiveState)
  /**
   * Internal method to transition the state of a given namenode to a new state.
   * @param context HA context of the namenode
   * @param s new state
   * @throws ServiceFailedException on failure to transition to new state.
   */
  protected final void setStateInternal(final HAContext context, final HAState s)
      throws ServiceFailedException {
    prepareToExitState(context);
    s.prepareToEnterState(context);
    context.writeLock();
    try {
      exitState(context);
      context.setState(s);
      s.enterState(context);
      s.updateLastHATransitionTime();
    } finally {
      context.writeUnlock();
    }
  }
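setStateInternal() is a template method. The two prepare steps run before the namesystem write lock is taken, while exit, switch and enter run under it, presumably so that no client operation observes a half-switched NameNode. The sketch below reproduces that ordering with `ReentrantReadWriteLock`. The classes are hypothetical stand-ins for HAState and HAContext, and the "[locked]" marker exists only to make the ordering visible. As an assumption, a request for the current state is treated as a no-op.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for HAContext: current state, namesystem lock, and a trace log.
class HaContextSketch {
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    final List<String> log = new ArrayList<>();
    HaStateSketch state;
}

// Hypothetical stand-in for HAState; not the Hadoop source.
class HaStateSketch {
    static final HaStateSketch ACTIVE = new HaStateSketch("active");
    static final HaStateSketch STANDBY = new HaStateSketch("standby");

    final String name;

    private HaStateSketch(String name) { this.name = name; }

    // Records the step and whether the write lock is held at that moment.
    private void record(HaContextSketch ctx, String step) {
        boolean locked = ctx.lock.isWriteLockedByCurrentThread();
        ctx.log.add(step + " " + name + (locked ? " [locked]" : ""));
    }

    void setState(HaContextSketch ctx, HaStateSketch target) {
        if (target == this) {
            return;                        // assumed: already in the requested state
        }
        setStateInternal(ctx, target);
    }

    final void setStateInternal(HaContextSketch ctx, HaStateSketch s) {
        record(ctx, "prepareToExit");      // outside the lock
        s.record(ctx, "prepareToEnter");   // outside the lock
        ctx.lock.writeLock().lock();
        try {
            record(ctx, "exit");
            ctx.state = s;
            s.record(ctx, "enter");
        } finally {
            ctx.lock.writeLock().unlock();
        }
    }
}
```

Unlocking in `finally` mirrors the excerpt's `context.writeUnlock()`. Even if entering the new state throws, the namesystem lock is released.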

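Copyright notice: this is an original article by the blogger, licensed under CC 4.0 BY-SA. When reposting, include a link to the original and this notice.
Original link: https://blog.csdn.net/wankunde/article/details/90046896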
