Using GenStage with a low-volume data source
In our new Elixir project, we recently needed to orchestrate several sequential processing steps that start by fetching jobs from an API endpoint. We were expecting fairly low volume, so there would not always be work present. The first step was to regularly check the API for new jobs. This post goes over the solutions we considered before eventually settling on Elixir's GenStage.
To illustrate this, let's pick a more natural example than DNS and domains, because that's a very complex domain (pun intended) and DNSimple is here so you don't have to think about that complexity. Let's say there is a server that generates numbers; sometimes it has a new number, sometimes it doesn't. We then want to calculate the square root of each number, because square roots are fun!
The server that provides the API we are talking to is a small script, number-server.rb, available in the gist listed under Sources.
We do not expect many messages; something in the realm of 100 messages (numbers) per hour. When facing this task in Elixir, there are some options:
- Building a custom GenServer solution based on message passing via cast/2 to trigger the work (push).
- Using :gen_event or the Supervisor-based replacement for GenEvent as an underlying event bus to emit and listen to events (events).
- Using GenStage as a tool for orchestration (pull).
The first lesson I learned is that all solutions benefit from decoupling the domain knowledge from the orchestration. That way we can try different approaches, and the functions that fetch and process the data stay independent of whatever is plumbing them together.
Here we have the DNSimpleNumbers module exposing the fetch/0 function, which is responsible for fetching a number if one is present (or nil if not), and the process/1 function, which works with our data. These are the building blocks that we need for any orchestration strategy.
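As a rough illustration of those two building blocks, here is a minimal sketch. The real module talks to the number server over HTTP; because that endpoint is not shown here, this sketch swaps in an in-memory Agent as a stand-in for the server (the start_link/1 function and the Agent are assumptions made for this example, not part of the original module). Only the fetch/0 and process/1 names come from the description above.

```elixir
defmodule DNSimpleNumbers do
  # Assumption: an Agent holding a list of numbers stands in for the
  # remote number server so the sketch runs without any network access.
  def start_link(numbers \\ []) do
    Agent.start_link(fn -> numbers end, name: __MODULE__)
  end

  # Returns the next available number, or nil when there is none.
  def fetch do
    Agent.get_and_update(__MODULE__, fn
      [] -> {nil, []}
      [number | rest] -> {number, rest}
    end)
  end

  # The "domain logic": the square root of a non-negative number.
  def process(number) when is_number(number) and number >= 0 do
    :math.sqrt(number)
  end
end
```

Nothing in this module knows about polling, processes, or demand, which is what lets the same two functions be reused under any of the orchestration options listed earlier.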
Looking at the various solutions, we agreed that an event bus for only this feature would be overkill; the project does not need the overhead of a generic event bus yet. After a spike using GenServer.cast/2, we discovered that we would have to implement certain things ourselves, such as buffering, and that testing would be harder because cast/2 makes things asynchronous.
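To make the testing concern concrete, here is a small sketch of the push approach. It is not the code from our spike; the CastProcessor module and its push/1 and results/0 functions are hypothetical names chosen for this example. It shows that cast/2 returns :ok before any work has happened, so a test needs an extra synchronous call as a barrier before it can assert anything.

```elixir
defmodule CastProcessor do
  use GenServer

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Push: fire-and-forget. Returns :ok immediately, before the work is done.
  def push(number), do: GenServer.cast(__MODULE__, {:process, number})

  # Synchronous read. Messages from one sender are handled in order, so this
  # call also acts as a barrier for earlier casts from the same process.
  def results, do: GenServer.call(__MODULE__, :results)

  @impl true
  def init(results), do: {:ok, results}

  @impl true
  def handle_cast({:process, number}, results) do
    {:noreply, [:math.sqrt(number) | results]}
  end

  @impl true
  def handle_call(:results, _from, results) do
    {:reply, Enum.reverse(results), results}
  end
end
```

Note that nothing here limits how many casts can pile up in the process mailbox. Any buffering or back-pressure would have to be built by hand, which is the other thing the spike made us notice.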
GenStage in a low volume environment?
Reading through the GenStage documentation, it became clear to me that it's designed for Big Data: processing thousands of events per second. The examples always assumed that the data source is inexhaustible and that there is always more than enough data to satisfy the demand of the consumers. It was hard to tell whether it is even possible to use GenStage with very little data, or whether doing so is a good choice.
Before diving into the answer to that question let's look at the various parts of the GenStage flow (the stages). Let's start with the consumer that should process the data:
Because the processor is simple, it doesn't hold any state. In the init/1 function we subscribe to the Fetcher, since in this simplified example it is a 1-to-1 relation. The handle_events/3 function maps over all numbers passed down from the Fetcher and uses DNSimpleNumbers to process the data.
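A consumer along these lines could look like the following sketch. It assumes the gen_stage package is available (pulled in here with Mix.install so the file can run as a standalone script; in a Mix project you would list it in mix.exs instead), that a producer is registered under the name Fetcher, and that the DNSimpleNumbers module described earlier is compiled alongside it. Printing the result is purely illustrative.

```elixir
# Standalone-script convenience; the version requirement is an assumption.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Processor do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # Stateless consumer that subscribes to the process registered as Fetcher.
    {:consumer, :ok, subscribe_to: [Fetcher]}
  end

  @impl true
  def handle_events(numbers, _from, state) do
    Enum.each(numbers, fn number ->
      IO.puts("sqrt(#{number}) = #{DNSimpleNumbers.process(number)}")
    end)

    # Consumers never emit events, so the event list is always empty.
    {:noreply, [], state}
  end
end
```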
The answer to the question of whether and how GenStage works with very little data lies in the Fetcher, as it is the producer's responsibility to manage the demand of the consumers.
In the Fetcher, handle_demand/2 does nothing, because the Fetcher itself knows when data is available to be passed down to the consumer. The Fetcher polls the API every 2 seconds to look for new data and keeps fetching until there are no more numbers on the number server. The numbers we fetched are handed to GenStage by returning {:noreply, numbers, state}. Here is where the magic happens: GenStage passes down only as many events as the consumer requested and buffers the rest until there is more demand.
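A producer following that description might be sketched like this. As before, it assumes the gen_stage package (installed via Mix.install for standalone use) and the DNSimpleNumbers module described earlier. The :poll message name, the private helpers, and waiting one full interval before the first poll are assumptions made for this example.

```elixir
# Standalone-script convenience; the version requirement is an assumption.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Fetcher do
  use GenStage

  # Poll the number server every 2 seconds.
  @poll_interval 2_000

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    schedule_poll()
    {:producer, :ok}
  end

  # Demand cannot be satisfied on request, so this does nothing. GenStage
  # records the demand and serves it once events are emitted.
  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}

  # Timer tick: drain the server, emit what was found, schedule the next poll.
  @impl true
  def handle_info(:poll, state) do
    numbers = fetch_all([])
    schedule_poll()
    {:noreply, numbers, state}
  end

  # Keep fetching until the server reports that nothing is left (nil).
  defp fetch_all(acc) do
    case DNSimpleNumbers.fetch() do
      nil -> Enum.reverse(acc)
      number -> fetch_all([number | acc])
    end
  end

  defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)
end
```

Returning an empty list from handle_info/2 when a poll finds nothing is fine: no events are dispatched and the outstanding demand simply stays open until a later poll.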
The producer needs to be started before the consumer, so the consumer has something to subscribe to.
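One way to get that ordering is to list the producer before the consumer under a supervisor. The sketch below is self-contained and uses two tiny stand-in stages (Demo.Producer and Demo.Consumer, hypothetical names that are not the Fetcher and Processor from this post) so it can run on its own. It assumes the gen_stage package, installed here via Mix.install. The stand-in producer emits its numbers once, possibly before the consumer has subscribed, which also shows GenStage buffering events until demand arrives.

```elixir
# Standalone-script convenience; the version requirement is an assumption.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Demo.Producer do
  use GenStage

  def start_link(numbers) do
    GenStage.start_link(__MODULE__, numbers, name: __MODULE__)
  end

  @impl true
  def init(numbers) do
    # Emit everything right after start-up, whether or not anyone listens yet.
    send(self(), :emit)
    {:producer, numbers}
  end

  @impl true
  def handle_demand(_demand, state), do: {:noreply, [], state}

  @impl true
  def handle_info(:emit, numbers), do: {:noreply, numbers, []}
end

defmodule Demo.Consumer do
  use GenStage

  def start_link(report_to), do: GenStage.start_link(__MODULE__, report_to)

  @impl true
  def init(report_to) do
    {:consumer, report_to, subscribe_to: [Demo.Producer]}
  end

  @impl true
  def handle_events(numbers, _from, report_to) do
    # Report the square roots back to the process that started the demo.
    send(report_to, {:processed, Enum.map(numbers, &:math.sqrt/1)})
    {:noreply, [], report_to}
  end
end

# Children start in list order, so the producer is up before the consumer
# tries to subscribe to it.
children = [
  {Demo.Producer, [4, 9, 16]},
  {Demo.Consumer, self()}
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :rest_for_one)
```

The :rest_for_one strategy is a design choice for this sketch rather than a requirement: if the producer crashes, the consumer is restarted after it and subscribes again.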
Now, let's look at what happens when we run it. We start the number server (number-server.rb) first and let it run for a few seconds so that it builds up some numbers.
Then we start the script that starts the Fetcher and the Processor and hooks them together.
In the initial request we see the numbers that were queued in the number server, and afterward, we see event by event dripping through.
What this blog post does not cover is that in our real use case we had multiple producers, so we needed to make sure we could subscribe to n producers. We also skipped that, for a real production case, it may be wise to reduce the initial demand of the consumer. However, covering all of that wouldn't keep this blog post simple, and simple is what we are here for. These topics may be something for another blog post.
Sources: Blogpost, Twitter Conversation, Gist, GenStage documentation
Ole Michaelis
Conference junkie, user groupie and boardgame geek also knows how to juggle. Oh, and software.