ecl_wakeup_waiters now take an argument with the number of processes to awake

parent 1141033a
......@@ -84,8 +84,8 @@ mp_barrier_arrivers_count(cl_object barrier)
}
@(defun mp::barrier-unblock (barrier &key reset_count disable kill_waiting)
int ping_flags = ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG;
int kill_flags = ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL;
int ping_flags = ECL_WAKEUP_RESET_FLAG;
int kill_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL;
@
unlikely_if (type_of(barrier) != t_barrier) {
FEerror_not_a_barrier(barrier);
......@@ -96,7 +96,9 @@ mp_barrier_arrivers_count(cl_object barrier)
barrier->barrier.arrivers_count = -1;
else
barrier->barrier.arrivers_count = barrier->barrier.count;
ecl_wakeup_waiters(the_env, barrier, Null(kill_waiting)? ping_flags : kill_flags);
ecl_wakeup_waiters(the_env, barrier,
Null(kill_waiting)? ping_flags : kill_flags,
ECL_WAKEUP_ALL);
@(return)
@)
......
......@@ -87,15 +87,15 @@ mp_condition_variable_timedwait(cl_object cv, cl_object lock, cl_object seconds)
cl_object
mp_condition_variable_signal(cl_object cv)
{
ecl_wakeup_waiters(ecl_process_env(), cv,
ECL_WAKEUP_ONE | ECL_WAKEUP_RESET_FLAG);
ecl_wakeup_waiters(ecl_process_env(), cv, ECL_WAKEUP_RESET_FLAG,
ECL_WAKEUP_ONE);
@(return Ct)
}
cl_object
mp_condition_variable_broadcast(cl_object cv)
{
ecl_wakeup_waiters(ecl_process_env(), cv,
ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG);
ecl_wakeup_waiters(ecl_process_env(), cv, ECL_WAKEUP_RESET_FLAG,
ECL_WAKEUP_ALL);
@(return Ct)
}
......@@ -113,7 +113,7 @@ mp_giveup_lock(cl_object lock)
if (--lock->lock.counter == 0) {
lock->lock.owner = Cnil;
print_lock("releasing %p\t", lock, lock);
ecl_wakeup_waiters(env, lock, ECL_WAKEUP_ONE);
ecl_wakeup_waiters(env, lock, 0, ECL_WAKEUP_ONE);
} else {
print_lock("released %p\t", lock, lock);
}
......
......@@ -278,7 +278,7 @@ ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_ob
* first processes are always allowed to check for the
* condition */
if (Null(output) && (firstone == record)) {
ecl_wakeup_waiters(the_env, o, 0);
ecl_wakeup_waiters(the_env, o, 0, ECL_WAKEUP_ONE);
}
/* 7) Restoring signals is done last, to ensure that
......@@ -292,7 +292,7 @@ ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_ob
}
void
ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags)
ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags, cl_fixnum n)
{
ecl_disable_interrupts_env(the_env);
ecl_get_spinlock(the_env, &q->queue.spinlock);
......@@ -303,7 +303,7 @@ ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags)
* because of the UNWIND-PROTECT in ecl_wait_on(), but
* sometimes shit happens */
cl_object *tail, l;
for (tail = &q->queue.list; (l = *tail) != Cnil; ) {
for (tail = &q->queue.list; n && ((l = *tail) != Cnil); ) {
cl_object p = ECL_CONS_CAR(l);
if (p->process.phase == ECL_PROCESS_INACTIVE ||
p->process.phase == ECL_PROCESS_EXITING) {
......@@ -319,9 +319,8 @@ ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags)
mp_process_kill(p);
else
ecl_interrupt_process(p, Cnil);
if ((flags & ECL_WAKEUP_ALL) == 0)
break;
tail = &ECL_CONS_CDR(l);
--n;
}
}
}
......
......@@ -83,9 +83,9 @@ mp_semaphore_wait_count(cl_object semaphore)
unlikely_if (type_of(semaphore) != t_semaphore) {
FEerror_not_a_semaphore(semaphore);
}
AO_fetch_and_add((AO_t*)&semaphore->semaphore.counter, 1);
while (n-- && semaphore->semaphore.queue_list != Cnil) {
ecl_wakeup_waiters(env, semaphore, ECL_WAKEUP_ONE);
AO_fetch_and_add((AO_t*)&semaphore->semaphore.counter, n);
if (semaphore->semaphore.queue_list != Cnil) {
ecl_wakeup_waiters(env, semaphore, 0, n);
}
@(return)
}
......
......@@ -480,7 +480,7 @@ extern void print_lock(char *s, cl_object lock, ...);
extern void ecl_get_spinlock(cl_env_ptr env, cl_object *lock);
extern void ecl_giveup_spinlock(cl_object *lock);
extern cl_object ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_object o);
extern void ecl_wakeup_waiters(cl_env_ptr the_env, cl_object o, bool all);
extern void ecl_wakeup_waiters(cl_env_ptr the_env, cl_object o, int flags, cl_fixnum n);
#endif
/* threads/rwlock.d */
......
......@@ -899,10 +899,10 @@ struct ecl_process {
int trap_fpe_bits;
};
#define ECL_WAKEUP_ONE 0
#define ECL_WAKEUP_ALL 1
#define ECL_WAKEUP_RESET_FLAG 2
#define ECL_WAKEUP_KILL 4
#define ECL_WAKEUP_ONE 1
#define ECL_WAKEUP_ALL MOST_POSITIVE_FIXNUM
#define ECL_WAKEUP_RESET_FLAG 1
#define ECL_WAKEUP_KILL 2
struct ecl_queue {
HEADER;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment