第五章 并发编程 v1.0
第一部分 并发编程
一、协程
Go语言中的协程(Goroutine)是轻量级执行线程,由Go运行时管理,相比于操作系统线程,协程创建成本低、切换开销小,单个Go程序可同时运行数万个协程。
1.创建
// 方式1:匿名函数创建协程
go func() {
// 协程执行的逻辑
fmt.Println("hello goroutine")
}()
// 方式2:调用命名函数创建协程
func sayHello() {
fmt.Println("hello named func goroutine")
}
go sayHello() // 通过在调用函数前加go关键字实现协程的创建2.协程间通信
Go提倡通过通信共享内存而不是通过共享内存实现通信
通过共享内存实现通信(不推荐,易出现竞态条件)
(推荐) 通过通信共享内存(基于通道实现,天然线程安全)
3.执行等待
为了防止协程的执行因为主线程执行完毕而提前结束,需要进行协程的执行等待
使用 sync包 的 WaitGroup 类:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup // 创建WaitGroup对象,计数器初始为0
wg.Add(2) // 添加计数器,使其初始数值为2
// 第一个协程
go func() {
defer wg.Done() // 协程结束时执行wg.Done(),计数器减1
fmt.Println("goroutine 1 finished")
}()
// 第二个协程
go func() {
defer wg.Done() // 协程结束时执行wg.Done(),计数器减1
fmt.Println("goroutine 2 finished")
}()
wg.Wait() // 等待结束(阻塞直至计数器归零)
fmt.Println("all goroutines finished")
}WaitGroup中的主要方法
| 方法 | 作用 |
|---|---|
| Add(delta) | 计数器+delta(delta需为正数) |
| Done() | 计数器-1(等价于Add(-1)) |
| Wait() | 主协程阻塞直至计数器为零 |
二、通道
通道(chan)是Go语言中用于协程间通信的核心数据结构,可理解为一个带类型的、线程安全的管道,支持协程间安全地发送和接收数据。通道类似于一个固定长度的可共享容器,可以通过for循环+range遍历,也支持select多路复用。
1.有缓冲通道与无缓冲通道
| 类型 | 特点 | 适用场景 |
|---|---|---|
| 无缓冲通道 | 1. 创建时不指定缓冲区大小; 2. 发送操作会阻塞,直到有协程接收数据; 3. 接收操作会阻塞,直到有协程发送数据 | 协程间需要同步通信(如任务指令传递、结果即时同步),确保数据发送方和接收方步调一致 |
| 有缓冲通道 | 1. 创建时指定缓冲区大小; 2. 缓冲区未满时发送不阻塞,缓冲区未空时接收不阻塞; 3. 缓冲区满时发送阻塞,空时接收阻塞 | 1. 生产消费速率不匹配的场景(如日志收集、数据批量处理); 2. 降低协程间耦合,缓冲临时数据 |
2.创建
package main
import "fmt"
func main() {
// 创建无缓冲通道(int类型)
unbufferedChan := make(chan int)
fmt.Printf("无缓冲通道类型:%T,长度:%d,容量:%d\n", unbufferedChan, len(unbufferedChan), cap(unbufferedChan))
// 创建有缓冲通道(string类型,缓冲区大小为5)
bufferedChan := make(chan string, 5)
fmt.Printf("有缓冲通道类型:%T,长度:%d,容量:%d\n", bufferedChan, len(bufferedChan), cap(bufferedChan))
}3.基本使用
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
ch := make(chan int, 2) // 创建有缓冲通道,容量2
wg.Add(1)
// 发送协程
go func() {
defer wg.Done()
ch <- 10 // 向通道发送数据,缓冲区未满时不阻塞
ch <- 20 // 向通道发送数据
fmt.Println("发送完所有数据")
}()
wg.Add(1)
// 接收协程
go func() {
defer wg.Done()
val1 := <-ch // 从通道接收数据
fmt.Println("接收数据1:", val1)
val2 := <-ch // 从通道接收数据
fmt.Println("接收数据2:", val2)
}()
wg.Wait()
}4.关闭
通道关闭后不能再向通道发送数据(会触发panic),但可以继续接收数据,直到通道为空;已关闭的空通道接收操作会返回对应类型的零值。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
ch := make(chan int, 3)
// 发送数据
ch <- 1
ch <- 2
ch <- 3
close(ch) // 关闭通道
wg.Add(1)
// 接收数据(遍历已关闭的通道)
go func() {
defer wg.Done()
// range遍历通道,通道关闭后会自动退出循环
for val := range ch {
fmt.Println("接收数据:", val)
}
fmt.Println("通道已遍历完毕")
}()
// 测试:关闭后发送数据(会panic,注释掉可运行)
// ch <- 4
wg.Wait()
}5.等待多个通道
使用 select语句 可以同时等待多个通道操作,任意一个通道就绪(可发送/接收)时,执行对应的case逻辑;select还支持default分支,用于处理所有通道都未就绪的情况。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 协程1:2秒后发送数据到ch1
go func() {
time.Sleep(2 * time.Second)
ch1 <- 1
}()
// 协程2:1秒后发送数据到ch2
go func() {
time.Sleep(1 * time.Second)
ch2 <- 2
}()
// 等待两个通道的结果
for i := 0; i < 2; i++ {
select {
case val := <-ch1:
fmt.Println("Received from ch1:", val)
case val := <-ch2:
fmt.Println("Received from ch2:", val)
case <-time.After(3 * time.Second): // 超时保护
fmt.Println("timeout")
return
}
}
fmt.Println("all channels processed")
}三、同步原语
同步原语是Go语言中用于协程间同步、互斥、等待的核心工具,主要位于
sync包中,用于解决共享资源竞争、协程间协作等问题。
1.互斥锁(Mutex)
互斥锁(sync.Mutex)用于保证同一时间只有一个协程能访问临界区资源,是解决数据竞争最基础的工具。核心方法:
Lock()(加锁,阻塞直到获取锁)、Unlock()(解锁)。
package main
import (
"fmt"
"sync"
)
var (
count int
mu sync.Mutex
)
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock() // 加锁,进入临界区
count++ // 临界区操作(共享资源)
fmt.Println("count:", count)
mu.Unlock() // 解锁,退出临界区
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("final count:", count)
}2.读写锁(RWMutex)
读写锁(sync.RWMutex)是互斥锁的扩展,区分“读操作”和“写操作”:
- 读锁:多个协程可同时获取(读共享);
- 写锁:同一时间只能有一个协程获取,且获取时会阻塞所有读锁和写锁。 适用于“读多写少”的场景,能提升并发性能。
package main
import (
"fmt"
"sync"
"time"
)
var (
data = "初始数据"
rwMu sync.RWMutex
wgR sync.WaitGroup
wgW sync.WaitGroup
)
// 读操作(加读锁)
func readData(id int) {
defer wgR.Done()
rwMu.RLock() // 获取读锁
defer rwMu.RUnlock() // 释放读锁
fmt.Printf("读协程%d:%s\n", id, data)
time.Sleep(100 * time.Millisecond) // 模拟读耗时
}
// 写操作(加写锁)
func writeData(id int, newData string) {
defer wgW.Done()
rwMu.Lock() // 获取写锁
defer rwMu.Unlock() // 释放写锁
fmt.Printf("写协程%d:修改数据为%s\n", id, newData)
data = newData
time.Sleep(200 * time.Millisecond) // 模拟写耗时
}
func main() {
// 启动5个读协程
for i := 0; i < 5; i++ {
wgR.Add(1)
go readData(i)
}
// 启动2个写协程
wgW.Add(1)
go writeData(0, "修改后数据1")
wgW.Add(1)
go writeData(1, "修改后数据2")
// 再启动5个读协程
for i := 5; i < 10; i++ {
wgR.Add(1)
go readData(i)
}
wgR.Wait()
wgW.Wait()
fmt.Println("最终数据:", data)
}3.WaitGroup
详见【一、协程 - 3.执行等待】,核心作用是等待一组协程执行完成,通过计数器实现阻塞等待。
4.Once
sync.Once 用于保证某个函数在程序运行期间仅执行一次,即使被多个协程调用,常用于单例初始化、资源一次性加载等场景。核心方法:
Do(f func())(执行函数f,确保仅执行一次)。
package main
import (
"fmt"
"sync"
)
var (
once sync.Once
initData string
)
func initResource() {
fmt.Println("初始化资源...")
initData = "初始化完成"
}
func useResource(id int) {
once.Do(initResource) // 确保initResource仅执行一次
fmt.Printf("协程%d使用资源:%s\n", id, initData)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
useResource(id)
}(i)
}
wg.Wait()
}5.Cond——条件变量(等待/通知机制)
sync.Cond 基于互斥锁实现,用于协程间的“等待-通知”协作,核心场景:一个/多个协程等待某个条件满足,当条件满足时,由其他协程通知等待的协程继续执行。 核心方法:
Wait():释放锁并阻塞,直到被Signal/Broadcast唤醒,唤醒后重新获取锁;Signal():唤醒一个等待的协程;Broadcast():唤醒所有等待的协程。
package main
import (
"fmt"
"sync"
"time"
)
var (
cond *sync.Cond
ready bool // 等待的条件
wgCond sync.WaitGroup
)
// 等待者协程
func waiter(id int) {
defer wgCond.Done()
cond.L.Lock() // 获取锁
for !ready { // 循环检查条件(防止虚假唤醒)
cond.Wait() // 释放锁并等待通知
}
fmt.Printf("等待者%d:条件满足,开始执行\n", id)
cond.L.Unlock() // 释放锁
}
// 通知者协程
func notifier() {
defer wgCond.Done()
time.Sleep(2 * time.Second) // 模拟准备工作
cond.L.Lock()
ready = true // 设置条件为满足
cond.L.Unlock()
fmt.Println("通知者:发送广播通知")
cond.Broadcast() // 通知所有等待者
}
func main() {
// 初始化Cond,关联一个互斥锁
cond = sync.NewCond(&sync.Mutex{})
// 启动3个等待者
for i := 0; i < 3; i++ {
wgCond.Add(1)
go waiter(i)
}
// 启动1个通知者
wgCond.Add(1)
go notifier()
wgCond.Wait()
fmt.Println("所有协程执行完成")
}四、原子操作
原子操作是指不可被中断的操作,在并发场景下,原子操作无需加锁即可保证对共享变量的修改是线程安全的,Go语言中原子操作通过
sync/atomic包实现,性能优于互斥锁。
1.sync 包 vs sync/atomic 包
| 特性 | sync包(互斥锁/读写锁) | sync/atomic包(原子操作) |
|---|---|---|
| 适用场景 | 复杂临界区(多行代码) | 简单变量操作(单个变量) |
| 性能 | 相对较低(有锁开销) | 极高(无锁,硬件级支持) |
| 使用复杂度 | 低(API简单) | 中(需匹配变量类型) |
| 支持操作 | 任意操作 | 仅支持加减、比较交换等基础操作 |
2.atomic 核心操作示例
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var (
counter int64 // 原子操作建议使用int64/uint64类型
wgAtom sync.WaitGroup
)
func atomicIncrement() {
defer wgAtom.Done()
// 原子自增(等价于counter++,但线程安全)
atomic.AddInt64(&counter, 1)
}
func main() {
// 启动1000个协程执行原子自增
for i := 0; i < 1000; i++ {
wgAtom.Add(1)
go atomicIncrement()
}
wgAtom.Wait()
// 原子读取变量值(确保读取的是最新值)
fmt.Println("最终计数器值:", atomic.LoadInt64(&counter))
}常用原子操作:
AddInt64(自增/自减)、LoadInt64(读取值)、StoreInt64(设置值)、CompareAndSwapInt64(CAS操作)。
第二部分 Goroutine
一、简介
Goroutine(简称Goroutine)是Go语言特有的轻量级执行体,由Go运行时(runtime)管理,而非操作系统内核线程,是Go实现并发的核心。
核心特性
- 轻量级:每个Goroutine初始栈大小仅2KB,可动态扩容(最大数MB),单个Go程序可轻松创建数万个Goroutine;
- 低切换开销:Goroutine的切换由Go运行时完成,无需陷入内核态,切换成本远低于操作系统线程;
- 调度高效:Go运行时通过M-P-G模型实现Goroutine的调度,充分利用多核CPU资源;
- 与线程的关系:多个Goroutine可映射到同一个操作系统线程执行,由Go运行时调度器负责分配。
二、Goroutine 调度原理(M-P-G 模型)
Go的Goroutine调度器采用M-P-G模型,核心是将Goroutine(G)映射到操作系统线程(M),并通过处理器(P)实现高效调度,充分利用多核CPU。
1.核心组件定义
| 组件 | 全称 | 作用 |
|---|---|---|
| G | Goroutine | 协程,包含执行栈、程序计数器、状态等,是调度的基本单位 |
| M | Machine | 操作系统线程(OS Thread),是执行Goroutine的载体 |
| P | Processor | 处理器,是M和G之间的桥梁,包含Goroutine本地队列(LQ),P的数量对应Go程序的并发度(默认等于CPU核心数) |
| GQ | Global Queue | 全局Goroutine队列,存放等待执行的Goroutine |
| S | Scheduler | 调度器,负责将G分配给M-P组合执行 |
2.调度流程(简化版)
- 初始化:Go程序启动时,创建与CPU核心数相等的P,每个P维护一个本地G队列(LQ);
- Goroutine创建:使用
go关键字创建G时,优先放入当前M绑定的P的本地队列;若本地队列满,则放入全局队列(GQ); - 执行调度:
- M绑定P后,从P的本地队列取出G执行;
- 当P的本地队列为空时,M会从全局队列(GQ)偷取G,或从其他P的本地队列偷取(工作窃取机制,steal work);
- 阻塞处理:
- 若G执行阻塞操作(如I/O、sleep、锁等待),M会释放绑定的P,P可与其他空闲M绑定继续执行其他G;
- 当阻塞操作完成后,G会重新进入P的本地队列或全局队列,等待再次调度;
- 抢占式调度:Go 1.14+支持抢占式调度,若一个G执行时间过长(超过10ms),调度器会主动抢占,将其暂停并放入队列,让其他G有机会执行。
3.核心特点
- 工作窃取:空闲的P会从繁忙的P的本地队列偷取Goroutine执行,提升CPU利用率;
- 抢占式调度:避免单个Goroutine长时间占用CPU,保证调度公平性;
- M-P分离:M与P解耦,当M阻塞时,P可快速绑定其他M,减少资源浪费。