GMP 模型
Robert Griesemer、Rob Pike、Ken Thompson三位Go语言创始人,对新语言商在讨论时,就决定了 要让Go语言成为面向未来的语言。当时多核CPU已经开始普及,但是众多“古老”编程语言却不能很好的 适应新的硬件进步,Go语言诞生之初就为多核CPU并行而设计。
Go语言协程中,非常重要的就是协程调度器scheduler和网络轮询器netpoller。
GPM分别是什么
Go协程调度中,有三个重要角色:
- M:Machine Thread,对系统线程抽象、封装。所有代码最终都要在系统线程上运行,协程最终 也是代码,也不例外
- G:Goroutine,Go协程。存储了协程的执行栈信息、状态和任务函数等。初始栈大小约为2~4k, 理论上开启百万个Goroutine不是问题
- P:Go1.1版本引入,Processor,虚拟处理器
- 可以通过环境变量
GOMAXPROCS
或runtime.GOMAXPROCS()
设置,默认为CPU核心数 - P的数量决定着最大可并行的G的数量
- P有自己的队列(长度256),里面放着待执行的G
- M和P需要绑定在一起,这样P队列中的G才能真正在线程上执行
- 可以通过环境变量
调度过程
1、使用go func 创建一个Goroutine g1
2、当前P为p1,将g1加入当前P的本地队列LRQ(Local Run Queue)。如果LRQ满了,就加入到GRQ (Global Run Queue)
3、p1和m1绑定,m1先尝试从p1的LRQ中请求G。如果没有,就从GRQ中请求G。如果还没有,就随机 从别的P的LRQ中偷(work stealing)一部分G到本地LRQ中
4、假设m1最终拿到了g1
5、执行,让g1的代码在m1线程上运行
5.1、g1正常执行完了(函数调用完成了),g1和m1解绑,执行第3步的获取下一个可执行的g
5.2、g1中代码主动让出控制权,g1和m1解绑,将g1加入到GRQ中,执行第3步的获取下一个可执行的g
5.3、g1中进行channel、互斥锁等操作进入阻塞态,g1和m1解绑,执行第3步的获取下一个可执行的 g。如果阻塞态的g1被其他协程g唤醒后,就尝试加入到唤醒者的LRQ中,如果LRQ满了,就连同g和LRQ 中一半转移到GRQ中。
5.4、系统调用
① 同步系统调用时,执行如下
如果遇到了同步阻塞系统调用,g1阻塞,m1也被阻塞了,m1和p1解绑。
从休眠线程队列中获取一个空闲线程,和p1绑定,并从p1队列中获取下一个可执行的g来执行;如果休 眠队列中无空闲线程,就创建一个线程提供给p1。
如果m1阻塞结束,需要和一个空闲的p绑定,优先和原来的p1绑定。如果没有空闲的P,g1会放到GRQ 中,m1加入到休眠线程队列中。
② 异步网络IO调用时,如下
网络IO代码会被Go在底层变成非阻塞IO,这样就可以使用IO多路复用了。
m1执行g1,执行过程中发生了非阻塞IO调用(读/写)时,g1和m1解绑,g1会被网络轮询器Netpoller 接手。m1再从p1的LRQ中获取下一个Goroutine g2执行。注意,m1和p1不解绑。
g1等待的IO就绪后,g1从网络轮询器移回P的LRQ(本地运行队列)或全局GRQ中,重新进入可执行状 态。
就大致相当于网络轮询器Netpoller内部就是使用了IO多路复用和非阻塞IO,类似我们课件代码中的 select的循环。GO对不同操作系统MAC(kqueue)、Linux(epoll)、Windows(iocp)提供了支持。
并发注意事项
控制并发数
在Go语言中启动一个Goroutine不仅和调用函数一样简单,而且Goroutine之间调度代价也很低,这些因素极大地促进了并发编程的流行和发展。
很多用户在适应了Go语言强大的并发特性之后,都倾向于编写最大并发的程序,因为这样似乎可以提供最大的性能
遇事不决go一下:
go func()
一个Goroutine会以一个很小的栈启动(可能是2KB或4KB), 但是你的系统和调度器的能力总是有上限的, 在面对大规模的并发请求时(千万或者亿)我们是要考虑goroutine的销毁成本的。
一个是使用goroutine pool控制gotourine数量, 另一个就是做好系统的限流与上限控制
当然还有 更重要的一点, 管理好goroutine的退出, 不让goroutine泄露是很容量的。
并发的安全退出
有时候我们需要通知goroutine停止它正在干的事情,特别是当它工作在错误的方向上的时候。
Go语言并没有提供在一个直接终止Goroutine的方法,由于这样会导致goroutine之间的共享变量处在未定义的状态上。但是如果我们想要退出两个或者任意多个Goroutine怎么办呢?
我们可以 通过channel 发送退出信号
比如:
func worker(cannel chan struct{}) {
for {
select {
default:
fmt.Println("hello")
time.Sleep(100 * time.Millisecond)
case <-cannel:
// 退出
}
}
}
func CancelWithChannel() {
cannel := make(chan struct{})
go worker(cannel)
time.Sleep(time.Second)
cannel <- struct{}{}
}
当Goroutine收到退出指令, 退出时一般会进行一定的清理工作,但是退出的清理工作并不能保证被完成,
因为main线程并没有等待各个工作Goroutine退出工作完成的机制,我们可以结合sync.WaitGroup来改进
func workerv2(wg *sync.WaitGroup, cancel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
time.Sleep(100 * time.Millisecond)
case <-cancel:
return
}
}
}
func CancelWithDown() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go workerv2(&wg, cancel)
}
time.Sleep(time.Second)
// 发送退出信号
close(cancel)
// 等待goroutine 安全退出
wg.Wait()
}
使用context管理goroutine的退出
在Go1.7发布时,标准库增加了一个context包,用来简化对于处理单个请求的多个Goroutine之间与请求域的数据、超时和退出等操作,官方有博文对此做了专门介绍。我们可以用context包来重新实现前面的线程安全退出或超时的控制
func workerV3(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
time.Sleep(100 * time.Millisecond)
case <-ctx.Done():
return ctx.Err()
}
}
}
func CancelWithCtx() {
// 控制超时
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go workerV3(ctx, &wg)
}
time.Sleep(time.Second)
// 取出任务
cancel()
// 等待安全退出
wg.Wait()
}
并发中的Pannic
我们可以通过recover捕获异常避免程序崩溃,比如:
func DealPanic() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
arr := []int{0}
_ = arr[2]
}
// 输出: runtime error: index out of range [2] with length 1
但是如果我们的错误是发生在Goroutine中喃?
func DealPanicInG() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
wg.Add(1)
go work()
wg.Wait()
}
func work() {
arr := []int{0}
_ = arr[2]
wg.Done()
}
结果没用, 程序还是panic了, 因为go的recover是针对 但goroutine的, 因此我们在写启动gorouite的适合,
不能 无脑 go func, 这样可能由于goroutine的异常 而让 整个程序崩溃掉
go 的http库,是为每个请求启动一个Goroutine进行处理,我们看看他如何处理的:
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
defer func() {
if err := recover(); err != nil && err != ErrAbortHandler {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
}
if !c.hijacked() {
c.close()
c.setState(c.rwc, StateClosed, runHooks)
}
}()
他在goroutine的启动函数里面 专门做了recover, 用于捕获当前goroutine的异常, 那我们修改下我们的代码
func DealPanicInGV2() {
// 处理主Goroutine的异常
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
wg.Add(1)
go workV2()
wg.Wait()
}
func workV2() {
// 处理协程的异常
defer func() {
wg.Done()
if err := recover(); err != nil {
fmt.Println(err)
}
}()
arr := []int{0}
_ = arr[2]
}
克制的使用Goroutine, 回调也不错哦
我们再讲CSP的时候, 有个并行爬取网页的示例, 使用的是channel来获取传递数据的, 我们现在使用回调来改写
我们通过回调函数返回参数
type SiteRespCallBack func(SiteResp)
// 构造请求
func doSiteRequest(cb SiteRespCallBack, url string) {
res := SiteResp{}
startAt := time.Now()
defer func() {
res.Cost = time.Since(startAt).Milliseconds()
cb(res)
wg.Done()
}()
resp, err := client.Get(url)
if resp != nil {
res.Status = resp.StatusCode
}
if err != nil {
res.Err = err
return
}
// 站不处理结果
_, err = ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
res.Err = err
return
}
// res.Resp = string(byt)
}
主函数编写回调处理逻辑
func CallBackMode() {
endpoints := []string{
"https://www.baidu.com",
"https://segmentfault.com/",
"https://blog.csdn.net/",
"https://www.jd.com/",
}
// 一个endpoints返回一个结果, 缓冲可以确定
respChan := make(chan SiteResp, len(endpoints))
defer close(respChan)
ret := make([]SiteResp, 0, len(endpoints))
cb := func(resp SiteResp) {
ret = append(ret, resp)
}
// 并行爬取
for _, endpoints := range endpoints {
wg.Add(1)
go doSiteRequest(cb, endpoints)
}
// 等待结束
wg.Wait()
for _, v := range ret {
fmt.Println(v)
}
}
我们看看之前函数式编程的map
func TestMap(t *testing.T) {
list := []string{"abc", "def", "fqp"}
out := MapStrToUpper(list, func(item string) string {
return strings.ToUpper(item)
})
fmt.Println(out)
}
func MapStrToUpper(data []string, fn func(string) string) []string {
newData := make([]string, 0, len(data))
for _, v := range data {
newData = append(newData, fn(v))
}
return newData
}
如果fn 是一个耗时的操作, 可以使用goroutine把它异步化, 函数式编程 + goroutine 也是很不错的一个方向(函数式并发)
CSP不是全部
比如大名鼎鼎的actor模型:
更多的可以看看七周七并发模型