Looping with Fibers

Mar 10, 2014
ruby, tech
I needed to loop over a fairly large data set and group it by year and month. In this post I walk through using two Ruby Fibers that communicate with each other to do the grouping inside the loop.

An overview of how Fibers work in Ruby

Fibers are code blocks that can be paused and resumed. They are unlike threads because they never run concurrently. The programmer is in complete control of when a fiber is run. Because of this we can create two fibers and pass control between them.

Control is passed to a fiber when you call Fiber#resume. The fiber returns control by calling Fiber.yield.

fiber = Fiber.new do
  Fiber.yield 'one'
  Fiber.yield 'two'
end
puts fiber.resume
#=> one
puts fiber.resume
#=> two

The above example shows the most common use case where Fiber.yield is passed an argument which is returned through Fiber#resume.
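
As a slightly larger sketch of that common case, the fiber below acts as an endless generator. The variable names `fib` and `first_six` are my own and purely illustrative; the point is only that each Fiber.yield hands one value out and pauses the block until the next Fiber#resume.

```ruby
# An endless generator: each Fiber.yield hands one value to the caller
# and pauses the block until the next Fiber#resume.
fib = Fiber.new do
  a, b = 0, 1
  loop do
    Fiber.yield a
    a, b = b, a + b
  end
end

# Pull exactly six values; the loop inside the fiber never runs ahead.
first_six = Array.new(6) { fib.resume }
p first_six  # [0, 1, 1, 2, 3, 5]
```

The infinite loop is safe here because the fiber only advances when the caller asks for the next value.
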
What’s interesting is that you can pass an argument into the fiber via Fiber#resume as well. The first call to Fiber#resume starts the fiber, and its argument goes to the block that creates the fiber. On every subsequent call, the argument to Fiber#resume becomes the return value of the Fiber.yield that paused the fiber.

fiber = Fiber.new do |arg|
  puts arg                   # prints 'one'
  puts Fiber.yield('two')    # prints 'three'
  puts Fiber.yield('four')   # prints 'five'
end
puts fiber.resume('one')     # prints 'two'
#=> one
#=> two
puts fiber.resume('three')   # prints 'four'
#=> three
#=> four
puts fiber.resume('five')    # the fiber finishes, so resume returns the block's value (nil)
#=> five
#=> (a blank line, because puts nil prints an empty line)
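
To make the end of a fiber's life a little more concrete, here is a minimal sketch (the variable names are illustrative, not from the original code). It shows that the final Fiber#resume returns the value of the block itself, and that resuming a finished fiber raises FiberError.

```ruby
require 'fiber' # needed for Fiber#alive? on Ruby versions before 3.1

fiber = Fiber.new do |first|
  second = Fiber.yield(first * 2)   # pause, handing 2 back to the caller
  first + second                    # the block's value is returned by the last resume
end

a = fiber.resume(1)    # starts the block with first = 1; returns 2
b = fiber.resume(10)   # Fiber.yield returns 10; the block finishes and returns 11
finished = !fiber.alive?

error = begin
  fiber.resume(:again)   # the block has already returned
rescue FiberError => e
  e.class
end

p [a, b, finished, error]  # [2, 11, true, FiberError]
```

This is why the EachGroup code further down checks alive? before resuming a fiber that may already have finished.
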

Armed with this information, we can set up two fibers and get them to communicate with each other.

require 'fiber'

fiber2 = nil
fiber1 = Fiber.new do
  puts fiber2.resume     # start fiber2 and print first result (1)
  puts fiber2.resume 2   # send second number and print second result (3)
  fiber2.resume 4        # send fourth number, print nothing and exit
end
fiber2 = Fiber.new do
  puts Fiber.yield 1     # send first number and print returned result (2)
  puts Fiber.yield 3     # send third number, print returned result (4) and exit
end
fiber1.resume            # start fiber1
#=> 1
#=> 2
#=> 3
#=> 4
puts "fiber1 done" unless fiber1.alive?
#=> fiber1 done
puts "fiber2 done" unless fiber2.alive?
#=> fiber2 done

EachGroup module

Knowing we can send information between two fibers with alternating calls of Fiber#resume and Fiber.yield, we have the building blocks to tackle a streaming #each_group method.
Tip: The fiber you first call #resume on should always call #resume on the fiber it is communicating with. The other fiber then always calls Fiber.yield. This goes against the natural inclination to pass information with Fiber.yield as in the first example above. Because of how the two fibers are set up below, no information is passed with Fiber.yield; information is only passed using Fiber#resume. Confusing, I know.
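
Here is a stripped-down sketch of that one-way pattern, separate from the module itself. The names `consumer` and `received` are hypothetical. The driving code only ever calls Fiber#resume, the consumer only ever calls a bare Fiber.yield, and nil is used as an end-of-stream signal.

```ruby
require 'fiber' # needed for Fiber#alive? on Ruby versions before 3.1

received = []

# The consumer never sends anything back: a bare Fiber.yield just waits
# for the next value handed in through Fiber#resume.
consumer = Fiber.new do
  while (item = Fiber.yield)
    received << item
  end
  received << :done   # runs once the driver sends nil
end

consumer.resume            # start the consumer; it pauses at its first Fiber.yield
%w[a b c].each { |item| consumer.resume(item) }
consumer.resume(nil)       # nil ends the loop, so the fiber runs to completion

p received          # ["a", "b", "c", :done]
p consumer.alive?   # false
```

Note the extra resume at the start: it runs the fiber up to its first Fiber.yield so that the next resume has a yield to deliver its value to.
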

# -*- coding: utf-8 -*-
require 'fiber'

module EachGroup
  def each_group(*fields, &block)
    grouper = Grouper.new(*fields, &block)
    loop_fiber = Fiber.new do
      each do |result|
        grouper.process_result(result)
      end
    end
    loop_fiber.resume
  end

  class Grouper
    def initialize(*fields, &block)
      @current_group = nil
      @fields = fields
      @block = block
    end
    attr_reader :fields, :block
    attr_accessor :current_group

    def process_result(result)
      group_fiber = get_group_fiber(result)
      group_fiber.resume(result) if group_fiber.alive?
    end

    private
    def get_group_fiber(result)
      group_value = fields.map{|f| result.public_send(f) }
      unless current_group == group_value
        self.current_group = group_value
        create_group_fiber(result, group_value)
      end
      @group_fiber
    end

    def create_group_fiber(result, group_value)
      @group_fiber = Fiber.new do |first_result|
        group = Group.new(group_value)
        block.call(group)
      end
      @group_fiber.resume(nil) # Start the fiber and wait for its first yield
    end
  end

  class Group
    def initialize(value)
      @value = value
    end
    attr_reader :value

    def each(&block)
      while result = Fiber.yield
        block.call(result)
      end
    end
  end
end

Example Usage

#each_group only compares each record with the one before it, so the input must already be sorted by the grouping fields.

require 'each_group'
require 'ostruct'

Array.send(:include, EachGroup)

array = [
  OpenStruct.new(year: 2014, month: 1, date: 1),
  OpenStruct.new(year: 2014, month: 1, date: 3),
  OpenStruct.new(year: 2014, month: 2, date: 5),
  OpenStruct.new(year: 2014, month: 2, date: 7),
]
array.each_group(:year, :month) do |group|
  puts group.value.inspect
  group.each do |obj|
    puts "  #{obj.date}"
  end
end
#=> [2014, 1]
#=>   1
#=>   3
#=> [2014, 2]
#=>   5
#=>   7
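
The sorting requirement is easy to trip over, so here is a small sketch of what goes wrong without it. It does not use the EachGroup module at all: `Record` and `group_starts` are hypothetical stand-ins that only mimic the adjacent-record comparison, assuming the same year and month fields as above.

```ruby
Record = Struct.new(:year, :month, :day)

records = [
  Record.new(2014, 2, 5),
  Record.new(2014, 1, 1),
  Record.new(2014, 2, 7),
  Record.new(2014, 1, 3),
]

# Hypothetical helper: lists the key every time it differs from the previous
# record's key, the same adjacent comparison that each_group performs.
def group_starts(records, *fields)
  keys = records.map { |r| fields.map { |f| r.public_send(f) } }
  keys.each_with_index.select { |key, i| i.zero? || key != keys[i - 1] }.map(&:first)
end

unsorted_groups = group_starts(records, :year, :month)
sorted = records.sort_by { |r| [r.year, r.month] }
sorted_groups = group_starts(sorted, :year, :month)

p unsorted_groups  # [[2014, 2], [2014, 1], [2014, 2], [2014, 1]]
p sorted_groups    # [[2014, 1], [2014, 2]]
```

With unsorted input every change of key starts a new group, so the same month shows up more than once. Sorting by the grouping fields first gives one group per month.
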

This code can be used with ActiveRecord as follows:

ActiveRecord::Relation.send(:include, EachGroup)

Model.order('year, month').each_group(:year, :month) do |group|
  group.each do |record|
    # ...
  end
end

I have uploaded a Gist that includes a previous iteration of the EachGroup module using a nested loop. You may find it easier to follow when working out how the fibers control the flow of the loop.

  1. The above code with an RSpec spec - https://gist.github.com/andrewtimberlake/9462561
  2. The original code with nested loops - https://gist.github.com/andrewtimberlake/9462561/f0e88cd310614a34693d57c3fc759f5c78e3a264

Thanks for taking the time to read through this. Explaining complicated concepts like Fibers is a challenge, so please leave a comment and let me know if this was helpful or if you still have any questions.
