runner.go 5.44 KB
Newer Older
1 2 3
package kevin

import (
4
	"math"
5 6
	"reflect"
	"time"
7
)
8

9
const (
10 11 12
	// StatusPending is a simple string representation of a pending status
	StatusPending = "pending"

13
	// StatusRunning is a simple string representation of a running status
14
	StatusRunning = "running"
15 16

	// StatusStopped is a simple string representation of a stopped status
17
	StatusStopped = "stopped"
18 19

	// StatusError is a simple string representation of an error status
20
	StatusError = "error"
21 22

	// StatusExpired is a simple string representation of an expired status
23
	StatusExpired = "expired"
24 25

	// StatusCapBreached is a simple string representation of a cap breached status
26
	StatusCapBreached = "cap breached"
27 28

	// StatusCannotBegin is a simple string representation of a cannot begin status
29
	StatusCannotBegin = "cannot begin"
30 31
)

32
// Runners are a map of all job runners key'd on JobID.
33
type Runners map[JobID]*runner
34

35 36
// Runner struct to contain meta data and the runner task.
type runner struct {
37
	completed    time.Time
38 39 40 41 42
	err          error
	callFn       reflect.Value
	callFnParams []reflect.Value
	job          *Job
	runningCount uint64
43
	started      time.Time
44 45
	stop         bool
	status       string
46 47
}

48 49
// Given a Job, use reflection to create the func and func params to
// call inside do.
50
func initRunner(j *Job) (*runner, error) {
51
	// Be sure we can parse the func and func params
52
	f := j.Fn
53 54 55 56 57
	rt := reflect.TypeOf(f)
	if rt.Kind() != reflect.Func {
		return nil, ErrJobNonFunctionType
	}

58 59 60
	FnParams := j.FnParams
	Fn := reflect.ValueOf(f)
	if len(FnParams) != Fn.Type().NumIn() {
61 62 63
		return nil, ErrJobFunctionParamMismatch
	}

64 65
	in := make([]reflect.Value, len(FnParams))
	for k, fnParam := range FnParams {
66 67 68
		in[k] = reflect.ValueOf(fnParam)
	}

69
	jr := NewRunner()
70 71 72 73 74 75
	jr.job = j
	jr.callFn = Fn
	jr.callFnParams = in
	return jr, nil
}

76 77 78
// NewRunner will return a pointer to a new runner instance.
func NewRunner() *runner {
	return &runner{}
79
}
80

81
// CanBegin will return true if a begin time is set and is older than the current time.
82
func (r *runner) CanBegin() bool {
83
	return r.job.Begins.IsZero() || r.job.Begins.Before(time.Now())
84 85
}

86
// CapBreached will return true if this Job has a Cap and it is over or equal to the running count.
87
func (r *runner) CapBreached() bool {
88
	return (r.job.Cap != 0 && r.runningCount >= r.job.Cap)
89 90 91
}

// Expired will return true if an expiration time is set and is before now.
92
func (r *runner) Expired() bool {
93
	return (!r.job.Expires.IsZero() && r.job.Expires.Before(time.Now()))
94 95
}

96
// Run invokes do() on this job runner instance via a go routine.
97
func (r *runner) Run() {
98 99
	go do(r)
}
100

101
// Stop will set this runners stop flag. This does not ensure that the runner job will
102
// stop when called. It does guarantee that the next "Every" invocation will stop.
103
func (r *runner) Stop() {
104 105 106
	r.stop = true
}

107
// Stopped returns this runners current stop flag value.
108
func (r *runner) Stopped() bool {
109
	return r.stop
110 111
}

112
// do is called via Run in a go routine.
113
func do(r *runner) {
114
	safety := uint64(0)
115
	e := time.NewTicker(r.job.Every)
116 117
	defer func() {
		// On close set the completed time for this job runner.
118
		r.completed = time.Now().UTC()
119 120
	}()

121
	for range e.C {
122 123
		// Check if the stop flag been set against the runner task.
		if r.stop {
124
			r.status = StatusStopped
125
			e.Stop()
126 127 128 129
			break
		}

		// If we have a start time, skip this tick if time not passed.
130
		if !r.CanBegin() {
131
			r.status = StatusCannotBegin
132 133 134 135
			continue
		}

		// Also check for an expiration time.
136
		if r.Expired() {
137
			r.status = StatusExpired
138 139 140
			break
		}

141
		// Check for a Cap and whether it will breach.
142
		if r.CapBreached() {
143
			r.status = StatusCapBreached
144 145 146
			break
		}

147
		// Set the started time if zero.
148 149
		if r.started.IsZero() {
			r.started = time.Now().UTC()
150 151
		}

152
		// Assign the running status
153
		r.status = StatusRunning
154

155 156
		// Make the call in a non blocking go routine, don't wait for execution completion.
		go r.callFn.Call(r.callFnParams)
157

158 159 160
		// Increment the safety and running counts.
		safety++
		r.runningCount++
161
		if safety == math.MaxUint64 {
162
			r.status = StatusError
163 164 165 166 167
			r.err = ErrJobMaximumTickBreach
			break
		}

		// Check if this job is orphaned. Though this "should not" happen
168
		// we still check that this job is still assigned to this schedule.
169
		_, err := r.job.schedule.FindByJobID(r.job.ID)
170
		if err != nil {
171
			r.status = StatusError
172 173 174 175 176
			r.err = ErrJobOrphaned
			break
		}
	}
}
177

178 179
// RunnerInfo is a struct that contains runner instance information.
type RunnerInfo struct {
180
	ID           string
181
	Status       string
182
	Cap          uint64
183
	Every        float64
184 185 186 187
	RunningCount uint64
	CompletedAt  time.Time
	StartedAt    time.Time
	Stopped      bool
188 189
}

190 191
// Info will return a new populated RunnerInfo struct. Reference, not a pointer.
func (r *runner) Info() RunnerInfo {
192 193 194 195 196
	status := r.status
	if "" == status {
		status = StatusPending
	}

197
	return RunnerInfo{
198 199 200
		ID:           string(r.job.ID),
		Status:       status,
		Every:        r.job.Every.Seconds(),
201
		Cap:          r.job.Cap,
202 203 204 205
		RunningCount: r.runningCount,
		CompletedAt:  r.completed,
		StartedAt:    r.started,
		Stopped:      r.stop,
206 207 208
	}
}

209
// IsCompleted will return true if a completed time is not zero and therefore complete.
210
func (i RunnerInfo) IsCompleted() bool {
211
	return !i.CompletedAt.IsZero()
212 213
}

214
// IsPending will return true if not complete and not started.
215
func (i RunnerInfo) IsPending() bool {
216
	return (!i.IsCompleted() && i.StartedAt.IsZero())
217 218
}

219
// IsRunning will return true if not complete and started.
220
func (i RunnerInfo) IsRunning() bool {
221
	return (!i.IsCompleted() && !i.StartedAt.IsZero())
222
}