Commit 4da96655 authored by Paul Swartz's avatar Paul Swartz

presentation

parent 32b8c8f1
# GenStage
<div>march 20, 2018</div>
<div>boston elixir @ MBTA</div>
Note: thanks everyone for coming! I'm Paul Swartz, lead architect here at MBTA, and tonight I'll be talking about GenStage.
---
> "a specification for exchanging events between producers and consumers."
Note: The README describes it as .... But that doesn't showcase what's
interesting about the library.
---
#### `everyone |> loves |> pipelines!`
Note: almost everyone coming to Elixir raves about the pipeline
operator. but, it's syntactic sugar for calling functions; it doesn't take
advantage of OTP's concurrency or supervision
---
# Stages
Note: GenStage introduces the concept of stages. Conceptually, they are the
functions in our pipeline operator example, but allow for concurrency and
supervision. There are three (3) types.
---
## Producer
* returns a given amount of data when requested
* implements the `handle_demand/2` callback
---
## Consumer
* does something with incoming data
* implements the `handle_events/3` callback
---
## Producer Consumer
* combination of Producer and Consumer
* work as a filter/map stage
* implements `handle_event/3`
---
# Examples
Note: let's look at some small examples from the documentation
---
## Producer
<pre><code class="elixir" data-trim>
defmodule A do
use GenStage
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
end
{:ok, a} = GenStage.start_link(A, 0)
</code></pre>
Note: If the counter is 3 and we ask for 2 items, we wil emit the items 3 and
4, and set the state to 5. `init/1` returns the stage type and the initial state.
---
## Producer Consumer
<pre><code class="elixir" data-trim>
defmodule B do
use GenStage
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
events = for event <- events, do: event * number
{:noreply, events, number}
end
end
{:ok, b} = GenStage.start_link(B, 2)
</code></pre>
Note: multiplies each event by the given number. It can also return more or
fewer events than it receives.
---
## Consumer
<pre><code class="elixir" data-trim>
defmodule C do
use GenStage
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
Process.sleep(1000)
IO.inspect(events)
{:noreply, [], state}
end
end
{:ok, c} = GenStage.start_link(C, :ok)
</code></pre>
Note: Sleeps for 1 second, then prints the events it received. Since it's a
consumer, we would never emit items: the second item of the tuple is always
the empty list. Now all that's left is to hook them together.
---
# Subscribing
connecting the stages
---
`sync_subscribe/2`
<pre><code class="elixir" data-trim>
GenStage.sync_subscribe(b, to: a)
GenStage.sync_subscribe(c, to: b)
</code></pre>
Note: there's also `async_subscribe`, but that's more for internal use.
---
better: subscribe from `init/1`
<pre><code class="elixir" data-trim>
defmodule C
def init(producers)
{:consumer, :does_not_matter, subscribe_to: producers}
end
end
{:ok, _} = GenStage.start_link(B, 2, name: B)
{:ok, _} = GenStage.start_link(C, [B])
</code></pre>
Note: it's better because if the consumer crashes, it will automatically
re-subscribe when the supervisor restarts it
---
shell
---
<pre><code class="elixir" data-trim>
[0, 2, 4, 6, 8, 10, ...]
[1000, 1002, 1004, 1006, 1008, 1010, ...]
</code></pre>
why 500 events?
---
# Demand
consumers tell producers how much data to send
Note: Demand flows from consumers to producers, as a form of
back-pressure. if consumers slow down, producers will run out of available
demand and will stop sending events until the consumers catch
up. producers/producers can also buffer (10k items for producers, infinity
for producerconsumers). can keep either first or last items.
---
# Dispatchers
pipelines with joints
Note: module which describes how a producer sends events to subscribe
consumers
---
<div>`DemandDisptcher`</div>
<div>sends to consumer with the highest demand <small>(default)</small></div>
<div class="fragment">
<div>`BroadcastDispatcher`</div>
<div>sends events to all consumers when they have demand (fan-out)</div>
<div class="fragment">
<div>`PartitionDispatcher`</div>
<div>sends to a particular consumer based on a hash function</div>
</div>
---
# ConsumerSupervisor
taking advantage of concurrency
---
<pre><code class="elixir" data-trim>
defmodule Consumer do
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
def init(:ok) do
children = [worker(Printer, [], restart: :temporary)]
{:ok, children,
strategy: :one_for_one,
subscribe_to: [A]}
end
end
</code></pre>
Note: This calls `Printer.start_link(event)` for each event received. It's
similar to the `:simple_one_for_one` supervision strategy, or the
`DynamicSupervisor` behavior.
---
# Concentrate
Note: Let's look at an example architecture which uses GenStage. Concentrate
is an application the MBTA uses to combine realtime information from across
our systems.
---
<!-- .slide: data-background-image="/img/concentrate.svg" data-background-size="contain" -->
Note: sources (producers) get data by making HTTP requests and
parsing. MergeFilter receives events from all the sources, then broadcasts to
Reporters (statistics) and Encoders (output data). All the encoders pass
their encoded data to a ConsumerSupervisor which spawns a Sink to upload the
data.
---
# Tips & Tricks
Note: I'll finish up with some tips I've picked up from using GenStage for
projects here @ MBTA.
---
## Stages for Concurrency
modules/functions for logical separation
Note: The best use of stages is for places where you want concurrency. Don't
use stages just because the data is logically flowing between different parts
of your system: that's better represented by having a single stage use
different modules internally. If we look back at the Concentrate architecture...
---
<!-- .slide: data-background-image="/img/concentrate.svg" data-background-size="contain" -->
Note: ...we can see MergeFilter is doing a lot
of work: merging the different sources together, filtering that data, and
grouping it, before passing it along. Those could be implemented as different
stages, but since there's only the single path through them, it's only added
complexity.
---
## Demand management
try changing min/max_demand
Note: each subscription has a miniumum and maximum amount of outstanding
demand. Tuning these can avoid data building up between stages, and keep data flowing smoothly.
---
## Supervision
you probably want the `:rest_for_one`
Note: if one a consumer crashes, it'll generally crash any producers, and
possibly crashing the supervisor as well. `:rest_for_one` restarts the whole
tree at once.
---
## Testing
* `GenStage.from_enumerable/2`
* `GenStage.stream/2 |> Enum.take/2`
Note: GenStage comes with two helpful functions for testing your
stages. `from_enumerable/2` makes a producer stage from an
enumerable. `stream/2` makes an Enumerable by subscribing to a producer
stage. And at the end of the day, you can always call `handle_demand/2` and
`handle_events/3` manually!
---
# Further references
* `gen_stage` (hex)
* `flow` (hex)
* [José's keynote](https://youtu.be/srtMWzyqdp8?t=244)
Note: Flow provides an Enumerable-like API on top of GenStage
---
# We're hiring!
[https://jobs.lever.co/mbta](https://jobs.lever.co/mbta)
Note: hiring for engineers, product and project managers, designers
---
# Thanks!
* @paulswartz
* paul@paulswartz.net
* [paulswartz.net](https://paulswartz.net)
# Usage: mix run example/producer_consumer.exs
#
# Hit Ctrl+C twice to stop it.
#
# This is a base example where a producer A emits items, which are amplified
# by a producer consumer B and printed by consumer C. Based on the GenStage
# example from
# https://github.com/elixir-lang/gen_stage/blob/master/examples/producer_consumer.exs
defmodule A do
use GenStage
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)
{:noreply, events, counter + demand}
end
end
defmodule B do
use GenStage
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
events = for event <- events, do: event * number
{:noreply, events, number}
end
end
defmodule C do
use GenStage
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
# Wait for a second.
:timer.sleep(1000)
# Inspect the events.
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
{:ok, a} = GenStage.start_link(A, 0) # starting from zero
{:ok, b} = GenStage.start_link(B, 2) # expand by 2
{:ok, c} = GenStage.start_link(C, :ok) # state does not matter
GenStage.sync_subscribe(b, to: a)
GenStage.sync_subscribe(c, to: b)
Process.sleep(:infinity)
strict digraph concentrate {
graph [bgcolor="#002b36"]
node [color="#93a1a1", fontcolor="#93a1a1"]
edge [color="#93a1a1", fontcolor="#93a1a1"]
source1 [label="Source"]
source2 [label="..."]
merge_filter [label="MergeFilter"]
reporter1 [label="Reporter"]
reporter2 [label="..."]
encoder1 [label="Encoder"]
encoder2 [label="..."]
sink_supervisor [label="Sink.ConsumerSupervisor"]
sink1 [label=Sink]
sink2 [label="..."]
source1 -> merge_filter
source2 -> merge_filter
merge_filter -> reporter1 [label="broadcast"]
merge_filter -> reporter2 [label="broadcast"]
merge_filter -> encoder1 [label="broadcast"]
merge_filter -> encoder2 [label="broadcast"]
encoder1 -> sink_supervisor
encoder2 -> sink_supervisor
sink_supervisor -> sink1
sink_supervisor -> sink2
}
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<!-- Generated by graphviz version 2.40.1 (20161225.0304)
-->
<!-- Title: concentrate Pages: 1 -->
<svg width="365pt" height="349pt"
viewBox="0.00 0.00 365.26 349.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 345)">
<title>concentrate</title>
<polygon fill="#002b36" stroke="transparent" points="-4,4 -4,-345 361.2553,-345 361.2553,4 -4,4"/>
<!-- source1 -->
<g id="node1" class="node">
<title>source1</title>
<ellipse fill="none" stroke="#93a1a1" cx="127.046" cy="-323" rx="34.8285" ry="18"/>
<text text-anchor="middle" x="127.046" y="-318.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">Source</text>
</g>
<!-- merge_filter -->
<g id="node3" class="node">
<title>merge_filter</title>
<ellipse fill="none" stroke="#93a1a1" cx="167.046" cy="-250" rx="52.3827" ry="18"/>
<text text-anchor="middle" x="167.046" y="-245.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">MergeFilter</text>
</g>
<!-- source1&#45;&gt;merge_filter -->
<g id="edge1" class="edge">
<title>source1&#45;&gt;merge_filter</title>
<path fill="none" stroke="#93a1a1" d="M136.7289,-305.3287C141.4597,-296.695 147.2511,-286.1256 152.4999,-276.5466"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="155.5777,-278.2132 157.3136,-267.7615 149.4388,-274.8494 155.5777,-278.2132"/>
</g>
<!-- source2 -->
<g id="node2" class="node">
<title>source2</title>
<ellipse fill="none" stroke="#93a1a1" cx="207.046" cy="-323" rx="27" ry="18"/>
<text text-anchor="middle" x="207.046" y="-318.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">...</text>
</g>
<!-- source2&#45;&gt;merge_filter -->
<g id="edge2" class="edge">
<title>source2&#45;&gt;merge_filter</title>
<path fill="none" stroke="#93a1a1" d="M197.7687,-306.0688C193.0115,-297.3871 187.1053,-286.6082 181.7462,-276.8278"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="184.7059,-274.9457 176.8311,-267.8578 178.5671,-278.3095 184.7059,-274.9457"/>
</g>
<!-- reporter1 -->
<g id="node4" class="node">
<title>reporter1</title>
<ellipse fill="none" stroke="#93a1a1" cx="41.046" cy="-164" rx="41.092" ry="18"/>
<text text-anchor="middle" x="41.046" y="-159.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">Reporter</text>
</g>
<!-- merge_filter&#45;&gt;reporter1 -->
<g id="edge3" class="edge">
<title>merge_filter&#45;&gt;reporter1</title>
<path fill="none" stroke="#93a1a1" d="M122.8946,-240.083C105.4773,-234.5535 86.151,-226.2346 71.4044,-214 63.9087,-207.7812 57.7684,-199.2044 53.0201,-190.9092"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="55.9919,-189.0394 48.2524,-181.804 49.7906,-192.2866 55.9919,-189.0394"/>
<text text-anchor="middle" x="98.8668" y="-202.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">broadcast</text>
</g>
<!-- reporter2 -->
<g id="node5" class="node">
<title>reporter2</title>
<ellipse fill="none" stroke="#93a1a1" cx="127.046" cy="-164" rx="27" ry="18"/>
<text text-anchor="middle" x="127.046" y="-159.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">...</text>
</g>
<!-- merge_filter&#45;&gt;reporter2 -->
<g id="edge4" class="edge">
<title>merge_filter&#45;&gt;reporter2</title>
<path fill="none" stroke="#93a1a1" d="M146.4013,-233.1437C140.9657,-227.6242 135.7044,-221.0745 132.4044,-214 129.3296,-207.4085 127.6887,-199.8117 126.8739,-192.5621"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="130.3508,-192.0946 126.2006,-182.3464 123.3659,-192.555 130.3508,-192.0946"/>
<text text-anchor="middle" x="159.8668" y="-202.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">broadcast</text>
</g>
<!-- encoder1 -->
<g id="node6" class="node">
<title>encoder1</title>
<ellipse fill="none" stroke="#93a1a1" cx="212.046" cy="-164" rx="39.6385" ry="18"/>
<text text-anchor="middle" x="212.046" y="-159.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">Encoder</text>
</g>
<!-- merge_filter&#45;&gt;encoder1 -->
<g id="edge5" class="edge">
<title>merge_filter&#45;&gt;encoder1</title>
<path fill="none" stroke="#93a1a1" d="M176.5386,-232.0661C179.5827,-226.3012 182.9632,-219.8844 186.046,-214 190.0046,-206.4437 194.2826,-198.2348 198.2033,-190.6942"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="201.4236,-192.0874 202.9274,-181.5998 195.2117,-188.8606 201.4236,-192.0874"/>
<text text-anchor="middle" x="219.8668" y="-202.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">broadcast</text>
</g>
<!-- encoder2 -->
<g id="node7" class="node">
<title>encoder2</title>
<ellipse fill="none" stroke="#93a1a1" cx="297.046" cy="-164" rx="27" ry="18"/>
<text text-anchor="middle" x="297.046" y="-159.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">...</text>
</g>
<!-- merge_filter&#45;&gt;encoder2 -->
<g id="edge6" class="edge">
<title>merge_filter&#45;&gt;encoder2</title>
<path fill="none" stroke="#93a1a1" d="M205.2068,-237.5781C220.2413,-231.7066 237.1892,-223.8083 251.046,-214 261.0707,-206.9042 270.5506,-197.3088 278.2704,-188.4212"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="281.1168,-190.4727 284.8304,-180.55 275.7394,-185.9911 281.1168,-190.4727"/>
<text text-anchor="middle" x="292.8668" y="-202.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">broadcast</text>
</g>
<!-- sink_supervisor -->
<g id="node8" class="node">
<title>sink_supervisor</title>
<ellipse fill="none" stroke="#93a1a1" cx="254.046" cy="-91" rx="103.4191" ry="18"/>
<text text-anchor="middle" x="254.046" y="-86.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">Sink.ConsumerSupervisor</text>
</g>
<!-- encoder1&#45;&gt;sink_supervisor -->
<g id="edge7" class="edge">
<title>encoder1&#45;&gt;sink_supervisor</title>
<path fill="none" stroke="#93a1a1" d="M222.213,-146.3287C227.1436,-137.7589 233.1713,-127.2821 238.65,-117.7597"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="241.7276,-119.4289 243.6808,-109.0157 235.6601,-115.938 241.7276,-119.4289"/>
</g>
<!-- encoder2&#45;&gt;sink_supervisor -->
<g id="edge8" class="edge">
<title>encoder2&#45;&gt;sink_supervisor</title>
<path fill="none" stroke="#93a1a1" d="M287.0729,-147.0688C281.9442,-138.3621 275.5731,-127.5461 269.7989,-117.7433"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="272.8078,-115.9552 264.7166,-109.1153 266.7763,-119.508 272.8078,-115.9552"/>
</g>
<!-- sink1 -->
<g id="node9" class="node">
<title>sink1</title>
<ellipse fill="none" stroke="#93a1a1" cx="218.046" cy="-18" rx="27" ry="18"/>
<text text-anchor="middle" x="218.046" y="-13.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">Sink</text>
</g>
<!-- sink_supervisor&#45;&gt;sink1 -->
<g id="edge9" class="edge">
<title>sink_supervisor&#45;&gt;sink1</title>
<path fill="none" stroke="#93a1a1" d="M245.1471,-72.9551C240.8995,-64.3419 235.7344,-53.8681 231.0571,-44.3837"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="234.1566,-42.7554 226.5946,-35.3347 227.8785,-45.8514 234.1566,-42.7554"/>
</g>
<!-- sink2 -->
<g id="node10" class="node">
<title>sink2</title>
<ellipse fill="none" stroke="#93a1a1" cx="290.046" cy="-18" rx="27" ry="18"/>
<text text-anchor="middle" x="290.046" y="-13.8" font-family="Times,serif" font-size="14.00" fill="#93a1a1">...</text>
</g>
<!-- sink_supervisor&#45;&gt;sink2 -->
<g id="edge10" class="edge">
<title>sink_supervisor&#45;&gt;sink2</title>
<path fill="none" stroke="#93a1a1" d="M262.9449,-72.9551C267.1925,-64.3419 272.3576,-53.8681 277.0349,-44.3837"/>
<polygon fill="#93a1a1" stroke="#93a1a1" points="280.2135,-45.8514 281.4974,-35.3347 273.9354,-42.7554 280.2135,-45.8514"/>
</g>
</g>
</svg>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment