Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]Boost

大家好,又见面了,我是你们的朋友全栈君。

很久之前对Boost里面LockFree的相关代码进行阅读,今天对以前的一些笔记进行一下总结!

LockFree的基础知识涉及:Atomic(原子操作),CAS(Compay and swap),内存预分配;

Atomic原子操作是无锁的核心实现,原子操作的实质是通过使用CPU的一些特殊指令(通常为汇编代码指令)或操作系统封装的一些内存锁定接口(系统封装的内存保护接口)来对指定长度的内存进行访问和修改,因为访问内存的原子性从而实现上层接口的无锁操作;

CAS,核心代码如下:

int 
compare_and_swap (
int
* reg,
int 
oldval,
int 
newval)
{
  
ATOMIC();
  
int 
old_reg_val = *reg;
  
if 
(old_reg_val == oldval)
     
*reg = newval;
  
END_ATOMIC();
  
return 
old_reg_val;
}

实质是通过不断比较预期值和当前值之间的数值从而决定是否需要交换保护内存中的内容。

了解以上基础知识后我们再来看LockFree的代码

Boost里面LockFree的代码主要分为:fressList.hpp, fresslist_base.hpp, queue.hpp,三个文件的详细注释如下:

	/// 内存无锁管理类, pool始终指向freelist的第一个可消费节点
	template< typename T, typename NodeStorage = freelist_storage< T > >
	class freelist : NodeStorage
	{
	private:
		/// 内存操作磁头节点
		atomic< target_index > pool_;
		
		/// 用于映射T{ next, data }
		struct freelist_node
		{
			/// 当前节点的下一个节点索引
			target_index next;
		};
	public:	
		typedef target_index::index_t index_t;
		typedef target_index tagged_node_handle;
		
		/*
		 * 构造函数
		 * alloc 内存初始化函数
		 * count 预分配内存大小
		 */
		template< typename Allocator >
		freelist( Allocator alloc, std::size_t count )
		 : NodeStorage( alloc, count )
		 , pool_( target_index( static_cast<index_t>(count), 0 ) )
		{
			/// 初始化预分配内存,将节点串起来
			initialize();
		}
		
		/*
		 * 获取空索引
		 */
		index_t null_handle( void ) const
		{
			/// 返回一个无效的索引值,用于比较节点的合法性,类似NULL指针
			return static_cast< index_t >( NodeStorage::node_count() );
		}
		
		/*
		 * 根据指针获取内存索引值
		 */
		index_t get_handle( T * pointer ) const
		{
			if( pointer == NULL ){ 
				/// 如果指针为空,返回无效索引
				return null_handle();
			} else {
				/// 返回指针对应的偏移量
				return static_cast< index_t >( pointer - NodeStorage::nodes() );
			}
		}

		/*
		 * 根据target_handle获取内存索引值
		 */
		index_t get_handle( tagged_node_handle const& handle ) const
		{
			/// 返回对应指针的索引值
			return handle.get_index();
		}

		/*
		 * 根据target_handle获取内存指针
		 */
		T * get_pointer( tagged_node_handle const& tptr ) const
		{
			/// 调用内存索引获取内存指针接口
			return get_pointer( tptr.get_index() );
		}

		/*
		 * 根据内存索引获取内存指针
		 */
		T * get_pointer( index_t index ) const
		{
			/// 索引值为null时返回空指针
			if( index == null_handle() ) {
				return 0;
			} else {
				/// 返回对应的指针地址
				return NodeStorage::nodes() + index;
			}
		}

		/*
		 * 消费一个内存节点, 构造节点传一个入参
		 */
		template< typename ArgType >
		T * construct( ArgType const& a )
		{
			/// 消费一个节点,返回对应节点的索引
			index_t node_index = allocate();
			if( node_index == null_handle() ){
				/// 如果无节点消费,返回NULL指针
				return NULL;
			}
			/// 获得对应索引的内存地址
			T * node = NodeStorage::nodes() + node_index;
			/// 在对应内存上构造数据
			new ( node ) T ( a );
			return node;
		}
		
		/*
		 * 消费一个内存节点, 构造节点传两个入参
		 */
		template< typename ArgType1, typename ArgType2 >
		T * construct( ArgType1 const& a1, ArgType2 const& a2 )
		{
			/// 消费一个节点,返回对应节点的索引
			index_t node_index = allocate();
			if( node_index == null_handle() ){
				/// 如果无节点消费,返回NULL指针
				return NULL;
			}
			/// 获得对应索引的内存地址
			T * node = NodeStorage::nodes() + node_index;
			/// 在对应内存上构造数据
			new ( node ) T ( a1, a2 );
			return node;
		}
		
		/*
		 * 回收对应索引下的内存节点,生产者接口
		 */
		void destruct( target_index const& ti )
		{
			/// 得到节点内存索引值
			index_t index = ti.get_index();
			/// 得到对应索引的内存地址
			T * n = NodeStorage::nodes() + index;
			( void )( n );
			/// 执行对象析构函数,此处只析构对象,不析构内存
			n->~T();
			/// 回收对应节点,生产freenode
			deallocate( index );
		}
		
		/*
		 * 回收对应指针节点,生产者接口
		 */
		void destruct( T * n )
		{
			/// 执行对象析构函数,此处只析构对象,不析构内存
			n->~T();
			/// 回收对应节点,生产freenode
			deallocate( get_handle( n ) );
		}
		
	private:
		/*
		 * 将存储节串成一个链,链头为最后一个节点,链尾为null节点
		 */
		void initialize( void )
		{
			/// 将节点以dummy为尾节点串起来
			for( std::size_t i = 0; i != NodeStorage::node_count(); ++i ){
				/// 初始化节点的内存
				target_index * next_index = reinterpret_cast< target_index* >( NodeStorage::nodes() + i );
				/// 设置节点的next为null
				next_index->set_index( null_handle() );
				/// 每执行一次,pool向后移动一个节点
				deallocate( static_cast< index_t >( i ) );
			}
		}
		
		/*
		 * 调整pool消费一个free节点
		 * pool指向下一个free节点,采用CAS原语操作pool
		 */
		index_t allocate( void )
		{
			/// 得到当前磁头所在节点索引
			target_index old_pool = pool_.load();
			
			for( ; ; ){
				/// 获得磁头的索引
				index_t index = old_pool.get_index();
				if( index == null_handle() ){
					/// 如果磁头指向空,说明队列为空,消费失败
					return index;
				}
				/// 得到磁头指向的内存指针
				T * old_node = NodeStorage::nodes() + index;
				/// 获取内存数据,这里利用结构体的特性进行强项转换,将{ next, data }转换为next
				target_index * next_index = reinterpret_cast< target_index* >( old_node );
				/// 新磁头位置为: ( 下一个节点索引,当前tag的下一个tag )
				target_index new_pool( next_index->get_index(), old_pool.get_next_tag() );
				
				if( pool_.compare_exchange_weak( old_pool, new_pool ) ){
					/// 执行CAS,修改磁头指向的索引值,并返回老磁头指向节点的索引
					return old_pool.get_index();
				}
			}
		}
		
		/*
		 * 调整pool磁头生产一个free节点
		 * pool指向index索引节点,old_pool->next指向新的pool节点,采用CAS原语操作pool
		 */
		void deallocate( index_t index )
		{
			/// 获取回收节点指向的内存
			freelist_node * new_pool_node = reinterpret_cast< freelist_node* >(
				NodeStorage::nodes() + index );
			/// 获得磁头的索引
			target_index old_pool = pool_.load();
			
			for( ; ; ){
				/// 新磁头位置: ( 回收节点索引,磁头指向节点的tag )
				target_index new_pool( index, old_pool.get_tag() );
				/// 修改回收节点指向内存的next节点的索引
				new_pool_node->next.set_index( old_pool.get_index() );
				if( pool_.compare_exchange_weak( old_pool, new_pool ) ){
					/// 执行CAS,修改磁头指向的索引值
					break;
				}
			}
		}
	};

