Commit 33f89571 authored by Kyle Clarke's avatar Kyle Clarke

Split logical units into own files. Added mutex to entry

So that we can lock an entry, I added a sync mutex to get a reliable is
processing or is complete return value. Also split the code into files
based on logical blocks. Re-checked the example file included and
updated.
parent 070d9884
Pipeline #41717810 passed with stage
in 39 seconds
package qupid
import (
"sync"
"time"
)
type (
// Entry is a Queue entry/item that stores an ID and processing times, along with an optional entity.
Entry struct {
ID string
Entity interface{}
processing time.Time
completed time.Time
sync.Mutex
}
// EntryOption is a func type to assign options to a new Entry pointer.
EntryOption func(*Entry)
)
// WithEntity will assign an entity to the queue entry.
func WithEntity(entity interface{}) EntryOption {
return func(e *Entry) {
e.Entity = entity
}
}
// NewEntry will return a new entry pointer instance. Pass a required ID, and optional options.
func NewEntry(id string, eos ...EntryOption) *Entry {
e := &Entry{ID: id}
for _, eo := range eos {
eo(e)
}
return e
}
// IsCompleted will return whether the entry is deemed to be processed/completed.
func (e *Entry) IsCompleted() bool {
e.Lock()
defer e.Unlock()
return !e.completed.IsZero()
}
// IsProcessing will return whether the entry is currently being processed.
func (e *Entry) IsProcessing() bool {
e.Lock()
defer e.Unlock()
return !e.processing.IsZero() && e.completed.IsZero()
}
package qupid
// QupidError is the error struct used for the qupid package.
type QupidError struct {
s string
}
var (
// ErrQupidProcessingStopped denotes the queue has been stopped so no queuing can occur.
ErrQupidProcessingStopped = NewQupidError("Queue processing has been stopped.")
// ErrQupidPipelineBufferFull denotes that the queue pipeline is full and cannot add anymore entries.
ErrQupidPipelineBufferFull = NewQupidError("Queue pipeline buffer is full. No longer adding queued items.")
)
// Error will return the string representation of the error instance.
func (e QupidError) Error() string {
return e.s
}
// IsQupidError will confirm with the error instance is of type QupidError.
func IsQupidError(err error) bool {
_, ok := err.(*QupidError)
return ok
}
// NewQupidError will return a new QupidError pointer instance.
func NewQupidError(errorMessage string) error {
return &QupidError{errorMessage}
}
......@@ -3,16 +3,24 @@ package main
import (
"errors"
"fmt"
"math/rand"
"strconv"
"time"
"gitlab.com/kylehqcom/qupid"
)
var q = qupid.NewQueue(qupid.WithPipelineBuffer(1000), qupid.WithPruneAfter(time.Minute))
type qLogger struct{}
func (ql qLogger) Printf(string, ...interface{}) {
// no hup
}
var q = qupid.NewQueue(qLogger{}, qupid.WithPipelineBuffer(1000), qupid.WithPruneAfter(time.Minute))
// Do will add, consume and send results to the queue. Plus stop/return when invoked.
func Do() {
t := time.NewTicker(time.Millisecond * 200)
t := time.NewTicker(time.Millisecond * 100)
go func() {
for {
select {
......@@ -28,9 +36,9 @@ func Do() {
case e := <-q.Consume():
// In general you never want your consumer to be blocking so
// place is a goroutine as required.
go func(e *qupid.Entry) {
res := qupid.NewResult(e)
ent, ok := e.Entity.(*Ent)
go func(ee *qupid.Entry) {
res := qupid.NewResult(ee)
ent, ok := ee.Entity.(*Ent)
if !ok {
res.Err = errors.New("WTF?")
}
......@@ -54,11 +62,14 @@ type Ent struct {
Name string
}
var r = rand.New(rand.NewSource(99))
// ToProcess will simply return a new entity struct for the example.
func ToProcess() (*Ent, error) {
id := strconv.Itoa(int(r.Int63()))
return &Ent{
ID: "foo",
Name: "bar",
ID: id,
Name: "bar-" + id,
}, nil
}
......@@ -71,5 +82,5 @@ func Stop() {
func main() {
defer Stop()
Do()
time.Sleep(time.Second * 2)
time.Sleep(time.Second * 1)
}
......@@ -14,6 +14,14 @@ const (
)
type (
// qName is a string type to name internal processing queues.
qName string
// iQueue is an internal queue used for processing.
iQueue struct {
queued map[string]*Entry
}
// Q is the default q struct which holds internal queue data, along with the appropriate channels
// for callers to select on.
Q struct {
......@@ -278,16 +286,6 @@ func (q *Q) Stop() {
q.pipeline.stopped = true
}
type (
// qName is a string type to name internal processing queues.
qName string
// iQueue is an internal queue used for processing.
iQueue struct {
queued map[string]*Entry
}
)
func (iq *iQueue) add(e *Entry) {
iq.queued[e.ID] = e
}
......@@ -299,86 +297,3 @@ func (iq *iQueue) Len() int {
func (iq *iQueue) remove(e *Entry) {
delete(iq.queued, e.ID)
}
type (
Entry struct {
ID string
Entity interface{}
processing time.Time
completed time.Time
}
// EntryOption is a func type to assign options to a new Entry pointer.
EntryOption func(*Entry)
)
// WithEntity will assign an entity to the queue entry.
func WithEntity(entity interface{}) EntryOption {
return func(e *Entry) {
e.Entity = entity
}
}
// NewEntry will return a new entry pointer instance. Pass a required ID, and optional options.
func NewEntry(id string, eos ...EntryOption) *Entry {
e := &Entry{ID: id}
for _, eo := range eos {
eo(e)
}
return e
}
// IsCompleted will return whether the entry is deemed to be processed/completed.
func (e *Entry) IsCompleted() bool {
return !e.completed.IsZero()
}
// IsProcessing will return whether the entry is currently being processed.
func (e *Entry) IsProcessing() bool {
return !e.processing.IsZero() && !e.IsCompleted()
}
type (
// result is a struct that can optionally be passed to the Q.Complete() call. This allows
// for listeners to act on potential errors encountered on processing.
result struct {
Entry *Entry
Err error
}
)
// NewResult will return a result instance. A valid entry instance is required to create a result.
func NewResult(e *Entry) result {
return result{
Entry: e,
}
}
// QupidError is the error struct used for the qupid package.
type QupidError struct {
s string
}
var (
// ErrQupidProcessingStopped denotes the queue has been stopped so no queuing can occur.
ErrQupidProcessingStopped = NewQupidError("Queue processing has been stopped.")
// ErrQupidPipelineBufferFull denotes that the queue pipeline is full and cannot add anymore entries.
ErrQupidPipelineBufferFull = NewQupidError("Queue pipeline buffer is full. No longer adding queued items.")
)
// Error will return the string representation of the error instance.
func (e QupidError) Error() string {
return e.s
}
// IsQupidError will confirm with the error instance is of type QupidError.
func IsQupidError(err error) bool {
_, ok := err.(*QupidError)
return ok
}
// NewQupidError will return a new QupidError pointer instance.
func NewQupidError(errorMessage string) error {
return &QupidError{errorMessage}
}
......@@ -11,7 +11,7 @@ import (
type qLogger struct{}
func (ql qLogger) Printf(format string, a ...interface{}) {
fmt.Printf(format, a)
fmt.Printf(format, a...)
}
func TestWithPipelineBuffer(t *testing.T) {
......
package qupid
type (
// result is a struct that can optionally be passed to the Q.Complete() call. This allows
// for listeners to act on potential errors encountered on processing.
result struct {
Entry *Entry
Err error
}
)
// NewResult will return a result instance. A valid entry instance is required to create a result.
func NewResult(e *Entry) result {
return result{
Entry: e,
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment