@childNodeAuthor (Owner)
So the interface currently is
KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates);
What I would like is something like
void branch(final KeyValue<Predicate<? super K, ? super V>, Consumer<KStream<? super K, ? super V>>>... branchPredicatesAndHandlers);
so it would be possible to write code like
new KStreamBuilder()
    .<byte[], EventType>stream("eventTopic")
    .branch(
        // records with valid data go to their own topic
        KeyValue.pair((k, v) -> v.validData(), stream -> stream.to("topicValidData")),
        // catch-all: everything that did not match above
        KeyValue.pair((k, v) -> true, stream -> stream.to("topicInvalidData"))
    );
Better ideas are welcome.
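To make the intended semantics of the pair-based `branch` concrete, here is a minimal sketch in plain Java. It is not Kafka Streams code: `BranchSketch` and `Case` are hypothetical names, and a `List` of `Map.Entry` stands in for a `KStream`. It only illustrates the routing rule that the existing `branch` documents, where each record goes to the first predicate it matches and unmatched records are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

// Hypothetical stand-in, not part of Kafka Streams.
final class BranchSketch {

    /** One predicate together with the handler for the records it selects. */
    static final class Case<K, V> {
        final BiPredicate<? super K, ? super V> predicate;
        final Consumer<List<Map.Entry<K, V>>> handler;

        Case(BiPredicate<? super K, ? super V> predicate,
             Consumer<List<Map.Entry<K, V>>> handler) {
            this.predicate = predicate;
            this.handler = handler;
        }
    }

    /** Routes each record to the first matching case; unmatched records are dropped. */
    @SafeVarargs
    static <K, V> void branch(List<Map.Entry<K, V>> records, Case<K, V>... cases) {
        // One bucket per case, in declaration order.
        List<List<Map.Entry<K, V>>> buckets = new ArrayList<>();
        for (int i = 0; i < cases.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            for (int i = 0; i < cases.length; i++) {
                if (cases[i].predicate.test(record.getKey(), record.getValue())) {
                    buckets.get(i).add(record);
                    break; // first match wins
                }
            }
        }
        // Hand every bucket to its handler, so no array indexing leaks to the caller.
        for (int i = 0; i < cases.length; i++) {
            cases[i].handler.accept(buckets.get(i));
        }
    }
}
```

The point of the design is that each handler sits next to its predicate, so the caller never has to index into a returned array.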
At the moment I work around this by defining my own stream wrapper, which works like this:
MyStreamWrapper
    .createFrom(new KStreamBuilder().<byte[], EventType>stream("eventTopic"))
    .createBranch(EventType::validData)
    .peekBranch(s -> s.to("topicInvalidData"))
    .endWrapper()
    .to("topicValidData");
or
MyStreamWrapper
    .createFrom(new KStreamBuilder().<byte[], EventType>stream("eventTopic"))
    .createBranch(EventType::validData)
    .peekBranch(s -> s.to("topicInvalidData"))
    .peekMain(s -> s.to("topicValidData"));
or
MyStreamWrapper
    .createFrom(new KStreamBuilder().<byte[], EventType>stream("eventTopic"))
    .createBranch(EventType::validData)
    // nested wrapper on the branch; lambda parameters renamed so they do not shadow each other
    .peekBranch(invalid -> MyStreamWrapper.createFrom(invalid)
        .createBranch(EventType::recoverableData)
        .peekBranch(unrecoverable -> unrecoverable.map(Logger::unrecoverableData))
        .endWrapper()
        .to("recoverableData"))
    .createBranch(EventType::olderThanTwoDays)
    .peekBranch(s -> s.to("oldData"))
    .convert(s -> s.mapValues(OutPutType::convert))
    .endWrapper()
    .to("topicValidData");
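For readers wondering how such a wrapper could hang together, here is a rough sketch, again backed by a plain `List` instead of a `KStream`. The class name `ListWrapperSketch` is hypothetical, and the semantics are an assumption read off the first example above: `createBranch` keeps matching records on the main path and splits the rest into the branch. The `convert` step is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical list-backed sketch; the wrapper described above wraps a KStream instead.
final class ListWrapperSketch<V> {
    private final List<V> main;   // records that stay on the main path
    private final List<V> branch; // records split off by the latest createBranch

    private ListWrapperSketch(List<V> main, List<V> branch) {
        this.main = main;
        this.branch = branch;
    }

    static <V> ListWrapperSketch<V> createFrom(List<V> source) {
        return new ListWrapperSketch<>(new ArrayList<>(source), new ArrayList<>());
    }

    // Assumed semantics: matching records stay on main, all others form the branch.
    ListWrapperSketch<V> createBranch(Predicate<? super V> keepOnMain) {
        List<V> kept = new ArrayList<>();
        List<V> split = new ArrayList<>();
        for (V value : main) {
            if (keepOnMain.test(value)) {
                kept.add(value);
            } else {
                split.add(value);
            }
        }
        return new ListWrapperSketch<>(kept, split);
    }

    // Lets the caller consume the split-off records without leaving the chain.
    ListWrapperSketch<V> peekBranch(Consumer<? super List<V>> handler) {
        handler.accept(branch);
        return this;
    }

    // Same as peekBranch, but for the records on the main path.
    ListWrapperSketch<V> peekMain(Consumer<? super List<V>> handler) {
        handler.accept(main);
        return this;
    }

    // Ends the chain and returns the main path.
    List<V> endWrapper() {
        return main;
    }
}
```

Because every step returns a wrapper, a branch handler can build its own nested wrapper, which is what the third example relies on.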
Edited by Marcel
@childNodeAuthor (Owner)
Officially requested in https://issues.apache.org/jira/browse/KAFKA-5488