Disruptor Series, Part 3

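I was somewhat misled about this for a long time.
gatingSequences marks the position each processor has reached, but I was never clear on how those positions get recorded and updated.
The key is that gatingSequences is an array of Sequence objects, so the first thing to look at is how entries are added to it.
They are added in com.lmax.disruptor.RingBuffer#addGatingSequences.
The call chain starts at com.lmax.disruptor.dsl.Disruptor#handleEventsWith(com.lmax.disruptor.EventHandler<? super T>...),
which then calls com.lmax.disruptor.dsl.Disruptor#createEventProcessors(com.lmax.disruptor.Sequence[], com.lmax.disruptor.EventHandler<? super T>[]):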

    EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            // Wrap the handler in a BatchEventProcessor here
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

BatchEventProcessor defines a sequence field of its own:

    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

The following line in the loop above then reads it out:

    processorSequences[i] = batchEventProcessor.getSequence();
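What makes this work is that getSequence() hands back the processor's own Sequence object rather than a copy of its value, so whoever holds the array sees every later update. Here is a minimal sketch of that idea. SimpleSequence and SimpleProcessor are hypothetical stand-ins, not the library classes, and the real Sequence additionally deals with cache-line padding and memory ordering.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for com.lmax.disruptor.Sequence; not the library class.
class SimpleSequence {
    // Same starting value as Sequencer.INITIAL_CURSOR_VALUE.
    static final long INITIAL_VALUE = -1L;
    private final AtomicLong value = new AtomicLong(INITIAL_VALUE);

    long get() { return value.get(); }
    void set(long newValue) { value.set(newValue); }
}

// Hypothetical stand-in for a processor that owns exactly one sequence.
class SimpleProcessor {
    private final SimpleSequence sequence = new SimpleSequence();

    SimpleSequence getSequence() { return sequence; }

    // Pretend every event up to and including 'upTo' has been handled.
    void markProcessed(long upTo) { sequence.set(upTo); }
}

class SharedSequenceDemo {
    public static void main(String[] args) {
        SimpleProcessor processor = new SimpleProcessor();
        // The array stores a reference to the processor's sequence object.
        SimpleSequence[] gating = { processor.getSequence() };

        System.out.println(gating[0].get()); // -1
        processor.markProcessed(41L);
        // The array itself was never touched, yet it observes the new value.
        System.out.println(gating[0].get()); // 41
    }
}
```

So nothing ever needs to write into the gating array after registration: the processor only moves its own sequence, and the ring buffer reads it through the shared reference.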

Next, com.lmax.disruptor.dsl.Disruptor#updateGatingSequencesForNextInChain is called:

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
    {
        if (processorSequences.length > 0)
        {
            // This is where the processor sequences are added as gating sequences
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences)
            {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
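The reason the barrier sequences are removed is that a processor further down the chain can never get ahead of the ones it waits on, so the ring buffer only needs to watch the processors at the end of the chain. The producer then compares its wrap point against the minimum of those gating sequences. The sketch below is a simplified, single-threaded model of that check. GatingModel and all of its method names are hypothetical, and the real sequencers also handle waiting, caching and concurrency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, single-threaded model of a ring buffer's gating logic.
// All names are hypothetical; this is not the Disruptor implementation.
class GatingModel {
    private final int bufferSize;
    private final List<AtomicLong> gatingSequences = new ArrayList<>();

    GatingModel(int bufferSize) { this.bufferSize = bufferSize; }

    void addGatingSequence(AtomicLong sequence) { gatingSequences.add(sequence); }

    // AtomicLong does not override equals, so this removes by identity.
    boolean removeGatingSequence(AtomicLong sequence) {
        return gatingSequences.remove(sequence);
    }

    // Smallest gating sequence, never larger than 'cap'.
    long minimumGatingSequence(long cap) {
        long min = cap;
        for (AtomicLong s : gatingSequences) {
            min = Math.min(min, s.get());
        }
        return min;
    }

    // True if writing 'nextSequence' would not overwrite a slot that the
    // slowest gating consumer has not finished with yet.
    boolean canPublish(long nextSequence) {
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint <= minimumGatingSequence(nextSequence);
    }
}

class GatingModelDemo {
    public static void main(String[] args) {
        GatingModel ring = new GatingModel(8);
        AtomicLong a = new AtomicLong(-1L); // first handler in the chain
        AtomicLong b = new AtomicLong(-1L); // handler that runs after a

        ring.addGatingSequence(a);    // a is registered first
        ring.addGatingSequence(b);    // b becomes the new end of the chain...
        ring.removeGatingSequence(a); // ...so the ring stops gating on a

        a.set(7L);                    // a is done with slots 0..7, b with none
        System.out.println(ring.canPublish(7L)); // true: slot 7 is still free
        System.out.println(ring.canPublish(8L)); // false: would overwrite slot 0
        b.set(0L);                    // b finishes sequence 0
        System.out.println(ring.canPublish(8L)); // true
    }
}
```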

As for how the sequence gets updated, that happens inside the processor, starting from com.lmax.disruptor.BatchEventProcessor#run:

    public void run()
    {
        if (running.compareAndSet(IDLE, RUNNING))
        {
            sequenceBarrier.clearAlert();

            notifyStart();
            try
            {
                if (running.get() == RUNNING)
                {
                    processEvents();
                }
            }
            finally
            {
                notifyShutdown();
                running.set(IDLE);
            }
        }
        else
        {
            // This is a little bit of guess work. The running state could of changed to HALTED by
            // this point. However, Java does not have compareAndExchange which is the only way
            // to get it exactly correct.
            if (running.get() == RUNNING)
            {
                throw new IllegalStateException("Thread is already running");
            }
            else
            {
                earlyExit();
            }
        }
    }
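The compareAndSet at the top of run() is what keeps two threads from driving the same processor at once. A reduced sketch of just that guard follows. RunGuardModel is a hypothetical class, the HALTED / earlyExit() branch is left out, and the second entry is simulated by re-entering from inside the body so that the demo stays single-threaded.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of the start-up guard; not the library class.
class RunGuardModel {
    static final int IDLE = 0;
    static final int RUNNING = 2;
    private final AtomicInteger running = new AtomicInteger(IDLE);

    // Runs 'body' only if nobody else currently holds the RUNNING state.
    void run(Runnable body) {
        if (running.compareAndSet(IDLE, RUNNING)) {
            try {
                body.run();
            } finally {
                running.set(IDLE); // always release, even if body throws
            }
        } else {
            throw new IllegalStateException("Thread is already running");
        }
    }

    boolean isRunning() { return running.get() == RUNNING; }
}

class RunGuardDemo {
    public static void main(String[] args) {
        RunGuardModel guard = new RunGuardModel();
        guard.run(() -> {
            try {
                guard.run(() -> { }); // second entry while still RUNNING
            } catch (IllegalStateException expected) {
                System.out.println("rejected: " + expected.getMessage());
            }
        });
        System.out.println(guard.isRunning()); // false: state was reset to IDLE
    }
}
```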

That in turn calls processEvents:

    private void processEvents()
    {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true)
        {
            try
            {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null)
                {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                // If the whole batch was handled normally, the sequence advances to
                // availableSequence, since everything up to it has been processed
                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (running.get() != RUNNING)
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                handleEventException(ex, nextSequence, event);
                // On an exception, the sequence only advances to nextSequence,
                // i.e. the event that failed, which is then skipped
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
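The two sequence.set(...) calls are easier to follow with concrete numbers. The sketch below is a single-threaded model of the loop. BatchLoopModel, its Handler interface and the publish helper are all hypothetical, and it has no barrier, no ring wrap-around and no timeout or alert handling. It records every value the processor's sequence is set to, which shows that the sequence moves once per batch rather than once per event, and that a failing event is marked as consumed and then skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded model of the batch loop; all names are hypothetical.
class BatchLoopModel {
    interface Handler {
        void onEvent(String event, long sequence) throws Exception;
    }

    long sequence = -1L; // the processor's own position
    final List<Long> sequenceHistory = new ArrayList<>();

    private void publish(long value) {
        sequence = value;
        sequenceHistory.add(value);
    }

    // Handles everything up to 'availableSequence'. Each pass of the outer
    // loop corresponds to one pass of while (true) in processEvents().
    void drain(String[] events, long availableSequence, Handler handler) {
        long nextSequence = sequence + 1L;
        while (nextSequence <= availableSequence) {
            try {
                while (nextSequence <= availableSequence) {
                    // No wrap-around in this model: the sequence is the index.
                    handler.onEvent(events[(int) nextSequence], nextSequence);
                    nextSequence++;
                }
                // Whole batch handled: jump straight to the end of the batch.
                publish(availableSequence);
            } catch (Exception ex) {
                // The failed event counts as consumed, then it is skipped.
                publish(nextSequence);
                nextSequence++;
            }
        }
    }
}

class BatchLoopDemo {
    public static void main(String[] args) {
        BatchLoopModel model = new BatchLoopModel();
        String[] events = { "a", "b", "boom", "d", "e" };
        model.drain(events, 4L, (event, seq) -> {
            if (event.equals("boom")) {
                throw new IllegalStateException("handler failed at " + seq);
            }
        });
        // Sequences 0 and 1 never show up on their own.
        System.out.println(model.sequenceHistory); // [2, 4]
    }
}
```

In this model the sequence stays at -1 while events 0 and 1 are handled, moves to 2 when the handler fails there, and then goes directly to 4 once the rest of the batch is done.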