Verified Commit fa13d6b3 authored by Kamil Trzciński's avatar Kamil Trzciński 🔴 Committed by Steve Azzopardi

Make DigitalOcean driver RateLimit aware

Signed-off-by: Tomasz Maczukin's avatarTomasz Maczukin <[email protected]>
parent b170508b
......@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
"path"
......@@ -193,20 +194,26 @@ func (d *Driver) PreCreateCheck() error {
}
client := d.getClient()
regions, _, err := client.Regions.List(context.TODO(), nil)
if err != nil {
return err
}
for _, region := range regions {
if region.Slug == d.Region {
return nil
for {
log.Debugf("DO_API: Region List")
regions, resp, err := client.Regions.List(context.TODO(), nil)
if d.limitRate(resp) {
continue
}
if err != nil {
return err
}
for _, region := range regions {
if region.Slug == d.Region {
return nil
}
}
}
return fmt.Errorf("digitalocean requires a valid region")
return fmt.Errorf("digitalocean requires a valid region")
}
}
func (d *Driver) Create() error {
func (d *Driver) Create() (err error) {
var userdata string
if d.UserDataFile != "" {
buf, err := ioutil.ReadFile(d.UserDataFile)
......@@ -225,6 +232,13 @@ func (d *Driver) Create() error {
d.SSHKeyID = key.ID
defer func() {
if err != nil {
log.Infof("Droplet creation failed on Digital Ocean, removing...")
d.Remove()
}
}()
log.Infof("Creating Digital Ocean droplet...")
client := d.getClient()
......@@ -243,19 +257,37 @@ func (d *Driver) Create() error {
Tags: d.getTags(),
}
newDroplet, _, err := client.Droplets.Create(context.TODO(), createRequest)
if err != nil {
return err
var newDroplet *godo.Droplet
var resp *godo.Response
for {
log.Debugf("DO_API: Create Droplet")
newDroplet, resp, err = client.Droplets.Create(context.TODO(), createRequest)
if d.limitRate(resp) {
continue
} else if err != nil {
return err
} else {
break
}
}
d.DropletID = newDroplet.ID
try := 0
// On average it taks at least 5 seconds for assigning IP address
time.Sleep(5 * time.Second)
log.Info("Waiting for IP address to be assigned to the Droplet...")
for {
newDroplet, _, err = client.Droplets.Get(context.TODO(), d.DropletID)
if err != nil {
log.Debugf("DO_API: Get Droplet")
newDroplet, resp, err = client.Droplets.Get(context.TODO(), d.DropletID)
if d.limitRate(resp) {
continue
} else if err != nil {
return err
}
for _, network := range newDroplet.Networks.V4 {
if network.Type == "public" {
d.IPAddress = network.IPAddress
......@@ -266,13 +298,21 @@ func (d *Driver) Create() error {
break
}
time.Sleep(1 * time.Second)
if try >= 60 {
return fmt.Errorf("too many tries (%d)", try)
}
try++
d.sleep(2, 8)
}
log.Debugf("Created droplet ID %d, IP address %s",
log.Infof("Created droplet ID %d, IP address %s",
newDroplet.ID,
d.IPAddress)
// On average it takes at least 30 seconds for machine creation
time.Sleep(30 * time.Second)
return nil
}
......@@ -280,9 +320,20 @@ func (d *Driver) createSSHKey() (*godo.Key, error) {
d.SSHKeyPath = d.GetSSHKeyPath()
if d.SSHKeyFingerprint != "" {
key, resp, err := d.getClient().Keys.GetByFingerprint(context.TODO(), d.SSHKeyFingerprint)
if err != nil && resp.StatusCode == 404 {
return nil, fmt.Errorf("Digital Ocean SSH key with fingerprint %s doesn't exist", d.SSHKeyFingerprint)
var key *godo.Key
var resp *godo.Response
var err error
for {
log.Debugf("DO_API: Keys GetByFingerprint")
key, resp, err = d.getClient().Keys.GetByFingerprint(context.TODO(), d.SSHKeyFingerprint)
if d.limitRate(resp) {
continue
} else if err != nil && resp.StatusCode == 404 {
return nil, fmt.Errorf("Digital Ocean SSH key with fingerprint %s doesn't exist", d.SSHKeyFingerprint)
} else {
break
}
}
if d.SSHKey == "" {
......@@ -310,12 +361,17 @@ func (d *Driver) createSSHKey() (*godo.Key, error) {
PublicKey: string(publicKey),
}
key, _, err := d.getClient().Keys.Create(context.TODO(), createRequest)
if err != nil {
return key, err
for {
log.Debugf("DO_API: Keys Create")
key, resp, err := d.getClient().Keys.Create(context.TODO(), createRequest)
if d.limitRate(resp) {
continue
} else if err != nil {
return key, err
} else {
return key, nil
}
}
return key, nil
}
func (d *Driver) GetURL() (string, error) {
......@@ -332,55 +388,98 @@ func (d *Driver) GetURL() (string, error) {
}
func (d *Driver) GetState() (state.State, error) {
droplet, _, err := d.getClient().Droplets.Get(context.TODO(), d.DropletID)
if err != nil {
return state.Error, err
}
switch droplet.Status {
case "new":
return state.Starting, nil
case "active":
return state.Running, nil
case "off":
return state.Stopped, nil
for {
log.Debugf("DO_API: Droplets Get")
droplet, resp, err := d.getClient().Droplets.Get(context.TODO(), d.DropletID)
if d.limitRate(resp, true) {
continue
} else if err != nil {
return state.Error, err
}
switch droplet.Status {
case "new":
return state.Starting, nil
case "active":
return state.Running, nil
case "off":
return state.Stopped, nil
}
return state.None, nil
}
return state.None, nil
}
func (d *Driver) Start() error {
_, _, err := d.getClient().DropletActions.PowerOn(context.TODO(), d.DropletID)
return err
log.Debugf("DO_API: DropletActions PowerOn")
for {
_, resp, err := d.getClient().DropletActions.PowerOn(context.TODO(), d.DropletID)
if d.limitRate(resp) {
continue
}
return err
}
}
func (d *Driver) Stop() error {
_, _, err := d.getClient().DropletActions.Shutdown(context.TODO(), d.DropletID)
return err
log.Debugf("DO_API: DropletActions Shutdown")
for {
_, resp, err := d.getClient().DropletActions.Shutdown(context.TODO(), d.DropletID)
if d.limitRate(resp) {
continue
}
return err
}
}
func (d *Driver) Restart() error {
_, _, err := d.getClient().DropletActions.Reboot(context.TODO(), d.DropletID)
return err
log.Debugf("DO_API: DropletActions Reboot")
for {
_, resp, err := d.getClient().DropletActions.Reboot(context.TODO(), d.DropletID)
if d.limitRate(resp) {
continue
}
return err
}
}
func (d *Driver) Kill() error {
_, _, err := d.getClient().DropletActions.PowerOff(context.TODO(), d.DropletID)
return err
log.Debugf("DO_API: DropletActions PowerOff")
for {
_, resp, err := d.getClient().DropletActions.PowerOff(context.TODO(), d.DropletID)
if d.limitRate(resp) {
continue
}
return err
}
}
func (d *Driver) Remove() error {
client := d.getClient()
if d.SSHKeyFingerprint == "" {
if resp, err := client.Keys.DeleteByID(context.TODO(), d.SSHKeyID); err != nil {
if resp.StatusCode == 404 {
for {
log.Debugf("DO_API: Keys DeleteByID")
resp, err := client.Keys.DeleteByID(context.TODO(), d.SSHKeyID)
if err == nil {
break
} else if d.limitRate(resp) {
continue
} else if resp != nil && resp.StatusCode == 404 {
log.Infof("Digital Ocean SSH key doesn't exist, assuming it is already deleted")
break
} else {
return err
}
}
}
if resp, err := client.Droplets.Delete(context.TODO(), d.DropletID); err != nil {
if resp.StatusCode == 404 {
for {
log.Debugf("DO_API: Droplets Delete")
resp, err := client.Droplets.Delete(context.TODO(), d.DropletID)
if err == nil {
break
} else if d.limitRate(resp) {
continue
} else if resp != nil && resp.StatusCode == 404 {
log.Infof("Digital Ocean droplet doesn't exist, assuming it is already deleted")
break
} else {
return err
}
......@@ -409,6 +508,47 @@ func (d *Driver) getTags() []string {
return tagList
}
func (d *Driver) limitRate(resp *godo.Response, isStatus ...bool) bool {
if d.isRateLimited(resp) {
var wait, deviation time.Duration
// Calculate minimum time to wait, based on Reset
if reset := resp.Rate.Reset.Sub(time.Now()); reset > 0 {
wait += reset
}
if len(isStatus) > 0 && isStatus[0] {
// For status check, use short intervals
if wait < time.Second {
wait = time.Second
}
deviation += 2 * time.Second
} else {
// For creations use long intervals
if wait < 5*time.Second {
wait = 5 * time.Second
}
deviation += 5 * time.Second
log.Infof("Rate limit detected (waiting at least %v)...", wait)
}
d.sleep(wait, deviation)
return true
}
return false
}
func (d *Driver) isRateLimited(resp *godo.Response) bool {
if resp != nil && resp.StatusCode == 429 {
return true
}
return false
}
func (d *Driver) sleep(base, n time.Duration) {
time.Sleep(base + time.Duration(rand.Int63n(int64(n))))
}
func (d *Driver) GetSSHKeyPath() string {
if d.SSHKey != "" {
d.SSHKeyPath = d.ResolveStorePath(path.Base(d.SSHKey))
......
......@@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"io"
math_rand "math/rand"
"os"
"runtime"
"strconv"
......@@ -78,7 +79,7 @@ func CopyFile(src, dst string) error {
return os.Chmod(dst, fi.Mode())
}
func WaitForSpecificOrError(f func() (bool, error), maxAttempts int, waitInterval time.Duration) error {
func WaitForSpecificOrError(f func() (bool, error), maxAttempts int, waitInterval time.Duration, devInterval ...time.Duration) error {
for i := 0; i < maxAttempts; i++ {
stop, err := f()
if err != nil {
......@@ -88,18 +89,21 @@ func WaitForSpecificOrError(f func() (bool, error), maxAttempts int, waitInterva
return nil
}
time.Sleep(waitInterval)
for _, deviation := range devInterval {
time.Sleep(time.Duration(math_rand.Int63n(int64(deviation))))
}
}
return fmt.Errorf("Maximum number of retries (%d) exceeded", maxAttempts)
}
func WaitForSpecific(f func() bool, maxAttempts int, waitInterval time.Duration) error {
func WaitForSpecific(f func() bool, maxAttempts int, waitInterval time.Duration, devInterval ...time.Duration) error {
return WaitForSpecificOrError(func() (bool, error) {
return f(), nil
}, maxAttempts, waitInterval)
}, maxAttempts, waitInterval, devInterval...)
}
func WaitFor(f func() bool) error {
return WaitForSpecific(f, 60, 3*time.Second)
return WaitForSpecific(f, 60, 3*time.Second, 9*time.Second)
}
// TruncateID returns a shorten id
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment