线程间通信常用的数据结构是队列。消息的发送者往队列中put数据,消息的接受者从队列中get数据,为了让队列知道消息的处理情况,有时候还添加task_done接口,以通知队列消息的接收者已经处理了先前接收的消息。如果队列有限,消息接收者get空队列和消息的发送者put满的队列都会阻塞。

在此基础上,还可以添加getput的非阻塞版本。

在某些情况下,我们需要消息的传递安装某种优先级排序而发送,而不是简单的FIFO。这样,具有更高优先级别的消息会更快被处理。就像TCP报文段中的紧急包一样。为了达到这一点,需要在该队列中添加优先排序方法。实现优先队列常使用的是堆排序方案。

优先队列的实现

下面来实现非线程安全的优先队列。使用了Python内置的堆排序库heapq减少代码量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import heapq

class PriorityQueue:

def __init__(self):
self._queue = []

def qsize(self):
return len(self._queue)

def put(self, item):
heapq.heappush(self._queue, item)

def get(self):
return heapq.heappop(self._queue)

原理稍后再剖析,现在先测试下效果。每个消息的结构是一个元组,元组的第一个元素表示消息的优先级别,元组的其他元素就是要发送的消息。注意,heapq规定数字越小,优先界别越大。

1
2
3
4
5
6
7
8
queue = PriorityQueue()
queue.put((2, "B"))
queue.put((1, "A"))
queue.put((3, "C"))

print(queue.get())
print(queue.get())
print(queue.get())

输出结果如下。

1
2
3
(1, 'A')
(2, 'B')
(3, 'C')

如果需要借口更直观,改为可以直接指定优先级别,且数字越大有些界别越大,上面的实现可以改为如下。同时,考虑到优先级别应该是数值类型且在一定范围内,添加一个装饰器做类型和返回检测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import heapq
import functools

def typeassert(func):
@functools.wraps(func)
def wrapper(self, priority, item):
if not isinstance(priority, (int, float)):
raise TypeError("priority must int or fload")
if not 0 <= priority <= 9:
raise ValueError("priority must in range [0, 9]")
return func(self, priority, item)
return wrapper

class PriorityQueue:

def __init__(self):
self._queue = []

def qsize(self):
return len(self._queue)

@typeassert
def put(self, priority, item):
item = (-priority, item)
heapq.heappush(self._queue, item)

def get(self):
_, item = heapq.heappop(self._queue)
return item

原理剖析

堆结构

优先队列的实现方法有几种,例如有序链表和有序数组、特殊的二叉树,但这里使用性能较好的堆方法。本节会剖析用于优先队列的堆。通常我们说的堆是指二叉堆。堆的基础数据结构是完全二叉树,满足堆有序性质。在存储结构上,可以使用数组或链式结构。

堆有序:二叉树的每个结点都大于等于它的两个子结点。
二叉堆:一组能够用堆有序的完全二叉树排序的元素,并在数组中按照层级存储(一般情况不使用数组的第一个元素)。

二叉堆中,数组的元素的关系如下:

位置k的结点的父结点的位置为[k/2]取下限整数值,它的两个子结点的位置分别为2k和2k+1。对于最大堆,N([k/2])>N(k);N(k)>N(2k);N(k)>N(2K+1)

二叉堆的存储结构要么是数组,要么是二叉树链式结构。这两种实现,数组元素和二叉树的各结点是一一对应的。在二叉树中,给根结点编号为1,然后依次从上到下,从左到右为二叉树的结点编号,依次为2、3、4…。这就是完全二叉树和数组的一一对应关系。

堆操作

堆操作可以生动地理解为一个弱肉强吃的黑社会,如果一个结点(大佬),它存在孩子结点(直属跟班)比它厉害(数字大小),那么它们就交换位置。

上浮

如果有一结点大于它的父结点,那么堆有序状态被打破了,需要把该结点和父结点交换,但这样也不能保证交换就满足堆有序。交换后的结点可能依然大于父结点,于是需要进一步交换。每一次的交换,如果堆有序还没有满足,就再次交换,就像该结点在不断上浮一样,直到堆的根结点结束。上浮的过程表达如下。

1
2
3
4
def swim(self, k):
while k > 1 and self.less(k//2, k):
self.exch(k//2, k)
k = k // 2

下沉

类似于上浮,如果一个父结点(因外部操作)小于它的孩子结点,则需要下沉来维持堆有序。下沉操作是一个持续的过程直至满足堆有序。

1
2
3
4
5
6
7
8
def sink(self, k):
while 2*k <= self.n:
j = 2 * k
if j < self.n and self.less(j, j+1):
j += 1
if not self.less(k, j):
break
self.exch(k, j)

插入元素

优先队列的插入,在堆中的操作分为两部分:(1)把元素插入到数组尾部(2)从最尾部元素位置索引开始执行上浮操作。具体实现如下

1
2
3
4
def push(self, item):
self._heap.append(item)
self.n += 1
self.swim(self.n)

弹出元素

在数组非空的情况下(不包括0位置元素),A[1]就是要弹出的元素,保存A[1]后,交换A[1]和A[n],删掉A[n]后,从数组位置1开始执行下沉操作。具体实现如下

1
2
3
4
5
6
7
8
9
def pop(self):
if self.empty():
return None
item = self._heap[1]
self.exch(1, self.n)
del self._heap[self.n]
self.n -= 1
self.sink(1)
return item

完整的实现

根据上面的讨论,基于堆的优先队列的完整实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class PriorityQueue:

def __init__(self):
array = [None]
self._heap = array
self.n = len(self._heap) - 1

def __len__(self):
return self.n

def empty(self):
return self.n == 0

def exch(self, i, j):
self._heap[i], self._heap[j] = self._heap[j], self._heap[i]

def less(self, i, j):
return self._heap[i] < self._heap[j]

def swim(self, k):
while k > 1 and self.less(k//2, k):
self.exch(k//2, k)
k = k // 2

def sink(self, k):
while 2*k <= self.n:
j = 2 * k
if j < self.n and self.less(j, j+1):
j += 1
if not self.less(k, j):
break
self.exch(k, j)
k = j

def push(self, item):
self._heap.append(item)
self.n += 1
self.swim(self.n)

def pop(self):
if self.empty():
return None
item = self._heap[1]
self.exch(1, self.n)
del self._heap[self.n]
self.n -= 1
self.sink(1)
return item

线程安全

上述的实现并非线程安全。在多线程下回出现竞争条件。典型的例子就是银行存款问题。这里就不举例了。解决竞争条件的直观方法就是加锁,另外一方法就是在优先队列上采用比较和交换(CAS)算法。这里采用直观的实现,考虑到,同一线程多次获取锁不应该阻塞,因此这里采用可重入的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import heapq
import functools
import threading

def typeassert(func):
@functools.wraps(func)
def wrapper(self, priority, item):
if not isinstance(priority, (int, float)):
raise TypeError("priority must int or fload")
if not 0 <= priority <= 9:
raise ValueError("priority must in range [0, 9]")
return func(self, priority, item)
return wrapper

class PriorityQueue:

def __init__(self):
self._queue = []
self._mutex = threading.RLock()

def qsize(self):
return len(self._queue)

@typeassert
def put(self, priority, item):
item = (-priority, item)
with self._mutex:
heapq.heappush(self._queue, item)

def get(self):
with self._mutex:
_, item = heapq.heappop(self._queue)
return item

如果担心对象共享了状态,最后就传入不可变对象(immutable)或者发送深拷贝对象。

1
2
3
4
5
6
7
8
9
10
11
12
import copy

class PriorityQueue:
...

@typeassert
def put(self, priority, item):
item = copy.deepcopy(item)
item = (-priority, item)
with self._mutex:
heapq.heappush(self._queue, item)

这就是简单的线程安全的优先队列了。但上面的实现并不完整:队列没有容量控制—导致的一个问题就是队列可以无限容纳消息。对于计算机内存有限这个事实,这是一个很严重的问题。因此需要添加容量限制。但,仔细想想,一旦添加了容量限制就出现了如下情况:

(1)线程往满的队列执行put操作被阻塞后,当队列变为非满的时候,如何唤醒该线程?
(2)线程从空的队列执行get操作被阻塞后,当队列变为非空的时候,如何唤醒该线程?
(3)线程如果告知队列,获取的数据是否被正确处理?

在并发编程中,这些都是需要谨慎处理的情况。

转载请包括本文地址:https://allenwind.github.io/blog/4737
更多文章请参考:https://allenwind.github.io/blog/archives/