Commit 9bd2d136 authored by Kyle Clarke's avatar Kyle Clarke 💬

updated runner to be simpler - schedules no longer need a timeout for stopped jobs

parent 775dd7a8
......@@ -4,11 +4,14 @@ import (
"reflect"
"time"
"fmt"
"math"
)
"github.com/matryer/runner"
const (
INFO_CANNOT_BEGIN = "Cannot begin"
INFO_CAP_BREACH = "Cap breached"
INFO_EXPIRED = "Expired"
INFO_STOPPED = "Stopped"
)
// Runners are the glue that join schedules to jobs. They hold meta data about
......@@ -31,21 +34,21 @@ type JobRunner struct {
in []reflect.Value
job Jobber
started time.Time
task *runner.Task
task *RunnerTask
}
// A runResult holds params and meta info to be passed along a run channel.
type runResult struct {
type RunnerTask struct {
err error
every *time.Ticker
job Jobber
jobRunner *JobRunner
schedule Scheduler
info string
stopped bool
}
// Given a jobber, use reflection to create the func and func params to
// call inside the runner task.
// call inside doRun.
func initJobRunner(j Jobber) (*JobRunner, error) {
// Be sure we can parse the func and func params
f := j.Fn()
......@@ -73,92 +76,48 @@ func initJobRunner(j Jobber) (*JobRunner, error) {
}
func (jr *JobRunner) runJob(j Jobber, s Scheduler) error {
everyDuration := j.Every()
fmt.Println("in run job")
task := runner.Go(func(shouldStop runner.S) error {
defer func() {
// On close set the completed time for this job runner.
jr.completed = time.Now().UTC()
}()
// Every tickers are designed to have variable iterations, both long
// and short. Because of this, it doesn't make sense for the shouldStop()
// call to be invoked via the Every ticker. Take for example a task that
// runs every x2 weeks when a schedule reset is called. A reset expects
// all jobs to stop in 10secs by default.
//
// If order to handle these variable iterations, we need to assign the ticker
// a duration that is appropriate to the Every duration. Therefore we tick
// every second if the Every is a second or more, or at the Every duration
// to check shouldStop().
stopTicker := time.NewTicker(time.Second)
if everyDuration < time.Second {
stopTicker = time.NewTicker(everyDuration)
}
// Create the stopChan.
stopChan := make(chan bool)
go runStop(shouldStop, stopTicker, stopChan)
// Create the runChan
runChan := make(chan *runResult)
result := &runResult{
every: time.NewTicker(everyDuration),
job: j,
jobRunner: jr,
schedule: s,
}
go runDo(result, runChan)
for {
select {
case <-stopChan:
// Update the results stopped flag.
result.stopped = true
return nil
case <-runChan:
return result.err
default:
// nohup
}
}
return nil
})
func (jr *JobRunner) run(j Jobber, s Scheduler) error {
// Create a new populated RunnerTask
jr.task = &RunnerTask{
job: j,
jobRunner: jr,
schedule: s,
}
go do(jr.task)
jr.task = task
return nil
}
func runStop(shouldStop runner.S, stopTicker *time.Ticker, stopChan chan<- bool) {
for range stopTicker.C {
if shouldStop() {
fmt.Println("stop invoked")
stopChan <- true
}
}
}
func runDo(r *runResult, rc chan<- *runResult) {
func do(r *RunnerTask) {
safety := uint64(0)
for range r.every.C {
every := time.NewTicker(r.job.Every())
defer func() {
// On close set the completed time for this job runner.
r.jobRunner.completed = time.Now().UTC()
}()
for range every.C {
// Check if the stopped flag been set via the stopChan.
if r.stopped {
r.info = INFO_STOPPED
break
}
// If we have a start time, skip this tick if time not passed.
if !r.job.CanBegin() {
fmt.Println("we cannot begin")
r.info = INFO_CANNOT_BEGIN
continue
}
// Also check for an expiration time.
if r.job.Expired() {
fmt.Println("we have expired")
r.info = INFO_EXPIRED
break
}
// Check for a cap and whether it will breach.
if r.job.CapBreached(r.jobRunner.runningCount + 1) {
r.info = INFO_CAP_BREACH
break
}
......@@ -166,17 +125,10 @@ func runDo(r *runResult, rc chan<- *runResult) {
safety++
r.jobRunner.runningCount++
// Check for a cap and whether it has been reached.
if r.job.CapBreached(r.jobRunner.runningCount) {
fmt.Println("cap is breached")
break
}
// Make the call...
r.jobRunner.fn.Call(r.jobRunner.in)
if safety == math.MaxUint64 {
fmt.Println("err max ticker breach")
r.err = ErrJobMaximumTickBreach
break
}
......@@ -186,11 +138,8 @@ func runDo(r *runResult, rc chan<- *runResult) {
// check that this job is still assigned to this schedule.
_, err := r.schedule.FindJob(r.job.ID())
if err != nil {
fmt.Println("orphaned job")
r.err = ErrJobOrphaned
break
}
}
rc <- r
}
package kevin
import (
"fmt"
"testing"
"time"
)
......
......@@ -9,15 +9,16 @@ import (
// Scheduler interface
type Scheduler interface {
GenerateJobID() JobID
Info() ScheduleInfo
IsEmpty() bool
Name() string
AutoPurge(d time.Duration)
Purge() error
Reset(d time.Duration) error
Reset() error
AddJob(j Jobber) (JobID, error)
FindJob(ID JobID) (*JobRunner, error)
RemoveJob(ID JobID, d time.Duration) error
StopJob(ID JobID, d time.Duration) error
RemoveJob(ID JobID) error
StopJob(ID JobID) error
}
type Schedule struct {
......@@ -33,6 +34,43 @@ type ScheduleConfig struct {
purgeAfter time.Duration
}
// SchedulesInfo represents information about all Schedulers assigned to the schedule pool.
type ScheduleInfo struct {
Name string
Count int64
}
// NewSchedule will return a new Scheduler instance. It expects ScheduleConfig.
func NewSchedule(sc *ScheduleConfig) (Scheduler, error) {
if "" == sc.name {
return nil, ErrScheduleEmptyName
}
// Create a new schedule, add to the schedulePool.
schedule := &Schedule{
jobrunners: make(JobRunners),
config: sc,
}
// Check for a purge duration and setup a ticker for this Schedule
if 0 != sc.purgeAfter.Seconds() {
schedule.AutoPurge(schedulePool.defaultPurgePoll)
}
// Check for a schedule of the same name, do not just overwrite
schedulePool.Lock()
_, ok := schedulePool.schedules[sc.name]
schedulePool.Unlock()
if ok {
return nil, ErrScheduleDuplicateName
} else {
schedulePool.schedules[sc.name] = schedule
}
return schedule, nil
}
// NewScheduleConfig will create a new ScheduleConfig struct with any defaults assigned.
func NewScheduleConfig() *ScheduleConfig {
return &ScheduleConfig{}
......@@ -62,7 +100,7 @@ func (s *Schedule) AddJob(j Jobber) (JobID, error) {
}
// Now run the job
err = jr.runJob(j, s)
err = jr.run(j, s)
if err != nil {
return j.ID(), err
}
......@@ -122,6 +160,13 @@ func (s *Schedule) GenerateJobID() JobID {
return id
}
func (s *Schedule) Info() ScheduleInfo {
return ScheduleInfo{
Name: s.Name(),
Count: int64(len(s.jobrunners)),
}
}
func (s *Schedule) IsEmpty() bool {
return 0 == len(s.jobrunners)
}
......@@ -138,8 +183,8 @@ func (s *Schedule) Purge() error {
}
// RemoveJob will first attempt to stop the job and then remove from this schedule.
func (s *Schedule) RemoveJob(ID JobID, d time.Duration) error {
err := s.StopJob(ID, d)
func (s *Schedule) RemoveJob(ID JobID) error {
err := s.StopJob(ID)
if err != nil {
return err
}
......@@ -149,9 +194,9 @@ func (s *Schedule) RemoveJob(ID JobID, d time.Duration) error {
}
// Reset will attempt to stop and remove ALL jobs from this schedule.
func (s *Schedule) Reset(d time.Duration) error {
func (s *Schedule) Reset() error {
for _, j := range s.jobrunners {
err := s.RemoveJob(j.job.ID(), d)
err := s.RemoveJob(j.job.ID())
if err != nil {
return err
}
......@@ -160,25 +205,12 @@ func (s *Schedule) Reset(d time.Duration) error {
}
// Reset will attempt to stop the execution on the next job runnerfrom this schedule.
func (s *Schedule) StopJob(ID JobID, d time.Duration) error {
func (s *Schedule) StopJob(ID JobID) error {
j, err := s.FindJob(ID)
if err != nil {
return err
}
// Default to 10 secs if 0 passed
if d.Nanoseconds() == 0 {
d = time.Duration(10 * time.Second)
}
j.task.Stop()
select {
case <-j.task.StopChan():
// task successfully stopped
return nil
case <-time.After(d):
// task didn't stop in time
return ErrJobStopTimeout
}
j.task.stopped = true
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment