Commit f95f315e authored by Kyle Clarke's avatar Kyle Clarke

Added a new readme and example file for public users.

parent 9c654ce4
Pipeline #6768853 (#) passed with stage
in 40 seconds
# Qupid
___
[![Build Status](https://gitlab.com/kylehqcom/qupid/badges/master/build.svg)](https://gitlab.com/kylehqcom/qupid/commits/master)
[![Go Report Card](https://goreportcard.com/badge/gitlab.com/kylehqcom/qupid)](https://goreportcard.com/report/gitlab.com/kylehqcom/qupid)
Qupid is a Queue/Pipeline package written in [Golang](https://golang.org/) that gives helpful accessors to queue, consume, return and halt queue processing.
### A contrived example
Lets setup a new Qupid Q instance `qupid.WithPipelineBuffer(50)`, create a `NewEntry(ID) *entry` to pass to `Queue(*entry)`, assigning an entity to the entry `WithEntity(ent)`. Then process the entry on the `<-Consume()` channel, and send a `NewResult(*entry) result` to the `Complete(result)` method. After processing for a couple seconds, lets interrupt the Q instance with `Stop()`.
```
package main
import (
"errors"
"fmt"
"time"
"gitlab.com/kylehqcom/qupid"
)
var q = qupid.NewQueue(qupid.WithPipelineBuffer(50))
func Do() {
t := time.NewTicker(time.Millisecond * 200)
go func() {
for {
select {
case <-t.C:
ent, err := ToProcess()
if err != nil {
Stop()
return
}
fmt.Println("Adding an entry to the queue with Name:", ent.Name)
q.Queue(qupid.NewEntry(ent.ID, qupid.WithEntity(ent)))
case e := <-q.Consume():
res := qupid.NewResult(e)
ent, ok := e.Entity.(*Ent)
if !ok {
res.Err = errors.New("WTF?")
}
fmt.Println("Consuming from the queue with Name:", ent.Name)
q.Complete(res)
case res := <-q.Results():
fmt.Println("Yah a result came through for ID:", res.Entry.ID)
case <-q.Interrupt():
t.Stop()
return
}
}
}()
}
type Ent struct {
ID string
Name string
}
func ToProcess() (*Ent, error) {
return &Ent{
ID: "foo",
Name: "bar",
}, nil
}
func Stop() {
fmt.Println("Calling stop")
q.Stop()
}
func main() {
defer Stop()
Do()
time.Sleep(time.Second * 2)
}
```
By default Qupid will track entries added/queued via the ID value given to each `*entry`. Note that entries that are already completed will be ignored if queued up again. It's **YOUR** responsibility to call `Complete()` with a `NewResult(e *entry)` to mark a queued entry as complete. It's also **YOUR** responsibility to ensure that processing is idempotent!
If you would like to remove completed entries, you have a couple of options. You can create your Qupid.Q instance with the `WithPruneAfter(d time.duration)` option.
```
// Tell Qupid to auto prune any entries that have been completed
// for longer than an hour.
var q = qupid.NewQueue(qupid.WithPruneAfter(time.Hour))
```
Or you can explicity remove an entry on an existing Q instance with
```
// By string ID
q.RemoveByID(ID string)
// By an *entry
q.Remove(e *entry)
```
Note that the remove calls are not fussy and will remove from the internal completed queue ***OR*** the queued queue if not yet completed/processed.
If you're not concerned about internal tracking at all and just want to utilise the Queue(), Consume() and Stop() accessors, then it's best to disable the internal tracking altogether with
```
// Tell Qupid to ignore internal tracking of entry ID's
var q = qupid.NewQueue(qupid.WithInternalTrackingDisabled())
```
### Entry
An Entry requires a string ID. This is often enough since *yo*u the caller knows the context of the ID, eg to a known datastore in your app. However depending on how you gained your ID, it may be more efficient to add the entity directly to the queue entry. You do so with
```
// Get you entity instance from a datastore.
entity := datastore.Find(ID)
// Maybe no point calling the datastore again for processing so add the entity.
entry := qupid.NewEntry(entity.ID, qupid.WithEntity(entity))
```
An Entity is assigned using the empty `interface{}` so be sure you use type assertion to confirm your entity when consuming.
```
ent, ok := e.Entity.(*Ent)
if !ok {
os.Fatal("WTF?")
}
```
You can also check if an Entries state with `e.IsCompleted()` and `e.IsProcessing()`.
### Result
Although a `result` can be completely ignored from your process, it can be a handy way to confirm your process. A result has the requirement of an `*entry` and can also be assigned a single `Error`.
```
// An entry comes off the queue.
e := <-q.Consume()
// Create a new result
res := qupid.NewResult(e)
// Now do some work with this queue entry.
err := DoSomeWork(e)
// If an error occurred, add the error
if err != nil {
res.Err = err
}
q.Complete(res)
// Somewhere else in the code base being
// subscribed to the Result channel.
res := <-q.Results():
if res.Err != nil {
// Code to handle, eg log, re-queue, flap arms?
}
// Proceed as normal.
```
\ No newline at end of file
package main
import (
"errors"
"fmt"
"time"
"gitlab.com/kylehqcom/qupid"
)
var q = qupid.NewQueue(qupid.WithPipelineBuffer(1000), qupid.WithPruneAfter(time.Minute))
func Do() {
t := time.NewTicker(time.Millisecond * 200)
go func() {
for {
select {
case <-t.C:
ent, err := ToProcess()
if err != nil {
Stop()
return
}
fmt.Println("Adding an entry to the queue with Name:", ent.Name)
q.Queue(qupid.NewEntry(ent.ID, qupid.WithEntity(ent)))
case e := <-q.Consume():
res := qupid.NewResult(e)
ent, ok := e.Entity.(*Ent)
if !ok {
res.Err = errors.New("WTF?")
}
fmt.Println("Consuming from the queue with Name:", ent.Name)
q.Complete(res)
case res := <-q.Results():
fmt.Println("Yah a result came through for ID:", res.Entry.ID)
case <-q.Interrupt():
t.Stop()
return
}
}
}()
}
type Ent struct {
ID string
Name string
}
func ToProcess() (*Ent, error) {
return &Ent{
ID: "foo",
Name: "bar",
}, nil
}
func Stop() {
fmt.Println("Calling stop")
q.Stop()
}
func main() {
defer Stop()
Do()
time.Sleep(time.Second * 2)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment