Start of pull-stream based runtime framework

parent 54ac2d59
function app (imports) {
if (imports.thread.id() === 0) {
imports.stdio.unicodeStream.source(null, function next(end, char) {
if (end) return imports.stdio.unicodeStream.source(true)
imports.stdio.print(char)
imports.schedule([imports.stdio.unicodeStream.source, null, next])
})
}
}
function app (imports) {
if (imports.thread.id() === 0) {
imports.streams.ByteDuplexStream.sink(imports.streams.ByteDuplexStream.source)
}
}
function app (imports) {
if (imports.thread.id() === 0) {
var server = imports.streams.TCPDuplexSourceStream({port: 8080, host: 'localhost'});
imports.stdio.print('test 1')
var onConnection = function onConnection (err, connection) {
imports.stdio.print('test 2')
// Setup echo server
connection.sink(connection.source)
imports.scheduler.append([server, null, onConnection])
}
server(null, onConnection);
}
}
function app (imports) {
if (imports.thread.id() === 0) {
imports.streams.UnicodeThroughStream.encoder(imports.streams.ByteDuplexStream.sink)(
imports.streams.UnicodeThroughStream.decoder(
imports.streams.ByteDuplexStream.source
)
)
}
}
......@@ -6,22 +6,13 @@ function (imports) {
queue.push(task)
}
imports.schedule = schedule
imports.scheduler = imports.scheduler || {}
imports.scheduler.append = schedule
function read (end, cb) {
if (end) return cb(end)
var c = imports.stdio.getchar()
if (c === undefined) return schedule([read, end, cb])
if (c instanceof Error) return cb(c)
return schedule([cb, null, c])
}
var unicodeStream = imports.sandbox.compileTrustedFunction('unicode-stream.js')
imports.stdio.unicodeStream = {
source: unicodeStream(read),
sink: 'TBD'
}
imports.streams = {}
imports.streams.ByteDuplexStream = imports.sandbox.compileTrustedFunction('streams/byte-duplex-stream.js')(imports)
imports.streams.UnicodeThroughStream = imports.sandbox.compileTrustedFunction('streams/unicode-through-stream.js')(imports)
imports.streams.TCPDuplexSourceStream = imports.sandbox.compileTrustedFunction('streams/tcp-duplex-source-stream.js')(imports)
// run timeouts due
// run callbacks in queue
......
function (dukboot) {
var getchar = dukboot.stdio.getchar;
var putchar = dukboot.stdio.putchar;
var next = dukboot.scheduler.append;
// setup
function read (end, cb) {
if (end) return cb(end)
var c = getchar()
if (c === undefined) return next([read, end, cb])
if (c instanceof Error) return cb(c)
return next([cb, null, c])
}
function sink (read) {
read(null, function write (err, byte) {
if (err) return
var c = putchar(byte)
if (c instanceof Error) return read(true)
if (c === undefined) return next([read, null, write])
return next([read, null, write])
})
}
return {
source: read,
sink: sink
}
}
function (imports) {
var netLib = imports.netLib;
var print = imports.stdio.print;
var schedule = imports.scheduler.append;
// TCP connection stream
function duplexStream (clientsocket) {
return {
source: function read (end, cb) {
if (end) {
// Disconnect the client
print("-- Source received end message.");
netLib.netDisconnect(clientsocket);
return
}
var buffer = Uint8Array.allocPlain(1000);
// Check for data to receive and print if there was something
try {
var bytesRead = netLib.netRecv(clientsocket, buffer);
if (bytesRead > 0) {
var croppedBuffer = buffer.subarray(0, bytesRead);
print("-- Read " + bytesRead + " bytes.");
return schedule([cb, null, croppedBuffer])
} else {
return schedule([read, end, cb])
}
} catch (err) {
// an error means the other end hung up
print("-- Source hung up!");
netLib.netDisconnect(clientsocket);
return schedule([cb, true])
}
},
sink: function reader (read) {
read(null, function next (end, data) {
if (end) {
// Disconnect the client
print("-- Sink received end message.");
netLib.netDisconnect(clientsocket);
return
}
// Send data to the client
print(data.length)
try {
var bytesWritten = netLib.netSend(clientsocket, data);
print("-- Sent " + bytesWritten + " bytes.");
return schedule([read, null, next])
} catch (err) {
// Socket no longer there (client disconnected from us)
print("-- Sink hung up!");
return schedule([read, true])
}
})
}
}
}
return function streamOfStreams (args) {
// Start the netLib library
netLib.netInit();
var port = args.port;
var host = args.host;
var listensocket = netLib.netInitSocket();
// Set the server to listen
netLib.netListen(listensocket, port);
return function read (end, cb) {
if (end) {
print('test 4: end is true')
// TODO: We need a way to kill all current connections. :/
print("That's enough for now! Bye!");
netLib.netDisconnect(listensocket);
// Stop the netLib library
netLib.netStop();
return
}
// Fresh new sockets for everyone! (Well, every client anyway).
var clientsocket = netLib.netInitSocket();
if (netLib.netAccept(listensocket, clientsocket)) {
// A client has connected!
imports.stdio.print("Hello new client!");
var stream = duplexStream(clientsocket);
return cb(null, stream);
} else {
schedule([read, end, cb])
}
}
}
}
function (imports) {
var schedule = imports.scheduler.append;
function decoder (read) {
var UTF8decoder = new TextDecoder('utf-8')
function howManyBytes (firstByte) {
if (firstByte > 254) throw Error("Number too large to be a byte: " + firstByte)
if (firstByte >> 7 === 0) return 1
if (firstByte >> 5 === 0b110) return 2
if (firstByte >> 4 === 0b1110) return 3
if (firstByte >> 3 === 0b11110) return 4
}
return function unicodeThrough (end, cb) {
if (end) return cb ? cb(end) : end
var length = null;
var chars = [];
read(null, function next(end, byte) {
if (end) return cb(end)
length = length || howManyBytes(byte)
chars.push(byte)
var char = UTF8decoder.decode(new Uint8Array(chars))
if (chars.length === length) return cb(null, char)
schedule([read, null, next])
})
}
}
var UTF8encoder = new TextEncoder('utf-8')
function encoder (text, cb) {
var bytes = UTF8encoder.encode(text);
var i = 0;
cb(bytes[i], function next(err, cb) {
if (err) return cb(err)
i++
if (i == bytes.byteLength) return cb(null)
schedule([write, bytes[i], next])
})
}
return {
decoder: decoder,
encoder: encoder
}
}
function unicodeStream (read) {
var UTF8decoder = new TextDecoder('utf-8')
function howManyBytes (firstByte) {
if (firstByte > 254) throw Error("Number too large to be a byte: " + firstByte)
if (firstByte >> 7 === 0) return 1
if (firstByte >> 5 === 0b110) return 2
if (firstByte >> 4 === 0b1110) return 3
if (firstByte >> 3 === 0b11110) return 4
}
return function unicodeThrough (end, cb) {
if (end) return cb ? cb(end) : end
var length = null;
var chars = [];
read(null, function next(end, byte) {
if (end) return cb(end)
length = length || howManyBytes(byte)
chars.push(byte)
var char = UTF8decoder.decode(new Uint8Array(chars))
if (chars.length === length) return cb(null, char)
read(null, next)
})
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment