前言
golang相对其他编程语言,在并发编程中数据通信有特殊处理,CSP模式,使用数据通信保证数据安全,而不是共享内存模式。使用到的内置组件就是channel
channel简介
源码分析
场景分析
结论概览
| 场景 |
读 |
写 |
关闭 |
异常情况是否可以被捕获 |
| nil channel |
阻塞 |
阻塞 |
panic |
关闭channel的panic可以被捕获 |
| 无缓冲channel |
无写入情况下,阻塞 |
无读取情况下,阻塞 |
不可重复close |
关闭channel的panic可以被捕获 |
| 有缓冲channel |
1. 关闭情况下可读,读到默认值,且flag为false 2. 缓冲区有数据正常读取,无数据阻塞 |
1. 关闭情况下写,会panic 2. 缓存区有空间直接写入,无空间则阻塞 |
不可重复close |
1. 重复关闭可以被捕获 2. 关闭情况下写的panic也可以被捕获 |
nil channel
代码案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func nilChannelReadWrite() { defer func() { if err := recover(); err != nil { fmt.Println("recover nilChannelReadWrite err", err) } }() var ch chan int go func(ch chan int) { fmt.Println("开始写 nil channel") ch <- 1 fmt.Println("写 nil channel 完成") }(ch)
go func(ch chan int) { fmt.Println("开始读 nil channel") <-ch fmt.Println("读 nil channel 完成") }(ch) close(ch) fmt.Println("关闭 nil channel 完成") }
|
运行结果
1 2 3 4 5
| pprof server started on :6060 recover nilChannelReadWrite err close of nil channel 开始读 nil channel 开始写 nil channel 2026-02-24 18:48:02.315937 +0800 CST m=+3.001204668 ------------------after nilChannelReadWrite: 4
|
结论:
- 没有使用make创建的channel是nil
- 读写nil channel不会报错,会永久阻塞协程
- close nil channel会panic,但是可以通过recover捕获
正常channel
正常读写,交替打印案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| func channelReadWrite() { ch1 := make(chan int) ch2 := make(chan int)
var wg sync.WaitGroup wg.Add(2) go func(wg *sync.WaitGroup) { for i := 0; i < 2; i++ { <-ch1 fmt.Println("print 1") ch2 <- 2 fmt.Println("send ch2 success") } fmt.Println("协程1执行完成") wg.Done() }(&wg)
go func(wg *sync.WaitGroup) { for i := 0; i < 2; i++ { <-ch2 fmt.Println("print 2") if i < 1 { ch1 <- 1 fmt.Println("send ch1 success") } } fmt.Println("协程2执行完成") wg.Done() }(&wg) ch1 <- 1 wg.Wait() fmt.Println("print 交替打印完成") }
|
运行结果
1 2 3 4 5 6 7 8 9 10 11
| print 1 send ch2 success print 2 send ch1 success print 1 send ch2 success 协程1执行完成 print 2 协程2执行完成 print 交替打印完成 2026-02-24 19:32:17.166339 +0800 CST m=+3.003253710 ------------------after channelReadWrite: 2
|
channel关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| func channelClose() {
go func() { ch := make(chan int) defer func() { if err := recover(); err != nil { fmt.Printf("recover channelClose error: %v\n", err) } }() go func() { <-ch fmt.Println("receive ch success") }()
ch <- 1 fmt.Println("send ch success") close(ch) fmt.Println("close ch success") close(ch) fmt.Println("close two ch success") }()
time.Sleep(2 * time.Second)
go func() { ch := make(chan int, 2) defer func() { if err := recover(); err != nil { fmt.Printf("recover channelClose buf ch error: %v\n", err) } }() go func() { <-ch fmt.Println("receive ch success") }()
ch <- 1 fmt.Println("send ch success") close(ch) fmt.Println("close two ch success") ch <- 1 fmt.Println("send to close buf ch success") }()
time.Sleep(time.Second) }
|
运行结果
1 2 3 4 5 6 7 8 9 10
| receive ch success send ch success close ch success pprof server started on :6060 recover channelClose error: close of closed channel send buf ch success close buf ch success recover channelClose buf ch error: send on closed channel receive buf ch success 2026-02-24 19:29:32.054209 +0800 CST m=+3.002766418 ------------------after channelClose: 2
|
结论:
- channel重复关闭,会panic,可以被捕获
- 向已经被关闭的channel写入数据,会panic,可以被捕获
超时控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func channelTimeout() {
go func() { ch := make(chan int) go func() { time.Sleep(4 * time.Second) fmt.Printf("ts=%v\tmsg=send ch start\n", time.Now().String()) ch <- 1 fmt.Printf("ts=%v\tmsg=send ch success\n", time.Now().String()) }() fmt.Printf("ts=%v\tmsg=receive ch start\n", time.Now().String()) select { case <-ch: fmt.Printf("ts=%v\tmsg=receive ch success\n", time.Now().String()) case <-time.After(2 * time.Second): fmt.Printf("ts=%v\tmsg=receive ch timeout\n", time.Now().String()) return default: fmt.Printf("ts=%v\tmsg=receive ch default\n", time.Now().String()) } }()
time.Sleep(5 * time.Second) go func() { ch := make(chan int, 1) go func() { time.Sleep(4 * time.Second) fmt.Printf("ts=%v\tmsg=send ch2 start\n", time.Now().String()) ch <- 1 fmt.Printf("ts=%v\tmsg=send ch2 success\n", time.Now().String()) }() fmt.Printf("ts=%v\tmsg=receive ch2 start\n", time.Now().String()) select { case <-ch: fmt.Printf("ts=%v\tmsg=receive ch2 success\n", time.Now().String()) case <-time.After(2 * time.Second): fmt.Printf("ts=%v\tmsg=receive ch2 timeout\n", time.Now().String()) return default: fmt.Printf("ts=%v\tmsg=receive ch2 default\n", time.Now().String()) } }()
}
|
结论:
- 不论有缓冲、无缓冲,select读取channel数据时候,如果没有数据则阻塞。其他分支不可用时候,有default分支则走default分支。可以通过time.after控制超时读取
场景之限流器
令牌桶模式。以固定速率向桶中投入令牌,如果桶满了则阻塞;每次请求从桶中获取令牌,获取不到则被限流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| type ChannelLimiter struct { tokens chan struct{} }
func (l *ChannelLimiter) Take() bool { select { case <-l.tokens: return true default: return false } }
func NewChannelLimiter(qps int) *ChannelLimiter { limiter := &ChannelLimiter{ tokens: make(chan struct{}, qps), } go func(limiter *ChannelLimiter) {
ticker := time.NewTicker(1e9 / time.Duration(qps) * time.Nanosecond) for { select { case <-ticker.C: limiter.tokens <- struct{}{} fmt.Printf("ts=%v\tmsg=add token success\n", time.Now().String()) } } }(limiter) return limiter }
func channelLimiter() { limiter := NewChannelLimiter(2) time.Sleep(1 * time.Second)
wg := sync.WaitGroup{} for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 5; j++ { if ok := limiter.Take(); ok { fmt.Printf("ts=%v\tid=%v\tcnt=%v\tmsg=get token success\n", time.Now().String(), j, id) } else { fmt.Printf("ts=%v\tid=%v\tcnt=%v\tmsg=get token faild\n", time.Now().String(), j, id) } time.Sleep(time.Second) } }(i) } wg.Wait() fmt.Println("limiter test completed") }
|