在之前的博客中,对服务器的上传下载就写过一些备忘,当时项目是对文件进行操作,涉及到一些网络文件的基础操作,由于没得网络存储,也就是依靠云的磁盘来存储数据文件。
这里记录一下分块上传,也就是断点续传的实现。
大概可以分为三个步骤:
- 尝试秒传接口;
- 初始化分块上传信息;
- 客户端开始进行分块上传;
- 客户端通知服务器完成分块,服务器进行分块合并操作。
分块上传
1、尝试秒传
秒传是利用文件的哈希值,来对文件进行唯一值匹配,若能够匹配成功,则说明文件已被上传,只需要插入下mysql里面的数据即可,这也就是秒传的意思。
这里在实现秒传时,在开源项目BaiduPCS-Go
中,是需要:
1 2 3 4 5 6 7 8
| map[string]string{ "path": targetPath, "content-length": strconv.FormatInt(length, 10), "content-md5": contentMD5, "slice-md5": sliceMD5, "content-crc32": crc32, "ondup": "overwrite", }
|
不过在github.com/iikira/BaiduPCS-Go/blob/master/pcsutil/checksum/checksum.go
文件中:
作者表示,这个前256kb的MD5
和CRC32
不是必须。
1 2 3 4 5 6 7 8 9 10 11 12 13
|
if task.uploadInfo.Length >= requiredSliceSize { }
task.uploadInfo.SliceMD5Sum()
|
2、初始化分块上传信息
当客户端需要上传文件时,首先会向服务端一个接口传递文件信息,这里我们可以获取:
1 2
| * @apiParam filehash string 文件计算的哈希计算值 * @apiParam filesize int 文件大小
|
在这步操作中,我们需要填充这个结构体。
1 2 3 4 5 6 7
| type MultipleUploadInfo struct { FileHash string FileSize int UploadID string ChunkSize int ChunkCount int }
|
例如:
1 2 3 4 5 6 7 8 9
| upInfo := &MultipartUploadInfo{ FileHash: filehash, FileSize: filesize, UploadID: username + fmt.Sprintf("%x", time.Now().UnixNano()), ChunkSize: 5 << 10 << 10, ChunkCount: int(math.Ceil(float64(filesize) / (5 << 10 << 10))), }
|
然后需要将这个数据保存下来,因为在后面的使用中需要使用到。
可以存储在内存中,使用map
进行管理(我之前是使用这种方式,所以UploadID
我得保持唯一),同样的也可以使用Redis
、Memcached
,我这里使用到了Map
,项目里头有变化。
此结构体需要返回给客户端。
3、分块上传
客户端按照分块上传信息,分块读取文件流上传给服务端,按照ChunkSize
进行读取。
上传时,附带上此次上传的Chunk
次数。
服务器将这些数据保存在一个临时目录下,按先后Chunk
顺序进行命名,方便合并时使用。
服务器同时也需要记录该UploadID
上传的Chunk
次数
4、合并
客户端上传完成后,请求该接口(当然,也可以服务器每次接受分块都去检查分块是否已经上传完了,或者是客户端在最后一次上传时,告知服务器这是最后一次上传,这里为了拆开逻辑,故分开接口)。
断点续传/断点下载
这两者可以说都是上面那个过程,操作都是类似的,没有什么不一样,同样是控制分块信息,控制分块上传与下载,即使中间终端,也可以重新确认信息后接上上一次中断的过程继续开始操作。
这里我偷懒没有将代码拿出来进行展示,这里仅展示逻辑了。
记录代码
github.com/iikira/BaiduPCS-Go/blob/master/pcsutil/checksum/checksum.go

| package checksum
import ( "crypto/md5" "fmt" "github.com/iikira/BaiduPCS-Go/pcsutil/converter" "hash" "hash/crc32" "io" "os" )
const ( defaultBufSize = 256 * converter.KB )
type ( LocalFileMeta struct { Path string `json:"path"` Length int64 `json:"length"` SliceMD5 []byte `json:"slicemd5"` MD5 []byte `json:"md5"` CRC32 uint32 `json:"crc32"` ModTime int64 `json:"modtime"` }
LocalFile struct { LocalFileMeta
bufSize int buf []byte File *os.File }
SumConfig struct { IsMD5Sum bool IsSliceMD5Sum bool IsCRC32Sum bool } )
func NewLocalFileInfo(localPath string, bufSize int) *LocalFile { return &LocalFile{ LocalFileMeta: LocalFileMeta{ Path: localPath, }, bufSize: bufSize, } }
func (lf *LocalFile) OpenPath() error { if lf.File != nil { lf.File.Close() }
var err error lf.File, err = os.Open(lf.Path) if err != nil { return err }
info, err := lf.File.Stat() if err != nil { return err }
lf.Length = info.Size() lf.ModTime = info.ModTime().Unix() return nil }
func (lf *LocalFile) Close() error { if lf.File == nil { return fmt.Errorf("file is nil") }
return lf.File.Close() }
func (lf *LocalFile) initBuf() { if lf.buf == nil { if lf.bufSize != 0 { lf.buf = make([]byte, lf.bufSize) return }
lf.buf = make([]byte, defaultBufSize) } }
func (lf *LocalFile) repeatRead(ws ...io.Writer) { if lf.File == nil { return }
lf.initBuf()
var ( begin int64 n int err error )
handle := func() { begin += int64(n) for k := range ws { ws[k].Write(lf.buf[:n]) } }
for { n, err = lf.File.ReadAt(lf.buf, begin) if err != nil { if err == io.EOF { handle() } else { fmt.Printf("%s\n", err) } break }
handle() } }
func (lf *LocalFile) Sum(cfg SumConfig) { var ( md5w hash.Hash crc32w hash.Hash32 )
ws := make([]io.Writer, 0, 2) if cfg.IsMD5Sum { md5w = md5.New() ws = append(ws, md5w) } if cfg.IsCRC32Sum { crc32w = crc32.NewIEEE() ws = append(ws, crc32w) } if cfg.IsSliceMD5Sum { lf.SliceMD5Sum() }
lf.repeatRead(ws...)
if cfg.IsMD5Sum { lf.MD5 = md5w.Sum(nil) } if cfg.IsCRC32Sum { lf.CRC32 = crc32w.Sum32() } }
func (lf *LocalFile) Md5Sum() { lf.Sum(SumConfig{ IsMD5Sum: true, }) }
func (lf *LocalFile) SliceMD5Sum() { if lf.File == nil { return }
lf.initBuf()
m := md5.New() n, err := lf.File.ReadAt(lf.buf, 0) if err != nil { if err == io.EOF { goto md5sum } else { fmt.Printf("SliceMD5Sum: %s\n", err) return } }
md5sum: m.Write(lf.buf[:n]) lf.SliceMD5 = m.Sum(nil) }
func (lf *LocalFile) Crc32Sum() { lf.Sum(SumConfig{ IsCRC32Sum: true, }) }
|
github.com/iikira/BaiduPCS-Go/blob/master/pcsutil/checksum/file.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package checksum
import ( "bytes" "fmt" "os" "path/filepath" )
func (lfm *LocalFileMeta) EqualLengthMD5(m *LocalFileMeta) bool { if lfm.Length != m.Length { return false } if bytes.Compare(lfm.MD5, m.MD5) != 0 { return false } return true }
func (lfm *LocalFileMeta) CompleteAbsPath() { if filepath.IsAbs(lfm.Path) { return }
absPath, err := filepath.Abs(lfm.Path) if err != nil { return }
lfm.Path = absPath }
func GetFileSum(localPath string, cfg *SumConfig) (lf *LocalFile, err error) { file, err := os.Open(localPath) if err != nil { return nil, err }
defer file.Close()
fileStat, err := file.Stat() if err != nil { return nil, err } if fileStat.IsDir() { return nil, fmt.Errorf("sum %s: is a directory", localPath) }
lf = &LocalFile{ File: file, LocalFileMeta: LocalFileMeta{ Path: localPath, Length: fileStat.Size(), }, }
lf.Sum(*cfg)
return lf, nil }
|
本文标题:服务器上传下载问题之分块上传(断点续传)
文章作者:小师
发布时间:2019-04-09
最后更新:2022-05-04
原始链接:chunlife.top/2019/04/09/服务器上传下载问题之分块上传(断点续传)/
版权声明:本站所有文章均采用知识共享署名4.0国际许可协议进行许可