本文总结Go的并发编程。关于Python的并发编程见Python并发编程总结。关于并发编程的讨论见为什么需要并发?。
谈到Go的并发编程,首先要澄清并发(concurrency)和并行(parallesim)的区别。
- 并发: 逻辑上同时处理多个任务的能力
- 并行:物理上同一时刻处理多个并发任务的能力
当我们说一个程序是并发执行时,这个并发是指程序包含多个逻辑上独立的执行块,至于这些执行块如何被操作系统调度执行,程序的管不了的。例如,在单核电脑上,无论操作系统的何种调度算法,任意一时刻也就只有一个执行块爱执行;如果在多核电脑上,同一时刻可以有多个执行块同时执行。也就是说,并发的程序,多个执行块既可以独立并行执行,也可以串行执行。这取决于操作系统和硬件资源。
而并行的程序是多个执行块的同时执行。从调度的角度理解,并行可以看作是并发的特例。要实现程序的并行执行,除了在编程上做处理还需要硬件支持:多主机或者多核CPU。简单说,并行就是并发的最理想的执行模式。这个最理想的状态包括硬件层面和软件层面。硬件层面就如上述的多主机或多核CPU情况。软件层面看实现并发的编程语言的特性。例如Go很容易让多个Goroutine跑在多个CPU上,但Python尽管在多核环境下由于GIL的原因,很难实现并行。关于Python的GIL问题见Python真的慢吗?。
那么,并发(并行)如何实现?这就是操作系统层面的问题。操作系统或程序运行时通过线程(thread)、进程(process)、协程(coroutine)实现并发,或理想情况下的并行。关于进行、线程、协程的讨论这里不展开了。
如果更严格地讨论并行,那么并行也就粒度之分。例如位级并行(就是常说的32位电脑、64位电脑、8位单片机)、指令级并行、数据级并行、任务级并行。对于应用编程来说,通常只需关心数据级并行和任务级别并行。
最后,引用Rob Pike的经典描述总结上述的讨论(注意英文语境下的理解):
并发是同一时间应对(dealing with)多件事情的能力。
并行是同一时间动手(doing)做多件事情的能力。
Go并发特点
Go的并发单元称为Goroutine。Goroutine和Python的Coroutine看似很像,但事实上并不一样的概念。Go运行时会创建多个线程来执行多个Goroutine并发任务,且Goroutine可以被运行时调度到其他线程上执行。这样,Goroutine更像是线程+协程的综合体。或者理解为:Goroutine是能在线程态和协程态间随运行时调度而进行状态切换的综合体。被调度时以协程态存在,执行时以线程态存在。Goroutine在执行时并不是每次都会创建新的线程,运行时会复用已有的线程。
Goroutine自定义栈初始化只有2KB大小,同时还包括函数指针和调用参数。比操作系统线程MB栈级别小得多,因此在简单的机器上可以创建上万个并发单元。Goroutine栈的大小可以在需要时扩容,最大达到GB级别。
Go鼓励采用CSP通道进行消息传递来代替内存共享,以此实现安全的并发。CSP就是通信顺序进程(Communicating Sequential Processe)模型。该模型由独立的、并发执行的实体组成,实体之间的通信通过消息通道来实现。CSP模型并不关注发送消息的实体,而是关注发送消息时使用的channel。这一点很容易体会,对于消息的接收者来说,它只通过channel获取消息但并不知道该消息来自哪个发生消息的实体。类似的通信模型还有Actor模型,以后在讨论Scala的并发编程时再详细展开。
channel是线程安全的队列,在多个实现进行通信时,无需担心并发所带来的数据不一致问题。
channel还有两种类型,一种是有缓存,另外一中是无缓冲。后面会详细讨论。
基础
Go很容易创建Goroutine,然后让运行时调度执行Goroutine。
1 2 3 4 5 6 7 8 9 10 11 12
| package main
func main() { go println("awesome goroutine!")
go func(s string) { println(s) }("awesome goroutine again!") }
select{}
|
调用go
关键字后并不会阻塞程序,而是接着往下执行。注意go
关键字并不是执行并发操作,而是创建一个goroutine并发单元。该并发单元会放到系统队列中,等待调度器指派适合的系统先去执行该单元。
下面通过一个例子对比Go和Python的并发编程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import threading import time
def task(interval): while True: print("Thread<{}".format(threading.current_thread().ident), time.ctime()) time.sleep(interval)
def main(): task1 = threading.Thread(target=task, args=(1,)) task2 = threading.Thread(target=task, args=(2,))
task1.start() task2.start()
task1.join() task2.join() // 主线程等待task1、task2退出
if __name__ == '__main__': main()
|
使用Go改写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import ( "fmt" "time" )
func task(interval int) { for { fmt.Println("goroutine1", time.Now()) time.Sleep(interval * time.Second) } }
func main() { go task(1) go task(2)
selec{} }
|
在等待Goroutine执行完毕,main函数不能退出,可以使用一定的技巧阻塞main函数所在的Goroutine。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package main
import ( "fmt" "time" )
func main() { quit := make(chan struct{})
go func() { time.Sleep(time.Second) fmt.Println("goroutine quit.") close(quit) }()
fmt.Println("main wait...") <-quit fmt.Println("main quit...") }
|
采用struct{}
空结构创建channl是应为其初始化后占用空间为零,指向runtime.ZeroBase
。这是一种惯用方法。除了关闭通道外,还有往通道上传入数据也可以解除阻塞。这里传入空结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package main
import ( "fmt" "time" )
func main() { quit := make(chan struct{})
go func() { time.Sleep(time.Second) fmt.Println("goroutine quit.") close(quit) }()
fmt.Println("main wait...") <-struct{}{} fmt.Println("main quit...") }
|
关于channel更多技巧在后面会详述。
在Goroutine中依然可以使用defer语句,通过defer注册的函数会在Goroutine退出前执行。
Goroutine会像defer一样,因延迟执行,会在创建时立即计算并复制参数。
某些情况下需要Goroutine退出,可以使用runtime.Goexit
函数·。该函数立即终止当前的任务,运行时确保所有注册的延时调用都会被执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package main
import ( "fmt" "runtime" )
func main() { exit := make(chan struct{})
go func() { defer close(exit) defer fmt.Println("will execute in defer")
runtime.Goexit() fmt.Println("never execute") }()
<-exit }
|
注意,如果是在main函数中调用Goexit
,它会等待其他任务结束,然后进程直接崩溃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package main
import ( "fmt" "runtime" "time" )
func main() { defer fmt.Println("main defer")
go func() { fmt.Println("goroutine start...") time.Sleep(time.Second) fmt.Println("main func wait...") }()
runtime.Goexit() fmt.Println("main exit...")
|
如果调用标准库函数os.Exit
可直接终止进程,不会执行defer注册的函数。
让出当前线程类似Java中的Thread.yield
。Go中执行runtime.Gosched
后,当前任务会放回队列中,等待下次调度时恢复执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package main
import ( "fmt" "runtime" )
func main() { exit := make(chan struct{}, 2)
go func() { defer func() { exit<-struct{}{} }
for i := 0; i < 5; i++ { fmt.Println("goroutine-1:", i) runtime.Gosched() } }()
go func() { defer func() { exit<-struct{}{} }
for i := 0; i < 5; i++ { fmt.Println("goroutine-2", i) runtime.Gosched() } }()
<-exit <-exit
|
Goroutine在退出前会执行defer登记的函数。
runtime.GOMAXPROCS
可以修改运行时创建的线程数,默认是CPU核心数。通过一个例子体验并行计算。我当前笔记本是双核四线程。
语法上,Go进行并发编程简介多了。是不是对Go的并发编程有点小兴奋?系好安全带,马上开车!
同步
Go中,Goroutine间同步的操作有大部分都包含在标准包sync
中。本部分详述Goroutine间的同步方法,包括Go本身的并发特性上的同步和sync
的使用。
sync
包中包括类型有:
条件变量:Cond
类似于Python的threading.Condition
互斥锁:Mutex
类似于Python的threading.Lock
读写锁:RWMutex
类似于Python的threading.Lock
,但不分读写操作
Python Threading
模块中没有的同步操作:Pool
, Once
, WaitGroup
上述方法接下来一一讲解。Go并没有信号量Semaphore,但不难实现,本节结尾会给出实现方法。
sync.Mutex
是互斥锁,类似threading.Lock
。该类型初始化处于unlock状态。下面以实现线程安全的并发字典为例。当然,这种方法并不是最好的实现方式,仅作例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package main
import ( "sync" "fmt" )
type ConcurrentMap struct { data map[string]string mutex *sync.Mutex }
func (m *ConcurrentMap) Get(key string) (string, bool) { m.mutex.Lock() defer m.mutex.Unlock() if value, ok := m.data[key]; ok { return value, ok } else { return "", false } }
func (m *ConcurrentMap) Set(key, value string) { m.mutex.Lock() defer m.mutex.Unlock() m.data[key] = value }
func (m *ConcurrentMap) Delete(key string) { m.mutex.Lock() defer m.mutex.Unlock() delete(m.data, key) }
func main() {
cmap := &ConcurrentMap{} wait := make(struct{})
for i := 0; i < 10; i ++ { go func(_id int) { key := "goroutine-" + _id value := "value-" + _id cmap[key] = value }(i) }
for i := 0; i < 10; i ++ { go func(_id int) { key := "goroutine-" + _id value, ok := cmap[key] if ok { fmt.Println(key, "get", value) } }(i) }
<-wait }
|
Mutex
锁对于ConcurrentMap的粒度太大。可以细化锁的粒度,把读写分离,分别用不同粒度的锁进行加锁。在数据库并发控制,读写锁也有别名:共享锁、排他锁。
名称 |
锁的影响范围 |
别名 |
共享锁 |
其他连接可以读取数据但不能修改数据 |
读锁 |
排他锁 |
其他连接无法读取数据,也不能修改数据 |
写锁 |
sync.RWMutex
中Lock
和Unlock
分别获取和释放写锁。RLock
和RUnlock
分别获取和释放读锁。
代码参看上面,这里只修改加锁部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| type ConcurrentMap struct { data map[string]string mutex *sync.RWMutex }
func (m *ConcurrentMap) Get(key string) (string, bool) { m.mutex.RLock() defer m.mutex.RUnlock() if value, ok := m.data[key]; ok { return value, ok } else { return "", false } }
func (m *ConcurrentMap) Set(key, value string) { m.mutex.Lock() defer m.mutex.Unlock() m.data[key] = value }
func (m *ConcurrentMap) Delete(key string) { m.mutex.Lock() defer m.mutex.Unlock() delete(m.data, key) }
|
对使用Mutex
和RWMutex
做性能测试对比。测试代码如下
WaitGroup
Once
Pool
Cond
Semaphore
Go中并没有信号量Semaphore,但通过内置类型可以轻易实现Semaphore。这里以channel为例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package main
import ( "fmt" "time" )
type Semaphore chan struct{}
func (s Semaphore) acquire() { s <- struct{}{} }
func (s Semaphore) release() { <-s }
func NewSemaphore(value uint) Semaphore { return make(Semaphore, value) }
func main() { s := NewSemaphore(2) go func() { s.acquire() defer s.release() time.Sleep(time.Second) fmt.Println("goroutine 1") }()
go func() { s.acquire() defer s.release() time.Sleep(2 * time.Second) fmt.Println("goroutine 2") }()
fmt.Println("wait here") s.acquire() fmt.Println("acquire semaphore success") time.Sleep(time.Second) }
|
输出结构如下
1 2 3 4
| wait here goroutine 1 acquire semaphore success goroutine 2
|
转载请包括本文地址:https://allenwind.github.io/blog/2001
更多文章请参考:https://allenwind.github.io/blog/archives/