Commit 72c96d44 authored by Kyle Clarke's avatar Kyle Clarke 💬

Using a channel to stop every tickers, ensuring stop iimediately instead of ticker invocation.

parent 88e953a7
Pipeline #5969081 failed with stages
in 1 minute and 30 seconds
......@@ -112,7 +112,7 @@ DefaultSchedule().Reset()
```
A `Reset()` will first mark each job runner as stopped so that ***Every***
iterations will stop on next invocation before removing from the schedule. `Reset()`
iterations will be stopped before removing from the schedule. `Reset()`
**will not** remove the schedule itself.
## Jobs
......
......@@ -46,7 +46,8 @@ type Runner struct {
job *Job
RunningCount uint64
Started time.Time
stop bool
stop chan struct{}
stopped bool
Status string
}
......@@ -129,7 +130,7 @@ func (r *Runner) IsRunning() bool {
// Stopped returns this runners current stop flag value.
func (r *Runner) IsStopped() bool {
return r.stop
return r.stopped
}
// IsStopping will return true if not complete but stop flag true.
......@@ -142,75 +143,81 @@ func (r *Runner) Run() {
go do(r)
}
// Stop will set this runners stop flag. This does not ensure that the runner job will
// stop when called. It does guarantee that the next "Every" invocation will stop.
// Stop will stop execution of the ticker and assign the correct stopped status.
func (r *Runner) Stop() {
r.stop = true
if r.stop != nil {
close(r.stop)
}
r.stopped = true
r.Status = StatusStopped
}
// do is called via Run in a go routine.
func do(r *Runner) {
safety := uint64(0)
e := time.NewTicker(r.job.Every)
defer func(r *Runner) {
// On close set the completed time for this job runner and call stop to be safe and mitigate leaks.
// Create the ticker based on the jobs every value. Defer the completed flag and call stop on
// the ticker to mitigate memory leaks.
t := time.NewTicker(r.job.Every)
defer func(r *Runner, t *time.Ticker) {
r.Completed = time.Now().UTC()
r.Stop()
}(r)
for range e.C {
// Check if the stop flag been set against the runner task.
if r.stop {
r.Status = StatusStopped
e.Stop()
break
}
// If we have a start time, skip this tick if time not passed.
if !r.CanBegin() {
r.Status = StatusCannotBegin
continue
}
// Also check for an expiration time.
if r.IsExpired() {
r.Status = StatusExpired
break
}
t.Stop()
}(r, t)
// Check for a Cap and whether it will breach.
if r.CapBreached() {
r.Status = StatusCapBreached
break
}
// Create the stop channel called from outside this go routine.
r.stop = make(chan struct{})
// Set the started time if zero.
if r.Started.IsZero() {
r.Started = time.Now().UTC()
}
// Assign the running status
r.Status = StatusRunning
// Make the call in a non blocking go routine, don't wait for execution completion.
go r.callFn.Call(r.callFnParams)
// Increment the safety and running counts.
safety++
r.RunningCount++
if safety == math.MaxUint64 {
r.Status = StatusError
r.err = ErrJobMaximumTickBreach
break
}
// Check if this job is orphaned. Though this "should not" happen
// we still check that this job is still assigned to this schedule.
_, err := r.job.schedule.FindByJobID(r.job.ID)
if err != nil {
r.Status = StatusError
r.err = ErrJobOrphaned
break
// Init the safety var used to ensure we do not tick more than the maximum uint value.
safety := uint64(0)
for {
select {
case <-t.C:
// If we have a start time, skip this tick if time not passed.
if !r.CanBegin() {
r.Status = StatusCannotBegin
continue
}
// Also check for an expiration time.
if r.IsExpired() {
r.Status = StatusExpired
return
}
// Check for a Cap and whether it will breach.
if r.CapBreached() {
r.Status = StatusCapBreached
return
}
// Set the started time if zero.
if r.Started.IsZero() {
r.Started = time.Now().UTC()
}
// Assign the running status
r.Status = StatusRunning
// Make the call in a non blocking go routine, don't wait for execution completion.
go r.callFn.Call(r.callFnParams)
// Increment the safety and running counts.
safety++
r.RunningCount++
if safety == math.MaxUint64 {
r.Status = StatusError
r.err = ErrJobMaximumTickBreach
return
}
// Check if this job is orphaned. Though this "should not" happen
// we still check that this job is still assigned to this schedule.
_, err := r.job.schedule.FindByJobID(r.job.ID)
if err != nil {
r.Status = StatusError
r.err = ErrJobOrphaned
return
}
case <-r.stop:
return
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment