collab_client: Redo server message queueing

Move server message queue processing out of `handleUserChanges()` for
the following reasons:
  * Fix a race condition: Before this change the client would stop
    processing incoming messages and stop sending changes to the
    server if a `NEW_CHANGES` message arrived while the user was
    composing a character and waiting for an `ACCEPT_COMMIT` message.
  * Improve readability: The `handleUserChanges()` function is for
    handling changes from the local user, not for handling changes
    from other users.
  * Simplify the code.
This commit is contained in:
Richard Hansen 2021-03-29 02:50:32 -04:00
parent e99fe88537
commit 63a1f078f4
3 changed files with 75 additions and 115 deletions

View file

@ -141,7 +141,7 @@ const Ace2Editor = function () {
this.getDebugProperty = (prop) => info.ace_getDebugProperty(prop); this.getDebugProperty = (prop) => info.ace_getDebugProperty(prop);
this.getInInternationalComposition = this.getInInternationalComposition =
() => loaded ? info.ace_getInInternationalComposition() : false; () => loaded ? info.ace_getInInternationalComposition() : null;
// prepareUserChangeset: // prepareUserChangeset:
// Returns null if no new changes or ACE not ready. Otherwise, bundles up all user changes // Returns null if no new changes or ACE not ready. Otherwise, bundles up all user changes

View file

@ -3504,16 +3504,7 @@ function Ace2Inner(editorInfo, cssManagers) {
const teardown = () => _teardownActions.forEach((a) => a()); const teardown = () => _teardownActions.forEach((a) => a());
let inInternationalComposition = false; let inInternationalComposition = null;
const handleCompositionEvent = (evt) => {
// international input events, fired in FF3, at least; allow e.g. Japanese input
if (evt.type === 'compositionstart') {
inInternationalComposition = true;
} else if (evt.type === 'compositionend') {
inInternationalComposition = false;
}
};
editorInfo.ace_getInInternationalComposition = () => inInternationalComposition; editorInfo.ace_getInInternationalComposition = () => inInternationalComposition;
const bindTheEventHandlers = () => { const bindTheEventHandlers = () => {
@ -3602,8 +3593,15 @@ function Ace2Inner(editorInfo, cssManagers) {
}); });
}); });
$(document.documentElement).on('compositionstart', handleCompositionEvent); $(document.documentElement).on('compositionstart', () => {
$(document.documentElement).on('compositionend', handleCompositionEvent); if (inInternationalComposition) return;
inInternationalComposition = new Promise((resolve) => {
$(document.documentElement).one('compositionend', () => {
inInternationalComposition = null;
resolve();
});
});
});
}; };
const topLevel = (n) => { const topLevel = (n) => {

View file

@ -50,8 +50,6 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
const userSet = {}; // userId -> userInfo const userSet = {}; // userId -> userInfo
userSet[userId] = initialUserInfo; userSet[userId] = initialUserInfo;
const msgQueue = [];
let isPendingRevision = false; let isPendingRevision = false;
const callbacks = { const callbacks = {
@ -75,7 +73,11 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
} }
const handleUserChanges = () => { const handleUserChanges = () => {
if (editor.getInInternationalComposition()) return; if (editor.getInInternationalComposition()) {
// handleUserChanges() will be called again once composition ends so there's no need to set up
// a future call before returning.
return;
}
const now = Date.now(); const now = Date.now();
if ((!getSocket()) || channelState === 'CONNECTING') { if ((!getSocket()) || channelState === 'CONNECTING') {
if (channelState === 'CONNECTING' && (now - initialStartConnectTime) > 20000) { if (channelState === 'CONNECTING' && (now - initialStartConnectTime) > 20000) {
@ -88,10 +90,10 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
} }
if (committing) { if (committing) {
if (msgQueue.length === 0 && (now - lastCommitTime) > 20000) { if (now - lastCommitTime > 20000) {
// a commit is taking too long // a commit is taking too long
setChannelState('DISCONNECTED', 'slowcommit'); setChannelState('DISCONNECTED', 'slowcommit');
} else if (msgQueue.length === 0 && (now - lastCommitTime) > 5000) { } else if (now - lastCommitTime > 5000) {
callbacks.onConnectionTrouble('SLOW'); callbacks.onConnectionTrouble('SLOW');
} else { } else {
// run again in a few seconds, to detect a disconnect // run again in a few seconds, to detect a disconnect
@ -106,27 +108,6 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
return; return;
} }
// apply msgQueue changeset.
if (msgQueue.length !== 0) {
let msg;
while ((msg = msgQueue.shift())) {
const newRev = msg.newRev;
rev = newRev;
if (msg.type === 'ACCEPT_COMMIT') {
acceptCommit();
} else if (msg.type === 'NEW_CHANGES') {
const changeset = msg.changeset;
const author = (msg.author || '');
const apool = msg.apool;
editor.applyChangesToBase(changeset, author, apool);
}
}
if (isPendingRevision) {
setIsPendingRevision(false);
}
}
let sentMessage = false; let sentMessage = false;
// Check if there are any pending revisions to be received from server. // Check if there are any pending revisions to be received from server.
// Allow only if there are no pending revisions to be received from server // Allow only if there are no pending revisions to be received from server
@ -182,6 +163,21 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
}); });
}; };
const serverMessageTaskQueue = new class {
constructor() {
this._promiseChain = Promise.resolve();
}
async enqueue(fn) {
const taskPromise = this._promiseChain.then(fn);
// Use .catch() to prevent rejections from halting the queue.
this._promiseChain = taskPromise.catch(() => {});
// Do NOT do `return await this._promiseChain;` because the caller would not see an error if
// fn() throws/rejects (due to the .catch() added above).
return await taskPromise;
}
}();
const handleMessageFromServer = (evt) => { const handleMessageFromServer = (evt) => {
if (!getSocket()) return; if (!getSocket()) return;
if (!evt.data) return; if (!evt.data) return;
@ -190,44 +186,27 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
const msg = wrapper.data; const msg = wrapper.data;
if (msg.type === 'NEW_CHANGES') { if (msg.type === 'NEW_CHANGES') {
const newRev = msg.newRev; serverMessageTaskQueue.enqueue(async () => {
const changeset = msg.changeset; // Avoid updating the DOM while the user is composing a character. Notes about this `await`:
const author = (msg.author || ''); // * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not
const apool = msg.apool; // currently composing a character then execution will continue without error.
// * We assume that it is not possible for a new 'compositionstart' event to fire after
// When inInternationalComposition, msg pushed msgQueue. // the `await` but before the next line of code after the `await` (or, if it is
if (msgQueue.length > 0 || editor.getInInternationalComposition()) { // possible, that the chances are so small or the consequences so minor that it's not
const oldRev = msgQueue.length > 0 ? msgQueue[msgQueue.length - 1].newRev : rev; // worth addressing).
if (newRev !== (oldRev + 1)) { await editor.getInInternationalComposition();
window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${oldRev + 1}`); const {newRev, changeset, author = '', apool} = msg;
// setChannelState("DISCONNECTED", "badmessage_newchanges");
return;
}
msgQueue.push(msg);
return;
}
if (newRev !== (rev + 1)) { if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`); window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges"); // setChannelState("DISCONNECTED", "badmessage_newchanges");
return; return;
} }
rev = newRev; rev = newRev;
editor.applyChangesToBase(changeset, author, apool); editor.applyChangesToBase(changeset, author, apool);
});
} else if (msg.type === 'ACCEPT_COMMIT') { } else if (msg.type === 'ACCEPT_COMMIT') {
serverMessageTaskQueue.enqueue(() => {
const newRev = msg.newRev; const newRev = msg.newRev;
if (msgQueue.length > 0) {
if (newRev !== (msgQueue[msgQueue.length - 1].newRev + 1)) {
window.console.warn('bad message revision on ACCEPT_COMMIT: ' +
`${newRev} not ${msgQueue[msgQueue.length - 1][0] + 1}`);
// setChannelState("DISCONNECTED", "badmessage_acceptcommit");
return;
}
msgQueue.push(msg);
return;
}
if (newRev !== (rev + 1)) { if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on ACCEPT_COMMIT: ${newRev} not ${rev + 1}`); window.console.warn(`bad message revision on ACCEPT_COMMIT: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_acceptcommit"); // setChannelState("DISCONNECTED", "badmessage_acceptcommit");
@ -235,50 +214,33 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
} }
rev = newRev; rev = newRev;
acceptCommit(); acceptCommit();
});
} else if (msg.type === 'CLIENT_RECONNECT') { } else if (msg.type === 'CLIENT_RECONNECT') {
// Server sends a CLIENT_RECONNECT message when there is a client reconnect. // Server sends a CLIENT_RECONNECT message when there is a client reconnect.
// Server also returns all pending revisions along with this CLIENT_RECONNECT message // Server also returns all pending revisions along with this CLIENT_RECONNECT message
serverMessageTaskQueue.enqueue(() => {
if (msg.noChanges) { if (msg.noChanges) {
// If no revisions are pending, just make everything normal // If no revisions are pending, just make everything normal
setIsPendingRevision(false); setIsPendingRevision(false);
return; return;
} }
const {headRev, newRev, changeset, author = '', apool} = msg;
const headRev = msg.headRev;
const newRev = msg.newRev;
const changeset = msg.changeset;
const author = (msg.author || '');
const apool = msg.apool;
if (msgQueue.length > 0) {
if (newRev !== (msgQueue[msgQueue.length - 1].newRev + 1)) {
window.console.warn('bad message revision on CLIENT_RECONNECT: ' +
`${newRev} not ${msgQueue[msgQueue.length - 1][0] + 1}`);
// setChannelState("DISCONNECTED", "badmessage_acceptcommit");
return;
}
msg.type = 'NEW_CHANGES';
msgQueue.push(msg);
return;
}
if (newRev !== (rev + 1)) { if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on CLIENT_RECONNECT: ${newRev} not ${rev + 1}`); window.console.warn(`bad message revision on CLIENT_RECONNECT: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_acceptcommit"); // setChannelState("DISCONNECTED", "badmessage_acceptcommit");
return; return;
} }
rev = newRev; rev = newRev;
if (author === pad.getUserId()) { if (author === pad.getUserId()) {
acceptCommit(); acceptCommit();
} else { } else {
editor.applyChangesToBase(changeset, author, apool); editor.applyChangesToBase(changeset, author, apool);
} }
if (newRev === headRev) { if (newRev === headRev) {
// Once we have applied all pending revisions, make everything normal // Once we have applied all pending revisions, make everything normal
setIsPendingRevision(false); setIsPendingRevision(false);
} }
});
} else if (msg.type === 'NO_COMMIT_PENDING') { } else if (msg.type === 'NO_COMMIT_PENDING') {
if (committing) { if (committing) {
// server missed our commit message; abort that commit // server missed our commit message; abort that commit