Pythonic的Actor并发模式实现
Actor模式,参与者模式,是一种解决并发和分布式计算的方法。这种方法适用于共享内存的计算机和分布式场景。可以把它类比成企业内部的组织架构。
- Actor类,角色的内部状态和处理消息的方式的抽象。
- Actor实例,程序运行时存在的实体,它们能够接收消息。每个Actor自己决定是否把消息分发到其他Actor实例。Actor实例也可以创建自己的Actor实例。
- 消息,通信数据的单位,Actor间使用消息进行通信。
- 邮箱,缓存消息的内存区域,每个Actor实例都有一块专属的区域。
- Actor引用,是一种对象,使用该对象可以向指定的Actor实例发送消息。Actor引用就像企业员工的邮件地址
- 分配器,是一种组件,用于决定何时允许Actor实例处理消息,并未Actor分配计算资源。
下面我们来实现简单的Actor模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| class Actor: _qsize = 100
def send(self, task, *args, **kwargs): self._mailbox.put((task, args, kwargs))
def recv(self, block=True, timeout=None): try: task, args, kwargs = self._mailbox.get(block=True, timeout=timeout) except queue.Empty as err: task = ActorExit
if task is ActorExit: raise ActorExit() return task, args, kwargs
def start(self): raise NotImplementedError("must start by MixIn pattern!")
def close(self): self.send(ActorExit)
def _bootstrap(self): try: self.run() except ActorExit: pass finally: self._terminated.set()
def join(self): self._terminated.wait()
def run(self): pass
def _wrapper_task(self, task, *args, **kwargs): result = task(*args, **kwargs) self._results.put(result) return
|
通过重写run方法可以为每个Actor添加运行时功能。
转载请包括本文地址:https://allenwind.github.io/blog/3145
更多文章请参考:https://allenwind.github.io/blog/archives/