Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]Boost

大家好,又见面了,我是你们的朋友全栈君。

很久之前对Boost里面LockFree的相关代码进行阅读,今天对以前的一些笔记进行一下总结!

LockFree的基础知识涉及:Atomic(原子操作),CAS(Compay and swap),内存预分配;

Atomic原子操作是无锁的核心实现,原子操作的实质是通过使用CPU的一些特殊指令(通常为汇编代码指令)或操作系统封装的一些内存锁定接口(系统封装的内存保护接口)来对指定长度的内存进行访问和修改,因为访问内存的原子性从而实现上层接口的无锁操作;

CAS,核心代码如下:

int 
compare_and_swap (
int
* reg,
int 
oldval,
int 
newval)
{
  
ATOMIC();
  
int 
old_reg_val = *reg;
  
if 
(old_reg_val == oldval)
     
*reg = newval;
  
END_ATOMIC();
  
return 
old_reg_val;
}

实质是通过不断比较预期值和当前值之间的数值从而决定是否需要交换保护内存中的内容。

了解以上基础知识后我们再来看LockFree的代码

Boost里面LockFree的代码主要分为:fressList.hpp, fresslist_base.hpp, queue.hpp,三个文件的详细注释如下:

	/// 内存无锁管理类, pool始终指向freelist的第一个可消费节点
	template< typename T, typename NodeStorage = freelist_storage< T > >
	class freelist : NodeStorage
	{
	private:
		/// 内存操作磁头节点
		atomic< target_index > pool_;
		
		/// 用于映射T{ next, data }
		struct freelist_node
		{
			/// 当前节点的下一个节点索引
			target_index next;
		};
	public:	
		typedef target_index::index_t index_t;
		typedef target_index tagged_node_handle;
		
		/*
		 * 构造函数
		 * alloc 内存初始化函数
		 * count 预分配内存大小
		 */
		template< typename Allocator >
		freelist( Allocator alloc, std::size_t count )
		 : NodeStorage( alloc, count )
		 , pool_( target_index( static_cast<index_t>(count), 0 ) )
		{
			/// 初始化预分配内存,将节点串起来
			initialize();
		}
		
		/*
		 * 获取空索引
		 */
		index_t null_handle( void ) const
		{
			/// 返回一个无效的索引值,用于比较节点的合法性,类似NULL指针
			return static_cast< index_t >( NodeStorage::node_count() );
		}
		
		/*
		 * 根据指针获取内存索引值
		 */
		index_t get_handle( T * pointer ) const
		{
			if( pointer == NULL ){ 
				/// 如果指针为空,返回无效索引
				return null_handle();
			} else {
				/// 返回指针对应的偏移量
				return static_cast< index_t >( pointer - NodeStorage::nodes() );
			}
		}

		/*
		 * 根据target_handle获取内存索引值
		 */
		index_t get_handle( tagged_node_handle const& handle ) const
		{
			/// 返回对应指针的索引值
			return handle.get_index();
		}

		/*
		 * 根据target_handle获取内存指针
		 */
		T * get_pointer( tagged_node_handle const& tptr ) const
		{
			/// 调用内存索引获取内存指针接口
			return get_pointer( tptr.get_index() );
		}

		/*
		 * 根据内存索引获取内存指针
		 */
		T * get_pointer( index_t index ) const
		{
			/// 索引值为null时返回空指针
			if( index == null_handle() ) {
				return 0;
			} else {
				/// 返回对应的指针地址
				return NodeStorage::nodes() + index;
			}
		}

		/*
		 * 消费一个内存节点, 构造节点传一个入参
		 */
		template< typename ArgType >
		T * construct( ArgType const& a )
		{
			/// 消费一个节点,返回对应节点的索引
			index_t node_index = allocate();
			if( node_index == null_handle() ){
				/// 如果无节点消费,返回NULL指针
				return NULL;
			}
			/// 获得对应索引的内存地址
			T * node = NodeStorage::nodes() + node_index;
			/// 在对应内存上构造数据
			new ( node ) T ( a );
			return node;
		}
		
		/*
		 * 消费一个内存节点, 构造节点传两个入参
		 */
		template< typename ArgType1, typename ArgType2 >
		T * construct( ArgType1 const& a1, ArgType2 const& a2 )
		{
			/// 消费一个节点,返回对应节点的索引
			index_t node_index = allocate();
			if( node_index == null_handle() ){
				/// 如果无节点消费,返回NULL指针
				return NULL;
			}
			/// 获得对应索引的内存地址
			T * node = NodeStorage::nodes() + node_index;
			/// 在对应内存上构造数据
			new ( node ) T ( a1, a2 );
			return node;
		}
		
		/*
		 * 回收对应索引下的内存节点,生产者接口
		 */
		void destruct( target_index const& ti )
		{
			/// 得到节点内存索引值
			index_t index = ti.get_index();
			/// 得到对应索引的内存地址
			T * n = NodeStorage::nodes() + index;
			( void )( n );
			/// 执行对象析构函数,此处只析构对象,不析构内存
			n->~T();
			/// 回收对应节点,生产freenode
			deallocate( index );
		}
		
		/*
		 * 回收对应指针节点,生产者接口
		 */
		void destruct( T * n )
		{
			/// 执行对象析构函数,此处只析构对象,不析构内存
			n->~T();
			/// 回收对应节点,生产freenode
			deallocate( get_handle( n ) );
		}
		
	private:
		/*
		 * 将存储节串成一个链,链头为最后一个节点,链尾为null节点
		 */
		void initialize( void )
		{
			/// 将节点以dummy为尾节点串起来
			for( std::size_t i = 0; i != NodeStorage::node_count(); ++i ){
				/// 初始化节点的内存
				target_index * next_index = reinterpret_cast< target_index* >( NodeStorage::nodes() + i );
				/// 设置节点的next为null
				next_index->set_index( null_handle() );
				/// 每执行一次,pool向后移动一个节点
				deallocate( static_cast< index_t >( i ) );
			}
		}
		
		/*
		 * 调整pool消费一个free节点
		 * pool指向下一个free节点,采用CAS原语操作pool
		 */
		index_t allocate( void )
		{
			/// 得到当前磁头所在节点索引
			target_index old_pool = pool_.load();
			
			for( ; ; ){
				/// 获得磁头的索引
				index_t index = old_pool.get_index();
				if( index == null_handle() ){
					/// 如果磁头指向空,说明队列为空,消费失败
					return index;
				}
				/// 得到磁头指向的内存指针
				T * old_node = NodeStorage::nodes() + index;
				/// 获取内存数据,这里利用结构体的特性进行强项转换,将{ next, data }转换为next
				target_index * next_index = reinterpret_cast< target_index* >( old_node );
				/// 新磁头位置为: ( 下一个节点索引,当前tag的下一个tag )
				target_index new_pool( next_index->get_index(), old_pool.get_next_tag() );
				
				if( pool_.compare_exchange_weak( old_pool, new_pool ) ){
					/// 执行CAS,修改磁头指向的索引值,并返回老磁头指向节点的索引
					return old_pool.get_index();
				}
			}
		}
		
		/*
		 * 调整pool磁头生产一个free节点
		 * pool指向index索引节点,old_pool->next指向新的pool节点,采用CAS原语操作pool
		 */
		void deallocate( index_t index )
		{
			/// 获取回收节点指向的内存
			freelist_node * new_pool_node = reinterpret_cast< freelist_node* >(
				NodeStorage::nodes() + index );
			/// 获得磁头的索引
			target_index old_pool = pool_.load();
			
			for( ; ; ){
				/// 新磁头位置: ( 回收节点索引,磁头指向节点的tag )
				target_index new_pool( index, old_pool.get_tag() );
				/// 修改回收节点指向内存的next节点的索引
				new_pool_node->next.set_index( old_pool.get_index() );
				if( pool_.compare_exchange_weak( old_pool, new_pool ) ){
					/// 执行CAS,修改磁头指向的索引值
					break;
				}
			}
		}
	};

/// 节点存储类
	template< typename T, typename Alloc = std::allocator<T> >
	class freelist_storage : Alloc
	{
	private:
		/// 内存起始指针
		T * nodes_;
		/// 内存节点个数
		std::size_t count_;
		
	public:
		/*
		 * 构造函数
		 */
		template< typename Allocator >
		freelist_storage( Allocator alloc, std::size_t count )
		 : Alloc( alloc )
		 , count_( count )
		{
			/// 预分配内存
			nodes_ = Alloc::allocate( count );
		}
		
		/*
		 * 析构函数
		 */
		~freelist_storage()
		{
			/// 销毁预分配内粗
			Alloc::deallocate( nodes_, count_ );
			nodes_ = NULL;
		}
		
		/*
		 * 预分配内存起始指针
		 */
		T * nodes() const
		{
			return nodes_;
		}
		
		/*
		 * 预分配内存的数量
		 */
		std::size_t node_count() const
		{
			return count_;
		}
	};

	/// 内存管理类,占用32位
	class target_index
	{
	public:
		typedef unsigned short tag_t; /// 16bit
		typedef unsigned short index_t; /// 16bit

		target_index()
		 : index_( 0 )
		 , target_( 0 )
		{
		}
		
		/*
		 * 构造函数
		 * idx 内存索引
		 * tag 内存当前状态标识
		 */
		target_index( index_t idx, tag_t tag = 0 )
		 : index_( idx )
		 , target_( tag )
		{
		}
		
		/*
		 * 拷贝构造
		 */
		target_index( target_index const& r )
		 : index_( r.index_ )
		 , target_( r.target_ )
		{
		}
		
		/*
		 * 设置tag
		 */
		void set_target( tag_t tag )
		{
			target_ = tag;
		}
		
		/*
		 * 获取tag值
		 */
		tag_t get_tag( ) const
		{
			return target_;
		}
		
		/*
		 * 获取下一个tag值
		 */
		tag_t get_next_tag() const
		{
			tag_t next = ( get_tag() + 1 ) & (std::numeric_limits<tag_t>::max)();
			return next;
		}
		
		/*
		 * 设置内存索引值
		 */
		void set_index( index_t idx )
		{
			index_ = idx;
		}
		
		/*
		 * 获取内存索引值
		 */
		index_t get_index( ) const
		{
			return index_;
		}
		
		/*
		 * 重载等于
		 */
		bool operator == ( target_index const& r )
		{
			return ( target_ == r.target_ ) && ( index_ == r.index_ );
		}
		
		/*
		 * 重载不等于
		 */
		bool operator != ( target_index const& r )
		{
			return !operator==( r );
		}
	private:
		index_t index_;
		tag_t target_;
	};
/*
	 * 判断表达式是否相等
	 */
	static inline bool likely( bool expr )
	{
#ifdef __GNUC__
		return __builtin_expect( expr, true );
#else
		return ( expr );
#endif
	}
	
	/*
	 * 隐式构造的拷贝
	 */
	template< typename T, typename U >
	static inline void copy_padload( T & l, U & r )
	{
		r = ( U )l;
	}

} /// namespace detail

	/*
	 * 无锁队列
	 */
	template< typename T >class TQueue
	{
	private:
		/// 队列节点,按照64字节对齐,{ index, tag }
		struct ALIGNMENT( LOCKFREE_CACHELINE_BYTES ) QueueNode
		{
			typedef target_index tagged_node_handle;
			typedef target_index::tag_t handle_type;
			
			/// 存储数据
			QueueNode( T const& v, handle_type null_handle )
			 : data( v )
			{
				/// 默认构造target_index,获取next
				tagged_node_handle old_next = next.load();
				/// 设置新的next为( index, 增加tag )
				tagged_node_handle new_next( null_handle, old_next.get_next_tag() );
				/// 存储next节点
				next.store( new_next );
			}
			
			/// 用索引值初始化节点
			QueueNode( handle_type null_handle )
			 : next( tagged_node_handle( null_handle ) )
			{
			}
			
			/// 默认初始化
			QueueNode( void )
			{
			}
			
			/// 指向下一个索引节点,队列元素
			atomic< tagged_node_handle > next;
			/// 存放存储数据
			T data;
		};
		
		typedef std::allocator< QueueNode > node_allocator;
		typedef freelist< QueueNode > pool_t;
		typedef typename pool_t::index_t handle_type;
		typedef typename pool_t::tagged_node_handle tagged_node_handle;
		
		void initialize( void )
		{
			/// 初始化head和tail,head和tail指向null_handle
			QueueNode * n = pool.construct( pool.null_handle() );
			/// dummy_node( n, 0 )
			tagged_node_handle dummy_node( pool.get_handle(n), 0 );
			/// head指向target_index( n, 0 )
			head_.store( dummy_node );
			/// tail指向target_index( n, 0 )
			tail_.store( dummy_node );
		}
		
	public:
		/*
		 * 构造函数
		 */
		explicit TQueue( size_t n )
		 : head_( tagged_node_handle( 0, 0 ) )
		 , tail_( tagged_node_handle( 0, 0 ) )
		 , pool( node_allocator(), n + 1 )
		{
			/// 初始化n+1个节点,其中第n+1个节点为dummy
			initialize();
		}
		
		/*
		 * 析构函数
		 */
		~TQueue()
		{
			T dummy;
			/// 执行每一个节点的析构函数
			while( pop( dummy ) ) {}
			/// 执行最后一个指向节点的析构函数
			pool.destruct( head_.load() );
		}
		
		/*
		 * 判断链表是否为空
		 */
		bool empty() const
		{
			/// 使用head==tail判断当前队列是否为空
			return pool.get_handle( head_.load() ) == pool.get_handle( tail_.load() );
		}
		
		/*
		 * 队列入队操作
		 */
		bool push( T const& v )
		{
			/// 执行push
			return do_push( v );
		}
		
		/*
		 * 队列的出队操作
		 */
		bool pop( T & ret )
		{
			/// 执行pop
			return do_pop< T >( ret );
		}
		
		/*
		 * 消费一个元素节点
		 */
		template< typename Functor >
		bool consume_one( Functor const& f )
		{
			/// 数据临时缓存
			T element;
			/// 获取最后一个节点数据
			bool success = pop( element );
			/// 执行消费回调
			if( success ) { f( element ); }
			/// 如果链表有数据,默认消费成功
			return success;
		}
		
		/*
		 * 消费所有元素节点
		 */
		template< typename Functor >
		size_t consume_all( Functor const& f )
		{
			/// 链表元素个数
			size_t element_count = 0;
			/// 处理一个链表节点
			while( consume_one( f ) ){
				/// 累加处理个数
				++element_count;
			}
			/// 返回链表处理的个数
			return element_count;
		}
		
	private:
		/*
		 * 入队操作实作
		 */
		bool do_push( T const& v )
		{
			/// 构造一个新的节点,新节点next指向null_handle( NULL )
			QueueNode * n = pool.construct( v, pool.null_handle() );
			/// 获得节点的索引
			handle_type node_handle = pool.get_handle( n );
			
			if( n == NULL ) {
				/// 如果构造节点为null
				return false;
			}
			
			for( ; ; ) {
				/// 获取尾节点tail
				tagged_node_handle tail = tail_.load();
				/// 获取尾节点内存指针
				QueueNode * tail_node = pool.get_pointer( tail );
				/// 获取尾节点的next指针
				tagged_node_handle next = tail_node->next.load();
				/// 获取尾节点的next节点指针
				QueueNode * next_ptr = pool.get_pointer( next );

				/// 获取链表尾数据
				tagged_node_handle tail2 = tail_.load();
				/// 如果链表尾的数据满足cas条件
				if( detail::likely( tail == tail2 ) ) {
					/// 下一个节点为null
					if( next_ptr == 0 ) {
						/// 当前tail节点的下一个节点为分配节点
						tagged_node_handle new_tail_next( node_handle, next.get_next_tag() );
						/// 执行cas操作,修改tail->next = node
						if( tail_node->next.compare_exchange_weak( next, new_tail_next ) ) {
							/// 新tail节点为node节点,更新tail的tag
							tagged_node_handle new_tail( node_handle, tail.get_next_tag() );
							/// 执行cas操作,修改tail = node
							tail_.compare_exchange_storage( tail, new_tail );
							return true;
						}
					} else {
						/// 如果tail节点的下一节点不为null,新tail节点指向下一节点,更新tail的tag
						tagged_node_handle new_tail( pool.get_handle( next_ptr ), tail.get_next_tag() );
						/// 执行cas操作tail = tail->next
						tail_.compare_exchange_storage( tail, new_tail );
					}
				}
			}
		}
		
		/*
		 * 出队操作实作
		 */
		template< typename U >
		bool do_pop( U & ret )
		{
			for( ; ; ) {
				/// 获取head节点
				tagged_node_handle head = head_.load();
				/// 获取head节点的内存值
				QueueNode * head_ptr = pool.get_pointer( head );
				
				/// 获取tail节点
				tagged_node_handle tail = tail_.load();
				/// 获取head下一节点,dummy节点保证head的next节点非空或链表为空
				tagged_node_handle next = head_ptr->next.load();
				/// 获取head下一节点内存
				QueueNode * next_ptr = pool.get_pointer( next );
				
				/// 获取head节点数据
				tagged_node_handle head2 = head_.load();
				/// 如果链头节点满足cas条件
				if( detail::likely( head == head2 ) ) {
					/// 如果链头和链尾相等
					if( pool.get_handle( head ) == pool.get_handle( tail ) ) {
						/// 如果链尾节点的下一节点为null,即链表为空
						if( next_ptr == 0 ){
							/// pop失败
							return false;
						}
						/// 否则head和tail都指向dummy节点
						/// 链尾向后移动一个节点
						tagged_node_handle new_tail( pool.get_handle( next ), tail.get_next_tag() );
						/// 指向cas,链尾向后移tail = tail->next
						tail_.compare_exchange_storage( tail, new_tail );
					} else {
						if( next_ptr == 0 ){
							/// 安全性考虑,本方案的实作代码下应该不会出现该场景
							continue;
						}
						/// 拷贝下一节点的数据
						detail::copy_padload( next_ptr->data, ret );
						/// head向后移动一个节点
						tagged_node_handle new_head( pool.get_handle( next ), head.get_next_tag() );
						/// 执行cas操作,head向前移动一个节点,head = head->next
						if( head_.compare_exchange_weak( head, new_head ) ){
							/// 更新pool, pool指向old_head, pool->next = old_pool
							pool.destruct( head );
							return true;
						}
					}
				}
			}
		}
		
		/// 按照target_index对齐,即64字节
		static const int padding_size = LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
		/// 链表头
		atomic< tagged_node_handle > head_;
		/// 链表头对齐
		char padding1[ padding_size ];
		/// 链表尾
		atomic< tagged_node_handle > tail_;
		/// 链表尾对齐
		char padding2[ padding_size ];
		/// 内存操作对象
		pool_t pool;
	};

LockFree的数据流程图:

Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]

伪代码:
bool push( )
{
	/// 获取pool
	node = pool;
	/// pool向后移位
	pool = pool->next
	/// 新pool的next为NULL
	pool->next = NULL;
	/// 如果node为空
	if( node == NULL ) {
		/// 无freelist节点
		return false;
	}
	/// 如果tail的下一个节点不为空
	if( tail->next != NULL ) {
		/// tail为dummy,则tail后移一个几点
		tail = tail->next
	} else {
		/// tail的下一个节点为node
		tail->next = node;
		/// 将node赋值给tail
		tail = node;
	}
}

bool pop()
{
	/// 如果head和tail相等
	if( head == tail ) {
		if( head->next = NULL ) {
			/// 链表为空
			return false;
		} else {
			/// head链表指向dummy,tail向后移一位
			tail = tail->next;
		}
	} else {
		/// 保存老的head节点
		old_head = head;
		/// head向后移位
		head = head->next;
		/// 保存老的pool节点
		old_pool = pool;
		/// pool指向老的head节点
		pool = old_head;
		/// pool下一节点指向old_pool
		pool->next = old_pool;
	}
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/161672.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • JavaScript概要

    JavaScript概要

  • Lena图像分解成小块与从小块合成

    Lena图像分解成小块与从小块合成 ➤01背景在2020年人工神经网络课程第一次作业第八题中需要对Lena图像使用AutoEncode网络进行压缩。将Lena(灰度图像)拆解成不同尺寸的大小形成训练压缩样本过程;或者从训练结果重新组合成Lena灰度图像是实验的基础。▲Lena灰度图像下面给出相关操作的Python程序和相关的结果。主要操作包括:将512×512的Lena灰度图片(0~255)分割成边长8~16的图像块,并通过行扫描形行向量;对图像进行归一化,形成数据在-0.5~0.5之

  • 中国移动校园WLAN客户端及使用方法「建议收藏」

    中国移动校园WLAN客户端及使用方法「建议收藏」学校终于覆盖了移动WLAN,坑爹的是信息中心没有给任何使用说明,给很多同学使用造成了障碍,现在把使用方法做一个简单的总结。文章中软件打包下载:http://pan.baidu.com/share/l

  • 如何免费的、完整的把 PDF 转换为 Word?

    如何免费的、完整的把 PDF 转换为 Word?先给大家打个预防针:由于PDF文件本身的特性,想要百分百完美地将它转回Word格式基本上是不可能的!我们都知道PDF是一种不能编辑的文件,如果要将pdf文件转换成word文件又该怎样转换呢?其实我们可以借助工具实现pdf转换成word的操作哦。你有没有遇到过要将PDF转换为Word需求的时候,找了一大堆工具,进行转换发现有一些转换不了,有一些转换限制,需要付费才能全部转换完成。我最近也是遇到有一个需求需要将PDF转换为Wold,找了好几个工具都是收费的(因为只是转换一两次,没必要充个会员,不.

  • Latex公式换行与对齐

    Latex公式换行与对齐Latex公式换行与对齐1、应用amsmath包:\usepackage{amsmath}2、公式中用aligned:\begin{equation}\begin{aligned}……\end{aligned}\end{equation}或者直接用align:\begin{align}……\end{align}3、换行时用“\”换行符\begin{align}a=1+2+3+4+5\\=6+9\end{align}效果:4、左对齐用“&amp

  • MySQL规范

    MySQL规范

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号