add streaming

This commit is contained in:
Emily Hou 2018-06-21 13:57:53 -07:00
parent 1bd7e4d486
commit e4a0028f5d
4 changed files with 50 additions and 41 deletions

View File

@ -107,7 +107,8 @@ async function upload(
metadata,
verifierB64,
keychain,
onprogress
onprogress,
canceller
) {
const metadataHeader = arrayToB64(new Uint8Array(metadata));
const fileMeta = {
@ -115,23 +116,25 @@ async function upload(
authorization: `send-v1 ${verifierB64}`
};
//send file header
ws.send(JSON.stringify(fileMeta));
function listenForRes() {
function listenForResponse() {
return new Promise((resolve, reject) => {
ws.addEventListener('message', function(msg) {
const response = JSON.parse(msg.data);
resolve({
url: response.url,
id: response.id,
ownerToken: response.owner
});
if (response.error) {
reject(response.error);
} else {
resolve({
url: response.url,
id: response.id,
ownerToken: response.owner
});
}
});
});
}
const resPromise = listenForRes();
const resPromise = listenForResponse();
ws.send(JSON.stringify(fileMeta));
const reader = stream.getReader();
let state = await reader.read();
@ -139,8 +142,9 @@ async function upload(
while (!state.done) {
const buf = state.value;
ws.send(buf);
if (ws.readyState !== 1) {
throw new Error(0); //should this be here
if (canceller.cancelled) {
throw new Error(0);
}
onprogress([Math.min(streamInfo.fileSize, size), streamInfo.fileSize]);
@ -148,10 +152,10 @@ async function upload(
state = await reader.read();
}
const res = await resPromise;
const response = await resPromise;
ws.close();
return res;
return response;
}
export async function uploadWs(
@ -166,12 +170,12 @@ export async function uploadWs(
const port = window.location.port;
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const ws = await asyncInitWebSocket(`${protocol}//${host}:${port}/api/ws`);
//console.log(`made connection to websocket: ws://${host}:${port}/api/ws`)
const canceller = { cancelled: false };
return {
cancel: function() {
ws.close(4000, 'upload cancelled');
canceller.cancelled = true;
},
result: upload(
ws,
@ -180,7 +184,8 @@ export async function uploadWs(
metadata,
verifierB64,
keychain,
onprogress
onprogress,
canceller
)
};
}

View File

@ -6,7 +6,7 @@ const TAG_LENGTH = 16;
const KEY_LENGTH = 16;
const MODE_ENCRYPT = 'encrypt';
const MODE_DECRYPT = 'decrypt';
const RS = 1024 * 1024;
const RS = 1048576;
const encoder = new TextEncoder();
@ -16,7 +16,7 @@ function generateSalt(len) {
return randSalt.buffer;
}
export class ECETransformer {
class ECETransformer {
constructor(mode, ikm, rs, salt) {
this.mode = mode;
this.prevChunk;
@ -139,7 +139,6 @@ export class ECETransformer {
return Buffer.concat([Buffer.from(this.salt), nums]);
}
//salt is arraybuffer, rs is int, length is int
readHeader(buffer) {
if (buffer.length < 21) {
throw new Error('chunk too small for reading header');
@ -259,7 +258,7 @@ class BlobSlicer {
}
}
export class BlobSliceStream extends ReadableStream {
class BlobSliceStream extends ReadableStream {
constructor(blob, size, mode) {
super(new BlobSlicer(blob, size, mode));
}
@ -272,7 +271,6 @@ mode: string, either 'encrypt' or 'decrypt'
rs: int containing record size, optional
salt: ArrayBuffer containing salt of KEY_LENGTH length, optional
*/
export default class ECE {
constructor(input, key, mode, rs, salt) {
if (rs === undefined) {

View File

@ -12,7 +12,7 @@ if (config.sentry_dsn) {
const app = express();
expressWs(app, null, { perMessageDeflate: false });
app.ws('/api/ws', require('./routes/ws')); //want to move this into routes/index.js but it's not working...
app.ws('/api/ws', require('./routes/ws'));
routes(app);
app.use(

View File

@ -10,17 +10,17 @@ const log = mozlog('send.upload');
module.exports = async function(ws, req) {
let fileStream;
try {
ws.on('close', e => {
if (e !== 1000) {
if (fileStream !== undefined) {
fileStream.destroy();
}
ws.on('close', e => {
if (e !== 1000) {
if (fileStream !== undefined) {
fileStream.destroy();
}
});
}
});
let first = true;
ws.on('message', function(message) {
let first = true;
ws.on('message', function(message) {
try {
if (first) {
const newId = crypto.randomBytes(5).toString('hex');
const owner = crypto.randomBytes(10).toString('hex');
@ -29,11 +29,13 @@ module.exports = async function(ws, req) {
const metadata = fileInfo.fileMetadata;
const auth = fileInfo.authorization;
/*
if (!metadata || !auth) {
return res.sendStatus(400);
ws.send(
JSON.stringify({
error: 400
})
);
}
*/
const meta = {
owner,
@ -60,9 +62,13 @@ module.exports = async function(ws, req) {
first = false;
}
});
} catch (e) {
log.error('upload', e);
//res.sendStatus(500);
}
} catch (e) {
log.error('upload', e);
ws.send(
JSON.stringify({
error: 500
})
);
}
});
};