From 126ea8c7e60a0ad5fac72015985605d2ef50e5fb Mon Sep 17 00:00:00 2001 From: Danny Coates Date: Mon, 25 Jun 2018 14:01:08 -0700 Subject: [PATCH] use a Duplex stream for EOF --- app/fileManager.js | 2 +- server/routes/ws.js | 41 +++++++++++++++++++++++++---------------- server/streamparser.js | 18 +++++------------- 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/app/fileManager.js b/app/fileManager.js index e29a05e2..786f99f8 100644 --- a/app/fileManager.js +++ b/app/fileManager.js @@ -52,7 +52,7 @@ export default function(state, emitter) { checkFiles(); }); - //emitter.on('navigate', checkFiles); + emitter.on('navigate', checkFiles); emitter.on('render', () => { lastRender = Date.now(); diff --git a/server/routes/ws.js b/server/routes/ws.js index e9ea3858..df9f2ba3 100644 --- a/server/routes/ws.js +++ b/server/routes/ws.js @@ -50,26 +50,35 @@ module.exports = async function(ws, req) { fileStream = wsStream(ws, { binary: true }) .pipe(limiter) .pipe(parser); - storage.set(newId, fileStream, meta); + await storage.set(newId, fileStream, meta); - await parser.promise; + if (ws.readyState === 1) { + // if the socket is closed by a cancelled upload the stream + // ends without an error so we need to check the state + // before sending a reply. - ws.send( - JSON.stringify({ - url, - owner: meta.owner, - id: newId, - authentication: `send-v1 ${meta.nonce}` - }) - ); + // TODO: we should handle cancelled uploads differently + // in order to avoid having to check socket state and clean + // up storage, possibly with an exception that we can catch. + ws.send( + JSON.stringify({ + url, + owner: meta.owner, + id: newId, + authentication: `send-v1 ${meta.nonce}` + }) + ); + } } catch (e) { log.error('upload', e); - ws.send( - JSON.stringify({ - error: e === 'limit' ? 413 : 500 - }) - ); - ws.close(); + if (ws.readyState === 1) { + ws.send( + JSON.stringify({ + error: e === 'limit' ? 413 : 500 + }) + ); + ws.close(); + } } }); }; diff --git a/server/streamparser.js b/server/streamparser.js index d7f7b110..e59476d0 100644 --- a/server/streamparser.js +++ b/server/streamparser.js @@ -1,23 +1,15 @@ -const { Transform } = require('stream'); +const { Duplex } = require('stream'); -class StreamParser extends Transform { - constructor() { - super(); - let res; - this.promise = new Promise(resolve => { - res = resolve; - }); - this.res = res; - } - - _transform(chunk, encoding, callback) { +class StreamParser extends Duplex { + _write(chunk, encoding, callback) { if (chunk.byteLength === 1 && chunk[0] === 0) { - this.res(); + this.push(null); } else { this.push(chunk); } callback(); } + _read() {} } module.exports = StreamParser;