/// 节点存储类
	template< typename T, typename Alloc = std::allocator<T> >
	class freelist_storage : Alloc
	{
	private:
		/// 内存起始指针
		T * nodes_;
		/// 内存节点个数
		std::size_t count_;
		
	public:
		/*
		 * 构造函数
		 */
		template< typename Allocator >
		freelist_storage( Allocator alloc, std::size_t count )
		 : Alloc( alloc )
		 , count_( count )
		{
			/// 预分配内存
			nodes_ = Alloc::allocate( count );
		}
		
		/*
		 * 析构函数
		 */
		~freelist_storage()
		{
			/// 销毁预分配内粗
			Alloc::deallocate( nodes_, count_ );
			nodes_ = NULL;
		}
		
		/*
		 * 预分配内存起始指针
		 */
		T * nodes() const
		{
			return nodes_;
		}
		
		/*
		 * 预分配内存的数量
		 */
		std::size_t node_count() const
		{
			return count_;
		}
	};

	/// 内存管理类,占用32位
	class target_index
	{
	public:
		typedef unsigned short tag_t; /// 16bit
		typedef unsigned short index_t; /// 16bit

		target_index()
		 : index_( 0 )
		 , target_( 0 )
		{
		}
		
		/*
		 * 构造函数
		 * idx 内存索引
		 * tag 内存当前状态标识
		 */
		target_index( index_t idx, tag_t tag = 0 )
		 : index_( idx )
		 , target_( tag )
		{
		}
		
		/*
		 * 拷贝构造
		 */
		target_index( target_index const& r )
		 : index_( r.index_ )
		 , target_( r.target_ )
		{
		}
		
		/*
		 * 设置tag
		 */
		void set_target( tag_t tag )
		{
			target_ = tag;
		}
		
		/*
		 * 获取tag值
		 */
		tag_t get_tag( ) const
		{
			return target_;
		}
		
		/*
		 * 获取下一个tag值
		 */
		tag_t get_next_tag() const
		{
			tag_t next = ( get_tag() + 1 ) & (std::numeric_limits<tag_t>::max)();
			return next;
		}
		
		/*
		 * 设置内存索引值
		 */
		void set_index( index_t idx )
		{
			index_ = idx;
		}
		
		/*
		 * 获取内存索引值
		 */
		index_t get_index( ) const
		{
			return index_;
		}
		
		/*
		 * 重载等于
		 */
		bool operator == ( target_index const& r )
		{
			return ( target_ == r.target_ ) && ( index_ == r.index_ );
		}
		
		/*
		 * 重载不等于
		 */
		bool operator != ( target_index const& r )
		{
			return !operator==( r );
		}
	private:
		index_t index_;
		tag_t target_;
	};
/*
	 * 判断表达式是否相等
	 */
	static inline bool likely( bool expr )
	{
#ifdef __GNUC__
		return __builtin_expect( expr, true );
#else
		return ( expr );
#endif
	}
	
	/*
	 * 隐式构造的拷贝
	 */
	template< typename T, typename U >
	static inline void copy_padload( T & l, U & r )
	{
		r = ( U )l;
	}

} /// namespace detail

	/*
	 * 无锁队列
	 */
	template< typename T >class TQueue
	{
	private:
		/// 队列节点,按照64字节对齐,{ index, tag }
		struct ALIGNMENT( LOCKFREE_CACHELINE_BYTES ) QueueNode
		{
			typedef target_index tagged_node_handle;
			typedef target_index::tag_t handle_type;
			
			/// 存储数据
			QueueNode( T const& v, handle_type null_handle )
			 : data( v )
			{
				/// 默认构造target_index,获取next
				tagged_node_handle old_next = next.load();
				/// 设置新的next为( index, 增加tag )
				tagged_node_handle new_next( null_handle, old_next.get_next_tag() );
				/// 存储next节点
				next.store( new_next );
			}
			
			/// 用索引值初始化节点
			QueueNode( handle_type null_handle )
			 : next( tagged_node_handle( null_handle ) )
			{
			}
			
			/// 默认初始化
			QueueNode( void )
			{
			}
			
			/// 指向下一个索引节点,队列元素
			atomic< tagged_node_handle > next;
			/// 存放存储数据
			T data;
		};
		
		typedef std::allocator< QueueNode > node_allocator;
		typedef freelist< QueueNode > pool_t;
		typedef typename pool_t::index_t handle_type;
		typedef typename pool_t::tagged_node_handle tagged_node_handle;
		
		void initialize( void )
		{
			/// 初始化head和tail,head和tail指向null_handle
			QueueNode * n = pool.construct( pool.null_handle() );
			/// dummy_node( n, 0 )
			tagged_node_handle dummy_node( pool.get_handle(n), 0 );
			/// head指向target_index( n, 0 )
			head_.store( dummy_node );
			/// tail指向target_index( n, 0 )
			tail_.store( dummy_node );
		}
		
	public:
		/*
		 * 构造函数
		 */
		explicit TQueue( size_t n )
		 : head_( tagged_node_handle( 0, 0 ) )
		 , tail_( tagged_node_handle( 0, 0 ) )
		 , pool( node_allocator(), n + 1 )
		{
			/// 初始化n+1个节点,其中第n+1个节点为dummy
			initialize();
		}
		
		/*
		 * 析构函数
		 */
		~TQueue()
		{
			T dummy;
			/// 执行每一个节点的析构函数
			while( pop( dummy ) ) {}
			/// 执行最后一个指向节点的析构函数
			pool.destruct( head_.load() );
		}
		
		/*
		 * 判断链表是否为空
		 */
		bool empty() const
		{
			/// 使用head==tail判断当前队列是否为空
			return pool.get_handle( head_.load() ) == pool.get_handle( tail_.load() );
		}
		
		/*
		 * 队列入队操作
		 */
		bool push( T const& v )
		{
			/// 执行push
			return do_push( v );
		}
		
		/*
		 * 队列的出队操作
		 */
		bool pop( T & ret )
		{
			/// 执行pop
			return do_pop< T >( ret );
		}
		
		/*
		 * 消费一个元素节点
		 */
		template< typename Functor >
		bool consume_one( Functor const& f )
		{
			/// 数据临时缓存
			T element;
			/// 获取最后一个节点数据
			bool success = pop( element );
			/// 执行消费回调
			if( success ) { f( element ); }
			/// 如果链表有数据,默认消费成功
			return success;
		}
		
		/*
		 * 消费所有元素节点
		 */
		template< typename Functor >
		size_t consume_all( Functor const& f )
		{
			/// 链表元素个数
			size_t element_count = 0;
			/// 处理一个链表节点
			while( consume_one( f ) ){
				/// 累加处理个数
				++element_count;
			}
			/// 返回链表处理的个数
			return element_count;
		}
		
	private:
		/*
		 * 入队操作实作
		 */
		bool do_push( T const& v )
		{
			/// 构造一个新的节点,新节点next指向null_handle( NULL )
			QueueNode * n = pool.construct( v, pool.null_handle() );
			/// 获得节点的索引
			handle_type node_handle = pool.get_handle( n );
			
			if( n == NULL ) {
				/// 如果构造节点为null
				return false;
			}
			
			for( ; ; ) {
				/// 获取尾节点tail
				tagged_node_handle tail = tail_.load();
				/// 获取尾节点内存指针
				QueueNode * tail_node = pool.get_pointer( tail );
				/// 获取尾节点的next指针
				tagged_node_handle next = tail_node->next.load();
				/// 获取尾节点的next节点指针
				QueueNode * next_ptr = pool.get_pointer( next );

				/// 获取链表尾数据
				tagged_node_handle tail2 = tail_.load();
				/// 如果链表尾的数据满足cas条件
				if( detail::likely( tail == tail2 ) ) {
					/// 下一个节点为null
					if( next_ptr == 0 ) {
						/// 当前tail节点的下一个节点为分配节点
						tagged_node_handle new_tail_next( node_handle, next.get_next_tag() );
						/// 执行cas操作,修改tail->next = node
						if( tail_node->next.compare_exchange_weak( next, new_tail_next ) ) {
							/// 新tail节点为node节点,更新tail的tag
							tagged_node_handle new_tail( node_handle, tail.get_next_tag() );
							/// 执行cas操作,修改tail = node
							tail_.compare_exchange_storage( tail, new_tail );
							return true;
						}
					} else {
						/// 如果tail节点的下一节点不为null,新tail节点指向下一节点,更新tail的tag
						tagged_node_handle new_tail( pool.get_handle( next_ptr ), tail.get_next_tag() );
						/// 执行cas操作tail = tail->next
						tail_.compare_exchange_storage( tail, new_tail );
					}
				}
			}
		}
		
		/*
		 * 出队操作实作
		 */
		template< typename U >
		bool do_pop( U & ret )
		{
			for( ; ; ) {
				/// 获取head节点
				tagged_node_handle head = head_.load();
				/// 获取head节点的内存值
				QueueNode * head_ptr = pool.get_pointer( head );
				
				/// 获取tail节点
				tagged_node_handle tail = tail_.load();
				/// 获取head下一节点,dummy节点保证head的next节点非空或链表为空
				tagged_node_handle next = head_ptr->next.load();
				/// 获取head下一节点内存
				QueueNode * next_ptr = pool.get_pointer( next );
				
				/// 获取head节点数据
				tagged_node_handle head2 = head_.load();
				/// 如果链头节点满足cas条件
				if( detail::likely( head == head2 ) ) {
					/// 如果链头和链尾相等
					if( pool.get_handle( head ) == pool.get_handle( tail ) ) {
						/// 如果链尾节点的下一节点为null,即链表为空
						if( next_ptr == 0 ){
							/// pop失败
							return false;
						}
						/// 否则head和tail都指向dummy节点
						/// 链尾向后移动一个节点
						tagged_node_handle new_tail( pool.get_handle( next ), tail.get_next_tag() );
						/// 指向cas,链尾向后移tail = tail->next
						tail_.compare_exchange_storage( tail, new_tail );
					} else {
						if( next_ptr == 0 ){
							/// 安全性考虑,本方案的实作代码下应该不会出现该场景
							continue;
						}
						/// 拷贝下一节点的数据
						detail::copy_padload( next_ptr->data, ret );
						/// head向后移动一个节点
						tagged_node_handle new_head( pool.get_handle( next ), head.get_next_tag() );
						/// 执行cas操作,head向前移动一个节点,head = head->next
						if( head_.compare_exchange_weak( head, new_head ) ){
							/// 更新pool, pool指向old_head, pool->next = old_pool
							pool.destruct( head );
							return true;
						}
					}
				}
			}
		}
		
		/// 按照target_index对齐,即64字节
		static const int padding_size = LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
		/// 链表头
		atomic< tagged_node_handle > head_;
		/// 链表头对齐
		char padding1[ padding_size ];
		/// 链表尾
		atomic< tagged_node_handle > tail_;
		/// 链表尾对齐
		char padding2[ padding_size ];
		/// 内存操作对象
		pool_t pool;
	};

