progress.go 2.49 KB
Newer Older
Mitar's avatar
Mitar committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package x

import (
	"context"
	"io"
	"sync/atomic"
	"time"
)

// CountingReader wraps an io.Reader and keeps a running total of the
// bytes that have been read through it.
type CountingReader struct {
	// Reader is the underlying source that Read delegates to.
	Reader io.Reader
	// count is the running byte total; it is accessed through sync/atomic.
	count int64
}

// NewCountingReader wraps reader in a CountingReader whose byte count
// starts at zero.
func NewCountingReader(reader io.Reader) *CountingReader {
	counting := new(CountingReader)
	counting.Reader = reader
	return counting
}

// Read implements io.Reader. It forwards the call to the underlying Reader
// and adds the number of bytes that Reader reported to the count, also when
// the underlying Reader returns an error alongside them.
func (c *CountingReader) Read(p []byte) (int, error) {
	read, err := c.Reader.Read(p)
	atomic.AddInt64(&c.count, int64(read))
	return read, err
}

// Count reports how many bytes have been read so far.
//
// It provides the Count method required by the counter interface.
func (c *CountingReader) Count() int64 {
	return atomic.LoadInt64(&c.count)
}

// counter is implemented by values that can report a running count,
// such as CountingReader through its Count method.
type counter interface {
	// Count returns the current value of the count.
	Count() int64
}

// Progress describes current progress as reported by the counter.
type Progress struct {
	// Count is the value reported by the counter at time Current.
	Count int64
	// Size is the total that Count is measured against.
	Size int64
	// Started is the time at which progress tracking began.
	Started time.Time
	// Current is the time at which this Progress was recorded.
	Current time.Time
	// Elapsed is the duration between Started and Current.
	Elapsed time.Duration
	// remaining is the estimated duration from Current until completion.
	remaining time.Duration
	// estimated is the estimated time of completion.
	estimated time.Time
}

// Percent returns Count as a percentage of Size.
//
// It returns 0 when Size is not positive (for example 0, or -1 used as an
// "unknown length" marker), because the ratio is undefined then and the plain
// division would yield NaN, an infinity or a negative percentage.
func (p Progress) Percent() float64 {
	if p.Size <= 0 {
		return 0
	}
	return float64(p.Count) / float64(p.Size) * 100.0 //nolint:gomnd
}

// Remaining returns the estimated duration from Current until completion,
// as stored in this Progress.
func (p Progress) Remaining() time.Duration {
	return p.remaining
}

// Estimated returns the estimated time of completion, as stored in this
// Progress.
func (p Progress) Estimated() time.Time {
	return p.estimated
}

// Ticker holds the channel C on which Progress values are delivered.
// NewTicker constructs it; call Stop to stop the ticker and free resources.
type Ticker struct {
	C    <-chan Progress
	stop func()
}

// Stop stops the ticker and frees resources.
//
// Calling Stop on a Ticker that has no stop function, such as a zero-value
// Ticker that was not created by NewTicker, does nothing instead of
// panicking on a nil function call.
func (t *Ticker) Stop() {
	if t.stop == nil {
		return
	}
	t.stop()
}

// NewTicker creates a new Ticker which at regular interval reports the
// progress as reported by the counter c.
func NewTicker(ctx context.Context, c counter, size int64, interval time.Duration) *Ticker {
	ctx, cancel := context.WithCancel(ctx)
	started := time.Now()
	output := make(chan Progress)
	ticker := time.NewTicker(interval)
	go func() {
		defer cancel()
		defer close(output)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case now := <-ticker.C:
				count := c.Count()
				elapsed := now.Sub(started)
				ratio := float64(count) / float64(size)
				total := time.Duration(float64(elapsed) / ratio)
				estimated := started.Add(total)
				progress := Progress{
					Count:     count,
					Size:      size,
					Started:   started,
					Current:   now,
					Elapsed:   elapsed,
					remaining: estimated.Sub(now),
					estimated: estimated,
				}
107
108
				select {
				case <-ctx.Done():
Mitar's avatar
Mitar committed
109
					return
110
				case output <- progress:
Mitar's avatar
Mitar committed
111
112
113
114
115
116
117
118
119
				}
			}
		}
	}()
	return &Ticker{
		C:    output,
		stop: cancel,
	}
}