Implemented a first prototype of mailbox

parent c123b662
......@@ -248,7 +248,7 @@ THREAD_CFLAGS=''
THREAD_LIBS=''
THREAD_GC_FLAGS='--enable-threads=posix'
INSTALL_TARGET='install'
THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier"
THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox"
clibs=''
SONAME=''
SONAME_LDFLAGS=''
......
......@@ -479,6 +479,12 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl
MAYBE_MARK(o->barrier.queue_spinlock);
MAYBE_MARK(o->barrier.name);
break;
case t_mailbox:
MAYBE_MARK(o->mailbox.data);
MAYBE_MARK(o->mailbox.name);
MAYBE_MARK(o->mailbox.reader_semaphore);
MAYBE_MARK(o->mailbox.writer_semaphore);
break;
# endif
case t_codeblock:
MAYBE_MARK(o->cblock.source);
......@@ -586,6 +592,7 @@ ecl_alloc_object(cl_type t)
case t_condition_variable:
case t_semaphore:
case t_barrier:
case t_mailbox:
#endif
case t_foreign:
case t_codeblock: {
......@@ -861,6 +868,7 @@ init_alloc(void)
sizeof(struct ecl_condition_variable), 0);
init_tm(t_semaphore, "SEMAPHORES", sizeof(struct ecl_semaphore), 0);
init_tm(t_barrier, "BARRIER", sizeof(struct ecl_barrier), 0);
init_tm(t_mailbox, "MAILBOX", sizeof(struct ecl_mailbox), 0);
#endif
init_tm(t_codeblock, "CODEBLOCK", sizeof(struct ecl_codeblock), -1);
init_tm(t_foreign, "FOREIGN", sizeof(struct ecl_foreign), 2);
......@@ -1017,6 +1025,11 @@ init_alloc(void)
to_bitmap(&o, &(o.barrier.name)) |
to_bitmap(&o, &(o.barrier.queue_list)) |
to_bitmap(&o, &(o.barrier.queue_spinlock));
type_info[t_mailbox].descriptor =
to_bitmap(&o, &(o.mailbox.name)) |
to_bitmap(&o, &(o.mailbox.data)) |
to_bitmap(&o, &(o.mailbox.reader_semaphore)) |
to_bitmap(&o, &(o.mailbox.writer_semaphore));
# endif
type_info[t_codeblock].descriptor =
to_bitmap(&o, &(o.cblock.data)) |
......
......@@ -387,13 +387,24 @@ write_condition_variable(cl_object x, cl_object stream)
_ecl_write_unreadable(x, "semaphore", Cnil, stream);
}
# ifdef ECL_SEMAPHORES
static void
write_semaphore(cl_object x, cl_object stream)
{
_ecl_write_unreadable(x, "semaphore", Cnil, stream);
}
# endif
static void
write_barrier(cl_object x, cl_object stream)
{
_ecl_write_unreadable(x, "barrier", Cnil, stream);
}
static void
write_mailbox(cl_object x, cl_object stream)
{
_ecl_write_unreadable(x, "mailbox", Cnil, stream);
}
#endif /* ECL_THREADS */
static void
......@@ -446,9 +457,9 @@ static printer dispatch[FREE+1] = {
write_lock, /* t_lock */
write_lock, /* t_rwlock */
write_condition_variable, /* t_condition_variable */
# ifdef ECL_SEMAPHORES
write_semaphore, /* t_semaphore */
# endif
write_barrier, /* t_barrier */
write_mailbox, /* t_mailbox */
#endif
write_codeblock, /* t_codeblock */
write_foreign, /* t_foreign */
......
......@@ -1623,6 +1623,14 @@ cl_symbols[] = {
{KEY_ "RESET-COUNT", KEYWORD, NULL, -1, OBJNULL},
{KEY_ "KILL-WAITING", KEYWORD, NULL, -1, OBJNULL},
{KEY_ "UNBLOCKED", KEYWORD, NULL, -1, OBJNULL},
{MP_ "MAILBOX", MP_ORDINARY, NULL, -1, OBJNULL},
{MP_ "MAKE-MAILBOX", MP_ORDINARY, IF_MP(mp_make_mailbox), -1, OBJNULL},
{MP_ "MAILBOX-NAME", MP_ORDINARY, IF_MP(mp_mailbox_name), 1, OBJNULL},
{MP_ "MAILBOX-COUNT", MP_ORDINARY, IF_MP(mp_mailbox_count), 1, OBJNULL},
{MP_ "MAILBOX-EMPTY-P", MP_ORDINARY, IF_MP(mp_mailbox_empty_p), 1, OBJNULL},
{MP_ "MAILBOX-READ", MP_ORDINARY, IF_MP(mp_mailbox_read), 1, OBJNULL},
{MP_ "MAILBOX-SEND", MP_ORDINARY, IF_MP(mp_mailbox_send), 2, OBJNULL},
/* #endif defined(ECL_THREADS) */
{SYS_ "WHILE", SI_ORDINARY, NULL, -1, OBJNULL},
......
......@@ -1623,6 +1623,14 @@ cl_symbols[] = {
{KEY_ "RESET-COUNT",NULL},
{KEY_ "KILL-WAITING",NULL},
{KEY_ "UNBLOCKED",NULL},
{MP_ "MAILBOX",NULL},
{MP_ "MAKE-MAILBOX",IF_MP("mp_make_mailbox")},
{MP_ "MAILBOX-NAME",IF_MP("mp_mailbox_name")},
{MP_ "MAILBOX-COUNT",IF_MP("mp_mailbox_count")},
{MP_ "MAILBOX-EMPTY-P",IF_MP("mp_mailbox_empty_p")},
{MP_ "MAILBOX-READ",IF_MP("mp_mailbox_read")},
{MP_ "MAILBOX-SEND",IF_MP("mp_mailbox_send")},
/* #endif defined(ECL_THREADS) */
{SYS_ "WHILE",NULL},
......
/* -*- mode: c; c-basic-offset: 8 -*- */
/*
mailbox.d -- thread communication queue
*/
/*
Copyright (c) 2012, Juan Jose Garcia Ripoll.
ECL is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
See file '../Copyright' for full details.
*/
#define AO_ASSUME_WINDOWS98 /* We need this for CAS */
#include <ecl/ecl.h>
#include <ecl/internal.h>
#if !defined(AO_HAVE_fetch_and_add1)
#error "Cannot implement mailboxs without AO_fetch_and_add1_full"
#endif
static ECL_INLINE void
FEerror_not_a_mailbox(cl_object mailbox)
{
FEwrong_type_argument(@'mp::mailbox', mailbox);
}
cl_object
ecl_make_mailbox(cl_object name, cl_fixnum count)
{
cl_object output = ecl_alloc_object(t_mailbox);
cl_fixnum mask;
for (mask = 1; mask < count; mask <<= 1) {}
if (mask == 1)
mask = 63;
count = mask + 1;
output->mailbox.name = name;
output->mailbox.data = si_make_vector(Ct, /* element type */
MAKE_FIXNUM(count), /* size */
Cnil, /* adjustable */
Cnil, /* fill pointer */
Cnil, /* displaced to */
Cnil); /* displacement */
output->mailbox.reader_semaphore =
ecl_make_semaphore(name, 0);
output->mailbox.writer_semaphore =
ecl_make_semaphore(name, count);
output->mailbox.read_pointer = 0;
output->mailbox.write_pointer = 1;
output->mailbox.mask = mask;
return output;
}
@(defun mp::make-mailbox (&key name (count MAKE_FIXNUM(128)))
@
{
@(return ecl_make_mailbox(name, fixnnint(count)))
}
@)
cl_object
mp_mailbox_name(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (type_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
}
ecl_return1(env, mailbox->mailbox.name);
}
cl_object
mp_mailbox_count(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (type_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
}
ecl_return1(env, MAKE_FIXNUM(mailbox->mailbox.data->vector.dim));
}
cl_object
mp_mailbox_empty_p(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (type_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
}
ecl_return1(env, mailbox->mailbox.reader_semaphore->semaphore.counter? Ct : Cnil);
}
cl_object
mp_mailbox_read(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
cl_fixnum ndx;
cl_object output;
unlikely_if (type_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
}
mp_wait_on_semaphore(mailbox->mailbox.reader_semaphore);
{
ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) &
mailbox->mailbox.mask;
output = mailbox->mailbox.data->vector.self.t[ndx];
}
mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore);
ecl_return1(env, output);
}
cl_object
mp_mailbox_send(cl_object mailbox, cl_object msg)
{
cl_env_ptr env = ecl_process_env();
cl_fixnum ndx;
unlikely_if (type_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
}
mp_wait_on_semaphore(mailbox->mailbox.writer_semaphore);
{
ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) &
mailbox->mailbox.mask;
mailbox->mailbox.data->vector.self.t[ndx] = msg;
}
mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore);
ecl_return0(env);
}
......@@ -167,6 +167,8 @@ ecl_type_to_symbol(cl_type t)
return @'mp::semaphore';
case t_barrier:
return @'mp::barrier';
case t_mailbox:
return @'mp::mailbox';
#endif
case t_codeblock:
return @'si::code-block';
......
......@@ -90,6 +90,7 @@
#+threads (mp::condition-variable)
#+threads (mp::semaphore)
#+threads (mp::barrier)
#+threads (mp::mailbox)
#+sse2 (ext::sse-pack))))
(loop for (name . rest) in '#.+builtin-classes-list+
......
......@@ -4524,7 +4524,7 @@ THREAD_CFLAGS=''
THREAD_LIBS=''
THREAD_GC_FLAGS='--enable-threads=posix'
INSTALL_TARGET='install'
THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier"
THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier threads/mailbox"
clibs=''
SONAME=''
SONAME_LDFLAGS=''
......
......@@ -1745,6 +1745,15 @@ extern ECL_API cl_object mp_barrier_arrivers_count(cl_object);
extern ECL_API cl_object mp_barrier_wait _ARGS((cl_narg, cl_object, ...));
extern ECL_API cl_object mp_barrier_unblock _ARGS((cl_narg, cl_object, ...));
/* threads/mailbox.d */
extern ECL_API cl_object mp_make_mailbox _ARGS((cl_narg, ...));
extern ECL_API cl_object mp_mailbox_name(cl_object mailbox);
extern ECL_API cl_object mp_mailbox_count(cl_object mailbox);
extern ECL_API cl_object mp_mailbox_empty_p(cl_object);
extern ECL_API cl_object mp_mailbox_read(cl_object mailbox);
extern ECL_API cl_object mp_mailbox_send(cl_object mailbox, cl_object msg);
/* threads/atomic.c */
extern ECL_API cl_object ecl_atomic_get(cl_object *slot);
......
......@@ -83,6 +83,7 @@ typedef enum {
t_condition_variable,
t_semaphore,
t_barrier,
t_mailbox,
#endif
t_codeblock,
t_foreign,
......@@ -935,6 +936,17 @@ struct ecl_lock {
cl_fixnum counter;
};
struct ecl_mailbox {
HEADER;
cl_object name;
cl_object data;
cl_object reader_semaphore;
cl_object writer_semaphore;
cl_index read_pointer;
cl_index write_pointer;
cl_index mask;
};
struct ecl_rwlock {
HEADER;
cl_object name;
......@@ -1059,6 +1071,7 @@ union cl_lispunion {
struct ecl_condition_variable condition_variable; /* condition-variable */
struct ecl_semaphore semaphore; /* semaphore */
struct ecl_barrier barrier; /* barrier */
struct ecl_mailbox mailbox; /* mailbox */
#endif
struct ecl_codeblock cblock; /* codeblock */
struct ecl_foreign foreign; /* user defined data type */
......
......@@ -1233,6 +1233,7 @@ if not possible."
#+threads (MP::CONDITION-VARIABLE)
#+threads (MP::SEMAPHORE)
#+threads (MP::BARRIER)
#+threads (MP::MAILBOX)
#+ffi (FOREIGN-DATA)
#+sse2 (EXT:SSE-PACK (OR EXT:INT-SSE-PACK
EXT:FLOAT-SSE-PACK
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment