SPDK Thread 模型设计与实现_spdk c++-程序员宅基地

技术标签: spring  SPDK  c++无锁队列  C++后端开发  网络编程  C++Linux后端  Thread  

Reactor – 单个CPU Core抽象,主要包含了:

  • Lcore对应的CPU Core id
  • Threads在该核心下的线程
  • Events 这是一个spdk ring,用于事件传递接收

Thread – 线程,但它是spdk抽象出来的线程,主要包含了:

  • io_channels资源的抽象,可以是bdev,也可以是具体的tgt
  • tailq 线程队列,用于连接下一个线程
  • name 线程的名称
  • Stats 用于计时统计闲置和忙时时间的
  • active_pollers 轮询使用的poller,非定时
  • timer_pollers 定时的poller
  • messages 这是一个spdk ring,用于消息传递接收
  • msg_cache 事件的缓存

1.1 Reactor

对象g_reactor_state有五个状态对应了应用中reactors运行运行状态,

enum spdk_reactor_state {
 SPDK_REACTOR_STATE_INVALID = 0,
 SPDK_REACTOR_STATE_INITIALIZED = 1,
 SPDK_REACTOR_STATE_RUNNING = 2,
 SPDK_REACTOR_STATE_EXITING = 3,
 SPDK_REACTOR_STATE_SHUTDOWN = 4,
};

本文福利, 免费领取C++学习资料包、技术视频/代码,1000道大厂面试题,内容包括(C++基础,网络编程,数据库,中间件,后端开发,音视频开发,Qt开发)↓↓↓↓↓↓见下面↓↓文章底部点击免费领取↓↓

初始情况下是:

SPDK_REACTOR_STATE_INVALID状态,在spdk app(任意一个target,比如nvmf_tgt)启动时,即调用了spdk_app_start方法,会调用spdk_reactors_init,在这个方法中将会初始化所有需要被初始化的reactors(可以在配置文件中指定需要使用的Core,CPU Core 和reactor是一对一的)。并且会将g_reactor_state设置为SPDK_REACTOR_STATE_INITIALIZED。具体代码如下:

Int spdk_reactors_init(void)
{
        // 初始化所有的event mempool
        g_spdk_event_mempool = spdk_mempool_create(…);
        // 为g_reactors分配内存,g_reactors是一个数组,管理了所有的reactors
posix_memalign((void **)&g_reactors, 64,  (last_core + 1) * sizeof(struct spdk_reactor));
// 这里设置了reactor创建线程的方法,之后需要初始化线程的时候将会调用该方法
spdk_thread_lib_init(spdk_reactor_schedule_thread, sizeof(struct spdk_lw_thread));
// 对于每一个启动的reactor,将会初始化它们
// 初始化reactor过程,即为绑定lcore,初始化spdk ring、threads,对rusage无操作
SPDK_ENV_FOREACH_CORE(i) {
reactor = spdk_reactor_get(i);
spdk_reactor_construct(reactor, i);
}
               // 设置好状态返回
g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
return 0;
}

在进入SPDK_REACTOR_STATE_INITIALIZED状态且spdk_app_start在创建了自己的线程并绑定到了reactors后,会调用spdk_reactors_start方法并将g_reactor_state设置为SPDK_REACTOR_STATE_RUNNING状态并会创建所有reactor的线程且轮询。

Void spdk_reactors_start(void) {
SPDK_ENV_FOREACH_CORE(i) {
                       if (i != current_core) { // 在非master reactor中
                                      reactor = spdk_reactor_get(i); // 得到相应的reactor
                                      // 设置好线程创建后的一个消息,该消息为轮询函数
                                      rc = spdk_env_thread_launch_pinned(reactor->lcore, _spdk_reactor_run, reactor);
                                      // reactor创建好线程并且会自动执行第一个消息
                                     spdk_thread_create(thread_name, tmp_cpumask);
}
}
// 当前CPU core得到reactor,并且开始轮询
               reactor = spdk_reactor_get(current_core);
               _spdk_reactor_run(reactor);
}

之前提到spdk_reactors_init方法中调用了spdk_thread_lib_init方法传入了创建thread的spdk_reactor_schedule_thread方法,在调用spdk_thread_create会回调该方法。这个方法它主要的功能就是告诉这个新创建的线程绑定创建该线程的reactor。

spdk_reactor_schedule_thread(struct spdk_thread *thread)
{
               // 得到该线程设置的cpu mask
               cpumask = spdk_thread_get_cpumask(thread);
               for (i = 0; i < spdk_env_get_core_count(); i++) {
                              …. // 遍历cpu core
                              // 通过cpu mask找到对应的核心,并产生event
                              if (spdk_cpuset_get_cpu(cpumask, core)) {
                                             evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
                                             break;
                              }
               }
               // 传递该event,即对应的reatcor会调用_schedule_thread方法,
spdk_event_call(evt);
}
_schedule_thread(void *arg1, void *arg2)
{
               struct spdk_lw_thread *lw_thread = arg1;
               struct spdk_reactor *reactor;
               // 消息传递到对应的reactor后将该thread加入到reactor中
reactor = spdk_reactor_get(spdk_env_get_current_core());
               TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
}
在SPDK_REACTOR_STATE_RUNNING后,此时所有reactor就进入了轮询状态。_spdk_reactor_run函数为线程提供了轮询方法:
static int _spdk_reactor_run(void *arg) {
        while (1) {
                       // 处理reactor上的event消息,消息会在之后讲到
                       _spdk_event_queue_run_batch(reactor);
                       // 每一个reactor上注册的thread进行遍历并且处理poller事件
                       TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
                                      rc = spdk_thread_poll(thread, 0, now);
                       }
                       // 检查reactor的状态
                       if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
                                      break;
                       }
}
}

而当spdk app被调用spdk_app_stop方法后将会相应的通知每一个reactor调用spdk_reactors_stop方法,将g_reactor_state赋值为SPDK_REACTOR_STATE_EXITING,即开始退出了。回到_spdk_reactor_run函数中,轮询将会被跳出,并且执行销毁线程的代码。

static int _spdk_reactor_run(void *arg) {
        …..  // 轮询
        TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
                       thread = spdk_thread_get_from_ctx(lw_thread);
                       TAILQ_REMOVE(&reactor->threads, lw_thread, link);
                       spdk_set_thread(thread);
                       spdk_thread_exit(thread);
                       spdk_thread_destroy(thread);
        }
}

在这之后,主线程的_spdk_reactor_run会返回到spdk_reactors_start中,并将g_reactor_state赋值为SPDK_REACTOR_STATE_SHUTDOWN,返回到spdk_app_start中等待应用退出。

最后,总结一下reactors和CPU core以及spdk thread关系应该如图1所示

图1 CPU cores、reactors和thread关系图

Reactor生命周期流程图则如图2所示

图2 reactor生命周期流程图

1.2 thread

当Reactors进行轮询时,除了处理自己的事件消息之外,还会调用注册在该reactor下面的每一个线程进行轮询。不过通常一个reactor只有一个thread,在spdk应用中,更多的是注册多个poller而不是注册多个thread。具体的轮询方法为:

Int spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) {
        // 首先先处理ring传递过来的消息
        msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
        // 调用非定时poller中的方法
TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
                                                        active_pollers_head, tailq, tmp) {
                       // 调用poller注册的方法之前,会对poller状态检测且转换
if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
                                      free(poller);
                                      continue;
                       }
                       poller->state = SPDK_POLLER_STATE_RUNNING;
// 调用poller注册的方法
poller_rc = poller->fn(poller->arg);
// poller转换状态
poller->state = SPDK_POLLER_STATE_WAITING;
}
// 调用定时poller中的方法
TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
               // 类似非定时poller过程,不过会检查是否到了预定的时间
               if (now < poller->next_run_tick)  break;
}
// 最后统计时间
}

Io_device 和 io_channel在thread中也是非常重要的概念。它们的实现都在thread.c中,io_device是设备的抽象,io_channel是对该设备通道的抽象。一个线程可以创建多个io_channel . io_channel只能和一个io_device绑定,并且这个io_channel是别的线程使用不了的。

图 3 io_device、io_channel和线程关系图


Io_device结构

struct io_device {
        void                                                    *io_device; // 抽象的device指针
        char                                                    name[SPDK_MAX_DEVICE_NAME_LEN + 1]; // 名字
        spdk_io_channel_create_cb         create_cb; // io_channel创建的回调函数
        spdk_io_channel_destroy_cb       destroy_cb; // io_channel销毁的回调函数
        spdk_io_device_unregister_cb     unregister_cb; // io_device解绑的回调函数
        struct spdk_thread                          *unregister_thread; // 不使用该device线程
        uint32_t                                             ctx_size;              // ctx的大小,将会传给io_channel处理
        uint32_t                                             for_each_count;  // io_channel的数量
        TAILQ_ENTRY(io_device)                              tailq;    // device队列头
        uint32_t                                             refcnt;                  // 计数器
        bool                                                    unregistered; // 是否该device被注册
};

可以看到,io_device实际上只提供了一些自身io_device的操作和io_channel相关的方法,具体的io_device实体其实是那个名字叫io_device的void指针。因为thread中的io_device只提供了thread这一层接口,具体的io操作每一个设备很难被抽象出来,所以这一层的接口只负责管理io_channel的创建、销毁和绑定等。

Io_channel的结构
struct spdk_io_channel {
        struct spdk_thread                          *thread; // 绑定的线程
        struct io_device                *dev;            // 绑定的io_device
        uint32_t                                             ref; // io_channel引用计数
        uint32_t                                             destroy_ref;  // destroy前被引用的次数
        TAILQ_ENTRY(spdk_io_channel)  tailq; // io_channel 队列头
        spdk_io_channel_destroy_cb       destroy_cb;         // io_channel销毁的回调函数
};

虽然io_channel看起来是很简单的结构体,实际上在创建一个io_device的时候,会要求使用者传入一个io_channel_ctx的大小作为调用的参数,而在给io_channel分配内存的时候,除了分配本身io_channel结构体的大小外,还会额外分配一个io_channel_ctx的大小,这个context可以理解成一个void指针,当用户在使用io_channel的时候,实际上还是通过context的部分去访问io_device。

NVMe-oF实例

nvmf_tgt 是spdk中一个重要的模块,这里详细的写一下它作为一个target实例是如何使用thread、io_device以及io_channel的。

在spdk应用刚启动的时候,reactor模块就会自动加载起来,然后在加载nvmf subsystem的时候,会调用spdk_nvmf_subsystem_init(lib/event/subsystems/nvmf/nvmf_tgt.c)方法,nvmf_tgt其实也是有生命周期,并且有一个状态机去管理它的生命周期。

enum nvmf_tgt_state {
        NVMF_TGT_INIT_NONE = 0, // 最初的状态
        NVMF_TGT_INIT_PARSE_CONFIG, // 解析配置文件
        NVMF_TGT_INIT_CREATE_POLL_GROUPS, // 创建poll groups
        NVMF_TGT_INIT_START_SUBSYSTEMS, // 启动subsystem
        NVMF_TGT_INIT_START_ACCEPTOR,      // 开始接收
        NVMF_TGT_RUNNING,                                // running
        NVMF_TGT_FINI_STOP_SUBSYSTEMS,
        NVMF_TGT_FINI_DESTROY_POLL_GROUPS,
        NVMF_TGT_FINI_STOP_ACCEPTOR,
        NVMF_TGT_FINI_FREE_RESOURCES,
        NVMF_TGT_STOPPED,
        NVMF_TGT_ERROR,
};

首先在NVMF_TGT_INIT_PARSE_CONFIG状态中,nvmf_tgt会去解析启动时传入的配置文件,当解析了[nvmf]这个label后,会调用spdk_nvmf_tgt_create这个方法,这个方法将初始化了全局的g_nvmf_tgt变量,同时也将tgt注册成了一个io_device。

1 spdk_io_device_register(tgt,
2                                                      spdk_nvmf_tgt_create_poll_group,
3                                                      spdk_nvmf_tgt_destroy_poll_group,
4                                                      sizeof(struct spdk_nvmf_poll_group),
5                                                      "nvmf_tgt");

spdk_nvmf_tgt_create_poll_group和spdk_nvmf_tgt_destroy_poll_group是io_channel创建和销毁的回调方法(在spdk_get_io_channel时调用 create_cb)。第三个参数是io_channel_ctx的size,既然这里传入了spdk_nvmf_poll_group的大小,那么很明显说明在nvmf中io_channel_ctx对象就是spdk_nvmf_poll_group。

当config文件解析完了之后,nvmf_tgt状态到了NVMF_TGT_INIT_CREATE_POLL_GROUPS,这个状态下会为每一个线程都创建相应的poll group。

spdk_for_each_thread(nvmf_tgt_create_poll_group,
                                                                         NULL,
                                                                         nvmf_tgt_create_poll_group_done);
static void nvmf_tgt_create_poll_group(void *ctx)
{
        struct nvmf_tgt_poll_group *pg;
               ….
        pg->thread = spdk_get_thread();
        pg->group = spdk_nvmf_poll_group_create(g_spdk_nvmf_tgt);
        ….
}

再看spdk_nvmf_poll_group_create中,

struct spdk_nvmf_poll_group * spdk_nvmf_poll_group_create(struct spdk_nvmf_tgt *tgt)
{
        struct spdk_io_channel *ch;
        ch = spdk_get_io_channel(tgt);
        ….
        return spdk_io_channel_get_ctx(ch);
}

在spdk_get_io_channel中,会先去检查传入的io_device是不是已经注册好了的,如果已经注册了,将会创建一个新的io_channel返回,创建的过程会回调在注册io_device时注册的io_channel创建方法(即方法spdk_nvmf_tgt_create_poll_group)。

static int spdk_nvmf_tgt_create_poll_group(void *io_device, void *ctx_buf)
{
        ….. // 初始化transport 、nvmf subsystem等
// 注册一个poller
        group->poller = spdk_poller_register(spdk_nvmf_poll_group_poll, group, 0);
        group->thread = spdk_get_thread();
        return 0;
}

在spdk_nvmf_poll_group_poll中,因为spdk_nvmf_poll_group对象中有transport的poll group,所以它会调用对应的transport的poll_group_poll方法,比如rdma的poll_group_poll就会轮询rdma注册的poller处理每个在相应的qpair来的请求,进入rdma的状态机将请求处理好。

然后这个状态就结束了,之后再初始化好了nvmf subsystem相关的东西之后,到了状态NVMF_TGT_INIT_START_ACCEPTOR。在这个状态中,只注册了一个poller。

1 g_acceptor_poller = spdk_poller_register(acceptor_poll, g_spdk_nvmf_tgt,
2                                                                                g_spdk_nvmf_tgt_conf->acceptor_poll_rate);

这个poller调用的transport的方法,不断的监听是不是有新的fd连接进来,如果有就调用new_qpair的回调。

总结

spdk thread 模型是spdk无锁化的基础,在一个线程中,当分配一个任务后,一直会运行到任务结束为止,这确保了不需要进行线程之间的切换而带来额外的损耗。同时,高效的spdk ring提供了不同线程之间的消息传递,这就使得任务结束的结果可以高效的传递给别的处理线程。而io_device和io_channel的设计保证了资源的抽象访问以及独立的路径不去争抢资源池,并且块设备由于是对块进行操作的所以也十分适合抽象成io_device。正是因为以上几点才让spdk线程模型能够达到无锁化且为多个target提供了基础线程框架的支持。

本文福利, 免费领取C++学习资料包、技术视频/代码,1000道大厂面试题,内容包括(C++基础,网络编程,数据库,中间件,后端开发,音视频开发,Qt开发)↓↓↓↓↓↓见下面↓↓文章底部点击免费领取↓↓

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/m0_60259116/article/details/133905530

智能推荐

hive使用适用场景_大数据入门:Hive应用场景-程序员宅基地

文章浏览阅读5.8k次。在大数据的发展当中,大数据技术生态的组件,也在不断地拓展开来,而其中的Hive组件,作为Hadoop的数据仓库工具,可以实现对Hadoop集群当中的大规模数据进行相应的数据处理。今天我们的大数据入门分享,就主要来讲讲,Hive应用场景。关于Hive,首先需要明确的一点就是,Hive并非数据库,Hive所提供的数据存储、查询和分析功能,本质上来说,并非传统数据库所提供的存储、查询、分析功能。Hive..._hive应用场景

zblog采集-织梦全自动采集插件-织梦免费采集插件_zblog 网页采集插件-程序员宅基地

文章浏览阅读496次。Zblog是由Zblog开发团队开发的一款小巧而强大的基于Asp和PHP平台的开源程序,但是插件市场上的Zblog采集插件,没有一款能打的,要么就是没有SEO文章内容处理,要么就是功能单一。很少有适合SEO站长的Zblog采集。人们都知道Zblog采集接口都是对Zblog采集不熟悉的人做的,很多人采取模拟登陆的方法进行发布文章,也有很多人直接操作数据库发布文章,然而这些都或多或少的产生各种问题,发布速度慢、文章内容未经严格过滤,导致安全性问题、不能发Tag、不能自动创建分类等。但是使用Zblog采._zblog 网页采集插件

Flink学习四:提交Flink运行job_flink定时运行job-程序员宅基地

文章浏览阅读2.4k次,点赞2次,收藏2次。restUI页面提交1.1 添加上传jar包1.2 提交任务job1.3 查看提交的任务2. 命令行提交./flink-1.9.3/bin/flink run -c com.qu.wc.StreamWordCount -p 2 FlinkTutorial-1.0-SNAPSHOT.jar3. 命令行查看正在运行的job./flink-1.9.3/bin/flink list4. 命令行查看所有job./flink-1.9.3/bin/flink list --all._flink定时运行job

STM32-LED闪烁项目总结_嵌入式stm32闪烁led实验总结-程序员宅基地

文章浏览阅读1k次,点赞2次,收藏6次。这个项目是基于STM32的LED闪烁项目,主要目的是让学习者熟悉STM32的基本操作和编程方法。在这个项目中,我们将使用STM32作为控制器,通过对GPIO口的控制实现LED灯的闪烁。这个STM32 LED闪烁的项目是一个非常简单的入门项目,但它可以帮助学习者熟悉STM32的编程方法和GPIO口的使用。在这个项目中,我们通过对GPIO口的控制实现了LED灯的闪烁。LED闪烁是STM32入门课程的基础操作之一,它旨在教学生如何使用STM32开发板控制LED灯的闪烁。_嵌入式stm32闪烁led实验总结

Debezium安装部署和将服务托管到systemctl-程序员宅基地

文章浏览阅读63次。本文介绍了安装和部署Debezium的详细步骤,并演示了如何将Debezium服务托管到systemctl以进行方便的管理。本文将详细介绍如何安装和部署Debezium,并将其服务托管到systemctl。解压缩后,将得到一个名为"debezium"的目录,其中包含Debezium的二进制文件和其他必要的资源。注意替换"ExecStart"中的"/path/to/debezium"为实际的Debezium目录路径。接下来,需要下载Debezium的压缩包,并将其解压到所需的目录。

Android 控制屏幕唤醒常亮或熄灭_android实现拿起手机亮屏-程序员宅基地

文章浏览阅读4.4k次。需求:在诗词曲文项目中,诗词整篇朗读的时候,文章没有读完会因为屏幕熄灭停止朗读。要求:在文章没有朗读完毕之前屏幕常亮,读完以后屏幕常亮关闭;1.权限配置:设置电源管理的权限。

随便推点

目标检测简介-程序员宅基地

文章浏览阅读2.3k次。目标检测简介、评估标准、经典算法_目标检测

记SQL server安装后无法连接127.0.0.1解决方法_sqlserver 127 0 01 无法连接-程序员宅基地

文章浏览阅读6.3k次,点赞4次,收藏9次。实训时需要安装SQL server2008 R所以我上网上找了一个.exe 的安装包链接:https://pan.baidu.com/s/1_FkhB8XJy3Js_rFADhdtmA提取码:ztki注:解压后1.04G安装时Microsoft需下载.NET,更新安装后会自动安装如下:点击第一个傻瓜式安装,唯一注意的是在修改路径的时候如下不可修改:到安装实例的时候就可以修改啦数据..._sqlserver 127 0 01 无法连接

js 获取对象的所有key值,用来遍历_js 遍历对象的key-程序员宅基地

文章浏览阅读7.4k次。1. Object.keys(item); 获取到了key之后就可以遍历的时候直接使用这个进行遍历所有的key跟valuevar infoItem={ name:'xiaowu', age:'18',}//的出来的keys就是[name,age]var keys=Object.keys(infoItem);2. 通常用于以下实力中 <div *ngFor="let item of keys"> <div>{{item}}.._js 遍历对象的key

粒子群算法(PSO)求解路径规划_粒子群算法路径规划-程序员宅基地

文章浏览阅读2.2w次,点赞51次,收藏310次。粒子群算法求解路径规划路径规划问题描述    给定环境信息,如果该环境内有障碍物,寻求起始点到目标点的最短路径, 并且路径不能与障碍物相交,如图 1.1.1 所示。1.2 粒子群算法求解1.2.1 求解思路    粒子群优化算法(PSO),粒子群中的每一个粒子都代表一个问题的可能解, 通过粒子个体的简单行为,群体内的信息交互实现问题求解的智能性。    在路径规划中,我们将每一条路径规划为一个粒子,每个粒子群群有 n 个粒 子,即有 n 条路径,同时,每个粒子又有 m 个染色体,即中间过渡点的_粒子群算法路径规划

量化评价:稳健的业绩评价指标_rar 海龟-程序员宅基地

文章浏览阅读353次。所谓稳健的评估指标,是指在评估的过程中数据的轻微变化并不会显著的影响一个统计指标。而不稳健的评估指标则相反,在对交易系统进行回测时,参数值的轻微变化会带来不稳健指标的大幅变化。对于不稳健的评估指标,任何对数据有影响的因素都会对测试结果产生过大的影响,这很容易导致数据过拟合。_rar 海龟

IAP在ARM Cortex-M3微控制器实现原理_value line devices connectivity line devices-程序员宅基地

文章浏览阅读607次,点赞2次,收藏7次。–基于STM32F103ZET6的UART通讯实现一、什么是IAP,为什么要IAPIAP即为In Application Programming(在应用中编程),一般情况下,以STM32F10x系列芯片为主控制器的设备在出厂时就已经使用J-Link仿真器将应用代码烧录了,如果在设备使用过程中需要进行应用代码的更换、升级等操作的话,则可能需要将设备返回原厂并拆解出来再使用J-Link重新烧录代码,这就增加了很多不必要的麻烦。站在用户的角度来说,就是能让用户自己来更换设备里边的代码程序而厂家这边只需要提供给_value line devices connectivity line devices