go标准库context原理及项目实践

由”并发控制”开始

golang语言层面提供了协程支持为我们开发高并发程序带来巨大便利,但是就因为太容易开辟协程引入了一个大部分时候我们都容易忽视的问题:协程泄露。

阅读下以下代码:

func main() {
	r := gin.Default()
	r.GET("/run", func(c *gin.Context) {
		for {
			fmt.Println("running")
			time.Sleep(time.Second)
		}
		c.JSON(200, gin.H{
			"message": "done",
		})
	})
	r.Run() 
}

这是使用 gin 框架实现的一个控制器逻辑,逻辑很简单。一个 http 请求会执行一个永远不会退出的一个死循环,用来模拟协程执行超时的情况。

思考:当 http 请求因为网关超时或者客户端主动中断请求的时候,这个控制器逻辑会退出吗?

答案是不会,在 go 语言的实现里,协程之间是平级的,并不存在父子关系,没有跟进程一样父进程退出会导致子进程退出类似的实现。也就是当启动一个协程的时候如果没有实现控制方法那么除非他任务执行完成主动退出,不然这个协程就会一直在运行中。这也就是上面请求虽然已经终止,但是控制逻辑一直继续执行的原因。

并发控制

为了解决以上问题,曾经出现过不少解决方案:

方案1:通过传入一个控制变量,需要协程退出时,修改该变量,协程内通过检测这个变量状态主动退出。但是要需要注意 go 语言中变量赋值不是并发安全的,所以可能会出现协程间读取变量值不一致情况。

方案2:定义一个 channel,给所有”子协程”(为了表达简略,本文内的”父协程”,“子协程”不是指协程的父子关系,被调用的协程称为”子协程”,发起调用的协程称为”父协程”)传入一个channel,当需要停止子协程时通过 channel 发送一条消息,协程内检测到 chan 内有消息时主动退出。

以上的解决方案粗看都能解决问题,但是细想一下这种情况: A协程需要调用B,C协程,B,C协程各自需要调用一些”子协程”。此时如果 A 发现 B 已经执行失败,C 的执行已经无意义,所以要求C以及C的所有”子协程”退出处理。

这个时候如果采用上面的方案,就不得不得定义很多状态来维持各个协程的状态。 这还是只是2层的情况,如果逻辑复杂出现更多依赖层的时候更是不可控制。

以上只是说了主动控制协程退出的处理,在 web 开发中还有个非常场景的需求:超时控制。

超时控制

在 web 开发中,经常会遇到一个请求依赖多个其他网络请求,并且需要对请求的各个阶段的超时时间进行控制。

比如一个 Http 请求依赖一个 mysql query请求,当某个慢 sql响应缓慢时,如果没有超时机制,将导致http 请求响应缓慢请求积压,在流量较高的场景下非常容易导致服务宕机。

这个时候如果有超时控制,就可以很好的止损,当 sql响应超时时,直接中断该请求避免影响整个服务缩小故障范围。

有需求的地方就有市场,那么有没有一个东西可以满足以上所有需求?铺垫了这么久终于轮到我们的主角登场了: context(上下文)。

一口气解决了以上所有问题,此外还提供了协程间上下文变量共享的功能。

context简介

context 是 go 官方在1.7版本引入标准库中的一个包,context 包主要用来简化并发(协程)控制。

context 包的核心数据结构为 Context 接口,接口定义如下:

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

这四个方法的定义如下:

Deadline 会返回一个超时时间,routine获得了超时时间后,可以对某些io操作设定超时时间。 Done 会返回一个channel,当该context被取消的时候,该channel会被关闭,同时对应的使用该context的routine也应该结束并返回。 Err 返回 Context 结束原因,只有在 Done 返回的 channel 关闭的时候返回。 Value 可以让routine共享一些数据,当然获得数据是协程安全的。

根据 go 接口定义,实现方法即实现接口的定义,我们只需要给任意一个结构体实现这4个方法即可作为 context.Context 使用。例如 gin 框架中的 gin.Context 就是一个 context.Context 实现,代码如下:

func (c *Context) Deadline() (deadline time.Time, ok bool) {
	return
}
func (c *Context) Done() <-chan struct{} {
	return nil
}
func (c *Context) Err() error {
	return nil
}

func (c *Context) Value(key interface{}) interface{} {
	if key == 0 {
		return c.Request
	}
	if keyAsString, ok := key.(string); ok {
		val, _ := c.Get(keyAsString)
		return val
	}
	return nil
}

仔细观察可以发现4个方法中的前三个都是直接返回的 nil,这三个方法都和并发控制相关。

gin 框架本身因为没有实现以上方法,所以其实没有实现并发控制,即使我们使用的 gin 控制器参数的 gin.Context.Done 也并不能实现请求中断协程。

那么context包如何使用呢?

context包方法

context包包含4个主要方法,以下分别介绍:

WithCancel(parent Context)

传入继承一个父级context,并返回一个新的 context 和 cancel 函数。可以通过主动调用 cancel 函数给所有的子级 Context 发送终止信息。

WithDeadline(parent Context, d time.Time)

传入一个需要继承的父级 Context 和一个执行截止时间,返回一个新的Context和 cancel 函数。当执行到制定时间或者主动调用 cancel时,给所有子级 Context发送终止信号。

context.WithTimeout(parent Context, timeout time.Duration)

传入一个需要继承的父级 Context 和一个执行超时时间,返回一个新的Context和 cancel 函数。当执行超时或者主动调用 cancel时,给所有子级 Context将被撤销,对应的子级 Context 的 Done 将接收到 退出信号。

WithValue(parent Context, key, val interface{})

传入一个需要继承的父级 Context,返回一个包含 key/val 数据的context,用来在协程中实现数据同步。context 数据共享是并发安全的。

context 使用

context创建

在大部分 web 框架内使用 context 时,框架默认都给控制器层传入一个附加了 Value 信息的 Context ,此时可以使用该参数作为最上层的 Context。

如果需要新建一个 Context 作为最上层的 Context,可以使用 context.Background() 方法返回一个空的 Context。

ctx := context.Background()

此外,还有一个 context.Todo() 方法,当不确定创建的context用来做什么时可以使用 Todo()方法。

需要注意的是 Todo 和 Background 只是表达意义不同,代码层面都是实例化的同一个emptyCtx,就是说无论使用哪个,代码逻辑不会有任何区别。

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)
func Background() Context {
	return background
}
func TODO() Context {
	return todo
}

实现超时控制

实现超时控制需要使用上面介绍过的 context.WithTimeout方法。需要在”父协程”与”子协程”分别处理:

“父协程”内:

使用WithTimeout传入父级context 及超时时间产生新的子级context。将新的 context 传入将要控制的 “子协程” 。

“子协程”内:

在耗时处理时需要同时监听 context.Done 管道是否有消息,如果消息到达时终止任务退出协程。

按照这个方法我们将上面的 gin 部分的问题代码用 context加入超时控制如下所示:

func main() {
	r := gin.Default()
	r.GET("/run", func(c *gin.Context) {
	    // 创建一个超时时间为3秒的 context
		ctx, _ := context.WithTimeout(c, time.Second*3)
		// 模拟启动一个任务处理协程 传入 context
		go func(ctx context.Context) {
			for {
				select {
				case <-ctx.Done():
					fmt.Println("exit")
					return
				default:
				}
				// 执行任务中
				fmt.Println("do something")
				time.Sleep(time.Second)
			}
		}(ctx)

		c.JSON(200, gin.H{
			"message": "done",
		})
	})
	r.Run() 
}
细节分析

这个时候细心的同学可能会发现,如果在”do something” 这里执行超时了,那么这个超时控制岂不是没用了吗?

首先单独看这里的代码确实如此,但是别着急下结论让我们进一步分析一下: 在正常的代码逻辑处理中(死锁除外)比较耗时的无非分为2种,大量CPU运算 和 等待IO操作。

大量CPU运算必然用到大量的循环或者逐行的运算逻辑,那么问题就降级为如何退出循环或者终止运算,套用上面的方案即可。

对于IO操作,我们可以参考下标准库的 http.Request 请求库的实现,request请求层层追踪最终会执行到internal/poll/fd_poll_runtime.go,进入该函数之前会获取到context.Deadline的time.Time。 (在linux下,一切皆文件,所以理论上所有的IO最终都会执行到这里转换为对fd的读写操作)

//internal/poll/fd_poll_runtime.go
//为fd文件句柄设置读写deadline
func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
	var d int64
	if !t.IsZero() {
		d = int64(time.Until(t))
		if d == 0 {
			d = -1 // dont confuse deadline right now with no deadline
		}
	}
	if err := fd.incref(); err != nil {
		return err
	}
	defer fd.decref()
	if fd.pd.runtimeCtx == 0 {
		return ErrNoDeadline
	}
  //调用 go 运行时方法卫 pd 设置deadline
	runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
	return nil
}

继续追踪runtime_pollSetDeadline到go runtime 会发现如下代码:

// runtime/netpoll.go
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    // 并发访问加锁
    lock(&pd.lock)
    if pd.closing {
        unlock(&pd.lock)
        return
    }

    rd0, wd0 := pd.rd, pd.wd
    combo0 := rd0 > 0 && rd0 == wd0

    // 计算过期时间点
    if d > 0 {
        d += nanotime()
        if d <= 0 {
            d = 1<<63 - 1
        }
    }
    // 将过期时间根据mode存到rd和wd上
    if mode == 'r' || mode == 'r'+'w' {
        pd.rd = d
    }
    if mode == 'w' || mode == 'r'+'w' {
        pd.wd = d
    }
    combo := pd.rd > 0 && pd.rd == pd.wd
    // timer回调函数
    rtf := netpollReadDeadline
    if combo {
        rtf = netpollDeadline
    }
    // 读timer
    if pd.rt.f == nil {
        if pd.rd > 0 {
            pd.rt.f = rtf
            pd.rt.when = pd.rd
            pd.rt.arg = pd
            pd.rt.seq = pd.rseq
          	// 设置一个timer监控 pd
            addtimer(&pd.rt)
        }
    } else if pd.rd != rd0 || combo != combo0 {
        // 重置当前正在进行中的计时器
        pd.rseq++
        if pd.rd > 0 {  // 修改计时器
            modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
        } else {    // 删除计时器
            deltimer(&pd.rt)
            pd.rt.f = nil
        }
    }
    // ...
}

这里可以看出go的runtime给文件fd操作增加了一个timer来监控当前读写时间是否已经到达deadline。

到这里我们已经可以基本确定context能实现对协程的并发控制。

实现cancel并发控制

实现并发控制需要使用上面介绍过的 context.WithCancel方法。需要在”父协程”与”子协程”分别处理:

“父协程”内:

使用WithTimeout传入父级context 新的子级context。将新的 context 传入将要控制的 “子协程” 。在需要控制所以”子协程”退出时,调用 cancel 方法即可。

“子协程”内:

在耗时处理时需要同时监听 context.Done 管道是否有消息,如果消息到达时终止任务退出协程。

按照这个方法我们将上面的 gin 部分的问题代码用 context实现并发控制如下所示:

func main() {
	r := gin.Default()
	r.GET("/run", func(c *gin.Context) {
	    // 创建一个带有 cancel 的 context
		ctx, cancel := context.WithCancel(c)
		// 模拟启动一个任务处理协程 传入 context
		go func(ctx context.Context) {
			for {
				select {
				case <-ctx.Done():
					fmt.Println("exit")
					return
				default:
				}
				// 执行任务中
				fmt.Println("do something")
				time.Sleep(time.Second)
			}
		}(ctx)
        // http 处理完成 通知所有任务退出
        cancel()
		c.JSON(200, gin.H{
			"message": "done",
		})
	})
	r.Run() 
}

context原理

在使用 context 包的相关方法时可以发现所有方法都都需要接收一个 parent 参数,我们以 context.WithCancel为例:

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}
// 方法入口
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

可以看到 首先初始化了一个 cancelCtx,并把传入的Context作为父级Context。cancelCtx 包含了 done 和 children 属性用来发送通知和保存子级Context。

紧接着调用了 propagateCancel 方法,最终返回了这个 cancelCtx 和他的 cancel 方法。

我们先看下propagateCancel实现:

func propagateCancel(parent Context, child canceler) {
	if parent.Done() == nil {
		return // parent is never canceled
	}
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

这里有一段关键代码 p.children[child] = struct{}{} ,将当前 context 插入到父级 context 的 Children内。看到这里就可以确定,context包的方法调用会产生一个树形结构,每一个父节点拥有所有的子节点信息。

然后再看下 cancelCtx 的 cancel 实现,我们在调用返回的 cancel 方法时最终调用的就是这里:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

首先给关闭了自己的通知管道 ,然后再遍历所有子级 Context,继续执行子级 Context的cancel 方法。

到这里,我们已经可以勾勒出包含多层context.cancelCtx的树形结构了:

当我们在任意节点执行 cancel 方法时,该节点及其所有子节点都将被撤销掉,监听对应节点的所有协程/循环等将将接收到 ctx.Done 信息后退出。

项目实践:kratos中的context

最佳实践

kratos 的项目分层一般分为接口层,服务层和数据访问层,请求过来时逐层调用,很多同学在开发时到服务层就扔掉了context,这个时候context的超时控制对这个项目来说其实是没有起到作用的。

所以最佳的做法是: - 在各个层内实现的方法第一个参数都指定为(ctx context.Context) - 在Context被传递到方法中,如果存在可能的异常超时等情况,应该对Context的Done信道(channel)进行监控,一旦该信道被关闭(即上层运行环境执行了 Cancel 或者超时),应主动终止对当前请求信息的处理,释放资源并返回。 - 在需要并发或者超时控制的时候调用子服务/方法时应该传入Context。 - 在调用子组件时应该尽量使用带有 context 参数的方法。例如使用标准库的 http包时,应该使用http.NewRequestWithContext而不是 http.NewRequest。 - 因为 context 可以在共享存储变量的特性,一般会将该请求的标识信息写入 context,比如 trace_id ,request_id 等。

常见用法

在项目内经常会添加很多异步/定时任务,大部分情况下这些任务都是不受管理的(封装较好的组件一般会有自己的context,通过调用 Close方法执行cancel),启动跟随服务启动,关闭跟随服务关闭,关闭的时机不可控。在有流量的时候重启服务可能会产生部分错误导致数据丢失。

比如启动了一个消费队列,消费队列内写入 mysql 数据库,在服务器关闭时,有可能 mysql 连接先关闭,但是消费队列还是有流量进入导致这部分流量写入失败。这个时候就可以用 context 来控制消费逻辑,确保在服务关闭时消费队列 与mysql同步关闭。

具体实现:在服务入口出定义一个全局的context,用于整个服务层面(区别与请求周期的 context)的组件控制。

ctx,cancel = context.WithCancel(context.Background())

在需要进行管理的方法内接收 全局的 context并实现退出逻辑,常见的包括循环和定时处理,这两种情况的退出逻辑实现如下:

控制循环退出:

for {
	select {
	case <-ctx.Done:
	    //end
		return
	default:
	}
	//job
}

控制定时任务退出:

	ticker := time.NewTicker(time.Second)
	for{
		select{
		case <-ticker.C:
			//job
		case <- ctx.Done:
		    ticker.Stop()
			// end
			return
		}
	}

问题1:deadline exceeded

刚开始使用 kartos 的同学应该都遇到一个错误”context deadline exceeded”,可能出现在某个请求的任何地方,根据上面的了解,我们可以判定肯定时kraots 的 context 超时了,我们看下 kratos Context 的实现:

func (engine *Engine) handleContext(c *Context) {
	var cancel func()
    .....
    .....
	ctx := metadata.NewContext(context.Background(), md)
	if tm > 0 {
		c.Context, cancel = context.WithTimeout(ctx, tm)
	} else {
		c.Context, cancel = context.WithCancel(ctx)
	}
	defer cancel()
	engine.prepareHandler(c)
	c.Next()
}

可以看出 kratos 的 context与 gin 的 context 相比,所有的 context 都继承自context.WithTimeout 或者context.WithCancel。

所以回到上面的问题,因为 kratos 的默认超时是 1s 。所以当某个http 请求完整链路超过1s时,所有该请求内的子请求都会被终止掉,如果是用的 mysql/redis/http/grpc 等子组件时就会抛出”context deadline exceeded”错误。

问题2:context invalid memory address

前几天一个后端的同学遇到一个诡异的问题,在 kratos 请求内新启动一个协程传入入了 context,访问接口完全正常,但是压测的时候一定会出现” invalid memory address”错误。模拟代码如下:

func ctrl(c *blademaster.Context) {
	go func(c context.Context) {
		doSomeThings(c)
	}(c)

	c.JSON(true, nil)
}
func doSomeThings(ctx context.Context) {
	time.Sleep(3 * time.Second)
	data := ctx.Value("aaa")
	fmt.Println(data)
}

单从这里看代码确实是没问题的,但是既然出现了肯定是什么地方给遗漏了,我们跟踪看下blademaster.Context的来源。

// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  // 取出 Context
	c := engine.pool.Get().(*Context)
	c.Request = req
	c.Writer = w
	c.reset()

	engine.handleContext(c)
  // 归还 Context
	engine.pool.Put(c)
}

注意这里用到了sync.Pool,就是说 kartos 内的 Context 都是从 sync.Pool 内取出的,在请求结束时进行归还。再回过头看上面的逻辑就可以找到问题原因了。

在上面的代码中,启动了一个协程来处理比较耗时的异步任务,任务中有用到 context.Value()来获取存储变量的操作。但是此时,框架已经将 Context 归还给了 sync.Pool,context 的状态是未知的,有可能已经被 sync.Pool 给释放被 GC 掉了,所以在压测时会出现指针异常。

知道问题根源就很容易解决了,只需要新生成一个 context,把 blademaster.Context内存储的数据复制到新的 context 内就可以了。

func ctrl(c *blademaster.Context) {
  // 这里只复制了trace数据
	t,_ := trace.FromContext(c)
	ctx := trace.NewContext(context.Background(),t)
	go func(c context.Context) {
		doSomeThings(c)
	}(ctx)

	c.JSON(true, nil)
}

但是需要注意这样做会让这个协程脱离框架的超时控制,如果还需要超时控制的话,使用 WithTimeout 继承一层即可。

另外直接从 *blademaster.Context 继承产生一个context 也是不可以的,因为使用 context.Value 会逐层往上查询,最终还是会调用作为顶级的 blademaster.Context.Value() ,一样会出现指针错误。


文档导航