go context 的使用

context 包 – 核心方法

context 包的核心 API 有四个: • context.WithValue:设置键值对,并且返 回一个新的 context 实例

  • context.WithCancel
  • context.WithDeadline
  • context.WithTimeout:

三者都返回一个 可取消的 context 实例,和取消函数

注意: context 实例不可变,每次都是新创建的

context 包提供核心方法4个:

方法 解释
Deadline 过期时间,如果ok 为false,说明没有十二指过期时间
Done 返回一个 channel,一般用于监听context实例的信号,比如过期,或者正常关闭
Err
Deadline 过期时间,如果ok 为false,说明没有设置过期时间
1
2
3
4
5
6
7
type Context interface {
    // 如果 ok 为 false 表示 没有设置过期时间
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key any) any
}

context 包- 安全传递数据

context包我们用来做2件事情:

  • 安全传递数据
  • 控制链路

安全传递数据,是指在请求执行上下文中线程安全地传递数据,依赖于 withValue 方法 因为Go 本身没有 thread-local 机制,所以大部分类似功能都是借助于context 实现

例子:

  • 链路追踪的 trace id
  • 压力测试标记位
  • 分库分表中间件中传递 sharding hint
  • orm 中间件传递 SQL hint
  • web框架传递上下文

父context 无法访问子context 添加的内容,如果逼不得已要在 子context 中获取到某些值,只能在 父context 放入一个 map,后面拿到map 进行修改,但是这种做法明显已经有问题了,说明代码非常糟糕,真要这样做建议重构部分代码,这样做不仅将传送的参数透明化了,还让代码难以维护。

valueCtx 的实现

valueCtx 用于存储 key-value 数据, 特点:

  • 典型的装饰器模式: 在已有 Context的基础上附加一个存储key-value 的功能
  • 只能存储 一个key,val , 为什么不用map?
  • map 要求是 comparable的key , 而有时候我们用的不是 comparable的key
  • context 包设计理念就是 context是不可变的
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// WithValue返回一个父类的副本,其中与键相关的值是
// val.
//
// 仅在请求范围内的数据中使用上下文值,这些数据在进程和//API中传输,而不是在函数中传递可选参数。
// APIs,而不是用于向函数传递可选参数。
//
// 提供的键必须是可比较的,并且不应该是类型为
//字符串或任何其他内置类型,以避免使用上下文的
// 使用上下文的包之间发生碰撞。WithValue的用户应该定义自己的
//键的类型。为了避免在分配给一个
//接口{}时,上下文键通常具有具体的类型
// struct{}。另外,导出的上下文键变量的静态
// 类型应该是一个指针或接口。
func WithValue(parent Context, key, val any) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
    //https://sorcererxw.com/articles/go-comparable-type 
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}



// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val any
}


func (c *valueCtx) Value(key any) any {
	if c.key == key {
		return c.val
	}
	return value(c.Context, key)
}

func value(c Context, key any) any {
	for { 
		switch ctx := c.(type) {
		case *valueCtx:
			if key == ctx.key {
				return ctx.val
			}
			c = ctx.Context
		case *cancelCtx:
			if key == &cancelCtxKey {
				return c
			}
			c = ctx.Context
		case *timerCtx:
			if key == &cancelCtxKey {
				return &ctx.cancelCtx
			}
			c = ctx.Context
		case *emptyCtx:
			return nil
		default:
			return c.Value(key)
		}
	}
}

context 其他用法

  1. time.AfterFunc

另外一种超时控制是采用 time.AfterFunc:一般这种用 法我们会认为是定时任务,而不是超时控制。 这种超时控制有两个弊端: • 如果不主动取消,那么 AfterFunc 是必然会执行的 • 如果主动取消,那么在业务正常结束到主动取消之 间,有一个短时间的时间差

1
2
3
4
timer := time.AfterFunc(time.Second ,func() { fmt.Println("--")})

fmt.Println(" do something ")
timer.Stop()

DB.conn 超时控制

数据库连接池就有这样用, 连接是延迟关闭的,比如用完数据库连接,不会自动关闭,而会返回池子里面等,下一次执行的时候,如果 连接已经关闭就释放连接,没有就继续用

