Refactor retry logic for artifacts/cache

`doRetry` was expecting a func signiture of
parent af343971
......@@ -20,18 +20,18 @@ type ArtifactsDownloaderCommand struct {
network common.Network
}
func (c *ArtifactsDownloaderCommand) download(file string) (bool, error) {
func (c *ArtifactsDownloaderCommand) download(file string) error {
switch c.network.DownloadArtifacts(c.JobCredentials, file) {
case common.DownloadSucceeded:
return false, nil
return nil
case common.DownloadNotFound:
return false, os.ErrNotExist
return os.ErrNotExist
case common.DownloadForbidden:
return false, os.ErrPermission
return os.ErrPermission
case common.DownloadFailed:
return true, os.ErrInvalid
return retryableErr{err: os.ErrInvalid}
default:
return false, os.ErrInvalid
return os.ErrInvalid
}
}
......@@ -54,7 +54,7 @@ func (c *ArtifactsDownloaderCommand) Execute(context *cli.Context) {
defer os.Remove(file.Name())
// Download artifacts file
err = c.doRetry(func() (bool, error) {
err = c.doRetry(func() error {
return c.download(file.Name())
})
if err != nil {
......
......@@ -64,15 +64,18 @@ func (c *ArtifactsUploaderCommand) createReadStream() (string, io.ReadCloser, er
case common.ArtifactFormatZip, common.ArtifactFormatDefault:
pr, pw := io.Pipe()
go c.generateZipArchive(pw)
return name + ".zip", pr, nil
case common.ArtifactFormatGzip:
pr, pw := io.Pipe()
go c.generateGzipStream(pw)
return name + ".gz", pr, nil
case common.ArtifactFormatRaw:
file, err := c.openRawStream()
return name, file, err
default:
......@@ -80,14 +83,15 @@ func (c *ArtifactsUploaderCommand) createReadStream() (string, io.ReadCloser, er
}
}
func (c *ArtifactsUploaderCommand) createAndUpload() (bool, error) {
func (c *ArtifactsUploaderCommand) createAndUpload() error {
artifactsName, stream, err := c.createReadStream()
if err != nil {
return false, err
return err
}
if stream == nil {
logrus.Errorln("No files to upload")
return false, nil
return nil
}
defer stream.Close()
......@@ -102,15 +106,15 @@ func (c *ArtifactsUploaderCommand) createAndUpload() (bool, error) {
// Upload the data
switch c.network.UploadRawArtifacts(c.JobCredentials, stream, options) {
case common.UploadSucceeded:
return false, nil
return nil
case common.UploadForbidden:
return false, os.ErrPermission
return os.ErrPermission
case common.UploadTooLarge:
return false, errors.New("Too large")
return errors.New("too large")
case common.UploadFailed:
return true, os.ErrInvalid
return retryableErr{err: os.ErrInvalid}
default:
return false, os.ErrInvalid
return os.ErrInvalid
}
}
......
......@@ -34,23 +34,23 @@ func (c *CacheArchiverCommand) getClient() *CacheClient {
return c.client
}
func (c *CacheArchiverCommand) upload() (bool, error) {
func (c *CacheArchiverCommand) upload() error {
logrus.Infoln("Uploading", filepath.Base(c.File), "to", url_helpers.CleanURL(c.URL))
file, err := os.Open(c.File)
if err != nil {
return false, err
return err
}
defer file.Close()
fi, err := file.Stat()
if err != nil {
return false, err
return err
}
req, err := http.NewRequest("PUT", c.URL, file)
if err != nil {
return true, err
return retryableErr{err: err}
}
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Last-Modified", fi.ModTime().Format(http.TimeFormat))
......@@ -58,17 +58,21 @@ func (c *CacheArchiverCommand) upload() (bool, error) {
resp, err := c.getClient().Do(req)
if err != nil {
return true, err
return retryableErr{err: err}
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
// Retry on server errors
retry := resp.StatusCode/100 == 5
return retry, fmt.Errorf("Received: %s", resp.Status)
err = fmt.Errorf("received: %s", resp.Status)
if resp.StatusCode/100 == 5 {
err = retryableErr{err: err}
}
return err
}
return false, nil
return nil
}
func (c *CacheArchiverCommand) Execute(*cli.Context) {
......@@ -87,6 +91,7 @@ func (c *CacheArchiverCommand) Execute(*cli.Context) {
// Check if list of files changed
if !c.isFileChanged(c.File) {
logrus.Infoln("Archive is up to date!")
return
}
......
......@@ -41,32 +41,36 @@ func checkIfUpToDate(path string, resp *http.Response) (bool, time.Time) {
return fi != nil && !date.After(fi.ModTime()), date
}
func (c *CacheExtractorCommand) download() (bool, error) {
func (c *CacheExtractorCommand) download() error {
os.MkdirAll(filepath.Dir(c.File), 0700)
resp, err := c.getClient().Get(c.URL)
if err != nil {
return true, err
return retryableErr{err: err}
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return false, os.ErrNotExist
return os.ErrNotExist
} else if resp.StatusCode/100 != 2 {
// Retry on server errors
retry := resp.StatusCode/100 == 5
return retry, fmt.Errorf("Received: %s", resp.Status)
err = fmt.Errorf("received: %s", resp.Status)
if resp.StatusCode/100 == 5 {
err = retryableErr{err: err}
}
return err
}
upToDate, date := checkIfUpToDate(c.File, resp)
if upToDate {
logrus.Infoln(filepath.Base(c.File), "is up to date")
return false, nil
return nil
}
file, err := ioutil.TempFile(filepath.Dir(c.File), "cache")
if err != nil {
return false, err
return err
}
defer os.Remove(file.Name())
defer file.Close()
......@@ -74,20 +78,21 @@ func (c *CacheExtractorCommand) download() (bool, error) {
logrus.Infoln("Downloading", filepath.Base(c.File), "from", url_helpers.CleanURL(c.URL))
_, err = io.Copy(file, resp.Body)
if err != nil {
return true, err
return retryableErr{err: err}
}
os.Chtimes(file.Name(), time.Now(), date)
err = file.Close()
if err != nil {
return false, err
return err
}
err = os.Rename(file.Name(), c.File)
if err != nil {
return false, err
return err
}
return false, nil
return nil
}
func (c *CacheExtractorCommand) Execute(context *cli.Context) {
......
package helpers
import (
"github.com/sirupsen/logrus"
"time"
"github.com/sirupsen/logrus"
)
type retryHelper struct {
......@@ -10,13 +11,31 @@ type retryHelper struct {
RetryTime time.Duration `long:"retry-time" description:"How long to wait between retries"`
}
func (r *retryHelper) doRetry(handler func() (bool, error)) (err error) {
retry, err := handler()
for i := 0; retry && i < r.Retry; i++ {
// wait one second to retry
logrus.Warningln("Retrying...")
// retryableErr indicates that an error can be retried. To specify that an error
// can be retried simply wrap the original error. For example:
//
// retryableErr{err: errors.New("some error")}
type retryableErr struct {
err error
}
func (e retryableErr) Error() string {
return e.err.Error()
}
func (r *retryHelper) doRetry(handler func() error) error {
err := handler()
for i := 0; i < r.Retry; i++ {
if _, ok := err.(retryableErr); !ok {
return err
}
time.Sleep(r.RetryTime)
retry, err = handler()
logrus.WithError(err).Warningln("Retrying...")
err = handler()
}
return
return err
}
......@@ -2,8 +2,9 @@ package helpers
import (
"errors"
"github.com/stretchr/testify/assert"
"testing"
"github.com/stretchr/testify/assert"
)
func TestDoRetryError(t *testing.T) {
......@@ -12,9 +13,9 @@ func TestDoRetryError(t *testing.T) {
}
retryCount := 0
err := r.doRetry(func() (bool, error) {
err := r.doRetry(func() error {
retryCount++
return true, errors.New("error")
return retryableErr{err: errors.New("error")}
})
assert.Error(t, err)
assert.Equal(t, r.Retry+1, retryCount)
......@@ -26,9 +27,9 @@ func TestDoRetry(t *testing.T) {
}
retryCount := 0
err := r.doRetry(func() (bool, error) {
err := r.doRetry(func() error {
retryCount++
return false, nil
return nil
})
assert.NoError(t, err)
assert.Equal(t, 1, retryCount)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment