0%

Golang并发组件channel

前言

golang相对其他编程语言,在并发编程中数据通信有特殊处理,CSP模式,使用数据通信保证数据安全,而不是共享内存模式。使用到的内置组件就是channel

channel简介

源码分析

场景分析

结论概览

场景 关闭 异常情况是否可以被捕获
nil channel 阻塞 阻塞 panic 关闭channel的panic可以被捕获
无缓冲channel 无写入情况下,阻塞 无读取情况下,阻塞 不可重复close 关闭channel的panic可以被捕获
有缓冲channel 1. 关闭情况下可读,读到默认值,且flag为false
2. 缓冲区有数据正常读取,无数据阻塞
1. 关闭情况下写,会panic
2. 缓存区有空间直接写入,无空间则阻塞
不可重复close 1. 重复关闭可以被捕获
2. 关闭情况下写的panic也可以被捕获

nil channel

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func nilChannelReadWrite() {
defer func() {
if err := recover(); err != nil {
fmt.Println("recover nilChannelReadWrite err", err)
}
}()
var ch chan int
// 给nil channel 赋值 阻塞
go func(ch chan int) {
fmt.Println("开始写 nil channel")
ch <- 1
fmt.Println("写 nil channel 完成")
}(ch)

go func(ch chan int) {
fmt.Println("开始读 nil channel")
<-ch
fmt.Println("读 nil channel 完成")
}(ch)
close(ch)
fmt.Println("关闭 nil channel 完成")
}

运行结果

1
2
3
4
5
pprof server started on :6060
recover nilChannelReadWrite err close of nil channel
开始读 nil channel
开始写 nil channel
2026-02-24 18:48:02.315937 +0800 CST m=+3.001204668 ------------------after nilChannelReadWrite: 4

结论:

  • 没有使用make创建的channel是nil
  • 读写nil channel不会报错,会永久阻塞协程
  • close nil channel会panic,但是可以通过recover捕获

正常channel

正常读写,交替打印案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func channelReadWrite() {
ch1 := make(chan int)
ch2 := make(chan int)

var wg sync.WaitGroup
wg.Add(2)
// 交替打印1 2
go func(wg *sync.WaitGroup) {
for i := 0; i < 2; i++ {
<-ch1
fmt.Println("print 1")
ch2 <- 2
fmt.Println("send ch2 success")
}
fmt.Println("协程1执行完成")
wg.Done()
}(&wg)

go func(wg *sync.WaitGroup) {
for i := 0; i < 2; i++ {
<-ch2
fmt.Println("print 2")
if i < 1 {
ch1 <- 1
fmt.Println("send ch1 success")
}
}
fmt.Println("协程2执行完成")
wg.Done()
}(&wg)
ch1 <- 1
wg.Wait()
fmt.Println("print 交替打印完成")
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
print 1
send ch2 success
print 2
send ch1 success
print 1
send ch2 success
协程1执行完成
print 2
协程2执行完成
print 交替打印完成
2026-02-24 19:32:17.166339 +0800 CST m=+3.003253710 ------------------after channelReadWrite: 2

channel关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func channelClose() {

go func() {
ch := make(chan int)
defer func() {
if err := recover(); err != nil {
fmt.Printf("recover channelClose error: %v\n", err)
}
}()
go func() {
<-ch
fmt.Println("receive ch success")
}()

ch <- 1
fmt.Println("send ch success")
close(ch)
fmt.Println("close ch success")
close(ch)
fmt.Println("close two ch success")
}()

time.Sleep(2 * time.Second)
// 有缓冲channel
go func() {
ch := make(chan int, 2)
defer func() {
if err := recover(); err != nil {
fmt.Printf("recover channelClose buf ch error: %v\n", err)
}
}()
go func() {
<-ch
fmt.Println("receive ch success")
}()

ch <- 1
fmt.Println("send ch success")
close(ch)
fmt.Println("close two ch success")
ch <- 1
fmt.Println("send to close buf ch success")
}()

time.Sleep(time.Second)
}

运行结果

1
2
3
4
5
6
7
8
9
10
receive ch success
send ch success
close ch success
pprof server started on :6060
recover channelClose error: close of closed channel
send buf ch success
close buf ch success
recover channelClose buf ch error: send on closed channel
receive buf ch success
2026-02-24 19:29:32.054209 +0800 CST m=+3.002766418 ------------------after channelClose: 2

结论:

  • channel重复关闭,会panic,可以被捕获
  • 向已经被关闭的channel写入数据,会panic,可以被捕获

超时控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func channelTimeout() {

go func() {
ch := make(chan int)
go func() {
time.Sleep(4 * time.Second)
fmt.Printf("ts=%v\tmsg=send ch start\n", time.Now().String())
ch <- 1
fmt.Printf("ts=%v\tmsg=send ch success\n", time.Now().String())
}()
fmt.Printf("ts=%v\tmsg=receive ch start\n", time.Now().String())
select {
case <-ch:
fmt.Printf("ts=%v\tmsg=receive ch success\n", time.Now().String())
case <-time.After(2 * time.Second):
fmt.Printf("ts=%v\tmsg=receive ch timeout\n", time.Now().String())
return
default:
fmt.Printf("ts=%v\tmsg=receive ch default\n", time.Now().String())
}
}()

time.Sleep(5 * time.Second)
go func() {
ch := make(chan int, 1)
go func() {
time.Sleep(4 * time.Second)
fmt.Printf("ts=%v\tmsg=send ch2 start\n", time.Now().String())
ch <- 1
fmt.Printf("ts=%v\tmsg=send ch2 success\n", time.Now().String())
}()
fmt.Printf("ts=%v\tmsg=receive ch2 start\n", time.Now().String())
select {
case <-ch:
fmt.Printf("ts=%v\tmsg=receive ch2 success\n", time.Now().String())
case <-time.After(2 * time.Second):
fmt.Printf("ts=%v\tmsg=receive ch2 timeout\n", time.Now().String())
return
default:
fmt.Printf("ts=%v\tmsg=receive ch2 default\n", time.Now().String())
}
}()

}

结论:

  • 不论有缓冲、无缓冲,select读取channel数据时候,如果没有数据则阻塞。其他分支不可用时候,有default分支则走default分支。可以通过time.after控制超时读取

场景之限流器

令牌桶模式。以固定速率向桶中投入令牌,如果桶满了则阻塞;每次请求从桶中获取令牌,获取不到则被限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
type ChannelLimiter struct {
tokens chan struct{}
}

func (l *ChannelLimiter) Take() bool {
select {
case <-l.tokens:
return true
default:
return false
}
}

func NewChannelLimiter(qps int) *ChannelLimiter {
limiter := &ChannelLimiter{
tokens: make(chan struct{}, qps),
}
// 启动一个 goroutine 定时向令牌桶中添加令牌
go func(limiter *ChannelLimiter) {

ticker := time.NewTicker(1e9 / time.Duration(qps) * time.Nanosecond)
for {
select {
case <-ticker.C:
limiter.tokens <- struct{}{}
//fmt.Println(time.Now().String(), "add token success")
fmt.Printf("ts=%v\tmsg=add token success\n", time.Now().String())
}
}
}(limiter)
return limiter
}

func channelLimiter() {
// 限流器,每秒允许2个请求,超过的请求会被丢弃
limiter := NewChannelLimiter(2)
time.Sleep(1 * time.Second)

wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
if ok := limiter.Take(); ok {
fmt.Printf("ts=%v\tid=%v\tcnt=%v\tmsg=get token success\n", time.Now().String(), j, id)
} else {
fmt.Printf("ts=%v\tid=%v\tcnt=%v\tmsg=get token faild\n", time.Now().String(), j, id)
}
time.Sleep(time.Second)
}
}(i)
}
wg.Wait()
fmt.Println("limiter test completed")
}