runner.go 5.76 KB
Newer Older
1 2 3
package kevin

import (
4
	"math"
5
	"reflect"
6
	"sync"
7
	"time"
8
)
9

10
const (
11 12 13
	// StatusPending is a simple string representation of a pending status
	StatusPending = "pending"

14
	// StatusRunning is a simple string representation of a running status
15
	StatusRunning = "running"
16

17 18 19 20 21
	// StatusStopping is defined by the stop flag being true, but the ticker not yet invoked to check the
	// flag. This occurs because the stop flag is called from outside a ticker loop. The ticker checks
	// the stop flag value on each tick.
	StatusStopping = "stopping"

22
	// StatusStopped is a simple string representation of a stopped status
23
	StatusStopped = "stopped"
24 25

	// StatusError is a simple string representation of an error status
26
	StatusError = "error"
27 28

	// StatusExpired is a simple string representation of an expired status
29
	StatusExpired = "expired"
30 31

	// StatusCapBreached is a simple string representation of a cap breached status
32
	StatusCapBreached = "cap breached"
33 34

	// StatusCannotBegin is a simple string representation of a cannot begin status
35
	StatusCannotBegin = "cannot begin"
36 37
)

38
// Runners are a map of all job runners key'd on JobID.
39
type Runners map[JobID]*Runner
40

41
// Runner struct to contain meta data and the runner task.
42 43
type Runner struct {
	Completed    time.Time
44 45 46 47
	err          error
	callFn       reflect.Value
	callFnParams []reflect.Value
	job          *Job
48 49
	RunningCount uint64
	Started      time.Time
50 51
	stop         chan struct{}
	stopped      bool
52
	Status       string
53
	sync.Mutex
54 55
}

56 57
// Given a Job, use reflection to create the func and func params to
// call inside do.
58
func initRunner(j *Job) (*Runner, error) {
59
	// Be sure we can parse the func and func params
60
	f := j.Fn
61 62 63 64 65
	rt := reflect.TypeOf(f)
	if rt.Kind() != reflect.Func {
		return nil, ErrJobNonFunctionType
	}

66 67 68
	FnParams := j.FnParams
	Fn := reflect.ValueOf(f)
	if len(FnParams) != Fn.Type().NumIn() {
69 70 71
		return nil, ErrJobFunctionParamMismatch
	}

72 73
	in := make([]reflect.Value, len(FnParams))
	for k, fnParam := range FnParams {
74 75 76
		in[k] = reflect.ValueOf(fnParam)
	}

77 78 79 80 81
	return &Runner{
		job:          j,
		callFn:       Fn,
		callFnParams: in,
	}, nil
82
}
83

84
// CanBegin will return true if a begin time is set and is older than the current time.
85
func (r *Runner) CanBegin() bool {
86
	return r.job.Begins.IsZero() || r.job.Begins.Before(time.Now())
87 88
}

89
// CapBreached will return true if this Job has a Cap and it is over or equal to the running count.
90 91 92 93
func (r *Runner) CapBreached() bool {
	return (r.job.Cap != 0 && r.RunningCount >= r.job.Cap)
}

94
// CurrentStatus will return a humanized string representation of the runner.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
func (r *Runner) CurrentStatus() string {
	var s string
	switch true {
	case r.IsPending():
		s = StatusPending
		break

	case r.IsStopping():
		s = StatusStopping
		break

	default:
		s = r.Status
	}

	return s
}

// IsCompleted will return true if a completed time is not zero and therefore complete.
func (r *Runner) IsCompleted() bool {
	return !r.Completed.IsZero()
}

118 119 120 121 122
// IsExpired will return true if an expiration time is set and is before now.
func (r *Runner) IsExpired() bool {
	return (!r.job.Expires.IsZero() && r.job.Expires.Before(time.Now()))
}

123 124 125 126 127 128 129 130 131 132
// IsPending will return true if not complete and not started.
func (r *Runner) IsPending() bool {
	return (!r.IsCompleted() && r.Started.IsZero())
}

// IsRunning will return true if not complete and started.
func (r *Runner) IsRunning() bool {
	return (!r.IsCompleted() && !r.Started.IsZero())
}

133
// IsStopped returns this runners current stop flag value.
134
func (r *Runner) IsStopped() bool {
135 136
	r.Lock()
	defer r.Unlock()
137
	return r.stopped
138 139
}

140 141 142
// IsStopping will return true if not complete but stop flag true.
func (r *Runner) IsStopping() bool {
	return (!r.IsCompleted() && r.IsStopped())
143 144
}

145
// Run invokes do() on this job runner instance via a go routine.
146
func (r *Runner) Run() {
147 148
	go do(r)
}
149

150
// Stop will stop execution of the ticker and assign the correct stopped status.
151
func (r *Runner) Stop() {
152 153 154 155
	r.Lock()
	defer r.Unlock()
	r.stopped = true
	r.Status = StatusStopped
156 157 158
	if r.stop != nil {
		close(r.stop)
	}
159 160
}

161
// do is called via Run in a go routine.
162
func do(r *Runner) {
163 164 165 166
	// Create the ticker based on the jobs every value. Defer the completed flag and call stop on
	// the ticker to mitigate memory leaks.
	t := time.NewTicker(r.job.Every)
	defer func(r *Runner, t *time.Ticker) {
167
		r.Completed = time.Now().UTC()
168 169
		t.Stop()
	}(r, t)
170

171 172
	// Create the stop channel called from outside this go routine.
	r.stop = make(chan struct{})
173

174 175 176 177 178
	// Init the safety var used to ensure we do not tick more than the maximum uint value.
	safety := uint64(0)
	for {
		select {
		case <-t.C:
179 180 181 182 183 184

			// Only continue processing if this job is not stopped.
			if r.IsStopped() {
				return
			}

185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
			// If we have a start time, skip this tick if time not passed.
			if !r.CanBegin() {
				r.Status = StatusCannotBegin
				continue
			}

			// Also check for an expiration time.
			if r.IsExpired() {
				r.Status = StatusExpired
				return
			}

			// Check for a Cap and whether it will breach.
			if r.CapBreached() {
				r.Status = StatusCapBreached
				return
			}

			// Set the started time if zero.
			if r.Started.IsZero() {
				r.Started = time.Now().UTC()
			}

			// Assign the running status
			r.Status = StatusRunning

			// Make the call in a non blocking go routine, don't wait for execution completion.
			go r.callFn.Call(r.callFnParams)

			// Increment the safety and running counts.
			safety++
			r.RunningCount++
			if safety == math.MaxUint64 {
				r.Status = StatusError
				r.err = ErrJobMaximumTickBreach
				return
			}

			// Check if this job is orphaned. Though this "should not" happen
			// we still check that this job is still assigned to this schedule.
			_, err := r.job.schedule.FindByJobID(r.job.ID)
			if err != nil {
				r.Status = StatusError
				r.err = ErrJobOrphaned
				return
			}
		case <-r.stop:
			return
233 234 235
		}
	}
}