在之前的博客中,对服务器的上传下载就写过一些备忘,当时项目是对文件进行操作,涉及到一些网络文件的基础操作,由于没得网络存储,也就是依靠云的磁盘来存储数据文件。

这里记录一下分块上传,也就是断点续传的实现。

大概可以分为三个步骤:

  1. 尝试秒传接口;
  2. 初始化分块上传信息;
  3. 客户端开始进行分块上传;
  4. 客户端通知服务器完成分块,服务器进行分块合并操作。

分块上传

1、尝试秒传

秒传是利用文件的哈希值,来对文件进行唯一值匹配,若能够匹配成功,则说明文件已被上传,只需要插入下mysql里面的数据即可,这也就是秒传的意思。

这里在实现秒传时,在开源项目BaiduPCS-Go中,是需要:

1
2
3
4
5
6
7
8
map[string]string{
"path": targetPath, // 上传文件的全路径名
"content-length": strconv.FormatInt(length, 10), // 待秒传的文件长度
"content-md5": contentMD5, // 待秒传的文件的MD5
"slice-md5": sliceMD5, // 待秒传的文件前256kb的MD5
"content-crc32": crc32, // 待秒传文件CRC32
"ondup": "overwrite", // overwrite: 表示覆盖同名文件; newcopy: 表示生成文件副本并进行重命名,命名规则为“文件名_日期.后缀”
}

不过在github.com/iikira/BaiduPCS-Go/blob/master/pcsutil/checksum/checksum.go文件中:

作者表示,这个前256kb的MD5CRC32不是必须。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 314行

// 文件大于256kb, 应该要检测秒传, 反之则不应检测秒传
// 经测试, 秒传文件并非一定要大于256KB
if task.uploadInfo.Length >= requiredSliceSize {
// do nothing
}

// 经过测试, 秒传文件并非需要前256kb切片的md5值, 只需格式符合即可
task.uploadInfo.SliceMD5Sum()

// 经测试, 文件的 crc32 值并非秒传文件所必需
// task.uploadInfo.Crc32Sum()

2、初始化分块上传信息

当客户端需要上传文件时,首先会向服务端一个接口传递文件信息,这里我们可以获取:

1
2
* @apiParam filehash  string	文件计算的哈希计算值
* @apiParam filesize int 文件大小

在这步操作中,我们需要填充这个结构体。

1
2
3
4
5
6
7
type MultipleUploadInfo struct {
FileHash string
FileSize int
UploadID string
ChunkSize int
ChunkCount int
}

例如:

1
2
3
4
5
6
7
8
9
upInfo := &MultipartUploadInfo{
FileHash: filehash,
FileSize: filesize,
// UploadID,我在代码里面是使用的函数NewObjectId(),也就是MongoDB的ID生成方式,这个我在之前的博客中也做了备忘。
UploadID: username + fmt.Sprintf("%x", time.Now().UnixNano()),
ChunkSize: 5 << 10 << 10, // 5MB
// 5MB 为一个分块,可以根据服务器性能适当调整
ChunkCount: int(math.Ceil(float64(filesize) / (5 << 10 << 10))),
}

然后需要将这个数据保存下来,因为在后面的使用中需要使用到。

可以存储在内存中,使用map进行管理(我之前是使用这种方式,所以UploadID我得保持唯一),同样的也可以使用RedisMemcached,我这里使用到了Map,项目里头有变化。

此结构体需要返回给客户端。

3、分块上传

客户端按照分块上传信息,分块读取文件流上传给服务端,按照ChunkSize进行读取。

上传时,附带上此次上传的Chunk次数。

服务器将这些数据保存在一个临时目录下,按先后Chunk顺序进行命名,方便合并时使用。

服务器同时也需要记录该UploadID上传的Chunk次数

4、合并

客户端上传完成后,请求该接口(当然,也可以服务器每次接受分块都去检查分块是否已经上传完了,或者是客户端在最后一次上传时,告知服务器这是最后一次上传,这里为了拆开逻辑,故分开接口)。

1
2
3
4
5
6
7
// 1、验证ChunkCount是否匹配;

// 2、验证分块文件是否对的上ChunkCount;

// 3、新建一个文件,将各个分块文件,以追加的方式写入文件,不需要一次性读取到内存,使用io.copy();

// 4、上传都完成后,将文件信息保存到MySQL中。

断点续传/断点下载

这两者可以说都是上面那个过程,操作都是类似的,没有什么不一样,同样是控制分块信息,控制分块上传与下载,即使中间终端,也可以重新确认信息后接上上一次中断的过程继续开始操作。

这里我偷懒没有将代码拿出来进行展示,这里仅展示逻辑了。

记录代码

github.com/iikira/BaiduPCS-Go/blob/master/pcsutil/checksum/checksum.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Package checksum 校验本地文件包
package checksum

import (
"crypto/md5"
"fmt"
"github.com/iikira/BaiduPCS-Go/pcsutil/converter"
"hash"
"hash/crc32"
"io"
"os"
)

const (
defaultBufSize = 256 * converter.KB
)

type (
// LocalFileMeta 本地文件元信息
LocalFileMeta struct {
Path string `json:"path"` // 本地路径
Length int64 `json:"length"` // 文件大小
SliceMD5 []byte `json:"slicemd5"` // 文件前 requiredSliceLen (256KB) 切片的 md5 值
MD5 []byte `json:"md5"` // 文件的 md5
CRC32 uint32 `json:"crc32"` // 文件的 crc32
ModTime int64 `json:"modtime"` // 修改日期
}

// LocalFileInfo LocalFile
LocalFile struct {
LocalFileMeta

bufSize int
buf []byte
File *os.File // 文件
}

// SumConfig 计算文件摘要值配置
SumConfig struct {
IsMD5Sum bool
IsSliceMD5Sum bool
IsCRC32Sum bool
}
)

func NewLocalFileInfo(localPath string, bufSize int) *LocalFile {
return &LocalFile{
LocalFileMeta: LocalFileMeta{
Path: localPath,
},
bufSize: bufSize,
}
}

// OpenPath 检查文件状态并获取文件的大小 (Length)
func (lf *LocalFile) OpenPath() error {
if lf.File != nil {
lf.File.Close()
}

var err error
lf.File, err = os.Open(lf.Path)
if err != nil {
return err
}

info, err := lf.File.Stat()
if err != nil {
return err
}

lf.Length = info.Size()
lf.ModTime = info.ModTime().Unix()
return nil
}

// Close 关闭文件
func (lf *LocalFile) Close() error {
if lf.File == nil {
return fmt.Errorf("file is nil")
}

return lf.File.Close()
}

func (lf *LocalFile) initBuf() {
if lf.buf == nil {
if lf.bufSize != 0 {
lf.buf = make([]byte, lf.bufSize)
return
}

lf.buf = make([]byte, defaultBufSize)
}
}

func (lf *LocalFile) repeatRead(ws ...io.Writer) {
if lf.File == nil {
return
}

lf.initBuf()

var (
begin int64
n int
err error
)

handle := func() {
begin += int64(n)
for k := range ws {
ws[k].Write(lf.buf[:n])
}
}

// 读文件
for {
n, err = lf.File.ReadAt(lf.buf, begin)
if err != nil {
if err == io.EOF {
handle()
} else {
fmt.Printf("%s\n", err)
}
break
}

handle()
}
}

// Sum 计算文件摘要值
func (lf *LocalFile) Sum(cfg SumConfig) {
var (
md5w hash.Hash
crc32w hash.Hash32
)

ws := make([]io.Writer, 0, 2)
if cfg.IsMD5Sum {
md5w = md5.New()
ws = append(ws, md5w)
}
if cfg.IsCRC32Sum {
crc32w = crc32.NewIEEE()
ws = append(ws, crc32w)
}
if cfg.IsSliceMD5Sum {
lf.SliceMD5Sum()
}

lf.repeatRead(ws...)

if cfg.IsMD5Sum {
lf.MD5 = md5w.Sum(nil)
}
if cfg.IsCRC32Sum {
lf.CRC32 = crc32w.Sum32()
}
}

// Md5Sum 获取文件的 md5 值
func (lf *LocalFile) Md5Sum() {
lf.Sum(SumConfig{
IsMD5Sum: true,
})
}

// SliceMD5Sum 获取文件前 requiredSliceLen (256KB) 切片的 md5 值
func (lf *LocalFile) SliceMD5Sum() {
if lf.File == nil {
return
}

// 获取前 256KB 文件切片的 md5
lf.initBuf()

m := md5.New()
n, err := lf.File.ReadAt(lf.buf, 0)
if err != nil {
if err == io.EOF {
goto md5sum
} else {
fmt.Printf("SliceMD5Sum: %s\n", err)
return
}
}

md5sum:
m.Write(lf.buf[:n])
lf.SliceMD5 = m.Sum(nil)
}

// Crc32Sum 获取文件的 crc32 值
func (lf *LocalFile) Crc32Sum() {
lf.Sum(SumConfig{
IsCRC32Sum: true,
})
}

github.com/iikira/BaiduPCS-Go/blob/master/pcsutil/checksum/file.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package checksum

import (
"bytes"
"fmt"
"os"
"path/filepath"
)

// EqualLengthMD5 检测md5和大小是否相同
func (lfm *LocalFileMeta) EqualLengthMD5(m *LocalFileMeta) bool {
if lfm.Length != m.Length {
return false
}
if bytes.Compare(lfm.MD5, m.MD5) != 0 {
return false
}
return true
}

// CompleteAbsPath 补齐绝对路径
func (lfm *LocalFileMeta) CompleteAbsPath() {
if filepath.IsAbs(lfm.Path) {
return
}

absPath, err := filepath.Abs(lfm.Path)
if err != nil {
return
}

lfm.Path = absPath
}

// GetFileSum 获取文件的大小, md5, 前256KB切片的 md5, crc32
func GetFileSum(localPath string, cfg *SumConfig) (lf *LocalFile, err error) {
file, err := os.Open(localPath)
if err != nil {
return nil, err
}

defer file.Close()

fileStat, err := file.Stat()
if err != nil {
return nil, err
}
if fileStat.IsDir() {
return nil, fmt.Errorf("sum %s: is a directory", localPath)
}

lf = &LocalFile{
File: file,
LocalFileMeta: LocalFileMeta{
Path: localPath,
Length: fileStat.Size(),
},
}

lf.Sum(*cfg)

return lf, nil
}