Go协程、通道、同步
并发和并行
在讲解并发的时候我们总会提及到另外一个概念叫并行。现在我们来理解并发和并行之间的区别
- 并发:把任务在不同的时间点交给处理器进行处理,在统一时间点任务并不会同时运行
- 并行:把每一个任务分配给每一个处理器独立完成。在统一时间点任务一定是同时进行
两个概念的区别在于:任务是否同时执行
$GO$在$GOMAXPROCS$数量和任务相同时,可以左到并行执行,但是一般情况下都是并发执行
- 我们可以用
runtime.GOMAXPROCS(runtime.NumCPU())
来查看$CPU$的数量
协程
协程是轻量级的执行线程。
假设我们有一个函数$f(s)$,我们一般会 同步的调用它
使用$go \ f(s)$在一个协程中调用这个函数的话,这个新的$Go$协程将会并发的执行这个函数
我们需要注意的是,在使用 $go$ 关键字创建 $go \ routing$的时候,被调用函数的返回值会被忽略
我们也可以为匿名函数启动一个协程
现在两个协程在独立的协程中异步的执行,然后等待两个协程完成,我们后面会使用$WaitGroup$方法
package main
import (
"fmt"
"time"
)
func f(s string) {
for i := 0; i < 3; i++ {
fmt.Println(s, ":", i)
}
}
func main() {
f("direct")
go f("go routing")
go func(msg string) {
fmt.Println(msg)
}("going")
time.Sleep(time.Second)
fmt.Println("done")
}
direct : 1
direct : 2
go routing : 0
go routing : 1
go routing : 2
going
done
//
direct : 1
direct : 2
going
go routing : 0
go routing : 1
go routing : 2
done
观察上述结果我们会发现$direct : 0$丢失了,初步推测是与协程有关,于是我又在第一个协程前面加了一个$Sleep$,那么现在就可以正常输出了
$main$函数也是一个$goroutine$,那么我们可以用如下的代码来说明
package main
import (
"fmt"
"time"
)
func running() {
now := 0
for {
now++
fmt.Println("tick:", now)
time.Sleep(time.Second)
}
}
func main() {
go running()
var ipt string
fmt.Scanln(&ipt)
fmt.Println(ipt)
}
在上面的代码中,$running$是一个无限循环,但是我们在使用协程启动它的时候,我们发现对于下面的输入字符串也可以输入
通道
通道是连接多个协程的管道。我们可以从一个协程将值发到通道,然后再另外一个协程中接收
通道发送和接收信息
我们可以使用var 通道变量 chan 通道类型
来声明一个通道的元素类型。
- 通道类型:通道内的数据类型
- 通道变量:保存通道的变量
$chan$类型的空值是$nil$,声明后需要配合$make$才能使用
这里我们看一下通道的语法,使用 通道变量 <- 值
语法发送一个新的值到通道中。我们再一个新的协程中发送$ping$到上面创建的$message$管道中
然后再接收的时候我们可以使用值<-通道变量
语法从通道中接收一个值。这里我们会收到在上面发送的$ping$消息并且将其打印出来
我们运行程序时,通过通道,成功的将消息$ping$从一个协程传送到了另外一个协程中
package main
import "fmt"
func main() {
messages := make(chan string)
go func() { messages <- "ping" }()
msg := <-messages
fmt.Printf("%T\n", msg)
fmt.Println(msg)
}
string
ping
示例:并发打印
使用无缓冲通道往里面塞入数据的时候,装入方将被阻塞,直到另外通道在另外一个goroutine中被取出。同样,如果通道中没有放入任何数据,接收方试图从通道中获取数据时,同样也是阻塞。
package main
import "fmt"
func printer(c chan int) {
for {
data := <-c
if data == 0 {
break
}
fmt.Println(data)
}
//通知main已经结束循环
c <- 0
}
func main() {
c := make(chan int)
go printer(c)
for i := 1; i <= 10; i++ {
c <- i
}
//通知并发的printer结束循环
c <- 0
//等待printer结束
now := <-c
if now == 0 {
fmt.Println("ending!")
}
}
通道缓冲
默认情况下通道是无缓冲的,这意味着对应的接收通道<-chan
准备号接收时,才允许发送chan<-
有缓冲通道允许再没有对应接收者的情况,缓存一定数量的值,只有当空间被存储满的时候才会发生堵塞。
那么我们创建带有缓冲通道的方法也是通道实例 := make(chan 通道类型,缓冲大小)
package main
import "fmt"
func main() {
messages := make(chan string)
messages <- "im first"
messages <- "im second"
fmt.Println(messages)
//fmt.Println(messages)
}
上面的代码中我们给$messages$放了两个值,然后我们运行起来发现,欸它死锁了,因为通道需要接收两个数值但是目前有一个在通道中有一个还没有进去。
那么我们可以把通道加大,这样子两个数值就可以输出来了
package main
import "fmt"
func main() {
messages := make(chan string, 2)
messages <- "im first"
messages <- "im second"
fmt.Println(<-messages)
fmt.Println(<-messages)
}
im first
im second
阻塞条件
带缓冲通道仍然会在下面列举的情况下依然会发生阻塞:
- 带缓冲通道被填满的时候,尝试再次发送数据会发生堵塞
- 带缓冲通道为空的时候,尝试接收数据时会再次发生堵塞
为什么$Go$语言要限制长度而不提供无限长度的通道
内存将不断膨胀直到应用崩溃
循环接收
通道的数据接收可以借用 $for \ range$ 语句进行多个元素的接收操作
package main
import (
"fmt"
"time"
)
func running() {
now := 0
for {
now++
fmt.Println("tick:", now)
time.Sleep(time.Second)
}
}
func main() {
ch := make(chan int)
go func() {
for i := 3; i >= 0; i-- {
ch <- i
time.Sleep(time.Second)
}
}()
for data := range ch {
fmt.Println(data)
if data == 0 {
break
}
}
}
3
2
1
0
这里有一个点就是在管道输出的时候为什么我输出到最后一个的时候我就需要$break$掉呢?
因为如果继续发送,由于接收的$goroutine$已经退出,没有$goroutine$发送到管道,因此运行时会触发宕机报错
实例:模拟远程过程调用(RPC)
服务器开发中会使用$RPC$来简化进程间通信的过程。
package main
import (
"errors"
"fmt"
"time"
)
func RPCClient(ch chan string, req string) (string, error) {
//向服务器发送一个请求
ch <- req
//等待服务器返回
select {
case ack := <-ch:
return ack, nil
case <-time.After(time.Second):
return "", errors.New("Time out")
}
}
func RPCServer(ch chan string) {
for {
//接收请求
data := <-ch
//打印接收到的数据
fmt.Println("Server receiver:", data)
//反馈给用户自己已经收到
ch <- "haoye"
}
}
func main() {
//创建一个无缓冲字符串通道
ch := make(chan string)
//并发执行服务器逻辑
go RPCServer(ch)
//客户端请求数据和接收数据
recv, err := RPCClient(ch, "hi")
if err != nil {
//发生打印错误
fmt.Println(err)
} else {
//正常接收到数据
fmt.Println("Client Received", recv)
}
}
Server receiver: hi
Client Received haoye
上述代码永远不会发生超时因为处理的很快,因此我们可以在服务器中加一句$sleep$来模拟超时
关闭通道之后继续使用通道
我们可以使用close()
来关闭一个通道
关闭的通道仍然可以被访问,但是访问被关闭的通道将会出现一些问题
给被关闭的通道发送数据将会触发panic
package main
import "fmt"
func main() {
ch := make(chan int)
close(ch)
fmt.Printf("%d %d", cap(ch), len(ch))
ch <- 1
}
0 0panic: send on closed channel
从已关闭的通道接收数据的时候将不会发生堵塞
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
for i := 0; i < cap(ch)+1; i++ {
v, ok := <-ch
fmt.Println(v, ok)
}
}
我们故意设定遍历的值时这个管道的容量+1,然后我们去看输出的数值
1 true
2 true
0 false
我们可以发现通道空了的话默认返回$0$,但是获取的时候不会堵塞,取出数据的时候会发生失败的状况。
同步——保证并发环境下数据访问的正确性
$Go$程序可以使用通道进行多个$goroutine$间的数据交换,这仅仅时数据同步中的一种方法
竞态检测
当多线程并发运行的程序竞争访问和修改同一块资源时,会发生竞争问题
package main
import (
"fmt"
"sync/atomic"
)
var seq int64
func GetID() int64 {
//尝试原子的增加序列号
atomic.AddInt64(&seq, 1)
return seq
}
func main() {
for i := 0; i < 10; i++ {
go GetID()
}
fmt.Println(GetID())
}
我们在运行的时候加上race
参数,使用go run - race demo16.go
然后会发现调用go GetId的时候会存在竞态问题
那么我们可以这么修改
return atomic.AddInt64(&seq, 1)
互斥锁——保证同时只有一个goroutine可以访问共享资源
加锁,使得当前设置这个count变量以及获取这个变量的时候是安全的
package main
import (
"fmt"
"sync"
)
var (
count int
countGuard sync.Mutex
)
func GetCount() int {
countGuard.Lock()
defer countGuard.Unlock()
return count
}
func SetCount(c int) {
countGuard.Lock()
count = c
countGuard.Unlock()
}
func main() {
SetCount(1)
fmt.Println(GetCount())
}
读写互斥锁–在读比写多的环境下比互斥锁更加高效
WaitGroup
除了可以使用通道和互斥锁进行两个并发程序的同步以外,还可以使用等待组进行多个任务的同步
方法名 | 功能 |
---|---|
(wg *WaitGroup) add(delta int) | 等待组的计数器+1 |
(wg *WaitGroup) Done() | 等待组的计数器-1 |
(wg *WaitGroup) Wait() | 当等待组计数器不等于0时阻塞直到变0 |
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
//声明一个等待组
var wg sync.WaitGroup
//准备网站地址
var urls = []string{
"https://blog.yokito.cn",
"https://www.qiniu.com",
"https://gobyexample-cn.github.io/",
}
//遍历这些地址
for _, url := range urls {
wg.Add(1)
//开启并发
go func(url string) {
defer wg.Done()
_, err := http.Get(url)
fmt.Println(url, err)
}(url)
}
//等待所有的任务完成
wg.Wait()
fmt.Println("over")
}
https://www.qiniu.com <nil>
https://blog.yokito.cn <nil>
https://gobyexample-cn.github.io/ <nil>
还有一个例子我们也可以来看一看
package main
import (
"fmt"
"sync"
"time"
)
func main() {
//声明一个等待组
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
//开启并发
go func(num int) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", num)
time.Sleep(time.Second)
fmt.Printf("Worker %d ended\n", num)
}(i)
}
//等待所有的任务完成
wg.Wait()
fmt.Println("over")
}
Worker 1 starting
Worker 3 starting
Worker 4 starting
Worker 2 starting
Worker 5 starting
Worker 5 ended
Worker 3 ended
Worker 4 ended
Worker 1 ended
Worker 2 ended