超时控制有至少2个分支

  • 正常分支
  • 超时分支
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
		select {
		case <-ctx.Done():
			// 删除连接请求,并确保删除后没有发送任何值
			// 移除后对其进行处理。 [客户端上层超时了,要返回]
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
			select {
			default:
			case ret, ok := <-req: // 从 channel中获取 连接
				if ok && ret.conn != nil {
 // 如果有的话执行 db.putConn(ret.conn, ret.err, false) ,目的是释放掉这个连接
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
 // 尝试从 reqchannel 中取出连接
		case ret, ok := <-req: //...

释放连接原理

连接被是过后是需要被释放的

释放连接的逻辑封装在DB实例中

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
  // 释放连接的操作加锁
	db.mu.Lock()
  // debug的信息
	if !dc.inUse {
		if debugGetPut {
			fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
		}
		panic("sql: connection returned that was never out")
	}
	if debugGetPut {
		db.lastPut[dc] = stack()
	}
  // 标记driverConn处理不可用的状态
	dc.inUse = false

	for _, fn := range dc.onPut {
		fn()
	}
	dc.onPut = nil

  // 本方法的入参中有参数err
  // 当会话获取出这个连接后,发现这个连接过期了、或者被标记上来lastErr时,再调用这个putConn方法时,同时会将这个错误传递进来,然后在这里判断,当出现坏掉的连接时就不直接把这个连接放回空闲连接池了。
	if err == driver.ErrBadConn {
		// Don't reuse bad connections.
		// Since the conn is considered bad and is being discarded, treat it
		// as closed. Don't decrement the open count here, finalClose will
		// take care of that.
    // 这个方法的作用如下:
    // 他会去判断当前DB维护的map的容量,也就是前面提到的那种情况:当DB允许打开连接,但是现在的连接数已经达到当前DB允许打开的最大连接数上限了,那么针对接下来想要获取连接的请求的处理逻辑就是,构建一个req channel,放入connRequests这个map中,表示他们正在等待连接的建立。
    // 换句话说,这时系统时繁忙的,业务处于高峰,那么问题来了,现在竟然出现了一个坏掉的连接,那为了把对业务线的影响降到最低,是不是得主动新建一个新的连接放到空闲连接池中呢?
    // 	db.maybeOpenNewConnections() 函数主要干的就是这个事。
    // 	方法详情如下
    /*
    	func (db *DB) maybeOpenNewConnections() {
					numRequests := len(db.connRequests)
					if db.maxOpen > 0 {
						numCanOpen := db.maxOpen - db.numOpen
					if numRequests > numCanOpen {
						numRequests = numCanOpen
					}
			}
			for numRequests > 0 {
						db.numOpen++ // optimistically
						numRequests--
						if db.closed {
							return
						}
				  // 它只是往这个	openerCh channel中写入一个空的结构体,会有专门的协程负责创建连接
					db.openerCh <- struct{}{}
			}
		}
    */
		db.maybeOpenNewConnections()
    //  解锁,关闭连接,返回
		db.mu.Unlock()
		dc.Close()
		return
	}
	if putConnHook != nil {
		putConnHook(db, dc)
	}
  // 如果DB已经关闭了,标记 resetSession为 false
	if db.closed {
		// Connections do not need to be reset if they will be closed.
		// Prevents writing to resetterCh after the DB has closed.
    // 当DB都已经关了,意味着DB里面的连接池都没有了,那当然不需要关闭连接池中的连接了~
		resetSession = false
	}
  // 如果DB没有关闭的话,进入if代码块
	if resetSession {
    // 将dricerConn中的Conn验证转换为driver.SessionResetter
		if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
      // 在此处锁定driverConn,以便在连接重置之前不会释放。
      // 必须在将连接放入池之前获取锁,以防止在重置之前将其取出
			dc.Lock()
		}
	}
  // 真正将连接放回空闲连接池中
  // 满足connRequest或将driverConn放入空闲池并返回true或false
  /*
  	func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
  			// 检测如果DB都关闭块,直接返回flase
				if db.closed {
					return false
				}
				// 如果DB当前打开的连接数大于DB能打开的最大的连接数,返回false
				if db.maxOpen > 0 && db.numOpen > db.maxOpen {
					return false
				}
				//如果等待获取连接的map中有存货
		 if c := len(db.connRequests); c > 0 {
		 		
				var req chan connRequest
				var reqKey uint64
				// 取出map中的第一个key
				for reqKey, req = range db.connRequests {
					break
				}
				// 将这个key,value再map中删除
				delete(db.connRequests, reqKey) // Remove from pending requests.
				// 重新标记这个连接是可用的状态
				if err == nil {
					dc.inUse = true
				}
				// 将这个连接放入到 req channel中,给等待连接到会话使用
				req <- connRequest{
					conn: dc,
					err:  err,
				}
				return true
				
		// 来到这个if,说明此时没有任何请求在等待获取连接,并且没有发生错误,DB也没有关闭
		} else if err == nil && !db.closed {
				// 比较当前空闲连接池的大小(默认是2) 和 freeConn空闲连接数的数量
				// 意思是,如果空闲的连接超出了这个规定的阈值,空闲连接是需要被收回的。
				if db.maxIdleConnsLocked() > len(db.freeConn) {
				  // 收回
					db.freeConn = append(db.freeConn, dc)
					db.startCleanerLocked()
					return true
				}
				// 如果空闲连接还没到阈值,保留这个连接当作空闲连接
				db.maxIdleClosed++
		}		
				// 收回空闲连接返回false
				return false
}
  */
  
  // 如果将连接成功放入了空闲连接池,或者将连接成功给了等待连接到会话使用,此处返回true
  // 收回空闲连接返回false
  // 代码详情就是在上面的这段注释中
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()
	
  // 如果
	if !added {
    // 如果DB没有关闭,进入if
		if resetSession {
			dc.Unlock()
		}
		dc.Close()
		return
	}
  // 重新校验,如果连接关闭了,进入if
	if !resetSession {
		return
	}
  
  // 如果负责重置 conn状态的线程阻塞住了,那么标记这个driverConn为lastErr
	select {
	default:
		// If the resetterCh is blocking then mark the connection
		// as bad and continue on.
		dc.lastErr = driver.ErrBadConn
		dc.Unlock()
	case db.resetterCh <- dc:
	}
}

mysql 连接数限制

数据库连接池的大小到底设置为多少,得根据业务流量以及数据库所在机器的性能综合考虑。

mysql连接数到配置在 my.cnf中,具体的参数是max_connections。

当业务流量异常猛烈时,很可能会出现这个问题:to many connections

对于操纵系统内核来说,当他接受到一个tcp请求就会在本地创建一个由文件系统管理的socket文件。在linux中我们将它叫做文件句柄。

linux为防止单一进程将系统资源全部耗费掉,会限制进程最大能打开的连接数为1024,这意味着,哪怕通过改配置文件,将mysql能打开的连接池设置为9999,事实上它能打开的文件数最多不会超过1024。

这个问题也好解决:

命令:设置单个进程能打开的最大连接数为65535

1
2
3
4
5
6
7
ulimit -HSn 65535
ulimit -a 

# core file size: 进程崩溃是转储文件大小限制
# man loaded memort 最大锁定内存大小
# open file 能打开的文件句柄数
# 这些变量定义在 /etc/security/limits.conf配置文件中。

参考文章

参考文章

http 库中 context的使用

http.Request 使用 context作为字段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type Request struct {
	// ctx是客户端或服务器的上下文。它只应该
	// 通过使用WithContext复制整个请求来修改。
	// 它没有被导出,以防止人们使用错误的Context
	// 和突变同一请求的调用者所持有的上下文。
	ctx context.Context
}
func (r *Request) WithContext(ctx context.Context) *Request {
	if ctx == nil {
		panic("nil context")
	}
	r2 := new(Request)
	*r2 = *r
	r2.ctx = ctx
    //遗留行为;TODO:尝试删除。问题23544
	r2.URL = cloneURL(r.URL) // legacy behavior; TODO: try to remove. Issue 23544
	return r2
}
1
2
3
4
5

func main()  {
    req := http.Request{}
    newCtx := req.WithContext(context.TODO())
}

error groups 使用

通过 errorGroup 获取多个任务执行中是否有任意失败

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg := errgroup.Group{}
	ctx := context.TODO()
	_ = ctx
	for i := 0; i < 10; i++ {
		i := i
		eg.Go(func() error {
			if i == 8 {
				return errors.New("error")
			}
			return ctx.Err()
		})
	}
	err := eg.Wait()
	fmt.Println(err)
}

