From d543d5ae6af0cbad75e942e1c7e3a9f892d46e5a Mon Sep 17 00:00:00 2001 From: Ray Bellis Date: Wed, 30 Jan 2019 10:43:01 +0000 Subject: [PATCH] PadMessageHandler.js: convert handleUserChanges() to Promises - the call site still expects a nodeback function, so also introduced the `nodeify` module to allow that function to work as expected. --- src/node/handler/PadMessageHandler.js | 253 +++++++++++--------------- src/package.json | 2 +- 2 files changed, 109 insertions(+), 146 deletions(-) diff --git a/src/node/handler/PadMessageHandler.js b/src/node/handler/PadMessageHandler.js index 3004ae49..1e9496cf 100644 --- a/src/node/handler/PadMessageHandler.js +++ b/src/node/handler/PadMessageHandler.js @@ -39,6 +39,7 @@ var channels = require("channels"); var stats = require('../stats'); var remoteAddress = require("../utils/RemoteAddress").remoteAddress; const thenify = require("thenify").withCallback; +const nodeify = require("nodeify"); /** * A associative array that saves informations about a session @@ -61,7 +62,11 @@ stats.gauge('totalUsers', function() { /** * A changeset queue per pad that is processed by handleUserChanges() */ -var padChannels = new channels.channels(thenify(handleUserChanges)); +var padChannels = new channels.channels(handleUserChangesCB); + +function handleUserChangesCB(data, callback) { + return nodeify(handleUserChanges(data), callback); +} /** * Saves the Socket class we need to send and receive data from the client @@ -591,7 +596,7 @@ function handleUserInfoUpdate(client, message) * @param client the client that send this message * @param message the message from the client */ -function handleUserChanges(data, cb) +async function handleUserChanges(data) { var client = data.client , message = data.message @@ -602,17 +607,17 @@ function handleUserChanges(data, cb) // Make sure all required fields are present if (message.data.baseRev == null) { messageLogger.warn("Dropped message, USER_CHANGES Message has no baseRev!"); - return cb(); + return; } if (message.data.apool == null) { messageLogger.warn("Dropped message, USER_CHANGES Message has no apool!"); - return cb(); + return; } if (message.data.changeset == null) { messageLogger.warn("Dropped message, USER_CHANGES Message has no changeset!"); - return cb(); + return; } // TODO: this might happen with other messages too => find one place to copy the session @@ -620,7 +625,7 @@ function handleUserChanges(data, cb) // if the session was valid when the message arrived in the first place if (!sessioninfos[client.id]) { messageLogger.warn("Dropped message, disconnect happened in the mean time"); - return cb(); + return; } // get all Vars we need @@ -632,172 +637,130 @@ function handleUserChanges(data, cb) // finish processing the changeset, so keep a reference to the session. var thisSession = sessioninfos[client.id]; - var r, apool, pad; - // Measure time to process edit var stopWatch = stats.timer('edits').start(); - async.series([ - // get the pad - function(callback) { - padManager.getPad(thisSession.padId, function(err, value) { - if (ERR(err, callback)) return; + // get the pad + let pad = await padManager.getPad(thisSession.padId); - pad = value; - callback(); + // create the changeset + try { + try { + // Verify that the changeset has valid syntax and is in canonical form + Changeset.checkRep(changeset); + + // Verify that the attribute indexes used in the changeset are all + // defined in the accompanying attribute pool. + Changeset.eachAttribNumber(changeset, function(n) { + if (!wireApool.getAttrib(n)) { + throw new Error("Attribute pool is missing attribute " + n + " for changeset " + changeset); + } }); - }, - // create the changeset - function(callback) { - // ex. _checkChangesetAndPool + // Validate all added 'author' attribs to be the same value as the current user + var iterator = Changeset.opIterator(Changeset.unpack(changeset).ops) + , op; - try { - // Verify that the changeset has valid syntax and is in canonical form - Changeset.checkRep(changeset); + while (iterator.hasNext()) { + op = iterator.next() - // Verify that the attribute indexes used in the changeset are all - // defined in the accompanying attribute pool. - Changeset.eachAttribNumber(changeset, function(n) { - if (!wireApool.getAttrib(n)) { - throw new Error("Attribute pool is missing attribute " + n + " for changeset " + changeset); + // + can add text with attribs + // = can change or add attribs + // - can have attribs, but they are discarded and don't show up in the attribs - but do show up in the pool + + op.attribs.split('*').forEach(function(attr) { + if (!attr) return; + + attr = wireApool.getAttrib(attr); + if (!attr) return; + + // the empty author is used in the clearAuthorship functionality so this should be the only exception + if ('author' == attr[0] && (attr[1] != thisSession.author && attr[1] != '')) { + throw new Error("Trying to submit changes as another author in changeset " + changeset); } }); - - // Validate all added 'author' attribs to be the same value as the current user - var iterator = Changeset.opIterator(Changeset.unpack(changeset).ops) - , op - while(iterator.hasNext()) { - op = iterator.next() - - // + can add text with attribs - // = can change or add attribs - // - can have attribs, but they are discarded and don't show up in the attribs - but do show up in the pool - - op.attribs.split('*').forEach(function(attr) { - if (!attr) return; - - attr = wireApool.getAttrib(attr) - if (!attr) return; - - // the empty author is used in the clearAuthorship functionality so this should be the only exception - if ('author' == attr[0] && (attr[1] != thisSession.author && attr[1] != '')) { - throw new Error("Trying to submit changes as another author in changeset " + changeset); - } - }); - } - - // ex. adoptChangesetAttribs - - // Afaik, it copies the new attributes from the changeset, to the global Attribute Pool - changeset = Changeset.moveOpsToNewPool(changeset, wireApool, pad.pool); - } catch(e) { - // There is an error in this changeset, so just refuse it - client.json.send({disconnect:"badChangeset"}); - stats.meter('failedChangesets').mark(); - return callback(new Error("Can't apply USER_CHANGES, because "+e.message)); } - // ex. applyUserChanges - apool = pad.pool; - r = baseRev; + // ex. adoptChangesetAttribs - // The client's changeset might not be based on the latest revision, - // since other clients are sending changes at the same time. - // Update the changeset so that it can be applied to the latest revision. - // https://github.com/caolan/async#whilst - async.whilst( - function() { return r < pad.getHeadRevisionNumber(); }, - function(callback) - { - r++; + // Afaik, it copies the new attributes from the changeset, to the global Attribute Pool + changeset = Changeset.moveOpsToNewPool(changeset, wireApool, pad.pool); - pad.getRevisionChangeset(r, function(err, c) { - if (ERR(err, callback)) return; + } catch(e) { + // There is an error in this changeset, so just refuse it + client.json.send({ disconnect: "badChangeset" }); + stats.meter('failedChangesets').mark(); + throw new Error("Can't apply USER_CHANGES, because " + e.message); + } - // At this point, both "c" (from the pad) and "changeset" (from the - // client) are relative to revision r - 1. The follow function - // rebases "changeset" so that it is relative to revision r - // and can be applied after "c". - try { - // a changeset can be based on an old revision with the same changes in it - // prevent eplite from accepting it TODO: better send the client a NEW_CHANGES - // of that revision - if (baseRev + 1 == r && c == changeset) { - client.json.send({disconnect:"badChangeset"}); - stats.meter('failedChangesets').mark(); + // ex. applyUserChanges + let apool = pad.pool; + let r = baseRev; - return callback(new Error("Won't apply USER_CHANGES, because it contains an already accepted changeset")); - } + // The client's changeset might not be based on the latest revision, + // since other clients are sending changes at the same time. + // Update the changeset so that it can be applied to the latest revision. + while (r < pad.getHeadRevisionNumber()) { + r++; - changeset = Changeset.follow(c, changeset, false, apool); - } catch(e) { - client.json.send({disconnect:"badChangeset"}); - stats.meter('failedChangesets').mark(); + let c = await pad.getRevisionChangeset(r); - return callback(new Error("Can't apply USER_CHANGES, because "+e.message)); - } - - if ((r - baseRev) % 200 == 0) { - // don't let the stack get too deep - async.nextTick(callback); - } else { - callback(null); - } - }); - }, - - // use the callback of the series function - callback - ); - }, - - // do correction changesets, and send it to all users - function(callback) { - var prevText = pad.text(); - - if (Changeset.oldLen(changeset) != prevText.length) { - client.json.send({disconnect:"badChangeset"}); - stats.meter('failedChangesets').mark(); - - return callback(new Error("Can't apply USER_CHANGES "+changeset+" with oldLen " + Changeset.oldLen(changeset) + " to document of length " + prevText.length)); - } + // At this point, both "c" (from the pad) and "changeset" (from the + // client) are relative to revision r - 1. The follow function + // rebases "changeset" so that it is relative to revision r + // and can be applied after "c". try { - pad.appendRevision(changeset, thisSession.author); + // a changeset can be based on an old revision with the same changes in it + // prevent eplite from accepting it TODO: better send the client a NEW_CHANGES + // of that revision + if (baseRev + 1 == r && c == changeset) { + client.json.send({disconnect:"badChangeset"}); + stats.meter('failedChangesets').mark(); + throw new Error("Won't apply USER_CHANGES, because it contains an already accepted changeset"); + } + + changeset = Changeset.follow(c, changeset, false, apool); } catch(e) { client.json.send({disconnect:"badChangeset"}); stats.meter('failedChangesets').mark(); - - return callback(e) + throw new Error("Can't apply USER_CHANGES, because " + e.message); } - - var correctionChangeset = _correctMarkersInPad(pad.atext, pad.pool); - if (correctionChangeset) { - pad.appendRevision(correctionChangeset); - } - - // Make sure the pad always ends with an empty line. - if (pad.text().lastIndexOf("\n") != pad.text().length-1) { - var nlChangeset = Changeset.makeSplice(pad.text(), pad.text().length-1, - 0, "\n"); - pad.appendRevision(nlChangeset); - } - - exports.updatePadClients(pad, function(er) { - ERR(er) - }); - callback(); } - ], - function(err) { - stopWatch.end() - cb(); - if(err) { - console.warn(err.stack || err); + let prevText = pad.text(); + + if (Changeset.oldLen(changeset) != prevText.length) { + client.json.send({disconnect:"badChangeset"}); + stats.meter('failedChangesets').mark(); + throw new Error("Can't apply USER_CHANGES "+changeset+" with oldLen " + Changeset.oldLen(changeset) + " to document of length " + prevText.length); } - }); + + try { + pad.appendRevision(changeset, thisSession.author); + } catch(e) { + client.json.send({ disconnect: "badChangeset" }); + stats.meter('failedChangesets').mark(); + throw e; + } + + let correctionChangeset = _correctMarkersInPad(pad.atext, pad.pool); + if (correctionChangeset) { + pad.appendRevision(correctionChangeset); + } + + // Make sure the pad always ends with an empty line. + if (pad.text().lastIndexOf("\n") != pad.text().length-1) { + var nlChangeset = Changeset.makeSplice(pad.text(), pad.text().length - 1, 0, "\n"); + pad.appendRevision(nlChangeset); + } + + await exports.updatePadClients(pad); + } catch (err) { + console.warn(err.stack || err); + } + + stopWatch.end(); } exports.updatePadClients = thenify(function(pad, callback) diff --git a/src/package.json b/src/package.json index fa63b071..11259e76 100644 --- a/src/package.json +++ b/src/package.json @@ -48,6 +48,7 @@ "languages4translatewiki": "0.1.3", "log4js": "0.6.35", "measured-core": "1.11.2", + "nodeify": "^1.0.1", "npm": "6.4.1", "object.values": "^1.0.4", "request": "2.88.0", @@ -87,4 +88,3 @@ "version": "1.7.5", "license": "Apache-2.0" } -