lockfree 的队列的实现

lockfree 的队列的实现一个高速无锁循环队列的实现。需要注意的是:(1)队列的大小(m_lMaxQueueSize)应该足够的大,避免处理不过来时,找半天找不到空位置。(2)还有一点是这种队列在push数据足够快时效率高点,不然pop时就阻塞了,改善的方式就是使用费阻塞的方式,判断几次没有就跳出去,还有这种队列也就在push数据足够快时效率高点,不然判断的次数就多了。(3)使用了原子操作的锁(4)需…

大家好,又见面了,我是你们的朋友全栈君。

一个高速无锁循环队列的实现。

需要注意的是:

(1)队列的大小(m_lMaxQueueSize)应该足够的大,避免处理不过来时,找半天找不到空位置。

(2)还有一点是这种队列在push数据足够快时效率高点,不然pop时就阻塞了,改善的方式就是使用费阻塞的方式,判断几次没有就跳出去,还有这种队列也就在push数据足够快时效率高点,不然判断的次数就多了。 

(3)使用了原子操作的锁

(4)需要优化的的是数值开始与结束之间长度小于阈值就不取。

代码如下:

 

// 临界锁,线程安全  
// 必须要有一个不会用的空值
typedef unsigned int var_4;
typedef unsigned long var_u8;

template <class T_Key>  //队列里的值
class CQueue_Lockfree  
{  
public:  
    CQueue_Lockfree()  
    {  
        m_lMaxQueueSize = -1;  
        m_tpQueue = NULL;  //队列的内存
        m_lBegPos = 0;  //队列的开始位置
        m_lEndPos = 0;  //队列的结束位置
    };  
    ~CQueue_Lockfree()  
    {  
        if(m_tpQueue)  
            delete m_tpQueue;  
    };  
    var_4 InitQueue(var_4 lMaxQueueSize, T_Key null)//null为空的值(目前为0)
    {  
        m_lMaxQueueSize = lMaxQueueSize;  
        m_tpQueue = new T_Key[m_lMaxQueueSize];  
        if(m_tpQueue == NULL)  
            return -1;  
        m_lBegPos = 0;  
        m_lEndPos = 0;  
        m_null = null;  
        for( var_4 i = 0; i < lMaxQueueSize; i++ )  
        {  
            m_tpQueue[i] = null;  
        }  
        return 0;  
    };  
  
    void ResetQueue()  
    {  
        m_lBegPos = 0;  
        m_lEndPos = 0;    
    };  
  
    void ClearQueue()  
    {  
        if(m_tpQueue)  
        {  
            delete m_tpQueue;  
            m_tpQueue = NULL;  
        }  
        m_lMaxQueueSize = -1;  
        m_lBegPos = 0;  
        m_lEndPos = 0;  
    };  
    void PushData(T_Key tKey)  
    {  
        register var_u8 cnt = 1;  
        var_u8 pos = fetch_and_add(&m_lEndPos, cnt)%m_lMaxQueueSize;  //队列的m_lEndPos位置下一个位置
        while( m_null != m_tpQueue[pos] )  //等到该位置为空才能放数值
            cp_sleep(1);  
        m_tpQueue[pos] = tKey;  
    };  
  
    T_Key PopData()  
    {  
        register var_u8 cnt =1;  
        var_u8 pos = fetch_and_add(&m_lBegPos, cnt)%m_lMaxQueueSize;  //队列的m_lBegPos位置下一个位置
        T_Key *p = m_tpQueue + pos;  
        while( m_null == *p )  //等到该位置不为空才能取数值
            cp_sleep(1);  
  
        T_Key ret = *p;  
        *p = m_null;  
        return ret;  
    };  
private:  
    var_4 m_lMaxQueueSize;  
    T_Key* m_tpQueue;  
    T_Key m_null;  
  
    var_u4 m_lBegPos;  
    var_u4 m_lEndPos;  
};  

别人的测试结果,这种方式比用锁少了40%。

 

10亿次放入和读取,10线程花费200秒。

测试说加了阈值判断后说是8核1.5GHz花了88秒,没加前是

24核机器200秒

vendor_id       : GenuineIntel
cpu family      : 6
model           : 45
model name      : Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz
stepping        : 7
cpu MHz         : 2300.469
cache size      : 15360 KB

个人觉得主要是因为两个sleep 阻塞了,还是换成非阻塞的方法吧,push的数据加一个等待队列,pop的方法不要用sleep,要非阻塞的,没有要取的数据就跳出去干别的活去。

上面是用两个数来记录开始和结束的下标,我觉得用两个指针也可以。另外还有一个改进是使用引用计数,管理下内存,免得一个节点被重复释放或加入,对于引用计数的加减就只能用下原子函数来保证原子性了,下文提到的ABA 的问题,个人觉得也就只有在内存被重复释放或添加时有影响,所以引用计数是个比较好的方案,也可以避免重复内容的拷贝,效率还是不错的。还有提一下的是如果使用C++11中的 STL 中的 atomic 类的函数和可以跨下平台,不用的人自己加些宏定义也可以,没什么特别的,说是无锁其实也就是应用层的无锁,使用了下原子操作来进行很小颗粒的锁操作。

 关于原子操作:

 

关于 CAS 等原子操作

  在开始说无锁队列之前,我们需要知道一个很重要的技术就是 CAS 操作——Compare & Set,或是 Compare & Swap,现在几乎所有的 CPU 指令都支持 CAS 的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。

  这个操作用C语言来描述就是下面这个样子:(代码来自 Wikipedia 的 Compare And Swap 词条)意思就是说,看一看内存*reg 里的值是不是 oldval,如果是的话,则对其赋值 newval。

 

int compare_and_swap (int* reg, int oldval, int newval)
{
  int old_reg_val = *reg;
  if (old_reg_val == oldval)
     *reg = newval;
  return old_reg_val;
}

 

 这个操作可以变种为返回 bool 值的形式(返回 bool 值的好处在于,可以调用者知道有没有更新成功):

 

