Removed storing of entire process statuses

Obtaining the process status has always been a bit questionable. For
one, it's not particularly useful to see that a process is running or
being garbage collected. Second, it requires a full 8 bytes of memory
per process to store.

In this commit, we drop the storing of full process statuses, and add a
boolean flag "waiting for message" that we use instead where necessary.
Currently this won't reduce the size of a process due to alignment
requirements, but in the future we may be able to work around this by
reducing the size of other fields.
parent 3caf9631
Pipeline #39431313 failed with stages
in 20 minutes and 4 seconds
......@@ -89,7 +89,6 @@ module Inkoc
Throw
SetRegister
TailCall
ProcessStatus
ProcessSuspendCurrent
IntegerGreaterOrEqual
IntegerSmallerOrEqual
......
......@@ -1418,10 +1418,6 @@ module Inkoc
typedb.integer_type.new_instance
end
def on_raw_process_status(*)
typedb.integer_type.new_instance
end
def on_raw_process_suspend_current(*)
TypeSystem::Void.new
end
......
......@@ -1177,10 +1177,6 @@ module Inkoc
raw_nullary_instruction(:ProcessCurrentPid, node, body)
end
def on_raw_process_status(node, body)
raw_unary_instruction(:ProcessStatus, node, body)
end
def on_raw_process_suspend_current(node, body)
timeout = process_node(node.arguments.fetch(0), body)
......
......@@ -372,30 +372,6 @@ def blocking!(R)(block: do -> R) -> R {
block.call
}
## Returns the status of a process as an `Integer`.
##
## The following values can be returned:
##
## * 0: The process has been scheduled.
## * 1: The process is running.
## * 2: The process has been suspended.
## * 3: The process has been suspended for garbage collection.
## * 4: The process is waiting for a message to arrive.
## * 5: The process finished execution.
##
## If a process does not exist (any more) then the status will also be `5`.
##
## # Examples
##
## Getting the status of a process:
##
## import std::process
##
## process.status(process.current) # => 1
def status(pid: ToInteger) -> Integer {
_INKOC.process_status(pid.to_integer)
}
## Suspends the current process until it is rescheduled.
##
## The argument of this method can be used to set a minimum suspension time (in
......
......@@ -141,7 +141,6 @@ mod tests {
let mut request = Request::heap(state, process.clone());
process.set_register(0, process.allocate_empty());
process.running();
request.perform();
assert!(process.get_register(0).is_marked());
......
......@@ -33,8 +33,6 @@ impl Pools {
let pool_id = process.pool_id();
if let Some(pool) = self.get(pool_id) {
process.scheduled();
let job = if let Some(thread_id) = process.thread_id() {
Job::pinned(process, thread_id)
} else {
......@@ -93,11 +91,9 @@ mod tests {
let pools = Pools::new(1, 1);
let (_machine, _block, process) = setup();
process.running();
pools.schedule(process.clone());
assert_eq!(pools.pools[0].inner.queues.len(), 1);
assert_eq!(process.available_for_execution(), true);
}
#[test]
......
......@@ -5,7 +5,7 @@ use std::hash::{Hash, Hasher};
use std::i64;
use std::mem;
use std::panic::RefUnwindSafe;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use binding::RcBinding;
......@@ -29,28 +29,6 @@ use vm::state::RcState;
pub type RcProcess = Arc<Process>;
#[derive(Debug, Clone, Copy)]
#[repr(usize)]
pub enum ProcessStatus {
/// The process has been (re-)scheduled for execution.
Scheduled,
/// The process is running.
Running,
/// The process has been suspended.
Suspended,
/// The process has been suspended for garbage collection.
SuspendForGc,
/// The process is waiting for a message to arrive.
WaitingForMessage,
/// The process has finished execution.
Finished,
}
pub struct LocalData {
/// The process-local memory allocator.
pub allocator: LocalAllocator,
......@@ -84,8 +62,8 @@ pub struct Process {
/// The process identifier of this process.
pub pid: PID,
/// The status of this process.
pub status: AtomicUsize,
/// If the process is waiting for a message.
pub waiting_for_message: AtomicBool,
}
unsafe impl Sync for LocalData {}
......@@ -110,13 +88,11 @@ impl Process {
thread_id: None,
};
let process = Process {
Arc::new(Process {
pid,
status: AtomicUsize::new(ProcessStatus::Scheduled as usize),
local_data: UnsafeCell::new(local_data),
};
Arc::new(process)
waiting_for_message: AtomicBool::new(false),
})
}
pub fn from_block(
......@@ -174,24 +150,6 @@ impl Process {
target.set_parent(boxed);
}
pub fn status_integer(&self) -> u8 {
self.status() as u8
}
pub fn status(&self) -> ProcessStatus {
let status = self.status.load(Ordering::Acquire);
match status {
0 => ProcessStatus::Scheduled,
1 => ProcessStatus::Running,
2 => ProcessStatus::Suspended,
3 => ProcessStatus::SuspendForGc,
4 => ProcessStatus::WaitingForMessage,
5 => ProcessStatus::Finished,
_ => panic!("invalid process status: {}", status),
}
}
/// Pops an execution context.
///
/// This method returns true if we're at the top of the execution context
......@@ -385,57 +343,6 @@ impl Process {
self.context().code
}
pub fn available_for_execution(&self) -> bool {
match self.status() {
ProcessStatus::Scheduled => true,
_ => false,
}
}
pub fn set_status(&self, new_status: ProcessStatus) {
self.status.store(new_status as usize, Ordering::Release);
}
pub fn running(&self) {
self.set_status(ProcessStatus::Running);
}
pub fn finished(&self) {
self.set_status(ProcessStatus::Finished);
}
pub fn scheduled(&self) {
self.set_status(ProcessStatus::Scheduled);
}
pub fn suspended(&self) {
self.set_status(ProcessStatus::Suspended);
}
pub fn suspend_for_gc(&self) {
self.set_status(ProcessStatus::SuspendForGc);
}
pub fn waiting_for_message(&self) {
self.set_status(ProcessStatus::WaitingForMessage);
}
pub fn is_waiting_for_message(&self) -> bool {
match self.status() {
ProcessStatus::WaitingForMessage => true,
_ => false,
}
}
pub fn wakeup_after_suspension_timeout(&self) {
if self.is_waiting_for_message() {
// When a timeout expires we don't want to retry the last
// instruction as otherwise we'd end up in an infinite loop if
// no message is received.
self.advance_instruction_index();
}
}
pub fn has_messages(&self) -> bool {
self.local_data().mailbox.has_messages()
}
......@@ -560,6 +467,22 @@ impl Process {
pointers
}
pub fn waiting_for_message(&self) {
self.waiting_for_message.store(true, Ordering::Relaxed);
}
pub fn is_waiting_for_message(&self) -> bool {
self.waiting_for_message.load(Ordering::Relaxed)
}
pub fn should_reschedule_for_received_message(&self) -> bool {
self.is_waiting_for_message() && self.has_messages()
}
pub fn no_longer_waiting_for_message(&self) {
self.waiting_for_message.store(false, Ordering::Relaxed);
}
}
impl PartialEq for Process {
......
......@@ -77,24 +77,16 @@ impl SuspendedProcess {
/// Returns `true` if the current entry's process should be rescheduled for
/// execution.
pub fn should_reschedule(&self) -> bool {
let waiting_for_message = self.process.is_waiting_for_message();
if waiting_for_message && self.process.has_messages() {
if self.process.should_reschedule_for_received_message() {
return true;
}
if let Some(timeout) = self.timeout {
let resume_after = self.suspended_at + timeout;
if Instant::now() >= resume_after {
self.process.wakeup_after_suspension_timeout();
true
} else {
false
}
Instant::now() >= resume_after
} else {
!waiting_for_message
!self.process.is_waiting_for_message()
}
}
}
......
......@@ -87,7 +87,6 @@ pub enum InstructionType {
Throw,
SetRegister,
TailCall,
ProcessStatus,
ProcessSuspendCurrent,
IntegerGreaterOrEqual,
IntegerSmallerOrEqual,
......
......@@ -285,8 +285,6 @@ impl Machine {
) -> Result<(), String> {
let mut reductions = self.state.config.reductions;
process.running();
let mut context;
let mut index;
let mut instruction;
......@@ -926,27 +924,30 @@ impl Machine {
if let Some(msg) = process.receive_message() {
context.set_register(reg, msg);
} else {
let time_ptr = context.get_register(instruction.arg(1));
// When resuming (except when the timeout expires) we
// want to retry this instruction so we can store the
// received message in the target register.
context.instruction_index = index - 1;
continue;
}
// If the timeout expires we won't retry this
// instruction so we need to ensure the register is
// already set.
if process.is_waiting_for_message() {
// A timeout expired, but no message was received.
context.set_register(reg, self.state.nil_object);
process.no_longer_waiting_for_message();
process::wait_for_message(
&self.state,
process,
process::optional_timeout(time_ptr)?,
);
return Ok(());
continue;
}
let time_ptr = context.get_register(instruction.arg(1));
// When resuming we want to retry this instruction so we can
// store the received message in the target register.
context.instruction_index = index - 1;
process::wait_for_message(
&self.state,
process,
process::optional_timeout(time_ptr)?,
);
return Ok(());
}
InstructionType::ProcessCurrentPid => {
let reg = instruction.arg(0);
......@@ -954,13 +955,6 @@ impl Machine {
context.set_register(reg, pid);
}
InstructionType::ProcessStatus => {
let reg = instruction.arg(0);
let pid = process.get_register(instruction.arg(1));
let res = process::status(&self.state, pid)?;
context.set_register(reg, res);
}
InstructionType::ProcessSuspendCurrent => {
let time_ptr = context.get_register(instruction.arg(0));
let timeout = process::optional_timeout(time_ptr)?;
......@@ -1728,8 +1722,6 @@ impl Machine {
};
}
process.finished();
if process.is_pinned() {
// A pinned process can only run on the corresponding worker.
// Because pinned workers won't run already unpinned processes, and
......@@ -1778,7 +1770,6 @@ impl Machine {
}
fn schedule_gc_request(&self, request: GcRequest) {
request.process.suspend_for_gc();
self.state.gc_pool.schedule(Job::normal(request));
}
......
......@@ -3,7 +3,7 @@ use block::Block;
use immix::copy_object::CopyObject;
use object_pointer::ObjectPointer;
use pool::Worker;
use process::{Process, ProcessStatus, RcProcess};
use process::{Process, RcProcess};
use stacktrace;
use vm::state::RcState;
......@@ -92,25 +92,7 @@ pub fn current_pid(state: &RcState, process: &RcProcess) -> ObjectPointer {
process.allocate_usize(process.pid, state.integer_prototype)
}
pub fn status(
state: &RcState,
pid_ptr: ObjectPointer,
) -> Result<ObjectPointer, String> {
let pid = pid_ptr.usize_value()?;
let table = state.process_table.lock();
let status = if let Some(receiver) = table.get(pid) {
receiver.status_integer()
} else {
ProcessStatus::Finished as u8
};
Ok(ObjectPointer::integer(i64::from(status)))
}
pub fn suspend(state: &RcState, process: &RcProcess, timeout: Option<f64>) {
process.suspended();
state.suspension_list.suspend(process.clone(), timeout);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment