NameNode: protected NameNode(Configuration conf, NamenodeRole role) {
// Parse dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.* and related keys to decide whether HA is enabled
boolean haEnabled;
// getStartupOption() parses dfs.namenode.startup; the default is StartupOption.REGULAR
StartupOption startOpt = getStartupOption(conf);
// If HA is enabled and the NameNode is not being upgraded, enter STANDBY_STATE; otherwise enter ACTIVE_STATE
state = createHAState(startOpt);
state.enterState(haContext);
}
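The decision made by createHAState() fits in a few lines. The sketch below is a simplified model, not Hadoop's code. The StartupOption enum is trimmed to three values, HAStateName and HAStateSelector are hypothetical names, and an unset dfs.namenode.startup value is modeled as null.

```java
import java.util.Locale;

// Hypothetical, trimmed stand-in for Hadoop's StartupOption enum.
enum StartupOption { REGULAR, UPGRADE, ROLLBACK }

// Hypothetical names for the two HA states discussed above.
enum HAStateName { ACTIVE, STANDBY }

final class HAStateSelector {
    private HAStateSelector() {}

    // dfs.namenode.startup falls back to REGULAR when it is not set (null here).
    static StartupOption parseStartupOption(String value) {
        return value == null
                ? StartupOption.REGULAR
                : StartupOption.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // HA enabled and not upgrading -> STANDBY; otherwise -> ACTIVE.
    static HAStateName initialState(boolean haEnabled, StartupOption opt) {
        if (!haEnabled || opt == StartupOption.UPGRADE) {
            return HAStateName.ACTIVE;
        }
        return HAStateName.STANDBY;
    }
}
```

With HA on and a regular start, both NameNodes therefore come up as standby. They then wait for the ZKFC flow described below to promote one of them.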
StandbyState public void enterState(HAContext context)
NameNodeHAContext public void startStandbyServices()
FSNamesystem void startStandbyServices(final Configuration conf) {
editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start();
if (standbyShouldCheckpoint) {
standbyCheckpointer = new StandbyCheckpointer(conf, this);
standbyCheckpointer.start();
}
}
The first part of the flow runs in ZKFailoverController.run() and is entirely generic (not HDFS-specific).
HDFS ZKFC process: class DFSZKFailoverController extends ZKFailoverController
DFSZKFailoverController main()
ZKFailoverController run()
ZKFailoverController doRun()
private int doRun(String[] args) {
// ha.zookeeper.quorum: the ZK ensemble
// initZK() builds the elector from the config: elector = new ActiveStandbyElector(zkQuorum, zkTimeout, getParentZnode(), zkAcls, zkAuths, new ElectorCallbacks());
initZK();
initRPC(); // instantiates ZKFCRpcServer; its ProtocolPB is ZKFCProtocolPB
initHM(); // creates and starts the HealthMonitor
startRPC(); // starts ZKFCRpcServer
try {
mainLoop(); // parks the current thread in wait()
} finally {
rpcServer.stopAndJoin();
elector.quitElection(true);
healthMonitor.shutdown();
healthMonitor.join();
}
return 0;
}
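mainLoop() parks the ZKFC's main thread until something reports a fatal error, so the finally block above is effectively the shutdown path. Below is a minimal sketch of that wait/notify pattern. MiniFailoverController is a hypothetical class, not Hadoop's ZKFailoverController.

```java
// Hypothetical, trimmed-down controller showing the "park the main thread
// until a fatal error is reported" pattern used by mainLoop().
final class MiniFailoverController {
    private String fatalError; // guarded by this

    // Called from callback threads (elector, health monitor) on unrecoverable errors.
    synchronized void fatalError(String err) {
        fatalError = err;
        notifyAll(); // wake the thread blocked in mainLoop()
    }

    // Blocks the calling thread until fatalError() has been invoked, then fails loudly.
    synchronized void mainLoop() throws InterruptedException {
        while (fatalError == null) {
            wait(); // releases the monitor while waiting
        }
        throw new RuntimeException("Failover controller failed: " + fatalError);
    }
}
```

The while loop guards against spurious wakeups. It also covers the case where the error is reported before mainLoop() starts.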
ZKFailoverController private void initZK()
ActiveStandbyElector public ActiveStandbyElector()
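ActiveStandbyElector private void createConnection() // creates the ZK client connection, zkClient
While connecting to ZK, it also registers a ZK event watcher. This watcher is mainly used later: when a ZK node changes, it releases the active role and triggers a new election.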
/**
* Get a new zookeeper client instance. protected so that test class can
* inherit and pass in a mock object for zookeeper
*
* @return new zookeeper client instance
* @throws IOException
* @throws KeeperException zookeeper connectionloss exception
*/
protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
KeeperException {
// Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
// may trigger the Connected event immediately. So, if we register the
// watcher after constructing ZooKeeper, we may miss that event. Instead,
// we construct the watcher first, and have it block any events it receives
// before we can set its ZooKeeper reference.
watcher = new WatcherWithClientRef();
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
watcher.setZooKeeperRef(zk);
// Wait for the asynchronous success/failure. This may throw an exception
// if we don't connect within the session timeout.
watcher.waitForZKConnectionEvent(zkSessionTimeout);
for (ZKAuthInfo auth : zkAuthInfo) {
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
}
return zk;
}
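The comment in getNewZooKeeper() explains why the watcher is created before the client: the ZooKeeper constructor may deliver the Connected event immediately. The sketch below reproduces that handshake without a ZooKeeper dependency. RefAwareWatcher is a hypothetical class, events are plain strings, and the "client" is any object. Two CountDownLatch instances play the roles described in the comment. One blocks event handling until the client reference is set. The other lets the caller wait, with a timeout, for the first event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for WatcherWithClientRef (no ZooKeeper jar needed).
final class RefAwareWatcher {
    private final CountDownLatch refSet = new CountDownLatch(1);
    private final CountDownLatch firstEvent = new CountDownLatch(1);
    private volatile Object clientRef;
    private final List<String> handled = new ArrayList<>();

    // Called right after the client is constructed.
    void setClientRef(Object client) {
        clientRef = client;
        refSet.countDown(); // release any event thread blocked in process()
    }

    // Runs on the event thread, possibly before setClientRef() has been called.
    void process(String event) throws InterruptedException {
        firstEvent.countDown();
        refSet.await(); // block until the client reference is known
        synchronized (handled) {
            handled.add(event + "@" + clientRef);
        }
    }

    // Bounded wait for the first event, in the spirit of waitForZKConnectionEvent().
    boolean awaitFirstEvent(long timeoutMs) throws InterruptedException {
        return firstEvent.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    List<String> handled() {
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }
}
```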
class WatcherWithClientRef implements Watcher : public void process(WatchedEvent event)
// ActiveStandbyElector
/**
* interface implementation of Zookeeper watch events (connection and node),
* proxied by {@link WatcherWithClientRef}.
*/
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
Event.EventType eventType = event.getType();
//...
if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("Session connected.");
// if the listener was asked to move to safe state then it needs to
// be undone
ConnectionState prevConnectionState = zkConnectionState;
zkConnectionState = ConnectionState.CONNECTED;
if (prevConnectionState == ConnectionState.DISCONNECTED &&
wantToBeInElection) {
monitorActiveStatus();
}
break;
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");
// ask the app to move to safe state because zookeeper connection
// is not active and we dont know our state
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
reJoinElection(0);
break;
case SaslAuthenticated:
LOG.info("Successfully authenticated to ZooKeeper using SASL.");
break;
default:
fatalError("Unexpected Zookeeper watch event state: "
+ event.getState());
break;
}
return;
}
// a watch on lock path in zookeeper has fired. so something has changed on
// the lock. ideally we should check that the path is the same as the lock
// path but trusting zookeeper for now
String path = event.getPath();
if (path != null) {
switch (eventType) {
case NodeDeleted:
if (state == State.ACTIVE) {
enterNeutralMode();
}
joinElectionInternal();
break;
case NodeDataChanged:
monitorActiveStatus();
break;
default:
LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
monitorActiveStatus();
}
return;
}
// some unexpected error has occurred
fatalError("Unexpected watch error from Zookeeper");
}
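The dispatch in processWatchEvent() is essentially a small state machine. The dependency-free model below records which elector action each event triggers. The enum names and the actions list are hypothetical. The wantToBeInElection check and SaslAuthenticated handling are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of processWatchEvent(): connection-state events and
// lock-znode events map to elector actions, recorded as strings.
final class WatchEventDispatch {
    enum ConnEvent { SYNC_CONNECTED, DISCONNECTED, EXPIRED }
    enum NodeEvent { NODE_DELETED, NODE_DATA_CHANGED }

    boolean wasDisconnected;
    boolean active;
    final List<String> actions = new ArrayList<>();

    void onConnection(ConnEvent e) {
        switch (e) {
            case SYNC_CONNECTED:
                // Re-check the lock only when recovering from a disconnect.
                if (wasDisconnected) actions.add("monitorActiveStatus");
                wasDisconnected = false;
                break;
            case DISCONNECTED:
                // State unknown while disconnected: move to a safe state.
                wasDisconnected = true;
                actions.add("enterNeutralMode");
                break;
            case EXPIRED:
                // Session (and its ephemeral lock) is gone: rejoin from scratch.
                actions.add("enterNeutralMode");
                actions.add("reJoinElection");
                break;
        }
    }

    void onLockNode(NodeEvent e) {
        switch (e) {
            case NODE_DELETED:
                if (active) actions.add("enterNeutralMode");
                actions.add("joinElectionInternal");
                break;
            case NODE_DATA_CHANGED:
                actions.add("monitorActiveStatus");
                break;
        }
    }
}
```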
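This covers the ZK node watcher mechanism. The handling of the individual events is discussed together further below.
initHM() instantiates a HealthMonitor object.
The HealthMonitor owns a MonitorDaemon daemon thread.
The daemon thread first keeps trying to connect to the NameNode RPC server. Once connected, it starts running doHealthChecks().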
// MonitorDaemon.java
@Override
public void run() {
while (shouldRun) {
try {
loopUntilConnected(); // keep trying to connect to the NN over RPC and obtain a proxy
doHealthChecks();
} catch (InterruptedException ie) {
// expected only on shutdown, when shouldRun has been set to false
}
}
}
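The idea behind loopUntilConnected() is a plain retry loop around proxy creation. Below is a minimal sketch, assuming a hypothetical tryConnect supplier that returns null while the NameNode is unreachable. It is not Hadoop's method signature.

```java
import java.util.function.Supplier;

// Hypothetical sketch: keep trying to obtain an RPC proxy, sleeping between
// attempts, until one is returned.
final class ConnectLoop {
    private ConnectLoop() {}

    static <T> T loopUntilConnected(Supplier<T> tryConnect, long retryIntervalMs)
            throws InterruptedException {
        T proxy = tryConnect.get(); // null means "not reachable yet"
        while (proxy == null) {
            // In Hadoop's version the monitor state is also updated at this point.
            Thread.sleep(retryIntervalMs);
            proxy = tryConnect.get();
        }
        return proxy;
    }
}
```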
// loopUntilConnected() connects to the NN over RPC:
// HealthMonitor HAServiceProtocol createProxy()
// HAServiceTarget HAServiceProtocol getProxy()
// which ends up building the NN RPC proxy in this constructor:
public HAServiceProtocolClientSideTranslatorPB(
InetSocketAddress addr, Configuration conf,
SocketFactory socketFactory, int timeout) throws IOException {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
}
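class NameNodeRpcServer implements NamenodeProtocols
NamenodeProtocols aggregates the protocols used to communicate with the NameNode. The HA service protocol is registered on the RPC server with DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, serviceRpcServer);
doHealthChecks() checks the service status in a loop. If the service is HEALTHY, the ZKFC takes part in the active-service election by racing to create an ephemeral node in ZooKeeper.
// fetch the service status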
private void doHealthChecks() throws InterruptedException {
while (shouldRun) {
HAServiceStatus status = null;
boolean healthy = false;
try {
status = proxy.getServiceStatus();
proxy.monitorHealth();
healthy = true;
} catch (Throwable t) {
...
}
if (status != null) {
setLastServiceStatus(status);
}
if (healthy) {
enterState(State.SERVICE_HEALTHY);
}
Thread.sleep(checkIntervalMillis);
}
}
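The catch block above is elided. To our understanding of the Hadoop source, it distinguishes a failed health check from a transport-level failure. A failed check marks the service unhealthy. A transport failure (for example an RPC timeout) drops the proxy and marks the service as not responding. A single iteration of that classification could look like this. HealthCheckStep, Probe and HealthCheckFailed are hypothetical stand-ins.

```java
// Hypothetical single-iteration model of doHealthChecks()' outcome mapping.
final class HealthCheckStep {
    enum State { SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

    // Stand-in for Hadoop's HealthCheckFailedException.
    static final class HealthCheckFailed extends Exception {
        HealthCheckFailed(String msg) { super(msg); }
    }

    // Stand-in for the RPC call proxy.monitorHealth().
    interface Probe { void monitorHealth() throws Exception; }

    private HealthCheckStep() {}

    static State check(Probe probe) {
        try {
            probe.monitorHealth();
            return State.SERVICE_HEALTHY;
        } catch (HealthCheckFailed e) {
            return State.SERVICE_UNHEALTHY;       // NN answered, but reports a problem
        } catch (Exception e) {
            return State.SERVICE_NOT_RESPONDING;  // NN could not be reached at all
        }
    }
}
```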
Call stack:
HealthMonitor enterState(State.SERVICE_HEALTHY);
HealthCallbacks public void enteredState(HealthMonitor.State newState)
ZKFailoverController private void recheckElectability()
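recheckElectability() packages the data for the election and joins the election through the elector.
HAServiceTarget localTarget
targetToData(localTarget): wraps the NN's host, port, zkfcPort and related fields into a protobuf message (ActiveNodeInfo), then serializes it to a byte array.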
// recheckElectability()
// if the state is SERVICE_HEALTHY, add the local node to the election
case SERVICE_HEALTHY:
elector.joinElection(targetToData(localTarget));
if (quitElectionOnBadState) {
quitElectionOnBadState = false;
}
break;
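targetToData() turns the local target into the bytes stored as the lock znode's data. Hadoop uses the protobuf ActiveNodeInfo message for this. The sketch below swaps in DataOutputStream so it runs without protobuf. Its three-field NodeInfo (host, port, zkfcPort) is a hypothetical subset of what the real message carries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class NodeInfoCodec {
    // Hypothetical subset of the fields ActiveNodeInfo carries.
    static final class NodeInfo {
        final String host;
        final int port;
        final int zkfcPort;

        NodeInfo(String host, int port, int zkfcPort) {
            this.host = host;
            this.port = port;
            this.zkfcPort = zkfcPort;
        }
    }

    private NodeInfoCodec() {}

    // Serialize to the byte[] that would be written as znode data.
    static byte[] toBytes(NodeInfo n) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(n.host);
            out.writeInt(n.port);
            out.writeInt(n.zkfcPort);
        }
        return bos.toByteArray();
    }

    // Inverse, e.g. when another ZKFC reads the lock or breadcrumb data.
    static NodeInfo fromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new NodeInfo(in.readUTF(), in.readInt(), in.readInt());
        }
    }
}
```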
ActiveStandbyElector : joinElection()
ActiveStandbyElector : joinElectionInternal()
ActiveStandbyElector : createLockNodeAsync()
It ultimately calls zkClient to create a znode holding the ActiveNodeInfo message.
zkLockFilePath = ${ha.zookeeper.parent-znode}/${nameServiceId}/ActiveStandbyElectorLock, e.g. /hadoop-ha/pasc/ActiveStandbyElectorLock
This is an asynchronous call that writes to ZooKeeper without blocking.
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,this, zkClient);
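The fifth argument of the call above (this) is the callback object for the asynchronous ZooKeeper operation. Because class ActiveStandbyElector implements StatCallback, StringCallback, the elector itself implements ZooKeeper's async callbacks: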
void processResult(int rc, String path, Object ctx, String name)
void processResult(int rc, String path, Object ctx, Stat stat)
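Because the lock is an EPHEMERAL znode, ZooKeeper itself arbitrates the race. The first create succeeds, later ones fail with NODEEXISTS, and the node disappears when its owner's session ends. The in-memory model below is a hypothetical stand-in, not a ZooKeeper API. In FakeLockStore, ConcurrentHashMap.putIfAbsent plays the role of the atomic create.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of "create EPHEMERAL" contention.
final class FakeLockStore {
    enum Rc { OK, NODE_EXISTS }

    // path -> session id of the ephemeral owner
    private final ConcurrentMap<String, Long> owners = new ConcurrentHashMap<>();

    // Atomic: exactly one session can win a given path.
    Rc create(String path, long sessionId) {
        return owners.putIfAbsent(path, sessionId) == null ? Rc.OK : Rc.NODE_EXISTS;
    }

    Long ephemeralOwner(String path) {
        return owners.get(path);
    }

    // Session close/expiry removes every ephemeral node owned by that session.
    void closeSession(long sessionId) {
        owners.values().removeIf(owner -> owner == sessionId);
    }
}
```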
/**
* interface implementation of Zookeeper callback for create
*/
@Override
public synchronized void processResult(int rc, String path, Object ctx,
String name) {
// znode created successfully: go to becomeActive()
Code code = Code.get(rc);
if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring
if (becomeActive()) {
monitorActiveStatus();
} else {
reJoinElectionAfterFailureToBecomeActive();
}
return;
}
// znode already exists: go to becomeStandby()
if (isNodeExists(code)) {
if (createRetryCount == 0) {
// znode exists and we did not retry the operation. so a different
// instance has created it. become standby and monitor lock.
becomeStandby();
}
// if we had retried then the znode could have been created by our first
// attempt to the server (that we lost) and this node exists response is
// for the second attempt. verify this case via ephemeral node owner. this
// will happen on the callback for monitoring the lock.
monitorActiveStatus();
return;
}
...
}
private boolean becomeActive() {
assert wantToBeInElection;
if (state == State.ACTIVE) {
// already active
return true;
}
try {
// Fence the old active: read the data under zkBreadCrumbPath in ZK; if it differs from the current appData, call appClient.fenceOldActive(data) to start fencing
Stat oldBreadcrumbStat = fenceOldActive();
// write appData into the zkBreadCrumbPath znode in ZK
writeBreadCrumbNode(oldBreadcrumbStat);
LOG.debug("Becoming active for " + this);
// hand off from the ZooKeeper-side logic to ZKFailoverController through the callback
appClient.becomeActive();
state = State.ACTIVE;
return true;
} catch (Exception e) {
..
}
}
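The breadcrumb logic in becomeActive() can be isolated as a pure function. In this sketch the names BreadcrumbCheck and Fencer are hypothetical, and null stands for a missing breadcrumb znode. Fencing is skipped when the breadcrumb already holds this node's own data, which matches the comment above.

```java
import java.util.Arrays;

// Hypothetical sketch of the breadcrumb check performed before going active.
final class BreadcrumbCheck {
    interface Fencer { void fence(byte[] oldActiveData); }

    private BreadcrumbCheck() {}

    // Returns the data to write back into the breadcrumb znode.
    static byte[] takeOver(byte[] breadcrumb, byte[] myData, Fencer fencer) {
        if (breadcrumb != null && !Arrays.equals(breadcrumb, myData)) {
            // Another node was last active and may still believe it is: fence it first.
            fencer.fence(breadcrumb);
        }
        return myData;
    }
}
```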
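// From here on, control leaves the generic HA election framework and runs the service-specific action
The ZK event watcher was introduced above, where the ZK connection is established. Next comes the corresponding event handling.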
ActiveStandbyElector private void monitorActiveStatus()
ActiveStandbyElector private void monitorLockNodeAsync()
ActiveStandbyElector zkClient.exists(zkLockFilePath, watcher, this, zkClient);
// ActiveStandbyElector
/**
* interface implementation of Zookeeper callback for monitor (exists)
*/
@Override
public synchronized void processResult(int rc, String path, Object ctx,
Stat stat) {
if (isStaleClient(ctx)) return;
monitorLockNodePending = false;
assert wantToBeInElection :
"Got a StatNode result after quitting election";
LOG.debug("StatNode result: " + rc + " for path: " + path
+ " connectionState: " + zkConnectionState + " for " + this);
Code code = Code.get(rc);
if (isSuccess(code)) {
// the following owner check completes verification in case the lock znode
// creation was retried
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
// we own the lock znode. so we are the leader
if (!becomeActive()) {
reJoinElectionAfterFailureToBecomeActive();
}
} else {
// we dont own the lock znode. so we are a standby.
becomeStandby();
}
// the watch set by us will notify about changes
return;
}
if (isNodeDoesNotExist(code)) {
// the lock znode disappeared before we started monitoring it
enterNeutralMode();
joinElectionInternal();
return;
}
String errorMessage = "Received stat error from Zookeeper. code:"
+ code.toString();
LOG.debug(errorMessage);
if (shouldRetry(code)) {
if (statRetryCount < NUM_RETRIES) {
++statRetryCount;
monitorLockNodeAsync();
return;
}
errorMessage = errorMessage
+ ". Not retrying further znode monitoring connection errors.";
} else if (isSessionExpired(code)) {
// This isn't fatal - the client Watcher will re-join the election
LOG.warn("Lock monitoring failed because session was lost");
return;
}
fatalError(errorMessage);
}
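The owner check in the exists() callback resolves the retried-create ambiguity described in the create callback. Whichever node's session owns the ephemeral lock is active, regardless of which create attempt reported success. Below is a condensed, hypothetical model, where a null owner stands for NONODE.

```java
// Hypothetical condensed model of the exists() callback decision.
final class OwnerCheck {
    enum Role { ACTIVE, STANDBY, REJOIN }

    private OwnerCheck() {}

    static Role decide(Long ownerSessionId, long mySessionId) {
        if (ownerSessionId == null) {
            return Role.REJOIN; // lock vanished before monitoring started
        }
        return ownerSessionId == mySessionId ? Role.ACTIVE : Role.STANDBY;
    }
}
```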
// the appClient methods delegate back to ZKFailoverController
/**
* Callbacks from elector
*/
class ElectorCallbacks implements ActiveStandbyElectorCallback {
@Override
public void becomeActive() throws ServiceFailedException {
ZKFailoverController.this.becomeActive();
}
@Override
public void becomeStandby() {
ZKFailoverController.this.becomeStandby();
}
}
// ZKFailoverController
private synchronized void becomeActive() throws ServiceFailedException {
LOG.info("Trying to make " + localTarget + " active...");
try {
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
conf, FailoverController.getRpcTimeoutToNewActive(conf)),
createReqInfo());
String msg = "Successfully transitioned " + localTarget +
" to active state";
LOG.info(msg);
serviceState = HAServiceState.ACTIVE;
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
} catch (Throwable t) {
....
}
}
HAServiceProtocolHelper public static void transitionToActive() // TODO: there is request-wrapping code here that determines the RPC protocol
NameNodeRpcServer public synchronized void transitionToActive()
NameNode synchronized void transitionToActive()
This enters StandbyState, which switches the NN state.
StandbyState.java: the switch from STANDBY_STATE to ACTIVE_STATE is performed by setStateInternal().
ActiveState.java: the state-switch code there is essentially the same.
// StandbyState
@Override
public void setState(HAContext context, HAState s) throws ServiceFailedException {
if (s == NameNode.ACTIVE_STATE) {
setStateInternal(context, s);
return;
}
super.setState(context, s);
}
/**
* Internal method to transition the state of a given namenode to a new state.
* @param context HA context of the namenode
* @param s new state
* @throws ServiceFailedException on failure to transition to new state.
*/
protected final void setStateInternal(final HAContext context, final HAState s)
throws ServiceFailedException {
prepareToExitState(context);
s.prepareToEnterState(context);
context.writeLock();
try {
exitState(context);
context.setState(s);
s.enterState(context);
s.updateLastHATransitionTime();
} finally {
context.writeUnlock();
}
}
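setStateInternal() is a template method. Both prepare hooks run before the write lock is taken, and exit, set and enter all run under it, presumably so no namespace operation sees a half-switched state. The sketch below records that ordering. It is a hypothetical simplification: a ReentrantReadWriteLock replaces HAContext.writeLock(), strings replace HAState objects, and updateLastHATransitionTime() is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the transition template in setStateInternal().
final class TransitionTemplate {
    final List<String> log = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String current = "standby";

    void transition(String next) {
        // Prepare steps run outside the write lock.
        log.add("prepareToExit:" + current);
        log.add("prepareToEnter:" + next);
        lock.writeLock().lock();
        try {
            // The actual switch happens atomically with respect to lock holders.
            log.add("exit:" + current);
            current = next;
            log.add("enter:" + next);
        } finally {
            lock.writeLock().unlock();
        }
    }

    String current() {
        return current;
    }
}
```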