Disruptor并发框架简介
- Disruptor是一个开源的并发框架,能够在无锁的情况下实现网络的Queue并发操作。
- Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量级JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
- 参考系列文档:http://ifeve.com/disruptor-getting-started/
Disruptor Hello World
在Disruptor中,进行开发时需要引入相应的jar包。我们实现Hello World需要如下几个步骤:
- 建立一个Event类。
- 建立一个工厂Event类,用于创建Event类实例对象。
- 需要有一个事件监听事件类,用于处理数据(Event类)。
- 我们需要进行测试代码编写。实例化Disruptor实例,配置一系列参数。然后我们对Disruptor实例绑定监听事件类,接收并处理数据。
- 在Disruptor中,真正存储数据的核心叫做RingBuffer,我们通过Disruptor实例来拿到它,然后把数据生产出来,把数据加入到RingBuffer的实例对象中即可。
1 | public class LongEvent { |
1 | /** |
1 | /** |
1 | public class LongEventMain { |
1 | public class LongEventProducer { |
Disruptor术语说明
- RingBuffer:
被看做Disruptor最主要组件,然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(其他数据结构)完全替代。 - Sequence:
Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值得运转,因此Sequence支持多种当前为AtomicLong类的特性。 - Sequencer:
这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。 - SequenceBarrier:
有Sequence生成,并且包含了已经发布的Sequence的引用,这些Sequence源于Sequenceer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费者的Event的逻辑。 - WaitStrategy:
决定了一个消费者将如何等待生产者将Event置于Disruptor。 - Event:
从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因此它完全是由用户定义。 - EventProcessor:
主要时间循环,处理Disruptor中的Event,并且拥有消费者的Sequence。他有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。 - EventHandler:
由用户实现并且代表了Disruptor中的一个消费者的接口。 - Producer:
由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。 - WorkProcessor:
确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。 - WorkerPool:
一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker之间移交。 - LifecycleAware:
当BatchEventProcessor启动和停止时,实现这个接口用于接收通知。Disruptor印象
初看Disruptor,给人的印象就是RingBuffer是其和兴,生产者向RingBuffer中写入元素,消费者从RingBuffer中消费元素。理解RingBuffer
- RingBuffer到底是什么?
它是一个环(首尾相接的环),可以把它用作在不同上下文(线程)间传递数据的buffer。RingBuffer拥有一个序号,这个序号(sequence)指向数组中下一个可用元素。随着你不停地填充这个buffer(可能会有相应的读取),这个序号一直增长,直到绕过这个环。要找到数组中当前序号指向的元素,可以通过mod操作:sequence mod array.length = array.index(取模操作)。如果槽的个数是2的N次方更有利于基于二进制的计算机计算。场景使用
在HelloWorld的实例中,我们创建Disruptor实例,然后调用getRingBuffer方法区获取RingBuffer,其实很多时候,我们可以直接使用RingBuffer,以及其他的API操作。 - 使用EventProcessor消息处理器:
1 | public class Main1 { |
1 | public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { |
1 | public class Trade { |
- 使用WorkerPool消息处理器:
1 | public class Main2 { |
1 | public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { |
1 | public class Trade { |
- 在复杂场景下使用RingBuffer(希望P1生产的数据给C1、C2并行执行,最后C1、C2执行结束后C3执行):
1 | public class Main { |
1 | public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> { |
1 | public class Handler2 implements EventHandler<Trade> { |
1 | public class Handler3 implements EventHandler<Trade> { |
1 | public class Handler4 implements EventHandler<Trade>,WorkHandler<Trade> { |
1 | public class Handler5 implements EventHandler<Trade>,WorkHandler<Trade> { |
1 | public class TradePublisher implements Runnable { |
- 多生产者和多消费者(复杂场景使用Disruptor,简单场景使用RingBuffer即可):
1 | public class Main { |
1 | public class Order { |
1 | public class Producer { |
1 | public class Consumer implements WorkHandler<Order>{ |