Go语言基础: goroutine和通道

2023-06-26,,

并发编程表现为程序由若干个自主的活动单元组成。

goroutine

在Go语言里,每一个并发执行的活动称为goroutine。当一个程序启动时,只有一个goroutine来调用main函数,称之为主goroutine。新的goroutine通过go语句来创建。goroutine 的执行函数的返回,就意味着 goroutine 退出。goroutine 执行的函数或方法即便有返回值,Go 也会忽略这些返回值。

示例:时钟服务器

首先是用顺序时钟服务器,没有使用并发

package main

import (
"io"
"log"
"net"
"time"
) func main() {
// 监听TCP 8000端口
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
// 接收连接
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
// 一次处理一个连接
handleConn(conn)
}
} func handleConn(c net.Conn) {
defer c.Close()
for {
// 写入时间
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil {
return
}
time.Sleep(1 * time.Second)
}
}

上述代码中Listen函数创建一个net.Listener对象,它在一个网络端口上监听进来的连接。监听器的Accept方法被阻塞,直到有连接请求进来,然后返回net.Conn对象来代表一个连接

handleConn函数处理一个完整的客户连接。在循环里,它将time.Now()获取到的当前时间发送给客户端。net.Conn满足io.Writer接口,所以可以直接向它进行写入,当写入失败时循环结束。这时候handleConn使用延迟的Close关闭这边的连接,继续等待下一个请求

$ nc localhost 8000
10:56:12
10:56:13
10:56:14
10:56:15
10:56:16
10:56:17
10:56:18
10:56:19
10:56:20
10:56:21
10:56:22
10:56:23
10:56:24
10:56:25
10:56:26
10:56:27
10:56:28
10:56:29
10:56:30
10:56:31
10:56:32
10:56:33
10:56:34
^C

未终止时,打开另一个终端进行连接,会被阻塞。终止后,另一个终端的连接请求会被响应。

实现Go版netcat

package main

import (
"io"
"log"
"net"
"os"
) func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 输出数据
mustCopy(os.Stdout, conn)
} func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}

运行结果

$ go run netcat1.go
11:15:36
11:15:37
11:15:38
11:15:39
11:15:40
11:15:41

修改服务端程序

    for {
conn, err := Listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}

在for循环中,handleConn函数前加上go,此时将并发处理连接

打开两个终端,执行请求

运行结果

$ ./netcat1
11:40:09
11:40:10
11:40:11
11:40:12
11:40:13
11:40:14
11:40:15
11:40:16
11:40:17
11:40:18
11:40:19
11:40:20
$ ./netcat1
11:40:14
11:40:15
11:40:16
11:40:17
11:40:18
11:40:19
11:40:20
11:40:21
goroutine调度器

一个 Go 程序中可以创建成千上万个并发的 Goroutine。而将这些 Goroutine 按照一定算法放到“CPU”上执行的程序,就被称为 Goroutine 调度器(Goroutine Scheduler)。一个 Go 程序对于操作系统来说只是一个用户层程序,操作系统眼中只有线程,它并不知道有一种叫 Goroutine 的事物存在。所以,Goroutine 的调度全要靠 Go 自己完成。那么,实现 Go 程序内 Goroutine 之间“公平”竞争“CPU”资源的任务,就落到了 Go 运行时(runtime)头上了。在一个 Go 程序中,除了用户层代码,剩下的就是 Go 运行时。

它的调度模型与算法几经演化,从最初的 G-M 模型、到 G-P-M 模型,从不支持抢占,到支持协作式抢占,再到支持基于信号的异步抢占,Goroutine 调度器经历了不断地优化与打磨。

每个 Goroutine 对应于运行时中的一个抽象结构:G(Goroutine) ,而被视作“物理 CPU”的操作系统线程,则被抽象为另外一个结构:M(machine)。

调度器的工作就是将 G 调度到 M 上去运行。为了更好地控制程序中活跃的 M 的数量,调度器引入了 GOMAXPROCS 变量来表示 Go 调度器可见的“处理器”的最大数量。

G-M 模型的一个重要不足:限制了 Go 并发程序的伸缩性,尤其是对那些有高吞吐或并行计算需求的服务程序。

这个问题主要体现在这几个方面:

    单一全局互斥锁(Sched.Lock) 和集中状态存储的存在,导致所有 Goroutine 相关操作,比如创建、重新调度等,都要上锁;

    Goroutine 传递问题:M 经常在 M 之间传递“可运行”的 Goroutine,这导致调度延迟增大,也增加了额外的性能损耗;每个 M 都做内存缓存,导致内存占用过高,数据局部性较差;

    由于系统调用(syscall)而形成的频繁的工作线程阻塞和解除阻塞,导致额外的性能损耗。

G-M 模型中增加了一个 P,让 Go 调度器具有很好的伸缩性。P 是一个“逻辑 Proccessor”,每个 G(Goroutine)要想真正运行起来,首先需要被分配一个 P,也就是进入到 P 的本地运行队列(local runq)中。对于 G 来说,P 就是运行它的“CPU”,可以说:在 G 的眼里只有 P。但从 Go 调度器的视角来看,真正的“CPU”是 M,只有将 P 和 M 绑定,才能让 P 的 runq 中的 G 真正运行起来。

G-P-M 模型的实现算是Go调度器的一大进步,但调度器仍然有一个令人头疼的问题,那就是不支持抢占式调度,这导致一旦某个 G 中出现死循环的代码逻辑,那么 G 将永久占用分配给它的 P 和 M,而位于同一个 P 中的其他 G 将得不到调度,出现“饿死”的情况。

在 Go 1.2 中实现了基于协作的“抢占式”调度。这个抢占式调度的原理就是,Go 编译器在每个函数或方法的入口处加上了一段额外的代码 (runtime.morestack_noctxt),让运行时有机会在这段代码中检查是否需要执行抢占调度。这种解决方案只能说局部解决了“饿死”问题,只在有函数调用的地方才能插入“抢占”代码(埋点),对于没有函数调用而是纯算法循环计算的 G,Go 调度器依然无法抢占。比如,死循环等并没有给编译器插入抢占代码的机会,这就会导致 GC 在等待所有 Goroutine 停止时的等待时间过长,从而导致 GC 延迟,内存占用瞬间冲高;甚至在一些特殊情况下,导致在 STW(stop the world)时死锁。

Go 在 1.14 版本中增加了对非协作的抢占式调度的支持,这种抢占式调度是基于系统信号的,也就是通过向线程发送信号的方式来抢占正在运行的 Goroutine。

Go 运行时已经实现了 netpoller,这使得即便 G 发起网络 I/O 操作,也不会导致 M 被阻塞(仅阻塞 G),也就不会导致大量线程(M)被创建出来。

CSP理论(通信顺序模型)

一个并发系统由若干并行运行的顺序进程组成,每个进程不能对其他进程的变量赋值。进程之间只能通过 一对通信原语实现协作:Q->x表示从进程Q输入一个值到变量x中;P<-e表示把表达式e的值发送给进程P。当P进程执行Q->x, 同时Q进程执行P<-e时,发生通信,e的值从Q进程传送给P进程的变量x。

一个符合 CSP 模型的并发程序应该是一组通过输入输出原语连接起来的 P 的集合。

通道

CSP 模型的实现包含两个主要组成部分:一个是 Goroutine,它是 Go 应用并发设计的基本构建与执行单元;另一个就是 channel,它在并发模型中扮演着重要的角色。channel 既可以用来实现 Goroutine 间的通信,还可以实现 Goroutine 间的同步。

goroutine是Go程序并发的执行体,通道就是它们之间的连接。通道是可以让一个goroutine发送特定值到另一个goroutine的通信机制。每一个通道都是一个具体类型的导管,叫作通道的元素类型。

使用内置的make函数来创建一个通道

ch=make(chan int)

像map一样,通道是一个使用make创建的数据结构的引用。当复制或者作为参数传递到一个函数时,复制的是引用,这样调用者和被调用者都引用同一份数据结构。和其他引用类型一样,通道的零值是nil。

同种类型的通道可以用==进行比较。当二者都是同一通道的数据的引用时,比较值为true。通道也可以和nil进行比较。

通道有两个主要操作:发送(send)和接收(receive),两者统称为通信。send语句从一个goroutine传输一个值到另一个在执行接收表达式的goroutine。两个操作都使用<-操作符书写。发送语句中,通道和值分别在<-左右两边。

ch <- x   // 发送语句
x = <-ch // 赋值语句中的接收表达式
<-ch // 接收语句,丢弃结果

通道也支持关闭,它设置一个标志位来指示值当前已经发送完毕,这个通道后面不会有值了。

close(ch)

使用简单的make调用创建的通道叫作无缓冲通道,但make还可以接收第二个可选参数,一个表示通道容量的整数。

ch = make(chan int)
ch = make(chan int,0)
ch = make(chan int,3)

写入通道

package main

import (
"fmt"
"time"
) func writeToChannel(c chan int, x int) {
fmt.Println(x)
c <- x
close(c)
fmt.Println(x)
} func main() {
c := make(chan int)
go writeToChannel(c, 10)
time.Sleep(1 * time.Second)
}

运行结果表明,程序只打印出来一个10

因为c<-x阻塞了writeToChannel函数其余代码的执行,因为没有读取写入c通道,因此程序不会等待writeToChannel且即刻终止。

这就是无缓冲通道,无缓冲通道上的发送操作将会阻塞,直到另一个goroutine在对应的通道上执行接收操作,这时值传送完成,两个goroutine都可以继续执行。相反,如果接收操作先执行,接收方将goroutine将堵塞,直到另一个goroutine在同一个通道上发送一个值。

使用无缓冲通道进行的通信导致发送和接收goroutine同步化。因此,无缓冲通道也称之为同步通道。当一个值在无缓冲通道上传递时,接收值后发送goroutine才被再次唤醒。

对无缓冲 channel 类型的发送与接收操作,一定要放在两个不同的 Goroutine 中进行,否则会导致 deadlock。

从通道中读取数据

package main

import (
"fmt"
"time"
) func writeToChannel(c chan int, x int) {
fmt.Println("1", x)
c <- x
close(c)
fmt.Println("2", x)
} func main() {
c := make(chan int)
go writeToChannel(c, 10)
time.Sleep(1 * time.Second)
fmt.Println("Read:", <-c)
time.Sleep(1 * time.Second)
// 判断给定通道是否是开启状态
_, ok := <-c
if ok {
fmt.Println("Channel is open")
} else {
fmt.Println("Channel is closed")
}
}

通过执行<-c,可以从名为c的通道中读取一个值。

从关闭的通道中读取的数据是对应类型的零值。

管道

是连接协程和通道的虚拟方法,以便通过通道使得一个协程的输出变为另一个协程的输入,进而传输数据。

管道的优点之一是,因为不存在协程和通道且必须等待一切都完成后才可以开始其执行,所以程序中存在一个恒定的数据流。此外,无须将所有事务保存为变量,较少的变量将节省更多的内存空间。最后简化了程序设计并改善了程序的可维护性。

package main

import "fmt"

func main() {
naturals := make(chan int)
squares := make(chan int)
// counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// printer 在主goroutine
for x := range squares {
fmt.Println(x)
}
}

在上面的管道中,当counter goroutine在100个元素后结束循环时,它关闭naturals通道,导致squarer结束循环并关闭squares通道。最终,主goroutine结束,然后程序退出。

单向通道类型

单向通道类型,仅仅导出发送或接收操作。类型chan<- int是一个只能发送的通道,允许发送但不允许接收。反之,类型<-chan int是一个只能接收的通道,允许接收但不允许发送。

package main

import "fmt"

func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
} func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
} func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
} func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}

在counter中,只能向out写入数据,在squarer中,只能从in读出数据

缓冲通道

有一个元素队列,队列的最大长度在创建的时候通过make的容量参数来设置。

ch = make(chan string,3)

这条语句创建了一个可以容纳3个字符串的缓冲通道

缓冲通道上的发送操作在队列的尾部插入一个元素,接收操作从队列的头部移除一个元素。如果通道满了,发送操作会阻塞所在的groutine直到另一个goroutine对它进行接收操作来留出可用的空间。

cap函数可以得知通道的长度cap(ch)

len函数可以获取当前通道缓冲区的元素个数len(ch)

和无缓冲 channel 相反,带缓冲 channel 的运行时层实现带有缓冲区,因此,对带缓冲 channel 的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收不需要阻塞等待)。

select

当涉及同时对多个 channel 进行操作时,会结合 Go 为 CSP 并发模型提供的另外一个原语 select,一起使用。

当 select 语句中没有 default 分支,而且所有 case 中的 channel 操作都阻塞了的时候,整个 select 语句都将被阻塞,直到某一个 case 上的 channel 变成可发送,或者某个 case 上的 channel 变成可接收,select 语句才可以继续进行下去。

select {
case x := <-ch1: // 从channel ch1接收数据
... ... case y, ok := <-ch2: // 从channel ch2接收数据,并根据ok值判断ch2是否已经关闭
... ... case ch3 <- z: // 将z值发送到channel ch3中:
... ... default: // 当上面case中的channel通信均无法实施时,执行该默认分支
}
无缓冲 channel 的惯用法

无缓冲 channel 兼具通信和同步特性,在并发程序中应用颇为广泛。

第一种用法:用作信号传递

无缓冲 channel 用作信号传递的时候,有两种情况,分别是 1 对 1 通知信号和 1 对 n 通知信号。

1对1通知信号

type signal struct{} // 空结构体被当作信号

func worker() {
println("worker is working...")
time.Sleep(1 * time.Second)
} func spawn(f func()) <-chan signal {
c := make(chan signal)
go func() {
println("worker start to work...")
f()
c <- signal{} // 传递信号
}()
return c
} func main() {
println("start a worker...")
c := spawn(worker)
<-c // 当通道未被阻塞表明f()已经执行完毕
fmt.Println("worker work done!")
}

spawn 函数返回的 channel,被用于承载新 Goroutine 退出的“通知信号”,这个信号专门用作通知 main goroutine。main goroutine 在调用 spawn 函数后一直阻塞在对这个“通知信号”的接收动作上。

有些时候,无缓冲 channel 还被用来实现 1 对 n 的信号通知机制。这样的信号通知机制,常被用于协调多个 Goroutine 一起工作,比如下面的例子:

func worker(i int) {
fmt.Printf("worker %d: is working...\n", i)
time.Sleep(1 * time.Second)
fmt.Printf("worker %d: works done\n", i)
} func spawnGroup(f func(i int), num int, groupSignal <-chan signal) <-chan signal {
c := make(chan signal)
var wg sync.WaitGroup for i := 0; i < num; i++ {
wg.Add(1)
go func(i int) {
<-groupSignal
fmt.Printf("worker %d: start to work...\n", i)
f(i)
wg.Done()
}(i + 1)
} go func() {
wg.Wait()
c <- signal(struct{}{})
}()
return c
} func main() {
fmt.Println("start a group of workers...")
groupSignal := make(chan signal)
c := spawnGroup(worker, 5, groupSignal)
time.Sleep(5 * time.Second)
fmt.Println("the group of workers start to work...")
close(groupSignal)
<-c
fmt.Println("the group of workers work done!")
}

main goroutine 创建了一组 5 个 worker goroutine,这些 Goroutine 启动后会阻塞在名为 groupSignal 的无缓冲 channel 上。main goroutine 通过close(groupSignal)向所有 worker goroutine 广播“开始工作”的信号,收到 groupSignal 后,所有 worker goroutine 会“同时”开始工作。

第二种用法:用于替代锁机制

一个传统的、基于“共享内存”+“互斥锁”的 Goroutine 安全的计数器的实现:

type counter struct {
sync.Mutex
i int
} var cter counter func Increase() int {
cter.Lock() // 互斥锁
defer cter.Unlock()
cter.i++
return cter.i
} func main() {
var wg sync.WaitGroup for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
v := Increase()
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
wg.Done()
}(i)
} wg.Wait()
}

使用了一个带有互斥锁保护的全局变量作为计数器,所有要操作计数器的 Goroutine 共享这个全局变量,并在互斥锁的同步下对计数器进行自增操作。

再看更符合 Go 设计惯例的实现,也就是使用无缓冲 channel 替代锁后的实现:

type counter struct {
c chan int
i int
} func NewCounter() *counter {
cter := &counter{
c: make(chan int),
}
go func() {
for {
cter.i++ // 增加值
cter.c <- cter.i // 向通道发送
}
}()
return cter
} func (cter *counter) Increase() int {
return <-cter.c // 可能会被阻塞,通过阻塞来实现同步
} func main() {
cter := NewCounter()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
v := cter.Increase() // 增加计数器的值变成了接收通道的值
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
wg.Done()
}(i)
}
wg.Wait()
}

将计数器操作全部交给一个独立的 Goroutine 去处理,并通过无缓冲 channel 的同步阻塞特性,实现了计数器的控制。这样其他 Goroutine 通过 Increase 函数试图增加计数器值的动作,实质上就转化为了一次无缓冲 channel 的接收动作。

这种并发设计逻辑更符合 Go 语言所倡导的“不要通过共享内存来通信,而是通过通信来共享内存”的原则。

带缓冲 channel 的惯用法

带缓冲的 channel 与无缓冲的 channel 的最大不同之处,就在于它的异步性。也就是说,对一个带缓冲 channel,在缓冲区未满的情况下,对它进行发送操作的 Goroutine 不会阻塞挂起;在缓冲区有数据的情况下,对它进行接收操作的 Goroutine 也不会阻塞挂起。

第一种用法:用作消息队列

channel 的原生特性与消息队列十分相似,包括 Goroutine 安全、有 FIFO(first-in, first out)保证等。

和无缓冲 channel 更多用于信号 / 事件管道相比,可自行设置容量、异步收发的带缓冲 channel 更适合被用作为消息队列,并且,带缓冲 channel 在数据收发的性能上要明显好于无缓冲 channel。

第二种用法:用作信号计数

Go 并发设计的一个惯用法,就是将带缓冲 channel 用作计数信号量(counting semaphore)。带缓冲 channel 中的当前数据个数代表的是,当前同时处于活动状态(处理业务)的 Goroutine 的数量,而带缓冲 channel 的容量(capacity),就代表了允许同时处于活动状态的 Goroutine 的最大数量。向带缓冲 channel 的一个发送操作表示获取一个信号量,而从 channel 的一个接收操作则表示释放一个信号量。

示例:

var active = make(chan struct{}, 3)
var jobs = make(chan int, 10) func main() {
go func() {
for i := 0; i < 8; i++ {
jobs <- (i + 1)
}
close(jobs)
}() var wg sync.WaitGroup for j := range jobs {
wg.Add(1)
go func(j int) {
// 向通道传入一个空结构,一个gourtine开启
active <- struct{}{}
log.Printf("handle job: %d\n", j)
time.Sleep(2 * time.Second)
<-active
wg.Done()
}(j)
}
wg.Wait()
}

这个示例创建了一组 Goroutine 来处理 job,同一时间允许最多 3 个 Goroutine 处于活动状态。为了达成这一目标,这个示例使用了一个容量(capacity)为 3 的带缓冲 channel: active 作为计数信号量,这意味着允许同时处于活动状态的最大 Goroutine 数量为 3。

G-P-M 模型

G: 代表 Goroutine,存储了 Goroutine 的执行栈信息、Goroutine 状态以及 Goroutine 的任务函数等,而且 G 对象是可以重用的;

P: 代表逻辑 processor,P 的数量决定了系统内最大可并行的 G 的数量,P 的最大作用还是其拥有的各种 G 对象队列、链表、一些缓存和状态;

M: M 代表着真正的执行计算资源。在绑定有效的 P 后,进入一个调度循环,而调度循环的机制大致是从 P 的本地运行队列以及全局队列中获取 G,切换到 G 的执行栈上并执行 G 的函数,调用 goexit 做清理工作并回到 M,如此反复。M 并不保留 G 状态,这是 G 可以跨 M 调度的基础

聊天服务器

让几个用户之间互相广播文本消息。这个程序里有4个groutine。主goroutine和广播(broadcaster)groutine。每一个连接里面有一个连接处理goroutine和一个客户写入(clientWriter)goroutine。广播器(broadcaster)对三种不同消息进行响应。

主goroutine的工作是监听端口,接收连接客户端的网络连接。对每一个连接,它创建一个新的handleConn goroutine

func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
go boardcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
// 处理客户端连接
go handleConn(conn)
}
}

广播器,使用局部变量clients来记录当前连接的客户集合。每个客户唯一被记录的信息是其对外发送消息通道的ID

type client chan<- string

var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string)
) func boardcaster() {
clients := make(map[client]bool)
for {
select {
case msg := <-messages:
for cli := range clients {
cli <- msg
}
// 有客户端连接 标记为TRUE
case cli := <-entering:
clients[cli] = true
// 客户端断开连接 移出集合
case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}

广播者监听两个全局的通道entering和leaving,通过它们通知客户的到来和离开,如果它从其他一个接收到事件,它将更新clients集合。如果客户离开,那么它关闭对应客户对外发送消息的通道。广播者也监听来自messages通道的事件,所有的客户都将消息发送到这个通道。当广播者接收到其中一个事件时,把消息广播给所有客户。

每个客户有自己的goroutine。handleConn函数创建一个对外发送消息的新通道,然后通过entering通道通知广播者新客户到来。接着,它读取客户发来的每一行文本,通过全局接收消息通道将每一行发送给广播者,发送时在每条消息前面加上发送者ID作为前缀。一旦从客户端读取完毕消息,handleConn通过leaving通道通知客户离开,然后关闭连接。

func handleConn(conn net.Conn) {
ch := make(chan string) // 创建通道
go clientWriter(conn, ch)
who := conn.RemoteAddr().String() // 获取客户端IP地址
ch <- "You are " + who
messages <- who + " has arrived"
entering <- ch
// 读取客户端发送信息
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
leaving <- ch
messages <- who + " has left"
conn.Close()

handleConn函数为每一个客户创建了写入(clientWriter)goroutine,它接收消息(ch通道),广播到客户发送消息通道中,然后将它们写到客户的网络连接中。客户写入者的循环在广播者收到leaving通知并且关闭客户的发送消息通道后中止。

func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg)
}
}

Go语言基础: goroutine和通道的相关教程结束。

《Go语言基础: goroutine和通道.doc》

下载本文的Word格式文档,以方便收藏与打印。