Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
6
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Switch to GitLab Next
Sign in / Register
Toggle navigation
A
akka_streams_vs_rx_reactor
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Locked Files
Issues
0
Issues
0
List
Boards
Labels
Service Desk
Milestones
Iterations
Merge Requests
0
Merge Requests
0
Requirements
Requirements
List
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Test Cases
Security & Compliance
Security & Compliance
Dependency List
License Compliance
Operations
Operations
Incidents
Environments
Packages & Registries
Packages & Registries
Package Registry
Container Registry
Analytics
Analytics
CI / CD
Code Review
Insights
Issue
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Artur Jablonski
akka_streams_vs_rx_reactor
Commits
4cab98d4
Commit
4cab98d4
authored
Aug 05, 2019
by
Artur Jablonski
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
minor updates
parent
28ea33d6
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
17 additions
and
17 deletions
+17
-17
README.md
README.md
+17
-17
No files found.
README.md
View file @
4cab98d4
...
...
@@ -63,7 +63,7 @@ Flux.range(0,10)
.
take
(
2
);
```
Here we start with a range of 10 values starting from 0, we then use
`map`
to
d
uplicat
e each one, then we use
`filter`
to pass through only those greater
d
oubl
e each one, then we use
`filter`
to pass through only those greater
than 10 and then we'll
`take`
the first two such numbers. Each operator
returns an instance of
`Flux`
that can be further transformed.
...
...
@@ -97,7 +97,7 @@ Flux.range(0,10)
Here we just changed
`map`
to
`flatMap`
, the lambda passed as parameter to the
`flatMap`
returns a
`Flux`
, but if
`Flux`
is a blueprint and we don't call
`subscribe(...)`
on it, then how i
t is
executed? Well, it's the
`flatMap`
itself
`subscribe(...)`
on it, then how i
s it
executed? Well, it's the
`flatMap`
itself
that is subscribing to it and then emits out whatever the
`Flux`
emits. In this
trivial example, the
`Flux.just(...)`
returns a single value calculated inline, but
it could very well execute an asynchronous call(s) to some external system(s).
`flatMap`
...
...
@@ -164,7 +164,7 @@ Flux.range(0, 10)
```
Here the lambda inside the
`compose`
plugs in to the
`Flux`
instance
that the
`compose`
was called on. That
`Flux`
is represented by te
`f`
lambda argument.
So that lambd
d
a takes a
`Flux`
of some type type
`In`
So that lambda takes a
`Flux`
of some type type
`In`
and can transform it to
`Flux`
of some type
`Out`
, so it is "kind of" like the
`Flow`
abstraction.
Similarly to _Reactor_, all the transformations like the one above are just blueprints
...
...
@@ -252,8 +252,8 @@ explains the mechanics of handling materialized values in depth.
_Reactor_
doesn't have the concept of materialized value, though if you bend
your mind enough you can think of
`Disposable`
instance that is returned after
subscribing to a stream as a one and only possible materialized value that can
be returned when running
the
stream.
`Disposable`
can be used to cancel
the
running stream.
be returned when running
a
stream.
`Disposable`
can be used to cancel
a
running stream.
The materialized value concept complicates the API as now every
`Source`
,
`Flow`
and
`Sink`
need to be parameterized with two types and when you build
...
...
@@ -291,7 +291,7 @@ returns _Scala_ `Future` we need to convert it to _Java_ `CompletionStage`, we t
convert it to
`CompletableFuture`
and use
`join`
to block the calling thread.
This is important here as this code is executed in context of a _JUnit_ test.
Remember that the stream is run using actors in the
`ActorSystem`
which is
using it
'
s own _dispacher_/thread pool to do that. If we didn't synchronize here
using its own _dispacher_/thread pool to do that. If we didn't synchronize here
then it's possible that the _JUnit_ thread finishes before the stream finishes
execution (or before it even started!).
...
...
@@ -696,11 +696,11 @@ Here we'll implement a partitioning of a stream similar to that of `groupBy`+
`mergeSubstreams`
above. The idea is that we'll implement a
`Flow`
using the _GraphDSL_
API that takes concurrency level and another flow that captures the desired computation
and then plug it in to the linear API via
`via`
operator. Here's a relatively simple
example
example
:
```
java
private
<
I
,
O
,
M
>
Flow
<
I
,
O
,
NotUsed
>
parallelizeFlow
(
int
concurrency
,
Flow
<
I
,
O
,
M
>
innerFlow
)
Flow
<
I
,
O
,
M
>
innerFlow
)
{
return
Flow
.
fromGraph
(
GraphDSL
.
create
(
...
...
@@ -737,7 +737,7 @@ public void akkaGraphDSL()
```
First we have the
`parallelizeFlow`
that takes concurrency and a
`Flow`
that
we would like to parallelize. Then inside we use the
GraphDSL
to create appropriate
we would like to parallelize. Then inside we use the
_GraphDSL_
to create appropriate
stream pieces like
`Balance`
,
`Merge`
. We then used the
`GraphDSL.Builder`
that
is passed to our lambda to wire up all elements and return our
`Flow`
. Then
we can use that
`Flow`
as any other by plugging it in to a linear stream topology.
...
...
@@ -761,7 +761,7 @@ If you look back at the _Java_ [specification of the reactive interfaces](https:
, you will find them
pretty simple, however if you read the list of rules that govern their interactions,
you will realize the complexity involved.
To help testing
with implementation
there's _Technology Compatibility Kit_ available
To help testing
implementations
there's _Technology Compatibility Kit_ available
that can run custom implementations against a test suite that checks the constraints
coming from the spec. You can find it
[
here
](
https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/tck/README.md
)
.
However you still need to implement it in the first place. There are some utility/not documented/subject to change
...
...
@@ -904,7 +904,7 @@ Now in the `preStart` method we will actually create the
The reason why we have to go about this like that, is the use of
`createAsyncCallback`
method that is not safe to call in the
constructor (as the documentation states). The reason why we want
to use that method in the first place is that our
PauseButton
will
to use that method in the first place is that our
`PauseButton`
will
effectively be affecting the way the events dispatched from the
in and out ports are handled. These changes in the behaviour will
be triggered externally and independently of the stream itself.
...
...
@@ -1038,7 +1038,7 @@ Flux.range(-10, 20)
Here we are creating a range from -10 to 10 and then multiple each item by itself
and then dividing the result by the element itself again thus yielding the
same number that we started with, with the exception of 0 that will blow up
with
`div by 0`
exception, which is the whole point. Great!
with
`div by 0`
exception, which is the whole point. Great!
In our
`subscribe`
we now have two lambdas, one that handles data
emissions and simply prints them out and the other handles the error signal
(remember there can only be one since it's terminal) if it happens.
...
...
@@ -1088,7 +1088,7 @@ Flux.range(-10, 20)
);
```
So that's all great, but can
't
we do better? I mean from all the original stream
So that's all great, but can we do better? I mean from all the original stream
values it's only the element 0 that will trigger the exception, can't we somehow just
skip it? Well, yes we can, let's check
`onErrorContinue`
operator
...
...
@@ -1107,7 +1107,7 @@ Nice! It works here just as we wanted it, but there's a dark side to the
`onErrorContinue`
operator. This operator relies on the operators upstream to
be aware of how it works and to check some flags, so it won't work with all the
operators (you need to check Javadoc for operators to figure out which ones support
it. Also there's some unexpected behaviour when nested streams are involved.
it
)
. Also there's some unexpected behaviour when nested streams are involved.
The example below is adopted from
[
this github issue.
](
https://github.com/reactor/reactor-addons/issues/210
)
...
...
@@ -1203,7 +1203,7 @@ of _Project Reactor_. They are all well documented, so have a look.
In terms of testing both libraries provide similar facilities where you
can plugin a special component (
`StepVerifier`
and
`TestPublisher`
for _Project Reactor_
and
`Test
Prob
e`
and
`TestSink`
and
`Test
Sourc
e`
and
`TestSink`
for Akka Streams) to then assert on emissions, errors, completions, etc.
One cool feature that _Project Reactor_ has is the concept of virtual time,
...
...
@@ -1230,8 +1230,8 @@ if they don't use this mechanism under the hood to implement the distribution
of stream processing. I couldn't google up anything that would confirm or deny it.
## Summary:
In this write up I went through various features of 2 API designs of reactive
streams available for _Java_.
In this write up I went through various features of 2 API designs of
3
reactive
streams
specification implementations
available for _Java_.
_Project Reactor_
is easier to start with. The documentation, community support,
and a lot of examples everywhere make learning a good experience. Also since
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment