内部一直需要使用消息队列,所以这边考虑写一个中间件简化这些代码交互,将交互统统归化统一为单纯的发送/接受处理函数。最近也算是完成了中间件CS端的编写了。

消息中间件的作用是为了简化应用程序对消息队列的交互,让应用程序更多的关心代码逻辑,不用关心Kafka的操作。兼容Redis协议,可以更为方便去掉写客户端的麻烦事(可以使用Redis客户端,后面会提)。

Redis协议解析

兼容Redis协议redisC/S使用的是TCP连接,协议编码为二进制,类似于这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:

状态回复(status reply)的第一个字节是 "+"

错误回复(error reply)的第一个字节是 "-"

整数回复(integer reply)的第一个字节是 ":"

批量回复(bulk reply)的第一个字节是 "$"

多条批量回复(multi bulk reply)的第一个字节是 "*"

也就是在处理逻辑上,套一层协议解析。像Go Redis客户端肯定是都实现了协议解析的,但有更方便的做法,使用**redcon**。这个库帮我们做掉了协议套壳的那一层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"log"
"strings"
"sync"

"github.com/tidwall/redcon"
)

var addr = ":6380"

func main() {
var mu sync.RWMutex
var items = make(map[string][]byte)
var ps redcon.PubSub
go log.Printf("started server at %s", addr)
err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, cmd redcon.Command) {
// 参数处理逻辑
switch strings.ToLower(string(cmd.Args[0])) {
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
},
// 设置标志位,置1 拒绝连接接入
func(conn redcon.Conn) bool {
// Use this function to accept or deny the connection.
// log.Printf("accept: %s", conn.RemoteAddr())
return true
},

// close时,清除Kafka相关资源
func(conn redcon.Conn, err error) {
// This is called when the connection has been closed
// log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
},
)
if err != nil {
log.Fatal(err)
}
}

Kafka交互

Kafka交互使用的库是:sarama

以前sarama这块实现缺的地方是对offset的更多底层操作,没将mark权限暴露给开发者来做。需要借助另一个库sarama-cluster 来做。但现在它已经把这块给做掉了,所以单用这个库是没有什么问题的。

实际操作参考官方库案例:https://github.com/Shopify/sarama/blob/main/examples/consumergroup/main.go

客户端

客户端的编写会是比较麻烦的点,应该中间件使用的是redis协议,那客户端需要也兼容它,那我们怎么兼容呢?

由于服务端使用了Redis兼容,那客户端,是可以直接使用Redis的库的,比如go-redis/redis。选用这个库是因为其把连接池和重试策略都做了,这样我们可以复用这个连接库,往里面添加自己的逻辑代码即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
redis.Options{
//连接信息
Network: "tcp", //网络类型,tcp or unix,默认tcp
Addr: "127.0.0.1:6379", //主机名+冒号+端口,默认localhost:6379
Password: "", //密码, 发送auth指令,带着ps参数,可作为中间件验证

//连接池容量及闲置连接数量
PoolSize: 15, // 连接池最大socket连接数,默认为4倍CPU数, 4 * runtime.NumCPU
MinIdleConns: 10, //在启动阶段创建指定数量的Idle连接,并长期维持idle状态的连接数不少于指定数量;。

//超时
DialTimeout: 5 * time.Second, //连接建立超时时间,默认5秒。
ReadTimeout: 3 * time.Second, //读超时,默认3秒, -1表示取消读超时
WriteTimeout: 3 * time.Second, //写超时,默认等于读超时
PoolTimeout: 4 * time.Second, //当所有连接都处在繁忙状态时,客户端等待可用连接的最大等待时长,默认为读超时+1秒。

//闲置连接检查包括IdleTimeout,MaxConnAge
IdleCheckFrequency: 60 * time.Second, //闲置连接检查的周期,默认为1分钟,-1表示不做周期性检查,只在客户端获取连接时对闲置连接进行处理。
IdleTimeout: 5 * time.Minute, //闲置超时,默认5分钟,-1表示取消闲置超时检查
MaxConnAge: 0 * time.Second, //连接存活时长,从创建开始计时,超过指定时长则关闭连接,默认为0,即不关闭存活时长较长的连接

//命令执行失败时的重试策略
MaxRetries: 0, // 命令执行失败时,最多重试多少次,默认为3
MinRetryBackoff: 8 * time.Millisecond, //每次计算重试间隔时间的下限,默认8毫秒,-1表示取消间隔
MaxRetryBackoff: 512 * time.Millisecond, //每次计算重试间隔时间的上限,默认512毫秒,-1表示取消间隔
}

总结

最好,整体写完中间件全部服务还是花了一些功夫,主要是一些细节需要抠,特别是Kafka sarama的使用,大家还是优先使用sarama-cluster,我看了一些仓库使用sarama的方式,里面其实也有一些细节处有问题,但可能其他库的使用环境不会触发那些个问题把。

另外就是使用redis库时,中间件不回发数据,客户端其会重试发送数据,尝试获取返回结果,所以返回数据时需要妥善处理。