disruptor框架原理_disruptor使用

disruptor框架原理_disruptor使用Disruptor源码https://github.com/LMAX-Exchange/disruptor/blob/master/README.mdhttps://github.com/LMAX-Exchange/disruptor/wiki/Introductionhttps://github.com/LMAX-Exchange/disruptor/wiki/Getting-Starte…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

Disruptor 源码

https://github.com/LMAX-Exchange/disruptor/blob/master/README.md
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

Disruptor 与 RingBuffer 的关系

  • Disruptor 的存储部分实现了 RingBuffer。
  • Disruptor 提供了方法供 Producer 和 Consumer 线程来通过 ringbuffer 传输数据。

RingBuffer 的本质

  • 固定大小的
  • 先入先出的 (FIFO)
  • Producer-Consumer 模型的
  • 循环使用的一段内存
  • 由于进程周期内,可不用重新释放和分配空间

本质就是一个可重用的 FIFO 队列

disruptor框架原理_disruptor使用

(图片来自:https://blog.csdn.net/qq51931373/article/details/46652029

Disruptor 适用场景

  • Producer-Consumer 场景,一生产者多消费者,多生产者多消费者(线程安全)
  • 线程之间交换数据
  • 轻量化的消息队列
  • 对队列性能要求高:Disruptor 的速度比 LinkedBlockingQueue 提高了七倍(无锁设计)
  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图)
  • 典型场景:Canal,从一个 mysql 实例读取 binlog,放到 Disruptor,下游可有多个并发消费者

核心概念

disruptor框架原理_disruptor使用

(图自官网:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction

Ring Buffer

  • com/lmax/disruptor/RingBuffer.java
  • Disruptor 的核心存储,环形缓冲区。

Sequence

  • com/lmax/disruptor/Sequence.java
  • 每个Consumer (EventProcessor) 和 Disruptor 本身各保有一个 Sequence。
  • 用来追踪 ringbuffer 和每个 Consumer 的进度。
tracking the progress of the ring buffer and event processors
  • 并发相关代码主要依赖 Sequence 值的改变。
  • Sequence 的核心是一个 protected volatile long value;
  • 可理解 Sequence 为一个加强版的 AtomicLong。在后者基础上增加了防止伪共享的代码。
    (关于伪共享:https://www.cnblogs.com/blastbao/p/8290332.html
  • Sequence 如何避免伪共享
    简单地说:就是通过 Padding 的方式,将一个 Sequence 在内存中的大小和一个 cache line 对齐,避免伪共享,提高性能。
Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field. 

Sequencer

  • com/lmax/disruptor/MultiProducerSequencer.java
  • com/lmax/disruptor/SingleProducerSequencer.java
  • Disruptor 的核心组件
  • 协调 Producer 和 Consumer 对同一段 ringBuffer 的使用
  • 在生产者和消费者之间快速、正确地传递数据的并发算法

Sequence Barrier

  • 由 Sequencer 产生
  • Sequence Barrier 包含 “决定 Consumer 是否有数据可供消费” 的逻辑
// com/lmax/disruptor/BatchEventProcessor.java
    private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                // 调用 sequenceBarrier.waitFor(nextSequence) 来确定当前可消费的数据位点
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null && availableSequence >= nextSequence)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (running.get() != RUNNING)
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

// com/lmax/disruptor/ProcessingSequenceBarrier.java
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

	// 可见 SequenceBarrier 的核心是用 waitStrategy 去 waitFor 数据
	// 下面的变量,
    // sequence: 记录 consumer 消费位置的
	// cursorSequence: 记录 ringBuffer 生产位置的
	// dependentSequence: 记录当前 consumer 依赖的其他 consumer 的消费位置的(如果当前 consumer 只从 ringBuffer 读取数据,而不依赖于其他 consumer,那么 dependentSequence 就和 cursorSequence 是同一个,参考 ProcessingSequenceBarrier 的构造函数)
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

Wait Strategy

  • com/lmax/disruptor/BlockingWaitStrategy.java
  • com/lmax/disruptor/BusySpinWaitStrategy.java
// 当 Producer 往 ringBuffer 写入了新数据之后,是怎么通知 Consumer 的呢?
// 这个逻辑就在 waitStrategy 里。
// 以 BlockingWaitStrategy 为例
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence)
        {
            synchronized (mutex)
            {
                while (cursorSequence.get() < sequence)
                {
                    barrier.checkAlert();
		// 如果没有数据,调用 mutex (就是一个普通 Object )的wait,这里代码阻塞。直到有 notify/ notifyAll 被调用时,代码继续执行。
		// 通过 wait 方法阻塞一个线程时,这个线程会放弃 CPU 时间片。
		// 那么 notify / notifyAll 被谁调用呢?答案,BlockingWaitStrategy  有 signalAllWhenBlocking 方法调用 notifyAll,这个方法在 Producer 调用 ringBuffer 的 publish 时被调用。
                    mutex.wait();
                }
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    public void signalAllWhenBlocking()
    {
        synchronized (mutex)
        {
            mutex.notifyAll();
        }
    }

disruptor框架原理_disruptor使用

Event

  • Producer 传输数据给 Consumer 的单位

EventProcessor

  • com/lmax/disruptor/BatchEventProcessorTest.java
  • com/lmax/disruptor/BatchEventProcessor.java
  • 处理 Disruptor 产生数据的主要事件循环,持有 Consumer 的 SequenceEventHandler
The main event loop for handling events from the Disruptor and has ownership of consumer's Sequence
  • 实际就是 Consumer 的主循环。Consumer 只需要注入 eventHandler,BatchEventProcessor 就会调用 eventHandler.onEvent() 来处理 Producer 写入到 ringbuffer 的数据。
// com/lmax/disruptor/BatchEventProcessor.java
private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null && availableSequence >= nextSequence)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                
                // 获取数据,并处理
                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }
            ......
        }
    }

EventHandler

  • Disruptor 只定义了接口。
  • 由 Consumer 实现,并注入到 EventProcessor。 

Producer

  • 生产者

Disruptor 为什么快而且线程安全

官方文档:http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

http://ifeve.com/dissecting-disruptor-whats-so-special/

http://ifeve.com/locks-are-bad/      锁为什么慢,以及 Disruptor 如何避免

http://ifeve.com/disruptor-cacheline-padding/         cache-line-padding

http://ifeve.com/disruptor-memory-barrier/           disruptor-memory-barrier

简单说:

  • 它是数组,所以要比链表快(添加删除更简单,耗费内存更小),且可以利用 CPU 缓存来预加载
  • 数组对象本身一直存在,避免了大对象的垃圾回收(当然元素本身还是要回收的)
  • 在需要确保线程安全的地方,用 CAS 取代锁。
  • 没有竞争 = 没有锁 = 非常快。
  • 所有 Consumer 都记录自己的序号(Sequence),允许多个 Producer 与多个 Consumer 共享 ringbuffer。
  • 在每个对象中都能跟踪 Sequence(ring buffer,claim Strategy,生产者和消费者),加上 Sequence 的 cache line padding,就意味着没有为伪共享和非预期的竞争。

个人觉得最重要的设计就是:

  • 每个 Consumer 持有一个 Sequence,各 Consumer 消费独立。
  • Producer 根据所有 Consumer 的 Sequence 位置决定是否能写入到 ringbuffer,以及写入到何位置。
  • 各 Producer 在并发写时,通过 CAS 避免锁。(可参考下面的代码分析)

关键问题

  • 问题1:多个 Producer 如何协调把数据写入到 ringBuffer

  • 问题2:ringbuffer 如何根据各 consumer 消费速度告知各 Producer 现在是否能写入数据

// 我本地起了 2 个 Producer。
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        LongEventProducer producer2 = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            producer2.onData(bb);
            Thread.sleep(1000);
        }

// com/lmax/disruptor/shicaiExample/LongEventProducer.java
public void onData(ByteBuffer bb)
    {
        // 抢占 ringBuffer 的最新空位,以便把自己的数据写入。
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }

// 那么 ringBuffer.next() 是如何在多个 producer 之间协调的呢?
// com/lmax/disruptor/RingBuffer.java
     * Increment and return the next sequence for the ring buffer.  Calls of this
     * method should ensure that they always publish the sequence afterward.  E.g.
    public long next()
    {
	    // 这个 sequencer 是 com/lmax/disruptor/MultiProducerSequencer.java
        return sequencer.next();
    }

// com/lmax/disruptor/MultiProducerSequencer.java 这个就是 disruptor 的核心,Sequencer
    public long next(int n)
    {
        if (n < 1 || n > bufferSize)
        {
            throw new IllegalArgumentException("n must be > 0 and < bufferSize");
        }

        long current;
        long next;

        do
        {
            // cursor 就是一个 sequence,多个 Producer 公用一个 sequence 进行写控制
            current = cursor.get();          
            next = current + n;

            // 判断 ringBuffer 是否满了
            long wrapPoint = next - bufferSize;           
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
            // gatingSequences 是什么?
	        // 实际就是所有 consumer 的 seuqence 的集合。创建 consumer 时,通过 updateGatingSequencesForNextInChain 函数把它注册到  MultiProducerSequencer 的。
	        // 这里做的,是从各个 consumer 的 sequence 中,找到最小的哪个(就是消费最慢的那个)。
	    long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

	        // 如果 buffer 已经满了,就一直自旋等待 consumer 消费。当 consumer 消费后,gatingSequence 就更新,从而 gatingSequenceCache 更新,从而从新判断 wrapPoint > cachedGatingSequence, 从而有可能 Producer 获得 buffer 中可写入的位置。
	        // 为什么有一个 gatingSequenceCache, 这个只是为了减少 getMinimuSequence 的次数,真实逻辑和没有这个 cache 一样。
	        // 回答了问题 2。
                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
	       // 通过 CAS 来确保多个 Producer 能够正确写入,且不冲突。
	       // 回答了问题 1。
            else if (cursor.compareAndSet(current, next))      
            {
                break;
            }
        }
        while (true);

        return next;
    }

