Use yjs and monaco to get colab code editing

This commit is contained in:
2021-11-07 19:29:44 +01:00
parent 041e1ccda6
commit 1bb4440d43
14 changed files with 737 additions and 168 deletions

View File

@@ -0,0 +1,2 @@
export const EDITOR_SESSION = 1
export const TERMINAL_SESSION = 2

View File

@@ -7,7 +7,10 @@
"dependencies": {
"body-parser": "^1.19.0",
"express": "^4.17.1",
"lib0": "^0.2.42",
"node-pty": "^0.10.1",
"ws": "^8.2.3"
"ws": "^8.2.3",
"y-protocols": "^1.0.5",
"yjs": "^13.5.18"
}
}

View File

@@ -28,7 +28,6 @@ export function getContainerShell (containerId, shell = 'sh') {
}
export function startContainer (image = 'alpine', cmd = ['sh', '-c', 'while true; do sleep 1d; done']) {
console.log(['run', '-d', '-l', config.containerLabel, image, ...cmd])
return exec(config.containerBinary, ['run', '--rm', '-d', '-l', config.containerLabel, image, ...cmd])
}

229
backend/src/editor.js Normal file
View File

@@ -0,0 +1,229 @@
// Mostly adopted from: https://github.com/yjs/y-websocket
import * as awarenessProtocol from 'y-protocols/awareness.js'
import * as syncProtocol from 'y-protocols/sync.js'
import { Doc } from 'yjs'
import { decoding, encoding, map } from 'lib0'
import { readFile, stat, writeFile } from 'fs/promises'
import { fileURLToPath } from 'url'
import path from 'path'
export async function upgrade (sessions, request, reqPath) {
// todo: check auth and project
// todo: check dir traversal
request.editorFile = `${path.dirname(fileURLToPath(import.meta.url))}/../../files/${reqPath[3]}`
return true
}
export async function connection (ws, req) {
ws.binaryType = 'arraybuffer'
// get doc, initialize if it does not exist yet
const doc = getYDoc(req.editorFile)
doc.conns.set(ws, new Set())
// listen and reply to events
ws.on('message', message => messageListener(ws, doc, new Uint8Array(message)))
// Check if connection is still alive
let pongReceived = true
const pingInterval = setInterval(() => {
if (!pongReceived) {
if (doc.conns.has(ws)) {
closeConn(doc, ws)
}
clearInterval(pingInterval)
} else if (doc.conns.has(ws)) {
pongReceived = false
try {
ws.ping()
} catch (e) {
closeConn(doc, ws)
clearInterval(pingInterval)
}
}
}, pingTimeout)
ws.on('close', () => {
closeConn(doc, ws)
clearInterval(pingInterval)
})
ws.on('pong', () => {
pongReceived = true
})
// put the following in a variables in a block so the interval handlers don't keep in in
// scope
{
// send sync step 1
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeSyncStep1(encoder, doc)
send(doc, ws, encoding.toUint8Array(encoder))
const awarenessStates = doc.awareness.getStates()
if (awarenessStates.size > 0) {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys())))
send(doc, ws, encoding.toUint8Array(encoder))
}
}
return true
}
const messageSync = 0
const messageAwareness = 1
const wsReadyStateConnecting = 0
const wsReadyStateOpen = 1
const wsReadyStateClosing = 2
const wsReadyStateClosed = 3
const pingTimeout = 30000
const docs = new Map()
class WSSharedDoc extends Doc {
/**
* @param {string} name
*/
constructor (name) {
super({ gc: true })
this.name = name
this.conns = new Map()
this.awareness = new awarenessProtocol.Awareness(this)
this.awareness.setLocalState(null)
const awarenessChangeHandler = ({ added, updated, removed }, conn) => {
const changedClients = added.concat(updated, removed)
if (conn !== null) {
const connControlledIDs = this.conns.get(conn)
if (connControlledIDs !== undefined) {
added.forEach(clientID => connControlledIDs.add(clientID))
removed.forEach(clientID => connControlledIDs.delete(clientID))
}
}
// broadcast awareness update
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageAwareness)
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients))
const buff = encoding.toUint8Array(encoder)
this.conns.forEach((_, c) => {
send(this, c, buff)
})
}
this.awareness.on('update', awarenessChangeHandler)
this.on('update', updateHandler)
}
}
const updateHandler = (update, origin, doc) => {
const encoder = encoding.createEncoder()
encoding.writeVarUint(encoder, messageSync)
syncProtocol.writeUpdate(encoder, update)
const message = encoding.toUint8Array(encoder)
doc.conns.forEach((_, conn) => send(doc, conn, message))
writeUpdate(doc)
}
const debounce = (func, wait, immediate) => {
let timeout
return (...args) => {
let context = this
let later = () => {
timeout = null
if (!immediate) func.apply(context, args)
}
let callNow = immediate && !timeout
clearTimeout(timeout)
timeout = setTimeout(later, wait)
if (callNow) func.apply(context, args)
}
}
const writeUpdate = debounce(async doc => {
await writeFile(doc.name, doc.getText('monaco').toJSON())
}, 2000)
const getYDoc = file => map.setIfUndefined(docs, file, () => {
const doc = new WSSharedDoc(file)
doc.gc = true
docs.set(file, doc)
stat(file)
.then(() => {
return readFile(file, { encoding: 'utf8' })
})
.then(content => {
const text = doc.getText('monaco')
text.insert(0, content)
})
.catch(() => {
// new file
})
return doc
})
const messageListener = (conn, doc, message) => {
try {
const encoder = encoding.createEncoder()
const decoder = decoding.createDecoder(message)
const messageType = decoding.readVarUint(decoder)
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync)
syncProtocol.readSyncMessage(decoder, encoder, doc, null)
if (encoding.length(encoder) > 1) {
send(doc, conn, encoding.toUint8Array(encoder))
}
break
case messageAwareness: {
awarenessProtocol.applyAwarenessUpdate(doc.awareness, decoding.readVarUint8Array(decoder), conn)
break
}
}
} catch (err) {
console.error(err)
doc.emit('error', [err])
}
}
const send = (doc, conn, m) => {
if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) {
console.log(conn.readyState)
closeConn(doc, conn)
}
try {
conn.send(m, err => { err != null && closeConn(doc, conn) })
} catch (e) {
closeConn(doc, conn)
}
}
const closeConn = (doc, conn) => {
if (doc.conns.has(conn)) {
const controlledIds = doc.conns.get(conn)
doc.conns.delete(conn)
awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null)
docs.delete(doc.name)
}
conn.close()
}

45
backend/src/shell.js Normal file
View File

@@ -0,0 +1,45 @@
import { containerExists, getContainerShell } from './containers.js'
export async function upgrade(sessions, request, path) {
const [container, sessionId] = path.splice(2)
const session = sessions[sessionId]
if (session && session.container !== container) {
console.log('wrong session')
return false
}
if (!(await containerExists(container))) {
console.log('no container')
return false
}
if (!session) {
sessions[sessionId] = { container }
}
request.session = sessions[sessionId]
return true
}
export async function connection(ws, req) {
if (!req.session.term) {
req.session.term = getContainerShell(req.session.container)
}
ws.on('message', message => {
const decoded = message.toString()
req.session.term.write(decoded)
})
req.session.term.onData(data => {
ws.send(data)
})
req.session.term.onExit(exit => {
ws.send(`Process terminated with code ${exit.exitCode}`)
ws.close()
})
}

View File

@@ -1,63 +1,47 @@
import { containerExists, getContainerShell } from './containers.js'
import { WebSocketServer } from 'ws'
import { EDITOR_SESSION, TERMINAL_SESSION } from '../lib/SocketTypes.js'
import { upgrade as upgradeShell, connection as connectionShell } from './shell.js'
import { upgrade as upgradeEditor, connection as connectionEditor } from './editor.js'
export default (server, sessions) => {
const wss = new WebSocketServer({ noServer: true })
server.on('upgrade', async (request, socket, head) => {
const forbidden = () => {
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n')
socket.destroy()
}
const path = request.url.substr(1).split('/')
if (path.length !== 3 || path[0] !== 'ws') {
if (path.length < 1 || path[0] !== 'ws') {
return forbidden()
}
const [_, container, sessionId] = path
const session = sessions[sessionId]
if (path.length > 3 && path[1] === 'editor' && (await upgradeEditor(sessions, request, path))) {
// /ws/editor/:project/:path
request.type = EDITOR_SESSION
if (session && session.container !== container) {
console.log('wrong session')
} else if (path.length === 4 && path[1] === 'terminal' && (await upgradeShell(sessions, request, path))) {
// /ws/terminal/:container/:session
request.type = TERMINAL_SESSION
} else {
return forbidden()
}
if (!(await containerExists(container))) {
console.log('no container')
return forbidden()
}
if (!session) {
sessions[sessionId] = { container }
}
request.session = sessions[sessionId]
wss.handleUpgrade(request, socket, head, ws => {
wss.emit('connection', ws, request);
});
wss.emit('connection', ws, request)
})
})
wss.on('connection', (ws, req) => {
if (!req.session.term) {
req.session.term = getContainerShell(req.session.container)
wss.on('connection', async (ws, req) => {
if (req.type === EDITOR_SESSION) {
await connectionEditor(ws, req)
}
ws.on('message', message => {
const decoded = message.toString()
req.session.term.write(decoded)
})
req.session.term.onData(data => {
ws.send(data)
})
req.session.term.onExit(exit => {
ws.send(`Process terminated with code ${exit.exitCode}`)
ws.close()
})
if (req.type === TERMINAL_SESSION) {
await connectionShell(ws, req)
}
})
}

View File

@@ -198,6 +198,18 @@ ipaddr.js@1.9.1:
resolved "https://registry.yarnpkg.com/ipaddr.js/-/ipaddr.js-1.9.1.tgz#bff38543eeb8984825079ff3a2a8e6cbd46781b3"
integrity sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==
isomorphic.js@^0.2.4:
version "0.2.4"
resolved "https://registry.yarnpkg.com/isomorphic.js/-/isomorphic.js-0.2.4.tgz#24ca374163ae54a7ce3b86ce63b701b91aa84969"
integrity sha512-Y4NjZceAwaPXctwsHgNsmfuPxR8lJ3f8X7QTAkhltrX4oGIv+eTlgHLXn4tWysC9zGTi929gapnPp+8F8cg7nA==
lib0@^0.2.42:
version "0.2.42"
resolved "https://registry.yarnpkg.com/lib0/-/lib0-0.2.42.tgz#6d8bf1fb8205dec37a953c521c5ee403fd8769b0"
integrity sha512-8BNM4MiokEKzMvSxTOC3gnCBisJH+jL67CnSnqzHv3jli3pUvGC8wz+0DQ2YvGr4wVQdb2R2uNNPw9LEpVvJ4Q==
dependencies:
isomorphic.js "^0.2.4"
media-typer@0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/media-typer/-/media-typer-0.3.0.tgz#8710d7af0aa626f8fffa1ce00168545263255748"
@@ -383,3 +395,17 @@ ws@^8.2.3:
version "8.2.3"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.2.3.tgz#63a56456db1b04367d0b721a0b80cae6d8becbba"
integrity sha512-wBuoj1BDpC6ZQ1B7DWQBYVLphPWkm8i9Y0/3YdHjHKHiohOJ1ws+3OccDWtH+PoC9DZD5WOTrJvNbWvjS6JWaA==
y-protocols@^1.0.5:
version "1.0.5"
resolved "https://registry.yarnpkg.com/y-protocols/-/y-protocols-1.0.5.tgz#91d574250060b29fcac8f8eb5e276fbad594245e"
integrity sha512-Wil92b7cGk712lRHDqS4T90IczF6RkcvCwAD0A2OPg+adKmOe+nOiT/N2hvpQIWS3zfjmtL4CPaH5sIW1Hkm/A==
dependencies:
lib0 "^0.2.42"
yjs@^13.5.18:
version "13.5.18"
resolved "https://registry.yarnpkg.com/yjs/-/yjs-13.5.18.tgz#4151f381b170726b69be26296fd84b2efdc82e6e"
integrity sha512-6LcTL8gRe12fy89OvXG6Xs/uhsl7iwE4Wh106H1NItkcqcUaY2waTje1NVWmOZNXHXWVAzdf/fBQqB4Phq9sGA==
dependencies:
lib0 "^0.2.42"