Disruptor 系列一

很久之前就听说过这个框架,不过之前有点跟消息队列混起来,这个也是种队列,但不是跟 rocketmq,nsq 那种一样的,而是在进程内部提供队列服务的,偏向于取代ArrayBlockingQueue,因为这个阻塞队列是使用了锁来控制阻塞,关于并发其实有一些通用的最佳实践,就是用锁,即使是 JDK 提供的锁,也是比较耗资源的,当然这是跟不加锁的对比,同样是锁,JDK 的实现还是性能比较优秀的。常见的阻塞队列中例如 ArrayBlockingQueueLinkedBlockingQueue 都有锁的身影的存在,区别在于 ArrayBlockingQueue 是一把锁,后者是两把锁,不过重点不在几把锁,这里其实是两个问题,一个是所谓的 lock free, 对于一个单生产者的 disruptor 来说,因为写入是只有一个线程的,是可以不用加锁,多生产者的时候使用的是 cas 来获取对应的写入坑位,另一个是解决“伪共享”问题,后面可以详细点分析,先介绍下使用
首先是数据源

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

事件生产

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

事件处理器

public class LongEventHandler implements EventHandler<LongEvent> {

    // event 事件,
    // sequence 当前的序列 
    // 是否当前批次最后一个数据
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        String str = String.format("long event : %s l:%s b:%s", event.getValue(), sequence, endOfBatch);
        System.out.println(str);
    }
}

主方法代码

package disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // 这个需要是 2 的幂次,这样在定位的时候只需要位移操作,也能减少各种计算操作
        int bufferSize = 1024; 

        Disruptor<LongEvent> disruptor = 
                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // 类似于注册处理器
        disruptor.handleEventsWith(new LongEventHandler());
        // 或者直接用 lambda
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("Event: " + event));
        // 启动我们的 disruptor
        disruptor.start(); 


        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            // 生产事件
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

运行下可以看到运行结果

这里其实就只是最简单的使用,生产者只有一个,然后也不是批量的。