Disruptor source code
https://github.com/LMAX-Exchange/disruptor/blob/master/README.md
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
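How Disruptor relates to RingBuffer
- The storage part of Disruptor is implemented as a RingBuffer.
- Disruptor provides the methods that Producer and Consumer threads use to pass data through the ring buffer.
What a RingBuffer is
- Fixed size
- First in, first out (FIFO)
- Built around the Producer-Consumer model
- A block of memory that is used cyclically
- Allocated once and reused for the life of the process, so nothing has to be freed and reallocated
In essence, it is a reusable FIFO queue.
(Image from: https://blog.csdn.net/qq51931373/article/details/46652029)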
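The properties listed above (fixed size, FIFO, slots reused in a cycle) can be sketched with a plain array and two counters. This is a minimal single-threaded illustration, not the Disruptor `RingBuffer` class: the name `TinyRing` and its methods are made up for this example, and it assumes the capacity is a power of two so that a bit mask can stand in for the modulo operation.

```java
// Hypothetical illustration of a fixed-size, reusable FIFO ring (single-threaded).
final class TinyRing {
    private final long[] slots; // allocated once, reused forever
    private final int mask;     // capacity - 1; valid because capacity is a power of two
    private long head = 0;      // sequence of the next slot to read
    private long tail = 0;      // sequence of the next slot to write

    TinyRing(int capacity) {
        if (capacity < 1 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2");
        }
        this.slots = new long[capacity];
        this.mask = capacity - 1;
    }

    /** Writes a value, or returns false when every slot is still unread. */
    boolean offer(long value) {
        if (tail - head == slots.length) {
            return false; // full: the writer would overwrite unread data
        }
        slots[(int) (tail & mask)] = value; // sequence -> array index
        tail++;
        return true;
    }

    /** Reads the oldest value, or returns null when the ring is empty. */
    Long poll() {
        if (head == tail) {
            return null;
        }
        long value = slots[(int) (head & mask)];
        head++;
        return value;
    }

    public static void main(String[] args) {
        TinyRing ring = new TinyRing(4);
        for (long i = 0; i < 6; i++) {
            System.out.println("offer(" + i + ") -> " + ring.offer(i));
        }
        System.out.println("poll() -> " + ring.poll());
        System.out.println("offer(6) -> " + ring.offer(6));
    }
}
```

The sequence numbers only ever grow; the mask maps them back onto the same few slots, which is what makes the memory reusable.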
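When to use Disruptor
- Producer-Consumer scenarios: one producer with many consumers, or many producers with many consumers (thread-safe)
- Exchanging data between threads
- A lightweight, in-process message queue
- Workloads that demand high queue performance: Disruptor is reported to be about seven times faster than LinkedBlockingQueue, thanks to its lock-free design
- The same event can have several consumers; they can process it in parallel, or depend on one another so that processing follows a fixed order (forming a dependency graph)
- Typical case: Canal reads the binlog from a MySQL instance and puts it into a Disruptor, where several downstream consumers can process it concurrently
Core concepts
(Image from the official wiki: https://github.com/LMAX-Exchange/disruptor/wiki/Introduction)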
Ring Buffer
- com/lmax/disruptor/RingBuffer.java
- The core storage of Disruptor: a ring-shaped buffer.
Sequence
- com/lmax/disruptor/Sequence.java
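- Each Consumer (EventProcessor) and the Disruptor itself hold one Sequence each.
- It tracks the progress of the ring buffer and of each Consumer.
tracking the progress of the ring buffer and event processors
- The concurrency-related code relies mainly on changes to Sequence values.
- At its core, a Sequence is a single field: protected volatile long value;
- A Sequence can be thought of as an enhanced AtomicLong that adds code to prevent false sharing.
- How Sequence avoids false sharing (background on false sharing: https://www.cnblogs.com/blastbao/p/8290332.html)
Put simply: padding fields are placed around the value so that it does not share a cache line with other frequently written data. This avoids false sharing and improves performance.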
Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.
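The padding idea can be sketched as follows. This is an illustration under stated assumptions, not the library source: the class names are hypothetical, a 64-byte cache line is assumed, and the layout relies on the JVM placing superclass fields before subclass fields, which HotSpot typically does but the language specification does not guarantee.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Seven longs (56 bytes) in front of the hot field. Names are hypothetical.
class LeftPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

// The only field that is actually read and written.
class PaddedValue extends LeftPadding {
    protected volatile long value;
}

// Seven longs behind the hot field, so neighbours cannot share its cache line.
class RightPadding extends PaddedValue {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

final class PaddedSequence extends RightPadding {
    private static final AtomicLongFieldUpdater<PaddedValue> UPDATER =
        AtomicLongFieldUpdater.newUpdater(PaddedValue.class, "value");

    PaddedSequence(long initialValue) {
        this.value = initialValue;
    }

    long get() {
        return value;
    }

    void set(long newValue) {
        value = newValue;
    }

    /** Atomically sets the value if it still equals the expected one. */
    boolean compareAndSet(long expected, long updated) {
        return UPDATER.compareAndSet(this, expected, updated);
    }

    long incrementAndGet() {
        return UPDATER.incrementAndGet(this);
    }

    public static void main(String[] args) {
        PaddedSequence sequence = new PaddedSequence(-1L);
        System.out.println("after increment: " + sequence.incrementAndGet());
        System.out.println("CAS 0 -> 5 succeeded: " + sequence.compareAndSet(0L, 5L));
    }
}
```

The padding fields are never read; they exist only to occupy the bytes on either side of `value`.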
Sequencer
- com/lmax/disruptor/MultiProducerSequencer.java
- com/lmax/disruptor/SingleProducerSequencer.java
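- The core component of Disruptor
- Coordinates how Producers and Consumers share the same ringBuffer
- Implements the concurrent algorithms that pass data between producers and consumers quickly and correctly
Sequence Barrier
- Created by the Sequencer
- A Sequence Barrier contains the logic that decides whether a Consumer has data available to consume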
// com/lmax/disruptor/BatchEventProcessor.java
private void processEvents()
{
    T event = null;
    long nextSequence = sequence.get() + 1L;
    while (true)
    {
        try
        {
            // Call sequenceBarrier.waitFor(nextSequence) to find the highest sequence that can be consumed right now
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null && availableSequence >= nextSequence)
            {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }
            while (nextSequence <= availableSequence)
            {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
            sequence.set(availableSequence);
        }
        catch (final TimeoutException e)
        {
            notifyTimeout(sequence.get());
        }
        catch (final AlertException ex)
        {
            if (running.get() != RUNNING)
            {
                break;
            }
        }
        catch (final Throwable ex)
        {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
// com/lmax/disruptor/ProcessingSequenceBarrier.java
public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();
    // The core of SequenceBarrier is to use the waitStrategy to wait for data.
    // The variables below:
    // sequence: the position the consumer wants to consume
    // cursorSequence: the position the ringBuffer has been produced up to
    // dependentSequence: the consumption position of the other consumers that this consumer depends on
    //   (if this consumer reads only from the ringBuffer and depends on no other consumer,
    //   dependentSequence is the same object as cursorSequence; see the ProcessingSequenceBarrier constructor)
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    if (availableSequence < sequence)
    {
        return availableSequence;
    }
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
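Why does `waitFor` end with `getHighestPublishedSequence`? With several producers, the cursor moves forward when a slot is claimed, so a slot at or below the cursor may not have been published yet, and a consumer has to stop at the first unpublished slot. The sketch below shows one way to track this with a per-slot lap counter, loosely modeled on the idea behind `MultiProducerSequencer`. The class `AvailabilitySketch` and its method names are hypothetical, and the buffer size is assumed to be a power of two.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch: per-slot flags that record which lap of the ring was last published.
final class AvailabilitySketch {
    private final AtomicIntegerArray availableBuffer;
    private final int indexMask;
    private final int indexShift;

    AvailabilitySketch(int bufferSize) {
        this.availableBuffer = new AtomicIntegerArray(bufferSize);
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1); // -1 means "nothing published in this slot yet"
        }
        this.indexMask = bufferSize - 1;
        this.indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2(bufferSize)
    }

    /** Marks a sequence as published by storing its lap number in its slot. */
    void publish(long sequence) {
        availableBuffer.set((int) (sequence & indexMask), (int) (sequence >>> indexShift));
    }

    /** A sequence is available only if its slot holds the lap number of that sequence. */
    boolean isAvailable(long sequence) {
        return availableBuffer.get((int) (sequence & indexMask)) == (int) (sequence >>> indexShift);
    }

    /** Returns the last sequence in [lowerBound, availableSequence] before the first gap. */
    long highestPublished(long lowerBound, long availableSequence) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }

    public static void main(String[] args) {
        AvailabilitySketch sketch = new AvailabilitySketch(4);
        sketch.publish(0);
        sketch.publish(1);
        sketch.publish(3); // sequence 2 was claimed by a slower producer and is not published yet
        System.out.println("highest published: " + sketch.highestPublished(0, 3));
    }
}
```

Storing the lap number rather than a boolean means a slot never has to be reset when the ring wraps around.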
Wait Strategy
- com/lmax/disruptor/BlockingWaitStrategy.java
- com/lmax/disruptor/BusySpinWaitStrategy.java
// After a Producer writes new data into the ringBuffer, how is the Consumer notified?
// That logic lives in the waitStrategy.
// Take BlockingWaitStrategy as an example.
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    if (cursorSequence.get() < sequence)
    {
        synchronized (mutex)
        {
            while (cursorSequence.get() < sequence)
            {
                barrier.checkAlert();
                // If there is no data, call wait() on mutex (a plain Object). The thread blocks here
                // until notify/notifyAll is called, and then continues.
                // A thread blocked in wait() gives up its CPU time slice.
                // Who calls notify/notifyAll? BlockingWaitStrategy.signalAllWhenBlocking() calls notifyAll,
                // and that method is invoked when a Producer calls publish on the ringBuffer.
                mutex.wait();
            }
        }
    }
    while ((availableSequence = dependentSequence.get()) < sequence)
    {
        barrier.checkAlert();
        ThreadHints.onSpinWait();
    }
    return availableSequence;
}
public void signalAllWhenBlocking()
{
    synchronized (mutex)
    {
        mutex.notifyAll();
    }
}
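The wait/notify handshake described in the comments above can be reduced to a few lines of plain Java. This is a minimal sketch, not the library class: `BlockingWaitSketch`, `publish` and `runDemo` are hypothetical names, an `AtomicLong` stands in for the cursor `Sequence`, and alerts, timeouts and dependent sequences are left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a blocking wait: the consumer sleeps on a monitor until the producer signals.
final class BlockingWaitSketch {
    private final Object mutex = new Object();
    private final AtomicLong cursor = new AtomicLong(-1L); // last published sequence

    /** Consumer side: blocks until the cursor has reached the requested sequence. */
    long waitFor(long sequence) throws InterruptedException {
        synchronized (mutex) {
            while (cursor.get() < sequence) { // loop guards against spurious wake-ups
                mutex.wait();                 // releases the monitor and gives up the CPU
            }
        }
        return cursor.get();
    }

    /** Producer side: advance the cursor first, then wake every waiting consumer. */
    void publish(long sequence) {
        cursor.set(sequence);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    /** Runs one producer thread against one waiting consumer and returns what the consumer saw. */
    static long runDemo() {
        BlockingWaitSketch strategy = new BlockingWaitSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50L); // give the consumer time to block
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            strategy.publish(0L);
        });
        producer.start();
        try {
            long available = strategy.waitFor(0L);
            producer.join();
            return available;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1L;
        }
    }

    public static void main(String[] args) {
        System.out.println("available sequence = " + runDemo());
    }
}
```

Because the consumer checks the cursor while holding the monitor and the producer notifies while holding the same monitor, a wake-up cannot slip in between the check and the `wait()`. The price is a lock on the publish path, which is why spinning strategies such as BusySpinWaitStrategy exist as an alternative.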
Event
- The unit of data that a Producer passes to a Consumer
EventProcessor
- com/lmax/disruptor/BatchEventProcessorTest.java
- com/lmax/disruptor/BatchEventProcessor.java
- The main event loop that handles the data produced through the Disruptor; it owns the Consumer's Sequence
The main event loop for handling events from the Disruptor and has ownership of consumer's Sequence
- In practice this is the Consumer's main loop. The Consumer only has to inject an eventHandler, and BatchEventProcessor calls eventHandler.onEvent() to process the data that Producers write into the ringbuffer.
// com/lmax/disruptor/BatchEventProcessor.java
private void processEvents()
{
    T event = null;
    long nextSequence = sequence.get() + 1L;
    while (true)
    {
        try
        {
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null && availableSequence >= nextSequence)
            {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }
            // Fetch the events and process them
            while (nextSequence <= availableSequence)
            {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
            sequence.set(availableSequence);
        }
        // ...... (catch blocks elided)
    }
}
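The batching behaviour of this loop is easy to miss: one `waitFor` call can return a sequence far ahead of `nextSequence`, and the processor then drains the whole range without waiting again, flagging only the last event as the end of the batch. A minimal single-threaded sketch of that inner loop, with hypothetical names (`BatchLoopSketch`, `drain`, `Handler`) and a `String[]` standing in for the ring buffer:

```java
// Hypothetical sketch of the batch-draining inner loop of an event processor.
final class BatchLoopSketch {
    /** Stand-in for an event handler callback. */
    interface Handler {
        void onEvent(String event, long sequence, boolean endOfBatch);
    }

    /**
     * Processes every sequence in [nextSequence, availableSequence] and
     * returns the sequence to ask for next.
     */
    static long drain(String[] ring, long nextSequence, long availableSequence, Handler handler) {
        int mask = ring.length - 1; // ring length is assumed to be a power of two
        while (nextSequence <= availableSequence) {
            String event = ring[(int) (nextSequence & mask)];
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        return nextSequence;
    }

    public static void main(String[] args) {
        String[] ring = {"a", "b", "c", "d"};
        long next = drain(ring, 1L, 3L, (event, sequence, endOfBatch) ->
            System.out.println(event + " @" + sequence + " endOfBatch=" + endOfBatch));
        System.out.println("next sequence to wait for: " + next);
    }
}
```

A handler can use the `endOfBatch` flag to defer expensive work, such as flushing to disk, until the last event of a batch.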
EventHandler
- Disruptor defines only the interface.
- The Consumer implements it and injects it into the EventProcessor.
Producer
- The producer of events
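Why Disruptor is fast and thread-safe
Reference articles:
http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html
http://ifeve.com/dissecting-disruptor-whats-so-special/
http://ifeve.com/locks-are-bad/ (why locks are slow, and how Disruptor avoids them)
http://ifeve.com/disruptor-cacheline-padding/ (cache line padding)
http://ifeve.com/disruptor-memory-barrier/ (memory barriers)
In short:
- It is backed by an array, so it is faster than a linked list (adding and removing is simpler and it uses less memory), and the CPU cache can prefetch its elements.
- The array itself lives for the lifetime of the Disruptor, which avoids garbage-collecting a large object. The event objects in it are preallocated and reused, so only the data they reference still has to be collected.
- Where thread safety must be guaranteed, CAS is used instead of locks.
- No contention = no locks = very fast.
- Every Consumer records its own position (Sequence), which lets multiple Producers and multiple Consumers share the ringbuffer.
- Each participant (ring buffer, claim strategy, producers and consumers) tracks its own Sequence. Together with the cache line padding of Sequence, this means no false sharing and no unexpected contention.
In my view, the most important design decisions are:
- Each Consumer holds its own Sequence, so Consumers consume independently of one another.
- The Producer uses the Sequence positions of all Consumers to decide whether it can write into the ringbuffer, and where.
- When Producers write concurrently, CAS is used instead of a lock. (See the code analysis below.)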
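Key questions
- Question 1: How do multiple Producers coordinate when writing data into the ringBuffer?
- Question 2: How does the ringbuffer tell each Producer whether it can write right now, given how fast each consumer is consuming?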
// I started 2 Producers locally.
LongEventProducer producer = new LongEventProducer(ringBuffer);
LongEventProducer producer2 = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    producer.onData(bb);
    producer2.onData(bb);
    Thread.sleep(1000);
}
// com/lmax/disruptor/shicaiExample/LongEventProducer.java
public void onData(ByteBuffer bb)
{
    // Claim the next free slot in the ringBuffer so that this producer can write its data there.
    long sequence = ringBuffer.next(); // Grab the next sequence
    try
    {
        LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
        event.set(bb.getLong(0)); // Fill with data
    }
    finally
    {
        // Always publish, even if filling the event failed, so that consumers are not stalled
        ringBuffer.publish(sequence);
    }
}
// So how does ringBuffer.next() coordinate between multiple producers?
// com/lmax/disruptor/RingBuffer.java
/**
 * Increment and return the next sequence for the ring buffer. Calls of this
 * method should ensure that they always publish the sequence afterward.
 */
public long next()
{
    // Here the sequencer is com/lmax/disruptor/MultiProducerSequencer.java
    return sequencer.next();
}
// com/lmax/disruptor/MultiProducerSequencer.java: this is the heart of Disruptor, the Sequencer
public long next(int n)
{
    if (n < 1 || n > bufferSize)
    {
        throw new IllegalArgumentException("n must be > 0 and < bufferSize");
    }
    long current;
    long next;
    do
    {
        // cursor is itself a Sequence; all Producers share this one Sequence to control writes
        current = cursor.get();
        next = current + n;
        // Check whether the ringBuffer is full
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
        {
            // What are gatingSequences?
            // They are the Sequences of all consumers. When a consumer is created, updateGatingSequencesForNextInChain
            // registers its Sequence with the MultiProducerSequencer.
            // This line finds the smallest of the consumers' Sequences, that is, the slowest consumer.
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
            // If the buffer is full, keep retrying (parking for 1 ns each time) until a consumer makes progress.
            // Once a consumer consumes, gatingSequence advances, gatingSequenceCache is updated,
            // wrapPoint > cachedGatingSequence is evaluated again, and the Producer may get a writable slot.
            // gatingSequenceCache exists only to reduce the number of getMinimumSequence calls;
            // the logic is the same as it would be without the cache.
            // This answers Question 2.
            if (wrapPoint > gatingSequence)
            {
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }
            gatingSequenceCache.set(gatingSequence);
        }
        // CAS ensures that multiple Producers claim slots correctly and without conflicts.
        // This answers Question 1.
        else if (cursor.compareAndSet(current, next))
        {
            break;
        }
    }
    while (true);
    return next;
}
// Gating sequences are added when each Consumer's EventProcessor is created
// (via updateGatingSequencesForNextInChain, shown under Question 3)
public final void addGatingSequences(Sequence... gatingSequences)
{
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
// What does MultiProducerSequencer.publish do?
public void publish(final long sequence)
{
    setAvailable(sequence); // mark the published slot as available
    waitStrategy.signalAllWhenBlocking(); // notify all consumers
}
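The two answers above (a CAS on a shared cursor for Question 1, gating on the slowest consumer for Question 2) can be isolated in a small stand-alone sketch. It is not the library code: `ClaimSketch`, `tryClaim` and `distinctClaims` are hypothetical names, `AtomicLong` stands in for `Sequence`, the gating cache is left out, and a full buffer is reported by returning -1 instead of parking and retrying.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of multi-producer slot claiming with CAS and consumer gating.
final class ClaimSketch {
    private final int bufferSize;
    private final AtomicLong cursor = new AtomicLong(-1L); // highest claimed sequence
    private final AtomicLong[] gatingSequences;            // one per consumer

    ClaimSketch(int bufferSize, AtomicLong... gatingSequences) {
        this.bufferSize = bufferSize;
        this.gatingSequences = gatingSequences;
    }

    /** Claims the next sequence, or returns -1 if the slowest consumer has not freed the slot yet. */
    long tryClaim() {
        while (true) {
            long current = cursor.get();
            long next = current + 1;
            long wrapPoint = next - bufferSize; // the sequence that this claim would overwrite
            long slowest = Long.MAX_VALUE;
            for (AtomicLong gating : gatingSequences) {
                slowest = Math.min(slowest, gating.get());
            }
            if (wrapPoint > slowest) {
                return -1L; // full: the slot to be overwritten has not been consumed
            }
            if (cursor.compareAndSet(current, next)) {
                return next; // this thread won the race for `next`
            }
            // Another producer moved the cursor first; re-read it and try again.
        }
    }

    /** Lets several threads claim concurrently and counts how many distinct sequences they got. */
    static int distinctClaims(int threads, int perThread) {
        AtomicLong neverBlocks = new AtomicLong(Long.MAX_VALUE); // a consumer that is always ahead
        ClaimSketch sketch = new ClaimSketch(1024, neverBlocks);
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(sketch.tryClaim());
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct sequences claimed by 4 threads: " + distinctClaims(4, 1000));
    }
}
```

If two producers read the same `current`, only one `compareAndSet` succeeds; the loser loops and claims the following sequence, so no slot is handed out twice and no lock is taken.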
- Question 3: How is a Consumer registered?
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// com/lmax/disruptor/dsl/Disruptor.java
/**
 * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
 * as soon as they become available, in parallel.</p>
 */
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return createEventProcessors(new Sequence[0], handlers);
}
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
        // Add the consumer to the consumer repository. This is essentially the observer pattern:
        // once the ringBuffer has data, each consumer thread is notified.
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
    // Two kinds of Sequence: barrierSequences (what these consumers wait on)
    // and processorSequences (the new consumers' own positions)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
SequenceBarrier controls how far a consumer is allowed to read.
/** Wait for the given sequence to be available for consumption. */
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
/** Get the current cursor value that can be read. */
long getCursor();
// Looking one level deeper, at the implementation
// com/lmax/disruptor/ProcessingSequenceBarrier.java
@Override
public long waitFor(final long sequence)
    throws AlertException, InterruptedException, TimeoutException
{
    checkAlert();
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    if (availableSequence < sequence)
    {
        return availableSequence;
    }
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
@Override
public long getCursor()
{
    // Return the current position; dependentSequence is a Sequence object
    return dependentSequence.get();
}
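How `dependentSequence` produces a processing order between consumers can be shown with a small calculation. The sketch below is an illustration with hypothetical names (`DependencySketch`, `availableFor`), using `AtomicLong` in place of `Sequence`: a consumer with no upstream consumers may read up to the cursor, while a consumer with upstream consumers may read only up to the slowest of them.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: how far a consumer may read, given the consumers it depends on.
final class DependencySketch {
    /**
     * With no dependencies the limit is the producer cursor; otherwise it is
     * the minimum position among the consumers that must run first.
     */
    static long availableFor(AtomicLong cursor, AtomicLong... dependsOn) {
        if (dependsOn.length == 0) {
            return cursor.get();
        }
        long minimum = Long.MAX_VALUE;
        for (AtomicLong upstream : dependsOn) {
            minimum = Math.min(minimum, upstream.get());
        }
        return minimum;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(10L);   // producers have published up to 10
        AtomicLong journal = new AtomicLong(7L);   // first-stage consumer A
        AtomicLong replicate = new AtomicLong(9L); // first-stage consumer B
        System.out.println("first stage may read up to " + availableFor(cursor));
        System.out.println("second stage may read up to " + availableFor(cursor, journal, replicate));
    }
}
```

Chaining such limits is what turns a set of independent consumers into a dependency graph: a downstream consumer never overtakes the consumers it waits on.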
// When the ringBuffer has new data, how does Disruptor notify the consumers?
/**
 * <p>Starts the event processors and returns the fully configured ring buffer.</p>
 */
public RingBuffer<T> start()
{
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository)
    {
        // Start one thread per consumer; the thread's logic is driven by BatchEventProcessor.
        // BatchEventProcessor has an eventHandler field, which is the handling code we write ourselves.
        consumerInfo.start(executor);
    }
    return ringBuffer;
}
Demo
- Event
public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }

    public long getValue()
    {
        return value;
    }
}
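- Consumer
import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        try
        {
            // Simulate a slow consumer
            Thread.sleep(5000);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag instead of swallowing the exception
            Thread.currentThread().interrupt();
        }
        System.out.println("Event: " + event.getValue());
    }
}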
- Producer
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next(); // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
            event.set(bb.getLong(0)); // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}
- Main
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 2;
        // Construct the Disruptor. The original used a LongEventFactory class that is not shown in this post;
        // LongEvent::new serves as the event factory here.
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
        // Connect the handlers: two independent consumers, each of which sees every event
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.handleEventsWith(new LongEventHandler());
        // Start the Disruptor, starts all threads running
        disruptor.start();
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        LongEventProducer producer2 = new LongEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            producer2.onData(bb);
            Thread.sleep(1000);
        }
    }
}
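The demo above needs the Disruptor library on the classpath. To see the same flow without it (publish through a cursor, consume up to the cursor, and hold the producer back when the consumer lags), here is a stdlib-only sketch with one producer thread and one consumer thread. Everything in it is hypothetical (`MiniPipeline`, `run`): two `AtomicLong`s stand in for the cursor and the consumer's `Sequence`, both sides busy-spin, and the buffer size is assumed to be a power of two.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-producer, single-consumer pipeline over a long[] ring.
final class MiniPipeline {
    /** Publishes the values 0..events-1 and returns the sum computed by the consumer thread. */
    static long run(int bufferSize, int events) {
        long[] ring = new long[bufferSize];
        int mask = bufferSize - 1;
        AtomicLong cursor = new AtomicLong(-1L);   // last published sequence
        AtomicLong consumed = new AtomicLong(-1L); // last consumed sequence (gates the producer)
        long[] sum = new long[1];

        Thread consumer = new Thread(() -> {
            long next = 0L;
            while (next < events) {
                long available = cursor.get();     // volatile read: slots up to here are visible
                while (next <= available) {
                    sum[0] += ring[(int) (next & mask)];
                    next++;
                }
                consumed.set(next - 1);            // tell the producer these slots are free
                Thread.onSpinWait();
            }
        });
        consumer.start();

        for (long sequence = 0L; sequence < events; sequence++) {
            // Writing `sequence` overwrites `sequence - bufferSize`; wait until that one is consumed.
            while (sequence - bufferSize > consumed.get()) {
                Thread.onSpinWait();
            }
            ring[(int) (sequence & mask)] = sequence; // fill the slot
            cursor.set(sequence);                     // publish: volatile write after the data write
        }

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum of 0..999 seen by the consumer: " + run(8, 1000));
    }
}
```

With several producer threads, the plain `cursor.set` would have to become a CAS-based claim plus a per-slot published flag, which is the job of `MultiProducerSequencer`.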
Published by: 全栈程序员-用户IM. Please credit the source when reposting: https://javaforall.cn/195345.html. Original link: https://javaforall.cn