Coroutines and Semi-Coroutines

February 19, 2008 – 7:57 pm

Ruby 1.9 中引入了 Fiber 用于支持 Coroutine 。事实上 Fiber 并不是一个 Coroutine ,而是 Semi-Coroutine ,或者叫做 Asymmetric Coroutine 。因为它只能将执行权返回给调用者,而完整的 Coroutine 可以自由转移运行权。 Ruby 1.9 也提供了 Fiber::Core 用于支持“正宗”的 Coroutine 。关于 Semi-Coroutine 和 Coroutine 的等价性似乎有些争论。下面是一个 Fiber 的例子:

fib = Fiber.new do
  x, y = 0, 1
  loop do
    Fiber.yield y
    x,y = y,x+y
  end
end
20.times { puts fib.resume }

这个 Semi-Coroutine 被用于生成 Fibonacci 数。Semi-Coroutine 看起来就像一个“轻量级的线程(Thread)”,而线程的概念已经为许多人所熟悉了,所以 Semi-Coroutine 看起来也就更“亲切”一些。从例子中可以看到, Fiber.yield
被用于将执行权返回到调用者,而 resume 可以用于启动或恢复 Fiber 的运行。相比之下, Coroutine 则更灵活一些,通过 Fiber#transfer 方法可以将执行权限转移到任意一个 Coroutine ,而不局限于“返回”。但是这种灵活的执行线路的转移通常会让人晕头转向。因此一些语言(例如 Lua )只提供 Semi-Coroutine 的支持,而 Python 则只提供更局限的(被称作 generator function)支持,在 Python 里只能在 Semi-Coroutine 当前的堆栈层次里 yield ,不能递归或者调用其他函数在 yield ,例如,在 Python 里,用下面的方法来做一个二叉树的 iterator 是不行的:

class Tree:
    def __init__(self, value, left=None, right=None):
        self.value = value
        self.left = left
        self.right = right
 
    def inorder(self):
        if self.left is not None:
            self.left.inorder
 
        yield self.value
 
        if self.right is not None:
            self.right.inorder

相比之下,Ruby 1.9 同时提供了 Coroutine ( Fiber::Core )和 Semi-Coroutine ( Fiber )的支持。有些人认为同时提供两种支持会让人感到混乱,如果二者实际上是等价的话,像 Lua 那样只支持更容易理解的 Semi-Coroutine 是更好的选择。那么二者究竟等价不等价呢?用 Coroutine 来实现一个Semi-Coroutine 是很容易的事情,相比之下用 Semi-Coroutine 来实现 Coroutine 会难一些。下面让我们来尝试一下:

首先考虑只有两个 Coroutine 互相转移的情况,这是最简单的情况,在执行 transfer 的时候考虑如果目标已经 resume 过了,为了避免二次 resume (这会触发异常),直接调用 Fiber.yield “返回”即可,因为只有两个 Coroutine ,返回一次一定是到另一个 Coroutine 那里;如果目标没有 resume 过,那么直接调用 resume 转移执行权即可:

require 'fiber'
 
class Coroutine
  @@prev = nil
  @@current = nil
 
  def initialize(&brk)
    @fib = Fiber.new(&brk)
  end
 
  def transfer(*args)
    if @@prev == self
      @@current = self
      return Fiber.yield(*args)
    else
      @@prev = @@current
      @@current = self
      return @fib.resume(*args)
    end
  end
  alias :start :transfer
end

下面是一个例子:

producer = consumer = nil
 
producer = Coroutine.new {
  [1, 2, 3].each { |x|
    consumer.transfer(x)
  }
  consumer.transfer(:stop)
}
 
consumer = Coroutine.new { |x|
  loop do
    break if x == :stop
    puts "Consumer get: #{x}"
    x = producer.transfer
  end
}
 
producer.start

输出的结果是:

Consumer get: 1
Consumer get: 2
Consumer get: 3

但是当有多个 Coroutine 协同工作的时候就不能这么简单了。例如下面的状态:

f ----------> g ----------> h
   g.resume      h.resume

现在是 h 在执行,如果 h 执行了 f.transfer ,则不能直接调用 f.resume ,这会导致 Double Resume 的异常;也不能直接调用 Fiber.yield ,这会回到 g
中而不是 f 中;唯一的办法就是连续两次 Fiber.yield 。因此,我们需要记录所有被 resume 过的 Coroutine :

@@prev << @@current
@@current = self
ret = @fib.resume(*args)

通过每次 resume 之前把当前 Coroutine 添加到 @@prev 数组中,在实际 transfer 的时候就可以查看目标 Coroutine 是否被 resume 过了,如果有,则设置 @@target 变量为要 transfer 的目标 Coroutine ,并一层一层地“弹出”(通过 Fiber.yield ):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def transfer(*args)
  ret = nil
  if @@prev.include?(self)
    @@target = self
    @@current = @@prev.pop
    return Fiber.yield(*args)
  else
    @@prev << @@current
    @@current = self
    ret = @fib.resume(*args)
  end
 
  if @@target == @@current
    return ret
  else
    @@current = @@prev.pop
    return Fiber.yield(*ret)
  end
end

注意每次“弹出”后继续执行的位置应该是在第 10 行,所以要在第 13 行检察是否继续“弹”,直到当前 Coroutine 即是目标为止。下面是一个例子:

f = g = h = nil
 
f = Coroutine.new { |x, y|
  puts "F1: #{x}"
  x, y = g.transfer(y, x+y)
  puts "F2: #{x}"
  x, y = h.transfer(y, x+y)
  puts "F3: #{x}"
}
 
g = Coroutine.new { |x, y|
  puts "G1: #{x}"
  x, y = h.transfer(y, x+y)
  puts "G2: #{x}"
  x, y = f.transfer(y, x+y)
  puts "G3: #{x}"
}
 
h = Coroutine.new { |x, y|
  puts "H1: #{x}"
  x, y = f.transfer(y, x+y)
  puts "H2: #{x}"
  x, y = g.transfer(y, x+y)
  puts "H3: #{x}"
}
 
f.start(0, 1)

执行结果为:

F1: 0
G1: 1
H1: 1
F2: 2
H2: 3
G2: 5
F3: 8

当然这样的实现还是有问题,使用了类变量来保存 Coroutine 的历史记录,如果同时有多“组” Coroutines 在运行,就会有问题,因此需要把信息保存在每个线程所独有的变量中,而且限制 Coroutine 不能在不同的线程之间进行跳转。

Updates: 目前 Ruby 1.9 发布的版本中已经没有 Fiber::Core 了, Fiber::CoreFiber 合并为一个东西了,现在 Fiber 既可以做不对称的 yield 也可以做对称的=transfer= 了。

Post a Comment