use a Duplex stream for EOF

This commit is contained in:
Danny Coates 2018-06-25 14:01:08 -07:00
parent beccd80902
commit 126ea8c7e6
No known key found for this signature in database
GPG Key ID: 4C442633C62E00CB
3 changed files with 31 additions and 30 deletions

View File

@ -52,7 +52,7 @@ export default function(state, emitter) {
checkFiles(); checkFiles();
}); });
//emitter.on('navigate', checkFiles); emitter.on('navigate', checkFiles);
emitter.on('render', () => { emitter.on('render', () => {
lastRender = Date.now(); lastRender = Date.now();

View File

@ -50,26 +50,35 @@ module.exports = async function(ws, req) {
fileStream = wsStream(ws, { binary: true }) fileStream = wsStream(ws, { binary: true })
.pipe(limiter) .pipe(limiter)
.pipe(parser); .pipe(parser);
storage.set(newId, fileStream, meta); await storage.set(newId, fileStream, meta);
await parser.promise; if (ws.readyState === 1) {
// if the socket is closed by a cancelled upload the stream
// ends without an error so we need to check the state
// before sending a reply.
ws.send( // TODO: we should handle cancelled uploads differently
JSON.stringify({ // in order to avoid having to check socket state and clean
url, // up storage, possibly with an exception that we can catch.
owner: meta.owner, ws.send(
id: newId, JSON.stringify({
authentication: `send-v1 ${meta.nonce}` url,
}) owner: meta.owner,
); id: newId,
authentication: `send-v1 ${meta.nonce}`
})
);
}
} catch (e) { } catch (e) {
log.error('upload', e); log.error('upload', e);
ws.send( if (ws.readyState === 1) {
JSON.stringify({ ws.send(
error: e === 'limit' ? 413 : 500 JSON.stringify({
}) error: e === 'limit' ? 413 : 500
); })
ws.close(); );
ws.close();
}
} }
}); });
}; };

View File

@ -1,23 +1,15 @@
const { Transform } = require('stream'); const { Duplex } = require('stream');
class StreamParser extends Transform { class StreamParser extends Duplex {
constructor() { _write(chunk, encoding, callback) {
super();
let res;
this.promise = new Promise(resolve => {
res = resolve;
});
this.res = res;
}
_transform(chunk, encoding, callback) {
if (chunk.byteLength === 1 && chunk[0] === 0) { if (chunk.byteLength === 1 && chunk[0] === 0) {
this.res(); this.push(null);
} else { } else {
this.push(chunk); this.push(chunk);
} }
callback(); callback();
} }
_read() {}
} }
module.exports = StreamParser; module.exports = StreamParser;