锁是一种同步机制,用于在多任务环境中限制资源的访问,以满足互斥需求。
go源码sync包中经常用于同步操作的方式:
* 原子操作
* 互斥锁
* 读写锁
* waitgroup
我们着重来分析下互斥锁和读写锁.
互斥锁:
下面是互斥锁的数据结构:
~~~go
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32 // 互斥锁上锁状态枚举值如下所示
sema uint32 // 信号量,向处于Gwaitting的G发送信号
}
const (
mutexLocked = 1 << iota // 值为1,表示在state中由低向高第1位,意义:锁是否可用,0可用,1不可用,锁定中
mutexWoken // 值为2,表示在state中由低向高第2位,意义:mutex是否被唤醒
mutexStarving // 当前的互斥锁进入饥饿状态;
mutexWaiterShift = iota //值为2,表示state中统计阻塞在此mutex上goroutine的数目需要位移的偏移量
starvationThresholdNs = 1e6
~~~
state和sema两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。
互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放.
[![](https://github.com/KeKe-Li/data-structures-questions/raw/master/src/images/138.jpg)](https://github.com/KeKe-Li/data-structures-questions/blob/master/src/images/138.jpg)
在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:
* mutexLocked 表示互斥锁的锁定状态;
* mutexWoken 表示从正常模式被从唤醒;
* mutexStarving 当前的互斥锁进入饥饿状态;
* waitersCount 当前互斥锁上等待的 Goroutine 个数;
sync.Mutex 有两种模式,正常模式和饥饿模式。
在正常模式下,锁的等待者会按照先进先出的顺序获取锁。
但是刚被唤起的`Goroutine`与新创建的`Goroutine`竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。
饥饿模式是在 Go 语言 1.9 版本引入的优化的,引入的目的是保证互斥锁的公平性(Fairness)。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。
如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。
相比于饥饿模式,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。
互斥锁的加锁是靠 sync.Mutex.Lock 方法完成的, 当锁的状态是 0 时,将`mutexLocked`位置成 1:
~~~go
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
~~~
如果互斥锁的状态不是 0 时就会调用`sync.Mutex.lockSlow`尝试通过自旋(Spinnig)等方式等待锁的释放,
这个方法是一个非常大 for 循环,它获取锁的过程:
1. 判断当前 Goroutine 能否进入自旋;
2. 通过自旋等待互斥锁的释放;
3. 计算互斥锁的最新状态;
4. 更新互斥锁的状态并获取锁;
那么互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放,是通过它的lockSlow方法, 由于自旋是一种多线程同步机制,所以呢当前的进程在进入自旋的过程中会一直保持对 CPU 的占用,持续检查某个条件是否为真。 通常在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用得当会对性能带来很大的增益,但是往往使用的不得当就会拖慢整个程序.
所以 Goroutine 进入自旋的条件非常苛刻:
* 互斥锁只有在普通模式才能进入自旋;
* `runtime.sync_runtime_canSpin`需要返回 true: a. 需要运行在多 CPU 的机器上; b. 当前的Goroutine 为了获取该锁进入自旋的次数小于四次; c. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
一旦当前 Goroutine 能够进入自旋就会调用`runtime.sync_runtime_doSpin`和`runtime.procyield`并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间.
处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。
通过几个不同的条件分别会更新 state 字段中存储的不同信息,`mutexLocked`、`mutexStarving`、`mutexWoken`和`mutexWaiterShift`:
~~~go
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
~~~
计算了新的互斥锁状态之后,就会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新该状态:
~~~go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 通过 CAS 函数获取了锁
}
...
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
~~~
如果我们没有通过 CAS 获得锁,会调用`runtime.sync_runtime_SemacquireMutex`使用信号量保证资源不会被两个 Goroutine 获取。
`runtime.sync_runtime_SemacquireMutex`会在方法中不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,`sync.Mutex.Lock`方法的剩余代码也会继续执行。
在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环.
在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出.
互斥锁的解锁过程`sync.Mutex.Unlock`与加锁过程相比就很简单,该过程会先使用`sync/atomic.AddInt32`函数快速解锁,这时会发生下面的两种情况:
* 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
* 如果该函数返回的新状态不等于 0,这段代码会调用`sync.Mutex.unlockSlow`方法开始慢速解锁:
~~~go
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
~~~
`sync.Mutex.unlockSlow`方法首先会校验锁状态的合法性, 如果当前互斥锁已经被解锁过了就会直接抛出异常`sync: unlock of unlocked mutex`中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁.
~~~go
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
~~~
在正常模式下,这段代码会分别处理以下两种情况处理:
* 如果互斥锁不存在等待者或者互斥锁的`mutexLocked`、`mutexStarving`、`mutexWoken`状态不都为 0,那么当前方法就可以直接返回,不需要唤醒其他等待者;
* 如果互斥锁存在等待者,会通过`sync.runtime_Semrelease`唤醒等待者并移交锁的所有权;
在饥饿模式下,上述代码会直接调用`sync.runtime_Semrelease`方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念:
* 如果互斥锁处于初始化状态,就会直接通过置位 mutexLocked 加锁;
* 如果互斥锁处于 mutexLocked 并且在普通模式下工作,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
* 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
* 互斥锁在正常情况下会通过`runtime.sync_runtime_SemacquireMutex`函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine;
* 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;
互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解:
* 当互斥锁已经被解锁时,那么调用`sync.Mutex.Unlock`会直接抛出异常;
* 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置`mutexLocked`标志位;
* 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过`sync.runtime_Semrelease`唤醒对应的 Goroutine.
读写锁:
读写互斥锁`sync.RWMutex`是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。
sync.RWMutex 中总共包含5 个字段:
~~~go
type RWMutex struct {
w Mutex // 复用互斥锁提供的能力
writerSem uint32 // 写等待读
readerSem uint32 // 读等待写
readerCount int32 // 存储了当前正在执行的读操作的数量
readerWait int32 // 当写操作被阻塞时等待的读操作个数
}
~~~
我们从写锁开始分析:
当我们想要获取写锁时,需要调用`sync.RWMutex.Lock`方法:
~~~go
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
~~~
* 这里调用结构体持有的`sync.Mutex`的`sync.Mutex.Lock`方法阻塞后续的写操作;
因为互斥锁已经被获取,其他 Goroutine 在获取写锁时就会进入自旋或者休眠;
* 调用`sync/atomic.AddInt32`方法阻塞后续的读操作:
如果仍然有其他 Goroutine 持有互斥锁的读锁`(r != 0)`,该 Goroutine 会调用`runtime.sync_runtime_SemacquireMutex`进入休眠状态等待所有读锁所有者执行结束后释放`writerSem`信号量将当前协程唤醒。
写锁的释放会调用`sync.RWMutex.Unlock`方法:
~~~go
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
~~~
解锁与加锁的过程正好相反,写锁的释放分为以下几个步骤:
1. 调用`sync/atomic.AddInt32`函数将`readerCount`变回正数,释放读锁;
2. 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine:
3. 调用`sync.Mutex.Unlock`方法释放写锁;
获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死。
接着是读锁:
读锁的加锁方法`sync.RWMutex.RLock`就比较简单了,该方法会通过`sync/atomic.AddInt32`将`readerCount`加一:
~~~go
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
~~~
如果`RLock`该方法返回负数,其他 Goroutine 获得了写锁,当前 Goroutine 就会调用`runtime.sync_runtime_SemacquireMutex`陷入休眠等待锁的释放; 如果`RLock`该方法的结果为非负数,没有 Goroutine 获得写锁,当前方法就会成功返回.
当 Goroutine 想要释放读锁时,会调用如下所示的`RUnlock`方法:
~~~go
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
~~~
该方法会先减少正在读资源的`readerCount`整数,根据`sync/atomic.AddInt32`的返回值不同会分别进行处理:
* 如果返回值大于等于零,表示读锁直接解锁成功.
* 如果返回值小于零 ,表示有一个正在执行的写操作,在这时会调用`rUnlockSlow`方法.
~~~go
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
~~~
`rUnlockSlow`该方法会减少获取锁的写操作等待的读操作数`readerWait`并在所有读操作都被释放之后触发写操作的信号量,`writerSem`,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。
其实读写互斥锁(sync.RWMutex),虽然提供的功能非常复杂,不过因为它是在互斥锁( sync.Mutex)的基础上,所以整体的实现上会简单很多。
因此呢:
* 调用`sync.RWMutex.Lock`尝试获取写锁时;
每次`sync.RWMutex.RUnlock`都会将`readerCount`其减一,当它归零时该 Goroutine 就会获得写锁, 将`readerCount`减少`rwmutexMaxReaders`个数以阻塞后续的读操作.
* 调用`sync.RWMutex.Unlock`释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。
- Golang基础
- Go中new与make的区别
- Golang中除了加Mutex锁以外还有哪些方式安全读写共享变量
- 无缓冲Chan的发送和接收是否同步
- Golang并发机制以及它所使用的CSP并发模型.
- Golang中常用的并发模型
- Go中对nil的Slice和空Slice的处理是一致的吗
- 协程和线程和进程的区别
- Golang的内存模型中为什么小对象多了会造成GC压力
- Go中数据竞争问题怎么解决
- 什么是channel,为什么它可以做到线程安全
- Golang垃圾回收算法
- GC的触发条件
- Go的GPM如何调度
- 并发编程概念是什么
- Go语言的栈空间管理是怎么样的
- Goroutine和Channel的作用分别是什么
- 怎么查看Goroutine的数量
- Go中的锁有哪些
- 怎么限制Goroutine的数量
- Channel是同步的还是异步的
- Goroutine和线程的区别
- Go的Struct能不能比较
- Go的defer原理是什么
- Go的select可以用于什么
- Context包的用途是什么
- Go主协程如何等其余协程完再操作
- Go的Slice如何扩容
- Go中的map如何实现顺序读取
- Go中CAS是怎么回事
- Go中的逃逸分析是什么
- Go值接收者和指针接收者的区别
- Go的对象在内存中是怎样分配的
- 栈的内存是怎么分配的
- 堆内存管理怎么分配的
- 在Go函数中为什么会发生内存泄露
- G0的作用
- Go中的锁如何实现
- Go中的channel的实现
- 栈的内存是怎么分配的2
- 堆内存管理怎么分配的2
- Go中的map的实现
- Go中的http包的实现原理
- Goroutine发生了泄漏如何检测
- Go函数返回局部变量的指针是否安全
- Go中两个Nil可能不相等吗
- Goroutine和KernelThread之间是什么关系
- 为何GPM调度要有P
- 如何在goroutine执行一半就退出协程
- Mysql基础
- Mysql索引用的是什么算法
- Mysql事务的基本要素
- Mysql的存储引擎
- Mysql事务隔离级别
- Mysql高可用方案有哪些
- Mysql中utf8和utf8mb4区别
- Mysql中乐观锁和悲观锁区别
- Mysql索引主要是哪些
- Mysql联合索引最左匹配原则
- 聚簇索引和非聚簇索引区别
- 如何查询一个字段是否命中了索引
- Mysql中查询数据什么情况下不会命中索引
- Mysql中的MVCC是什么
- Mvcc和Redolog和Undolog以及Binlog有什么不同
- Mysql读写分离以及主从同步
- InnoDB的关键特性
- Mysql如何保证一致性和持久性
- 为什么选择B+树作为索引结构
- InnoDB的行锁模式
- 哈希(hash)比树(tree)更快,索引结构为什么要设计成树型
- 为什么索引的key长度不能太长
- Mysql的数据如何恢复到任意时间点
- Mysql为什么加了索引可以加快查询
- Explain命令有什么用
- Redis基础
- Redis的数据结构及使用场景
- Redis持久化的几种方式
- Redis的LRU具体实现
- 单线程的Redis为什么快
- Redis的数据过期策略
- 如何解决Redis缓存雪崩问题
- 如何解决Redis缓存穿透问题
- Redis并发竞争key如何解决
- Redis的主从模式和哨兵模式和集群模式区别
- Redis有序集合zset底层怎么实现的
- 跳表的查询过程是怎么样的,查询和插入的时间复杂度
- 网络协议基础
- TCP和UDP有什么区别
- TCP中三次握手和四次挥手
- TCP的LISTEN状态是什么
- 常见的HTTP状态码有哪些
- 301和302有什么区别
- 504和500有什么区别
- HTTPS和HTTP有什么区别
- Quic有什么优点相比Http2
- Grpc的优缺点
- Get和Post区别
- Unicode和ASCII以及Utf8的区别
- Cookie与Session异同
- Client如何实现长连接
- Http1和Http2和Grpc之间的区别是什么
- Tcp中的拆包和粘包是怎么回事
- TFO的原理是什么
- TIME_WAIT的作用
- 网络的性能指标有哪些