概述

在使用网关的过程中,可能需要对已有功能进行一些扩展,这个时候就需要对网关进行一些改造工作,在gateway中,除了一些核心模块外,相关的功能模块都采用插件的形式进行实现,例如:黑白名单,熔断器,JWT,限流等,所以,在功能上的扩展上是可以按照插件的形式进行添加的,这也是gateway作者推荐的操作。

接口interface

插件的实现依赖于gateway给出的两个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Context filter context
type Context interface {
StartAt() time.Time
EndAt() time.Time

OriginRequest() *fasthttp.RequestCtx
ForwardRequest() *fasthttp.Request
Response() *fasthttp.Response

API() *metapb.API
DispatchNode() *metapb.DispatchNode
Server() *metapb.Server
Analysis() *util.Analysis

SetAttr(key string, value interface{})
GetAttr(key string) interface{}
}

// BaseFilter base filter support default implemention
type BaseFilter struct{}

// Init init filter
func (f BaseFilter) Init(cfg string) error {
return nil
}

// Pre execute before proxy
func (f BaseFilter) Pre(c Context) (statusCode int, err error) {
return fasthttp.StatusOK, nil
}

// Post execute after proxy
func (f BaseFilter) Post(c Context) (statusCode int, err error) {
return fasthttp.StatusOK, nil
}

// PostErr execute proxy has errors
func (f BaseFilter) PostErr(c Context) {

}

这些相关的定义都在github.com/fagongzi/gateway/pkg/filter包中,每一个Filter都需要导入。其中的Context的上下文接口,提供了Filter和Gateway交互的能力;BaseFilter定义了默认行为。

在实际代码中,Context可以让插件在全局获取到route请求中的数据,例如API绑定到网关时,用户填写的相关配置——API(),HTTP请求的request——ForwardRequest(),后台接口返回的结果——Response()

BaseFilter作为插件的响应动作,并不强制实现所有函数,这里只有两个比较重要的函数需要被关注到,PrePostPre作为网关在转发请求到后台时的前置性操作,例如黑名单功能插件就需要在Pre函数中判断URI是否匹配预先设置的正则表达式。例如接下来需要实现的一个webhook功能,可能就需要在post中进行一些操作,根据接口返回的一些信息来决定是否访问一些接口。

Request处理流程

request -> filter预处理 -> 转发请求 -> filter后置处理 -> 响应客户端

整个逻辑处理符合以下规则:

  • filter预处理返回错误,流程立即终止,并且使用filter返回的状态码响应客户端
  • filter后置处理返回错误,使用filter返回的状态码响应客户端
  • 转发请求,后端返回的状态码>=500,调用filter的错误处理接口

这里直接截取gateway官方的帮助文档中的内容,代码所体现的内容正如流程所说的。

main ——>

proxy.go

Start()——> ServeFastHTTP ——> p.doProxy

1
2
3
4
5
6
7
// pre filters  轮询调用所有注册过的插件的pre函数
filterName, code, err := p.doPreFilters(c)
......

// post filters 轮询调用所有注册过的插件的post函数
filterName, code, err = p.doPostFilters(c)
......

编写插件

预先设定一个简单的需求,例如,在一些特定的接口访问成功后,访问一个特定的地址,传输一些信息给这个地址。

插件操作简单,不需要一些附加的逻辑,所有编写这个插件,只需要完成BaseFilter的相关函数即可,这里先完成一个大概的框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// WebHookFilter is an event handler that triggers web hooks
type WebHookFilter struct {
filter.BaseFilter
}

// Name name
func (w *WebHookFilter) Name() string {
return FilterWebHook
}

func newWebHookFilter() filter.Filter {
return &WebHookFilter{}
}

// Post execute after proxy
func (w *WebHookFilter) Post(c filter.Context) (statusCode int, err error) {

if len(c.API().GetTags()) == 0 {
return fasthttp.StatusBadRequest, err
}

var isNeedWebHook bool
for _, tags := range c.API().GetTags() {
if tags.Name == "webhook" && tags.Value == "true" {
isNeedWebHook = true
}
}

// web url传递一个统一的地址?
if isNeedWebHook && c.Response().StatusCode() == http.StatusOK {
log.Info(Send(c.Response().Body())) // 处理逻辑
// go Send(c.Response().Body())
}

log.Info("webhook post exec!")

return fasthttp.StatusOK, nil
}

可以看到在post函数中,出现c.API().GetTags(),这里的tag是API设置时的一个可选填项,用于个性化标记一个API,用在这里是因为不想对已有的gateway参数进行增添,直接使用一个gateway的可选参数来定制自己的参数,算是一种偷懒少改代码的方式。

Post这里是一般压力的话,使用协程进行异步操作;若是访问比较频繁,就使用10个channel限制一下速度,慢慢发送数据;当然更顺畅的做法肯定是引入消息队列(MQ),不过当整个系统没这么复杂时,多上一个组件,可能也会造成一定量的维护负担(视业务来定)。

若使用代码来设置API(一般使用前端UI较为方便):

1
2
3
4
5
6
7
8
9
sb := c.NewAPIBuilder()
// 必选项
sb.Name("用户API")
// 设置URL规则,匹配所有开头为/api/user的请求
sb.MatchURLPattern("/hello/(.+)")
// 匹配GET请求
sb.MatchMethod("GET")
// 添加tag
sb.AddTag("webhook", "true")

到这里代码已完成一个大概的框架,可以看到编写逻辑非常简单,但若要插件插入gateway运行,还需要将插件信息注册gateway中,接口中的Name()需要和这里的name信息能够匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
--- "gateway\pkg\proxy\factory.go"

// Filter WebHook jwt filter
FilterWebHook = "WEBHOOK"

func (p *Proxy) newFilter(filterSpec *FilterSpec) (filter.Filter, error) {
......

switch input {
......
case FilterWebHook:
return newWebHookFilter(), nil
default:
return nil, ErrUnknownFilter
}
}

--- "gateway\cmd\proxy\proxy.go"

func init() {
......
defaultFilters.Set(proxy.FilterWebHook)
}

插件程序编写到这里,其实已经是可以正常运行了,但还缺了一个必要的环节,那就是可配置的参数信息,用于多机环境下进行工作,参数借助etcd进行分布式设置。

获取etcd配置

initDispatcher() ——> GetStoreFrom ——> fn, ok := supportSchema[schema] ——> getEtcdStoreFrom ——> NewEtcdStore ——> store.init(),到这里就配置好各个插件取etcd配置的函数了,需要动手添加函数

1
2
3
4
5
6
7
8
func (e *EtcdStore) init() {
e.watchMethodMapping[EventSrcBind] = e.doWatchWithBind
e.watchMethodMapping[EventSrcServer] = e.doWatchWithServer
e.watchMethodMapping[EventSrcCluster] = e.doWatchWithCluster
e.watchMethodMapping[EventSrcAPI] = e.doWatchWithAPI
e.watchMethodMapping[EventSrcRouting] = e.doWatchWithRouting
e.watchMethodMapping[EventSrcProxy] = e.doWatchWithProxy
}

函数是根据配置路径的前缀来匹配插件的配置的,而这些前缀同样也是需要提前写好的,需要动手添加函数NewEtcdStore 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
store := &EtcdStore{
prefix: prefix,
clustersDir: fmt.Sprintf("%s/clusters", prefix),
serversDir: fmt.Sprintf("%s/servers", prefix),
bindsDir: fmt.Sprintf("%s/binds", prefix),
apisDir: fmt.Sprintf("%s/apis", prefix),
proxiesDir: fmt.Sprintf("%s/proxies", prefix),
routingsDir: fmt.Sprintf("%s/routings", prefix),
idPath: fmt.Sprintf("%s/id", prefix),
watchMethodMapping: make(map[EvtSrc]func(EvtType, *mvccpb.KeyValue) *Evt),
base: 100,
end: 100,
}

接下来就是监听etcd的配置信息:

p.dispatcher.load() ——> go r.watch() ——> r.store.Watch(r.watchEventC, r.watchStopC) ——> e.doWatch() ——> e.evtCh <- e.watchMethodMapping[evtSrc](evtType, ev.Kv)

根据etcd出发的事件,判断是哪个插件的配置进行更改或是删除,然后调用在 store.init()中配置的取etcd配置的函数(还未处理),再通过channel发送出去,需要动手添加一个case处理自定义的插件

在这个流程中的go r.watch(),有着另一条分支路线,go r.readyToReceiveWatchEvent()这里也是需要我们添加函数处理的地方,用于根据etcd事件对参数进行实际处理(增删改)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (r *dispatcher) readyToReceiveWatchEvent() {
for {
evt := <-r.watchEventC

if evt.Src == store.EventSrcCluster {
r.doClusterEvent(evt)
} else if evt.Src == store.EventSrcServer {
r.doServerEvent(evt)
} else if evt.Src == store.EventSrcBind {
r.doBindEvent(evt)
} else if evt.Src == store.EventSrcAPI {
r.doAPIEvent(evt)
} else if evt.Src == store.EventSrcRouting {
r.doRoutingEvent(evt)
} else if evt.Src == store.EventSrcProxy {
r.doProxyEvent(evt)
} else {
log.Warnf("unknown event <%+v>", evt)
}
}
}

至此,配置参数信息被保存到了结构体dispatcher中,当然若是新增的配置,这里依然是需要在结构体dispatcher中新增一个结构体成员来保存用于自定义插件的配置的。

那如何使用这些配置呢?

在context中,插件所使用的函数由proxyContext实现,"gateway\pkg\proxy\filter.go",这里保存了dispatcher的相关信息,可以由开发者自由拿取数据。

至此,整个开发阶段全部完成,插件部分逻辑可随着业务更改而进行些许的更改,但整体插件的编写是不会出现太多变动的。