Я хотел бы использовать как API-интерфейс процессора, так и DSL в одном приложении потоков Kafka. Кроме того, как создавать и запускать несколько топологий в одном приложении (например, 1 с использованием API процессора и другое с использованием DSL).





Вы можете легко смешивать DSL и процессорный API.
Насколько я понимаю, вы хотели бы построить график обработки, используя оба этих двух метода, чтобы сделать это для DSL, вы можете вызвать StreamsBuilder::stream, а для API процессора вы вызываете StreamsBuilder::build(), чтобы получить Topology, а затем применить функцию для добавления процессора и т. д.
Исходный код будет примерно таким:
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input1").to("output1");
Topology topology = builder.build();
topology.addSource("inputNode","input2");
topology.addProcessor("processor1", InputProcessor::new, "inputNode");
topology.addSink("sink1", "output2", "processor1");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
РЕДАКТИРОВАТЬ1:
Можно построить два топологии с DSL, работающие параллельно и слушающие разные темы. Это можно сделать, как @Matthias J. Sax упомянул с KStream::transform(...), KStream::transformValues(...) и KStream::process(...). Код будет примерно таким:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input1 = builder.<String, String>stream("input1").transform(SampleTransformer1::new);
KStream<String, String> input2 = builder.<String, String>stream("input2").transform(SampleTransformer2::new);
Итак, можем ли мы иметь 2 топологии, работающие параллельно и слушающие разные темы?
@ user1768610, Да, можно. Я обновил свой ответ образцом кода.
@wardziniak: спасибо
Однако использование
transform(),transformValues()иprocess()должно быть проще: docs.confluent.io/current/streams/developer-guide/…