RealtimeEnvironment with Externally initiated events
Created by: aronstef
Hi SimPy maintainers,
I have been using SimPy for quite some time and decided to share some modifications I made to the real-time environment so that it can operate with externally triggered events. (Thanks for an awesome framework :))
My use case is the one described in the Real-time simulations chapter of the SimPy docs as “if you have hardware-in-the-loop”: I am simulating node behavior and communicating with a hardware device over a TCP socket. This works as long as SimPy only outputs data to the hardware, but several challenges appear once the hardware sends data into the SimPy environment. One of them is mentioned in another issue.
To illustrate the problems with the RealtimeEnvironment, here is an example with a production machine producing units. A second production machine is started from an external thread, and the first machine is then broken halfway through the simulation.
import simpy
import threading
import time
SIM_TIME = 16 # Seconds
MACHINE_1_ID = 1
MACHINE_1_PRODUCTION_DELAY = 4 # Seconds
MACHINE_2_ID = 2
MACHINE_2_PRODUCTION_DELAY = 4 # Seconds
def production_machine(env, machine_id, production_delay):
    # Produce units every "production_delay" until the machine breaks.
    count = 0
    while True:
        try:
            yield env.timeout(production_delay)
            count += 1
            print("Machine {}: Produced unit {} at time {}".format(machine_id, count, env.now))
        except simpy.Interrupt:
            print("Machine {}: Broken after producing {} units at time {}".format(machine_id, count, env.now))
            break

def delayed_machine(env):
    # Start a machine after a delay.
    time.sleep(int(MACHINE_1_PRODUCTION_DELAY//2))
    print("Starting machine {} at time {}.".format(MACHINE_2_ID, env.now))
    env.process(production_machine(env, MACHINE_2_ID, MACHINE_2_PRODUCTION_DELAY))

def break_machine(env, machine):
    # Break a machine after a delay.
    time.sleep(int(SIM_TIME//2)+1)
    machine.interrupt("break_machine")

env = simpy.RealtimeEnvironment()
# Start the production machine
machine = env.process(production_machine(env, MACHINE_1_ID, MACHINE_1_PRODUCTION_DELAY))
# Start another machine after a 2 sec delay (MACHINE_1_PRODUCTION_DELAY // 2), triggered externally.
delayed_machine_thread = threading.Thread(target=delayed_machine, args=(env, ))
delayed_machine_thread.start()
# Create handler for breaking or repairing the machine (triggered externally).
break_thread = threading.Thread(target=break_machine, args=(env, machine))
break_thread.start()
env.run(until=SIM_TIME)
This will currently output:
Starting machine 2 at time 0.
Machine 1: Produced unit 1 at time 4
Machine 2: Produced unit 1 at time 4
Machine 1: Produced unit 2 at time 8
Machine 2: Produced unit 2 at time 8
Machine 1: Broken after producing 2 units at time 8
Traceback (most recent call last):
  File "production_machine_user_input.py", line 47, in <module>
    env.run(until=SIM_TIME)
  File "C:\Users\aro\OneDrive - Kamstrup A S\Desktop\simpy\src\simpy\core.py", line 138, in run
    self.step()
  File "C:\Users\aro\OneDrive - Kamstrup A S\Desktop\simpy\src\simpy\rt.py", line 78, in step
    time() - real_time))
RuntimeError: Simulation too slow for real time (4.016s).
Problem 1: Sleep until next local event needs to be processed.
The RealtimeEnvironment step() function sleeps until the next event in the event queue is due. When a new event is inserted from outside the event loop (for example, from another thread) during that sleep, it is not processed until the event that step() was already waiting for becomes due. This can cause the error Simulation too slow for real time, though not when machine 2 is started in this example, because the production delay is the same for both machines.
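The effect can be reproduced without SimPy. The sketch below is a stand-in, not the RealtimeEnvironment code: the main thread plays the role of step() sleeping until its next event, and a helper thread (external_source, a hypothetical name) delivers a request partway through that sleep. The durations are arbitrary small values chosen so the example finishes quickly.

```python
import threading
import time

SLEEP_UNTIL_NEXT_EVENT = 0.3   # stands in for the wait inside step()
EXTERNAL_ARRIVAL_DELAY = 0.05  # the external request arrives during that wait

arrivals = []  # wall-clock timestamps recorded by the external thread


def external_source():
    # Hypothetical external producer, e.g. a socket reader thread.
    time.sleep(EXTERNAL_ARRIVAL_DELAY)
    arrivals.append(time.monotonic())


thread = threading.Thread(target=external_source)
thread.start()

# A plain sleep cannot be cut short by the arrival.
time.sleep(SLEEP_UNTIL_NEXT_EVENT)
noticed_at = time.monotonic()
thread.join()

latency = noticed_at - arrivals[0]
print("request waited {:.3f}s before the loop could react".format(latency))
```

The request sits unnoticed for roughly the remainder of the sleep, which is the same thing that happens to an event inserted while step() is waiting.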
Problem 2: env.now returns execution time of last event.
env.now is updated each time an event is processed, so between events it does not reflect the actual wall-clock time. This is the desired behavior when the real-time simulation is running behind, so that events that should have been scheduled in the past don't get scheduled in the present. Events triggered externally, however, should get the wall-clock time and not the time of the last processed event.
Problem 3: Interrupt is falsely scheduled in the past.
The interrupt is scheduled using schedule() with delay set to zero, so it is stamped with the time of the last processed event, yet it is only processed when the next event in the queue becomes due. As a combination of problems 1 and 2, events are now processed at the wrong time and an exception is raised.
def schedule(self, event, priority=NORMAL, delay=0):
    """Schedule an *event* with a given *priority* and a *delay*."""
    heappush(self._queue,
             (self._now + delay, priority, next(self._eid), event))
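To make the interaction concrete, here is a toy model of the queue. It uses plain heapq and is not SimPy itself; ToyEnv and its methods are illustrative names, and priorities are left out. It follows the same rule as schedule(): an entry is stamped with _now + delay, and _now only moves when an event is popped.

```python
import heapq
from itertools import count


class ToyEnv:
    """Toy stand-in for the event queue; not SimPy's implementation."""

    def __init__(self):
        self._now = 0
        self._queue = []
        self._eid = count()

    def schedule(self, name, delay=0):
        # Same rule as schedule(): stamp the entry with _now + delay.
        heapq.heappush(self._queue, (self._now + delay, next(self._eid), name))

    def step(self):
        # _now only advances when an event is popped from the queue.
        self._now, _, name = heapq.heappop(self._queue)
        return self._now, name


env = ToyEnv()
env.schedule("machine 1 unit 1", delay=4)
env.step()                                  # _now == 4
env.schedule("machine 1 unit 2", delay=4)
env.step()                                  # _now == 8
env.schedule("machine 1 unit 3", delay=4)   # due at 12

# The wall clock reads about 9 s here, but _now is still 8.
env.schedule("interrupt machine 1", delay=0)

stamped_time, name = env.step()
print(stamped_time, name)  # 8 interrupt machine 1
```

The interrupt is stamped 8 even though it was requested at about 9 s of wall-clock time, which matches the "Broken ... at time 8" line in the output above.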
I have fixed these issues by overriding some of the methods in RealtimeEnvironment and will present those fixes here. This might not be the best solution, but I would really like to see these problems fixed in SimPy, so that real-time simulations can accept input as well as output data.
Solution to problem 2:
The current env.now behavior is still needed for internal processes, so I created a new env.external_now property for external processes in the RealtimeEnvironment. It returns the current simulation time derived from the wall clock and looks like this:
@property
def external_now(self):
    return (time() - self.real_start) / self.factor
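As a worked check of that formula, the helper below does the same arithmetic as a pure function, with the wall-clock reading passed in so it can be tested without waiting. The name wall_to_sim and the env_start parameter are assumptions added for illustration. The property above omits env_start, which gives the same result when the environment starts at simulation time 0.

```python
def wall_to_sim(wall_time, real_start, factor=1.0, env_start=0.0):
    """Convert a wall-clock timestamp to simulation time."""
    return env_start + (wall_time - real_start) / factor


real_start = 1000.0  # wall-clock reading when the run began (example value)

# factor=1.0: one simulation time unit per real second
print(wall_to_sim(1009.0, real_start))              # 9.0
# factor=0.5: each simulation time unit takes half a real second
print(wall_to_sim(1009.0, real_start, factor=0.5))  # 18.0
```

With factor=1.0, a request arriving 9 real seconds into the run maps to simulation time 9.0, whereas env.now would still report the time of the last processed event.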
Solution to problem 1:
To avoid sleeping for a fixed delay, I created a threading.Event() that is set whenever an external process is received. That way step() waits either for delta or until an external process arrives, in which case it recomputes the waiting delay.
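The waiting primitive on its own behaves as sketched below, using only the standard library. TIMEOUT stands in for delta, and the Timer plays a hypothetical external producer. Event.wait(timeout) returns True if the flag was set and False if the timeout expired first.

```python
import threading
import time

wakeup = threading.Event()
TIMEOUT = 5.0  # stands in for delta, the time until the next scheduled event

# Hypothetical external producer: signals after 0.05 s.
timer = threading.Timer(0.05, wakeup.set)
timer.start()

started = time.monotonic()
externally_triggered = wakeup.wait(TIMEOUT)  # returns early once set() is called
waited = time.monotonic() - started
timer.join()

print(externally_triggered, waited < TIMEOUT)  # True True

# Without a signal, wait() runs into the timeout and returns False.
wakeup.clear()
timed_out_result = wakeup.wait(0.01)
print(timed_out_result)  # False
```

The return value is what lets the loop tell "an external request arrived" apart from "the next scheduled event is due".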
class RealtimeSim:
    # Add these two attributes (they use self, so set them in __init__)
    self.external_queue = queue.Queue()
    self.external_event = threading.Event()

    # Add the following 3 methods
    def external_process(self, proc_function, args, generator=True):
        # Called from an external thread: record the arrival time and wake up step().
        self.external_queue.put((time(), proc_function, args, generator), block=False)
        self.external_event.set()

    def _external_process_delayed(self, delay, proc_function, args, generator):
        yield self.timeout(max(0, delay))
        if generator:
            self.process(proc_function(*args))
        else:
            proc_function(*args)

    def step(self):
        externally_triggered = False
        while True:
            if externally_triggered:
                # To remove falsely delayed timeout events we move the current time forward
                self._now = (time() - self.real_start) / self.factor
                # Process all externally triggered events
                while not self.external_queue.empty():
                    real_time, proc_function, args, generator = self.external_queue.get(block=False)
                    # Create a delayed process based on the time the external event was created
                    delay = (real_time - (self.real_start + (self._now * self.factor))) / self.factor
                    self.process(self._external_process_delayed(delay, proc_function, args, generator))
                # Clear the event as all external processes have been processed
                self.external_event.clear()
            evt_time = self.peek()
            if evt_time is Infinity:
                raise EmptySchedule()
            real_time_evt_time = self.real_start + (evt_time - self.env_start) * self._factor
            if self._strict and time() - real_time_evt_time > self._factor:
                # Events scheduled for time *t* may take just up to *t+1*
                # for their computation, before an error is raised.
                raise RuntimeError('Simulation too slow for real time (%.3fs).' % (
                    time() - real_time_evt_time))
            delta = real_time_evt_time - time()
            if delta <= 0:
                # response_time and _logger belong to the modified class and are not shown in this snippet.
                if delta < -self.response_time:
                    if self._logger.isEnabledFor(logging.ERROR):
                        # time is the time() function here, so strftime must be imported from the time module.
                        date_time = strftime('%d/%m/%Y %H:%M:%S')
                        # delta is negative at this point; report the lag as a positive number.
                        self._logger.error("%s : The simulation is too slow for real time. "
                                           "It got behind by %d s\n", date_time, -delta)
                        print("{} : The simulation is too slow for real time. It got behind by {}s\n".format(
                            date_time, -delta))
                break
            # Wait for the next event to become due, or wake early on an external process.
            externally_triggered = self.external_event.wait(delta)
        return Environment.step(self)
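The hand-off between the external thread and step() can also be looked at in isolation. The sketch below uses only the standard library; submit, sim_delay and drain are hypothetical helpers, not part of the patch above. It differs from the patch in one respect, offered as a design option rather than a correction: it clears the flag before draining the queue, so a request that arrives mid-drain leaves the flag set and is picked up on the next pass.

```python
import queue
import threading

external_queue = queue.Queue()
external_event = threading.Event()


def submit(real_time, payload):
    # Called from an external thread: enqueue first, then wake the loop.
    external_queue.put((real_time, payload))
    external_event.set()


def sim_delay(real_time, real_start, now, factor=1.0):
    # How far (in simulation time) the request lies after `now`; never negative.
    return max(0.0, (real_time - (real_start + now * factor)) / factor)


def drain():
    # Clear first so that a set() racing with the drain is not lost.
    external_event.clear()
    items = []
    while True:
        try:
            items.append(external_queue.get_nowait())
        except queue.Empty:
            return items


# Example values: the run began at wall-clock 1000.0 and simulation time is 9.0.
submit(1009.0, "interrupt machine 1")
submit(1009.5, "start machine 2")
pending = drain()
delays = [sim_delay(t, real_start=1000.0, now=9.0) for t, _ in pending]
print(delays)  # [0.0, 0.5]
```

The worst case with this ordering is a spurious wake-up that finds an empty queue, which costs one extra loop iteration.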
After fixing those two problems, problem 3 fixes itself.
Here is the modified code and correct output after the fixes have been applied:
import simpy
import threading
import time
SIM_TIME = 16 # Seconds
MACHINE_1_ID = 1
MACHINE_1_PRODUCTION_DELAY = 4 # Seconds
MACHINE_2_ID = 2
MACHINE_2_PRODUCTION_DELAY = 4 # Seconds
def production_machine(env, machine_id, production_delay):
    # Produce units every "production_delay" until the machine breaks.
    count = 0
    while True:
        try:
            yield env.timeout(production_delay)
            count += 1
            print("Machine {}: Produced unit {} at time {}".format(machine_id, count, env.now))
        except simpy.Interrupt:
            print("Machine {}: Broken after producing {} units at time {}".format(machine_id, count, env.now))
            break

def delayed_machine(env):
    # Start a machine after a delay.
    time.sleep(int(MACHINE_1_PRODUCTION_DELAY//2))
    print("Starting machine {} at time {}.".format(MACHINE_2_ID, env.external_now))
    env.external_process(production_machine, (env, MACHINE_2_ID, MACHINE_2_PRODUCTION_DELAY))

def break_machine(env, machine):
    # Break a machine after a delay.
    time.sleep(int(SIM_TIME//2)+1)

    def interrupt_machine(machine):
        machine.interrupt("break_machine")

    env.external_process(interrupt_machine, (machine, ), generator=False)

env = simpy.RealtimeEnvironment()
# Start the production machine
machine = env.process(production_machine(env, MACHINE_1_ID, MACHINE_1_PRODUCTION_DELAY))
# Start another machine after a 2 sec delay (MACHINE_1_PRODUCTION_DELAY // 2), triggered externally.
delayed_machine_thread = threading.Thread(target=delayed_machine, args=(env, ))
delayed_machine_thread.start()
# Create handler for breaking or repairing the machine (triggered externally).
break_thread = threading.Thread(target=break_machine, args=(env, machine))
break_thread.start()
env.run(until=SIM_TIME)
Starting machine 2 at time 2.0251197814941406.
Machine 1: Produced unit 1 at time 4
Machine 2: Produced unit 1 at time 6.025235414505005
Machine 1: Produced unit 2 at time 8
Machine 1: Broken after producing 2 units at time 9.019486904144287
Machine 2: Produced unit 2 at time 10.025235414505005
Machine 2: Produced unit 3 at time 14.025235414505005