大家好,又见面了,我是你们的朋友全栈君。
很久之前对Boost里面LockFree的相关代码进行阅读,今天对以前的一些笔记进行一下总结!
LockFree的基础知识涉及:Atomic(原子操作),CAS(Compay and swap),内存预分配;
Atomic原子操作是无锁的核心实现,原子操作的实质是通过使用CPU的一些特殊指令(通常为汇编代码指令)或操作系统封装的一些内存锁定接口(系统封装的内存保护接口)来对指定长度的内存进行访问和修改,因为访问内存的原子性从而实现上层接口的无锁操作;
CAS,核心代码如下:
int
compare_and_swap (
int
* reg,
int
oldval,
int
newval)
{
ATOMIC();
int
old_reg_val = *reg;
if
(old_reg_val == oldval)
*reg = newval;
END_ATOMIC();
return
old_reg_val;
}
实质是通过不断比较预期值和当前值之间的数值从而决定是否需要交换保护内存中的内容。
了解以上基础知识后我们再来看LockFree的代码
Boost里面LockFree的代码主要分为:fressList.hpp, fresslist_base.hpp, queue.hpp,三个文件的详细注释如下:
/// 内存无锁管理类, pool始终指向freelist的第一个可消费节点
template< typename T, typename NodeStorage = freelist_storage< T > >
class freelist : NodeStorage
{
private:
/// 内存操作磁头节点
atomic< target_index > pool_;
/// 用于映射T{ next, data }
struct freelist_node
{
/// 当前节点的下一个节点索引
target_index next;
};
public:
typedef target_index::index_t index_t;
typedef target_index tagged_node_handle;
/*
* 构造函数
* alloc 内存初始化函数
* count 预分配内存大小
*/
template< typename Allocator >
freelist( Allocator alloc, std::size_t count )
: NodeStorage( alloc, count )
, pool_( target_index( static_cast<index_t>(count), 0 ) )
{
/// 初始化预分配内存,将节点串起来
initialize();
}
/*
* 获取空索引
*/
index_t null_handle( void ) const
{
/// 返回一个无效的索引值,用于比较节点的合法性,类似NULL指针
return static_cast< index_t >( NodeStorage::node_count() );
}
/*
* 根据指针获取内存索引值
*/
index_t get_handle( T * pointer ) const
{
if( pointer == NULL ){
/// 如果指针为空,返回无效索引
return null_handle();
} else {
/// 返回指针对应的偏移量
return static_cast< index_t >( pointer - NodeStorage::nodes() );
}
}
/*
* 根据target_handle获取内存索引值
*/
index_t get_handle( tagged_node_handle const& handle ) const
{
/// 返回对应指针的索引值
return handle.get_index();
}
/*
* 根据target_handle获取内存指针
*/
T * get_pointer( tagged_node_handle const& tptr ) const
{
/// 调用内存索引获取内存指针接口
return get_pointer( tptr.get_index() );
}
/*
* 根据内存索引获取内存指针
*/
T * get_pointer( index_t index ) const
{
/// 索引值为null时返回空指针
if( index == null_handle() ) {
return 0;
} else {
/// 返回对应的指针地址
return NodeStorage::nodes() + index;
}
}
/*
* 消费一个内存节点, 构造节点传一个入参
*/
template< typename ArgType >
T * construct( ArgType const& a )
{
/// 消费一个节点,返回对应节点的索引
index_t node_index = allocate();
if( node_index == null_handle() ){
/// 如果无节点消费,返回NULL指针
return NULL;
}
/// 获得对应索引的内存地址
T * node = NodeStorage::nodes() + node_index;
/// 在对应内存上构造数据
new ( node ) T ( a );
return node;
}
/*
* 消费一个内存节点, 构造节点传两个入参
*/
template< typename ArgType1, typename ArgType2 >
T * construct( ArgType1 const& a1, ArgType2 const& a2 )
{
/// 消费一个节点,返回对应节点的索引
index_t node_index = allocate();
if( node_index == null_handle() ){
/// 如果无节点消费,返回NULL指针
return NULL;
}
/// 获得对应索引的内存地址
T * node = NodeStorage::nodes() + node_index;
/// 在对应内存上构造数据
new ( node ) T ( a1, a2 );
return node;
}
/*
* 回收对应索引下的内存节点,生产者接口
*/
void destruct( target_index const& ti )
{
/// 得到节点内存索引值
index_t index = ti.get_index();
/// 得到对应索引的内存地址
T * n = NodeStorage::nodes() + index;
( void )( n );
/// 执行对象析构函数,此处只析构对象,不析构内存
n->~T();
/// 回收对应节点,生产freenode
deallocate( index );
}
/*
* 回收对应指针节点,生产者接口
*/
void destruct( T * n )
{
/// 执行对象析构函数,此处只析构对象,不析构内存
n->~T();
/// 回收对应节点,生产freenode
deallocate( get_handle( n ) );
}
private:
/*
* 将存储节串成一个链,链头为最后一个节点,链尾为null节点
*/
void initialize( void )
{
/// 将节点以dummy为尾节点串起来
for( std::size_t i = 0; i != NodeStorage::node_count(); ++i ){
/// 初始化节点的内存
target_index * next_index = reinterpret_cast< target_index* >( NodeStorage::nodes() + i );
/// 设置节点的next为null
next_index->set_index( null_handle() );
/// 每执行一次,pool向后移动一个节点
deallocate( static_cast< index_t >( i ) );
}
}
/*
* 调整pool消费一个free节点
* pool指向下一个free节点,采用CAS原语操作pool
*/
index_t allocate( void )
{
/// 得到当前磁头所在节点索引
target_index old_pool = pool_.load();
for( ; ; ){
/// 获得磁头的索引
index_t index = old_pool.get_index();
if( index == null_handle() ){
/// 如果磁头指向空,说明队列为空,消费失败
return index;
}
/// 得到磁头指向的内存指针
T * old_node = NodeStorage::nodes() + index;
/// 获取内存数据,这里利用结构体的特性进行强项转换,将{ next, data }转换为next
target_index * next_index = reinterpret_cast< target_index* >( old_node );
/// 新磁头位置为: ( 下一个节点索引,当前tag的下一个tag )
target_index new_pool( next_index->get_index(), old_pool.get_next_tag() );
if( pool_.compare_exchange_weak( old_pool, new_pool ) ){
/// 执行CAS,修改磁头指向的索引值,并返回老磁头指向节点的索引
return old_pool.get_index();
}
}
}
/*
* 调整pool磁头生产一个free节点
* pool指向index索引节点,old_pool->next指向新的pool节点,采用CAS原语操作pool
*/
void deallocate( index_t index )
{
/// 获取回收节点指向的内存
freelist_node * new_pool_node = reinterpret_cast< freelist_node* >(
NodeStorage::nodes() + index );
/// 获得磁头的索引
target_index old_pool = pool_.load();
for( ; ; ){
/// 新磁头位置: ( 回收节点索引,磁头指向节点的tag )
target_index new_pool( index, old_pool.get_tag() );
/// 修改回收节点指向内存的next节点的索引
new_pool_node->next.set_index( old_pool.get_index() );
if( pool_.compare_exchange_weak( old_pool, new_pool ) ){
/// 执行CAS,修改磁头指向的索引值
break;
}
}
}
};
/// 节点存储类
template< typename T, typename Alloc = std::allocator<T> >
class freelist_storage : Alloc
{
private:
/// 内存起始指针
T * nodes_;
/// 内存节点个数
std::size_t count_;
public:
/*
* 构造函数
*/
template< typename Allocator >
freelist_storage( Allocator alloc, std::size_t count )
: Alloc( alloc )
, count_( count )
{
/// 预分配内存
nodes_ = Alloc::allocate( count );
}
/*
* 析构函数
*/
~freelist_storage()
{
/// 销毁预分配内粗
Alloc::deallocate( nodes_, count_ );
nodes_ = NULL;
}
/*
* 预分配内存起始指针
*/
T * nodes() const
{
return nodes_;
}
/*
* 预分配内存的数量
*/
std::size_t node_count() const
{
return count_;
}
};
/// 内存管理类,占用32位
class target_index
{
public:
typedef unsigned short tag_t; /// 16bit
typedef unsigned short index_t; /// 16bit
target_index()
: index_( 0 )
, target_( 0 )
{
}
/*
* 构造函数
* idx 内存索引
* tag 内存当前状态标识
*/
target_index( index_t idx, tag_t tag = 0 )
: index_( idx )
, target_( tag )
{
}
/*
* 拷贝构造
*/
target_index( target_index const& r )
: index_( r.index_ )
, target_( r.target_ )
{
}
/*
* 设置tag
*/
void set_target( tag_t tag )
{
target_ = tag;
}
/*
* 获取tag值
*/
tag_t get_tag( ) const
{
return target_;
}
/*
* 获取下一个tag值
*/
tag_t get_next_tag() const
{
tag_t next = ( get_tag() + 1 ) & (std::numeric_limits<tag_t>::max)();
return next;
}
/*
* 设置内存索引值
*/
void set_index( index_t idx )
{
index_ = idx;
}
/*
* 获取内存索引值
*/
index_t get_index( ) const
{
return index_;
}
/*
* 重载等于
*/
bool operator == ( target_index const& r )
{
return ( target_ == r.target_ ) && ( index_ == r.index_ );
}
/*
* 重载不等于
*/
bool operator != ( target_index const& r )
{
return !operator==( r );
}
private:
index_t index_;
tag_t target_;
};
/*
* 判断表达式是否相等
*/
static inline bool likely( bool expr )
{
#ifdef __GNUC__
return __builtin_expect( expr, true );
#else
return ( expr );
#endif
}
/*
* 隐式构造的拷贝
*/
template< typename T, typename U >
static inline void copy_padload( T & l, U & r )
{
r = ( U )l;
}
} /// namespace detail
/*
* 无锁队列
*/
template< typename T >class TQueue
{
private:
/// 队列节点,按照64字节对齐,{ index, tag }
struct ALIGNMENT( LOCKFREE_CACHELINE_BYTES ) QueueNode
{
typedef target_index tagged_node_handle;
typedef target_index::tag_t handle_type;
/// 存储数据
QueueNode( T const& v, handle_type null_handle )
: data( v )
{
/// 默认构造target_index,获取next
tagged_node_handle old_next = next.load();
/// 设置新的next为( index, 增加tag )
tagged_node_handle new_next( null_handle, old_next.get_next_tag() );
/// 存储next节点
next.store( new_next );
}
/// 用索引值初始化节点
QueueNode( handle_type null_handle )
: next( tagged_node_handle( null_handle ) )
{
}
/// 默认初始化
QueueNode( void )
{
}
/// 指向下一个索引节点,队列元素
atomic< tagged_node_handle > next;
/// 存放存储数据
T data;
};
typedef std::allocator< QueueNode > node_allocator;
typedef freelist< QueueNode > pool_t;
typedef typename pool_t::index_t handle_type;
typedef typename pool_t::tagged_node_handle tagged_node_handle;
void initialize( void )
{
/// 初始化head和tail,head和tail指向null_handle
QueueNode * n = pool.construct( pool.null_handle() );
/// dummy_node( n, 0 )
tagged_node_handle dummy_node( pool.get_handle(n), 0 );
/// head指向target_index( n, 0 )
head_.store( dummy_node );
/// tail指向target_index( n, 0 )
tail_.store( dummy_node );
}
public:
/*
* 构造函数
*/
explicit TQueue( size_t n )
: head_( tagged_node_handle( 0, 0 ) )
, tail_( tagged_node_handle( 0, 0 ) )
, pool( node_allocator(), n + 1 )
{
/// 初始化n+1个节点,其中第n+1个节点为dummy
initialize();
}
/*
* 析构函数
*/
~TQueue()
{
T dummy;
/// 执行每一个节点的析构函数
while( pop( dummy ) ) {}
/// 执行最后一个指向节点的析构函数
pool.destruct( head_.load() );
}
/*
* 判断链表是否为空
*/
bool empty() const
{
/// 使用head==tail判断当前队列是否为空
return pool.get_handle( head_.load() ) == pool.get_handle( tail_.load() );
}
/*
* 队列入队操作
*/
bool push( T const& v )
{
/// 执行push
return do_push( v );
}
/*
* 队列的出队操作
*/
bool pop( T & ret )
{
/// 执行pop
return do_pop< T >( ret );
}
/*
* 消费一个元素节点
*/
template< typename Functor >
bool consume_one( Functor const& f )
{
/// 数据临时缓存
T element;
/// 获取最后一个节点数据
bool success = pop( element );
/// 执行消费回调
if( success ) { f( element ); }
/// 如果链表有数据,默认消费成功
return success;
}
/*
* 消费所有元素节点
*/
template< typename Functor >
size_t consume_all( Functor const& f )
{
/// 链表元素个数
size_t element_count = 0;
/// 处理一个链表节点
while( consume_one( f ) ){
/// 累加处理个数
++element_count;
}
/// 返回链表处理的个数
return element_count;
}
private:
/*
* 入队操作实作
*/
bool do_push( T const& v )
{
/// 构造一个新的节点,新节点next指向null_handle( NULL )
QueueNode * n = pool.construct( v, pool.null_handle() );
/// 获得节点的索引
handle_type node_handle = pool.get_handle( n );
if( n == NULL ) {
/// 如果构造节点为null
return false;
}
for( ; ; ) {
/// 获取尾节点tail
tagged_node_handle tail = tail_.load();
/// 获取尾节点内存指针
QueueNode * tail_node = pool.get_pointer( tail );
/// 获取尾节点的next指针
tagged_node_handle next = tail_node->next.load();
/// 获取尾节点的next节点指针
QueueNode * next_ptr = pool.get_pointer( next );
/// 获取链表尾数据
tagged_node_handle tail2 = tail_.load();
/// 如果链表尾的数据满足cas条件
if( detail::likely( tail == tail2 ) ) {
/// 下一个节点为null
if( next_ptr == 0 ) {
/// 当前tail节点的下一个节点为分配节点
tagged_node_handle new_tail_next( node_handle, next.get_next_tag() );
/// 执行cas操作,修改tail->next = node
if( tail_node->next.compare_exchange_weak( next, new_tail_next ) ) {
/// 新tail节点为node节点,更新tail的tag
tagged_node_handle new_tail( node_handle, tail.get_next_tag() );
/// 执行cas操作,修改tail = node
tail_.compare_exchange_storage( tail, new_tail );
return true;
}
} else {
/// 如果tail节点的下一节点不为null,新tail节点指向下一节点,更新tail的tag
tagged_node_handle new_tail( pool.get_handle( next_ptr ), tail.get_next_tag() );
/// 执行cas操作tail = tail->next
tail_.compare_exchange_storage( tail, new_tail );
}
}
}
}
/*
* 出队操作实作
*/
template< typename U >
bool do_pop( U & ret )
{
for( ; ; ) {
/// 获取head节点
tagged_node_handle head = head_.load();
/// 获取head节点的内存值
QueueNode * head_ptr = pool.get_pointer( head );
/// 获取tail节点
tagged_node_handle tail = tail_.load();
/// 获取head下一节点,dummy节点保证head的next节点非空或链表为空
tagged_node_handle next = head_ptr->next.load();
/// 获取head下一节点内存
QueueNode * next_ptr = pool.get_pointer( next );
/// 获取head节点数据
tagged_node_handle head2 = head_.load();
/// 如果链头节点满足cas条件
if( detail::likely( head == head2 ) ) {
/// 如果链头和链尾相等
if( pool.get_handle( head ) == pool.get_handle( tail ) ) {
/// 如果链尾节点的下一节点为null,即链表为空
if( next_ptr == 0 ){
/// pop失败
return false;
}
/// 否则head和tail都指向dummy节点
/// 链尾向后移动一个节点
tagged_node_handle new_tail( pool.get_handle( next ), tail.get_next_tag() );
/// 指向cas,链尾向后移tail = tail->next
tail_.compare_exchange_storage( tail, new_tail );
} else {
if( next_ptr == 0 ){
/// 安全性考虑,本方案的实作代码下应该不会出现该场景
continue;
}
/// 拷贝下一节点的数据
detail::copy_padload( next_ptr->data, ret );
/// head向后移动一个节点
tagged_node_handle new_head( pool.get_handle( next ), head.get_next_tag() );
/// 执行cas操作,head向前移动一个节点,head = head->next
if( head_.compare_exchange_weak( head, new_head ) ){
/// 更新pool, pool指向old_head, pool->next = old_pool
pool.destruct( head );
return true;
}
}
}
}
}
/// 按照target_index对齐,即64字节
static const int padding_size = LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
/// 链表头
atomic< tagged_node_handle > head_;
/// 链表头对齐
char padding1[ padding_size ];
/// 链表尾
atomic< tagged_node_handle > tail_;
/// 链表尾对齐
char padding2[ padding_size ];
/// 内存操作对象
pool_t pool;
};
LockFree的数据流程图:
伪代码:
bool push( )
{
/// 获取pool
node = pool;
/// pool向后移位
pool = pool->next
/// 新pool的next为NULL
pool->next = NULL;
/// 如果node为空
if( node == NULL ) {
/// 无freelist节点
return false;
}
/// 如果tail的下一个节点不为空
if( tail->next != NULL ) {
/// tail为dummy,则tail后移一个几点
tail = tail->next
} else {
/// tail的下一个节点为node
tail->next = node;
/// 将node赋值给tail
tail = node;
}
}
bool pop()
{
/// 如果head和tail相等
if( head == tail ) {
if( head->next = NULL ) {
/// 链表为空
return false;
} else {
/// head链表指向dummy,tail向后移一位
tail = tail->next;
}
} else {
/// 保存老的head节点
old_head = head;
/// head向后移位
head = head->next;
/// 保存老的pool节点
old_pool = pool;
/// pool指向老的head节点
pool = old_head;
/// pool下一节点指向old_pool
pool->next = old_pool;
}
}
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/161672.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...