LockFree的数据流程图:

Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]

Boost之LockFree[通俗易懂]

伪代码:
bool push( )
{
	/// 获取pool
	node = pool;
	/// pool向后移位
	pool = pool->next
	/// 新pool的next为NULL
	pool->next = NULL;
	/// 如果node为空
	if( node == NULL ) {
		/// 无freelist节点
		return false;
	}
	/// 如果tail的下一个节点不为空
	if( tail->next != NULL ) {
		/// tail为dummy,则tail后移一个几点
		tail = tail->next
	} else {
		/// tail的下一个节点为node
		tail->next = node;
		/// 将node赋值给tail
		tail = node;
	}
}

bool pop()
{
	/// 如果head和tail相等
	if( head == tail ) {
		if( head->next = NULL ) {
			/// 链表为空
			return false;
		} else {
			/// head链表指向dummy,tail向后移一位
			tail = tail->next;
		}
	} else {
		/// 保存老的head节点
		old_head = head;
		/// head向后移位
		head = head->next;
		/// 保存老的pool节点
		old_pool = pool;
		/// pool指向老的head节点
		pool = old_head;
		/// pool下一节点指向old_pool
		pool->next = old_pool;
	}
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/161672.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • oracle创建数据库的三种方法[通俗易懂]

    oracle创建数据库的三种方法[通俗易懂]新建Oracle数据库三种方法:1.通过运行OracleDatabaseConfigurationAssistant创建配置或删除数据库(也可在命令行下输入dbca);2.用命令行的方式建立

  • Android平台下OpenGL初步

    转自网上,网上没找到出处,只看到一些论坛中有这篇文章,组织的有点混乱,这篇文章感觉讲的挺好的。http://www.bangchui.org/read.php?tid=7572&page=1本文只关注于如何一步步实现在Android平台下运用OpenGl。 1、GLSurfaceViewGLSurfaceView是Android应用程序中实现OpenGl画图的重要组成部分。

  • 深入理解Spring容器初始化(二):BeanFactory的初始化

    深入理解Spring容器初始化(二):BeanFactory的初始化前言我们知道,spring的启动其实就是容器的启动,而一般情况下,容器指的其实就是上下文ApplicationContext。AbstractApplicationContext作为整个A

  • apache 配置跨域

    apache 配置跨域<VirtualHost*:80>DocumentRoot“D:/project/xuanhua_shop/public”ServerName192.168.18.182#servername后的ip为接口所在服务器IP<Directory“D:/project/xuanhua_shop/public”>OptionsIndexesFollowSymLinksMultiViews Options+Indexes+Includes+FollowSymLi

  • GSLB调度服务原理

    GSLB调度服务原理GSLB,全局负载均衡(GlobalServerLoadBalancing),主要的目的是在整个网络范围内将用户的请求定向到最近的节点(或者区域)。是对物理集群的负载均衡,不止是简单的流量均匀分配,还会根据应用场景的不同来制定不同的策略。本文将讨论GSLB的几种实现,并介绍调度服务实现的大体情况。

  • 完全卸载oracle11g步骤

    完全卸载oracle11g步骤完全卸载oracle11g步骤:1、开始->设置->控制面板->管理工具->服务停止所有Oracle服务。2、开始->程序->Oracle-OraHome81->OracleInstallationProducts->UniversalInstaller,单击“卸载产品”-“全部展开”,选中除“OraDb11g_home1”外的全部目录,删除。5、运行regedit

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号