  • Author Owner

    So the current interface is

        KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates);

    What I would like instead is something like

        void branch(final KeyValue<Predicate<? super K, ? super V>, Consumer<KStream<? super K, ? super V>>>... branchPredicatesAndHandlers);

    so that it would be possible to write code like

            new KStreamBuilder()
            .<byte[], EventType>stream("eventTopic")
            .branch(
                KeyValue.pair(
                    (k, v) -> v.validData(),
                    stream -> stream.to("topicValidData")
                ),
                KeyValue.pair(
                    (k, v) -> true,
                    stream -> stream.to("topicInvalidData")
                )
            );
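
The routing this proposal implies can be sketched without Kafka. The following is only an illustration under stated assumptions: `BranchSketch`, `Branch` and the list-based stand-in for a stream are hypothetical names, not part of Kafka Streams. It assumes the same first-match semantics as the existing `branch`: each record goes to the first predicate that accepts it, and records matching no predicate are dropped.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

public class BranchSketch {

    /** Hypothetical pair: a predicate plus the handler for the matching sub-stream. */
    public static final class Branch<K, V> {
        final BiPredicate<? super K, ? super V> predicate;
        final Consumer<? super List<Map.Entry<K, V>>> handler;

        public Branch(BiPredicate<? super K, ? super V> predicate,
                      Consumer<? super List<Map.Entry<K, V>>> handler) {
            this.predicate = predicate;
            this.handler = handler;
        }
    }

    /** Routes each record to the first matching branch, then calls every handler once. */
    public static <K, V> void branch(List<Map.Entry<K, V>> records,
                                     List<Branch<K, V>> branches) {
        // One bucket per branch; a plain list stands in for a sub-stream here.
        List<List<Map.Entry<K, V>>> buckets = new ArrayList<>();
        for (int i = 0; i < branches.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            for (int i = 0; i < branches.size(); i++) {
                if (branches.get(i).predicate.test(record.getKey(), record.getValue())) {
                    buckets.get(i).add(record);
                    break; // first match wins; unmatched records are dropped
                }
            }
        }
        for (int i = 0; i < branches.size(); i++) {
            branches.get(i).handler.accept(buckets.get(i));
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = Arrays.asList(
            new SimpleEntry<String, Integer>("a", 1),
            new SimpleEntry<String, Integer>("b", -2),
            new SimpleEntry<String, Integer>("c", 3));

        List<Map.Entry<String, Integer>> valid = new ArrayList<>();
        List<Map.Entry<String, Integer>> invalid = new ArrayList<>();

        // The catch-all predicate comes last, as in the example above.
        branch(records, Arrays.asList(
            new Branch<String, Integer>((k, v) -> v > 0, valid::addAll),
            new Branch<String, Integer>((k, v) -> true, invalid::addAll)));

        System.out.println("valid=" + valid + " invalid=" + invalid);
    }
}
```

Because the handlers are passed together with their predicates, the caller never has to index into a returned array, which is the main point of the proposal.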

    Better ideas are welcome.

    For now I have worked around this by defining my own stream wrapper, which is used like this:

            MyStreamWrapper
                .createFrom(
                    new KStreamBuilder().<byte[], EventType>stream("eventTopic")
                )
                .createBranch(EventType::validData)
                .peekBranch(s -> s.to("topicInvalidData"))
                .endWrapper()
                .to("topicValidData");

    or

            MyStreamWrapper
                .createFrom(
                    new KStreamBuilder().<byte[], EventType>stream("eventTopic")
                )
                .createBranch(EventType::validData)
                .peekBranch(s -> s.to("topicInvalidData"))
                .peekMain(s -> s.to("topicValidData"));

    or

            MyStreamWrapper
                .createFrom(
                    new KStreamBuilder().<byte[], EventType>stream("eventTopic")
                )
                .createBranch(EventType::validData)
                .peekBranch(s -> MyStreamWrapper.createFrom(s)
                    .createBranch(EventType::recoverableData)
                    .peekBranch(u -> u.map(Logger::unrecoverableData))
                    .endWrapper()
                    .to("recoverableData"))
                .createBranch(EventType::olderThanTwoDays)
                .peekBranch(s -> s.to("oldData"))
                .convert(s -> s.mapValues(OutPutType::convert))
                .endWrapper()
                .to("topicValidData");
    Edited by Marcel