Simple, Async, Map/Reduce queue for Ruby

Luca Guidi

At DNSimple we operate a distributed anycast DNS network: we have datacenters spread around the globe, and each of them has multiple servers. In order to guarantee correct DNS responses, we need to ensure that each server has the same set of DNS records.

Distributed data synchronization is hard, which is why we need to monitor it constantly.

In the past few months I worked on an internal tool that gives us more visibility into our DNS cluster. For a given zone, the tool asks each server for its local copy of the DNS records and aggregates the results from all the servers. The initial scope of this new tool was internal monitoring, but it ended up being useful for other tools too, including our Let's Encrypt integration and API v2 (as a new upcoming endpoint).

All of these use cases share three important requirements: aggregated results, fault-tolerant requests, and fast response times.

Map/Reduce

The first iteration I wrote focused on querying all the servers and aggregating their results. This is a textbook case for the Map/Reduce pattern.

Map/Reduce is a two-step process. Here it is implemented as a queue that exposes two methods:

  • #map: to schedule processing logic for each node. In my case it's the query logic.
  • #reduce: to define the aggregation logic of the results. It answers the question: are all the servers in sync?

There are two objects to support the described interface: MyQueue::Worker and MyQueue::Result. The first wraps the logic passed to #map and it defers the execution until MyQueue::Worker#call is invoked. The second one is a result object to communicate to the outside world the result of the operation.

This is a simplified, but working version of the first iteration:

# frozen_string_literal: true

require "ostruct"

class MyQueue
  class Worker
    def initialize(&blk)
      @blk = blk
    end

    def call
      @blk.call
    end
  end

  class Result
    def initialize(merged)
      @merged = merged
    end

    def in_sync?
      @merged.all?(&:in_sync)
    end
  end

  def initialize
    @workers = []
  end

  def map(&blk)
    @workers << Worker.new(&blk)
  end

  def reduce(accumulator)
    merged = @workers.each_with_object(accumulator) do |worker, acc|
      yield worker.call, acc
    end

    Result.new(merged)
  end
end

servers = %w[server1.test server2.test server3.test]
queue = MyQueue.new

# For each server, enqueue a worker.
servers.each do |server|
  queue.map do
    # Simulate random wait due to network communication
    sleep rand(0..3)
    puts "#{Time.now.utc} - querying: #{server}"

    # Return a result object with the name of the server and the result of the query
    OpenStruct.new(name: server, in_sync: true)
  end
end

result = queue.reduce([]) do |server, memo|
  memo << server
end

puts "in sync: #{result.in_sync?}"

➜ ruby queue.rb
2018-05-30 14:52:27 UTC - querying: server1.test
2018-05-30 14:52:29 UTC - querying: server2.test
2018-05-30 14:52:31 UTC - querying: server3.test
in sync: true

Notice the timestamps: for now the execution is sequential, so each query starts only after the previous one has completed.
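One detail of #reduce is worth spelling out: because it is built on each_with_object, the return value of the block is discarded, so the accumulator has to be mutated in place. The sketch below illustrates that contract with a hypothetical reduce_results helper and plain hashes standing in for the query results; neither is part of the queue itself.

```ruby
# Hypothetical stand-in for MyQueue#reduce: same each_with_object contract,
# but it takes already-computed results instead of workers.
def reduce_results(results, accumulator)
  results.each_with_object(accumulator) do |result, acc|
    yield result, acc
  end
end

results = [
  { name: "server1.test", in_sync: true },
  { name: "server2.test", in_sync: false }
]

# Works: the Hash accumulator is mutated in place by the block.
by_name = reduce_results(results, {}) do |result, memo|
  memo[result[:name]] = result[:in_sync]
end

# Does not accumulate: the block's return value is ignored and an Integer
# cannot be mutated, so the original accumulator comes back unchanged.
count = reduce_results(results, 0) do |_result, memo|
  memo + 1
end

puts "servers seen: #{by_name.keys.join(', ')}"
puts "count is still #{count}"
```

This is why the examples in this article pass an Array and append to it with <<, rather than returning a new value from the block.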

Fault tolerant requests

A query against one of our servers may fail due to network problems, timeouts, or because the server is down. This kind of failure must not compromise our ability to return a result to the consumer.

Unfortunately, many Ruby HTTP clients raise exceptions for non-successful responses or for timeouts. I rescue these exceptions and return a failing result instead.

During this second iteration, I introduced a Processor object. It covers an important role: it encapsulates the query logic and turns exceptions into errors. It also makes it possible to easily simulate random network failures.

To reflect the concept of error, MyQueue::Result now exposes #errors, a collection of errors that happened during the query phase.

# frozen_string_literal: true

require "ostruct"

class MyQueue
  class Worker
    def initialize(&blk)
      @blk = blk
    end

    def call
      @blk.call
    end
  end

  class Result
    def initialize(merged)
      @merged = merged
    end

    def in_sync?
      @merged.all?(&:in_sync)
    end

    # NEW METHOD
    def errors
      @merged.map(&:error).compact
    end
  end

  def initialize
    @workers = []
  end

  def map(&blk)
    @workers << Worker.new(&blk)
  end

  def reduce(accumulator)
    merged = @workers.each_with_object(accumulator) do |worker, acc|
      yield worker.call, acc
    end

    Result.new(merged)
  end
end

# NEW OBJECT
class Processor
  def call(server)
    result = OpenStruct.new(name: server, in_sync: true)
    sleep rand(0..3)
    puts "#{Time.now.utc} - querying: #{server}"
    raise "boom #{server}" if rand(10) < 1
    result
  rescue => exception
    result.in_sync = false
    result.error   = exception.message
    result
  end
end

servers = %w[server1.test server2.test server3.test]
queue = MyQueue.new
processor = Processor.new

servers.each do |server|
  queue.map do
    # MOVED LOGIC INTO Processor#call
    processor.call(server)
  end
end

result = queue.reduce([]) do |server, memo|
  memo << server
end

if result.in_sync?
  puts "in sync: true"
else
  puts "in sync: false - errors: #{result.errors.join(', ')}"
end

➜ ruby queue.rb
2018-05-30 16:17:45 UTC - querying: server1.test
2018-05-30 16:17:47 UTC - querying: server2.test
2018-05-30 16:17:49 UTC - querying: server3.test
in sync: false - errors: boom server1.test

The execution is still sequential, with a simulation of random network failures.

Parallelism

As a last task I tackled the overall speed of execution.

The consumer cannot wait forever for each single query to complete sequentially. Given these operations are I/O bound, I can parallelize them and wait for the response of each server.

For this iteration I renamed Worker to SyncWorker. This object executes all the workers synchronously and sequentially. It's useful for testing and debugging purposes, because multi-threaded programs are hard to inspect: while observing the behavior of a single thread, other background threads may alter its state, and it's nearly impossible to understand what caused the change and how.

I also introduced AsyncWorker for async, parallel execution. It's implemented with Concurrent::Promise from the great concurrent-ruby gem. It's worth noting that Concurrent::Promise.execute(&blk) doesn't run the block inline: it posts the block to a thread pool and returns immediately, so the work proceeds in the background. Invoking @blk.value then blocks until the block has finished and returns its result.

# frozen_string_literal: true

require "ostruct"
require "concurrent"

class MyQueue
  # FORMERLY KNOWN AS Worker
  class SyncWorker
    def initialize(&blk)
      @blk = blk
    end

    def call
      @blk.call
    end
  end

  # NEW OBJECT
  class AsyncWorker
    def initialize(&blk)
      @blk = Concurrent::Promise.execute(&blk)
    end

    def call
      @blk.value
    end
  end

  class Result
    def initialize(merged)
      @merged = merged
    end

    def in_sync?
      @merged.all?(&:in_sync)
    end

    def errors
      @merged.map(&:error).compact
    end
  end

  # NEW METHOD
  def self.build(async: true)
    worker = async ? AsyncWorker : SyncWorker
    new(worker: worker)
  end

  def initialize(worker:)
    @worker  = worker
    @workers = []
  end

  def map(&blk)
    @workers << @worker.new(&blk)
  end

  def reduce(accumulator)
    merged = @workers.each_with_object(accumulator) do |worker, acc|
      yield worker.call, acc
    end

    Result.new(merged)
  end
end

class Processor
  def call(server)
    result = OpenStruct.new(name: server, in_sync: true)
    sleep rand(0..3)
    puts "#{Time.now.utc} - querying: #{server}"
    raise "boom #{server}" if rand(1_000) < 1
    result
  rescue => exception
    result.in_sync = false
    result.error   = exception.message
    result
  end
end

# MOAR SERVERS 🙀
servers = (1..60).map { |i| "server#{i}.test" }
queue = MyQueue.build
processor = Processor.new

servers.each do |server|
  queue.map do
    processor.call(server)
  end
end

result = queue.reduce([]) do |server, memo|
  memo << server
end

if result.in_sync?
  puts "in sync: true"
else
  puts "in sync: false - errors: #{result.errors.join(', ')}"
end

Unlike SyncWorker, AsyncWorker is used in production. To choose between them, we can build the default async queue with MyQueue.build, or the synchronous one with MyQueue.build(async: false).

Why Concurrent::Promise instead of Thread?

An alternative implementation without any gem dependency is to use Ruby's Thread:

class AsyncWorker
  def initialize(&blk)
    @blk = Thread.new(&blk)
  end

  def call
    # Thread#value joins the thread before returning the block's result
    @blk.value
  end
end

The main problem with this approach is scalability.

It isn't a good idea to run a lot of threads in parallel, as it may exhaust server resources. To use Thread, I would need to manually cap the number of threads that may run in parallel at any given time. This logic would be hard to maintain and is also error-prone.
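To give an idea of what that manual scheduling involves, here is a minimal sketch of a bounded pool built only on Thread and Queue from the standard library. The run_bounded helper and its max_threads parameter are hypothetical names used for illustration, not part of the tool described here.

```ruby
# Hypothetical helper: runs every job (anything responding to #call) using
# at most max_threads threads, and returns the results in the input order.
def run_bounded(jobs, max_threads: 4)
  queue = Queue.new
  jobs.each_with_index { |job, index| queue << [job, index] }
  queue.close # once drained, Queue#pop returns nil instead of blocking

  results = Array.new(jobs.size)

  threads = Array.new([max_threads, jobs.size].min) do
    Thread.new do
      # Each thread keeps pulling jobs until the queue is empty.
      while (item = queue.pop)
        job, index = item
        results[index] = job.call
      end
    end
  end

  # Thread#join re-raises any exception that terminated a thread, so a
  # failing job surfaces here rather than being silently lost.
  threads.each(&:join)
  results
end

servers = (1..10).map { |i| "server#{i}.test" }
jobs = servers.map do |server|
  lambda do
    sleep 0.01 # simulate network latency
    "queried #{server}"
  end
end

answers = run_bounded(jobs, max_threads: 3)
puts answers.first
puts answers.size
```

Even this small version has to deal with work distribution, result ordering, and error propagation, which is the kind of logic I'd rather delegate to a library.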

Concurrent::Promise is executed within a thread pool that concurrent-ruby provides under the hood. It's a win-win situation, as we have a simple API that manages the complexity of thread scheduling for us.

Conclusion

I shared a simplified version of a queue that we have used in production for the past few months. It offers simplicity of design, testability, scalability, and parallelism, without worrying about the complexity of multi-threading.

Querying all 40 of our servers in parallel takes only half a second to complete the whole operation. I'd never have guessed that Ruby would shine at this task.

But, hey, it did! 💎

Luca Guidi

Former astronaut, soccer player, superhero. All at the age of 10. For some reason now I write code.
