Пробуя библиотеку OSGi PushStream, я почувствовал, что она очень медленная. Я создал два метода, которые делают то же самое, один с использованием PushStream, а другой - с простой BlockingQueue (см. Код ниже), результат следующий:
Queue needs 3 milliseconds to process 1000 events.
PushStream needs 31331 milliseconds to process 1000 events.
Почему PushStream медленнее? Что я делаю не так?
С помощью PushStream:
public class TestPush{
@Test
public void testPushStream() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class).withQueuePolicy(QueuePolicyOption.BLOCK).build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.createStream(source).onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.createStream(source).count();
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
С помощью ArrayBlockingQueue:
@Test
public void testBlockingQueue() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final Executor e = Executors.newFixedThreadPool(1);
final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(32);
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
final Deferred<Integer> nbEvent = pf.deferred();
e.execute( () -> {
try {
Integer i = 0;
Integer last = 0;
do {
i = abq.take();
if (i == 0) {
startD.resolve(Instant.now());
} else if (i != -1) {
last = i;
}
}
while (i != -1);
endD.resolve(Instant.now());
nbEvent.resolve(last + 1);
}
catch (final InterruptedException exception) {
exception.printStackTrace();
}
});
for (int i = 0; i < 1000; i++) {
abq.put(i);
}
abq.put(-1);
System.out.println("Queue needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getPromise().getValue() + " events.");
}
}


Это забавный вопрос :)
Why the PushStream is slower? What I am doing wrong?
Спасибо, что не предположили, что реализация PushStream - отстой. В этом случае он работает медленнее, потому что (вероятно, сам того не осознавая) вы просили об этом!
По умолчанию потоки PushStream буферизуются. Это означает, что они включают очередь, в которую помещаются события до их обработки. Таким образом, буферизация делает несколько вещей, которые отрицательно влияют на скорость пропускной способности.
В этом случае подавляющее большинство замедления происходит из-за противодавления. Когда вы создаете поток с помощью psp.createStream(source), он настраивается с буфером из 32 элементов и линейной политикой обратного давления на основе размера буфера, возвращая одну секунду, когда он заполнен, и 31 миллисекунд, когда в нем есть один элемент. Стоит отметить, что 31 миллис на элемент составляет 30 секунд!
Важно отметить, что SimplePushEventSource всегда учитывает запросы обратного давления от добавленных к нему потребителей. Это означает, что вы можете перекачивать события в SimplePushEventSource так быстро, как только можете, но они будут доставляться только с той скоростью, с какой они запрашиваются конвейером.
Если мы удалим буферизацию из создаваемых вами push-потоков, мы получим следующий тест:
@Test
public void testPushStream2() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
Результат выполнения этого (на моей машине):
PushStream needs 39 milliseconds to process 1000 events.
Это, очевидно, намного ближе к тому, что вы ожидаете, но все же заметно медленнее. Обратите внимание, что у нас все еще могла быть буферизация, но мы настроили PushbackPolicy. Это дало бы нам более высокую пропускную способность, но не так быстро, как это.
Следующее, на что следует обратить внимание, это то, что вы используете обработчик onClose(). Это добавляет дополнительный этап в конвейер push-потока. Фактически вы можете переместить onClose как результат обещания, уменьшив длину вашего конвейера (вам нужно запустить его только один раз).
@Test
public void testPushStream3() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
Результат этой версии (на моей машине):
PushStream needs 21 milliseconds to process 1000 events.
Ключевое различие между примером «очереди блокировки необработанного массива» и примером PushStream заключается в том, что вы фактически создаете два PushStreams. Первый выполняет работу по фиксации времени начала, второй - по подсчету событий. Это заставляет SimplePushEventSource мультиплексировать события по нескольким потребителям.
Что, если бы мы свернули поведение в один конвейер, чтобы SimplePushEventSource мог использовать быструю доставку?
@Test
public void testPushStream4() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
.filter(i -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
return true;
})
.count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
Результат этой версии (на моей машине):
PushStream needs 3 milliseconds to process 1000 events.
PushStreams - это быстрый и эффективный способ получения асинхронно поступающих событий, но очень важно понимать, какое поведение буферизации подходит для вашего приложения. Если у вас есть большой кусок данных, который вы хотите перебрать очень быстро, вам нужно быть осторожным при настройке, поскольку значения по умолчанию для буферизации предназначены для другого варианта использования!
Всегда приятно задать вопрос и получить такой ответ, большое спасибо за уделенное время!