最近在项目中要考虑一个接口幂等问题,幂等既是,多次请求请求接口,接口操作的东西都不会被其后续的重复请求所改变,维持第一次请求成功时的样子。

例如查询操作,不管查询多少次,在数据不变的情况下,查询到的数据都是一样的;删除一条数据也是,删一次和多次,都已经将数据删除了。

总结

幂等的很多结论以及方法都已经是个老话题了,看这个博客总结的,方法和总结基本都全说了:


幂等与你是不是分布式高并发还有JavaEE都没有关系。关键是你的操作是不是幂等的

一个幂等的操作典型如:把编号为5的记录的A字段设置为0这种操作不管执行多少次都是幂等的。

一个非幂等的操作典型如:把编号为5的记录的A字段增加1这种操作显然就不是幂等的。

要做到幂等性,从接口设计上来说不设计任何非幂等的操作即可。

譬如说需求是:当用户点击赞同时,将答案的赞同数量+1。改为:当用户点击赞同时,确保答案赞同表中存在一条记录,用户、答案。赞同数量由答案赞同表统计出来。


分析

达成幂等,这里选用幂等号来完成,这里会碰到一些异常情况。

首先我们要清楚,接口请求大体分为三个阶段:

  1. 调用方发起请求,且被对方接收到;
  2. 执行接口处理逻辑;
  3. 执行返回;

1、3阶段对整体访问来说无伤大雅,总体上,幂等号都已经被设置上了,但在阶段二,就有可能出现幂等号保存了,结果业务挂了,导致数据不一致,这样会让调用方的逻辑造成迷惑。请求并没有成功,但重试请求却无法执行了。

总结一下:

  • 业务代码异常处理,是否阻止该接口继续被调用,由开发自行设置;
  • 业务系统宕机处理,会造成数据不一致问题,前面提到了,使用手动补偿,或者MySQL事务的方式人工补偿或避免该问题的产生;
  • 幂等组件处理,例如redis挂了,使用人工补偿的方式的话,redis作为最要组件宕机,正常服务应直接降级,等待修复;

实现

代码:https://github.com/younglifestyle/idempotence

手动补偿失败

保存幂等号,可以直接存Redis,但是会有数据不一致的风险。这里在设计模式之美中提到一句话,做工程不是做理论。对于这种极少发生的异常,在工程中,我们能够做到,在出错时能及时发现问题、能够根据记录的信息人工修复就可以了。

作为一种做工程上的取舍平衡,这个是值得思考的。这里可以记录下SQL的执行日志,在日志中再附加上幂等号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package idempotence_v1

import (
"github.com/go-redis/redis"
"github.com/google/uuid"
"time"
)

type Idempotence interface {
// GenerateId 生成或拿取唯一识别ID
GenerateId() string

IdempotenceStorage
}

type IdempotenceStorage interface {
// SaveIfAbsent 保存唯一ID
SaveIfAbsent(idempotenceId string) bool

// Delete 删除幂等ID
Delete(idempotenceId string) (result bool)
}

type RedisIdempotenceImpl struct {
conn *redis.Client
}

// NewRedisIdempotence 复用Redis链接,不在内部创建
func NewRedisIdempotence(conn *redis.Client) IdempotenceStorage {
return &RedisIdempotenceImpl{conn: conn}
}

func (idem *RedisIdempotenceImpl) GenerateId() string {
return uuid.New().String()
}

// SaveIfAbsent 根据返回值判断幂等ID是否有存在
func (idem *RedisIdempotenceImpl) SaveIfAbsent(idempotenceId string) bool {
// setnx : 不加ExpireTime是原子的,加ExpireTime原子操作要用LUA脚本才是原子操作
// 相当于执行了setnx后,再设置过期时间.
// 故Redis在设置key后崩溃,ExpireTime是加不上的,
// 不过后面的DEL操作肯定也报错了,加上打印,以及错误上报(Prometheus),让人工进行干预
return idem.conn.SetNX(idempotenceId, 1, time.Second*3000).Val()
}

// Delete 失败的情况,应将失败SQL和幂等ID打印出来,以预期人工干预来进行补偿
func (idem *RedisIdempotenceImpl) Delete(idempotenceId string) bool {
err := idem.conn.Del(idempotenceId).Err()
if err != nil {
return false
}
return true
}

MySQL事务

将幂等号插入实际业务表中,或者新建一张幂等表,将幂等号设置为唯一键,通过MySQL事务关联,这里的代码会与业务代码耦合在一起,无法将之分开。代码量也会增加很多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// $ docker run -p 6379:6379 --name my-redis -d redis:latest
// $ docker run -p 3306:3306 --name my-mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest

package main

import (
"github.com/go-redis/redis"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/singleflight"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/schema"
"idempotence_v2/model"
"sync"
"time"
)

var cacheSingleFlight = &singleflight.Group{}

func getDbConn() (db *gorm.DB, err error) {
dsn := "root:123456@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local"

db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{
NamingStrategy: schema.NamingStrategy{SingularTable: true}})

_ = db.AutoMigrate(&model.StudentMoney{})
return
}

// 返回返回值给调用者使用,保存redis失败无很大问题,再次请求再传一次即可
func saveIdToRedis(rdb *redis.Client, idempotenceId string) bool {

return rdb.SetNX(idempotenceId, 1, time.Second*20).Val()
}

//
func getIdFromRedis(rdb *redis.Client, idempotenceId string) string {

return rdb.Get(idempotenceId).Val()
}

// 学生参与劳动,即可获得奖励,创建转账记录
func giveTipToStudent(db *gorm.DB, rdb *redis.Client, studentMoney *model.StudentMoney) error {

// 并发量大时,这里是会同时找不到ID,且从DB中也捞不出DB的
if getIdFromRedis(rdb, studentMoney.IdempotenceId) == "" {
// 缓存查不到就查数据库
rr, _, _ := cacheSingleFlight.Do(studentMoney.IdempotenceId, func() (r interface{}, e error) {
log.Debug("cache miss : ", studentMoney.IdempotenceId)

var stuMoneyExistInfo model.StudentMoney
err := db.Select("idempotence_id").First(&stuMoneyExistInfo,
"idempotence_id = ?", studentMoney.IdempotenceId).Error
if err != nil && err != gorm.ErrRecordNotFound {
log.Errorf("select idempotence_id From student_money WHERE idempotence_id = %s, error = %s",
studentMoney.IdempotenceId, err.Error())
}

return stuMoneyExistInfo, nil
})

stuMoneyExistInfo := rr.(model.StudentMoney)
// id exist in db
if stuMoneyExistInfo.IdempotenceId != "" {
log.Debug("get id from db : ", stuMoneyExistInfo.IdempotenceId)

_ = saveIdToRedis(rdb, studentMoney.IdempotenceId)
return nil
}
} else {
// 缓存中存在幂等值,不执行
log.Debug("id exist")
return nil
}

if err := db.Transaction(func(tx *gorm.DB) error {
log.Debug("store id : ", studentMoney.IdempotenceId)

if err := tx.Create(studentMoney).Error; err != nil {
// 返回任何错误都会回滚事务
return err
}

// 返回 nil 提交事务
return nil
}); err != nil {

return err
}

_ = saveIdToRedis(rdb, studentMoney.IdempotenceId)

return nil
}

func main() {
log.SetLevel(log.DebugLevel)

rdb := redis.NewClient(&redis.Options{
Addr: ":6379",
})

dbConn, err := getDbConn()
if err != nil {
panic(err)
}

wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)

go func() {
//time.Sleep(time.Millisecond * 100)
err = giveTipToStudent(dbConn, rdb, &model.StudentMoney{
//IdempotenceId: GenerateId(),
IdempotenceId: "b533516b-744c-41b6-b4d6-ba42bfc13fed",
Name: "right",
Age: 18,
Money: 100,
})

wg.Done()
}()
}

wg.Wait()

time.Sleep(time.Second * 10)
}

ID的生成可以是内部产生,也可以是依赖外部服务,保持绝对唯一,例如使用美团Leaf。为了简单易用,且减少依赖,且根据服务请求量来评估,选前者会更易于维护一些。