技术标签: cpp
[email protected] BRPC源码分析(七) SOCKET
int Socket::HandleEpollOut(SocketId id) {
SocketUniquePtr s;
if (Socket::AddressFailedAsWell(id, &s) < 0) {
// Ignore recycled sockets
return -1;
}
EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
if (req != NULL) {
return s->HandleEpollOutRequest(0, req);
}
s->_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
bthread::butex_wake_except(s->_epollout_butex, 0);
return 0;
}
Socket在被添加到epoll之前可能已经被’ SetFailed’了,这些Socket在’ SetFailed’里面没有信号,因此使用’ AddressFailedAsWell’来发送信号,以防止死锁。
int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
if (SetFailed() != 0) {
return -1;
}
GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
return req->on_epollout_event(req->fd, error_code, req->data);
}
int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
WriteRequest* req = static_cast<WriteRequest*>(data);
Socket* s = req->socket;
if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
bthread_t th;
google::protobuf::Closure* thrd_func = brpc::NewCallback(
Socket::CheckConnectedAndKeepWrite, fd, err, data);
if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
RunClosure, thrd_func)) == 0) {
return 0;
} else {
PLOG(ERROR) << "Fail to start bthread";
}
}
CheckConnectedAndKeepWrite(fd, err, data);
return 0;
}
req->data.swap(*data);
req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message(
opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
return StartWrite(req, opt);
将’ req->next’设置为UNCONNECTED,使得KeepWrite线程将一直等待,直到它指向一个有效的WriteRequest或NULL。
int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
WriteRequest* const prev_head =
_write_head.exchange(req, butil::memory_order_release);
if (prev_head != NULL) {
req->next = prev_head;
return 0;
}
int saved_errno = 0;
bthread_t th;
SocketUniquePtr ptr_for_keep_write;
ssize_t nw = 0;
req->next = NULL;
int ret = ConnectIfNot(opt.abstime, req);
if (ret < 0) {
saved_errno = errno;
SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
goto FAIL_TO_WRITE;
} else if (ret == 1) {
return 0;
}
req->Setup(this);
if (ssl_state() != SSL_OFF) {
goto KEEPWRITE_IN_BACKGROUND;
}
if (_conn) {
butil::IOBuf* data_arr[1] = {
&req->data };
nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {
nw = req->data.cut_into_file_descriptor(fd());
}
if (nw < 0) {
if (errno != EAGAIN && errno != EOVERCROWDED) {
saved_errno = errno;
PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
SetFailed(saved_errno, "Fail to write into %s: %s",
description().c_str(), berror(saved_errno));
goto FAIL_TO_WRITE;
}
} else {
AddOutputBytes(nw);
}
if (IsWriteComplete(req, true, NULL)) {
ReturnSuccessfulWriteRequest(req);
return 0;
}
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->socket = ptr_for_keep_write.release();
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
KeepWrite, req) != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
}
return 0;
FAIL_TO_WRITE:
ReleaseAllFailedWriteRequests(req);
errno = saved_errno;
return -1;
}
*req
。KeepWrite
线程可能会一直请求Spin协议,直到旁边的req->next
变为非未连接状态。这个过程不是无锁。remote_side()
。KeepWriteIfConnected
,KeepWriteIfConnected
之后将会被’ req’调用Setup()
在Connect之后调用,而Connect可能会调用app_connect
。KeepWrite
线程中继续。EOVERCROWDED
。ReturnFailedWriteRequest
(ReturnFailedWriteRequest
调用的on_reset
函数并且在id对象中callback)之前的’ SetFailed’,这样可以立即知道这个Socket在on_reset
函数中Callback失败。 while (true) {
int rc = SSL_do_handshake(_ssl_session);
if (rc == 1) {
_ssl_state = SSL_CONNECTED;
AddBIOBuffer(_ssl_session, fd, FLAGS_ssl_bio_buffer_size);
return 0;
}
int ssl_error = SSL_get_error(_ssl_session, rc);
switch (ssl_error) {
case SSL_ERROR_WANT_READ:
#if defined(OS_LINUX)
if (bthread_fd_wait(fd, EPOLLIN) != 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(fd, EVFILT_READ) != 0) {
#endif
return -1;
}
case SSL_ERROR_WANT_WRITE:
#if defined(OS_LINUX)
if (bthread_fd_wait(fd, EPOLLOUT) != 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(fd, EVFILT_WRITE) != 0) {
#endif
return -1;
}
循环直到SSL握手完成。对于SSL_ERROR_WANT_READ/WRITE,使用bthread_fd_wait作为轮询机制而不是EventDispatcher
,EventDispatcher
可能会混淆原始事件处理代码。
在prev rev中,SocketPool可以被分片到多个subsocketpool中,以减少线程争用。分片键与pthread-id混合,以便更好地保持数据局部性。
inline void SocketPool::ReturnSocket(Socket* sock) {
const int connection_pool_size = FLAGS_max_connection_pool_size;
if (_numfree.fetch_add(1, butil::memory_order_relaxed) <
connection_pool_size) {
const SocketId sid = sock->id();
BAIDU_SCOPED_LOCK(_mutex);
_pool.push_back(sid);
} else {
_numfree.fetch_sub(1, butil::memory_order_relaxed);
sock->SetFailed(EUNUSED, "Close unused pooled socket");
}
_numinflight.fetch_sub(1, butil::memory_order_relaxed);
}
保存可以在任何时候重新加载的gflag。检查池是否已满,满了就取消添加并关闭池中的Socket。
int Socket::ReturnToPool() {
SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
if (sp == NULL) {
LOG(ERROR) << "_shared_part is NULL";
SetFailed(EINVAL, "_shared_part is NULL");
return -1;
}
SocketPool* pool = sp->socket_pool.load(butil::memory_order_consume);
if (pool == NULL) {
LOG(ERROR) << "_shared_part->socket_pool is NULL";
SetFailed(EINVAL, "_shared_part->socket_pool is NULL");
sp->RemoveRefManually();
return -1;
}
_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
_controller_released_socket.store(false, butil::memory_order_relaxed);
pool->ReturnSocket(this);
sp->RemoveRefManually();
return 0;
}
(编写不易,转载请注明:http://shihlei.iteye.com/blog/2431224) 一 概述有段时间没有更新SpringCloud的文章了,目前 SpringBoot 版本 2.0.5.RELEASE,SpringCloud 的版本:Finchley SR1 ,本文继续之前的规划,介绍下 SpringCloud集成 Hystrix 断路器功能。 Hystr...
准备第一步,新建一个“控制台”,代码如下:using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace Atttibute{ class Program { sta...
1、什么是OMF?Oracle managed file的缩写,简单的理解,就是oracle自己管理自己的文件,可以是dbf,redolog 等等,具体可以参考官方文档Adiministrator中的17.Using Oracle Managed Files2、如何查看当前系统是否使用了OMF?show parameter db_create; 参数的值是空的,说明我们没有使用OMF。3、开启OM...
DBSCAN 的聚类类簇数k是自适应的。 太忙了没工夫写文字了。from sklearn import datasetsimport numpy as npimport matplotlib.pyplot as pltfrom sklearn.cluster import DBSCANX1, y1 = datasets.make_circles(n_samples=5000, fa...
本文用于记录学习和日常中使用过的shell脚本【脚本1】打印形状打印等腰三角形、直角三角形、倒直角三角形、菱形#!/bin/bash等腰三角形read -p "Please input the length: " nfor i in seq 1 $ndofor ((j=$n;j>i;j–))doecho -n " "donefor m in seq 1 $idoe...
spring-security-oauth2单点登录流程图 使用支付宝扫描上方二维码领取红包
Java向mysql数据库插入datetime类型数据实例参考URL: http://www.125jz.com/1859.html
HttpClientFactory 结合 Polly 轻松实现重试机制Intro我们的服务里有一个 API 会去调用第三方的接口,设置了超时时间,最近偶尔会发生超时的情况,微软在提供 H...
此系列属于胡寿松《自动控制原理题海与考研指导》(第三版)习题精选,仅包含部分经典习题,需要完整版习题答案请自行查找,本系列属于知识点巩固部分,可用于期末考试和考研复习。
Step1 Problem: 给你 n 个点,m 条流量下界是 low ,上界是 up 的单向边。问你能否满足每时每刻每条边的流量都在 [low, up] 范围内,如果不满足输出 NO, 满足输出 YES 同时输出每条边的流量。Step2 Ideas: 上下界网络流和平常的网络流不同在于多出了两个点:超级源点 S, 超级汇点 T. 超级源点 S:每条边下界流量都由 S ...
以下五种解法可以寻找到数组中的最大值和最小值;1)问题分解法。 把本题看做两个独立的问题,而非一个问题,所以,每次分别找出最小值和最大值即可,此时,一共需要遍历两次数组,比较次数为2N次;(N表示数组的长度) 2)取单元素法。 维持两个变量min和max,min标记为最小值,max标记为最大值,每次取出一个元素,先与已找到的最小值比较,再与已找到的最大值比较,此种方法只