Add zero-copy message passing between threads.

parent d3ffbf43
function (dukboot) {
var print = dukboot.stdio.print;
var n = dukboot.thread.pool().length;
var id = dukboot.thread.id();
var buffer = dukboot.message.alloc(5);
for (var i = 0; i < buffer.length; i++) {
buffer[i] = id;
}
var s = (id + 1) % n
print("id " + id + ": wrote " + buffer[0] + ', size=' + buffer.length);
dukboot.message.send(s, buffer);
buffer = undefined;
while(buffer == undefined)
buffer = dukboot.message.read();
print("id " + id + ": read " + buffer[0] + ', size=' + buffer.length);
}
#include <omp.h>
#include "duktape/duktape.h"
#include "bindings_message.h"
void push_bindings_message (duk_context *ctx) {
duk_push_c_function(ctx, c_alloc_shared_buffer, 1);
duk_put_prop_string(ctx, -2, "alloc");
duk_push_c_function(ctx, c_send_shared_buffer, 2);
duk_put_prop_string(ctx, -2, "send");
duk_push_c_function(ctx, c_read_my_messages, 0);
duk_put_prop_string(ctx, -2, "read");
}
int initExternalBuffers (size_t len) {
if (len == 0) return 1;
// Allocate an array of buffers
Messages.items = (messageBuffer*) malloc (sizeof(messageBuffer) * len);
if (Messages.items == NULL) return 1;
Messages.last = 0;
Messages.size = len;
return 0;
}
duk_ret_t c_alloc_shared_buffer (duk_context *ctx) {
int len = duk_require_int(ctx, -1);
messageBuffer msg;
// Allocate a buffer the requested size
// p = duk_push_buffer(ctx, 1024, 0);
msg.owner = omp_get_thread_num();
msg.buffer = (char*) malloc (sizeof(char) * len);
if (msg.buffer == NULL) {
return duk_generic_error(ctx, "Memory allocation failure");
}
msg.last = len;
msg.size = len;
// Add buffer to managed array.
int err = 0;
int index = 0;
#pragma omp critical (ADDTO_MESSAGES)
{
// printf("(%d / %d) ", (int) Messages.last, (int) Messages.size);
if (Messages.last == Messages.size - 1) {
err = 1;
} else {
index = Messages.last;
Messages.items[Messages.last++] = msg;
}
}
if (err) return duk_generic_error(ctx, "Max number of external buffers reached");
duk_push_external_buffer(ctx);
duk_config_buffer(ctx, -1, msg.buffer, len);
// Lame hack. Put the buffer location in a hidden symbol.
duk_push_buffer_object(ctx, -1, 0, len, DUK_BUFOBJ_UINT8ARRAY);
duk_push_pointer(ctx, Messages.items + index);
duk_put_prop_string(ctx, -2, "\xFF" "messageBuffer");
return 1;
}
duk_ret_t c_send_shared_buffer (duk_context *ctx) {
int newOwner = duk_require_int(ctx, -2);
// duk_size_t size;
// int ptr = (int) duk_get_buffer_data(ctx, -1, &size);
duk_get_prop_string(ctx, -1, "\xFF" "messageBuffer");
messageBuffer* msg = duk_require_pointer(ctx, -1);
msg->owner = newOwner;
return 0;
}
duk_ret_t c_read_my_messages (duk_context *ctx) {
int id = omp_get_thread_num();
for (size_t i = 0; i < Messages.last; i++) {
if (Messages.items[i].owner == id) {
duk_push_external_buffer(ctx);
duk_config_buffer(ctx, -1, Messages.items[i].buffer, Messages.items[i].last);
duk_push_buffer_object(ctx, -1, 0, Messages.items[i].last, DUK_BUFOBJ_UINT8ARRAY);
duk_push_pointer(ctx, Messages.items + i);
duk_put_prop_string(ctx, -2, "\xFF" "messageBuffer");
return 1;
}
}
return 0;
}
......@@ -17,6 +17,7 @@ extern char **environ;
#include "main.h"
mainArgs_t mainArgs;
messageBufferA Messages;
int main(int argc, char *argv[] /* char *environ[] */) {
mainArgs.argc = argc;
......@@ -53,9 +54,12 @@ int main(int argc, char *argv[] /* char *environ[] */) {
exit(0);
}
// Allocate 10 inter-thread communications buffers.
initExternalBuffers(8+7+6+5+4);
// Fork into native (gcc) threads.
duk_int_t result;
#pragma omp parallel
#pragma omp parallel shared(Messages)
{
// Create a JS environment
duk_context *ctx = duk_create_heap_default();
......@@ -97,6 +101,10 @@ int main(int argc, char *argv[] /* char *environ[] */) {
duk_push_object(ctx);
push_bindings_sandbox(ctx);
duk_put_prop_string(ctx, -2, "sandbox");
// Provide access to shared memory messages
duk_push_object(ctx);
push_bindings_message(ctx);
duk_put_prop_string(ctx, -2, "message");
// Provide access to native functions
app_unstash_global_string(ctx, "Duktape"); // summon from the magic stash
duk_put_prop_string(ctx, -2, "Duktape");
......
typedef struct messageBuffer {
int owner;
size_t last;
size_t size;
char* buffer;
} messageBuffer;
typedef struct messageBufferA {
size_t last;
size_t size;
messageBuffer* items;
} messageBufferA;
#define messageBufferFromRaw(ptr) ((messageBuffer*) ((int)ptr - offsetof(messageBuffer, buffer)))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment