干扰器github地址是:https://github.com/LMAX-Exchange/disruptor
我有一个简单的测试如下:
public class DisruptorMain {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
class Element {
private int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
EventHandler<Element> handler = new EventHandler<Element>() {
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch) {
try {
Thread.sleep(1000 * sequence);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Element: " + element.get());
}
};
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
int bufferSize = 4;
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
disruptor.handleEventsWith(handler);
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int l = 0; l < 8; l++) {
long sequence = ringBuffer.next();
System.out.println("sequence:" + sequence);
try {
Element event = ringBuffer.get(sequence);
event.set(l);
} finally {
ringBuffer.publish(sequence);
}
}
}
}结果表明:序列:0序列:1序列:2序列:3元素:0元素:1元素:2元素:3序列:4序列:5序列:6序列:7元素:4元素:6元素:6元素:7元素:7元素:6元素:6元素:7元素:6元素:6元素:7元素:6元素:6元素:7元素:6元素:6元素:6元素:7序列
在我的测试中,我定义了一个4的循环缓冲区大小,并且我有一个生产者为它创建了8个任务,我的问题是,当生产者将4个任务放在了循环缓冲区中时,使用者就开始从循环缓冲区中接受任务,在任务1完成后,循环缓冲区应该有一个空空间来执行任务5,但是结果显示,只有当所有的任务都在循环缓冲区中完成时,循环缓冲区才能接受新的任务,为什么?
发布于 2018-01-20 09:02:55
这是因为Disruptor将在事件处理程序中批处理。如果事件处理程序较慢或环形缓冲区较小,则批处理大小通常可以是环缓冲区的大小。Disruptor将只更新该事件处理程序的处理序列,直到批处理完成为止。这减少了需要对发布服务器使用的序列变量进行更新的次数,以确定空间是否可用。如果需要在默认时间之前提供可用空间,那么可以使用SequenceReportingEventHandler。
public class MyEventHandler implements SequenceReportingEventHandler<Element> {
Sequence processedSequence;
public void setSequenceCallback(Sequence s) {
processedSequence = s;
}
public void onEvent(Element e, long sequence, boolean endOfBatch) {
// Do stuff
processedSequence.set(sequence);
}
}https://stackoverflow.com/questions/48352411
复制相似问题