Simple, Async, Map/Reduce queue for Ruby
At DNSimple we operate a distributed anycast DNS network: we have datacenters spread around the globe, and each of them has multiple servers. In order to guarantee correct DNS responses, we need to ensure that each server has the same set of DNS records.
Distributed data synchronization is hard, which is why we need to monitor it constantly.
In the past few months I worked on an internal tool that gives us more visibility into our DNS cluster. For each server, the tool asks for the local copy of the DNS records of a given zone, and it aggregates the results from all the servers. The initial scope of this tool was internal monitoring, but it ended up being useful for other tools too, including our Let's Encrypt integration and API v2 (as an upcoming endpoint).
All of these use cases share three important requirements: aggregated results, fault-tolerant requests, and fast response times.
Map/Reduce
The first iteration I wrote focused on the concept of querying all the servers and aggregating their results. This is a textbook case for the Map/Reduce pattern.
Map/Reduce is a two-step process, so the queue exposes two methods to implement:

#map
: schedules the processing logic for each node. In my case, it's the query logic.

#reduce
: defines the aggregation logic for the results. It answers the question: are all the servers in sync?
There are two objects that support the described interface: MyQueue::Worker and MyQueue::Result.
The first wraps the logic passed to #map and defers its execution until MyQueue::Worker#call is invoked.
The second is a result object that communicates the outcome of the operation to the outside world.
This is a simplified, but working version of the first iteration:
# frozen_string_literal: true

require "ostruct"

class MyQueue
  class Worker
    def initialize(&blk)
      @blk = blk
    end

    def call
      @blk.call
    end
  end

  class Result
    def initialize(merged)
      @merged = merged
    end

    def in_sync?
      @merged.all?(&:in_sync)
    end
  end

  def initialize
    @workers = []
  end

  def map(&blk)
    @workers << Worker.new(&blk)
  end

  def reduce(accumulator)
    merged = @workers.each_with_object(accumulator) do |worker, acc|
      yield worker.call, acc
    end

    Result.new(merged)
  end
end

servers = %w[server1.test server2.test server3.test]

queue = MyQueue.new

# For each server, enqueue a worker.
servers.each do |server|
  queue.map do
    # Simulate random wait due to network communication
    sleep rand(0..3)
    puts "#{Time.now.utc} - querying: #{server}"
    # Return a result object with the name of the server and the result of the query
    OpenStruct.new(name: server, in_sync: true)
  end
end

result = queue.reduce([]) do |server, memo|
  memo << server
end

puts "in sync: #{result.in_sync?}"
➜ ruby queue.rb
2018-05-30 14:52:27 UTC - querying: server1.test
2018-05-30 14:52:29 UTC - querying: server2.test
2018-05-30 14:52:31 UTC - querying: server3.test
in sync: true
Please notice the timestamps: the execution is sequential for now.
Fault-tolerant requests
A single query against one of our servers may fail due to network problems, timeouts, or because the server is down. This kind of failure must not compromise our ability to return a result to the consumer.
Unfortunately, many Ruby HTTP clients raise exceptions for non-successful responses or for timeouts. I suppress these exceptions and return a failing result instead.
During this second iteration, I introduced a Processor object. It covers an important role: it encapsulates the query logic and turns exceptions into errors. It also makes it easy to simulate random network failures.
To reflect the concept of error, MyQueue::Result now exposes #errors, a collection of the errors that happened during the query phase.
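In the real tool, the query logic wraps an actual HTTP call. For illustration only, here is a minimal sketch of what an HTTP-backed processor could look like, assuming Ruby's standard Net::HTTP; the endpoint, port, timeouts, and record-comparison logic are hypothetical, not our actual implementation:

require "net/http"
require "ostruct"

class HttpProcessor
  OPEN_TIMEOUT = 2 # seconds (hypothetical)
  READ_TIMEOUT = 5 # seconds (hypothetical)

  def initialize(expected_records)
    @expected_records = expected_records
  end

  def call(server)
    records = fetch_records(server)
    OpenStruct.new(name: server, in_sync: records == @expected_records)
  rescue => exception
    # Timeouts, connection errors, and unexpected responses all become
    # a failing result instead of bubbling up to the consumer.
    OpenStruct.new(name: server, in_sync: false, error: exception.message)
  end

  private

  def fetch_records(server)
    uri = URI("http://#{server}/zones/example.com/records") # hypothetical endpoint
    Net::HTTP.start(uri.host, uri.port, open_timeout: OPEN_TIMEOUT, read_timeout: READ_TIMEOUT) do |http|
      response = http.get(uri.path)
      raise "unexpected response: #{response.code}" unless response.is_a?(Net::HTTPSuccess)
      response.body
    end
  end
end

In the simplified version below, a sleep and a random failure stand in for the real HTTP call: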
# frozen_string_literal: true

require "ostruct"

class MyQueue
  class Worker
    def initialize(&blk)
      @blk = blk
    end

    def call
      @blk.call
    end
  end

  class Result
    def initialize(merged)
      @merged = merged
    end

    def in_sync?
      @merged.all?(&:in_sync)
    end

    # NEW METHOD
    def errors
      @merged.map(&:error).compact
    end
  end

  def initialize
    @workers = []
  end

  def map(&blk)
    @workers << Worker.new(&blk)
  end

  def reduce(accumulator)
    merged = @workers.each_with_object(accumulator) do |worker, acc|
      yield worker.call, acc
    end

    Result.new(merged)
  end
end

# NEW OBJECT
class Processor
  def call(server)
    result = OpenStruct.new(name: server, in_sync: true)

    sleep rand(0..3)
    puts "#{Time.now.utc} - querying: #{server}"
    raise "boom #{server}" if rand(10) < 1

    result
  rescue => exception
    result.in_sync = false
    result.error = exception.message
    result
  end
end

servers = %w[server1.test server2.test server3.test]

queue = MyQueue.new
processor = Processor.new

servers.each do |server|
  queue.map do
    # MOVED LOGIC INTO Processor#call
    processor.call(server)
  end
end

result = queue.reduce([]) do |server, memo|
  memo << server
end

if result.in_sync?
  puts "in sync: true"
else
  puts "in sync: false - errors: #{result.errors.join(', ')}"
end
➜ ruby queue.rb
2018-05-30 16:17:45 UTC - querying: server1.test
2018-05-30 16:17:47 UTC - querying: server2.test
2018-05-30 16:17:49 UTC - querying: server3.test
in sync: false - errors: boom server1.test
The execution is still sequential, with a simulation of random network failures.
Parallelism
As a final task, I tackled the overall speed of execution.
The consumer cannot wait forever for each single query to complete sequentially. Since these operations are I/O bound, I can run them in parallel and wait for all the servers' responses. Even on MRI this pays off, because the interpreter releases its global lock while a thread waits on I/O.
For this iteration, I renamed Worker to SyncWorker. This object executes all the workers synchronously and sequentially. It's useful for testing and debugging purposes, because multi-threaded programs are hard to inspect: while observing the behavior of a single thread, other background threads may alter its state, and it's nearly impossible to understand what caused the change and how.
I also introduced AsyncWorker for async, parallel execution. It's implemented with Concurrent::Promise from the great concurrent-ruby gem. It's worth noticing that Concurrent::Promise.execute(&blk) schedules the job on a thread pool, where it runs asynchronously; invoking @blk.value blocks until the job has completed and returns its result.
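Here is a tiny standalone demonstration of those semantics (the sleep and the printed states are illustrative):

require "concurrent"

promise = Concurrent::Promise.execute do
  sleep 1 # simulate slow I/O
  :done
end

# The block is already running on concurrent-ruby's thread pool.
puts promise.state # most likely :pending, because the block is still sleeping
puts promise.value # blocks until the result is available, then prints "done"

With that in place, here is the third iteration: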
# frozen_string_literal: true

require "ostruct"
require "concurrent"

class MyQueue
  # FORMERLY KNOWN AS Worker
  class SyncWorker
    def initialize(&blk)
      @blk = blk
    end

    def call
      @blk.call
    end
  end

  # NEW OBJECT
  class AsyncWorker
    def initialize(&blk)
      @blk = Concurrent::Promise.execute(&blk)
    end

    def call
      @blk.value
    end
  end

  class Result
    def initialize(merged)
      @merged = merged
    end

    def in_sync?
      @merged.all?(&:in_sync)
    end

    def errors
      @merged.map(&:error).compact
    end
  end

  # NEW METHOD
  def self.build(async: true)
    worker = async ? AsyncWorker : SyncWorker
    new(worker: worker)
  end

  def initialize(worker:)
    @worker = worker
    @workers = []
  end

  def map(&blk)
    @workers << @worker.new(&blk)
  end

  def reduce(accumulator)
    merged = @workers.each_with_object(accumulator) do |worker, acc|
      yield worker.call, acc
    end

    Result.new(merged)
  end
end

class Processor
  def call(server)
    result = OpenStruct.new(name: server, in_sync: true)

    sleep rand(0..3)
    puts "#{Time.now.utc} - querying: #{server}"
    raise "boom #{server}" if rand(1_000) < 1

    result
  rescue => exception
    result.in_sync = false
    result.error = exception.message
    result
  end
end

# MOAR SERVERS 🙀
servers = (1..60).map { |i| "server#{i}.test" }

queue = MyQueue.build
processor = Processor.new

servers.each do |server|
  queue.map do
    processor.call(server)
  end
end

result = queue.reduce([]) do |server, memo|
  memo << server
end

if result.in_sync?
  puts "in sync: true"
else
  puts "in sync: false - errors: #{result.errors.join(', ')}"
end
Unlike SyncWorker, AsyncWorker is what we use in production. To choose between them, we call MyQueue.build for the default async worker, or MyQueue.build(async: false) for the synchronous one.
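For example, a test can force sequential execution to keep assertions deterministic. A small hypothetical example:

require "ostruct"

queue = MyQueue.build(async: false)

queue.map { OpenStruct.new(name: "server1.test", in_sync: true) }
queue.map { OpenStruct.new(name: "server2.test", in_sync: false, error: "out of sync") }

result = queue.reduce([]) { |server, memo| memo << server }

puts result.in_sync?       # => false
puts result.errors.inspect # => ["out of sync"]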
Why Concurrent::Promise instead of Thread?
An alternative implementation, without any gem dependency, uses Ruby's Thread:
class AsyncWorker
  def initialize(&blk)
    @blk = Thread.new(&blk)
  end

  def call
    @blk.join
    @blk.value
  end
end
The main problem with this approach is scalability.
It isn't a good idea to run a lot of threads in parallel, as it may exhaust server resources. To use Thread directly, I would need to manually cap the number of threads that run in parallel at any given time. This logic would be hard to maintain and error prone, as the sketch below suggests.
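To make that concrete, here is a rough sketch of the bookkeeping manual throttling would require; the pool size is an arbitrary assumption:

POOL_SIZE = 10 # arbitrary cap on concurrent threads

def run_throttled(jobs, pool_size: POOL_SIZE)
  pending = Queue.new
  jobs.each { |job| pending << job }
  results = Queue.new

  workers = Array.new(pool_size) do
    Thread.new do
      loop do
        job = begin
          pending.pop(true) # non-blocking pop
        rescue ThreadError
          break # queue drained
        end
        results << job.call
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end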
Concurrent::Promise, by contrast, executes within a thread pool that concurrent-ruby provides under the hood. It's a win-win: we get a simple API, and the complexity of thread scheduling is managed for us.
Conclusion
I shared a simplified version of a queue that we have used in production for the past few months. It offers simplicity of design, testability, scalability, and parallelism, without the usual worries about multi-threading complexity.
Querying all 40 of our servers in parallel takes only half a second. I would never have guessed that Ruby would shine at this task.
But, hey, it did! 💎
Luca Guidi