Commit 4c605304 authored by Kyle Clarke's avatar Kyle Clarke 💬

Because methods should take interfaces and return public structs.

I removed the runner info struct. Firstly, making the runner have public fields and secondly, information about the running schedules should be abstracted away as another layer atop of Kevin.
parent 39a8cb07
Pipeline #5471338 passed with stages
in 26 seconds
......@@ -35,24 +35,24 @@ const (
)
// Runners are a map of all job runners key'd on JobID.
type Runners map[JobID]*runner
type Runners map[JobID]*Runner
// Runner struct to contain meta data and the runner task.
type runner struct {
completed time.Time
type Runner struct {
Completed time.Time
err error
callFn reflect.Value
callFnParams []reflect.Value
job *Job
runningCount uint64
started time.Time
RunningCount uint64
Started time.Time
stop bool
status string
Status string
}
// Given a Job, use reflection to create the func and func params to
// call inside do.
func initRunner(j *Job) (*runner, error) {
func initRunner(j *Job) (*Runner, error) {
// Be sure we can parse the func and func params
f := j.Fn
rt := reflect.TypeOf(f)
......@@ -71,101 +71,135 @@ func initRunner(j *Job) (*runner, error) {
in[k] = reflect.ValueOf(fnParam)
}
jr := NewRunner()
jr.job = j
jr.callFn = Fn
jr.callFnParams = in
return jr, nil
}
// NewRunner will return a pointer to a new runner instance.
func NewRunner() *runner {
return &runner{}
return &Runner{
job: j,
callFn: Fn,
callFnParams: in,
}, nil
}
// CanBegin will return true if a begin time is set and is older than the current time.
func (r *runner) CanBegin() bool {
func (r *Runner) CanBegin() bool {
return r.job.Begins.IsZero() || r.job.Begins.Before(time.Now())
}
// CapBreached will return true if this Job has a Cap and it is over or equal to the running count.
func (r *runner) CapBreached() bool {
return (r.job.Cap != 0 && r.runningCount >= r.job.Cap)
func (r *Runner) CapBreached() bool {
return (r.job.Cap != 0 && r.RunningCount >= r.job.Cap)
}
// Status will return a humanized string representation of the runner.
func (r *Runner) CurrentStatus() string {
var s string
switch true {
case r.IsPending():
s = StatusPending
break
case r.IsStopping():
s = StatusStopping
break
default:
s = r.Status
}
return s
}
// IsCompleted will return true if a completed time is not zero and therefore complete.
func (r *Runner) IsCompleted() bool {
return !r.Completed.IsZero()
}
// IsPending will return true if not complete and not started.
func (r *Runner) IsPending() bool {
return (!r.IsCompleted() && r.Started.IsZero())
}
// IsRunning will return true if not complete and started.
func (r *Runner) IsRunning() bool {
return (!r.IsCompleted() && !r.Started.IsZero())
}
// IsStopping will return true if not complete but stop flag true.
func (r *Runner) IsStopping() bool {
return (!r.IsCompleted() && r.Stopped())
}
// Expired will return true if an expiration time is set and is before now.
func (r *runner) Expired() bool {
func (r *Runner) Expired() bool {
return (!r.job.Expires.IsZero() && r.job.Expires.Before(time.Now()))
}
// Run invokes do() on this job runner instance via a go routine.
func (r *runner) Run() {
func (r *Runner) Run() {
go do(r)
}
// Stop will set this runners stop flag. This does not ensure that the runner job will
// stop when called. It does guarantee that the next "Every" invocation will stop.
func (r *runner) Stop() {
func (r *Runner) Stop() {
r.stop = true
}
// Stopped returns this runners current stop flag value.
func (r *runner) Stopped() bool {
func (r *Runner) Stopped() bool {
return r.stop
}
// do is called via Run in a go routine.
func do(r *runner) {
func do(r *Runner) {
safety := uint64(0)
e := time.NewTicker(r.job.Every)
defer func(r *runner) {
defer func(r *Runner) {
// On close set the completed time for this job runner and call stop to be safe and mitigate leaks.
r.completed = time.Now().UTC()
r.Completed = time.Now().UTC()
r.Stop()
}(r)
for range e.C {
// Check if the stop flag been set against the runner task.
if r.stop {
r.status = StatusStopped
r.Status = StatusStopped
e.Stop()
break
}
// If we have a start time, skip this tick if time not passed.
if !r.CanBegin() {
r.status = StatusCannotBegin
r.Status = StatusCannotBegin
continue
}
// Also check for an expiration time.
if r.Expired() {
r.status = StatusExpired
r.Status = StatusExpired
break
}
// Check for a Cap and whether it will breach.
if r.CapBreached() {
r.status = StatusCapBreached
r.Status = StatusCapBreached
break
}
// Set the started time if zero.
if r.started.IsZero() {
r.started = time.Now().UTC()
if r.Started.IsZero() {
r.Started = time.Now().UTC()
}
// Assign the running status
r.status = StatusRunning
r.Status = StatusRunning
// Make the call in a non blocking go routine, don't wait for execution completion.
go r.callFn.Call(r.callFnParams)
// Increment the safety and running counts.
safety++
r.runningCount++
r.RunningCount++
if safety == math.MaxUint64 {
r.status = StatusError
r.Status = StatusError
r.err = ErrJobMaximumTickBreach
break
}
......@@ -174,71 +208,9 @@ func do(r *runner) {
// we still check that this job is still assigned to this schedule.
_, err := r.job.schedule.FindByJobID(r.job.ID)
if err != nil {
r.status = StatusError
r.Status = StatusError
r.err = ErrJobOrphaned
break
}
}
}
// RunnerInfo is a struct that contains runner instance information.
type RunnerInfo struct {
ID string
Status string
Cap uint64
Every float64
RunningCount uint64
CompletedAt time.Time
StartedAt time.Time
Stopped bool
}
// Info will return a new populated RunnerInfo struct. Reference, not a pointer.
func (r *runner) Info() RunnerInfo {
ri := RunnerInfo{
ID: string(r.job.ID),
Every: r.job.Every.Seconds(),
Cap: r.job.Cap,
RunningCount: r.runningCount,
CompletedAt: r.completed,
StartedAt: r.started,
Stopped: r.stop,
}
var status string
switch true {
case ri.IsPending():
status = StatusPending
break
case ri.IsStopping():
status = StatusStopping
break
default:
status = r.status
}
ri.Status = status
return ri
}
// IsCompleted will return true if a completed time is not zero and therefore complete.
func (i RunnerInfo) IsCompleted() bool {
return !i.CompletedAt.IsZero()
}
// IsPending will return true if not complete and not started.
func (i RunnerInfo) IsPending() bool {
return (!i.IsCompleted() && i.StartedAt.IsZero())
}
// IsRunning will return true if not complete and started.
func (i RunnerInfo) IsRunning() bool {
return (!i.IsCompleted() && !i.StartedAt.IsZero())
}
// IsStopping will return true if not complete but stop flag true.
func (i RunnerInfo) IsStopping() bool {
return (!i.IsCompleted() && i.Stopped)
}
......@@ -68,13 +68,13 @@ func TestRunner_CapBreached(t *testing.T) {
for i := 0; i < 5; i++ {
switch true {
case 0 == r.Info().RunningCount:
case 0 == r.RunningCount:
if r.CapBreached() {
t.Error("Cap should not be breached with 0 or 1 running count.")
}
case 1 == r.Info().RunningCount:
case 1 < r.Info().RunningCount:
case 1 == r.RunningCount:
case 1 < r.RunningCount:
t.Error("Cap should have breached on execution!")
if !r.CapBreached() {
t.Error("CapBreached() method should return true.")
......@@ -130,29 +130,30 @@ func TestRunner_Stop(t *testing.T) {
}
func TestRunner_Stopped(t *testing.T) {
r := NewRunner()
r := Runner{}
if r.Stopped() {
t.Error("Stopped should be false by default.")
}
}
func TestRunnerInfo_IsPending(t *testing.T) {
i := RunnerInfo{}
if !i.IsPending() {
func TestRunner_IsPending(t *testing.T) {
r := Runner{}
if !r.IsPending() {
t.Error("No completed and zero start time should be pending.")
}
}
func TestRunnerInfo_IsRunning(t *testing.T) {
i := RunnerInfo{StartedAt: time.Now()}
if !i.IsRunning() {
func TestRunner_IsRunning(t *testing.T) {
r := Runner{Started: time.Now()}
if !r.IsRunning() {
t.Error("No completed and a start time should be running.")
}
}
func TestRunnerInfo_IsStopping(t *testing.T) {
i := RunnerInfo{Stopped: true}
if !i.IsStopping() {
func TestRunner_IsStopping(t *testing.T) {
r := Runner{}
r.Stop()
if !r.IsStopping() {
t.Error("Zero completed time and stopped flag should be stopping.")
}
}
......@@ -170,7 +170,7 @@ func (s *Schedule) doPurge(before time.Duration) {
if !s.IsEmpty() {
d := time.Now().UTC().Add(-1 * before)
for _, r := range s.runners {
if r.Info().IsCompleted() && r.Info().CompletedAt.Before(d) {
if r.IsCompleted() && r.Completed.Before(d) {
delete(s.runners, r.job.ID)
}
}
......@@ -185,12 +185,12 @@ func (s *Schedule) All() Runners {
// Find will attempt to find and return a Runner instance on this schedule by a common String.
// Internally Kevin uses a JobID type so this Find method is used as a convenience for your
// common string references. Returns error on not found.
func (s *Schedule) Find(ID string) (*runner, error) {
func (s *Schedule) Find(ID string) (*Runner, error) {
return s.FindByJobID(JobID(ID))
}
// Find will attempt to find and return a Runner instance on this schedule by JobID. Returns error on not found.
func (s *Schedule) FindByJobID(ID JobID) (*runner, error) {
func (s *Schedule) FindByJobID(ID JobID) (*Runner, error) {
j, ok := s.runners[ID]
if ok {
return j, nil
......
......@@ -443,39 +443,38 @@ func TestSchedule_Stop(t *testing.T) {
j.Every = 200 * time.Millisecond
ID, _ := s.Run(j)
r, _ := s.FindByJobID(ID)
if r.Info().IsRunning() {
if r.IsRunning() {
t.Error("Execution path should beat the job and therefore not be running.")
}
time.Sleep(1 * time.Second)
if !r.Info().IsRunning() {
if !r.IsRunning() {
t.Error("We have slept so job should be running.")
}
if r.Info().Stopped {
if r.Stopped() {
t.Error("Runner should not have a stop flag of true.")
}
s.Stop(j.ID)
time.Sleep(300 * time.Millisecond)
i := r.Info()
if !i.Stopped {
if !r.Stopped() {
t.Error("Runner should have a stop flag of true.")
}
if i.RunningCount < 2 {
if r.RunningCount < 2 {
t.Error("Runner should run job at least twice.")
}
if i.RunningCount == j.Cap {
if r.RunningCount == j.Cap {
t.Error("Runner should not have run to the jobs Cap.")
}
if i.Status != StatusStopped {
if r.CurrentStatus() != StatusStopped {
t.Error("Incorrect status - should be StatusStopped.")
}
if !i.IsCompleted() {
if !r.IsCompleted() {
t.Error("IsCompleted should be true.")
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment