[email protected] BRPC源码分析(七) SOCKET_Lollo_HA的博客-程序员宝宝

技术标签: cpp  

[email protected] BRPC源码分析(七) SOCKET

Socket.cpp代码分析

int Socket::HandleEpollOut(SocketId id) {
    
    SocketUniquePtr s;
    if (Socket::AddressFailedAsWell(id, &s) < 0) {
    
        // Ignore recycled sockets
        return -1;
    }

    EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
    if (req != NULL) {
    
        return s->HandleEpollOutRequest(0, req);
    }
    
    s->_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
    bthread::butex_wake_except(s->_epollout_butex, 0);  
    return 0;
}

Socket在被添加到epoll之前可能已经被’ SetFailed’了,这些Socket在’ SetFailed’里面没有信号,因此使用’ AddressFailedAsWell’来发送信号,以防止死锁。


int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
    
    if (SetFailed() != 0) {
    
        return -1;
    }
    GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
    return req->on_epollout_event(req->fd, error_code, req->data);
}
  • 有且只有一个线程成功地调用’ SetFailed’ 的 ’ Socket’。
  • 当引用值为0时,销毁req。
  • 调用用户的callback。
  • 在EpollOutRequest的析构函数中,将计时器删除。

int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
    
    WriteRequest* req = static_cast<WriteRequest*>(data);
    Socket* s = req->socket;
    if (err == 0 && s->ssl_state() == SSL_CONNECTING) {
    
        bthread_t th;
        google::protobuf::Closure* thrd_func = brpc::NewCallback(
            Socket::CheckConnectedAndKeepWrite, fd, err, data);
        if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
                                            RunClosure, thrd_func)) == 0) {
    
            return 0;
        } else {
    
            PLOG(ERROR) << "Fail to start bthread";
        }
    }
    CheckConnectedAndKeepWrite(fd, err, data);
    return 0;
}
  • 在新的bthread中运行ssl connect以避免阻塞当前bthread(并且阻塞EventDispatcher)
  • 发生非零’ err’输出"Fail to start bthread"
  • 检查连接状况

	req->data.swap(*data);
    req->next = WriteRequest::UNCONNECTED;
    req->id_wait = opt.id_wait;
    req->set_pipelined_count_and_user_message(
        opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
    return StartWrite(req, opt);

将’ req->next’设置为UNCONNECTED,使得KeepWrite线程将一直等待,直到它指向一个有效的WriteRequest或NULL。


int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
    
    WriteRequest* const prev_head =
        _write_head.exchange(req, butil::memory_order_release);
    if (prev_head != NULL) {
    
        req->next = prev_head;
        return 0;
    }

    int saved_errno = 0;
    bthread_t th;
    SocketUniquePtr ptr_for_keep_write;
    ssize_t nw = 0;

    req->next = NULL;
    
    int ret = ConnectIfNot(opt.abstime, req);
    if (ret < 0) {
    
        saved_errno = errno;
        SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
        goto FAIL_TO_WRITE;
    } else if (ret == 1) {
    
        return 0;
    }

    req->Setup(this);
    
    if (ssl_state() != SSL_OFF) {
    
        goto KEEPWRITE_IN_BACKGROUND;
    }
   
    if (_conn) {
    
        butil::IOBuf* data_arr[1] = {
     &req->data };
        nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
    } else {
    
        nw = req->data.cut_into_file_descriptor(fd());
    }
    if (nw < 0) {
    
        if (errno != EAGAIN && errno != EOVERCROWDED) {
    
            saved_errno = errno;
            PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
            SetFailed(saved_errno, "Fail to write into %s: %s", 
                      description().c_str(), berror(saved_errno));
            goto FAIL_TO_WRITE;
        }
    } else {
    
        AddOutputBytes(nw);
    }
    if (IsWriteComplete(req, true, NULL)) {
    
        ReturnSuccessfulWriteRequest(req);
        return 0;
    }

KEEPWRITE_IN_BACKGROUND:
    ReAddress(&ptr_for_keep_write);
    req->socket = ptr_for_keep_write.release();
    if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
                                 KeepWrite, req) != 0) {
    
        LOG(FATAL) << "Fail to start KeepWrite";
        KeepWrite(req);
    }
    return 0;

