流水线模型在编程中是非常常见的,但扇入扇出模型可能就较少听闻了。

以汽车组装为例,汽车生产线上有个阶段是给小汽车装4个轮子,可以把这个阶段任务交给4个人同时去做,这4个人把轮子都装完后,再把汽车移动到生产线下一个阶段。(这是流水线)

这个过程中,就有任务的分发,和任务结果的收集。其中任务分发是FAN-OUT,任务收集是FAN-IN。

  • **FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭。**OUT是一种张开的模式,所以又被称为扇出,可以用来分发任务。
  • **FAN-IN模式:1个goroutine从多个通道读取数据,直到这些通道关闭。**IN是一种收敛的模式,所以又被称为扇入,用来收集处理的结果。

fan-in和fan-out.png

可以看到FAN模式,其作为一种并发模型,是无法保证执行顺序的,同样也无法保证返回的顺序。所以各个块部分运行的独立性是先决条件。

FAN模型的优势在其他文章就有提到:

  • FAN模式可以提高CPU利用率。
  • FAN模式可以提升程序运行效率,降低程序运行时间。(用的好的情况下)

这里面很好理解,FANOUT类似于一个读放大,普通的流水线模型,可能只有一个worker协程处理,FAN模型相当于增加消费端,而且读取的channel还类似于一个队列,增加其缓冲容量,也能接纳更多的请求。

代码演示和详细的解释,我觉得其他文章已经讲的很好了,我这里直接贴链接了:

Go并发模型:轻松入门流水线模型

Golang并发模型:轻松入门流水线FAN模式

扇入扇出


怎么实现一个fanout呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package fanout

import (
"context"
"errors"
"runtime"
"sync"
"log"
)

var (
// ErrFull chan full.
ErrFull = errors.New("fanout: chan full")
)

type options struct {
worker int
buffer int
}

// Option fanout option
type Option func(*options)

// Worker specifies the worker of fanout
func Worker(n int) Option {
if n <= 0 {
panic("fanout: worker should > 0")
}
return func(o *options) {
o.worker = n
}
}

// Buffer specifies the buffer of fanout
func Buffer(n int) Option {
if n <= 0 {
panic("fanout: buffer should > 0")
}
return func(o *options) {
o.buffer = n
}
}

type item struct {
f func(c context.Context)
ctx context.Context
}

// Fanout async consume data from chan.
type Fanout struct {
name string
ch chan item
options *options
waiter sync.WaitGroup

ctx context.Context
cancel func()
}

// 新建一个fanout 对象 名称为cache 名称主要用来上报监控和打日志使用 最好不要重复
// (可选参数) worker数量为1 表示后台只有1个线程在工作
// (可选参数) buffer 为1024 表示缓存chan长度为1024 如果chan满了 再调用Do方法就会报错 设定长度主要为了防止OOM
// New new a fanout struct.
func New(name string, opts ...Option) *Fanout {
if name == "" {
name = "fanout"
}
o := &options{
worker: 1,
buffer: 1024,
}
for _, op := range opts {
op(o)
}
c := &Fanout{
ch: make(chan item, o.buffer),
name: name,
options: o,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.waiter.Add(o.worker)
for i := 0; i < o.worker; i++ {
go c.proc()
}
return c
}

// 读channel,异步执行channel传递的函数
func (c *Fanout) proc() {
defer c.waiter.Done()
for {
select {
case t := <-c.ch:
wrapFunc(t.f)(t.ctx)
case <-c.ctx.Done():
return
}
}
}

// 包装函数,加入recover,防止panic导致程序挂掉
func wrapFunc(f func(c context.Context)) (res func(context.Context)) {
res = func(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 64*1024)
buf = buf[:runtime.Stack(buf, false)]
log.Error("panic in fanout proc, err: %s, stack: %s", r, buf)
}
}()
f(ctx)
}
return
}

// 需要异步执行的方法
// Do save a callback func.
func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error) {
if f == nil || c.ctx.Err() != nil {
return c.ctx.Err()
}

select {
case c.ch <- item{f: f, ctx: ctx}:
default:
err = ErrFull
}
return
}

// 程序结束的时候关闭fanout 会等待后台线程完成后返回
// Close close fanout
func (c *Fanout) Close() error {
if err := c.ctx.Err(); err != nil {
return err
}
c.cancel()
c.waiter.Wait()
return nil
}

FAN-IN,怎么从多个通道读取数据呢?

常见的,可以使用WaitGroup来保证取完所有channel数据。