bool compare_and_swap (int *accum, int *dest, int newval)
{
  if ( *accum == *dest ) {
      *dest = newval;
      return true;
  }
  return false;
}

 

 与 CAS 相似的还有下面的原子操作:(这些东西大家自己看 Wikipedia 吧)

  注:在实际的C/C++程序中,CAS 的各种实现版本如下:

  1)GCC 的 CAS

  GCC4.1+ 版本中支持 CAS 的原子操作(完整的原子操作可参看 GCC Atomic Builtins

 

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

 2)Windows 的 CAS

 

  在 Windows 下,你可以使用下面的 Windows API 来完成 CAS:(完整的 Windows 原子操作可参看 MSDN 的 InterLocked Functions

 

InterlockedCompareExchange ( __inout LONG volatile *Target,
                                __in LONG Exchange,
                                __in LONG Comperand);

 

  3) C++11 中的 CAS

  C++11中的 STL 中的 atomic 类的函数可以让你跨平台。(完整的C++11的原子操作可参看Atomic Operation Library

 

template< class T >
bool atomic_compare_exchange_weak ( std::atomic<T>* obj,
                                   T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak ( volatile std::atomic<T>* obj,
                                   T* expected, T desired );

 

无锁队列的链表实现

  下面的东西主要来自 John D. Valois 1994 年 10 月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。

  我们先来看一下进队列用 CAS 实现的方式:

 

EnQueue (x) //进队列
{
    //准备新加入的结点数据
    q = new record ();
    q->value = x;
    q->next = NULL;
    do {
        p = tail; //取链表尾指针的快照
    } while( CAS (p->next, NULL, q) != TRUE); //如果没有把结点链上,再试
 
    CAS (tail, p, q); //置尾结点
}

 

  我们可以看到,程序中的那个 do- while 的 Re-Try-Loo。就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是 tail 指针就变了,于是我的 CAS 返回了 false,于是程序再试,直到试成功为止。这个很像我们的抢电话热的不停重播的情况。

  你会看到,为什么我们的“置尾结点”的操作不判断是否成功,因为:

  1. 如果有一个线程 T1,它的 while 中的 CAS 如果成功的话,那么其它所有的随后线程的 CAS 都会失败,然后就会再循环,
  2. 此时,如果 T1 线程还没有更新 tail 指针,其它的线程继续失败,因为 tail->next 不是 NULL 了。
  3. 直到 T1 线程更新完 tail 指针,于是其它的线程中的某个线程就可以得到新的 tail 指针,继续往下走了。

  这里有一个潜在的问题——如果 T1 线程在用 CAS 更新 tail 指针的之前,线程停掉了,那么其它线程就进入死循环了。下面是改良版的 EnQueue ()

 

EnQueue (x) //进队列改良版
{
    q = new record ();
    q->value = x;
    q->next = NULL;
 
    p = tail;
    oldp = p
    do {
        while (p->next != NULL)
            p = p->next;
    } while( CAS (p.next, NULL, q) != TRUE); //如果没有把结点链上,再试
 
    CAS (tail, oldp, q); //置尾结点
}

  我们让每个线程,自己 fetch 指针 p 到链表尾。但是这样的 fetch 会很影响性能。而通实际情况看下来,99.9% 的情况不会有线程停转的情况,所以,更好的做法是,你可以接合上述的这两个版本,如果 retry 的次数超了一个值的话(比如说 3 次),那么,就自己 fetch 指针。

 

  好了,我们解决了 EnQueue,我们再来看看 DeQueue 的代码:(很简单,我就不解释了)

 

DeQueue () //出队列
{
    do{
        p = head;
        if (p->next == NULL){
            return ERR_EMPTY_QUEUE;
        }
    while( CAS (head, p, p->next);
    return p->next->value;
}

 

 我们可以看到,DeQueue 的代码操作的是 head->next,而不是 head 本身。这样考虑是因为一个边界条件,我们需要一个 dummy 的头指针来解决链表中如果只有一个元素,head 和 tail 都指向同一个结点的问题,这样 EnQueue 和 DeQueue 要互相排斥了。

lockfree 的队列的实现

注:上图的 tail 正处于更新之前的装态。

  DAS 的 ABA 问题

  所谓 ABA(见维基百科的 ABA 词条),问题基本是这个样子:

  1. 进程 P1 在共享变量中读到值为A
  2. P1被抢占了,进程 P2 执行
  3. P2把共享变量里的值从A改成了B,再改回到A,此时被 P1 抢占。
  4. P1回来看到共享变量里的值没有被改变,于是继续执行。

  虽然 P1 以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA 问题最容易发生在 lock free 的算法中的,DAS 首当其冲,因为 DAS 判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。

  比如上述的 DeQueue ()函数,因为我们要让 head 和 tail 分开,所以我们引入了一个 dummy 指针给 head,当我们做 CAS 的之前,如果 head 的那块内存被回收并被重用了,而重用的内存又被 EnQueue ()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为)

  这个例子你可能没有看懂,维基百科上给了一个活生生的例子——

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

  这就是 ABA 的问题。

  解决 ABA 的问题

  维基百科上给了一个解——使用 double-CAS(双保险的 CAS),例如,在 32 位系统上,我们要检查 64 位的内容

  1)一次用 CAS 检查双倍长度的值,前半部是指针,后半部分是一个计数器。

  2)只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。

  这样一来,ABA 发生时,虽然值一样,但是计数器就不一样(但是在 32 位的系统上,这个计数器会溢出回来又从 1 开始的,这还是会有 ABA 的问题)

  当然,我们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决,论文《Implementing Lock-Free Queues》给出一这么一个方法——使用结点内存引用计数 refcnt!

 

SafeRead (q)
{
    loop:
        p = q->next;
        if (p == NULL){
            return p;
        }
 
        Fetch&Add (p->refcnt, 1);
 
        if (p == q->next){
            return p;
        }else{
            Release (p);
        }
    goto loop
}

 

  其中的 Fetch&Add 和 Release 分是是加引用计数和减引用计数,都是原子操作,这样就可以阻止内存被回收了。

  用数组实现无锁队列

  本实现来自论文《Implementing Lock-Free Queues

  使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:

  1)数组队列应该是一个 ring buffer 形式的数组(环形数组)

  2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)

  3)数组一开始全部初始化成 EMPTY,有两个相邻的元素要初始化成 HEAD 和 TAIL,这代表空队列。

  4)EnQueue 操作。假设数据x要入队列,定位 TAIL 的位置,使用 double-CAS 方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。

  5)DeQueue 操作。定位 HEAD 的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是 TAIL,则说明队列为空。

  算法的一个关键是——如何定位 HEAD 或 TAIL?

  1)我们可以声明两个计数器,一个用来计数 EnQueue 的次数,一个用来计数 DeQueue 的次数。

  2)这两个计算器使用使用 Fetch&ADD 来进行原子累加,在 EnQueue 或 DeQueue 完成的时候累加就好了。

  3)累加后求个模什么的就可以知道 TAIL 和 HEAD 的位置了。

  如下图所示:

