Verified Commit 3e5882be authored by Yorick Peterse's avatar Yorick Peterse 🌴

Rewrite the process scheduler from the ground up

The old scheduler was a little over two years old, and due for a
rewrite. While it worked, it was not very efficient and many features
were bolted on top; process pinning being an example.

The new scheduler relies less heavy on locking, only using mutexes
paired with condition variables to wake up sleeping threads. This will
allow it to scale much better as the number of threads goes up.

Another big benefit is clearer code. The old scheduler's code was a
mess, largely because we focused more on getting a proof of concept out
instead of building a scheduler for the next few years.

== Suspending and rescheduling processes

As part of this rewrite, the way timeouts and rescheduling of processes
is handled is also rewritten. When a process is suspended and receives a
message, the sender will try to reschedule it immediately. This makes
sending messages a little bit more expensive, but allows for much faster
rescheduling of processes. This also removes the need for a separate
thread to perform a linear scan over a list of processes to determine
which ones need to be rescheduled.

Processes that suspend themselves with a timeout are stored in a binary
heap, managed by a separate thread. Communication with this thread is
done using a channel, offloading most of the work to the separate
timeout thread. When a process with a timeout is rescheduled, its entry
in the heap is marked as invalid instead of being removed. This makes
the operation a constant time operation, at the cost of the binary heap
getting fragmented. To combat fragmentation, the timeout thread will
periodically remove invalid entries from the heap.

Rescheduling processes is done entirely using atomic operations, instead
of using mutexes. This requires some careful coding to take into account
multiple threads trying to reschedule the same process, but should allow
all of this to scale much better.

The new approach of suspending and rescheduling processes requires one
additional word of memory per process. This memory is used to mark the
process as suspended, and to optionally store a pointer to its timeout
(if one was used).

== Message counts

The number of messages in a mailbox is now stored explicitly using an
atomic integer, instead of obtaining this from the synchronised
data structures internal to a mailbox. This requires one word of extra
memory per process, but makes it much cheaper to check if a process has
messages. This is important, because when rescheduling a process such
checks are performed several times.

== Asynchronous IO and further improvements

While this commit does not add support for asynchronous IO operations,
the rewrite will make it easier to do so in future commits. The process
lookup table also remains unchanged, but we're currently investigating
if we can get rid of PIDs and the lookup table entirely; potentially
speeding up process spawning by quite a bit.
parent cdaaa5f7
Pipeline #52469220 failed with stages
in 14 minutes and 25 seconds
......@@ -111,7 +111,6 @@ require 'inkoc/tir/instruction/set_parent_local'
require 'inkoc/tir/instruction/goto_next_block_if_true'
require 'inkoc/tir/instruction/skip_next_block'
require 'inkoc/tir/instruction/local_exists'
require 'inkoc/tir/instruction/move_to_pool'
require 'inkoc/tir/instruction/return'
require 'inkoc/tir/instruction/run_block'
require 'inkoc/tir/instruction/run_block_with_receiver'
......
......@@ -102,7 +102,7 @@ module Inkoc
FloatCeil
FloatRound
Drop
MoveToPool
SetBlocking
StdoutFlush
StderrFlush
FileRemove
......
......@@ -306,12 +306,6 @@ module Inkoc
compiled_code.instruct(:Drop, [object], tir_ins.location)
end
def on_move_to_pool(tir_ins, compiled_code, *)
id = tir_ins.pool_id.id
compiled_code.instruct(:MoveToPool, [id], tir_ins.location)
end
def on_panic(tir_ins, compiled_code, *)
message = tir_ins.message.id
......
......@@ -1507,7 +1507,7 @@ module Inkoc
typedb.nil_type.new_instance
end
def on_raw_move_to_pool(*)
def on_raw_set_blocking(*)
typedb.boolean_type.new_instance
end
......
......@@ -1151,7 +1151,7 @@ module Inkoc
end
def on_raw_process_spawn(node, body)
raw_binary_instruction(:ProcessSpawn, node, body)
raw_unary_instruction(:ProcessSpawn, node, body)
end
def on_raw_process_send_message(node, body)
......@@ -1258,8 +1258,8 @@ module Inkoc
get_nil(body, node.location)
end
def on_raw_move_to_pool(node, body)
raw_unary_instruction(:MoveToPool, node, body)
def on_raw_set_blocking(node, body)
raw_unary_instruction(:SetBlocking, node, body)
end
def on_raw_panic(node, body)
......
# frozen_string_literal: true
module Inkoc
module TIR
module Instruction
class MoveToPool
include Inspect
include Predicates
attr_reader :pool_id, :location
def initialize(pool_id, location)
@pool_id = pool_id
@location = location
end
def register
pool_id
end
def visitor_method
:on_move_to_pool
end
end
end
end
end
......@@ -59,12 +59,6 @@
## reaches zero.
import std::conversion::(ToFloat, ToInteger, ToString)
## The ID of the pool to schedule regular processes on.
let PRIMARY_POOL = 0
## The ID of the pool to use for slow or (potentially) blocking operations.
let SECONDARY_POOL = 1
## The sending-half of a channel.
object Sender!(T) {
def init(pid: ToInteger) {
......@@ -189,7 +183,7 @@ def receive(timeout: ToFloat = 0.0) {
## 10 # => 10
## }
def spawn(block: lambda) -> Integer {
_INKOC.process_spawn(block, PRIMARY_POOL)
_INKOC.process_spawn(block)
}
## Spawns a process that accepts messages of a single type.
......@@ -279,11 +273,11 @@ def channel!(T)(receiver: lambda (Receiver!(T))) -> Sender!(T) {
##
## result # => 12
def blocking!(R)(block: do -> R) -> R {
let moved = _INKOC.move_to_pool(SECONDARY_POOL)
let moved = _INKOC.set_blocking(True)
defer {
# "moved" will be set to true the first time we try to move a process, and
# false if it has already been moved to the target pool. For example:
# "moved" will be set to true the first time we try to mark a process as
# blocking, and false if it has already been marked. For example:
#
# import std::process
#
......@@ -291,14 +285,14 @@ def blocking!(R)(block: do -> R) -> R {
# process.blocking { # moved = false
# process.blocking { # moved = false
# # ...
# } # still on the secondary pool
# } # still on the secondary pool
# } # still on the blocking pool
# } # still on the blocking pool
# } # now we can move back to the primary pool
#
# Using the `if_true` below, we ensure that we only move the process back
# once we return from the outer most call to this method.
moved.if_true {
_INKOC.move_to_pool(PRIMARY_POOL)
_INKOC.set_blocking(False)
}
}
......
......@@ -115,7 +115,9 @@ test.group('std::process.suspend') do (g) {
let duration = start.elapsed.to_float
assert.true(duration >= wait)
# Due to the use of floats the 0.01 might be rounded down to 0.009999 or a
# similar value. This means we can't simply assert that `duration >= 0.01`.
assert.true(duration >= 0.005)
}
}
......
......@@ -464,7 +464,9 @@ test.group('std::time::Instant.elapsed') do (g) {
process.suspend(duration.from_milliseconds(10))
assert.true(time.elapsed.as_milliseconds >= 10.0)
# Due to the use of floats, the exact time might be slightly smaller than
# 10.0 (e.g. 9.9999994).
assert.true(time.elapsed.as_milliseconds >= 9.0)
}
}
......
......@@ -102,6 +102,15 @@ dependencies = [
"vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.2.0"
......@@ -111,6 +120,15 @@ dependencies = [
"crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-epoch"
version = "0.3.1"
......@@ -119,12 +137,25 @@ dependencies = [
"arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-epoch"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.2.2"
......@@ -133,6 +164,15 @@ dependencies = [
"cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "dirs"
version = "1.0.3"
......@@ -197,6 +237,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "inko"
version = "0.3.0"
dependencies = [
"crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"dirs 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"float-cmp 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -229,7 +271,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "lazy_static"
version = "1.0.1"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
......@@ -263,7 +305,7 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
......@@ -429,7 +471,7 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -514,7 +556,7 @@ name = "thread_local"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
......@@ -610,9 +652,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum cfg-if 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "405216fd8fe65f718daa7102ea808a946b6ce40c742998fbfd3463645552de18"
"checksum clang-sys 0.21.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e414af9726e1d11660801e73ccc7fb81803fb5f49e5903a25b348b2b3b480d2e"
"checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e"
"checksum crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0f0ed1a4de2235cabda8558ff5840bffb97fcb64c97827f354a451307df5f72b"
"checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3"
"checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71"
"checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150"
"checksum crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "04c9e3102cc2d69cd681412141b390abd55a362afc1540965dad0ad4d34280b4"
"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
"checksum crossbeam-utils 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f8306fcef4a7b563b76b7dd949ca48f52bc1141aa067d2ea09565f3e2652aa5c"
"checksum dirs 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f679c09c1cf5428702cc10f6846c56e4e23420d3a88bcc9335b17c630a7b710b"
"checksum either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3be565ca5c557d7f59e7cfcf1844f9e3033650c929c6566f511e8005f205c1d0"
"checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b"
......@@ -624,7 +670,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73"
"checksum lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e6412c5e2ad9584b0b8e979393122026cdd6d2a80b933f890dcd694ddbe73739"
"checksum lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a374c89b9db55895453a74c1e38861d9deec0b01b405a82516e9d5de4820dea1"
"checksum libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)" = "b685088df2b950fccadf07a7187c8ef846a959c142338a48f9dc0b94517eb5f1"
"checksum libffi 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d8a9dac273181f514d742b6b858be5153570c5b80dd4d6020093c0fa584578b1"
"checksum libffi-sys 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4675d5d7fdbba34b66218fae3b0d528c2b29580a64ca2ccc5bbfc5af2324b373"
......
......@@ -26,3 +26,5 @@ dirs = "^1.0"
libloading = "^0.5"
libffi = "^0.6"
libffi-sys = ">=0.6.3"
crossbeam-deque = "^0.7"
crossbeam-channel = "^0.3"
......@@ -34,7 +34,6 @@ clippy:
${CARGO_CMD} clippy -- -Dwarnings \
-Arenamed_and_removed_lints \
-Aclippy::new_without_default \
-Aclippy::new_without_default_derive \
-Aclippy::needless_range_loop
rustfmt-check:
......
......@@ -3,11 +3,17 @@
//! ArcWithoutWeak is a pointer similar to Rust's Arc type, except no weak
//! references are supported. This makes ArcWithoutWeak ideal for performance
//! sensitive code where weak references are not needed.
use std::cmp;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
/// The inner value of a pointer.
///
/// This uses the C representation to ensure that the value is always the first
/// member of this structure. This in turn allows one to read the value of this
/// `Inner` using `*mut T`.
#[repr(C)]
pub struct Inner<T> {
value: T,
references: AtomicUsize,
......@@ -22,6 +28,30 @@ unsafe impl<T> Sync for ArcWithoutWeak<T> {}
unsafe impl<T> Send for ArcWithoutWeak<T> {}
impl<T> ArcWithoutWeak<T> {
/// Consumes the `ArcWithoutWeak`, returning the wrapped pointer.
///
/// The returned pointer is in reality a pointer to the inner structure,
/// instead of a pointer directly to the value.
#[cfg_attr(feature = "cargo-clippy", allow(wrong_self_convention))]
pub fn into_raw(value: Self) -> *mut T {
let raw = value.inner;
mem::forget(value);
raw as *mut T
}
/// Constructs an `ArcWithoutWeak` from a raw pointer.
///
/// This method is incredibly unsafe, as it makes no attempt to verify if
/// the pointer actually a pointer previously created using
/// `ArcWithoutWeak::into_raw()`.
pub unsafe fn from_raw(value: *mut T) -> Self {
ArcWithoutWeak {
inner: value as *mut Inner<T>,
}
}
pub fn new(value: T) -> Self {
let inner = Inner {
value,
......@@ -40,13 +70,17 @@ impl<T> ArcWithoutWeak<T> {
pub fn references(&self) -> usize {
self.inner().references.load(Ordering::SeqCst)
}
pub fn as_ptr(&self) -> *mut T {
self.inner as *mut T
}
}
impl<T> Deref for ArcWithoutWeak<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &(*self.inner).value }
&self.inner().value
}
}
......@@ -67,7 +101,7 @@ impl<T> Clone for ArcWithoutWeak<T> {
impl<T> Drop for ArcWithoutWeak<T> {
fn drop(&mut self) {
unsafe {
if self.inner().references.fetch_sub(1, Ordering::Release) == 1 {
if self.inner().references.fetch_sub(1, Ordering::AcqRel) == 1 {
let boxed = Box::from_raw(self.inner as *mut Inner<T>);
drop(boxed);
......@@ -76,6 +110,26 @@ impl<T> Drop for ArcWithoutWeak<T> {
}
}
impl<T: PartialOrd> PartialOrd for ArcWithoutWeak<T> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
(**self).partial_cmp(&**other)
}
}
impl<T: Ord> Ord for ArcWithoutWeak<T> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
(**self).cmp(&**other)
}
}
impl<T: PartialEq> PartialEq for ArcWithoutWeak<T> {
fn eq(&self, other: &Self) -> bool {
(**self) == (**other)
}
}
impl<T: Eq> Eq for ArcWithoutWeak<T> {}
#[cfg(test)]
mod tests {
use super::*;
......@@ -105,4 +159,33 @@ mod tests {
assert_eq!(pointer.references(), 1);
}
#[test]
fn test_cmp() {
let foo = ArcWithoutWeak::new(10);
let bar = ArcWithoutWeak::new(20);
assert_eq!(foo.cmp(&bar), cmp::Ordering::Less);
assert_eq!(foo.cmp(&foo), cmp::Ordering::Equal);
assert_eq!(bar.cmp(&foo), cmp::Ordering::Greater);
}
#[test]
fn test_partial_cmp() {
let foo = ArcWithoutWeak::new(10);
let bar = ArcWithoutWeak::new(20);
assert_eq!(foo.partial_cmp(&bar), Some(cmp::Ordering::Less));
assert_eq!(foo.partial_cmp(&foo), Some(cmp::Ordering::Equal));
assert_eq!(bar.partial_cmp(&foo), Some(cmp::Ordering::Greater));
}
#[test]
fn test_eq() {
let foo = ArcWithoutWeak::new(10);
let bar = ArcWithoutWeak::new(20);
assert!(foo == foo);
assert!(foo != bar);
}
}
......@@ -27,7 +27,6 @@ const DEFAULT_MATURE_THRESHOLD: u32 = (16 * 1024 * 1024) / (BLOCK_SIZE as u32);
const DEFAULT_MAILBOX_THRESHOLD: u32 = 1;
const DEFAULT_GROWTH_FACTOR: f64 = 1.5;
const DEFAULT_GROWTH_THRESHOLD: f64 = 0.9;
const DEFAULT_SUSPENSION_CHECK_INTERVAL: f64 = 0.1;
const DEFAULT_REDUCTIONS: usize = 1000;
/// Structure containing the configuration settings for the virtual machine.
......@@ -36,29 +35,25 @@ pub struct Config {
pub directories: Vec<PathBuf>,
/// The number of primary process threads to run.
pub primary_threads: u8,
pub primary_threads: usize,
/// The number of secondary process threads to run.
pub secondary_threads: u8,
/// The number of blocking process threads to run.
pub blocking_threads: usize,
/// The number of garbage collector threads to run. Defaults to 2 threads.
pub gc_threads: u8,
/// The number of garbage collector threads to run.
pub gc_threads: usize,
/// The number of finalizer threads to run. Defaults to 2 threads.
pub finalizer_threads: u8,
/// The number of finalizer threads to run.
pub finalizer_threads: usize,
/// The number of threads to use for various generic parallel tasks such as
/// scanning stack frames during garbage collection. Defaults to the number
/// of physical CPU cores.
pub generic_parallel_threads: u8,
/// scanning stack frames during garbage collection.
pub generic_parallel_threads: usize,
/// The number of reductions a process can perform before being suspended.
/// Defaults to 1000.
pub reductions: usize,
/// The number of seconds to wait between checking for suspended processes.
pub suspension_check_interval: f64,
/// The number of memory blocks that can be allocated before triggering a
/// young collection.
pub young_threshold: u32,
......@@ -88,17 +83,16 @@ pub struct Config {
impl Config {
pub fn new() -> Config {
let cpu_count = num_cpus::get() as u8;
let cpu_count = num_cpus::get();
Config {
directories: Vec::new(),
primary_threads: cpu_count,
gc_threads: cpu_count,
finalizer_threads: cpu_count,
secondary_threads: cpu_count,
blocking_threads: cpu_count,
generic_parallel_threads: cpu_count,
reductions: DEFAULT_REDUCTIONS,
suspension_check_interval: DEFAULT_SUSPENSION_CHECK_INTERVAL,
young_threshold: DEFAULT_YOUNG_THRESHOLD,
mature_threshold: DEFAULT_MATURE_THRESHOLD,
heap_growth_factor: DEFAULT_GROWTH_FACTOR,
......@@ -112,19 +106,13 @@ impl Config {
/// Populates configuration settings based on environment variables.
#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
pub fn populate_from_env(&mut self) {
set_from_env!(self, primary_threads, "CONCURRENCY", u8);
set_from_env!(self, secondary_threads, "CONCURRENCY", u8);
set_from_env!(self, gc_threads, "CONCURRENCY", u8);
set_from_env!(self, finalizer_threads, "CONCURRENCY", u8);
set_from_env!(self, generic_parallel_threads, "CONCURRENCY", u8);
set_from_env!(self, primary_threads, "CONCURRENCY", usize);
set_from_env!(self, blocking_threads, "CONCURRENCY", usize);
set_from_env!(self, gc_threads, "CONCURRENCY", usize);
set_from_env!(self, finalizer_threads, "CONCURRENCY", usize);
set_from_env!(self, generic_parallel_threads, "CONCURRENCY", usize);
set_from_env!(self, reductions, "REDUCTIONS", usize);
set_from_env!(
self,
suspension_check_interval,
"SUSPENSION_CHECK_INTERVAL",
f64
);
set_from_env!(self, young_threshold, "YOUNG_THRESHOLD", u32);
set_from_env!(self, mature_threshold, "MATURE_THRESHOLD", u32);
......@@ -157,42 +145,6 @@ impl Config {
pub fn add_directory(&mut self, path: String) {
self.directories.push(PathBuf::from(path));
}
pub fn set_primary_threads(&mut self, threads: u8) {
if threads == 0 {
self.primary_threads = 1;
} else {
self.primary_threads = threads;
}
}
pub fn set_secondary_threads(&mut self, threads: u8) {
if threads == 0 {
self.secondary_threads = 1;
} else {
self.secondary_threads = threads;
}
}
pub fn set_gc_threads(&mut self, threads: u8) {
if threads == 0 {
self.gc_threads = 1;
} else {
self.gc_threads = threads;
}
}
pub fn set_reductions(&mut self, reductions: usize) {
if reductions > 0 {
self.reductions = reductions;
}
}
pub fn set_suspension_check_interval(&mut self, interval: f64) {
if interval > 0.0 {
self.suspension_check_interval = interval;
}
}
}
#[cfg(test)]
......@@ -235,40 +187,4 @@ mod tests {
assert_eq!(config.directories.len(), 1);
}
#[test]
fn test_set_primary_threads() {
let mut config = Config::new();
config.set_primary_threads(5);
assert_eq!(config.primary_threads, 5);
}
#[test]
fn test_set_gc_threads() {
let mut config = Config::new();
config.set_gc_threads(5);
assert_eq!(config.gc_threads, 5);
}
#[test]
fn test_set_reductions() {
let mut config = Config::new();
config.set_reductions(5);
assert_eq!(config.reductions, 5);
}
#[test]
fn test_set_secondary_threads() {
let mut config = Config::new();
config.set_secondary_threads(2);
assert_eq!(config.secondary_threads, 2);
}
}
......@@ -36,9 +36,10 @@ impl<T> DerefPointer<T> {
/// Atomically swaps the internal pointer with another one.
///
/// This boolean returns true if the pointer was swapped, false otherwise.
pub fn compare_and_swap(&mut self, current: *mut T, other: *mut T) -> bool {
#[cfg_attr(feature = "cargo-clippy", allow(trivially_copy_pass_by_ref))]
pub fn compare_and_swap(&self, current: *mut T, other: *mut T) -> bool {
self.as_atomic()
.compare_and_swap(current, other, Ordering::Release)
.compare_and_swap(current, other, Ordering::AcqRel)
== current
}
......@@ -136,7 +137,7 @@ mod tests {
let mut alice = "Alice".to_string();
let mut bob = "Bob".to_string();
let mut pointer = DerefPointer::new(&mut alice);
let pointer = DerefPointer::new(&mut alice);
let current = pointer.pointer;
let target = &mut bob as *mut String;
......
......@@ -28,7 +28,7 @@ pub fn collect(vm_state: &RcState, process: &RcProcess, profile: &mut Profile) {
profile.reclaim.stop();
vm_state.process_pools.schedule(process.clone());
vm_state.scheduler.schedule(process.clone());
profile.suspended.stop();
......
......@@ -31,7 +31,7 @@ pub fn collect(vm_state: &RcState, process: &RcProcess, profile: &mut Profile) {
profile.reclaim.stop();
profile.suspended.stop();
vm_state.process_pools.schedule(process.clone());
vm_state.scheduler.schedule(process.clone());
profile.total.stop();
profile.populate_tracing_statistics(&trace_result);
......
......@@ -2,11 +2,6 @@
//!
//! A Bucket contains a sequence of Immix blocks that all contain objects of the
//! same age.
use parking_lot::Mutex;
use pool::Job;
use rayon::prelude::*;
use std::cell::UnsafeCell;
use deref_pointer::DerefPointer;
use immix::block::Block;
use immix::block_list::BlockList;
......@@ -14,6 +9,10 @@ use immix::global_allocator::RcGlobalAllocator;
use immix::histograms::Histograms;
use object::Object;
use object_pointer::ObjectPointer;
use parking_lot::Mutex;
use rayon::prelude::*;
use scheduler::pool::Pool;
use std::cell::UnsafeCell;
use vm::state::RcState;
macro_rules! lock_bucket {
......@@ -237,7 +236,7 @@ impl Bucket {
pub fn reclaim_blocks(&mut self, state: &RcState, histograms: &Histograms) {
let mut reclaim = BlockList::new();
let finalize = self
let to_finalize = self
.blocks
.pointers()
.into_par_iter()
......@@ -261,14 +260,14 @@ impl Bucket {
}
if finalize {
Some(Job::normal(block))
Some(block)
} else {
None
}
})
.collect();
state.finalizer_pool.schedule_multiple(finalize);
state.finalizer_pool.schedule(to_finalize);
// We partition the blocks in sequence so we don't need to synchronise
// access to the destination lists.
......
#[global_allocator]
static A: std::alloc::System = std::alloc::System;