# Qupid
___
[![Build Status](https://gitlab.com/kylehqcom/qupid/badges/master/build.svg)](https://gitlab.com/kylehqcom/qupid/commits/master)
[![Go Report Card](https://goreportcard.com/badge/gitlab.com/kylehqcom/qupid)](https://goreportcard.com/report/gitlab.com/kylehqcom/qupid)

Qupid is a Queue/Pipeline package written in [Golang](https://golang.org/) that gives helpful accessors to queue, consume, return and halt queue processing.

If you like this package or are using it for your own needs, let me know via [https://twitter.com/kylehqcom](https://twitter.com/kylehqcom)

```
// Install with the usual or add to your package manager of choice.
go get gitlab.com/kylehqcom/qupid
```

### A contrived example

In a normal workflow you might:

 1. set up a new Qupid Q instance with `qupid.NewQueue(qupid.WithPipelineBuffer(50))`
 2. create a `NewEntry(ID) *entry`, assigning an entity to the entry using `WithEntity(ent)`
 3. queue up the entry with `Queue(*entry)`
 4. process the entry from the `<-Consume()` channel
 5. send a `NewResult(*entry) result` to the `Complete(result)` method
 6. after processing for a couple of seconds, interrupt the Q instance with `Stop()`

If you have checked out the code, run `go run main.go` from the `/example` directory to see Qupid in action.

```
package main

import (
	"errors"
	"fmt"
	"time"

	"gitlab.com/kylehqcom/qupid"
)

var q = qupid.NewQueue(qupid.WithPipelineBuffer(50))

func Do() {
	t := time.NewTicker(time.Millisecond * 200)
	go func() {
		for {
			select {
			case <-t.C:
				// Get items to process
				ent, err := ToProcess()
				if err != nil {
					Stop()
					return
				}
				fmt.Println("Adding an entry to the queue with Name:", ent.Name)
				q.Queue(qupid.NewEntry(ent.ID, qupid.WithEntity(ent)))

			case e := <-q.Consume():
				// Only one entry will be consumed due to identical ids being passed.
				res := qupid.NewResult(e)
				ent, ok := e.Entity.(*Ent)
				if !ok {
					// ent is nil here, so record the error and skip the print below.
					res.Err = errors.New("WTF?")
					q.Complete(res)
					continue
				}
				fmt.Println("Consuming from the queue with Name:", ent.Name)
				q.Complete(res)

			case res := <-q.Results():
				fmt.Println("Yah a result came through for ID:", res.Entry.ID)

			case <-q.Interrupt():
				t.Stop()
				return
			}
		}
	}()
}

type Ent struct {
	ID   string
	Name string
}

func ToProcess() (*Ent, error) {
	return &Ent{
		ID:   "foo",
		Name: "bar",
	}, nil
}

func Stop() {
	fmt.Println("Calling stop")
	q.Stop()
}

func main() {
	defer Stop()
	Do()
	time.Sleep(time.Second * 2)
}

```

By default Qupid will track entries added/queued via the ID value given to each `*entry`. Note that entries that are already completed will be ignored if queued up again. It's **YOUR** responsibility to call `Complete()` with a `NewResult(e *entry)` to mark a queued entry as complete. It's also **YOUR** responsibility to ensure that processing is idempotent!
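
To make the tracking behaviour above concrete, here is a minimal, self-contained sketch of ID-based de-duplication. It is not Qupid's implementation: the `tracker` type and its `offer` and `complete` methods are hypothetical names invented for illustration, and the sketch assumes that an ID which is already queued is rejected in the same way as a completed one. It is also not safe for concurrent use.

```go
package main

import "fmt"

// state is a hypothetical lifecycle marker for a tracked ID.
type state int

const (
	queued state = iota
	completed
)

// tracker is an illustrative stand-in for ID tracking, not Qupid's real type.
type tracker struct {
	seen map[string]state
}

func newTracker() *tracker {
	return &tracker{seen: make(map[string]state)}
}

// offer reports whether an ID should be accepted onto the queue.
// IDs that are already queued or already completed are rejected.
func (t *tracker) offer(id string) bool {
	if _, ok := t.seen[id]; ok {
		return false
	}
	t.seen[id] = queued
	return true
}

// complete marks an ID as completed so that later offers are ignored.
func (t *tracker) complete(id string) {
	t.seen[id] = completed
}

func main() {
	tr := newTracker()
	fmt.Println(tr.offer("foo")) // first offer of "foo"
	fmt.Println(tr.offer("foo")) // "foo" is still queued
	tr.complete("foo")
	fmt.Println(tr.offer("foo")) // "foo" has been completed
	fmt.Println(tr.offer("bar")) // a new ID
}
```

Because a completed ID is never accepted again in this model, forgetting to mark an entry complete leaves it "queued" forever, which is why the `Complete()` call matters.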

If you would like to remove completed entries, you have a couple of options. You can create your Qupid Q instance with the `WithPruneAfter(d time.Duration)` option.

```
// Tell Qupid to auto prune any entries that have been completed
// that are older than an hour.
var q = qupid.NewQueue(qupid.WithPruneAfter(time.Hour))
```
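
As a rough mental model of age-based pruning, the sketch below drops completed IDs whose completion time is older than a cut-off. How Qupid actually schedules or performs pruning is not described here, so treat the `prune` function, the `completedAt` map and the `demo` helper as assumptions made purely for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// prune deletes every ID whose completion time is older than maxAge,
// measured against now, and returns how many IDs were removed.
// Hypothetical helper; not part of the Qupid API.
func prune(completedAt map[string]time.Time, now time.Time, maxAge time.Duration) int {
	removed := 0
	for id, at := range completedAt {
		if now.Sub(at) > maxAge {
			delete(completedAt, id) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

// demo builds a small, made-up set of completed IDs and prunes it
// with a one hour cut-off.
func demo() (removed, remaining int) {
	now := time.Now()
	completedAt := map[string]time.Time{
		"old":    now.Add(-2 * time.Hour),
		"recent": now.Add(-10 * time.Minute),
	}
	removed = prune(completedAt, now, time.Hour)
	return removed, len(completedAt)
}

func main() {
	removed, remaining := demo()
	fmt.Println("removed:", removed, "remaining:", remaining)
}
```

Once an ID has been pruned in this model, an entry with the same ID would be accepted again, so the prune window effectively bounds how long de-duplication lasts.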

Or you can explicitly remove an entry on an existing Q instance with

```
// By string ID, signature RemoveByID(ID string)
q.RemoveByID("foo")

// By an *entry, signature Remove(e *entry)
q.Remove(e)
```

Note that the remove calls are not fussy and will remove from the internal completed queue ***OR*** the queued queue if not yet completed/processed.

If you're not concerned about internal tracking at all and just want to utilise the `Queue()`, `Consume()` and `Stop()` accessors, then it's best to disable the internal tracking altogether with

```
// Tell Qupid to ignore internal tracking of entry IDs
var q = qupid.NewQueue(qupid.WithInternalTrackingDisabled())
```

### Entry

An Entry requires a string ID. This is often enough, since *you*, the caller, know the context of the ID, e.g. a key in a known datastore in your app. However, depending on how you obtained the ID, it may be more efficient to add the entity directly to the queue entry. You do so with

```
// Get your entity instance from a datastore.
entity := datastore.Find(ID)

// Perhaps there is no point calling the datastore again for processing, so add the entity.
entry := qupid.NewEntry(entity.ID, qupid.WithEntity(entity))

```

An Entity is stored as an empty `interface{}`, so be sure to use a type assertion to confirm its type when consuming.

```
ent, ok := e.Entity.(*Ent)
if !ok {
    log.Fatal("WTF?")
}
```
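
If a queue may carry more than one kind of entity, a type switch is a common alternative to a single assertion. The sketch below is plain Go and does not use Qupid; the `describe` function and the choice of accepted types are assumptions for illustration only. It returns an error instead of exiting, so the caller can decide whether to log, skip, or stop.

```go
package main

import (
	"errors"
	"fmt"
)

// Ent mirrors the example entity used earlier in this README.
type Ent struct {
	ID   string
	Name string
}

// describe is a hypothetical helper that inspects an entity held in an
// empty interface and returns a description, or an error when the
// entity is missing or of an unexpected type.
func describe(entity interface{}) (string, error) {
	switch v := entity.(type) {
	case *Ent:
		if v == nil {
			return "", errors.New("nil *Ent")
		}
		return "ent:" + v.Name, nil
	case string:
		return "id:" + v, nil
	case nil:
		return "", errors.New("no entity attached")
	default:
		return "", fmt.Errorf("unexpected entity type %T", v)
	}
}

func main() {
	entities := []interface{}{&Ent{ID: "foo", Name: "bar"}, "foo", 42, nil}
	for _, entity := range entities {
		s, err := describe(entity)
		fmt.Println(s, err)
	}
}
```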

You can also check an entry's state with `e.IsCompleted()` and `e.IsProcessing()`.

### Result

Although a `result` can be completely ignored, it can be a handy way to confirm the output of your processing. A result requires an `*entry` and can also be assigned a single value of type `error`.

```
// An entry comes off the queue.
e := <-q.Consume()

// Create a new result
res := qupid.NewResult(e)

// Now do some work with this queue entry.
err := DoSomeWork(e)

// If an error occurred, add the error
if err != nil {
   res.Err = err
}
q.Complete(res)

// Somewhere else in the code base, being
// subscribed to the Results channel.
res := <-q.Results()
if res.Err != nil {
   // Code to handle, eg log, re-queue, flap arms?
}

// Proceed as normal.
```
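
One way to act on results is to split them into "done" and "retry" buckets as they arrive. The following sketch uses a local `result` type and a plain buffered channel rather than Qupid itself; the `drain` and `demo` functions are hypothetical, and the sketch assumes the channel is eventually closed, which may not hold for a real `Results()` channel.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical local type carrying an entry ID and an
// optional error. It is not the Qupid result type.
type result struct {
	ID  string
	Err error
}

// drain reads results until the channel is closed and splits the IDs
// into those that succeeded and those that should be retried.
func drain(results <-chan result) (done, retry []string) {
	for res := range results {
		if res.Err != nil {
			retry = append(retry, res.ID)
			continue
		}
		done = append(done, res.ID)
	}
	return done, retry
}

// demo feeds three made-up results through drain, one of them failing.
func demo() (done, retry []string) {
	results := make(chan result, 3)
	results <- result{ID: "a"}
	results <- result{ID: "b", Err: errors.New("boom")}
	results <- result{ID: "c"}
	close(results)
	return drain(results)
}

func main() {
	done, retry := demo()
	fmt.Println("done:", done, "retry:", retry)
}
```

If you re-queue failed IDs while ID tracking is enabled, remember that completed entries are ignored when queued again, so a retry would first need the entry removed or tracking disabled.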