Holy guacamole Batman. Uh, see examples/streams/net2.js and runtime2. It uses...

Holy guacamole Batman. Uh, see examples/streams/net2.js and runtime2. It uses dukroll. Needs cleanup.
parent 0a1c5272
import dukboot from 'dukboot'
import runtime from './runtime2/dukboot.js'
const run = runtime(dukboot)
run(
function app (imports) {
if (imports.thread.id() === 0) {
imports.streams.ByteDuplexStream.sink(imports.streams.ByteDuplexStream.source)
}
}
)
import { loop } from './runtime2/scheduler.js'
import * as streams from './runtime2/streams/index.js'
if (dukboot.thread.id() === 0) {
streams.ByteDuplexStream.sink(streams.ByteDuplexStream.source)
}
loop()
import { thread, log } from 'dukboot'
import { append, loop } from './runtime2/scheduler.js'
import { TCPDuplexSourceStream } from './runtime2/streams'
if (thread.id() === 0) {
var server = TCPDuplexSourceStream({port: 8080, host: 'localhost'});
log.log('test 1')
var onConnection = function onConnection (err, connection) {
log.log('test 2')
// Setup echo server
connection.sink(connection.source)
append([server, null, onConnection])
}
server(null, onConnection);
}
loop()
import ByteDuplexStream from './streams/byte-duplex-stream.js'
import UnicodeThroughStream from './streams/unicode-through-stream.js'
import TCPDuplexStream from './streams/tcp-duplex-stream.js'
import TCPDuplexSourceStream from './streams/tcp-duplex-source-stream.js'
export default function (imports) {
function (imports) {
// setup
var queue = []
......@@ -15,10 +10,10 @@ export default function (imports) {
imports.scheduler.append = schedule
imports.streams = {}
imports.streams.ByteDuplexStream = ByteDuplexStream(imports)
imports.streams.UnicodeThroughStream = UnicodeThroughStream(imports)
imports.streams.TCPDuplexStream = TCPDuplexStream(imports)
imports.streams.TCPDuplexSourceStream = TCPDuplexSourceStream(imports)
imports.streams.ByteDuplexStream = imports.sandbox.compileTrustedFunction('streams/byte-duplex-stream.js')(imports)
imports.streams.UnicodeThroughStream = imports.sandbox.compileTrustedFunction('streams/unicode-through-stream.js')(imports)
imports.streams.TCPDuplexStream = imports.sandbox.compileTrustedFunction('streams/tcp-duplex-stream.js')(imports)
imports.streams.TCPDuplexSourceStream = imports.sandbox.compileTrustedFunction('streams/tcp-duplex-source-stream.js')(imports)
// run timeouts due
// run callbacks in queue
......
function (dukboot) {
var getUint8 = dukboot.stdio.getUint8;
var putUint8 = dukboot.stdio.putUint8;
var next = dukboot.scheduler.append;
// setup
function read (end, cb) {
if (end) return cb(end)
var c = getUint8()
if (c === undefined) return next([read, end, cb])
if (c instanceof Error) return cb(c)
return next([cb, null, c])
}
function sink (read) {
read(null, function write (err, byte) {
if (err) return
var c = putUint8(byte)
if (c instanceof Error) return read(true)
if (c === undefined) return next([read, null, write])
return next([read, null, write])
})
}
return {
source: read,
sink: sink
}
}
function (imports) {
var netLib = imports.netLib;
var print = imports.log.log;
var schedule = imports.scheduler.append;
var TCPDuplexStream = imports.streams.TCPDuplexStream;
return function streamOfStreams (args) {
// Start the netLib library
netLib.netInit();
var port = args.port;
var host = args.host;
var listensocket = netLib.netInitSocket();
// Set the server to listen
netLib.netListen(listensocket, port);
return function read (end, cb) {
if (end) {
// TODO: We need a way to kill all current connections. :/
print("That's enough for now! Bye!");
netLib.netDisconnect(listensocket);
// Stop the netLib library
netLib.netStop();
return
}
// Fresh new sockets for everyone! (Well, every client anyway).
var clientsocket = netLib.netInitSocket();
if (netLib.netAccept(listensocket, clientsocket)) {
// A client has connected!
print("Hello new client!");
var stream = TCPDuplexStream(clientsocket);
return cb(null, stream);
} else {
schedule([read, end, cb])
}
}
}
}
function (imports) {
var netLib = imports.netLib;
var print = imports.log.log;
var schedule = imports.scheduler.append;
// TCP connection stream
return function duplexStream (clientsocket) {
return {
source: function read (end, cb) {
if (end) {
// Disconnect the client
print("-- Source received end message.");
netLib.netDisconnect(clientsocket);
return
}
var buffer = Uint8Array.allocPlain(1000);
// Check for data to receive and print if there was something
try {
var bytesRead = netLib.netRecv(clientsocket, buffer);
if (bytesRead > 0) {
var croppedBuffer = buffer.subarray(0, bytesRead);
print("-- Read " + bytesRead + " bytes.");
return schedule([cb, null, croppedBuffer])
} else {
return schedule([read, end, cb])
}
} catch (err) {
// an error means the other end hung up
print("-- Source hung up!");
netLib.netDisconnect(clientsocket);
return schedule([cb, true])
}
},
sink: function reader (read) {
read(null, function next (end, data) {
if (end) {
// Disconnect the client
print("-- Sink received end message.");
netLib.netDisconnect(clientsocket);
return
}
// Send data to the client
try {
var bytesWritten = netLib.netSend(clientsocket, data);
print("-- Sent " + bytesWritten + " bytes.");
return schedule([read, null, next])
} catch (err) {
// Socket no longer there (client disconnected from us)
print("-- Sink hung up!");
return schedule([read, true])
}
})
}
}
}
}
function (imports) {
var schedule = imports.scheduler.append;
function decoder (read) {
var UTF8decoder = new TextDecoder('utf-8')
function howManyBytes (firstByte) {
if (firstByte > 254) throw Error("Number too large to be a byte: " + firstByte)
if (firstByte >> 7 === 0) return 1
if (firstByte >> 5 === 0b110) return 2
if (firstByte >> 4 === 0b1110) return 3
if (firstByte >> 3 === 0b11110) return 4
}
return function unicodeThrough (end, cb) {
if (end) return cb ? cb(end) : end
var length = null;
var chars = [];
read(null, function next(end, byte) {
if (end) return cb(end)
length = length || howManyBytes(byte)
chars.push(byte)
var char = UTF8decoder.decode(new Uint8Array(chars))
if (chars.length === length) return cb(null, char)
schedule([read, null, next])
})
}
}
var UTF8encoder = new TextEncoder('utf-8')
function encoder (text, cb) {
var bytes = UTF8encoder.encode(text);
var i = 0;
cb(bytes[i], function next(err, cb) {
if (err) return cb(err)
i++
if (i == bytes.byteLength) return cb(null)
schedule([write, bytes[i], next])
})
}
return {
decoder: decoder,
encoder: encoder
}
}
import { log } from 'dukboot'
var queue = []
export function append (task) {
queue.push(task)
}
function tick () {
if (queue.length === 0) throw Error("Queue complete.");
// Dequeue
var task = queue.shift();
// Run it
task[0].apply(null, task.slice(1))
return true
}
// event loop
export function loop () {
try {
while (tick()) {}
} catch (e) {
if (e.message !== "Queue complete.") log.log(e);
}
}
export default function (dukboot) {
var getUint8 = dukboot.stdio.getUint8;
var putUint8 = dukboot.stdio.putUint8;
var next = dukboot.scheduler.append;
// setup
function read (end, cb) {
if (end) return cb(end)
var c = getUint8()
if (c === undefined) return next([read, end, cb])
if (c instanceof Error) return cb(c)
return next([cb, null, c])
}
import { stdio } from 'dukboot'
import { append as next } from '../scheduler.js'
var getUint8 = stdio.getUint8;
var putUint8 = stdio.putUint8;
// setup
function read (end, cb) {
if (end) return cb(end)
var c = getUint8()
if (c === undefined) return next([read, end, cb])
if (c instanceof Error) return cb(c)
return next([cb, null, c])
}
function sink (read) {
read(null, function write (err, byte) {
if (err) return
var c = putUint8(byte)
if (c instanceof Error) return read(true)
if (c === undefined) return next([read, null, write])
return next([read, null, write])
})
}
function sink (read) {
read(null, function write (err, byte) {
if (err) return
var c = putUint8(byte)
if (c instanceof Error) return read(true)
if (c === undefined) return next([read, null, write])
return next([read, null, write])
})
}
return {
source: read,
sink: sink
}
export default {
source: read,
sink: sink
}
export { default as ByteDuplexStream } from './byte-duplex-stream.js'
export { default as UnicodeThroughStream } from './unicode-through-stream.js'
export { default as TCPDuplexStream } from './tcp-duplex-stream.js'
export { default as TCPDuplexSourceStream } from './tcp-duplex-source-stream.js'
export default function (imports) {
var netLib = imports.netLib;
var print = imports.log.log;
var schedule = imports.scheduler.append;
var TCPDuplexStream = imports.streams.TCPDuplexStream;
return function streamOfStreams (args) {
// Start the netLib library
netLib.netInit();
var port = args.port;
var host = args.host;
var listensocket = netLib.netInitSocket();
// Set the server to listen
netLib.netListen(listensocket, port);
return function read (end, cb) {
if (end) {
// TODO: We need a way to kill all current connections. :/
print("That's enough for now! Bye!");
netLib.netDisconnect(listensocket);
// Stop the netLib library
netLib.netStop();
return
}
// Fresh new sockets for everyone! (Well, every client anyway).
var clientsocket = netLib.netInitSocket();
if (netLib.netAccept(listensocket, clientsocket)) {
// A client has connected!
print("Hello new client!");
var stream = TCPDuplexStream(clientsocket);
return cb(null, stream);
} else {
schedule([read, end, cb])
}
import { netLib, log } from 'dukboot'
import { append as schedule } from '../scheduler.js'
import TCPDuplexStream from './tcp-duplex-stream.js'
export default function streamOfStreams (args) {
// Start the netLib library
netLib.netInit();
var port = args.port;
var host = args.host;
var listensocket = netLib.netInitSocket();
// Set the server to listen
netLib.netListen(listensocket, port);
return function read (end, cb) {
if (end) {
// TODO: We need a way to kill all current connections. :/
log.log("That's enough for now! Bye!");
netLib.netDisconnect(listensocket);
// Stop the netLib library
netLib.netStop();
return
}
// Fresh new sockets for everyone! (Well, every client anyway).
var clientsocket = netLib.netInitSocket();
if (netLib.netAccept(listensocket, clientsocket)) {
// A client has connected!
log.log("Hello new client!");
var stream = TCPDuplexStream(clientsocket);
return cb(null, stream);
} else {
schedule([read, end, cb])
}
}
}
export default function (imports) {
var netLib = imports.netLib;
var print = imports.log.log;
var schedule = imports.scheduler.append;
import { netLib, log } from 'dukboot'
import { append as schedule } from '../scheduler.js'
// TCP connection stream
return function duplexStream (clientsocket) {
return {
source: function read (end, cb) {
// TCP connection stream
export default function duplexStream (clientsocket) {
return {
source: function read (end, cb) {
if (end) {
// Disconnect the client
log.log("-- Source received end message.");
netLib.netDisconnect(clientsocket);
return
}
var buffer = Uint8Array.allocPlain(1000);
// Check for data to receive and print if there was something
try {
var bytesRead = netLib.netRecv(clientsocket, buffer);
if (bytesRead > 0) {
var croppedBuffer = buffer.subarray(0, bytesRead);
log.log("-- Read " + bytesRead + " bytes.");
return schedule([cb, null, croppedBuffer])
} else {
return schedule([read, end, cb])
}
} catch (err) {
// an error means the other end hung up
log.log("-- Source hung up!");
netLib.netDisconnect(clientsocket);
return schedule([cb, true])
}
},
sink: function reader (read) {
read(null, function next (end, data) {
if (end) {
// Disconnect the client
print("-- Source received end message.");
log.log("-- Sink received end message.");
netLib.netDisconnect(clientsocket);
return
}
var buffer = Uint8Array.allocPlain(1000);
// Check for data to receive and print if there was something
// Send data to the client
try {
var bytesRead = netLib.netRecv(clientsocket, buffer);
if (bytesRead > 0) {
var croppedBuffer = buffer.subarray(0, bytesRead);
print("-- Read " + bytesRead + " bytes.");
return schedule([cb, null, croppedBuffer])
} else {
return schedule([read, end, cb])
}
var bytesWritten = netLib.netSend(clientsocket, data);
log.log("-- Sent " + bytesWritten + " bytes.");
return schedule([read, null, next])
} catch (err) {
// an error means the other end hung up
print("-- Source hung up!");
netLib.netDisconnect(clientsocket);
return schedule([cb, true])
// Socket no longer there (client disconnected from us)
log.log("-- Sink hung up!");
return schedule([read, true])
}
},
sink: function reader (read) {
read(null, function next (end, data) {
if (end) {
// Disconnect the client
print("-- Sink received end message.");
netLib.netDisconnect(clientsocket);
return
}
// Send data to the client
try {
var bytesWritten = netLib.netSend(clientsocket, data);
print("-- Sent " + bytesWritten + " bytes.");
return schedule([read, null, next])
} catch (err) {
// Socket no longer there (client disconnected from us)
print("-- Sink hung up!");
return schedule([read, true])
}
})
}
})
}
}
}
export default function (imports) {
var schedule = imports.scheduler.append;
function decoder (read) {
var UTF8decoder = new TextDecoder('utf-8')
function howManyBytes (firstByte) {
if (firstByte > 254) throw Error("Number too large to be a byte: " + firstByte)
if (firstByte >> 7 === 0) return 1
if (firstByte >> 5 === 0b110) return 2
if (firstByte >> 4 === 0b1110) return 3
if (firstByte >> 3 === 0b11110) return 4
}
return function unicodeThrough (end, cb) {
if (end) return cb ? cb(end) : end
var length = null;
var chars = [];
read(null, function next(end, byte) {
if (end) return cb(end)
length = length || howManyBytes(byte)
chars.push(byte)
var char = UTF8decoder.decode(new Uint8Array(chars))
if (chars.length === length) return cb(null, char)
schedule([read, null, next])
})
}
import { append as schedule } from '../scheduler.js'
var UTF8decoder = new TextDecoder('utf-8')
var UTF8encoder = new TextEncoder('utf-8')
function decoder (read) {
function howManyBytes (firstByte) {
if (firstByte > 254) throw Error("Number too large to be a byte: " + firstByte)
if (firstByte >> 7 === 0) return 1
if (firstByte >> 5 === 0b110) return 2
if (firstByte >> 4 === 0b1110) return 3
if (firstByte >> 3 === 0b11110) return 4
}
var UTF8encoder = new TextEncoder('utf-8')
function encoder (text, cb) {
var bytes = UTF8encoder.encode(text);
var i = 0;
cb(bytes[i], function next(err, cb) {
if (err) return cb(err)
i++
if (i == bytes.byteLength) return cb(null)
schedule([write, bytes[i], next])
return function unicodeThrough (end, cb) {
if (end) return cb ? cb(end) : end
var length = null;
var chars = [];
read(null, function next(end, byte) {
if (end) return cb(end)
length = length || howManyBytes(byte)
chars.push(byte)
var char = UTF8decoder.decode(new Uint8Array(chars))
if (chars.length === length) return cb(null, char)
schedule([read, null, next])
})
}
return {
decoder: decoder,
encoder: encoder
}
}
function encoder (text, cb) {
var bytes = UTF8encoder.encode(text);
var i = 0;
cb(bytes[i], function next(err, cb) {
if (err) return cb(err)
i++
if (i == bytes.byteLength) return cb(null)
schedule([write, bytes[i], next])
})
}
export default {
encoder: encoder,
decoder: decoder
}
import { thread } from 'dukboot'
import { UnicodeThroughStream, ByteDuplexStream } from './runtime2/streams/index.js'
import { loop } from './runtime2/scheduler.js'
if (thread.id() === 0) {
UnicodeThroughStream.encoder(ByteDuplexStream.sink)(
UnicodeThroughStream.decoder(
ByteDuplexStream.source
)
)
}
loop()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment