大家好,又见面了,我是你们的朋友全栈君。
目录
介绍与动机
简介与术语
术语“非阻塞”表示并发数据结构,该结构不使用传统的同步原语(例如警卫程序)来确保线程安全。 Maurice Herlihy和Nir Shavit(比较“多处理器编程的艺术”)区分了3种类型的非阻塞数据结构,每种结构具有不同的属性:
如果保证每个并发操作都可以在有限的步骤中完成,则数据结构无需等待。因此,可以为操作次数提供最坏情况的保证。
如果保证某些并发操作可以在有限数量的步骤中完成,则数据结构是无锁的。从理论上讲,某些操作可能永远不会取得任何进展,但在实际应用中极不可能发生这种情况。
如果保证一个并发操作可以在有限的步骤中完成,那么数据结构是无障碍的,除非另一个并发操作干扰了它。
如果某些数据结构在某些限制下使用,则只能以无锁的方式实现。实现boost.lockfree的相关方面是生产者线程和使用者线程的数量。单生产者(sp)或多生产者(mp)意味着仅允许一个线程或多个并发线程将数据添加到数据结构中。单消费者(sc)或多消费者(mc)表示从数据结构中删除数据的等效项。
非阻塞数据结构的性质
非阻塞数据结构不依赖锁和互斥量来确保线程安全。 同步完全在用户空间中完成,而无需与操作系统进行任何直接交互[8]。 这意味着它们不容易出现优先级倒置之类的问题(低优先级线程需要等待高优先级线程)。
非阻塞数据结构不需要依赖guards,而是需要原子操作(执行特定的CPU指令而不会中断)。 这意味着任何线程都可以在操作之前或之后看到状态,但是无法观察到中间状态。 并非所有硬件都支持同一组原子指令。 如果它在硬件中不可用,则可以使用防护在软件中对其进行仿真。 然而,这样做就没有无锁的优点。
非阻塞数据结构的性能
在讨论非阻塞数据结构的性能时,必须区分摊销成本和最坏情况成本。 “无锁”和“无等待”的定义仅提及操作的上限。 因此,无锁数据结构不一定是每种用例的最佳选择。 为了最大化应用程序的吞吐量,应该考虑高性能的并发数据结构[9]。
为了优化系统的等待时间或避免优先级倒置,无锁数据结构将是一个更好的选择,这在实时应用程序中可能是必需的。 通常,我们建议考虑是否需要无锁数据结构或并发数据结构是否足够。 无论如何,我们建议针对特定工作负载使用不同的数据结构执行基准测试。
阻塞行为的来源
除了锁和互斥锁(无论如何我们都不在boost.lockfree中使用),还有其他三个方面可能会违反锁自由:
原子操作
某些体系结构没有以本机方式在硬件中提供必要的原子操作。如果不是这种情况,则使用自旋锁在软件中对其进行仿真,而自旋锁本身就是阻塞的。
内存分配
从操作系统分配内存不是无锁的。这使得不可能实现真正的动态大小的非阻塞数据结构。 boost.lockfree基于节点的数据结构使用内存池分配内部节点。如果此内存池已用完,则必须从操作系统分配用于新节点的内存。但是,可以配置boost.lockfree的所有数据结构来避免内存分配(相反,特定的调用将失败)。这对于需要无锁内存分配的实时系统特别有用。
异常处理
C++异常处理不对其实时行为提供任何保证。因此,我们不鼓励在无锁代码中使用异常和异常处理。
数据结构
boost.lockfree实现了三种无锁数据结构:
boost :: lockfree :: queue 无锁的多生产者/多消费者队列
boost :: lockfree :: stack 无锁的多生产者/多消费者堆栈
boost :: lockfree :: spsc_queue 一个无等待的单一生产者/单个消费者队列(通常称为环形缓冲区)
数据结构配置
可以使用Boost.Parameter样式模板配置数据结构:
boost::lockfree::fixed_sized
将数据结构配置为固定大小。 内部节点存储在数组内部,并通过数组索引对其进行寻址。 这将队列的可能大小限制为可以由索引类型(通常为2 ** 16-2)解决的元素数量,但是在缺少双倍宽度比较和交换指令的平台上,这是最好的选择实现锁定自由的方法。
boost::lockfree::capacity
在编译时设置数据结构的容量。 这意味着数据结构是固定大小的。
boost::lockfree::allocator
定义分配器。 boost.lockfree支持状态分配器,并且与Boost.Interprocess分配器兼容。
示例
队列
boost::lockfree::queue类实现了一个多写入器/多读取器队列。 下面的示例显示如何分别由4个线程生成和使用整数值:
#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <iostream>
#include <boost/atomic.hpp>
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::lockfree::queue<int> queue(128);
const int iterations = 10000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
int value = ++producer_count;
while (!queue.push(value))
;
}
}
boost::atomic<bool> done (false);
void consumer(void)
{
int value;
while (!done) {
while (queue.pop(value))
++consumer_count;
}
while (queue.pop(value))
++consumer_count;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::queue is ";
if (!queue.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread_group producer_threads, consumer_threads;
for (int i = 0; i != producer_thread_count; ++i)
producer_threads.create_thread(producer);
for (int i = 0; i != consumer_thread_count; ++i)
consumer_threads.create_thread(consumer);
producer_threads.join_all();
done = true;
consumer_threads.join_all();
cout << "produced " << producer_count << " objects." << endl;
cout << "consumed " << consumer_count << " objects." << endl;
}
这个程序的输出如下:
produced 40000000 objects.
consumed 40000000 objects.
栈
boost::lockfree::stack类实现了多写入器/多读取器堆栈。 下面的示例显示如何分别由4个线程生成和使用整数值:
#include <boost/thread/thread.hpp>
#include <boost/lockfree/stack.hpp>
#include <iostream>
#include <boost/atomic.hpp>
boost::atomic_int producer_count(0);
boost::atomic_int consumer_count(0);
boost::lockfree::stack<int> stack(128);
const int iterations = 1000000;
const int producer_thread_count = 4;
const int consumer_thread_count = 4;
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
int value = ++producer_count;
while (!stack.push(value))
;
}
}
boost::atomic<bool> done (false);
void consumer(void)
{
int value;
while (!done) {
while (stack.pop(value))
++consumer_count;
}
while (stack.pop(value))
++consumer_count;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::stack is ";
if (!stack.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread_group producer_threads, consumer_threads;
for (int i = 0; i != producer_thread_count; ++i)
producer_threads.create_thread(producer);
for (int i = 0; i != consumer_thread_count; ++i)
consumer_threads.create_thread(consumer);
producer_threads.join_all();
done = true;
consumer_threads.join_all();
cout << "produced " << producer_count << " objects." << endl;
cout << "consumed " << consumer_count << " objects." << endl;
}
这个程序的输出如下:
produced 4000000 objects.
consumed 4000000 objects.
无等待单生产者/单消费者队列
boost::lockfree::spsc_queue类实现了免等待的单生产者/单消费者队列。 下面的示例说明如何通过2个单独的线程生成和使用整数值:
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <iostream>
#include <boost/atomic.hpp>
int producer_count = 0;
boost::atomic_int consumer_count (0);
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024> > spsc_queue;
const int iterations = 10000000;
void producer(void)
{
for (int i = 0; i != iterations; ++i) {
int value = ++producer_count;
while (!spsc_queue.push(value))
;
}
}
boost::atomic<bool> done (false);
void consumer(void)
{
int value;
while (!done) {
while (spsc_queue.pop(value))
++consumer_count;
}
while (spsc_queue.pop(value))
++consumer_count;
}
int main(int argc, char* argv[])
{
using namespace std;
cout << "boost::lockfree::queue is ";
if (!spsc_queue.is_lock_free())
cout << "not ";
cout << "lockfree" << endl;
boost::thread producer_thread(producer);
boost::thread consumer_thread(consumer);
producer_thread.join();
done = true;
consumer_thread.join();
cout << "produced " << producer_count << " objects." << endl;
cout << "consumed " << consumer_count << " objects." << endl;
}
这个程序的输出如下:
produced 10000000 objects.
consumed 10000000 objects.
基本原理
数据结构
这些实现是众所周知的数据结构的实现。 队列基于Michael Scott和Maged Michael的“简单,快速,实用的非阻塞和阻塞并发队列算法”,栈基于“系统编程:RK Treiber处理并行性”,并且spsc_queue被视为“民间传说”,并且在包括Linux内核在内的几个开源项目中实现。 Herlihy&Shavit的“多处理器编程的艺术”中详细讨论了所有数据结构。
内存管理
无锁boost::lockfree::queue和boost::lockfree::stack类是基于链表的基于节点的数据结构。 无锁数据结构的内存管理不是一个简单的问题,因为需要避免一个线程释放一个内部节点,而另一个线程仍然使用它。 boost.lockfree使用一种简单的方法,不将任何内存返回给操作系统。 相反,它们维护一个空闲列表以便以后重用。 这样做有两个原因:首先,取决于内存分配器的实现,释放内存可能会阻塞(因此该实现将不再是无锁的),其次,大多数内存回收算法均已申请专利。
ABA预防
ABA问题是实现无锁数据结构时的常见问题。使用compare_exchange操作更新原子变量时会出现问题:如果读取了值A,线程1会将其更改为C并尝试更新该变量,则仅当当前值为A时,它才使用compare_exchange写入C。如果与此同时线程2将值从A更改为B并重新更改为A,则可能会出现问题,因为线程1没有观察到状态的变化。避免ABA问题的常用方法是将版本计数器与该值相关联,并自动更改两者。
boost.lockfree使用agged_ptr帮助器类,该类将指针与整数标签关联。这通常需要双倍宽度的compare_exchange,并非在所有平台上都可用。 IA32在奔腾处理器之前没有提供cmpxchg8b操作码,在许多RISC体系结构(如PPC)中也缺少IA32。早期的X86-64处理器也未提供cmpxchg16b指令。在64位平台上,可以解决此问题,因为通常不使用完整的64位地址空间。例如,在X86_64上,地址仅使用48位,因此我们可以将其余的16位用于ABA预防标签。有关详细信息,请咨询boost :: lockfree :: detail :: tagged_ptr类的实现。
对于没有双倍宽度compare_exchange的32位平台上的无锁操作,我们支持第三种方法:通过使用固定大小的数组来存储内部节点,我们可以避免使用32位指针,但是在数组中使用16位索引就足够了。但是,这仅适用于具有内部节点上限的固定大小的数据结构。
进程间支持
boost.lockfree数据结构对Boost.Interprocess具有基本支持。 唯一的问题是无锁原子的阻塞仿真,在当前实现中不能保证该无进程间安全。
附录
支持的平台和编译器
boost.lockfree已在以下平台上经过测试:
g ++ 4.4、4.5和4.6,Linux,x86和x86_64
clang ++ 3.0,Linux,x86和x86_64
未来发展
更多数据结构(集合,哈希表,双端队列)
退避方案(指数退避或消除)
参考
简单,快速,实用的非阻塞和阻塞并发队列算法,作者:Michael Scott和Maged Michael,在“分布式计算原理研讨会”上,第267-275页,1996年。
M. Herlihy和Nir Shavit。 多处理器编程的艺术,Morgan Kaufmann出版社,2008年
脚注
[8] 自旋锁也不会直接与操作系统交互。 但是,操作系统可能会抢占拥有线程,这违反了无锁属性。
[9] 英特尔的Thread Building Blocks library提供了许多有效的并发数据结构,这些结构不一定是无锁的。
原文链接
https://www.boost.org/doc/libs/1_74_0/doc/html/lockfree.html
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/161689.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...