Go 并发基石之 channel (三) —— 几种典型的应用模式
Go 并发系列是根据我对晁岳攀老师的《Go 并发编程实战课》的吸收和理解整理而成,如有偏差,欢迎指正~
在上一篇 Go 并发基石之 channel (二) —— 源码初探 中粗略的了一遍 channel 的源码,大致了解了背后的实现思路。
这一篇,主要介绍 channel 的几种经典的应用模式。
channel 的五种应用场景
消息交流
channel 的底层是一个循环队列,当队列的长度大于 0 的 时候,它会被当做线程安全队列和 buffer。利用这个特性,一个 goroutine 可以安全的往 channel 中存放数据,另一个 goroutine 可以安全的从 channel 中读取数据,这样就实现了 goroutine 之间的消息交流。
这个比较简单,就不展开了。
数据传递
数据传递类似游戏“击鼓传花”。鼓响时,花(或者其它物件)从一个人手里传到下一个人,数据就类似这里的花。
现在有下面这样一个任务:
有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写程序,让输出的编号总是按照 1、2、3、4、1、2、3、4……这个顺序打印出来。
1 |
|
这段代码中,token 代指“击鼓传花”中的“花”,chans 代指围坐一圈的人。每个 chan(人)都是从上一个 chan(人)手中拿到 token,放在自己手上,从而实现顺序打印 1,2,3,4。
信号通知
channel 类型有这样一个特性:如果 channel 为空,那么 recevier 接收数据的时候就会阻塞,直到有新的数据进来或者 channel 被关闭。
利用这个特性,就可以实现 wait/notify 设计模式。另外还有一个经常碰到的场景,实现程序的 graceful shutdown。
1 |
|
当然,如果清理操作很耗时,需要增加超时限制,doClenup() 可以进行如下改写:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20func main() {
closed := make(chan struct{})
...... // 中间过程都一样
// 执行退出之前的清理操作
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
fmt.Println("清理超时,不等了!")
}
fmt.Println("优雅退出!")
}
func doCleanup(closed chan struct{}) {
time.Sleep(time.Minute)
close(closed)
}
锁
在这个系列最开始就介绍了 Go 中 Mutex 设计原理详解(一)。利用 channel 我们也能实现锁的功能。
sync.Mutex 通过修改持有锁标记位的状态达到占有锁的目的,因此 channel 可以通过转移这个标记位的所有权实现占有锁。
具体代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61// 使用chan实现互斥锁
type Mutex struct {
ch chan struct{}
}
// 使用锁需要初始化
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 请求锁,直到获取到
func (m *Mutex) Lock() {
<-m.ch
}
// 解锁
func (m *Mutex) Unlock() {
select {
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
timer.Stop()
return true
case <-timer.C:
}
return false
}
// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
fmt.Printf("locked v %v\n", ok)
ok = m.TryLock()
fmt.Printf("locked %v\n", ok)
}
这里实现锁主要利用了向满 channel 发送数组或从空 channel 接收数据会阻塞的特性。另外,利用 select 很容易实现 TryLock 和 Timeout 的功能。
任务编排
在前面的 Go 并发任务编排利器之 WaitGroup 中介绍了 sync.WaitGroup。通过 sync.WaitGroup,我们能很容易的实现 等待一组 goroutine 完成任务 这种任务编排需求。同样,我们也可以用 channel 实现。
但是如果任务编排再复杂一些呢?如果面试官出了下面这个题目:
有一批任务需要处理,但是机器资源有限,只能承受100的并发度,该如何实现?
一种解决方案就是使用 channel,代码如下:
1 |
|
利用 sender 给满员的 channel 发送数据会阻塞的特性,就实现了并发度始终维持在 100 的需求。
除此之外,下面再介绍几种常见的模式。
or-Done 模式
or-Done 模式对应的场景很好理解,n 个任务,有一个完成就算完成。
看过之前文章 记一次学习 orDone 模式爬坑经历 读者对 or-Done 应该不陌生,因为课程中关于 or-Done 模式的代码是有问题的!
正确的代码应该是下面这个:
1 |
|
在编写 or-Done 的代码时,有两个点需要注意:
- 递归前,需要声明一个 orDone 变量,用来通知子函数退出。
- len(channels) == 2 是一种特殊情况,否则会因为 append orDone 产生无限递归。
扇入模式
扇入(Fan-In)是一个术语,用于描述将多个结果组合到一个 channel 中的过程。扇入模式下,输入源有多个,输出目标只有一个。下面是扇入模式的一种实现:
1 |
|
扇出模式
扇出模式(Fan-Out)只有一个输入源,但是有多个输出目标。下面是一个扇出模式的实现,从源 channel 取出一个数据后,依次发送给多个目标 channel。发送的时候,既可以同步,也可以异步。
1 |
|
stream
stream 是把 channel 当做流式管道的方式。
1 |
|
转成流之后,如果要实现取前 N 个数的功能 TakeN,可以再创建一个输出流,从输入流中读取:1
2
3
4
5
6
7
8
9
10
11
12
13
14func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{}) // 创建输出流
go func() {
defer close(takeStream)
for i := 0; i < num; i++ { // 只读取前num个元素
select {
case <-done:
return
case takeStream <- <-valueStream: //从输入流中读取元素
}
}
}()
return takeStream
}
map-reduce
map-reduce 是一种面向大规模数据处理的并行计算模型和方法,但是这里要介绍的是一种单机版的 map-reduce 模式。
map-reduce 分为两个步骤,第一步是 map,将队列中的数据用 mapFn 函数处理;第二步是 reduce,将处理后的数据用 reduceFn 函数汇总。
map 逻辑实现如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func mapChan(in <-chan interface{}, mapFn func(interface{}) interface{}) <-chan interface{} {
out := make(chan interface{}) //创建一个输出chan
if in == nil { // 异常检查
close(out)
return out
}
go func() { // 启动一个goroutine,实现map的主要逻辑
defer close(out)
for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
out <- mapFn(v)
}
}()
return out
}
reduce 逻辑实现如下:1
2
3
4
5
6
7
8
9
10
11
12func reduce(in <-chan interface{}, reduceFn func(r, v interface{}) interface{}) interface{} {
if in == nil { // 异常检查
return nil
}
out := <-in // 先读取第一个元素
for v := range in { // 实现reduce的主要逻辑
out = reduceFn(out, v)
}
return out
}
想象这样一个需求:将一组数据中每个数据乘以10,最后计算总和。为此,我们需要实现 mapFn (乘 10) 和 reduceFn (求和)。
1 |
|
总结
这一篇过了一遍基于 channel 的几种典型应用模式,这些模式都不复杂,但是要做到灵活运用却不容易,平时写代码过程中如果遇到相应场景,还要仔细留心体会,多加练习。
到这里,channel 相关的知识点就告一段落了。
喜极而泣~
下期开始新的章节,再见~
都看到这里了,不如加个关注呗~~
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!