// GatingSequence 是由各个 Consumer 在启动 EventProcessor 时添加的
    public final void addGatingSequences(Sequence... gatingSequences)
    {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

// MultiProducerSequencer 的 publish 做了什么?
    public void publish(final long sequence)
    {
        setAvailable(sequence);          // 设置被 publish 的位置为 not available
        waitStrategy.signalAllWhenBlocking();          // 通知所有 consumer 
    }
  • Consumer 启动 EventProcessor 时 addGatingSequences 

disruptor框架原理_disruptor使用

  • 问题3:Consumer 是怎么注册的?

// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());

// com/lmax/disruptor/dsl/Disruptor.java
* <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
     * as soon as they become available, in parallel.</p>
    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);
    }

    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            // 把 consumer 加入 consumer 库中。其实就是一个 观察者模式,ringBuffer 有数据后,通知各个 consumer 线程。
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);         
            processorSequences[i] = batchEventProcessor.getSequence();
        }

	    // 两个 sequence,一个是 barrierSequence, 一个是 processorSequence
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

SequenceBarrier 是用来控制 consumer 读取进度的。
* Wait for the given sequence to be available for consumption.
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
* Get the current cursor value that can be read.
long getCursor();

// 再往里看。sequence 
// com/lmax/disruptor/ProcessingSequenceBarrier.java
    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();

        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence)
        {
            return availableSequence;
        }

        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

    @Override
    public long getCursor()
    {
        // 返回当前位置,dependentSequence 是一个 Sequence 对象
        return dependentSequence.get();
    }

// ringBuffer 有了新数据时,disruptor 怎么通知各 consumer 的?
* <p>Starts the event processors and returns the fully configured ring buffer.</p>
public RingBuffer<T> start()
    {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            // 为每个 consumer 启动一个线程,线程逻辑由 BatchEventProcessor 控制
            // BatchEventProcessor 有一个 eventHandler 字段,那就是我们自己写处理代码。
            consumerInfo.start(executor);        
        }

        return ringBuffer;
    }

Demo

  • Event
public class LongEvent {
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}
  • Customer 
public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
        }
        System.out.println("Event: " + event.getValue());
    }
}
  • Producer 
public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
            // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}
  • Main
public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 2;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);
        LongEventProducer producer2 = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            producer2.onData(bb);
            Thread.sleep(1000);
        }
    }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/195345.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • rabbitmq集群搭建_mongodb集群搭建

    rabbitmq集群搭建_mongodb集群搭建先来介绍一下RabbitMQ集群RabbitMQ集群有两种模式,一种是普通模式,即默认的集群模式,另外一种为镜像模式,可以把队列做成镜像队列我们在学习两种模式之前,先来了解下一些关于RabbitMQ集群的概念元数据:包括队列元数据,交换器元数据,交换器元数据,vhost元数据(1)队列元数据:队列名称和它的属性;(2)交换器元数据:交换器名称、类型和属性;(3)绑定元数据:一张简…

  • jenkinsfile docker_docker build命令详解

    jenkinsfile docker_docker build命令详解前言之前我们用docker手动安装了jenkins环境,在jenkins中又安装了python3环境和各种安装包,如果我们想要在其他3台机器上安装,又是重复操作,重复劳动,那会显得很low,这里可以

  • css-day05笔记-清除浮动&学成网布局准备工作

    css-day05笔记-清除浮动&学成网布局准备工作typora-copy-images-to:media第01阶段.WEB基础:css-day05笔记-清除浮动&学成网布局准备工作一.清除浮动1.为什么要清除浮动因为父级盒子很多情况下,不方便给高度,但是子盒子浮动就不占有位置,最后父级盒子高度为0,就影响了后面的标准流盒子。总结:由于浮动元素不再占用原文档流的位置,所以它会对后面的元素排版产生影响准确地说…

  • 分享.NET 轻量级的ORM

    分享.NET 轻量级的ORM

  • ZOJ 3829 贪心 思维题

    ZOJ 3829 贪心 思维题

  • ShellExecute, WinExec, CreateProcess的使用[通俗易懂]

    ShellExecute, WinExec, CreateProcess的使用[通俗易懂]ShellExecute  ShellExecute的功能是运行一个外部程序(或者是打开一个已注册的文件、打开一个目录、打印一个文件等等),并对外部程序有一定的控制。  有几个API函数都可以实现这些功能,但是在大多数情况下ShellExecute是更多的被使用的,同时它并不是太复杂。  ShellExecute函数原型及参数含义如下:  ShellExecute(

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号