Commit 117fd2ce authored by Kyle Clarke's avatar Kyle Clarke 💬

Adding tests into seperate package highlighted all private members that were...

Adding tests into seperate package highlighted all private members that were required to be public. Altered tests, source and read me.
parent 3f8cd84b
......@@ -10,7 +10,7 @@ Kevin is an in memory non persistent scheduler written in Golang that simplifies
repeat method calls. Kevin keeps track of jobs asynchronously running in go
routines but still allows you to stop jobs running at anytime.
Lets run a simple function every minute.
Lets run a simple function Every minute.
```
......@@ -23,13 +23,13 @@ func Simple() {
// Do some repeated work.
}
id, _ := kevin.DefaultSchedule().Run(&Job{
fn: Simple,
every: time.Duration(time.Second),
ID, _ := kevin.DefaultSchedule().Run(&Job{
Fn: Simple,
Every: time.Duration(time.Second),
})
// Some time later we check the status.
runner, _ := kevin.DefaultSchedule().Find(id)
runner, _ := kevin.DefaultSchedule().Find(ID)
i := runner.Info()
if i.IsRunning() {
// All is well
......@@ -49,9 +49,9 @@ create your own custom schedules call
```
c := kevin.ScheduleConfig{
name: "custom",
purgeBefore: OPTIONAL time.Duration
purgePoll: OPTIONAL time.Duration
Name: "custom",
PurgeBefore: OPTIONAL time.Duration
PurgePoll: OPTIONAL time.Duration
})
s, err := kevin.NewSchedule(c)
......@@ -60,16 +60,16 @@ An error will be returned on an empty name "" or if a schedule of the same name
found. Names **are** case sensitive.
The optional purge fields help define an auto purger created on your new schedule. If
left to the defaults, every hour the schedule will be purged, removing all
***completed*** job runners that are older than 24 hours. Alter the ***purgeBefore***
left to the defaults, Every hour the schedule will be purged, removing all
***completed*** job runners that are older than 24 hours. Alter the ***PurgeBefore***
time.Duration to purge older/newer runners.
Don't want to purge anything, no problem. Just pass a zero duration as your
***purgePoll*** value.
***PurgePoll*** value.
```
c := kevin.ScheduleConfig{
name: "custom",
purgePoll: time.Duration(0),
Name: "custom",
PurgePoll: time.Duration(0),
})
```
Of course you can always just call `purge()` manually on your schedule if you prefer.
......@@ -96,22 +96,22 @@ type Job struct {
begins time.Time
// Cap the number of times this job will run.
cap uint64
Cap uint64
// Run this job **every. Required and cannot be zero!
every time.Duration
// Run this job **Every. Required and cannot be zero!
Every time.Duration
// Ensure that this job does not run after the expires time.
expires time.Time
// Name of the function to be called on **every.
fn interface{}
// Name of the function to be called on **Every.
Fn interface{}
// Assign the parameters to pass to fn. Use the convenience method of Job.AddFnParams()
fnParams []interface{}
// Assign the parameters to pass to Fn. Use the convenience method of Job.AddFnParams()
FnParams []interface{}
// Assign your own JobID to track your job runners.
id JobID
ID JobID
}
```
Note that when assigning ***begin*** and ***expires*** time values, the time given does
......@@ -119,11 +119,11 @@ Note that when assigning ***begin*** and ***expires*** time values, the time giv
guarantee however that a job will not run before or after the times given. Use the
default duration zero values to ignore these fields.
Because Kevin uses reflection to process ***fn*** and ***fnParams***, empty
Because Kevin uses reflection to process ***Fn*** and ***FnParams***, empty
interface values are the defined type. The syntax is a little weird on applying
params directly to the struct, eg;
```
fnParams: []interface{}{"www.kylehq.com", "name drop yo", 24}
FnParams: []interface{}{"www.kylehq.com", "name drop yo", 24}
```
So instead use the convenience method of `Job.AddFnParams()`. This allows you mix
......@@ -133,23 +133,23 @@ on the fly. Just be sure you maintain order.
func ParamLots(x, y, z string, hit int64) {
j := &Job{
cap: 10,
fn: ParamLots,
fnParams: []interface{}{"www.kylehq.com"}, // 1 param assigned
every: time.Duration(time.Second),
Cap: 10,
Fn: ParamLots,
FnParams: []interface{}{"www.kylehq.com"}, // 1 param assigned
Every: time.Duration(time.Second),
}
// Assign 3 more to fulfil ParamLots
j.AddFnParams("yankee", "zebra", 21)
id, err := kevin.DefaultSchedule().Run(j)
ID, err := kevin.DefaultSchedule().Run(j)
```
#### JobIDs
By default a `JobID` is returned on `Add()` and `Run()` to enable you to store and
track your job runners progress. A `JobID` is **unique ONLY to each schedule**. If you
are creating numerous schedules, it may be beneficial to provide your own universal id,
are creating numerous schedules, it may be beneficial to provide your own universal ID,
such as a `user_id`.
For example, imagine you have x3 schedules, one for http requests, one for sms requests
......@@ -163,7 +163,7 @@ s.Reset()
// Remove all user specific runners from all schedules.
for _, s := range kevin.pool.schedules {
err := s.Remove("your user id here")
err := s.Remove("your user ID here")
if err != nil {
// Alert
}
......@@ -171,8 +171,8 @@ for _, s := range kevin.pool.schedules {
```
## Caveats / TODO's
**Job.every** - You will note that you can assign ***any*** time.Duration value to a
jobs' every field. Kevin could limit the every to a minimum duration of 1 second. But
**Job.Every** - You will note that you can assign ***any*** time.Duration value to a
jobs' Every field. Kevin could limit the Every to a minimum duration of 1 second. But
micro and nano second values are a construct of the Golang language so Kevin should
support them too. However, Kevin **cannot guarantee** the performance of this package
under nano second circumstances. Although some steps have been taken to sync.lock
......
......@@ -13,7 +13,7 @@ var (
ErrScheduleJobNotFound = NewKevinError("Scheduled job not found.")
ErrDurationZero = NewKevinError("Task duration of zero is not allowed.")
ErrJobDuplicateId = NewKevinError("Add job attempted on schedule with duplicate job id.")
ErrJobDuplicateId = NewKevinError("Add job attempted on schedule with duplicate job ID.")
ErrJobFunctionParamMismatch = NewKevinError("Mismatch in params passed to assigned Fn.")
ErrJobNonFunctionType = NewKevinError("Fn assigned not of func type.")
ErrJobMaximumTickBreach = NewKevinError("The job Every ticker has reached the maximum uint64 number of ticks.")
......
......@@ -5,12 +5,16 @@ import (
"time"
)
const DefaultName = "default"
const (
DefaultName = "default"
DefaultPurgeBefore = 24 * time.Hour
DefaultPurgePoll = time.Hour
)
var pool *SchedulePool = &SchedulePool{
schedules: make(Schedules),
defaultPurgeBefore: 24 * time.Hour,
defaultPurgePoll: time.Hour,
defaultPurgeBefore: DefaultPurgeBefore,
defaultPurgePoll: DefaultPurgePoll,
}
func init() {
......@@ -34,9 +38,11 @@ type Schedules map[string]*Schedule
// SchedulesInfo represents information about all Schedulers assigned to the schedule pool.
type SchedulesInfo struct {
scheduleCount int64
jobCount int64
schedules map[string]ScheduleInfo
JobCount int64
PurgeBefore time.Duration
PurgePoll time.Duration
ScheduleCount int64
Schedules map[string]ScheduleInfo
}
// ByName will attempt to return by name a Scheduler instance for the local schedule pool.
......@@ -54,7 +60,7 @@ func DefaultSchedule() *Schedule {
s, err := ByName(DefaultName)
if err != nil {
sc := NewScheduleConfig()
sc.name = DefaultName
sc.Name = DefaultName
s, _ = NewSchedule(sc)
}
......@@ -62,19 +68,21 @@ func DefaultSchedule() *Schedule {
}
// Info will return a populated SchedulesInfo struct.
func Info() (SchedulesInfo, error) {
func Info() SchedulesInfo {
c := SchedulesInfo{
schedules: make(map[string]ScheduleInfo),
Schedules: make(map[string]ScheduleInfo),
PurgeBefore: pool.defaultPurgeBefore,
PurgePoll: pool.defaultPurgePoll,
}
for _, s := range pool.schedules {
i := s.Info()
c.scheduleCount++
c.jobCount += i.count
c.schedules[i.name] = i
c.ScheduleCount++
c.JobCount += i.Count
c.Schedules[i.Name] = i
}
return c, nil
return c
}
// IsEmpty will return true if all schedules in this pool are empty.
......
package kevin
package kevin_test
import (
"testing"
. "gitlab.com/kylehqcom/kevin"
"time"
)
......@@ -35,24 +37,24 @@ func TestDefaultSchedule(t *testing.T) {
func TestInfo(t *testing.T) {
Reset()
i, _ := Info()
if 0 < i.jobCount {
i := Info()
if 0 < i.JobCount {
t.Error("Should have zero job in pool info.")
}
if 1 != len(i.schedules) {
if 1 != len(i.Schedules) {
t.Error("We should have at least the default schedule.")
}
s, _ := NewSchedule(ScheduleConfig{name: "TestInfo"})
s, _ := NewSchedule(ScheduleConfig{Name: "TestInfo"})
s.Add(newSimpleJob())
i, _ = Info()
if 1 != i.jobCount {
i = Info()
if 1 != i.JobCount {
t.Error("Should have one job in pool info.")
}
if 2 != len(i.schedules) {
if 2 != len(i.Schedules) {
t.Error("We should have 2 schedules.")
}
......@@ -74,11 +76,15 @@ func TestIsEmpty(t *testing.T) {
}
func TestPurgePollingInterval(t *testing.T) {
d := pool.defaultPurgePoll + (10 * time.Second)
d := DefaultPurgePoll + (10 * time.Second)
PurgePollingInterval(d)
if pool.defaultPurgePoll != d {
Info()
if Info().PurgePoll != d {
t.Error("Assigning new purge poll interval did not stick.")
}
// Reset back to the default value.
PurgePollingInterval(DefaultPurgePoll)
}
func TestReset(t *testing.T) {
......
......@@ -7,12 +7,12 @@ import (
)
const (
Status_Running = "Running"
Status_Stopped = "Stopped"
Status_Errored = "Errored"
Status_Expired = "Expired"
Status_Cap_Breached = "Cap Breached"
Status_Cannot_Begin = "Cannot Begin"
StatusRunning = "Running"
StatusStopped = "Stopped"
StatusError = "Error"
StatusExpired = "Expired"
StatusCapBreached = "Cap Breached"
StatusCannotBegin = "Cannot Begin"
)
// JobRunners are a map of all job runners key'd on JobID.
......@@ -35,43 +35,43 @@ type JobRunner struct {
// call inside doRun.
func initRunner(j *Job) (*JobRunner, error) {
// Be sure we can parse the func and func params
f := j.fn
f := j.Fn
rt := reflect.TypeOf(f)
if rt.Kind() != reflect.Func {
return nil, ErrJobNonFunctionType
}
fnParams := j.fnParams
fn := reflect.ValueOf(f)
if len(fnParams) != fn.Type().NumIn() {
FnParams := j.FnParams
Fn := reflect.ValueOf(f)
if len(FnParams) != Fn.Type().NumIn() {
return nil, ErrJobFunctionParamMismatch
}
in := make([]reflect.Value, len(fnParams))
for k, fnParam := range fnParams {
in := make([]reflect.Value, len(FnParams))
for k, fnParam := range FnParams {
in[k] = reflect.ValueOf(fnParam)
}
return &JobRunner{
job: j,
callFn: fn,
callFn: Fn,
callFnParams: in,
}, nil
}
// CanBegin will return true if a begin time is set and is older than the current time.
func (r *JobRunner) CanBegin() bool {
return r.job.begins.IsZero() || r.job.begins.Before(time.Now())
return r.job.Begins.IsZero() || r.job.Begins.Before(time.Now())
}
// CapBreached will return true if this Job has a cap and it is over the running count.
// CapBreached will return true if this Job has a Cap and it is over the running count.
func (r *JobRunner) CapBreached() bool {
return (r.job.cap != 0 && r.runningCount > r.job.cap)
return (r.job.Cap != 0 && r.runningCount > r.job.Cap)
}
// Expired will return true if an expiration time is set and is before now.
func (r *JobRunner) Expired() bool {
return (!r.job.expires.IsZero() && r.job.expires.Before(time.Now()))
return (!r.job.Expires.IsZero() && r.job.Expires.Before(time.Now()))
}
func (r *JobRunner) Run() {
......@@ -79,8 +79,8 @@ func (r *JobRunner) Run() {
}
// Stop will set this runners stop flag. This does not ensure that the runner job will
// stop when called. It does guarantee that the next "every" invocation will stop.
func (r *JobRunner) Stop() {
// stop when called. It does guarantee that the next "Every" invocation will stop.
func (r *JobRunner) Stop() {
r.stop = true
}
......@@ -92,35 +92,35 @@ func (r *JobRunner) Stopped() bool {
// do is called via Run in a go routine.
func do(r *JobRunner) {
safety := uint64(0)
every := time.NewTicker(r.job.every)
Every := time.NewTicker(r.job.Every)
defer func() {
// On close set the completed time for this job runner.
r.completed = time.Now().UTC()
}()
for range every.C {
for range Every.C {
// Check if the stop flag been set against the runner task.
if r.stop {
r.status = Status_Stopped
every.Stop()
r.status = StatusStopped
Every.Stop()
break
}
// If we have a start time, skip this tick if time not passed.
if !r.CanBegin() {
r.status = Status_Cannot_Begin
r.status = StatusCannotBegin
continue
}
// Also check for an expiration time.
if r.Expired() {
r.status = Status_Expired
r.status = StatusExpired
break
}
// Check for a cap and whether it will breach.
// Check for a Cap and whether it will breach.
if r.CapBreached() {
r.status = Status_Cap_Breached
r.status = StatusCapBreached
break
}
......@@ -132,22 +132,22 @@ func do(r *JobRunner) {
// Increment the safety and running counts.
safety++
r.runningCount++
r.status = Status_Running
r.status = StatusRunning
// Make the call...
r.callFn.Call(r.callFnParams)
if safety == math.MaxUint64 {
r.status = Status_Errored
r.status = StatusError
r.err = ErrJobMaximumTickBreach
break
}
// Check if this job is orphaned. Though this "should not" happen
// we still check that this job is still assigned to this schedule.
_, err := r.job.schedule.Find(r.job.id)
_, err := r.job.schedule.Find(r.job.ID)
if err != nil {
r.status = Status_Errored
r.status = StatusError
r.err = ErrJobOrphaned
break
}
......@@ -174,9 +174,14 @@ func (r *JobRunner) Info() JobRunnerInfo {
}
}
// Completed will return the completed time. May be zero.
func (i JobRunnerInfo) CompletedAt() time.Time {
return i.completedAt
}
// IsCompleted will return true if a completed time is set.
func (i JobRunnerInfo) IsCompleted() bool {
return !i.completedAt.IsZero()
return !i.CompletedAt().IsZero()
}
// IsPending will return true if not complete and not started.
......@@ -193,3 +198,13 @@ func (i JobRunnerInfo) IsRunning() bool {
func (i JobRunnerInfo) RunningCount() uint64 {
return i.runningCount
}
// Status will return a simple string determining the current runner status.
func (i JobRunnerInfo) Status() string {
return i.status
}
// Stopped will return true is ths runner has been stopped.
func (i JobRunnerInfo) Stopped() bool {
return i.stopped
}
package kevin
package kevin_test
import (
"testing"
"time"
. "gitlab.com/kylehqcom/kevin"
)
func Test_initRunner(t *testing.T) {
Reset()
s := DefaultSchedule()
// Test invalid method
j := newSimpleJob()
j.fn = "not a valid function"
jr, err := initRunner(j)
if jr != nil {
t.Error("Non nil Job runner instance on bad function.")
}
j.Fn = "not a valid function"
_, err := s.Add(j)
if err == nil {
t.Error("Nil error on Job runner instance on bad function.")
}
......@@ -30,21 +32,21 @@ func Test_initRunner(t *testing.T) {
func TestJobRunner_CanBegin(t *testing.T) {
Reset()
j := newSimpleJob()
j.begins = time.Now().Add(time.Minute)
j.Begins = time.Now().Add(time.Minute)
s := DefaultSchedule()
s.Add(j)
r, _ := s.Find(j.id)
r, _ := s.Find(j.ID)
if r.CanBegin() {
t.Error("Can begin should be false with date 10 mins in future.")
}
j.begins = time.Now().Add(-100 * time.Millisecond)
j.Begins = time.Now().Add(-100 * time.Millisecond)
if !r.CanBegin() {
t.Error("A job can begin when begin time 1 minute in the past.")
}
j.begins = time.Time{}
j.Begins = time.Time{}
if !r.CanBegin() {
t.Error("A job can begin with zero time.")
}
......@@ -53,27 +55,29 @@ func TestJobRunner_CanBegin(t *testing.T) {
func TestJobRunner_CapBreached(t *testing.T) {
Reset()
j := newSimpleJob()
j.cap = 0
j.Cap = 0
s := DefaultSchedule()
s.Add(j)
r, _ := s.Find(j.id)
r, _ := s.Find(j.ID)
if r.CapBreached() {
t.Error("Cap should not be breached with cap of zero.")
t.Error("Cap should not be breached with Cap of zero.")
}
j.cap = 1
if r.CapBreached() {
t.Error("Cap should not be breached with zero running count.")
}
j.Cap = 1
r.runningCount = 1
if r.CapBreached() {
t.Error("Cap should not be breached with equal values.")
}
for i := 0; i < 5; i++ {
switch true {
case 0 == r.Info().RunningCount():
case 1 == r.Info().RunningCount():
if r.CapBreached() {
t.Error("Cap should not be breached with 0 or 1 running count.")
}
default:
t.Error("Unknown case.")
}
r.runningCount = 2
if !r.CapBreached() {
t.Error("Cap is breached with 2.")
r.Run()
}
}
......@@ -81,22 +85,22 @@ func TestJobRunner_Expired(t *testing.T) {
Reset()
j := newSimpleJob()
j.expires = time.Time{}
j.Expires = time.Time{}
s := DefaultSchedule()
s.Add(j)
r, _ := s.Find(j.id)
r, _ := s.Find(j.ID)
if r.Expired() {
t.Error("Expired should be false as zero expire time assigned to job.")
}
j.expires = time.Now().Add(time.Minute)
j.Expires = time.Now().Add(time.Minute)
if r.Expired() {
t.Error("Expired should be false as expiration one minute from now.")
}
j.expires = time.Now().Add(-1 * time.Minute)
j.Expires = time.Now().Add(-1 * time.Minute)
if !r.Expired() {
t.Error("Expired should be true as expiration one minute in past.")
}
......@@ -107,7 +111,7 @@ func TestJobRunner_Stop(t *testing.T) {
j := newSimpleJob()
s := DefaultSchedule()
s.Add(j)
r, _ := s.Find(j.id)
r, _ := s.Find(j.ID)
if r.Stopped() {
t.Error("A job should not be stopped by default.")
}
......
......@@ -16,40 +16,40 @@ type Schedule struct {
// ScheduleConfig are used to create new Schedulers.
type ScheduleConfig struct {
name string
purgeBefore time.Duration
purgePoll time.Duration
Name string
PurgeBefore time.Duration
PurgePoll time.Duration
}
// SchedulesInfo represents information about all Schedulers assigned to the schedule pool.
type ScheduleInfo struct {
name string
count int64
Name string
Count int64
}
type JobID string
type Job struct {
// Ensure that this job does not run before the begins time.
begins time.Time
Begins time.Time
// Cap the number of times this job will run.
cap uint64
Cap uint64
// Run this job **every.
every time.Duration
// Run this job **Every.
Every time.Duration
// Ensure that this job does not run after the expires time.
expires time.Time
Expires time.Time
// Name of the function to be called on **every.
fn interface{}
// Name of the function to be called on **Every.
Fn interface{}
// Assign the parameters to pass to fn. Use the convenience method of Job.AddFnParams()
fnParams []interface{}
// Assign the parameters to pass to Fn. Use the convenience method of Job.AddFnParams()
FnParams []interface{}
// Assign your own JobID to track your job runners.
id JobID
ID JobID
// As a schedule can dig in to get a job runner, a job should be able to dig out to it's schedule.
schedule *Schedule
......@@ -57,13 +57,13 @@ type Job struct {
// A convenience method to added params to the Job instance.
func (j *Job) AddFnParams(params ...interface{}) {
j.fnParams = append(j.fnParams, params...)
j.FnParams = append(j.FnParams, params...)
}
// NewSchedule will return a new Scheduler instance. It expects ScheduleConfig.
func NewSchedule(sc ScheduleConfig) (*Schedule, error) {
if "" == sc.name {
if "" == sc.Name {
return nil, ErrScheduleEmptyName
}
......@@ -75,16 +75,16 @@ func NewSchedule(sc ScheduleConfig) (*Schedule, error) {
// Check for a schedule of the same name, do not just overwrite
pool.Lock()
_, ok := pool.schedules[sc.name]
_, ok := pool.schedules[sc.Name]
pool.Unlock()
if ok {
return nil, ErrScheduleDuplicateName
} else {
pool.schedules[sc.name] = schedule
pool.schedules[sc.Name] = schedule
}
// Setup the AutoPurge for this Schedule.
schedule.AutoPurge(sc.purgePoll)
schedule.AutoPurge(sc.PurgePoll)
return schedule, nil
}
......@@ -92,56 +92,56 @@ func NewSchedule(sc ScheduleConfig) (*Schedule, error) {
// NewScheduleConfig will create and return new ScheduleConfig pointer.
func NewScheduleConfig() ScheduleConfig {
return ScheduleConfig{
purgeBefore: pool.defaultPurgeBefore,
purgePoll: pool.defaultPurgePoll,
PurgeBefore: pool.defaultPurgeBefore,
PurgePoll: pool.defaultPurgePoll,
}
}
func (s *Schedule) Add(j *Job) (JobID, error) {
if "" == j.id {
j.id = s.GenerateID()
if "" == j.ID {
j.ID = s.GenerateID()
}
// Check for a duplicate job id on this schedule.
// Check for a duplicate job ID on this schedule.
s.Lock()
_, ok := s.runners[j.id]
_, ok := s.runners[j.ID]
s.Unlock()
if ok {
return j.id, ErrJobDuplicateId
return j.ID, ErrJobDuplicateId
}
// Assign this schedule
j.schedule = s
// Check we have a valid Every duration.
if 0 == j.every.Nanoseconds() {
return j.id, ErrDurationZero
if 0 == j.Every.Nanoseconds() {
return j.ID, ErrDurationZero
}
// Init a Job Runner which handles reflection of method calls with params.
r, err := initRunner(j)