Thread Pool Engine, and Work-Stealing scheduling algorithm-程序员宅基地

http://pages.videotron.com/aminer/threadpool.htm

http://pages.videotron.com/aminer/zip/threadpool.zip  FPC Pascal v2.2.0+ / Delphi 5+

http://pages.videotron.com/aminer/zip/pthreadpool_xe4.zip (for Delphi XE to XE4)

http://pages.videotron.com/aminer/zip/pthreadpool.zip (for FreePascal and Lazarus and Delphi 7 to 2010)

Thread Pool Engine.

Please read an article that i wrote about my Threadpool engine: article.

The following have been added:

  • Lockfree ParallelQueue for less contention and more efficiency or
    it can use lockfree_mpmc - flqueue that i have modified, enhanced and improved... -
  • A lock-free queue for each worker thread and it uses work-stealing - for more efficiency -
  • Enters in a wait state when there is no job in the queue - for more efficiency -
  • You can distribute your jobs to the workers threads and call any method with the threadpool's execute() method.


Look into defines.inc there is many options:


Lockfree_MPMC: it uses lockfree_MPMC 
RingBuffer: it uses lock-free RingBuffer
SINGLE_PRODUCER
: for a single producer (thread)
MUTIPLE_PRODUCER: mutiple producer (threads) 
CPU32: for 32 bits architecture

Required FPC switches: -O3 -Sd -dFPC -dWin32 -dFreePascal

-Sd for delphi mode....

Required Delphi switches: -DMSWINDOWS -$H+

For Delphi 5,6,7 use -DDelphi

For Delphi 2005,2006,2007,2009,2010+ use the switch -DDELPHI2005+


Please look at the examples test.pas,testpool.pas and test_thread.pas...

Note: testpool.pas is a parallel program of a Matrix multiply by a vector that uses SSE+ and it requires Delphi 5+. test.pas and test_thread.pas works with both FreePascal and Delphi.

 

Threadpool with priorities

Thread Pool Engine.

The following have been added:

- You can give the following priorities to jobs:

LOW_PRIORITY
NORMAL_PRIORITY
HIGH_PRIORITY

- Uses a FIFO queue 
that satisfies many requirements:

it is FIFO fair, it minimizes efficiently the cache-coherence traffic and it is energy efficient on the pop():

when there is no items in the queue it will not spin-wait , but it will wait on a portable manual event object..

- Enters in a wait state when there is no job in the queue - for more efficiency -

- You can distribute your jobs to the workers threads and call any method with the threadpool's execute() method.

- Uses O(1) complexity on enqueue and O(3) worst case complexity  on dequeue.

Look into defines.inc there is many options:

CPU32: for 32 bits architecture
CPU64: for 64 bits architecture

Please read an article that i wrote about my Threadpool engine: article.

Look at test1.pas demo inside the zip file...

Required FPC switches: -O3 -Sd -dFPC -dWin32 -dFreePascal

-Sd for delphi mode....

Required Delphi switches: -DMSWINDOWS -$H+

For Delphi 5,6,7 use -DDelphi

For Delphi 2005,2006,2007,2009,2010+ use the switch -DDELPHI2005+

{$DEFINE CPU32} and {$DEFINE Win32} for 32 bit systems

{$DEFINE CPU64} and {$DEFINE Win64} for 64 bit systems

Note: testpool.pas is a parallel program of a Matrix multiply by a vector that uses SSE+ and it requires Delphi 5+.

test.pas and test_thread.pas works with both FreePascal and Delphi. 

 

Threadpool engine article 

On a multicore system, your goal is to spread the work efficiently among many cores so that it does executes simultaneously.

And performance gain should be directly related to how many cores you have.

So, a quad core system should be able to get the work done 4 times faster than a single core system.

A 16-core platform should be 4-times faster than a quad-core system, and 16-times faster than a single core...

That's where my Threadpool is usefull , it spreads the work efficiently among many cores.

Threadpool (and Threadpool with priority) consist of lock-free thread safe/concurrent enabled local FIFO queues of work items,

so when you call ThreadPool.execute() , your work item get queued in the local lock-free queues.

The worker threads pick them out in a First In First Out order (i.e., FIFO order), and execute them. .


The following have been added to Threadpool: 

  • Lock-free ParallelQueue for less contention and more efficiency or
    it can use lockfree_mpmc - flqueue that i have modified, enhanced and improved... -

See lock-free ParallelQueue: http://pages.videotron.com/aminer/parallelqueue/parallelqueue.htm

  •  It  uses a lock-free queue for each worker thread and it uses work-stealing - for more efficiency -
  • The worker threads enters in a wait state when there is no job in the lock-free queues - for more efficiency -
  • You can distribute your jobs to the worker threads and call any method with the threadpool's execute() method.

Work-Stealing scheduling algorithm:

Work-Stealing scheduling concepts:

  • Every worker thread have it’s own WSQ   (Work-Stealing Queue).
  • Any new task belong to worker thread will add in it’s own WSQ.
  • When the worker thread looking for new task he will follow this:
    • Looking for task into the local WSQ.
    • Then, looking and try to steal tasks from other workers.

But what is the Work-Stealing Queue? As I said every worker thread will have it’s own Work-Stealing Queue.

WSQ is data structure designed to be effective. WSQ data structure concepts:

  • It’s a lock-free FIFO queue.
  • It allows lock-free pushes, and pops from the private end.
  • It allows lock-free pushes, and pops from the public end.
  • Explains:
    • Private end: private for this worker thread.
    • Public end: public to any other thread,
      so any other thread can use the lock-free FIFO to steal a task as kind of load balancing.

Work-Stealing scheduling algorithm offer many feature over the ordinary scheduling algorithm:

  1. Effective:
    • Using local queues, this will minimize contention.
  2. Load Balancing:
    • Every thread can steal work from the other threads, so Work-Stealing provides implicitly Load Balancing.

 

My Threadpool allows load balancing, and also minimize contention.

 

Threadpool is very easy to use, let's look now at an example in Object Pascal...

 

program test;

uses

{
     $IFDEF Delphi}
  cmem,

{
     $ENDIF}
  ThreadPool, sysutils, syncobjs;

{
     $I defines.inc}

type

  TMyThread = class( TThreadPoolThread )

    // procedure ProcessRequest(obj: Pointer); override;

    procedure MyProc1( obj : Pointer );

    procedure MyProc2( obj : Pointer );

  end;

var

  myobj : TMyThread;

  TP : TThreadPool;

  obj : Pointer;

  cs : TCriticalSection;

procedure TMyThread.MyProc1( obj : Pointer );

begin

  cs.enter;

  writeln( 'This is MyProc1 with parameter: ', integer( obj ) );

  cs.leave;

end;

procedure TMyThread.MyProc2( obj : Pointer );

begin

  cs.enter;

  writeln( 'This is MyProc2 with parameter: ', integer( obj ) );

  cs.leave;

end;

begin

  myobj := TMyThread.create;

  cs := TCriticalSection.create;

  TP := TThreadPool.create( 4, TMyThread, 20 );
  // 4 workers threads and 2^20 items for each queue.

  obj := Pointer( 1 );

  TP.execute( myobj.MyProc1, Pointer( obj ) );

  obj := Pointer( 2 );

  TP.execute( myobj.MyProc2, Pointer( obj ) );

  readln;

  TP.Terminate;

  TP.Free;

end.

 Let us look at the first line...

uses

{$IFDEF Delphi}

cmem,

{$ENDIF}

ThreadPool,sysutils,syncobjs;

cmem is required for Delphi to use TBB memory manager (from Intel) ,

this will allow delphi memory manager to scale linearely...

Note: FPC doesn't need cmem, cause it scales linearely with threads...

ThreadPool: is our threadpool unit ..

syncobjs: contains all the sychronizations stuff like CriticalSections, Events etc..

After that we have the following lines:

type

TMyThread = class (TThreadPoolThread)

//procedure ProcessRequest(obj: Pointer); override;

procedure MyProc1(obj: Pointer);

procedure MyProc2(obj: Pointer);

end;

We declare a TMyThread that ineherit from TThreadPoolThread,

and we declare our two methods MyProc1 and MyProc2 that we want to be executed by our threadpool's worker threads.

Each method has an obj as a paramater.

In the main body we create a TMyThread object like this:

myobj:=TMyThread.create;

and after that we create a TThreadPool object with 4 workers threads

and lock-free FIFO queues and 2^20 items for each lock-free queue like this:

TP := TThreadPool.Create(4, TMyThread, 20); // 4 workers threads and 2^20 items for each queue.

After that we distribute to our worker threads the methods to be executed ,

we do it by calling the Threadpool's execute() method and

we pass it myobj.myproc1 and myobj.myproc2 with there parameters:.

TP.execute(myobj.myproc1,pointer(obj));

TP.execute(myobj.myproc2,pointer(obj));

As you see, Threadpool (and threadpool with priority) is very easy to use...

Let's look now at an example of a Threadpool with priority:.

 

program test;

uses

{
     $IFDEF Delphi}
  cmem,

{
     $ENDIF}
  PThreadPool, sysutils, syncobjs;

{
     $I defines.inc}

type

  TMyThread = class( TPThreadPoolThread )

    // procedure ProcessRequest(obj: Pointer); override;

    procedure MyProc1( obj : Pointer );

    procedure MyProc2( obj : Pointer );

  end;

var

  myobj : TMyThread;

  TP : TPThreadPool;

  obj : Pointer;

  cs : TCriticalSection;

procedure TMyThread.MyProc1( obj : Pointer );

begin

  cs.enter;

  writeln( 'This is MyProc1 with parameter: ', integer( obj ) );

  cs.leave;

end;

procedure TMyThread.MyProc2( obj : Pointer );

begin

  cs.enter;

  writeln( 'This is MyProc2 with parameter: ', integer( obj ) );

  cs.leave;

end;

begin

  myobj := TMyThread.create;

  cs := TCriticalSection.create;

  TP := TPThreadPool.create( 4, TMyThread, 20 );
  // 4 workers threads and 2^20 items for each queue.

  obj := Pointer( 1 );

  TP.execute( myobj.MyProc1, Pointer( obj ), NORMAL_PRIORITY );

  obj := Pointer( 2 );

  TP.execute( myobj.MyProc2, Pointer( obj ), NORMAL_PRIORITY );

  readln;

  TP.Terminate;

  TP.Free;

end.

 

 

 

 

As you have noticed, this is almost the same as threadpool..

You use PThreadPool - P for priority - rather than Threadpool

TPThreadPoolThread rather that TThreadPoolThread

TPThreadPool.Create rather than TThreadPool.Create

and as you have noticed in TP.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY) we are using priorities.

You can give the following priorities to jobs:

LOW_PRIORITY
NORMAL_PRIORITY
HIGH_PRIORITY

 

That's all.

You can download threadpool (and threadpool with priority) from:

http://pages.videotron.com/aminer/

 

Sincerely,
Amine Moulay Ramdane.

 

lock-free ParallelQueue

http://pages.videotron.com/aminer/parallelqueue/parallelqueue.htm

 

I have tested 3 lockfree fifo queue algorithms against my fifo queue ParallelQueue

that uses a hash based method and used 4 threads under contention and the results follows on the graphs bellow:

 

Lock-free flqueue at: http://www.emadar.com/fpc/lockfree.htm

 

Lock-free RingBuffer at: http://www.odsrv.com/RingBuffer/RingBuffer.htm

 

GpLockfreequeue at: http://17slon.com/gp/gp/gplockfreequeue.htm

 

and my ParallelQueue at: http://pages.videotron.com/aminer/

 

 

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_34235105/article/details/85661599

智能推荐

keep-alive vue不起作用的原因_vue3+vite的keep-alive浏览器返回不生效-程序员宅基地

文章浏览阅读2w次。keep-alive 从列表页进入到详情页,再回到列表页,然后页面又重新请求,而不是直接读缓存里面的(ps:之前keep-alive用在项目中是可以起作用的,但是不知道为什么这两天就出现了这样的问题。就连activated,deactivated这两个钩子函数也是没有触发到) 具体代码如下 我尝试在项目中新建一个很简单的页面,也是没有效果。一开始以为是vue的版本过低导致的,看了一下v..._vue3+vite的keep-alive浏览器返回不生效

十进制小数部分如何转化成二进制算法实现_小数进制转换编程-程序员宅基地

文章浏览阅读483次。十进制小数转化成二进制,就是不断地乘二,判断之后的这个数是否比1大,比1大则输出输出1,留下小数部分继续前面的操作。将3.75的小数部分转化为二进制。最后的输出结果是.11。_小数进制转换编程

免费OFD文件在线转PDF_在线生成一个 ofd文件-程序员宅基地

文章浏览阅读739次。ofd文件打不开?ofd怎么转换为pdf?本文将给告诉大家ofd是什么文件格式?ofd怎么打开?ofd怎么免费转换为配pdf文件等,以下是具体的方法:一,什么是OFD文件?OFD是我国电子公文交换和存储格式标准。OFD格式是我国自主可控的电子文件版式文档格式。OFD版式文件,版面固定、不跑版、所见即所得,可以视为计算机时代的“数字纸张”;是电子文档发布、数字化信息传播和存档的理想文档格式。OFD格式是当下对于全国产环境具有明显的优势。因此,在自主可控档案系统中,OFD格式无疑是自主可控档案系.._在线生成一个 ofd文件

14、HDFS 透明加密KMS_mapreduce读写sequencefile、mapfile、orcfile和parquetfil-程序员宅基地

文章浏览阅读3w次。HDFS中的数据会以block的形式保存在各台数据节点的本地磁盘中,但这些block都是明文的。通过Web UI页面找到Block的ID和副本位于的机器信息如果在操作系统中直接访问block所在的目录,通过Linux的cat命令是可以直接查看里面的内容的,且是明文。在datanode找到其文件为:HDFS透明加密(Transparent Encryption)支持端到端的透明加密,启用以后,对于一些需要加密的HDFS目录里的文件可以实现透明的加密和解密,而不需要修改用户的业务代码。_mapreduce读写sequencefile、mapfile、orcfile和parquetfile文件

Java SE 第三章 常用类 API_java se api常用类-程序员宅基地

文章浏览阅读92次。3.0 API概述https://www.oracle.com/cn/java/technologies/java-se-api-doc.html如何使用API看类的描述​ Random类是用于生成随机数的类看构造方法​ Random():无参构造方法 Random r = new Random();看成员方法​ public int nextInt(int n):产生的是一个[0,n)范围内的随机数调用方法: int number = r.nextInt(10_java se api常用类

3-16心电图多分类预测task01_机器学习在呼吸心跳信号检测中应用ti-程序员宅基地

文章浏览阅读285次。3-16心电图多分类预测task01一、赛题理解1.赛题理解1.数据概况1.评价指标二、baseline学习1.引入库2.读入数据3.数据预处理4.训练、测试数据准备5.模型训练总结提示:以下是本篇文章正文内容,下面案例可供参考一、赛题理解1.赛题理解以心电图心跳信号数据为背景,要求根据心电图感应数据预测心跳信号所属类别,其中心跳信号对应正常病例以及受不同心律不齐和心肌梗塞影响的病例,这是一个多分类的问题。1.数据概况以预测心电图心跳信号类别为任务,总数据量超过20万,主要为1列心跳信号序列数_机器学习在呼吸心跳信号检测中应用ti

随便推点

【JZ2440笔记】裸机实验使用SDRAM_京累里101123-程序员宅基地

文章浏览阅读393次。S3C2440A 存储器控制器为访问外部存储的需要器提供了存储器控制信号。S3C2440A 包含以下特性:–大/小端(通过软件选择)–地址空间:每个 Bank 有 128M 字节(总共 1G/8 个 Bank)–大/小端(通过软件选择)–除了 BANK0(16/32 位)之外,其它全部 BANK 都可编程访问宽度(8/16/32 位)–总共 8 个存储器 Bank6 个存储器 Bank 为 ROM,SRAM 等其余 2 个存储器 Bank 为 ROM,SRAM,SDRAM 等–7 个固定的_京累里101123

工具系列:TensorFlow决策森林_(3)使用dtreeviz可视化-程序员宅基地

文章浏览阅读1.2k次,点赞19次,收藏19次。之前的教程演示了如何使用TensorFlow的决策森林(随机森林、梯度提升树和CART)分类器和回归器来准备数据、训练和评估。(我们将TensorFlow决策森林缩写为TF-DF。)您还学会了如何使用内置的函数可视化树,并显示特征重要性度量。本教程的目标是通过可视化更深入地解释分类器和回归器决策树。我们将查看详细的树结构图示,以及决策树如何划分特征空间以做出决策的描绘。树结构图帮助我们理解模型的行为,特征空间图帮助我们通过展示特征和目标变量之间的关系来理解数据。我们将使用的可视化库称为dtreeviz。_dtreeviz

MySQL8.0学习记录10 - 字符集与校对规则_mysql8.0存储系统元数据的字符集是-程序员宅基地

文章浏览阅读2.1k次。MySQL8.0字符集_mysql8.0存储系统元数据的字符集是

漫威所有电影的 按时间线的观影顺序-程序员宅基地

文章浏览阅读3.1k次。美国队长1 - 2011年惊奇队长 - 2019年钢铁侠1 - 2008年无敌浩克 - 2008年钢铁侠2 - 2010年雷神 - 2011年复仇者联盟 - 2012年雷神2 - 2013年钢铁侠3 - 2013年美国队长2 - 2014年复仇者联盟2 - 2015年银河护卫队 - 2017年蚁人 - 2015年美国队长3 - 2016年奇异博士 - 2016年银河护卫队2 - 2017..._漫威电影观看顺序时间线

PhotoZoom Classic 7中的新功能-程序员宅基地

文章浏览阅读142次。众所周知PhotoZoom Classic是家庭使用理想的放大图像软件。目前很多用户还在使用PhotoZoom Classic 6,对于PhotoZoom Classic 7还是有点陌生。其实在6代衍生下出了7代,7代比6代多了很多适用的功能。下面我们就介绍一下PhotoZoom Classic 7中的新功能。PhotoZoom Classic 6的功能我们就不过多介绍,主要介绍7代中特有的功..._photozoon的作用

tensorflow中tf.keras.models.Sequential()用法-程序员宅基地

文章浏览阅读4.6w次,点赞75次,收藏349次。tensorflow中tf.keras.models.Sequential()用法Sequential()方法是一个容器,描述了神经网络的网络结构,在Sequential()的输入参数中描述从输入层到输出层的网络结构model = tf.keras.models.Sequential([网络结构]) #描述各层网络网络结构举例:拉直层:tf.keras.layers.Flatten() #拉直层可以变换张量的尺寸,把输入特征拉直为一维数组,是不含计算参数的层全连接层:tf.ker._tf.keras.models.sequential

推荐文章

热门文章

相关标签