1. 并发与并行
并发:同一时间段内执行多个任务(一人用两根鱼竿钓鱼)
并行:同一时刻执行多个任务(和朋友们一起用鱼竿钓鱼)
Go语言中的并发通过goroutine
实现,goroutine
类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine
来进行并发工作。goroutine
是由Go语言的运行时(runtime)调度完成的,而线程是由操作系统调度完成的。
如果想要在多个
goroutine
之间进行通信,Go语言提供了channel
进行简单通信。
2. goroutine
在go语言中,不需要考虑调度和上下文切换的机制。go语言会智能的将goroutine
中的任务合理的分配给每个CPU
,在go语言中不需要自己去写进程、线程、协程,只需要将这个逻辑包装成一个函数,开启一个goroutine
去执行这个函数就可以了。
2.1 使用goroutine
go语言中使用goroutine
非常简单,只需要在函数的前面加上go关键词,就可以为一个函数创建一个goroutine
。
每个goroutine
必定对应一个函数,可以创建多个goroutine
去执行不同的函数。
2.2 启动单个goroutine
package main
import (
"fmt"
)
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
如上所示,由于整个环境是串行的,所以执行结果是先hello()
函数,后main
函数,即:
Hello Goroutine!
main goroutine done!
接下来,我们在调用的hello函数上加上关键词go,即可启动一个goroutine去执行这个函数。
package main
import (
"fmt"
)
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello()
fmt.Println("main goroutine done!")
}
//执行结果
main goroutine done!
可见,返回结果中只有main goroutine done!
,在main函数运行结束后,会调用 exit(0)
故goroutine还未运行就结束了,
此时,如果我们想要将hello()
函数中的内容打印出来,最简单的方式是使用time.Sleep
。
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello()
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
//执行结果
main goroutine done!
Hello Goroutine!
首先,会打印main goroutine done!
后打印Hello Goroutine!
,因为我们在创建新的goroutine
时会花费一些时间,而此时main函数所在的goroutine
是继续执行的。
2.3 sync.WaitGroup的用法
package main
import (
"fmt"
"time"
)
func main() {
for i := 0; i < 5; i++ {
go fmt.Println(i)
}
time.Sleep(time.Second)
}
//运行结果
1
4
0
2
3
主线程为了等待goroutine
都运行完毕,需要等待一段时间。对于较为简单的任务来说,之前提及的time.Sleep()
就是最简单的一种方法,但是对于略复杂的场景,我们并不能知道具体的等待时间,此时我们就要考虑使用其他办法。
如果使用管道,则可以轻易的、非常轻松的、完美的完成我们的目的,但是使用管道来处理这样的任务显得非常的小题大做,因为它被设计出来并不是作为简单的同步处理的,而且假设我们有成千上万的for循环,需要申请同样数量大小的管道,对内存也是不小的开销。
package main
import (
"fmt"
)
func main() {
c := make(chan bool, 10)
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println(i)
c <- true
}(i)
}
for i := 0; i < 10; i++ {
<- c
}
}
//运行结果
0
5
8
4
1
9
6
3
7
2
对于这种情况,go语言中的
sync.WaitGroup
可以更加简单的实现这个功能。
WaitGroup对象的内部有一个计数器,最初从0开始,它有3个方法:Add()/Done()/Wait()
用来控制计数器的数量:
Add(n)
把计数器的数量设置为nDone()
每次把计数器的数量-1Wait()
会阻塞代码的运行,直到计数器的值为0
我们使用WaitGroup
来执行上述代码:
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(10) //定时器数量为10
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println("Now output ", i)
wg.Done() //定时器-1
}(i)
}
wg.Wait() //阻塞,直到定时器为0
}
//运行结果
Now output 0
Now output 9
Now output 6
Now output 7
Now output 8
Now output 2
Now output 1
Now output 3
Now output 5
Now output 4
相对于管道来说,WaitGroup
轻巧许多,说完这个知识点,我们来看如何启动多个goroutine
。
2.4 启动多个goroutine
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() //goroutine结束计数器-1
fmt.Println("goroutine number:", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) //启动一个goroutine计数器+1
go hello(i)
}
wg.Wait() //阻塞,等待goroutine为0
}
//运行结果
goroutine number: 0
goroutine number: 6
goroutine number: 9
goroutine number: 7
goroutine number: 4
goroutine number: 2
goroutine number: 1
goroutine number: 3
goroutine number: 5
goroutine number: 8
很显然,打印的顺序并不一致,因为这个10个goroutine
是并发执行的,而goroutine
的调度是随机的。
2.5 goroutine与线程
操作系统线程一般都有固定的栈内存(通常为2MB),一个goroutine
的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine
的栈不是固定的,他可以按需增大和缩小,goroutine
的栈大小限制可以达到1GB,虽然极少会用到这么大。所以在Go语言中一次创建十万左右的goroutine
也是可以的。
2.6 goroutine的调度策略
GPM
是Go语言运行时(runtime)层面的实现,是go语言自己实现的一套调度系统。区别于操作系统调度OS线程。
G
很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。P
管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。M(machine)
是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;
P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。
P的个数是通过runtime.GOMAXPROCS
设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。
单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine
则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。
Go语言中的操作系统线程和goroutine的关系:
- 一个操作系统线程对应用户态多个goroutine。
- go程序可以同时使用多个操作系统线程。
- goroutine和OS线程是多对多的关系,即m:n。
3. Channel
单纯的执行并发函数是没有意义的,函数间进行数据交换才能体现并发执行函数的意义。
虽然可以使用共享内存进行数据交换,但是共享内存存在不同的goroutine容易发生竞态问题,为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这样势必会造成性能问题。
Go语言的并发模型是CSP(Communicating Sequential Processes)
,提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说goroutine是go程序并发的执行体,channel就是它们之间的连接。channel是用来让一个goroutine发送特定值到另一个goroutine的通信机制。
3.1 channel类型
channel是一种引用类型,可以通过如下方法声明:
var 变量 chan 元素类型
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个出传递布尔类型的通道
var ch3 chan []int // 声明一个传递int切片的通道
3.2 创建channel
首先,需要声明通道,此时通道的值为
,声明通道之后需要使用make函数初始化后才能使用。
//声明通道
var ch chan int
fmt.println(ch) // <nil>
//初始化通道
make(chan 元素类型,缓冲区大小)
ch4 :=make(chan int,100)
ch5 :=make(chan bool,1000)
ch6 :=make(chan []int,1000)
3.3 channel操作
通道有发送、接收和关闭三种操作。首先初始化一个通道:ch :=make(chan int)
-
发送:
ch <- 10
(将值发送到通道中) -
接收:
x := <- ch //从ch中接收值并赋值给变量x <- ch //从ch中接收值,忽略结果
-
关闭:通过调用内置的close方法来关闭通道
close(ch)
只有在通知接收方goroutine所有的数据都发送完毕后,才需要关闭通道。
通道是可以被GC(垃圾回收机制)回收的,它和关闭文件是不一样的,关闭文件是必须操作,但关闭通道不是必须的。
关闭后的通道有以下特点:
- 对一个关闭的通道再发送值会导致pannic
- 对一个关闭的通道进行接收会一直获取值知道通道为空
- 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
- 关闭一个已经关闭的通道会导致panic
3.4 无缓冲的通道
无缓冲的通道又称为阻塞的通道,只能存一个数据,并且只有当改数据被取出的时候才能存下一个数据。代码如下:
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
//运行结果
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
出现错误的原因是:使用ch := make(chan int)
创建的是无缓冲的通道,需要有接收值的情况下才能发送值。上述代码会在ch<-10处产生死锁。
我们需要在ch <- 10
接收值之前,启用一个goroutine去接收值,代码如下:
package main
import "fmt"
func received(c chan int) {
result := <-c
fmt.Println("发送成功:", result)
}
func main() {
ch := make(chan int)
go received(ch)
ch <- 10
fmt.Println("发送成功")
}
//运行结果
发送成功: 10
发送成功
使用无缓冲通道进行通信将导致发送和接收的
goroutine
同步化,因此,无缓冲通道也被称为同步通道
。
3.5 有缓冲的通道
为了解决上面的问题,可以使用带有缓冲区的通道,即在初始化通道时指定通道的容量。
package main
import "fmt"
func main() {
ch := make(chan int, 4)
ch <- 10
ch <- 11
ch <- 12
ch <- 13
for i := 0; i < 4; i++ {
result := <-ch
fmt.Println("发送成功", result)
}
}
//运行结果
发送成功 10
发送成功 11
发送成功 12
发送成功 13
从通道中循环取值
当向通道中发送完整数据时,当发送结束,我们可以通过close
来关闭通道。当通道关闭后,如果再往通道发送值会引起panic
,从通道中取值的操作会把通道中的所有值都取完,后续取到的值都是对应类型的零值。
package main
import "fmt"
func main() {
ch1 := make(chan int) //初始化channel
ch2 := make(chan int)
go func() { // 开启goroutine将0~100的数发送到ch1中
for i := 0; i < 10; i++ {
ch1 <- i
}
close(ch1)
}()
go func() { // 开启goroutine从ch1中接收值,并将该值发送到ch2中
for {
i, ok := <-ch1 // 通道关闭后再取值ok=false
if !ok {
break
}
ch2 <- i * 2
}
close(ch2)
}()
// 在主goroutine中从ch2中接收值打印
for i := range ch2 { // 通道关闭后会退出for range循环
fmt.Println(i)
}
}
//输出结果
0
2
4
6
8
10
12
14
16
18
如上例所示,可以通过close
方法关闭channel
,或者使用for range的方式,使用for range
的方式遍历通道,当通道关闭的时候就会退出for range
。
3.6 单向通道
有的时候我们会将通道作为参数在多个任务之间传递,在不同的任务中使用通道都会对其进行限制,比如限制通道在函数中仅用作发送或接收。
假如一个 channel 真的只能读取数据,那么它肯定只会是空的,因为你没机会往里面写数据。同理,如果一个 channel 只允许写入数据,即使写进去了,也没有丝毫意义,因为没有办法读取到里面的数据。所谓的单向 channel 概念,其实只是对 channel 的一种使用限制。
Go中提供了单向通道来处理这种情况,单向通道首先需要声明:
- 只能写入数据的通道类型为:
chan <-
定义方式为:var 通道实例 chan <- 元素类型
- 只能读取数据的通道类型为:
<- chan
定义方式为:var 通道实例 <- chan 元素类型
单向通道的使用案例
func main() {
//方法一:
ch := make(chan int)
var chSendOnly chan<- int = ch //声明一个只能写入数据的通道类型,赋值为ch
var chReadOnly <-chan int = ch //声明一个只能读取数据的通道类型,赋值为ch
//方法二:
ch := make(<-chan int) //声明一个只能写入数据的通道类型,赋值为ch
ch2 := make(chan<- int) //声明一个只能读取数据的通道类型,赋值为ch
}
3.7 worker pool (goroutine池)
在工作中我们通常会使用可以指定启动的goroutine数量–worker pool
模式,控制goroutine
的数量,防止goroutine
泄漏和暴涨。
一个简易的work pool
示例代码如下:
package main
import (
"fmt"
"time"
)
//jobs为只读单向通道,(只能对其写入int类型值),可以对其执行发送操作但是不能执行接收操作;
//results为只写单向通道,(只能从其读取int类型值),可以对其执行接收操作但是不能执行发送操作。
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
//开启3个goroutine
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
//5个任务
for j := 1; j <= 3; j++ {
jobs <- j
}
close(jobs)
for t := 1; t <= 3; t++ {
<-results
}
}
//运行结果
worker:1 start job:2
worker:3 start job:1
worker:2 start job:3
worker:1 end job:2
worker:2 end job:3
worker:3 end job:1
3.8 select 多路复用
在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。
Go内置了select
关键字,可以同时响应多个通道的操作,select
的使用类似于switch
语句,它是通过case
分支和默认分支来确定不同通道的通信(接收或发送)过程,select
会一直等待,直到某个case
的通信操作完成时才会执行case
分支对应的语句,具体如下:
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3 <- data:
...
default:
默认操作
}
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
//运行结果
0
2
4
6
8
使用select
语句能提高代码的可读性。
- 可处理一个或多个channel的发送/接收操作。
- 如果多个
case
同时满足,select
会随机选择一个。 - 对于没有
case
的select{}
会一直等待,可用于阻塞main函数。
Q.E.D.