上次我写了通过protoreflect的方法,自动代理服务,可以摆脱开proto文件的限制,这样网关可以将这些服务都自动代理起来。更进一步的,像现在微服务框架,如Kratos、ego、go-zero都兼顾根据proto文件上,根据google/api/annotations.proto增加的注解,生成HTTP的服务,而这些属性注释,网关都可以通过反射拿到,然后代理其HTTP。

除此之外,HTTP —> Gateway —> GRPC 有没有省事点的方法呢?可以直接将HTTP/1.1协议转成GRPC兼容,然后将返回再翻译为HTTP/1.1。

协议数据修改

这是偶然看到go-kratos/gateway中实现的http—>h2所使用的技术。

这里利用了GRPC协议中为了兼容一些HTTP1.1端,在消息体的格式中,可使用json格式进行序列化(application/grpc+json )。

在使用此 Content-Type 时,gRPC 服务器会将 JSON 格式的数据转换为 Protocol Buffers 格式的数据,然后进行处理。所以使用什么数据格式传输,并不影响其协议依然为HTTP/2。

代码分析

不想看代码,可以直接看总结中看如何实现的。

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# This is a gateway config.
name: helloworld
version: v1
middlewares: 公共的中间件,作用于所有路由
- name: logging
- name: transcoder
endpoints:
- path: /helloworld/*
timeout: 1s
protocol: HTTP
host: localhost
backends:
- target: '127.0.0.1:8000' 直连后端服务
# - target: 'discovery:///bbs'
middlewares: 私有中间件
- name: circuitbreaker
options:
'@type': type.googleapis.com/gateway.middleware.circuitbreaker.v1.CircuitBreaker
successRatio: {"success":0.6, "request":"1", "bucket":"10", "window":"3s"}
backupService: {"endpoint":{"backends":[{"target":"127.0.0.1:8001"}]}}
assertCondtions:
- {"by_status_code":"200"}
- path: /helloworld.Greeter/*
method: POST
timeout: 1s
protocol: GRPC 协议,将决定gateway以什么样的client去请求后端服务
backends:
- target: '127.0.0.1:9000'
retry:
attempts: 3
perTryTimeout: 0.1s
conditions:
- byStatusCode: '502-504'
- byHeader:
name: 'Grpc-Status'
value: '14'

proxy构建

这里来看下来看下kratos-gateway的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
go-kratos/gateway/cmd/gateway/main.go
clientFactory := client.NewFactory(makeDiscovery())
go-kratos/gateway/client/factory.go 返回一个client生成的工厂函数,用于生成请求Grpc或http的client,根据config上定义的endpoint。

// NewFactory new a client factory.
func NewFactory(r registry.Discovery, opts ...Option) Factory {
o := &options{
pickerBuilder: p2c.NewBuilder(),
}
for _, opt := range opts {
opt(o)
}
return func(endpoint *config.Endpoint) (http.RoundTripper, error) {
// 负载均衡选择实例
picker := o.pickerBuilder.Build()
ctx, cancel := context.WithCancel(context.Background())
applier := &nodeApplier{
cancel: cancel,
endpoint: endpoint,
registry: r,
}
// 创建client实例
if err := applier.apply(ctx, picker); err != nil {
return nil, err
}
return newClient(applier, picker), nil
}
}

// applier.apply
func (na *nodeApplier) apply(ctx context.Context, dst selector.Selector) error {
var nodes []selector.Node
for _, backend := range na.endpoint.Backends {
target, err := parseTarget(backend.Target)
if err != nil {
return err
}

switch target.Scheme {
// - target: '127.0.0.1:8000'
case "direct":
node := newNode(backend.Target, na.endpoint.Protocol, weighted, map[string]string{})
nodes = append(nodes, node)
dst.Apply(nodes)
// # - target: 'discovery:///bbs'
case "discovery":
....

default:
return fmt.Errorf("unknown scheme: %s", target.Scheme)
}
}
return nil
}

// newNode,创建http/2或http/1.1的客户端 config中配置了GRPC
func newNode(addr string, protocol config.Protocol, weight *int64, md map[string]string) *node {
node := &node{
protocol: protocol,
address: addr,
weight: weight,
metadata: md,
}
if protocol == config.Protocol_GRPC {
node.client = _globalH2Client
} else {
node.client = _globalClient
}
return node
}

func defaultH2Client() *http.Client {
return &http.Client{
CheckRedirect: defaultCheckRedirect,
Transport: &http2.Transport{
// So http2.Transport doesn't complain the URL scheme isn't 'https'
AllowHTTP: true,
DisableCompression: true,
// Pretend we are dialing a TLS endpoint.
// Note, we ignore the passed tls.Config
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.DialTimeout(network, addr, _dialTimeout)
},
},
}
}

这里使用了库net/http2,是Golang标准库中用于支持HTTP/2协议的包,它提供了HTTP/2客户端和服务器的实现,以及与HTTP/1.x的兼容性支持。

从这里也就能看到实际上在gateway中的客户端请求的实例了,gateway通过它来请求到后端实际的服务。到这里还得看HTTP request是怎么转化到GRPC请求的。

继续看Gateway怎么构建proxy的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
	go-kratos/gateway/cmd/gateway/main.go
p.Update(bc) 根据配置路由信息,进行构建
buildEndpoint

func (p *Proxy) buildEndpoint(e *config.Endpoint, ms []*config.Middleware) (http.Handler, error) {
// 生成实际访问的客户端实例
tripper, err := p.clientFactory(e)
// 实例请求时增加拦截函数(公有的,log、trace、熔断器等)
tripper, err = p.buildMiddleware(e.Middlewares, tripper)
// 实例请求时增加拦截函数(私有,为这一路由独有的)
tripper, err = p.buildMiddleware(ms, tripper)

return http.Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
startTime := time.Now()
setXFFHeader(req)

// 把config信息带到每一个中间件函数中
reqOpts := middleware.NewRequestOptions(e)
ctx := middleware.NewRequestContext(req.Context(), reqOpts)

var resp *http.Response
for i := 0; i < retryStrategy.attempts; i++ {
......

// 实际请求的地方,会从中间件一层一层请求到实际的client请求后端服务
resp, err = tripper.RoundTrip(req.Clone(tryCtx))

......
}

doCopyBody := func() bool {

....

return true
}
doCopyBody()
requestsTotalIncr(labels, resp.StatusCode)
})), nil
}

请求—协议转化

实际请求时, 将进行协议中间件进行转化。

协议转化在go-kratos/gateway/middleware/transcoder/transcoder.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Middleware is a gRPC transcoder.
func Middleware(c *config.Middleware) (middleware.Middleware, error) {
return func(next http.RoundTripper) http.RoundTripper {
return middleware.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
ctx := req.Context()
contentType := req.Header.Get("Content-Type")
// 拿到配置信息
endpoint, _ := middleware.EndpointFromContext(ctx)
// 该路由请求到的后端服务是否为GRPC,是或者其请求格式已经为GRPC,不需要进行下面转化操作
if endpoint.Protocol != config.Protocol_GRPC || strings.HasPrefix(contentType, "application/grpc") {
return next.RoundTrip(req)
}
b, err := io.ReadAll(req.Body)
if err != nil {
return nil, err
}
bb := make([]byte, len(b)+5)
binary.BigEndian.PutUint32(bb[1:], uint32(len(b)))
copy(bb[5:], b)
// content-type:
// - application/grpc+json
// - application/grpc+proto
// 将数据格式的定义进行转化
req.Header.Set("Content-Type", "application/grpc+"+strings.TrimLeft(contentType, "application/"))
req.Header.Del("Content-Length")
req.ContentLength = int64(len(bb))
req.Body = ioutil.NopCloser(bytes.NewReader(bb))
resp, err := next.RoundTrip(req)
if err != nil {
return nil, err
}
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
// Convert HTTP/2 response to HTTP/1.1
// Trailers are sent in a data frame, so don't announce trailers as otherwise downstream proxies might get confused.
// HTTP trailers是HTTP消息的一部分,用于传输HTTP消息的元数据,例如消息的哈希值、压缩信息或长度等。
for trailerName, values := range resp.Trailer {
resp.Header[trailerName] = values
}
resp.Trailer = nil
resp.Header.Set("Content-Type", contentType)

// 不为0即异常。grpc-status转化为http code,错误详情:grpc-status-details-bin,grpc-message
// grpc-status-details-bin是一个二进制元数据,它包含有关gRPC调用失败的更多详细信息,例如错误代码、错误消息和堆栈跟踪等。这些详细信息可以由gRPC客户端用于诊断和调试问题。
// grpc-message是一个字符串元数据,其中包含有关gRPC调用失败的简要信息,例如错误消息。它通常用于向终端用户报告错误。
// 当gRPC调用失败时,gRPC服务器会将grpc-status-details-bin和grpc-message元数据作为响应的一部分发送回gRPC客户端。客户端可以访问这些元数据,以了解有关失败的更多信息,以及如何解决问题。
if grpcStatus := resp.Header.Get("grpc-status"); grpcStatus != "0" {
code, err := strconv.ParseInt(grpcStatus, 10, 64)
if err != nil {
return nil, err
}
st := &spb.Status{
Code: int32(code),
Message: resp.Header.Get("grpc-message"),
}
if grpcDetails := resp.Header.Get("grpc-status-details-bin"); grpcDetails != "" {
details, err := decodeBinHeader(grpcDetails)
if err != nil {
return nil, err
}
if err = proto.Unmarshal(details, st); err != nil {
return nil, err
}
}
data, err := protojson.Marshal(st)
if err != nil {
return nil, err
}
return newResponse(200, resp.Header, data)
}

// grpc返回数据前5个字节代表数据长度。
resp.Body = ioutil.NopCloser(bytes.NewReader(data[5:]))
resp.ContentLength = int64(len(data) - 5)
// Any content length that might be set is no longer accurate because of trailers.
// 不再准确的原因是,HTTP/2的header是不体现在length,所以不能单纯的直接进行替换
resp.Header.Del("Content-Length")
return resp, nil
})
}, nil
}

总结

通过GRPC协议中为了兼容HTTP/1.1协议所给出传输格式application/grpc+json,使用golang.org/x/net/http2,通过修改HTTP请求头,可以做到使用HTTP访问到GRPC服务。以下是更简略的版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func Test_defaultH2Client(t *testing.T) {
cli := &http.Client{
CheckRedirect: defaultCheckRedirect,
Transport: &http2.Transport{
// So http2.Transport doesn't complain the URL scheme isn't 'https'
AllowHTTP: true,
DisableCompression: true,
// Pretend we are dialing a TLS endpoint.
// Note, we ignore the passed tls.Config
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.DialTimeout(network, addr, _dialTimeout)
},
},
}
resp, err := cli.Post("http://127.0.0.1:9000/helloworld.Greeter/SayHello",
"application/grpc+json", strings.NewReader(`{"hello": "2131"}`))
if err != nil {
fmt.Println(err)
return
}
fmt.Println(resp.Status)
}