大文件分片上传(一)

去年负责开发类云盘功能,前后优化了几次,一直想把大文件分片上传写一下,没找到机会。忙里偷闲,总结一下其中的技术。先上效果GIF:

图片

上传UI采用的是elementUI中的upload组件,屏蔽自动上传,监听input[file]原生change事件,支持多文件上传,过滤类型动态变化(要在类型渲染结束后再触发文件选择,不然展示的依然是之前的类型)

<el-upload    v-show="false"    ref="upload"    action="/"    :multiple="true"    :with-credentials="true"    :accept="acceptFilters[currenType]"    :auto-upload="false"    @change.native="fileChange">    <el-button ref="upload_resource">选择文件</el-button></el-upload>

每次监听到文件change事件后,将选择的文件插入等待队列,等待进行后续处理。
文件分片分片核心是利用 Blob.prototype.slice 方法,和数组的 slice 方法相似,调用的 slice 方法可以返回原文件的某个切片

图片

hash计算文件分片后如果根据文件名直接上传,然后再合并,遇到同名文件时,断点续传,秒传功能都会不正确,因此需要计算文件的唯一hash值。此处选择使用数量较多的spark-md5.js来进行hash计算。但是当文件超大时,计算hash仍然会消耗大量时间并且阻塞UI交互,造成页面假死。因此可以采用web worker技术在多线程环境计算hash,不影响页面UI的使用。

图片

由于web worker有同源限制,并且worker线程无法读取本地文件,所以将worker线程的js文件放在public目录下(我使用的是vue工程)。worker环境不能像普通文件一样直接import + 路径引入第三方脚本,但提供了importScript函数来导入。

// public/fileHash.jsself.importScripts('spark-md5.min.js')
self.onmessage = e => { // 接收分片数据 const {chunks} = e.data
    const spark = new self.SparkMD5.ArrayBuffer() let count = 0
const loadNext = index => { const reader = new FileReader() reader.readAsArrayBuffer(chunks[index].file) reader.onload = e => { count++ spark.append(e.target.result) if (count == chunks.length) { self.postMessage({ hash: spark.end() }) } else { loadNext(count) } } } loadNext(count)}

在 worker 线程中,通过onmessage接受文件切片数组chunks,利用 FileReader 读取每个切片的 ArrayBuffer 并不断传入 spark-md5 中,全部完成后通过postmessage将最终的 hash 发送给主线程

// 主线程与worker通信function calculateHashWorker(chunks, workerFile) {    return new Promise((resolve, reject) => {        const worker = new window.Worker('./worker/fileChunksHash.js')        workerFile['worker'] = worker        worker.postMessage({ chunks })        worker.onmessage = e => {            const { hash } = e.data            if (hash) {                worker.terminate()                workerFile.isScan = false                if (!ctx.isUpload) {                    return reject()                }                if (ctx.uploadQueue.length) {                    ctx.uploadQueue.push({                        mode: 'large',                        hash,                        chunks,                        option: workerFile                    })                } else {                    ctx.uploadQueue.push({                        mode: 'large',                        hash,                        chunks,                        option: workerFile                    })                    ctx.startUpload()                }                resolve(hash)            }        }    })}

主线程使用postmessage向子线程发送切片数组,并通过onmessage监听子线程的信息(此处可以做计算hash的进度条)

我实现的支持多文件上传,并且并行计算hash,可能会瞬间开多个worker线程,感觉会造成崩溃,这个查了一下和计算机的多核有关,之后找机会研究一下。
并行分片上传

hash计算完成后,就可以对分片进行上传。初始做的时候是一次性将所有分片一起并行上传,但是请求接口瞬间达200个的时候,浏览器直接崩溃。后来进行了优化,每次并行只允许上传四个分片,只有一个分片上传完毕才允许下一个分片上传。

async function sendRequest(requests, size) {    return new Promise((resolve, reject) => {        let len = requests.length        if (len == 0) {            resolve()        }        let limit = len > MAX_REQUEST ? MAX_REQUEST : len        let count = 0        let isStop = false        async function start() {            if (isStop || !ctx.isUpload) return            let task = requests.shift()            if (task) {                const { params, index } = task                try {                    await Axios({                        baseURL: config.API_PATH,                        url: `服务端接口`,                        headers: {                            'content-type': 'multipart/form-data'                        },                        data: params,                        method: "post",                        timeout: 30000,                        withCredentials: true,                        onUploadProgress: progress => {                            let temp = ((progress.loaded / progress.total) * 100).toFixed(2)                            // 防止上传中途失败,进度条回退                            if (chunksCache[index].progress * 1 < temp * 1) {                                chunksCache[index].progress = temp                                computeProgress(size)                            }                        }                    })                    if (count == len - 1) {                        resolve()                    } else {                        count++                        start()                    }                } catch (error) {                    if (task.error < 3) {                        task.error++                        requests.unshift(task)                        start()                    } else {                        isStop = true                        reject()                    }                }            }        }
while (limit > 0) { start() limit-- } })}

每个分片上传过程中,有可能因为各种意外上传失败,所以提供了容错机制,每个分片允许三次失败,超过三次时上传结束。

实现文件进度条功能需要用到onprogress函数,可以获取每个分片的上传进度,从而计算出原文件的整体上传进度。我选择的是element进度条来实现,也可以仿迅雷的小方块上传进度。合并文件

两种方案:

1、将总分片数提交给服务端,当分片数一致时,服务端自动合并文件

2、前端上传完所有文件时,主动通知服务端合并文件(我使用的是这种方式)断点续传

两种方案:

1、客户端localstorage本地存储,上传文件前获取已上传的分片进行过滤(换浏览器/清空缓存后,无法获取已上传的分片)

2、服务端存储已上传切片的hash索引,上传文件前客户端发送请求获取已上传的分片进行过滤

let res = await Axios({        baseURL: config.API_PATH,        url: `获取已上传分片接口`,        headers: {},        data: { hash, pathUrl: opt.pathUrl },        method: "post",        timeout: 30000,        withCredentials: true,    }).catch(() => {        currFile.status = 'exception'        ctx.uploadQueue.shift()    })    if (res.data.code == 500) {        currFile.status = 'exception'        ctx.uploadQueue.shift()    }    if (currFile.status == 'exception' || !ctx.isUpload) return    let { uploadedList } = res.data.data    let tempChunks = chunks.map((chunk, index) => {        let name = hash + '-' + index        return {            name: name,            hash,            index,            pathUrl: opt.pathUrl,            file: chunk.file        }    })    // 断点续传处理    if (uploadedList.length > 0) {        tempChunks = tempChunks.filter((chunk, index) => {            if (uploadedList.indexOf(chunk.name) !== -1) {                chunksCache[index].progress = 100            }            return uploadedList.indexOf(chunk.name) == -1        })        computeProgress(file.size)    }

秒传

将文件hash计算完成后,通知服务端,根据hash判断是否已上传,如果已上传,则直接上传成功(服务端复制文件)

后续扩展和思考

1、使用requestIdleCallback函数,利用浏览器的空闲时段计算hash

2、按照布隆过滤器的原理,抽样计算文件hash,牺牲碰撞率,减少计算时间

3、慢启动策略,根据网络状况,动态调整分片大小

4、根据不同计算机内核,进行最优worker数量调度

后续这些优化实现后会再开文章进行介绍。

图片

参考资料

1、字节跳动面试官:请你实现一个大文件上传和断点续传(https://juejin.cn/post/6844904046436843527)

2、thinkphp 文件上传 切片_字节跳面试官,我也实现了大文件上传和断点续传(https://blog.csdn.net/weixin_39978257/article/details/111360836)

阅读原文

简介:人生无时不在焚心,无非是文火与武火的区别罢了。给生活留下点痕迹,给自己留下点回忆。欢迎关注微信公众号:焚心小记
(0)
打赏 喜欢就点个赞支持下吧 喜欢就点个赞支持下吧
网站客服
网站客服
内容投稿 侵权处理
分享本页
返回顶部