FAIL_TO_WRITE:
    ReleaseAllFailedWriteRequests(req);
    errno = saved_errno;
    return -1;
}
  • 释放fence确保得到请求的线程获取*req
  • 当对fd进行数据写入时。KeepWrite线程可能会一直请求Spin协议,直到旁边的req->next变为非未连接状态。这个过程不是无锁。
  • 设置写入的权限。
  • 测试是否连接到remote_side()
  • 进行连接并且CallbackKeepWriteIfConnectedKeepWriteIfConnected之后将会被’ req’调用
  • Setup()在Connect之后调用,而Connect可能会调用app_connect
  • 写入SSL可能会阻塞当前bthread,SSL在后台写入。
  • 在调用线程中写入一次。如果写入没有完成,在KeepWrite线程中继续。
  • 如果nw < 0,RTMP可能返回EOVERCROWDED
  • EPIPE在池连接并且请求备份。
  • ReturnFailedWriteRequest(ReturnFailedWriteRequest调用的on_reset函数并且在id对象中callback)之前的’ SetFailed’,这样可以立即知道这个Socket在on_reset函数中Callback失败。

 while (true) {
    
        int rc = SSL_do_handshake(_ssl_session);
        if (rc == 1) {
    
            _ssl_state = SSL_CONNECTED;
            AddBIOBuffer(_ssl_session, fd, FLAGS_ssl_bio_buffer_size);
            return 0;
        }

        int ssl_error = SSL_get_error(_ssl_session, rc);
        switch (ssl_error) {
    
        case SSL_ERROR_WANT_READ:
#if defined(OS_LINUX)
            if (bthread_fd_wait(fd, EPOLLIN) != 0) {
    
#elif defined(OS_MACOSX)
            if (bthread_fd_wait(fd, EVFILT_READ) != 0) {
    
#endif
                return -1;
            }

        case SSL_ERROR_WANT_WRITE:
#if defined(OS_LINUX)
            if (bthread_fd_wait(fd, EPOLLOUT) != 0) {
    
#elif defined(OS_MACOSX)
            if (bthread_fd_wait(fd, EVFILT_WRITE) != 0) {
    
#endif
                return -1;
            }

循环直到SSL握手完成。对于SSL_ERROR_WANT_READ/WRITE,使用bthread_fd_wait作为轮询机制而不是EventDispatcherEventDispatcher可能会混淆原始事件处理代码。

SocketPool(池)

在prev rev中,SocketPool可以被分片到多个subsocketpool中,以减少线程争用。分片键与pthread-id混合,以便更好地保持数据局部性。

inline void SocketPool::ReturnSocket(Socket* sock) {
    
    const int connection_pool_size = FLAGS_max_connection_pool_size;

    if (_numfree.fetch_add(1, butil::memory_order_relaxed) <
        connection_pool_size) {
    
        const SocketId sid = sock->id();
        BAIDU_SCOPED_LOCK(_mutex);
        _pool.push_back(sid);
    } else {
    
        _numfree.fetch_sub(1, butil::memory_order_relaxed);
        sock->SetFailed(EUNUSED, "Close unused pooled socket");
    }
    _numinflight.fetch_sub(1, butil::memory_order_relaxed);
}

保存可以在任何时候重新加载的gflag。检查池是否已满,满了就取消添加并关闭池中的Socket。


int Socket::ReturnToPool() {
    
    SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
    if (sp == NULL) {
    
        LOG(ERROR) << "_shared_part is NULL";
        SetFailed(EINVAL, "_shared_part is NULL");
        return -1;
    }
    SocketPool* pool = sp->socket_pool.load(butil::memory_order_consume);
    if (pool == NULL) {
    
        LOG(ERROR) << "_shared_part->socket_pool is NULL";
        SetFailed(EINVAL, "_shared_part->socket_pool is NULL");
        sp->RemoveRefManually();
        return -1;
    }
    _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
    _controller_released_socket.store(false, butil::memory_order_relaxed);
    pool->ReturnSocket(this);
    sp->RemoveRefManually();
    return 0;
}
  • 返回sp和pool为NULL时的错误。
  • sp和pool在返回池之前被重置。
  • sp在返回pool后被释放,因为sp拥有sp独立的pool。
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Lollo_HA/article/details/121244502

智能推荐

SpringCloud(四): 断路器—Hystrix_shihlei的博客-程序员宝宝

(编写不易,转载请注明:http://shihlei.iteye.com/blog/2431224) 一 概述有段时间没有更新SpringCloud的文章了,目前 SpringBoot 版本 2.0.5.RELEASE,SpringCloud 的版本:Finchley SR1 ,本文继续之前的规划,介绍下 SpringCloud集成 Hystrix 断路器功能。 Hystr...

c# .net 特性Attribute_cplvfx的博客-程序员宝宝

准备第一步,新建一个“控制台”,代码如下:using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace Atttibute{ class Program { sta...

oracle+omf+格式,Oracle OMF管理数据文件_杨明月luna的博客-程序员宝宝

1、什么是OMF?Oracle managed file的缩写,简单的理解,就是oracle自己管理自己的文件,可以是dbf,redolog 等等,具体可以参考官方文档Adiministrator中的17.Using Oracle Managed Files2、如何查看当前系统是否使用了OMF?show parameter db_create; 参数的值是空的,说明我们没有使用OMF。3、开启OM...

Python:密度聚类DBSCAN,使用了sklearn.cluster._DeniuHe的博客-程序员宝宝

DBSCAN 的聚类类簇数k是自适应的。 太忙了没工夫写文字了。from sklearn import datasetsimport numpy as npimport matplotlib.pyplot as pltfrom sklearn.cluster import DBSCANX1, y1 = datasets.make_circles(n_samples=5000, fa...

100多个shell脚本的例子_double_happy111的博客-程序员宝宝

本文用于记录学习和日常中使用过的shell脚本【脚本1】打印形状打印等腰三角形、直角三角形、倒直角三角形、菱形#!/bin/bash等腰三角形read -p "Please input the length: " nfor i in seq 1 $ndofor ((j=$n;j&gt;i;j–))doecho -n " "donefor m in seq 1 $idoe...

[email protected]实现客户端单点登录_大大头_1991的博客-程序员宝宝

spring-security-oauth2单点登录流程图  使用支付宝扫描上方二维码领取红包

随便推点

HttpClientFactory 结合 Polly 轻松实现重试机制_dotNET跨平台的博客-程序员宝宝

HttpClientFactory 结合 Polly 轻松实现重试机制Intro我们的服务里有一个 API 会去调用第三方的接口,设置了超时时间,最近偶尔会发生超时的情况,微软在提供 H...

Chapter2.1:控制系统的数学模型_FUXI_Willard的博客-程序员宝宝

此系列属于胡寿松《自动控制原理题海与考研指导》(第三版)习题精选,仅包含部分经典习题,需要完整版习题答案请自行查找,本系列属于知识点巩固部分,可用于期末考试和考研复习。

【无源汇有上下界可行流】ZOJ - 2314 Reactor Cooling_笑对这个世界的志贵的博客-程序员宝宝_无源汇上下界可行流

Step1 Problem: 给你 n 个点,m 条流量下界是 low ,上界是 up 的单向边。问你能否满足每时每刻每条边的流量都在 [low, up] 范围内,如果不满足输出 NO, 满足输出 YES 同时输出每条边的流量。Step2 Ideas: 上下界网络流和平常的网络流不同在于多出了两个点:超级源点 S, 超级汇点 T. 超级源点 S:每条边下界流量都由 S ...

如何寻找数组中的最大值和最小值_JohnLee_chun的博客-程序员宝宝_数组找最大值和最小值

以下五种解法可以寻找到数组中的最大值和最小值;1)问题分解法。    把本题看做两个独立的问题,而非一个问题,所以,每次分别找出最小值和最大值即可,此时,一共需要遍历两次数组,比较次数为2N次;(N表示数组的长度) 2)取单元素法。    维持两个变量min和max,min标记为最小值,max标记为最大值,每次取出一个元素,先与已找到的最小值比较,再与已找到的最大值比较,此种方法只