线程间通信常用的数据结构是队列。消息的发送者往队列中put
数据,消息的接受者从队列中get
数据,为了让队列知道消息的处理情况,有时候还添加task_done
接口,以通知队列消息的接收者已经处理了先前接收的消息。如果队列有限,消息接收者get
空队列和消息的发送者put
满的队列都会阻塞。
在此基础上,还可以添加get
、put
的非阻塞版本。
在某些情况下,我们需要消息的传递安装某种优先级排序而发送,而不是简单的FIFO。这样,具有更高优先级别的消息会更快被处理。就像TCP报文段中的紧急包一样。为了达到这一点,需要在该队列中添加优先排序方法。实现优先队列常使用的是堆排序方案。
优先队列的实现
下面来实现非线程安全的优先队列。使用了Python内置的堆排序库heapq
减少代码量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import heapq
class PriorityQueue:
def __init__(self): self._queue = []
def qsize(self): return len(self._queue)
def put(self, item): heapq.heappush(self._queue, item)
def get(self): return heapq.heappop(self._queue)
|
原理稍后再剖析,现在先测试下效果。每个消息的结构是一个元组,元组的第一个元素表示消息的优先级别,元组的其他元素就是要发送的消息。注意,heapq
规定数字越小,优先界别越大。
1 2 3 4 5 6 7 8
| queue = PriorityQueue() queue.put((2, "B")) queue.put((1, "A")) queue.put((3, "C"))
print(queue.get()) print(queue.get()) print(queue.get())
|
输出结果如下。
1 2 3
| (1, 'A') (2, 'B') (3, 'C')
|
如果需要借口更直观,改为可以直接指定优先级别,且数字越大有些界别越大,上面的实现可以改为如下。同时,考虑到优先级别应该是数值类型且在一定范围内,添加一个装饰器做类型和返回检测。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import heapq import functools
def typeassert(func): @functools.wraps(func) def wrapper(self, priority, item): if not isinstance(priority, (int, float)): raise TypeError("priority must int or fload") if not 0 <= priority <= 9: raise ValueError("priority must in range [0, 9]") return func(self, priority, item) return wrapper
class PriorityQueue:
def __init__(self): self._queue = []
def qsize(self): return len(self._queue)
@typeassert def put(self, priority, item): item = (-priority, item) heapq.heappush(self._queue, item)
def get(self): _, item = heapq.heappop(self._queue) return item
|
原理剖析
堆结构
优先队列的实现方法有几种,例如有序链表和有序数组、特殊的二叉树,但这里使用性能较好的堆方法。本节会剖析用于优先队列的堆。通常我们说的堆是指二叉堆。堆的基础数据结构是完全二叉树,满足堆有序性质。在存储结构上,可以使用数组或链式结构。
堆有序:二叉树的每个结点都大于等于它的两个子结点。
二叉堆:一组能够用堆有序的完全二叉树排序的元素,并在数组中按照层级存储(一般情况不使用数组的第一个元素)。
二叉堆中,数组的元素的关系如下:
位置k的结点的父结点的位置为[k/2]取下限整数值,它的两个子结点的位置分别为2k和2k+1。对于最大堆,N([k/2])>N(k);N(k)>N(2k);N(k)>N(2K+1)
二叉堆的存储结构要么是数组,要么是二叉树链式结构。这两种实现,数组元素和二叉树的各结点是一一对应的。在二叉树中,给根结点编号为1,然后依次从上到下,从左到右为二叉树的结点编号,依次为2、3、4…。这就是完全二叉树和数组的一一对应关系。
堆操作
堆操作可以生动地理解为一个弱肉强吃的黑社会,如果一个结点(大佬),它存在孩子结点(直属跟班)比它厉害(数字大小),那么它们就交换位置。
上浮
如果有一结点大于它的父结点,那么堆有序状态被打破了,需要把该结点和父结点交换,但这样也不能保证交换就满足堆有序。交换后的结点可能依然大于父结点,于是需要进一步交换。每一次的交换,如果堆有序还没有满足,就再次交换,就像该结点在不断上浮一样,直到堆的根结点结束。上浮的过程表达如下。
1 2 3 4
| def swim(self, k): while k > 1 and self.less(k//2, k): self.exch(k//2, k) k = k // 2
|
下沉
类似于上浮,如果一个父结点(因外部操作)小于它的孩子结点,则需要下沉来维持堆有序。下沉操作是一个持续的过程直至满足堆有序。
1 2 3 4 5 6 7 8
| def sink(self, k): while 2*k <= self.n: j = 2 * k if j < self.n and self.less(j, j+1): j += 1 if not self.less(k, j): break self.exch(k, j)
|
插入元素
优先队列的插入,在堆中的操作分为两部分:(1)把元素插入到数组尾部(2)从最尾部元素位置索引开始执行上浮操作。具体实现如下
1 2 3 4
| def push(self, item): self._heap.append(item) self.n += 1 self.swim(self.n)
|
弹出元素
在数组非空的情况下(不包括0位置元素),A[1]就是要弹出的元素,保存A[1]后,交换A[1]和A[n],删掉A[n]后,从数组位置1开始执行下沉操作。具体实现如下
1 2 3 4 5 6 7 8 9
| def pop(self): if self.empty(): return None item = self._heap[1] self.exch(1, self.n) del self._heap[self.n] self.n -= 1 self.sink(1) return item
|
完整的实现
根据上面的讨论,基于堆的优先队列的完整实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| class PriorityQueue:
def __init__(self): array = [None] self._heap = array self.n = len(self._heap) - 1
def __len__(self): return self.n
def empty(self): return self.n == 0
def exch(self, i, j): self._heap[i], self._heap[j] = self._heap[j], self._heap[i]
def less(self, i, j): return self._heap[i] < self._heap[j]
def swim(self, k): while k > 1 and self.less(k//2, k): self.exch(k//2, k) k = k // 2
def sink(self, k): while 2*k <= self.n: j = 2 * k if j < self.n and self.less(j, j+1): j += 1 if not self.less(k, j): break self.exch(k, j) k = j
def push(self, item): self._heap.append(item) self.n += 1 self.swim(self.n)
def pop(self): if self.empty(): return None item = self._heap[1] self.exch(1, self.n) del self._heap[self.n] self.n -= 1 self.sink(1) return item
|
线程安全
上述的实现并非线程安全。在多线程下回出现竞争条件。典型的例子就是银行存款问题。这里就不举例了。解决竞争条件的直观方法就是加锁,另外一方法就是在优先队列上采用比较和交换(CAS)算法。这里采用直观的实现,考虑到,同一线程多次获取锁不应该阻塞,因此这里采用可重入的锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import heapq import functools import threading
def typeassert(func): @functools.wraps(func) def wrapper(self, priority, item): if not isinstance(priority, (int, float)): raise TypeError("priority must int or fload") if not 0 <= priority <= 9: raise ValueError("priority must in range [0, 9]") return func(self, priority, item) return wrapper
class PriorityQueue:
def __init__(self): self._queue = [] self._mutex = threading.RLock()
def qsize(self): return len(self._queue)
@typeassert def put(self, priority, item): item = (-priority, item) with self._mutex: heapq.heappush(self._queue, item)
def get(self): with self._mutex: _, item = heapq.heappop(self._queue) return item
|
如果担心对象共享了状态,最后就传入不可变对象(immutable)或者发送深拷贝对象。
1 2 3 4 5 6 7 8 9 10 11 12
| import copy
class PriorityQueue: ... @typeassert def put(self, priority, item): item = copy.deepcopy(item) item = (-priority, item) with self._mutex: heapq.heappush(self._queue, item)
|
这就是简单的线程安全的优先队列了。但上面的实现并不完整:队列没有容量控制—导致的一个问题就是队列可以无限容纳消息。对于计算机内存有限这个事实,这是一个很严重的问题。因此需要添加容量限制。但,仔细想想,一旦添加了容量限制就出现了如下情况:
(1)线程往满的队列执行put操作被阻塞后,当队列变为非满的时候,如何唤醒该线程?
(2)线程从空的队列执行get操作被阻塞后,当队列变为非空的时候,如何唤醒该线程?
(3)线程如果告知队列,获取的数据是否被正确处理?
在并发编程中,这些都是需要谨慎处理的情况。
转载请包括本文地址:https://allenwind.github.io/blog/4737
更多文章请参考:https://allenwind.github.io/blog/archives/