Use bit flags for tracking a process' status

Using the bits in a single byte allows us to track different flags of a
process (e.g. the blocking or main flag) in a single byte. This also
allows us to track the termination status of a process, without needing
extra memory.

The use of this new status field also allows us to stop sending messages
to a process that has terminated.

This fixes #186
parent 9be61a04
Pipeline #99322602 passed with stages
in 21 minutes and 35 seconds
......@@ -40,6 +40,7 @@ pub mod object_value;
pub mod platform;
pub mod prefetch;
pub mod process;
pub mod process_status;
pub mod register;
pub mod runtime_error;
pub mod runtime_panic;
......
......@@ -12,6 +12,7 @@ use crate::immix::local_allocator::LocalAllocator;
use crate::mailbox::Mailbox;
use crate::object_pointer::{ObjectPointer, ObjectPointerPointer};
use crate::object_value;
use crate::process_status::ProcessStatus;
use crate::scheduler::timeouts::Timeout;
use crate::tagged_pointer::{self, TaggedPointer};
use crate::vm::state::State;
......@@ -77,16 +78,11 @@ pub struct LocalData {
/// The current execution context of this process.
pub context: Box<ExecutionContext>,
/// A boolean indicating if this process is performing a blocking operation.
pub blocking: bool,
/// A boolean indicating if this process is the main process or not.
///
/// When the main process terminates, so does the entire program.
pub main: bool,
/// The ID of the thread this process is pinned to.
pub thread_id: Option<u8>,
/// The status of the process.
status: ProcessStatus,
}
pub struct Process {
......@@ -131,10 +127,9 @@ impl Process {
allocator: LocalAllocator::new(global_allocator.clone(), config),
context: Box::new(context),
panic_handler: ObjectPointer::null(),
blocking: false,
main: false,
thread_id: None,
mailbox: Mutex::new(Mailbox::new()),
status: ProcessStatus::new(),
};
ArcWithoutWeak::new(Process {
......@@ -155,19 +150,27 @@ impl Process {
}
pub fn set_main(&self) {
self.local_data_mut().main = true;
self.local_data_mut().status.set_main();
}
pub fn is_main(&self) -> bool {
self.local_data().main
self.local_data().status.is_main()
}
pub fn set_blocking(&self, value: bool) {
self.local_data_mut().blocking = value;
pub fn set_blocking(&self, enable: bool) {
self.local_data_mut().status.set_blocking(enable);
}
pub fn is_blocking(&self) -> bool {
self.local_data().blocking
self.local_data().status.is_blocking()
}
pub fn set_terminated(&self) {
self.local_data_mut().status.set_terminated();
}
pub fn is_terminated(&self) -> bool {
self.local_data().status.is_terminated()
}
pub fn thread_id(&self) -> Option<u8> {
......@@ -393,6 +396,13 @@ impl Process {
// garbage collected at this time.
let mut mailbox = local_data.mailbox.lock();
// When a process terminates it will acquire the mailbox lock first.
// Checking the status after acquiring the lock allows us to obtain a
// stable view of the status.
if self.is_terminated() {
return;
}
mailbox.send(local_data.allocator.copy_object(message_to_copy));
}
......@@ -480,9 +490,20 @@ impl Process {
blocks
}
pub fn reclaim_and_finalize(&self, state: &State) {
pub fn terminate(&self, state: &State) {
// The mailbox lock _must_ be acquired first, otherwise we may end up
// reclaiming blocks while another process is allocating message into
// them.
let _mailbox = self.local_data_mut().mailbox.lock();
let mut blocks = self.reclaim_all_blocks();
// Once terminated we don't want to receive any messages any more, as
// they will never be received and thus lead to an increase in memory.
// Thus, we mark the process as terminated. We must do this _after_
// acquiring the lock to ensure other processes sending messages will
// observe the right value.
self.set_terminated();
for block in blocks.iter_mut() {
block.reset();
block.finalize();
......@@ -646,6 +667,19 @@ mod tests {
assert!(received.raw.raw != input_message.raw.raw);
}
#[test]
fn test_send_message_from_external_process_with_closed_mailbox() {
let (_machine, _block, process) = setup();
let message = process
.allocate(object_value::integer(14), process.allocate_empty());
process.set_terminated();
process.send_message_from_external_process(message);
assert!(process.receive_message().is_none());
}
#[test]
fn test_allocate_f64_as_i64_with_a_small_float() {
let (machine, _block, process) = setup();
......
//! Status types for processes.
use std::sync::atomic::{AtomicU8, Ordering};
/// The status of a process, represented as a set of bits.
///
/// We use an atomic U8 since an external process may read this value while we
/// are changing it (e.g. when a process sends a message while the receiver
/// enters the blocking status).
///
/// While concurrent reads are allowed, only the owning process should change
/// the status.
pub struct ProcessStatus {
/// The bits used to indicate the status of the process.
///
/// Multiple bits may be set in order to combine different statuses. For
/// example, if the main process is blocking it will set both bits.
bits: AtomicU8,
}
impl ProcessStatus {
/// A regular process.
const NORMAL: u8 = 0b0;
/// The main process.
const MAIN: u8 = 0b1;
/// The process is performing a blocking operation.
const BLOCKING: u8 = 0b10;
/// The process is terminated.
const TERMINATED: u8 = 0b100;
pub fn new() -> Self {
Self {
bits: AtomicU8::new(Self::NORMAL),
}
}
pub fn set_main(&mut self) {
self.update_bits(Self::MAIN, true);
}
pub fn is_main(&self) -> bool {
self.bit_is_set(Self::MAIN)
}
pub fn set_blocking(&mut self, enable: bool) {
self.update_bits(Self::BLOCKING, enable);
}
pub fn is_blocking(&self) -> bool {
self.bit_is_set(Self::BLOCKING)
}
pub fn set_terminated(&mut self) {
self.update_bits(Self::TERMINATED, true);
}
pub fn is_terminated(&self) -> bool {
self.bit_is_set(Self::TERMINATED)
}
fn update_bits(&mut self, mask: u8, enable: bool) {
let bits = self.bits.load(Ordering::Acquire);
let new_bits = if enable { bits | mask } else { bits & !mask };
self.bits.store(new_bits, Ordering::Release);
}
fn bit_is_set(&self, bit: u8) -> bool {
self.bits.load(Ordering::Acquire) & bit == bit
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_status() {
let status = ProcessStatus::new();
assert_eq!(status.is_main(), false);
assert_eq!(status.is_blocking(), false);
assert_eq!(status.is_terminated(), false);
}
#[test]
fn test_set_main() {
let mut status = ProcessStatus::new();
assert_eq!(status.is_main(), false);
status.set_main();
assert!(status.is_main());
}
#[test]
fn test_set_blocking() {
let mut status = ProcessStatus::new();
assert_eq!(status.is_blocking(), false);
status.set_blocking(true);
assert!(status.is_blocking());
status.set_blocking(false);
assert_eq!(status.is_blocking(), false);
}
#[test]
fn test_set_terminated() {
let mut status = ProcessStatus::new();
assert_eq!(status.is_terminated(), false);
status.set_terminated();
assert!(status.is_terminated());
}
}
......@@ -2054,7 +2054,7 @@ impl Machine {
worker.leave_exclusive_mode();
}
process.reclaim_and_finalize(&self.state);
process.terminate(&self.state);
// Terminate once the main process has finished execution.
if process.is_main() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment