本文总结Go的并发编程。关于Python的并发编程见Python并发编程总结。关于并发编程的讨论见为什么需要并发?

谈到Go的并发编程,首先要澄清并发(concurrency)和并行(parallesim)的区别。

  • 并发: 逻辑上同时处理多个任务的能力
  • 并行:物理上同一时刻处理多个并发任务的能力

当我们说一个程序是并发执行时,这个并发是指程序包含多个逻辑上独立的执行块,至于这些执行块如何被操作系统调度执行,程序的管不了的。例如,在单核电脑上,无论操作系统的何种调度算法,任意一时刻也就只有一个执行块爱执行;如果在多核电脑上,同一时刻可以有多个执行块同时执行。也就是说,并发的程序,多个执行块既可以独立并行执行,也可以串行执行。这取决于操作系统和硬件资源。

而并行的程序是多个执行块的同时执行。从调度的角度理解,并行可以看作是并发的特例。要实现程序的并行执行,除了在编程上做处理还需要硬件支持:多主机或者多核CPU。简单说,并行就是并发的最理想的执行模式。这个最理想的状态包括硬件层面和软件层面。硬件层面就如上述的多主机或多核CPU情况。软件层面看实现并发的编程语言的特性。例如Go很容易让多个Goroutine跑在多个CPU上,但Python尽管在多核环境下由于GIL的原因,很难实现并行。关于Python的GIL问题见Python真的慢吗?

那么,并发(并行)如何实现?这就是操作系统层面的问题。操作系统或程序运行时通过线程(thread)、进程(process)、协程(coroutine)实现并发,或理想情况下的并行。关于进行、线程、协程的讨论这里不展开了。

如果更严格地讨论并行,那么并行也就粒度之分。例如位级并行(就是常说的32位电脑、64位电脑、8位单片机)、指令级并行、数据级并行、任务级并行。对于应用编程来说,通常只需关心数据级并行和任务级别并行。

最后,引用Rob Pike的经典描述总结上述的讨论(注意英文语境下的理解):

并发是同一时间应对(dealing with)多件事情的能力。
并行是同一时间动手(doing)做多件事情的能力。

Go并发特点

  • Goroutine

Go的并发单元称为Goroutine。Goroutine和Python的Coroutine看似很像,但事实上并不一样的概念。Go运行时会创建多个线程来执行多个Goroutine并发任务,且Goroutine可以被运行时调度到其他线程上执行。这样,Goroutine更像是线程+协程的综合体。或者理解为:Goroutine是能在线程态和协程态间随运行时调度而进行状态切换的综合体。被调度时以协程态存在,执行时以线程态存在。Goroutine在执行时并不是每次都会创建新的线程,运行时会复用已有的线程。

Goroutine自定义栈初始化只有2KB大小,同时还包括函数指针和调用参数。比操作系统线程MB栈级别小得多,因此在简单的机器上可以创建上万个并发单元。Goroutine栈的大小可以在需要时扩容,最大达到GB级别。

  • channel

Go鼓励采用CSP通道进行消息传递来代替内存共享,以此实现安全的并发。CSP就是通信顺序进程(Communicating Sequential Processe)模型。该模型由独立的、并发执行的实体组成,实体之间的通信通过消息通道来实现。CSP模型并不关注发送消息的实体,而是关注发送消息时使用的channel。这一点很容易体会,对于消息的接收者来说,它只通过channel获取消息但并不知道该消息来自哪个发生消息的实体。类似的通信模型还有Actor模型,以后在讨论Scala的并发编程时再详细展开。

channel是线程安全的队列,在多个实现进行通信时,无需担心并发所带来的数据不一致问题。

channel还有两种类型,一种是有缓存,另外一中是无缓冲。后面会详细讨论。

基础

Go很容易创建Goroutine,然后让运行时调度执行Goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
package main

func main() {

go println("awesome goroutine!") // 创建Goroutine

go func(s string) {
println(s)
}("awesome goroutine again!")
}

