使用Python实现基于asyncio的生产者/消费者模式。

原理

asyncio实现了在协程上运行的队列。它本身是collections.deque的封装。由于asyncio的事件循环本身就运行在单线程上,而asyncio.Queue需要绑定一个事件循环才能运行,因此在多线程下不需要额外的加锁。但在单个线程下,协程间需要同步还是离不开锁。基于协程的锁同步原语在asyncio.locks。接口使用上threading模块的同步原语一致。

实现

monitor是监控器,每五秒打印队列大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import random
import time
import asyncio
import string

async def producer(queue, _id):
item = string.ascii_letters
loop = 0
while True:
e = random.choice(item)
print('product:{} by producer<{}>'.format(e, _id))
await queue.put(e)
await asyncio.sleep(random.random())
loop += 1
if loop == 20:
break

async def consumer(queue, _id):
loop = 0
while True:
e = await queue.get()
print('sonsume:{} by consumer<{}>'.format(e, _id))
queue.task_done()
await asyncio.sleep(random.random())
loop += 1
if loop == 20:
break

async def monitor(queue):
loop = 0
while True:
await asyncio.sleep(5)
print(queue._queue)
loop += 1
if loop == 5:
break

def main():
queue = asyncio.Queue()
producer1 = producer(queue, 1)
producer2 = producer(queue, 2)

consumer1 = consumer(queue, 1)
consumer2 = consumer(queue, 2)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*[producer1, producer2, consumer1, consumer2, monitor(queue)]))

if __name__ == '__main__':
main()

应用

可以实现基于消费者/生产者模型,实现一个简单的异步爬虫框架。

转载请包括本文地址:https://allenwind.github.io/blog/2647
更多文章请参考:https://allenwind.github.io/blog/archives/