Commit 9e015306 authored by Kyle Clarke's avatar Kyle Clarke 💬

Adding the runner files to the repo. The glue for jobs and schedules.

parent 400e6cac
package kevin
import (
"reflect"
"time"
"fmt"
"math"
"github.com/matryer/runner"
)
// Runners are the glue that join schedules to jobs. They hold meta data about
// the running of the job and the runner task instance that can be stopped. A state
// can be inferred via the completed and started time values. eg
//
// Pending: started nil && completed nil
// Running: started !nil && completed nil
// Success: started !nil && completed !nil
// Fail: started nil && completed !nil
// JobRunners are a map of all job runners key'd on JobID.
type JobRunners map[JobID]*JobRunner
// JobRunner struct to contain meta data and the runner task.
type JobRunner struct {
runningCount uint64
completed time.Time
fn reflect.Value
in []reflect.Value
job Jobber
started time.Time
task *runner.Task
}
// A runResult holds params and meta info to be passed along a run channel.
type runResult struct {
err error
every *time.Ticker
job Jobber
jobRunner *JobRunner
schedule Scheduler
stopped bool
}
// Given a jobber, use reflection to create the func and func params to
// call inside the runner task.
func initJobRunner(j Jobber) (*JobRunner, error) {
// Be sure we can parse the func and func params
f := j.Fn()
rt := reflect.TypeOf(f)
if rt.Kind() != reflect.Func {
return nil, ErrJobNonFunctionType
}
fnParams := j.FnParams()
fn := reflect.ValueOf(f)
if len(fnParams) != fn.Type().NumIn() {
return nil, ErrJobFunctionParamMismatch
}
in := make([]reflect.Value, len(fnParams))
for k, fnParam := range fnParams {
in[k] = reflect.ValueOf(fnParam)
}
return &JobRunner{
job: j,
fn: fn,
in: in,
}, nil
}
func (jr *JobRunner) runJob(j Jobber, s Scheduler) error {
everyDuration := j.Every()
fmt.Println("in run job")
task := runner.Go(func(shouldStop runner.S) error {
defer func() {
// On close set the completed time for this job runner.
jr.completed = time.Now().UTC()
}()
// Every tickers are designed to have variable iterations, both long
// and short. Because of this, it doesn't make sense for the shouldStop()
// call to be invoked via the Every ticker. Take for example a task that
// runs every x2 weeks when a schedule reset is called. A reset expects
// all jobs to stop in 10secs by default.
//
// If order to handle these variable iterations, we need to assign the ticker
// a duration that is appropriate to the Every duration. Therefore we tick
// every second if the Every is a second or more, or at the Every duration
// to check shouldStop().
stopTicker := time.NewTicker(time.Second)
if everyDuration < time.Second {
stopTicker = time.NewTicker(everyDuration)
}
// Create the stopChan.
stopChan := make(chan bool)
go runStop(shouldStop, stopTicker, stopChan)
// Create the runChan
runChan := make(chan *runResult)
result := &runResult{
every: time.NewTicker(everyDuration),
job: j,
jobRunner: jr,
schedule: s,
}
go runDo(result, runChan)
for {
select {
case <-stopChan:
// Update the results stopped flag.
result.stopped = true
return nil
case <-runChan:
return result.err
default:
// nohup
}
}
return nil
})
jr.task = task
return nil
}
func runStop(shouldStop runner.S, stopTicker *time.Ticker, stopChan chan<- bool) {
for range stopTicker.C {
if shouldStop() {
fmt.Println("stop invoked")
stopChan <- true
}
}
}
func runDo(r *runResult, rc chan<- *runResult) {
safety := uint64(0)
for range r.every.C {
// Check if the stopped flag been set via the stopChan.
if r.stopped {
break
}
// If we have a start time, skip this tick if time not passed.
if !r.job.CanBegin() {
fmt.Println("we cannot begin")
continue
}
// Also check for an expiration time.
if r.job.Expired() {
fmt.Println("we have expired")
break
}
// Increment the safety and running counts.
safety++
r.jobRunner.runningCount++
// Check for a cap and whether it has been reached.
if r.job.CapBreached(r.jobRunner.runningCount) {
fmt.Println("cap is breached")
break
}
// Make the call...
r.jobRunner.fn.Call(r.jobRunner.in)
if safety == math.MaxUint64 {
fmt.Println("err max ticker breach")
r.err = ErrJobMaximumTickBreach
break
}
// Check if this job is orphaned. Though this "should not" happen
// as members are private and interfaces ensure manipulation, we still
// check that this job is still assigned to this schedule.
_, err := r.schedule.FindJob(r.job.ID())
if err != nil {
fmt.Println("orphaned job")
r.err = ErrJobOrphaned
break
}
}
rc <- r
}
package kevin
import (
"fmt"
"testing"
"time"
)
func Test_initJobRunner(t *testing.T) {
// Test invalid method
j := newTestJob()
j.fn = "not a valid function"
jr, err := initJobRunner(j)
if jr != nil {
t.Error("Non nil Job runner instance on bad function.")
}
if err == nil {
t.Error("Nil error on Job runner instance on bad function.")
}
if !IsKevinError(err) {
t.Error("Non Kevin error on Job runner instance on bad function.")
}
if err != ErrJobNonFunctionType {
t.Error("Kevin error of wrong type on Job runner instance on bad function.")
}
// Test method with no params
j = &Job{
cap: 10,
id: nameDrop,
every: time.Duration(100 * time.Millisecond),
expires: time.Now().AddDate(0, 0, 1),
begins: time.Now().Add(10 * time.Minute),
}
j.fn = methodWithOutParams
jr, err = initJobRunner(j)
if jr == nil {
t.Error("Job runner instance should be returned on methodWithOutParams.")
}
if err != nil {
t.Error("Non nil error on Job runner instance on methodWithOutParams.")
}
// Test method with params - setting fnParams on job instance
// and appending with the AddFnParams()
j = newTestJob()
j.fn = methodWithParams
jr, err = initJobRunner(j)
if jr != nil {
t.Error("Non nil Job runner instance on wrong params passed.")
}
if err == nil {
t.Error("Nil error on Job runner instance on no params passed.")
}
if !IsKevinError(err) {
t.Error("Non Kevin error on Job runner instance on no params passed.")
}
if err != ErrJobFunctionParamMismatch {
t.Error("Kevin error of wrong type on Job runner instance on no params passed.")
}
// Add the correct params, also note the appending of params
j.AddFnParams("bar", int64(23))
jr, err = initJobRunner(j)
if jr == nil {
t.Error("Nil Job runner instance when correct params passed.")
}
if err != nil {
t.Error(err)
}
}
func Test_runJob(t *testing.T) {
// Test invalid method
j := newTestJob()
j.fnParams = []interface{}{}
j.begins = time.Now().Add(5 * time.Second)
j.fn = methodWithOutParams
jr, _ := initJobRunner(j)
jr.runJob(j, DefaultSchedule())
//jr.task.Stop()
//jr.Stop()
time.Sleep(10 * time.Second)
fmt.Println(jr.completed)
fmt.Println(jr.runningCount)
fmt.Println(jr.task.Err())
}
//func Test_initJobRunner(t *testing.T) {
//
//}
//vals = jr.fn.Call(jr.in)
//ret := vals[0]
//if "returnTest" != ret.Type().Name() {
// t.Error("Incorrect struct type returned from methodWithParams")
//}
//
//if "foo" != ret.Field(0).String() {
// t.Error("Expecting 1st param of foo returned")
//}
//
//if "bar" != ret.Field(1).String() {
// t.Error("Expecting 2nd param of bar returned")
//}
//
//if 23 != ret.Field(2).Int() {
// t.Error("Expecting 3rd param of 23 returned")
//}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment