去年负责开发类云盘功能,前后优化了几次,一直想把大文件分片上传写一下,没找到机会。忙里偷闲,总结一下其中的技术。先上效果GIF:

上传UI采用的是elementUI中的upload组件,屏蔽自动上传,监听input[file]原生change事件,支持多文件上传,过滤类型动态变化(要在类型渲染结束后再触发文件选择,不然展示的依然是之前的类型)
<el-upload
v-show="false"
ref="upload"
action="/"
:multiple="true"
:with-credentials="true"
:accept="acceptFilters[currenType]"
:auto-upload="false"
@change.native="fileChange"
>
<el-button ref="upload_resource">选择文件</el-button>
</el-upload>
每次监听到文件change事件后,将选择的文件插入等待队列,等待进行后续处理。
文件分片分片核心是利用 Blob.prototype.slice
方法,和数组的 slice 方法相似,调用的 slice 方法可以返回原文件的某个切片

hash计算文件分片后如果根据文件名直接上传,然后再合并,遇到同名文件时,断点续传,秒传功能都会不正确,因此需要计算文件的唯一hash值。此处选择使用数量较多的spark-md5.js来进行hash计算。但是当文件超大时,计算hash仍然会消耗大量时间并且阻塞UI交互,造成页面假死。因此可以采用web worker技术在多线程环境计算hash,不影响页面UI的使用。

由于web worker有同源限制,并且worker线程无法读取本地文件,所以将worker线程的js文件放在public目录下(我使用的是vue工程)。worker环境不能像普通文件一样直接import + 路径引入第三方脚本,但提供了importScript函数来导入。
// public/fileHash.js
self.importScripts('spark-md5.min.js')
self.onmessage = e => {
// 接收分片数据
const {chunks} = e.data
const spark = new self.SparkMD5.ArrayBuffer()
let count = 0
const loadNext = index => {
const reader = new FileReader()
reader.readAsArrayBuffer(chunks[index].file)
reader.onload = e => {
count++
spark.append(e.target.result)
if (count == chunks.length) {
self.postMessage({
hash: spark.end()
})
} else {
loadNext(count)
}
}
}
loadNext(count)
}
在 worker 线程中,通过onmessage接受文件切片数组chunks,利用 FileReader 读取每个切片的 ArrayBuffer 并不断传入 spark-md5 中,全部完成后通过postmessage将最终的 hash 发送给主线程
// 主线程与worker通信
function calculateHashWorker(chunks, workerFile) {
return new Promise((resolve, reject) => {
const worker = new window.Worker('./worker/fileChunksHash.js')
workerFile['worker'] = worker
worker.postMessage({ chunks })
worker.onmessage = e => {
const { hash } = e.data
if (hash) {
worker.terminate()
workerFile.isScan = false
if (!ctx.isUpload) {
return reject()
}
if (ctx.uploadQueue.length) {
ctx.uploadQueue.push({
mode: 'large',
hash,
chunks,
option: workerFile
})
} else {
ctx.uploadQueue.push({
mode: 'large',
hash,
chunks,
option: workerFile
})
ctx.startUpload()
}
resolve(hash)
}
}
})
}
主线程使用postmessage向子线程发送切片数组,并通过onmessage监听子线程的信息(此处可以做计算hash的进度条)
我实现的支持多文件上传,并且并行计算hash,可能会瞬间开多个worker线程,感觉会造成崩溃,这个查了一下和计算机的多核有关,之后找机会研究一下。
并行分片上传
hash计算完成后,就可以对分片进行上传。初始做的时候是一次性将所有分片一起并行上传,但是请求接口瞬间达200个的时候,浏览器直接崩溃。后来进行了优化,每次并行只允许上传四个分片,只有一个分片上传完毕才允许下一个分片上传。
async function sendRequest(requests, size) {
return new Promise((resolve, reject) => {
let len = requests.length
if (len == 0) {
resolve()
}
let limit = len > MAX_REQUEST ? MAX_REQUEST : len
let count = 0
let isStop = false
async function start() {
if (isStop || !ctx.isUpload) return
let task = requests.shift()
if (task) {
const { params, index } = task
try {
await Axios({
baseURL: config.API_PATH,
url: `服务端接口`,
headers: {
'content-type': 'multipart/form-data'
},
data: params,
method: "post",
timeout: 30000,
withCredentials: true,
onUploadProgress: progress => {
let temp = ((progress.loaded / progress.total) * 100).toFixed(2)
// 防止上传中途失败,进度条回退
if (chunksCache[index].progress * 1 < temp * 1) {
chunksCache[index].progress = temp
computeProgress(size)
}
}
})
if (count == len - 1) {
resolve()
} else {
count++
start()
}
} catch (error) {
if (task.error < 3) {
task.error++
requests.unshift(task)
start()
} else {
isStop = true
reject()
}
}
}
}
while (limit > 0) {
start()
limit--
}
})
}
每个分片上传过程中,有可能因为各种意外上传失败,所以提供了容错机制,每个分片允许三次失败,超过三次时上传结束。
实现文件进度条功能需要用到onprogress函数,可以获取每个分片的上传进度,从而计算出原文件的整体上传进度。我选择的是element进度条来实现,也可以仿迅雷的小方块上传进度。合并文件
两种方案:
1、将总分片数提交给服务端,当分片数一致时,服务端自动合并文件
2、前端上传完所有文件时,主动通知服务端合并文件(我使用的是这种方式)断点续传
两种方案:
1、客户端localstorage本地存储,上传文件前获取已上传的分片进行过滤(换浏览器/清空缓存后,无法获取已上传的分片)
2、服务端存储已上传切片的hash索引,上传文件前客户端发送请求获取已上传的分片进行过滤
let res = await Axios({
baseURL: config.API_PATH,
url: `获取已上传分片接口`,
headers: {},
data: { hash, pathUrl: opt.pathUrl },
method: "post",
timeout: 30000,
withCredentials: true,
}).catch(() => {
currFile.status = 'exception'
ctx.uploadQueue.shift()
})
if (res.data.code == 500) {
currFile.status = 'exception'
ctx.uploadQueue.shift()
}
if (currFile.status == 'exception' || !ctx.isUpload) return
let { uploadedList } = res.data.data
let tempChunks = chunks.map((chunk, index) => {
let name = hash + '-' + index
return {
name: name,
hash,
index,
pathUrl: opt.pathUrl,
file: chunk.file
}
})
// 断点续传处理
if (uploadedList.length > 0) {
tempChunks = tempChunks.filter((chunk, index) => {
if (uploadedList.indexOf(chunk.name) !== -1) {
chunksCache[index].progress = 100
}
return uploadedList.indexOf(chunk.name) == -1
})
computeProgress(file.size)
}
秒传
将文件hash计算完成后,通知服务端,根据hash判断是否已上传,如果已上传,则直接上传成功(服务端复制文件)
后续扩展和思考
1、使用requestIdleCallback函数,利用浏览器的空闲时段计算hash
2、按照布隆过滤器的原理,抽样计算文件hash,牺牲碰撞率,减少计算时间
3、慢启动策略,根据网络状况,动态调整分片大小
4、根据不同计算机内核,进行最优worker数量调度
后续这些优化实现后会再开文章进行介绍。

参考资料
1、字节跳动面试官:请你实现一个大文件上传和断点续传(https://juejin.cn/post/6844904046436843527)
2、thinkphp 文件上传 切片_字节跳面试官,我也实现了大文件上传和断点续传(https://blog.csdn.net/weixin_39978257/article/details/111360836)
阅读原文