cancelCtx 的实现原理

cancelCtx 是典型的装饰器模式: 在已有的Context基础上,添加取消功能。

核心功能:

  • Done 方法是通过类似于 double-check 的机制写的。这种原子操作和说结合的用法比较罕见。
  • 利用 children来维护所有的衍生节点,难点在于他是如何维护衍生节点的
1
2
3
4
5
6
7
8
9
// 一个cancelCtx可以被取消。当被取消时,它也会取消任何实现了canceller的儿童
// 实现canceller。
type cancelCtx struct {
    Context
	mu sync.Mutex // 保护以下字段
	done atomic.Value // of chan struct{}, lazily created, closed by first cancel call
	children map[canceller]结构{}// 被第一次取消调用设置为nil
	err error // 被第一次取消调用设置为非零
}

children:核心是儿子把自己加进去父亲的 children 字段里面。 但是因为 Context 里面存在非常多的层级, 所以父亲不一定是 cancelCtx,因此本质上 是找最近属于 cancelCtx 类型的祖先,然后 儿子把自己加进去。 cancel 就是遍历 children,挨个调用 cancel。然后儿子调用孙子的 cancel,子子 孙孙无穷匮也。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if p, ok := parentCancelCtx(parent); ok { // 最近的 cancelCtx祖先,将 child 加进祖先的child里面
    p.mu.Lock()
    if p.err != nil {
        // parent has already been canceled
        child.cancel(false, p.err)
    } else {
        // 得到 最近的 祖先 cancelCtx 
        if p.children == nil {
            p.children = make(map[canceler]struct{})
        }
        //将 child 加入
        p.children[child] = struct{}{}
    }
    p.mu.Unlock()
} else {
    atomic.AddInt32(&goroutines, +1)
    go func() {
        select {
            //找不到祖先的 cancelCtx 就 监听 parent 或者自己的信号
        case <-parent.Done():
            child.cancel(false, parent.Err())
        case <-child.Done(): //子节点调用了 cancelFunc 获取到取消信号了
        }
    }()
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (c *cancelCtx) Done() <-chan struct{} {
	d := c.done.Load()
	if d != nil {
		return d.(chan struct{})
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	d = c.done.Load()
	if d == nil {
		d = make(chan struct{})
		c.done.Store(d)
	}
	return d.(chan struct{})
}

cancelCtx 实现原理:

  • 关闭所有 的 children
  • 关闭 done 这个 channel : 这个符合谁创建谁关闭原则
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// cancel关闭c.done,取消c的每个子节点,并且,如果
// removeFromParent为true,将c从其父的孩子中移除。 true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
        // 注意:在持有父方锁的同时获得子方的锁。
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

timerCtx 实现原理

timerCtx 也是装饰器模式:在已有 cancelCtx 的基础上增加了超时的功能。 实现要点:

  • WithTimeout 和 WithDeadline 本质一样
  • WithDeadline 里面,在创建 timerCtx 的时候 利用 time.AfterFunc 来实现超时
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
	propagateCancel(parent, c)
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // 截止日期已过

		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
        // 开启定时任务,超时自己主动 cancel自己
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }

面试要点

  • context.Context 使用创建: 上下文传递和超时控制
  • context.Context 原理:
    • 父亲如何控制儿子: 通过儿子主动加入到父亲的children里面,父亲只需要遍历就可以
    • valueCtx 和 timeCtx原理

context 包– 使用注意事项

  • 一般只用做方法参数,而且是作为第一个参数;
  • 所有公共方法,除非是 util,helper 之类的方法,否则都加上 context 参数;
  • 不要用作结构体字段,除非你的结构体本身也是表达一个上下文的概念