协程是用户空间线程又叫绿色线程,使用协程的好处是可以实现自己的协程调度器,另外是协程非常轻量。但由于协程对操作系统是透明的,所以协程无法实现真正的并行运行。因此协程通常用于I/O密集性的处理。那么如何实现一个协程调度器呢?这就是本文的话题。
简单的调度器
注意到,Python的生成器中yield
关键字可以让函数在指定位置挂起,然后等待next函数从挂起点恢复函数(或者调用send方法)。例如:
1 2 3 4 5
| def printer(nums): print('printer start') for item in nums: yield item print('printer end')
|
当我们有多个生成器需要“同时”执行时,通过yield
把当前任务(生成器)挂起,进而从任务队列中取出其他的任务继续执行。这样就可以实现在用户空间调度协程。
来一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class CoroutineScheduler:
def __init__(self): self._task_queue = deque()
def add_task(self, task): self._task_queue.append(task)
def run(self): while self._task_queue: task = self._task_queue.popleft() try: result = next(task) print(result) self._task_queue.append(task) except StopIteration: pass
|
假定有多台打印机需要执行打印任务,但又不想其中一台一直占用资源直到打印完毕,那么就可以使用上面的调度器。
1 2 3 4 5 6 7 8
| def print_worker(): printer1 = printer([1, 3, 5, 7, 9]) printer2 = printer([2, 4, 6, 8, 10])
scheduler = CoroutineScheduler() scheduler.add_task(printer1) scheduler.add_task(printer2) scheduler.run()
|
输出结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| printer start 1 printer start 2 3 4 5 6 7 8 9 10 printer end printer end
|
实现自己的调度逻辑
上述调度器的调度逻辑是先到先服务。我们可以在这个基础上添加优先级别、LIFO等。以优先级别为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class CoroutinePriorityScheduler:
def __init__(self): self._task_queue = PriorityQueue()
def add_task(self, task, priority): self._task_queue.append((priority, task))
def run(self): while self._task_queue: priority, task = self._task_queue.popleft() try: result = next(task) print(result) if priority > 0: priority = priority - 1 self._task_queue.append((priority, task)) except StopIteration: pass
|
关键是把任务取出后,重新加入到任务队列中的逻辑过程以及任务队列的作业排列特性。事实上这个逻辑过程也可以融合到任务队列中。也就是说,如果我们要实现特定的调度算法,把焦点放到任务队列的实现上即可。
根据上面的思路,异步网络调度器可以在非阻塞I/O的基础上实现。Python3.4后,标准库提供了异步I/O库asyncio了。
转载请包括本文地址:https://allenwind.github.io/blog/4596
更多文章请参考:https://allenwind.github.io/blog/archives/