使用Python实现基于asyncio的生产者/消费者模式。
原理
asyncio实现了在协程上运行的队列。它本身是collections.deque
的封装。由于asyncio的事件循环本身就运行在单线程上,而asyncio.Queue
需要绑定一个事件循环才能运行,因此在多线程下不需要额外的加锁。但在单个线程下,协程间需要同步还是离不开锁。基于协程的锁同步原语在asyncio.locks
。接口使用上threading
模块的同步原语一致。
实现
monitor
是监控器,每五秒打印队列大小。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import random import time import asyncio import string
async def producer(queue, _id): item = string.ascii_letters loop = 0 while True: e = random.choice(item) print('product:{} by producer<{}>'.format(e, _id)) await queue.put(e) await asyncio.sleep(random.random()) loop += 1 if loop == 20: break
async def consumer(queue, _id): loop = 0 while True: e = await queue.get() print('sonsume:{} by consumer<{}>'.format(e, _id)) queue.task_done() await asyncio.sleep(random.random()) loop += 1 if loop == 20: break
async def monitor(queue): loop = 0 while True: await asyncio.sleep(5) print(queue._queue) loop += 1 if loop == 5: break
def main(): queue = asyncio.Queue() producer1 = producer(queue, 1) producer2 = producer(queue, 2)
consumer1 = consumer(queue, 1) consumer2 = consumer(queue, 2)
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*[producer1, producer2, consumer1, consumer2, monitor(queue)]))
if __name__ == '__main__': main()
|
应用
可以实现基于消费者/生产者模型,实现一个简单的异步爬虫框架。
转载请包括本文地址:https://allenwind.github.io/blog/2647
更多文章请参考:https://allenwind.github.io/blog/archives/