Commit 0f6ce226 authored by Kyle Clarke's avatar Kyle Clarke 💬

Go formatting plus removal of all interfaces. So much easier...

parent 788e8e57
......@@ -8,9 +8,9 @@ import (
const DefaultName = "default"
var pool *SchedulePool = &SchedulePool{
schedules: make(Schedules),
schedules: make(Schedules),
defaultPurgeAfter: 24 * time.Hour,
defaultPurgePoll: time.Hour,
defaultPurgePoll: time.Hour,
}
func init() {
......@@ -20,13 +20,13 @@ func init() {
type SchedulePool struct {
sync.Mutex
schedules Schedules
schedules Schedules
// Set the default purge after, completed jobs after this time are purged.
defaultPurgeAfter time.Duration
// Set the default purge polling interval on this pool.
defaultPurgePoll time.Duration
defaultPurgePoll time.Duration
}
// Schedules are a string map of Schedulers
......
package kevin
import (
"math"
"reflect"
"time"
"math"
)
const (
INFO_CANNOT_BEGIN = "Cannot begin"
INFO_CAP_BREACH = "Cap breached"
INFO_EXPIRED = "Expired"
INFO_STOPPED = "Stopped"
Status_Running = "Running"
Status_Stopped = "Stopped"
Status_Errored = "Errored"
Status_Expired = "Expired"
Status_Cap_Breached = "Cap Breached"
Status_Cannot_Begin = "Cannot Begin"
)
// Runners are the glue that join schedules to jobs. They hold meta data about
// the running of the job and the runner task instance that can be stopped. A state
// can be inferred via the completed and started time values. eg
//
// Pending: started nil && completed nil
// Running: started !nil && completed nil
// Success: started !nil && completed !nil
// Fail: started nil && completed !nil
// JobRunners are a map of all job runners key'd on JobID.
type JobRunners map[JobID]*JobRunner
type Runners map[JobID]*JobRunner
// JobRunner struct to contain meta data and the runner task.
type JobRunner struct {
runningCount uint64
completed time.Time
fn reflect.Value
in []reflect.Value
job Jobber
err error
callFn reflect.Value
callFnParams []reflect.Value
job *Job
runningCount uint64
started time.Time
task *RunnerTask
}
// A runResult holds params and meta info to be passed along a run channel.
type RunnerTask struct {
err error
job Jobber
jobRunner *JobRunner
schedule Scheduler
info string
stop bool
stop bool
status string
}
// Given a jobber, use reflection to create the func and func params to
// call inside doRun.
func initJobRunner(j Jobber) (*JobRunner, error) {
func initRunner(j *Job) (*JobRunner, error) {
// Be sure we can parse the func and func params
f := j.Fn()
f := j.fn
rt := reflect.TypeOf(f)
if rt.Kind() != reflect.Func {
return nil, ErrJobNonFunctionType
}
fnParams := j.FnParams()
fnParams := j.fnParams
fn := reflect.ValueOf(f)
if len(fnParams) != fn.Type().NumIn() {
return nil, ErrJobFunctionParamMismatch
......@@ -69,83 +53,130 @@ func initJobRunner(j Jobber) (*JobRunner, error) {
}
return &JobRunner{
job: j,
fn: fn,
in: in,
job: j,
callFn: fn,
callFnParams: in,
}, nil
}
// CanBegin will return true if a begin time is set and is older than the current time.
func (r *JobRunner) CanBegin() bool {
return r.job.begins.IsZero() || r.job.begins.Before(time.Now())
}
func (jr *JobRunner) run(j Jobber, s Scheduler) error {
// Create a new populated RunnerTask
jr.task = &RunnerTask{
job: j,
jobRunner: jr,
schedule: s,
}
go do(jr.task)
// CapBreached will return true if this Job has a cap and it is over the running count.
func (r *JobRunner) CapBreached() bool {
return (r.job.cap != 0 && r.runningCount > r.job.cap)
}
// Expired will return true if an expiration time is set and is before now.
func (r *JobRunner) Expired() bool {
return (!r.job.expires.IsZero() && r.job.expires.Before(time.Now()))
}
func (r *JobRunner) Run() {
go do(r)
}
return nil
func (r *JobRunner) Stopped() bool {
return r.stop
}
func do(r *RunnerTask) {
func do(r *JobRunner) {
safety := uint64(0)
every := time.NewTicker(r.job.Every())
every := time.NewTicker(r.job.every)
defer func() {
// On close set the completed time for this job runner.
r.jobRunner.completed = time.Now().UTC()
r.completed = time.Now().UTC()
}()
for range every.C {
// Check if the stop flag been set against the runner task.
if r.stop {
r.info = INFO_STOPPED
r.status = Status_Stopped
every.Stop()
break
}
// If we have a start time, skip this tick if time not passed.
if !r.job.CanBegin() {
r.info = INFO_CANNOT_BEGIN
if !r.CanBegin() {
r.status = Status_Cannot_Begin
continue
}
// Also check for an expiration time.
if r.job.Expired() {
r.info = INFO_EXPIRED
if r.Expired() {
r.status = Status_Expired
break
}
// Check for a cap and whether it will breach.
if r.job.CapBreached(r.jobRunner.runningCount + 1) {
r.info = INFO_CAP_BREACH
if r.CapBreached() {
r.status = Status_Cap_Breached
break
}
// Set the started time if zero.
if r.jobRunner.started.IsZero() {
r.jobRunner.started = time.Now().UTC()
if r.started.IsZero() {
r.started = time.Now().UTC()
}
// Increment the safety and running counts.
safety++
r.jobRunner.runningCount++
r.runningCount++
r.status = Status_Running
// Make the call...
r.jobRunner.fn.Call(r.jobRunner.in)
r.callFn.Call(r.callFnParams)
if safety == math.MaxUint64 {
r.status = Status_Errored
r.err = ErrJobMaximumTickBreach
break
}
// Check if this job is orphaned. Though this "should not" happen
// as members are private and interfaces ensure manipulation, we still
// check that this job is still assigned to this schedule.
_, err := r.schedule.FindJob(r.job.ID())
// we still check that this job is still assigned to this schedule.
_, err := r.job.schedule.FindJob(r.job.id)
if err != nil {
r.status = Status_Errored
r.err = ErrJobOrphaned
break
}
}
}
type JobRunnerInfo struct {
status string
runningCount uint64
completedAt time.Time
startedAt time.Time
stopped bool
}
func (r *JobRunner) Info() JobRunnerInfo {
return JobRunnerInfo{
status: r.status,
runningCount: r.runningCount,
completedAt: r.completed,
startedAt: r.started,
stopped: r.stop,
}
}
// IsCompleted will return true if a completed time is set.
func (i JobRunnerInfo) IsCompleted() bool {
return !i.completedAt.IsZero()
}
func (i JobRunnerInfo) IsPending() bool {
return (!i.IsCompleted() && i.startedAt.IsZero())
}
func (i JobRunnerInfo) IsRunning() bool {
return (!i.IsCompleted() && !i.startedAt.IsZero())
}
func (i JobRunnerInfo) RunningCount() uint64 {
return i.runningCount
}
......@@ -5,11 +5,11 @@ import (
"time"
)
func Test_initJobRunner(t *testing.T) {
func Test_initRunner(t *testing.T) {
// Test invalid method
j := newTestJob()
j := newSimpleJob(false)
j.fn = "not a valid function"
jr, err := initJobRunner(j)
jr, err := initRunner(j)
if jr != nil {
t.Error("Non nil Job runner instance on bad function.")
}
......@@ -25,97 +25,93 @@ func Test_initJobRunner(t *testing.T) {
if err != ErrJobNonFunctionType {
t.Error("Kevin error of wrong type on Job runner instance on bad function.")
}
}
// Test method with no params
j = &Job{
cap: 10,
id: nameDrop,
every: time.Duration(100 * time.Millisecond),
expires: time.Now().AddDate(0, 0, 1),
begins: time.Now().Add(10 * time.Minute),
}
j.fn = methodWithOutParams
jr, err = initJobRunner(j)
if jr == nil {
t.Error("Job runner instance should be returned on methodWithOutParams.")
}
func TestJobRunner_CanBegin(t *testing.T) {
Reset()
j := newSimpleJob(false)
j.begins = time.Now().Add(time.Minute)
s := DefaultSchedule()
s.AddJob(j)
r, _ := s.FindJob(j.id)
if err != nil {
t.Error("Non nil error on Job runner instance on methodWithOutParams.")
if r.CanBegin() {
t.Error("Can begin should be false with date 10 mins in future.")
}
// Test method with params - setting fnParams on job instance
// and appending with the AddFnParams()
j = newTestJob()
j.fn = methodWithParams
jr, err = initJobRunner(j)
if jr != nil {
t.Error("Non nil Job runner instance on wrong params passed.")
j.begins = time.Now().Add(-100 * time.Millisecond)
if !r.CanBegin() {
t.Error("A job can begin when begin time 1 minute in the past.")
}
if err == nil {
t.Error("Nil error on Job runner instance on no params passed.")
j.begins = time.Time{}
if !r.CanBegin() {
t.Error("A job can begin with zero time.")
}
}
if !IsKevinError(err) {
t.Error("Non Kevin error on Job runner instance on no params passed.")
func TestJobRunner_CapBreached(t *testing.T) {
Reset()
j := newSimpleJob(false)
j.cap = 0
s := DefaultSchedule()
s.AddJob(j)
r, _ := s.FindJob(j.id)
if r.CapBreached() {
t.Error("Cap should not be breached with cap of zero.")
}
if err != ErrJobFunctionParamMismatch {
t.Error("Kevin error of wrong type on Job runner instance on no params passed.")
j.cap = 1
if r.CapBreached() {
t.Error("Cap should not be breached with zero running count.")
}
// Add the correct params, also note the appending of params
j.AddFnParams("bar", int64(23))
jr, err = initJobRunner(j)
if jr == nil {
t.Error("Nil Job runner instance when correct params passed.")
r.runningCount = 1
if r.CapBreached() {
t.Error("Cap should not be breached with equal values.")
}
if err != nil {
t.Error(err)
r.runningCount = 2
if !r.CapBreached() {
t.Error("Cap is breached with 2.")
}
}
func Test_runJob(t *testing.T) {
// Test invalid method
j := newTestJob()
j.fnParams = []interface{}{}
j.begins = time.Now().Add(5 * time.Second)
j.fn = methodWithOutParams
func TestJobRunner_Expired(t *testing.T) {
Reset()
jr, _ := initJobRunner(j)
jr.runJob(j, DefaultSchedule())
//jr.task.Stop()
j := newSimpleJob(false)
j.expires = time.Time{}
//jr.Stop()
s := DefaultSchedule()
s.AddJob(j)
r, _ := s.FindJob(j.id)
if r.Expired() {
t.Error("Expired should be false as zero expire time assigned to job.")
}
j.expires = time.Now().Add(time.Minute)
if r.Expired() {
t.Error("Expired should be false as expiration one minute from now.")
}
time.Sleep(10 * time.Second)
fmt.Println(jr.completed)
fmt.Println(jr.runningCount)
fmt.Println(jr.task.Err())
j.expires = time.Now().Add(-1 * time.Minute)
if !r.Expired() {
t.Error("Expired should be true as expiration one minute in past.")
}
}
func TestJobRunner_Stopped(t *testing.T) {
r := JobRunner{}
if r.Stopped() {
t.Error("Stopped should be false by default.")
}
}
//func Test_initJobRunner(t *testing.T) {
//
//}
//vals = jr.fn.Call(jr.in)
//ret := vals[0]
//if "returnTest" != ret.Type().Name() {
// t.Error("Incorrect struct type returned from methodWithParams")
//}
//
//if "foo" != ret.Field(0).String() {
// t.Error("Expecting 1st param of foo returned")
//}
//
//if "bar" != ret.Field(1).String() {
// t.Error("Expecting 2nd param of bar returned")
//}
//
//if 23 != ret.Field(2).Int() {
// t.Error("Expecting 3rd param of 23 returned")
//}
func TestJobRunnerInfo_IsPending(t *testing.T) {
i := JobRunnerInfo{}
if !i.IsPending() {
t.Error("No completed and zero start time should be pending.")
}
}
......@@ -40,6 +40,11 @@ type Job struct {
schedule *Schedule
}
// A convenience method to added params to the Job instance.
func (j *Job) AddFnParams(params ...interface{}) {
j.fnParams = append(j.fnParams, params...)
}
// NewSchedule will return a new Scheduler instance. It expects ScheduleConfig.
func NewSchedule(sc ScheduleConfig) (*Schedule, error) {
......@@ -105,15 +110,19 @@ func (s *Schedule) AddJob(j *Job) (JobID, error) {
return j.id, err
}
// Now run the job
err = r.Run()
s.runners[j.id] = r
return j.id, nil
}
func (s *Schedule) AddJobAndRun(j *Job) (JobID, error) {
id, err := s.AddJob(j)
if err != nil {
return j.id, err
return id, err
}
s.runners[j.id] = r
return j.id, nil
s.runners[id].Run()
return id, nil
}
// AutoPurge will setup a ticker to purge completed jobs based on this schedules
......
......@@ -185,7 +185,7 @@ func TestSchedule_AutoPurge(t *testing.T) {
j := newSimpleJob(false)
j.cap = 1
j.every = time.Duration(100 * time.Millisecond)
s.AddJob(j)
s.AddJobAndRun(j)
i := s.Info()
if i.count != 1 {
......@@ -206,6 +206,9 @@ func TestSchedule_AutoPurge(t *testing.T) {
t.Error("Completed job should have been purged.")
}
// Enforce else clause that will stop an existing ticker
s.AutoPurge(time.Duration(0))
// Reset the default purge poll duration
s.AutoPurge(pool.defaultPurgePoll)
}
......@@ -306,7 +309,7 @@ func TestSchedule_Purge(t *testing.T) {
j.cap = 1
j.every = time.Duration(100 * time.Millisecond)
s.AddJob(j)
s.AddJobAndRun(j)
i := s.Info()
if i.count != 1 {
t.Error("Expecting one job as auto purge is off.")
......@@ -339,6 +342,15 @@ func TestSchedule_RemoveJob(t *testing.T) {
if 0 != s.Info().count {
t.Error("Incorrect job count, should be zero after remove.")
}
err := s.RemoveJob(JobID("no such thing"))
if !IsKevinError(err) {
t.Error("Expecting error on no job runner found.")
}
if err != ErrScheduleJobNotFound {
t.Error("Incorrect error returned when job not found.")
}
}
func TestSchedule_Reset(t *testing.T) {
......@@ -354,6 +366,18 @@ func TestSchedule_Reset(t *testing.T) {
if 0 != s.Info().count {
t.Error("Incorrect job count, should be zero after reset.")
}
j := newParamJob(false)
s.AddJob(j)
j.id = JobID("something naff")
err := s.Reset()
if !IsKevinError(err) {
t.Error("Expecting error on no job to reset found.")
}
if err != ErrScheduleJobNotFound {
t.Error("Incorrect error returned when job not found.")
}
}
func TestSchedule_StopJob(t *testing.T) {
......@@ -374,7 +398,7 @@ func TestSchedule_StopJob(t *testing.T) {
j := newSimpleJob(false)
j.every = 200 * time.Millisecond
id, _ := s.AddJob(j)
id, _ := s.AddJobAndRun(j)
r, _ := s.FindJob(id)
if r.Info().IsRunning() {
t.Error("Execution path should beat the job and therefore not be running.")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment