协程是用户空间线程又叫绿色线程,使用协程的好处是可以实现自己的协程调度器,另外是协程非常轻量。但由于协程对操作系统是透明的,所以协程无法实现真正的并行运行。因此协程通常用于I/O密集性的处理。那么如何实现一个协程调度器呢?这就是本文的话题。

简单的调度器

注意到,Python的生成器中yield关键字可以让函数在指定位置挂起,然后等待next函数从挂起点恢复函数(或者调用send方法)。例如:

1
2
3
4
5
def printer(nums):
print('printer start')
for item in nums:
yield item
print('printer end')

当我们有多个生成器需要“同时”执行时,通过yield把当前任务(生成器)挂起,进而从任务队列中取出其他的任务继续执行。这样就可以实现在用户空间调度协程。

来一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class CoroutineScheduler:

def __init__(self):
self._task_queue = deque()

def add_task(self, task):
self._task_queue.append(task)

def run(self):
while self._task_queue:
task = self._task_queue.popleft()
try:
result = next(task)
print(result)
self._task_queue.append(task)
except StopIteration:
pass

假定有多台打印机需要执行打印任务,但又不想其中一台一直占用资源直到打印完毕,那么就可以使用上面的调度器。

1
2
3
4
5
6
7
8
def print_worker():
printer1 = printer([1, 3, 5, 7, 9])
printer2 = printer([2, 4, 6, 8, 10])

scheduler = CoroutineScheduler()
scheduler.add_task(printer1)
scheduler.add_task(printer2)
scheduler.run()

输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
printer start
1
printer start
2
3
4
5
6
7
8
9
10
printer end
printer end

实现自己的调度逻辑

上述调度器的调度逻辑是先到先服务。我们可以在这个基础上添加优先级别、LIFO等。以优先级别为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class CoroutinePriorityScheduler:

def __init__(self):
self._task_queue = PriorityQueue()

def add_task(self, task, priority):
self._task_queue.append((priority, task))

def run(self):
while self._task_queue:
priority, task = self._task_queue.popleft()
try:
result = next(task)
print(result)
if priority > 0:
priority = priority - 1
self._task_queue.append((priority, task))
except StopIteration:
pass

关键是把任务取出后,重新加入到任务队列中的逻辑过程以及任务队列的作业排列特性。事实上这个逻辑过程也可以融合到任务队列中。也就是说,如果我们要实现特定的调度算法,把焦点放到任务队列的实现上即可。

根据上面的思路,异步网络调度器可以在非阻塞I/O的基础上实现。Python3.4后,标准库提供了异步I/O库asyncio了。

转载请包括本文地址:https://allenwind.github.io/blog/4596
更多文章请参考:https://allenwind.github.io/blog/archives/