context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。
context 用来解决 goroutine 之间退出通知
、元数据传递
的功能。
控制并发有两种经典的方式,一种是WaitGroup,另外一种就是Context
Value函数并没有任何保证,编译器不会检查传进来的参数是否是合理。
Context 接口
Context接口定义
代码语言:javascript复制type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key any) any
}
Context 核心方法
Context 接口中有四个核心方法:Deadline()、Done()、Err()、Value()。
- Deadl()
Deadline() (deadline time.Time, ok bool) 方法返回 Context 的截止时间,表示在这个时间点之后,Context 会被自动取消。如果 Context 没有设置截止时间,该方法返回一个零值 time.Time 和一个布尔值 false。
代码语言:javascript复制deadline, ok := ctx.Deadline()
if ok {
// Context 有截止时间
} else {
// Context 没有截止时间
}
- Done()
Done() 方法返回一个只读通道,当 Context 被取消时,该通道会被关闭。可以通过监听这个通道来检测 Context 是否被取消。如果 Context 永不取消,则返回 nil。
代码语言:javascript复制select {
case <-ctx.Done():
// Context 已取消
default:
// Context 尚未取消
}
- Err()
Err() 方法返回一个 error 值,表示 Context 被取消时产生的错误。如果 Context 尚未取消,该方法返回 nil。
代码语言:javascript复制if err := ctx.Err(); err != nil {
// Context 已取消,处理错误
}
- Value()
Value(key any) any 方法返回与 Context 关联的键值对,一般用于在 Goroutine 之间传递请求范围内的信息。如果没有关联的值,则返回 nil。
代码语言:javascript复制value := ctx.Value(key)
if value != nil {
// 存在关联的值
}
添加值
context.WithValue()
代码语言:javascript复制ctx := context.WithValue(parentCtx, "username", "Rolle")
取消Context
context.WithCancel()
context.WithCancel(parent Context) (ctx Context, cancel CancelFunc) 函数接收一个父 Context,返回一个新的子 Context 和一个取消函数,当取消函数被调用时,子 Context 会被取消,同时会向子 Context 关联的 Done() 通道发送取消信号,届时其衍生的子孙 Context 都会被取消。这个函数适用于手动取消操作的场景。
代码语言:javascript复制ctx, cancelFunc := context.WithCancel(parentCtx)
defer cancelFunc()
取消原因
context.WithCancelCause() 与 context.Cause()
context.WithCancelCause(parent Context) (ctx Context, cancel CancelCauseFunc) 函数是 Go 1.20 版本才新增的,其功能类似于 context.WithCancel(),但是它可以设置额外的取消原因,也就是 error 信息,返回的 cancel 函数被调用时,需传入一个 error 参数。
代码语言:javascript复制ctx, cancelFunc := context.WithCancelCause(parentCtx)
defer cancelFunc(errors.New("原因"))
context.Cause(c Context) error 函数用于返回取消 Context 的原因,即错误值 error。如果是通过 context.WithCancelCause() 函数返回的取消函数 cancelFunc(myErr) 进行的取消操作,我们可以获取到 myErr 的值。否则,我们将得到与 c.Err() 相同的返回值。如果 Context 尚未被取消,将返回 nil。
代码语言:javascript复制err := context.Cause(ctx)
context.WithDeadline()
context.WithDeadline(parent Context, d time.Time) (Context, CancelFunc) 函数接收一个父 Context 和一个截止时间作为参数,返回一个新的子 Context。当截止时间到达时,子 Context 其衍生的子孙 Context 会被自动取消。这个函数适用于需要在特定时间点取消操作的场景。
代码语言:javascript复制deadline := time.Now().Add(time.Second * 2)
ctx, cancelFunc := context.WithTimeout(parentCtx, deadline)
defer cancelFunc()
context.WithTimeout()
context.WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) 函数和 context.WithDeadline() 函数的功能是一样的,其底层会调用 WithDeadline() 函数,只不过其第二个参数接收的是一个超时时间,而不是截止时间。这个函数适用于需要在一段时间后取消操作的场景。
代码语言:javascript复制ctx, cancelFunc := context.WithTimeout(parentCtx, time.Second * 2)
defer cancelFunc()
Context 的使用场景
传递共享数据
编写中间件函数,用于向 HTTP 处理链中添加处理请求 ID 的功能。
代码语言:javascript复制type key int
const (
requestIDKey key = iota
)
func WithRequestId(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// 从请求中提取请求ID和用户信息
requestID := req.Header.Get("X-Request-ID")
// 创建子 context,并添加一个请求 Id 的信息
ctx := context.WithValue(req.Context(), requestIDKey, requestID)
// 创建一个新的请求,设置新 ctx
req = req.WithContext(ctx)
// 将带有请求 ID 的上下文传递给下一个处理器
next.ServeHTTP(rw, req)
})
}
传递取消信号、结束任务
启动一个协程,接受到取消信号就停止工作
代码语言:javascript复制package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancelFunc := context.WithCancel(context.Background())
go Working(ctx)
time.Sleep(3 * time.Second)
cancelFunc()
// 等待一段时间,以确保工作协程接收到取消信号并退出
time.Sleep(1 * time.Second)
}
func Working(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("done...")
return
default:
fmt.Println("ing...")
}
}
}
在上面的示例中,创建了一个 Working 函数,它会不断执行工作任务。使用 context.WithCancel 创建了一个上下文 ctx 和一个取消函数 cancelFunc。然后,启动了一个工作协程,并将上下文传递给它。 在主函数中,需要等待一段时间(3 秒)模拟业务逻辑的执行。然后,调用取消函数 cancelFunc,通知工作协程停止工作。工作协程在每次循环中都会检查上下文的状态,一旦接收到取消信号,就会退出循环。 最后,等待一段时间(1 秒),以确保工作协程接收到取消信号并退出。
超时控制
模拟耗时操作,超时控制
代码语言:javascript复制package main
import (
"context"
"fmt"
"time"
)
func main() {
// 使用 WithTimeout 创建一个带有超时的上下文对象
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// 在另一个 goroutine 中执行耗时操作
go func() {
// 模拟一个耗时的操作,例如数据库查询
time.Sleep(5 * time.Second)
cancel()
}()
select {
case <-ctx.Done():
fmt.Println("操作已超时")
case <-time.After(10 * time.Second):
fmt.Println("操作完成")
}
}
执行结果
代码语言:javascript复制操作已超时
在上面的例子中,首先使用 context.WithTimeout() 创建了一个带有 3 秒超时的上下文对象 ctx, cancel := context.WithTimeout(ctx, 3*time.Second)。 接下来,在一个新的 goroutine 中执行一个模拟的耗时操作,例如等待 5 秒钟。当耗时操作完成后,调用 cancel() 方法来取消超时上下文。 最后,在主 goroutine 中使用 select 语句等待超时上下文的完成信号。如果在 3 秒内耗时操作完成,那么会输出 “操作完成”。如果超过了 3 秒仍未完成,超时上下文的 Done() 通道会被关闭,输出 “操作已超时”。
同时启动多个 goroutine 进行任务处理时,可以使用 Context 来控制这些 goroutine 的执行。在每个 goroutine 中,都可以检测 Context 对象是否被取消,如果是,则退出 goroutine 的执行,否则继续执行。
代码语言:javascript复制package main
import (
"context"
"fmt"
"sync"
)
func worker(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
default:
fmt.Println("work")
case <-ctx.Done():
return
}
}
}
func main() {
parent := context.Background()
ctx, cancel := context.WithCancel(parent)
var wg sync.WaitGroup
for i := 0; i < 3; i {
wg.Add(1)
go worker(ctx, &wg)
}
cancel()
wg.Wait()
}
什么是WaitGroup
它是一种控制并发的方式,它的这种方式是控制多个goroutine同时完成。
代码语言:javascript复制func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(2*time.Second)
fmt.Println("first")
wg.Done()
}()
go func() {
time.Sleep(2*time.Second)
fmt.Println("second")
wg.Done()
}()
wg.Wait()
fmt.Println("all done")
}
一定要例子中的2个goroutine同时做完,才算是完成
可能会有这么一种场景:需要我们主动的通知某一个goroutine结束。比如开启一个后台goroutine一直做事情,比如监控,定时任务等现在不需要了,就需要通知这个goroutine结束
代码语言:javascript复制func main() {
stop := make(chan bool)
go func() {
for {
select {
case <-stop:
fmt.Println("break")
return
default:
fmt.Println("watch ing")
time.Sleep(1 * time.Second)
}
}
}()
time.Sleep(5 * time.Second)
fmt.Println("stop")
stop <- true
fmt.Println(5 * time.Second)
}
定义一个stop的chan,通知他结束后台goroutine。实现也非常简单,在后台goroutine中,使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里的监控逻辑,继续监控,只到收到stop的通知。 有了以上的逻辑,就可以在其他goroutine种,给stop chan发送值了,例子中是在main goroutine中发送的,控制让这个监控的goroutine结束。
如果有一层层的无穷尽的goroutine,不太好控制
代码语言:javascript复制func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("stop,break...")
return
default:
fmt.Println("goroutine watching...")
time.Sleep(2 * time.Second)
}
}
}(ctx)
time.Sleep(10 * time.Second)
fmt.Println("all done")
cancel()
// 为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
重写,就是把原来的chan stop 换成Context,使用Context跟踪goroutine,以便进行控制,比如结束等。 context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。 在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。 那么是如何发送结束指令的呢?这就是示例中的cancel函数啦,它是我们调用context.WithCancel(parent)函数生成子Context的时候返回的,第二个返回值就是这个取消函数,它是CancelFunc类型的。我们调用它就可以发出取消指令,然后我们的监控goroutine就会收到信号,就会返回结束。
Context控制多个goroutine
代码语言:javascript复制func main() {
ctx, cancel := context.WithCancel(context.Background())
go watch(ctx,"【监控1】")
go watch(ctx,"【监控2】")
go watch(ctx,"【监控3】")
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
// 为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println(name,"监控退出,停止了...")
return
default:
fmt.Println(name,"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
启动了3个监控goroutine进行不断的监控,每一个都使用了Context进行跟踪,当使用cancel函数通知取消时,这3个goroutine都会被结束。这就是Context的控制能力,它就像一个控制器一样,按下开关后,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。
如果Context取消的时候,我们就可以得到一个关闭的chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着收到Context取消的信号了,以下是这个方法的经典用法。
代码语言:javascript复制func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
WithValue传递元数据
代码语言:javascript复制var key string="name"
func main() {
ctx, cancel := context.WithCancel(context.Background())
// 附加值
valueCtx:=context.WithValue(ctx,key,"【监控1】")
go watch(valueCtx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
// 为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context) {
for {
select {
case <-ctx.Done():
// 取出值
fmt.Println(ctx.Value(key),"监控退出,停止了...")
return
default:
// 取出值
fmt.Println(ctx.Value(key),"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
通过传递参数的方式,把name的值传递给监控函数。在这个例子里,我们实现一样的效果,但是通过的是Context的Value的方式。可以使用context.WithValue方法附加一对K-V的键值对,这里Key必须是等价性的,也就是具有可比性;Value值要是线程安全的。 这样我们就生成了一个新的Context,这个新的Context带有这个键值对,在使用的时候,可以通过Value方法读取ctx.Value(key)。 记住,使用WithValue传值,一般是必须的值,不要什么值都传递。
代码语言:javascript复制package main
import (
"context"
"fmt"
)
func main() {
ctx := context.Background()
process(ctx)
ctx = context.WithValue(ctx, "traceId", "rolle")
process(ctx)
}
func process(ctx context.Context) {
traceId, ok := ctx.Value("traceId").(string)
if ok {
fmt.Printf("process over. trace_id=%sn", traceId)
} else {
fmt.Printf("process over. no trace_idn")
}
}
运行结果
代码语言:javascript复制process over. no trace_id
process over. trace_id=rolle
Context 使用原则
- 不要把Context放在结构体中,要以参数的方式传递
- 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
- 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
- Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递
- Context是线程安全的,可以放心的在多个goroutine中传递
超时控制
- 通过context的WithTimeout设置一个有效时间为800毫秒的context。
- 该context会在耗尽800毫秒后或者方法执行完成后结束,结束的时候会向通道ctx.Done发送信号。
- 有人可能要问,你这里已经设置了context的有效时间,为什么还要加上这个time.After呢?
这是因为该方法内的context是自己申明的,可以手动设置对应的超时时间,但是在大多数场景,这里的ctx是从上游一直传递过来的,对于上游传递过来的context还剩多少时间,我们是不知道的,所以这时候通过time.After设置一个自己预期的超时时间就很有必要了。 注意,这里要记得调用cancel(),不然即使提前执行完了,还要傻傻等到800毫秒后context才会被释放。 总结
上面的超时控制是搭配使用了ctx.Done和time.After。 Done通道负责监听context啥时候完事,如果在time.After设置的超时时间到了,你还没完事,那我就不等了,执行超时后的逻辑代码。
代码语言:javascript复制func AsyncCall() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
defer cancel()
go func(ctx context.Context) {
// 发送HTTP请求
}()
select {
case <-ctx.Done():
fmt.Println("call successfully!!!")
return
case <-time.After(time.Duration(time.Millisecond * 900)):
fmt.Println("timeout!!!")
return
}
}
使用通道
代码语言:javascript复制func AsyncCall() {
ctx := context.Background()
done := make(chan struct{}, 1)
go func(ctx context.Context) {
// 发送HTTP请求
done <- struct{}{}
}()
select {
case <-done:
fmt.Println("call successfully!!!")
return
case <-time.After(time.Duration(800 * time.Millisecond)):
fmt.Println("timeout!!!")
return
}
}
- 这里主要利用通道可以在协程之间通信的特点,当调用成功后,向done通道发送信号。
- 监听Done信号,如果在time.After超时时间之前接收到,则正常返回,否则走向time.After的超时逻辑,执行超时逻辑代码。
- 这里使用的是通道和time.After组合,也可以使用通道和time.NewTimer组合。
子父context
代码语言:javascript复制package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx := context.Background()
before := time.Now()
preCtx, _ := context.WithTimeout(ctx, 100*time.Millisecond)
go func() {
childCtx, _ := context.WithTimeout(preCtx, 300*time.Millisecond)
select {
case <-childCtx.Done():
after := time.Now()
fmt.Println("child during:", after.Sub(before).Milliseconds())
}
}()
select {
case <-preCtx.Done():
after := time.Now()
fmt.Println("pre during:", after.Sub(before).Milliseconds())
}
}
举一个例子来说明一下 Context 中的级联退出。下面的代码中 childCtx 是 preCtx 的子 Context,其设置的超时时间为 300ms。但是 preCtx 的超时时间为 100 ms,因此父 Context 退出后,子 Context 会立即退出,实际的等待时间只有 100ms。
当把 preCtx 的超时时间修改为 500ms 时:
代码语言:javascript复制preCtx ,_:= context.WithTimeout(ctx,500*time.Millisecond)
从新的输出中可以看出,子协程的退出不会影响父协程的退出。
从上面这个例子可以看出,父 Context 的退出会导致所有子 Context 的退出,而子 Context 的退出并不会影响父 Context。
参考 link