select{} // 阻塞当前程序以等带所有Goroutine执行完成

调用go关键字后并不会阻塞程序,而是接着往下执行。注意go关键字并不是执行并发操作,而是创建一个goroutine并发单元。该并发单元会放到系统队列中,等待调度器指派适合的系统先去执行该单元。

  • 简单的例子

下面通过一个例子对比Go和Python的并发编程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time

def task(interval):
while True:
print("Thread<{}".format(threading.current_thread().ident), time.ctime())
time.sleep(interval)

def main():
task1 = threading.Thread(target=task, args=(1,))
task2 = threading.Thread(target=task, args=(2,))

task1.start()
task2.start()

task1.join()
task2.join() // 主线程等待task1、task2退出

if __name__ == '__main__':
main()

使用Go改写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import (
"fmt"
"time"
)

func task(interval int) {
for {
fmt.Println("goroutine1", time.Now())
time.Sleep(interval * time.Second)
}
}

func main() {
go task(1)
go task(2)

selec{}
}

  • 阻塞main函数

在等待Goroutine执行完毕,main函数不能退出,可以使用一定的技巧阻塞main函数所在的Goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"time"
)

func main() {
quit := make(chan struct{})

go func() {
time.Sleep(time.Second)
fmt.Println("goroutine quit.")
close(quit)
}()

fmt.Println("main wait...")
<-quit
fmt.Println("main quit...")
}

采用struct{}空结构创建channl是应为其初始化后占用空间为零,指向runtime.ZeroBase。这是一种惯用方法。除了关闭通道外,还有往通道上传入数据也可以解除阻塞。这里传入空结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"time"
)

func main() {
quit := make(chan struct{})

go func() {
time.Sleep(time.Second)
fmt.Println("goroutine quit.")
close(quit)
}()

fmt.Println("main wait...")
<-struct{}{}
fmt.Println("main quit...")
}

关于channel更多技巧在后面会详述。

  • Goroutine和defer

在Goroutine中依然可以使用defer语句,通过defer注册的函数会在Goroutine退出前执行。
Goroutine会像defer一样,因延迟执行,会在创建时立即计算并复制参数。

1
2


  • 退出Goroutine

某些情况下需要Goroutine退出,可以使用runtime.Goexit函数·。该函数立即终止当前的任务,运行时确保所有注册的延时调用都会被执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"runtime"
)

func main() {
exit := make(chan struct{})

go func() {
defer close(exit)
defer fmt.Println("will execute in defer")

runtime.Goexit()
fmt.Println("never execute") // 不会执行
}()

<-exit
}

注意,如果是在main函数中调用Goexit,它会等待其他任务结束,然后进程直接崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"runtime"
"time"
)

func main() {
defer fmt.Println("main defer")

go func() {
fmt.Println("goroutine start...")
time.Sleep(time.Second)
fmt.Println("main func wait...")
}()

runtime.Goexit()
fmt.Println("main exit...")

如果调用标准库函数os.Exit可直接终止进程,不会执行defer注册的函数。

  • 让出执行线程

让出当前线程类似Java中的Thread.yield。Go中执行runtime.Gosched后,当前任务会放回队列中,等待下次调度时恢复执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"runtime"
)

func main() {
exit := make(chan struct{}, 2)

go func() {
defer func() { exit<-struct{}{} }

for i := 0; i < 5; i++ {
fmt.Println("goroutine-1:", i)
runtime.Gosched()
}
}()

go func() {
defer func() { exit<-struct{}{} }

for i := 0; i < 5; i++ {
fmt.Println("goroutine-2", i)
runtime.Gosched()
}
}()

<-exit
<-exit

Goroutine在退出前会执行defer登记的函数。

  • GOMAXPROCS

runtime.GOMAXPROCS可以修改运行时创建的线程数,默认是CPU核心数。通过一个例子体验并行计算。我当前笔记本是双核四线程。

1
2


语法上,Go进行并发编程简介多了。是不是对Go的并发编程有点小兴奋?系好安全带,马上开车!

同步

Go中,Goroutine间同步的操作有大部分都包含在标准包sync中。本部分详述Goroutine间的同步方法,包括Go本身的并发特性上的同步和sync的使用。

sync包中包括类型有:

条件变量:Cond 类似于Python的threading.Condition
互斥锁:Mutex 类似于Python的threading.Lock
读写锁:RWMutex 类似于Python的threading.Lock,但不分读写操作

Python Threading模块中没有的同步操作:Pool, Once, WaitGroup

上述方法接下来一一讲解。Go并没有信号量Semaphore,但不难实现,本节结尾会给出实现方法。

  • Mutex

sync.Mutex是互斥锁,类似threading.Lock。该类型初始化处于unlock状态。下面以实现线程安全的并发字典为例。当然,这种方法并不是最好的实现方式,仅作例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"sync"
"fmt"
)

type ConcurrentMap struct {
data map[string]string
mutex *sync.Mutex // 为了让该例子直观,这里不采用匿名字段。
}

func (m *ConcurrentMap) Get(key string) (string, bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
if value, ok := m.data[key]; ok {
return value, ok
} else {
return "", false
}
}

func (m *ConcurrentMap) Set(key, value string) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.data[key] = value
}

func (m *ConcurrentMap) Delete(key string) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.data, key)
}


func main() {

cmap := &ConcurrentMap{}
wait := make(struct{})

for i := 0; i < 10; i ++ {
go func(_id int) {
key := "goroutine-" + _id
value := "value-" + _id
cmap[key] = value
}(i)
}

for i := 0; i < 10; i ++ {
go func(_id int) {
key := "goroutine-" + _id
value, ok := cmap[key]
if ok {
fmt.Println(key, "get", value)
}
}(i)
}

<-wait
}
  • RWMutex

Mutex锁对于ConcurrentMap的粒度太大。可以细化锁的粒度,把读写分离,分别用不同粒度的锁进行加锁。在数据库并发控制,读写锁也有别名:共享锁、排他锁。

名称 锁的影响范围 别名
共享锁 其他连接可以读取数据但不能修改数据 读锁
排他锁 其他连接无法读取数据,也不能修改数据 写锁

sync.RWMutexLockUnlock分别获取和释放写锁。RLockRUnlock分别获取和释放读锁。

代码参看上面,这里只修改加锁部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

type ConcurrentMap struct {
data map[string]string
mutex *sync.RWMutex // 为了让该例子直观,这里不采用匿名字段。
}


func (m *ConcurrentMap) Get(key string) (string, bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()
if value, ok := m.data[key]; ok {
return value, ok
} else {
return "", false
}
}

func (m *ConcurrentMap) Set(key, value string) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.data[key] = value
}

func (m *ConcurrentMap) Delete(key string) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.data, key)
}

对使用MutexRWMutex做性能测试对比。测试代码如下

1
2


  • WaitGroup

  • Once

  • Pool

  • Cond

  • Semaphore

Go中并没有信号量Semaphore,但通过内置类型可以轻易实现Semaphore。这里以channel为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"time"
)

type Semaphore chan struct{}

func (s Semaphore) acquire() {
s <- struct{}{}
}

func (s Semaphore) release() {
<-s
}

func NewSemaphore(value uint) Semaphore {
// 原则上,value值不能为负数
return make(Semaphore, value)
}

func main() {
s := NewSemaphore(2)
go func() {
s.acquire()
defer s.release()
time.Sleep(time.Second)
fmt.Println("goroutine 1")
}()

go func() {
s.acquire()
defer s.release()
time.Sleep(2 * time.Second)
fmt.Println("goroutine 2")
}()

fmt.Println("wait here")
s.acquire()
fmt.Println("acquire semaphore success")
time.Sleep(time.Second)
}

输出结构如下

1
2
3
4
wait here
goroutine 1
acquire semaphore success
goroutine 2

转载请包括本文地址:https://allenwind.github.io/blog/2001
更多文章请参考:https://allenwind.github.io/blog/archives/