diff --git a/app/api.js b/app/api.js index 4101f9a8..7ec1341e 100644 --- a/app/api.js +++ b/app/api.js @@ -115,6 +115,7 @@ function listenForResponse(ws, canceller) { }); } } catch (e) { + ws.close(); canceller.cancelled = true; canceller.error = e; reject(e); @@ -134,7 +135,6 @@ async function upload( const host = window.location.hostname; const port = window.location.port; const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; - const error = { cancelled: false }; const ws = await asyncInitWebSocket(`${protocol}//${host}:${port}/api/ws`); try { @@ -144,7 +144,7 @@ async function upload( authorization: `send-v1 ${verifierB64}` }; - const responsePromise = listenForResponse(ws, error); + const responsePromise = listenForResponse(ws, canceller); ws.send(JSON.stringify(fileMeta)); @@ -154,17 +154,17 @@ async function upload( while (!state.done) { const buf = state.value; if (canceller.cancelled) { - throw new Error(0); - } - if (error.cancelled) { - throw new Error(error.error); + throw canceller.error; } + ws.send(buf); onprogress([Math.min(streamInfo.fileSize, size), streamInfo.fileSize]); size += streamInfo.recordSize; state = await reader.read(); } + const footer = new Uint8Array([0]); + ws.send(footer); const response = await responsePromise; //promise only fufills if response is good ws.close(); @@ -180,6 +180,7 @@ export function uploadWs(encrypted, info, metadata, verifierB64, onprogress) { return { cancel: function() { + canceller.error = new Error(0); canceller.cancelled = true; }, result: upload( diff --git a/server/routes/ws.js b/server/routes/ws.js index ea81573c..f2f6250d 100644 --- a/server/routes/ws.js +++ b/server/routes/ws.js @@ -3,6 +3,7 @@ const storage = require('../storage'); const config = require('../config'); const mozlog = require('../log'); const Limiter = require('../limiter'); +const Parser = require('../streamparser'); const wsStream = require('websocket-stream/stream'); const log = mozlog('send.upload'); @@ -45,7 +46,10 @@ module.exports = async function(ws, req) { const url = `${protocol}://${req.get('host')}/download/${newId}/`; const limiter = new Limiter(config.max_file_size); - fileStream = wsStream(ws, { binary: true }).pipe(limiter); + const parser = new Parser(); + fileStream = wsStream(ws, { binary: true }) + .pipe(limiter) + .pipe(parser); storage.set(newId, fileStream, meta); ws.send( @@ -60,7 +64,7 @@ module.exports = async function(ws, req) { log.error('upload', e); ws.send( JSON.stringify({ - error: 500 + error: e === 'limit' ? 413 : 500 }) ); ws.close(); diff --git a/server/streamparser.js b/server/streamparser.js new file mode 100644 index 00000000..d7f7b110 --- /dev/null +++ b/server/streamparser.js @@ -0,0 +1,23 @@ +const { Transform } = require('stream'); + +class StreamParser extends Transform { + constructor() { + super(); + let res; + this.promise = new Promise(resolve => { + res = resolve; + }); + this.res = res; + } + + _transform(chunk, encoding, callback) { + if (chunk.byteLength === 1 && chunk[0] === 0) { + this.res(); + } else { + this.push(chunk); + } + callback(); + } +} + +module.exports = StreamParser;