lockfree 的队列的实现

   小结

  以上基本上就是所有的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上。

  1)无锁队列主要是通过 CAS、FAA 这些原子操作,和 Retry-Loop 实现。

  2)对于 Retry-Loop,我个人感觉其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD 和 TAIL 这两个关键资源。而不是整个数据结构。

  还有一些和 Lock Free 的文章你可以去看看:

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/161685.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 转:在线检测网页错误工具

    转:在线检测网页错误工具

  • java爬虫系列(一)——爬虫入门[通俗易懂]

    java爬虫系列(一)——爬虫入门[通俗易懂]爬虫框架介绍Heritrix优势劣势简单demo地址crawler4j优势劣势简单demo地址WebMagic优势劣势简单demo地址快速入门seimicrawler项目地址简单爬虫实现导入项目编写爬虫启动爬虫同系列文章爬虫框架介绍java爬虫框架非常多,比如较早的有Heritrix,轻量级的crawler4j…

  • java数组截取[通俗易懂]

    java数组截取[通俗易懂]JAVA数组截取publicclassDay12_2{publicstaticvoidmain(String[]args){int[]A=newint[]{1,3,5,7,9};intstart=1;intend=3;arraySub(A,start,end);}public…

  • 使用 Converter Standalone进行P2V操作指导「建议收藏」

    使用 Converter Standalone进行P2V操作指导「建议收藏」介绍VMwarevCenterConverterStandalone是一款免费程序,可以安装在运行Windows的物理计算机上。ConverterStandalone会将硬盘驱动器上的数据复制到虚拟磁盘文件(.vmdk)中,此文件随后可在其他VMware产品中使用。该过程不会影响您的计算机,在使用Converter之后您可以继续使用计算机。VMwarevCenterConverter可以在多种硬件上运行,并支持最常用的MicrosoftWindows操作系统版本

  • ubuntu安装qt5.12_ubuntu安装分区

    ubuntu安装qt5.12_ubuntu安装分区下载Qt安装包官网下载速度较慢,可以从国内镜像下载。清华大学:https://mirrors.tuna.tsinghua.edu.cn/qt/中国科学技术大学:http://mirrors.ustc.edu.cn/qtproject/中国互联网络信息中心:https://mirrors.cnnic.cn/qt/安装包名称:qt-opensource-linux-x64-5.14.2.run./qt-opensource-linux-x64-5.14.2.run安装好后需要一些配置才能开始开发

    2022年10月15日
  • php curl_init post/get请求

    php curl_init post/get请求publicfunctiongetCurlApi(){$url=’地址’;$headers=array(‘access_token:’.$token);$curl=curl_init();curl_setopt($curl,CURLOPT_URL,$url);//设置调用地址curl_setopt($curl,CURLOPT_HTTPHEADER,$headers);//添加头…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号