Fix ArrayBuffer support and add pull-stream-server demo.

parent 8b50a237
function (args) {
var env = args.environ;
var argv = args.argv;
var modules = args.modules;
var Queues = args.Queues;
var timers = Queues.timers;
var compile = modules.compile;
if (args.OpenMP.id !== 0) return
var queue = []
function schedule (task) {
queue.push(task)
}
function tick () {
if (queue.length === 0) return false;
// Dequeue
var task = queue.shift();
// Run it
task[0].apply(null, task.slice(1))
return true
}
// modules.webserver();
var print = modules.print
var netLib = modules.netLib
var port = 8080;
var buffsize = 2000;
var buffer = Uint8Array.allocPlain(1000);
var tint = 0;
print("\nListening on port " + port);
// Start the netLib library
netLib.netInit();
// Initiate the sockets
function duplexStream (clientsocket) {
var source = function read (end, cb) {
if (end) {
// Disconnect the client
print("-- Source received end message.");
netLib.netDisconnect(clientsocket);
return
}
var buffer = Uint8Array.allocPlain(1000);
// Check for data to receive and print if there was something
try {
var bytesRead = netLib.netRecv(clientsocket, buffer);
if (bytesRead > 0) {
var croppedBuffer = buffer.subarray(0, bytesRead);
print("-- Read " + bytesRead + " bytes.");
return schedule([cb, null, croppedBuffer])
} else {
return schedule([read, end, cb])
}
} catch (err) {
// an error means the other end hung up
print("-- Source hung up!");
netLib.netDisconnect(clientsocket);
return schedule([cb, true])
}
}
var sink = function reader (read) {
read(null, function next (end, data) {
if (end) {
// Disconnect the client
print("-- Sink received end message.");
netLib.netDisconnect(clientsocket);
return
}
// Send data to the client
print(data.length)
try {
var bytesWritten = netLib.netSend(clientsocket, data);
print("-- Sent " + bytesWritten + " bytes.");
return schedule([read, null, next])
} catch (err) {
// Socket no longer there (client disconnected from us)
print("-- Sink hung up!");
return schedule([read, true])
}
})
}
return {source: source, sink: sink}
}
var clientsocket = netLib.netInitSocket();
var listensocket = netLib.netInitSocket();
// Set the server to listen
netLib.netListen(listensocket, port);
print("\n");
var connections = 0
var reads = 0
// Main program loop
while (true) {
// Check for an incoming connection
if (netLib.netAccept(listensocket, clientsocket)) {
// A client has connected!
print("Hello new client!");
var stream = duplexStream(clientsocket);
// basic echo server
// stream.source(null, function yayIncoming (end, data) {
// print(new TextDecoder().decode(data));
// reads++;
// stream.sink(reads > 4, data)
// schedule([stream.source, null, yayIncoming])
// })
stream.sink(function (abort, cb) {
stream.source(abort, function (end, data) {
if (end) return cb(end, data);
var text = new TextDecoder().decode(data);
print(text);
// Upon receiving "fin" close the connection
if (text === 'fin') {
stream.source(true)
return cb(true)
}
return cb(end, data);
})
})
while (tick());
// Each thread serves 2 requests before getting tired.
if (++connections == 2) break;
}
}
print("That's enough for now! Bye!");
netLib.netDisconnect(listensocket);
// Stop the netLib library
netLib.netStop();
return
}
......@@ -27,9 +27,12 @@ void app_push_netLib(duk_context *ctx) {
duk_put_prop_string(ctx, -2, "netSend");
duk_push_c_function(ctx, wrap_netRecv, 2);
duk_put_prop_string(ctx, -2, "netRecv");
duk_push_c_function(ctx, wrap_netSocketStruct, 1);
duk_put_prop_string(ctx, -2, "netSocketStruct");
}
duk_ret_t wrap_netSocketStruct(duk_context *ctx, netSocket *socket) {
duk_ret_t wrap_netSocketStruct(duk_context *ctx) {
netSocket *socket = duk_get_pointer(ctx, 0);
duk_push_object(ctx);
duk_push_int(ctx, socket->number); // the socket descriptor from the socket() call
duk_put_prop_string(ctx, -2, "number");
......@@ -127,9 +130,8 @@ duk_ret_t wrap_netIsDataPending(duk_context *ctx) {
duk_ret_t wrap_netSend(duk_context *ctx) {
netSocket *netSock = duk_get_pointer(ctx, 0);
duk_size_t bytes = 0;
const char *data = duk_get_buffer(ctx, 1, &bytes);
const char *data = duk_get_buffer_data(ctx, 1, &bytes);
int ret = netSend(netSock, data, (int) bytes);
printf("bytes = %d, ret=%d", bytes, ret);
if (ret == -1) {
return duk_generic_error(ctx, "netSend error");
}
......@@ -141,14 +143,13 @@ duk_ret_t wrap_netSend(duk_context *ctx) {
duk_ret_t wrap_netRecv(duk_context *ctx) {
netSocket *netSock = duk_get_pointer(ctx, 0);
duk_size_t bytes = 0;
char *data = duk_get_buffer(ctx, 1, &bytes);
char *data = duk_get_buffer_data(ctx, 1, &bytes);
int ret = netRecv(netSock, data, (int) bytes);
if (ret == -1) {
return duk_generic_error(ctx, "netRecv error");
}
duk_pop_2(ctx);
duk_push_int(ctx, ret);
printf("bytes = %d, ret=%d", bytes, ret);
return 1;
}
......
......@@ -117,7 +117,22 @@ int main(int argc, char *argv[] /* char *environ[] */) {
// Run top-level module, passing the prime arg as the sole argument.
ret = duk_pcall(ctx, 1);
if (ret != 0) {
fprintf(stderr, "%s %s\n", argv[1], duk_safe_to_string(ctx, -1));
if (duk_is_error(ctx, -1)) {
duk_dup(ctx, -1);
fprintf(stderr, "\033[31m%s\033[39;49m\n",duk_safe_to_string(ctx, -1));
duk_pop(ctx);
if (duk_get_prop_string(ctx, -1, "fileName")) {
fprintf(stderr, "%s", duk_safe_to_string(ctx, -1));
}
duk_pop(ctx);
if (duk_get_prop_string(ctx, -1, "lineNumber")) {
fprintf(stderr, ":%s\n", duk_safe_to_string(ctx, -1));
}
duk_pop(ctx);
} else {
duk_push_context_dump(ctx);
printf("%s\n", duk_to_string(ctx, -1));
}
goto finally;
}
// else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment