Pythonic的Actor并发模式实现

Actor模式,参与者模式,是一种解决并发和分布式计算的方法。这种方法适用于共享内存的计算机和分布式场景。可以把它类比成企业内部的组织架构。

  1. Actor类,角色的内部状态和处理消息的方式的抽象。
  2. Actor实例,程序运行时存在的实体,它们能够接收消息。每个Actor自己决定是否把消息分发到其他Actor实例。Actor实例也可以创建自己的Actor实例。
  3. 消息,通信数据的单位,Actor间使用消息进行通信。
  4. 邮箱,缓存消息的内存区域,每个Actor实例都有一块专属的区域。
  5. Actor引用,是一种对象,使用该对象可以向指定的Actor实例发送消息。Actor引用就像企业员工的邮件地址
  6. 分配器,是一种组件,用于决定何时允许Actor实例处理消息,并未Actor分配计算资源。

下面我们来实现简单的Actor模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Actor:
_qsize = 100

def send(self, task, *args, **kwargs):
self._mailbox.put((task, args, kwargs))

def recv(self, block=True, timeout=None):
try:
task, args, kwargs = self._mailbox.get(block=True, timeout=timeout)
except queue.Empty as err:
task = ActorExit

if task is ActorExit:
raise ActorExit()
return task, args, kwargs

def start(self):
raise NotImplementedError("must start by MixIn pattern!")

def close(self):
self.send(ActorExit)

def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()

def join(self):
self._terminated.wait()

def run(self):
pass

def _wrapper_task(self, task, *args, **kwargs):
result = task(*args, **kwargs)
self._results.put(result)
return

通过重写run方法可以为每个Actor添加运行时功能。

转载请包括本文地址:https://allenwind.github.io/blog/3145
更多文章请参考:https://allenwind.github.io/blog/archives/