Skip to content
Snippets Groups Projects
Commit b4c2abe9 authored by Stan Hu's avatar Stan Hu
Browse files

Add Azure Blob storage cache adapter and documentation

This commit makes it possible to use Azure Blob Storage as a runner
cache. Currently customers wanting to use Azure Blob Storage have to set
up a Minio Gateway, which isn't ideal because it requires customers to
maintain their own proxy server for Azure. We have a number of customers
who want native support for Azure Blob Storage.

Azure Blob storage will be fully supported for storing LFS, CI
artifacts, uploads, packages, etc. in 13.4
(gitlab#25877).
parent 53ff2e10
No related branches found
No related tags found
Loading
Pipeline #185566695 failed
Showing
with 1741 additions and 21 deletions
package azure
import (
"fmt"
"net/http"
"net/url"
"time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitlab-runner/cache"
"gitlab.com/gitlab-org/gitlab-runner/common"
)
type signedURLGenerator func(name string, options *signedURLOptions) (*url.URL, error)
type azureAdapter struct {
timeout time.Duration
config *common.CacheAzureConfig
objectName string
generateSignedURL signedURLGenerator
credentialsResolver credentialsResolver
}
func (a *azureAdapter) GetDownloadURL() *url.URL {
return a.presignURL(http.MethodGet, "")
}
func (a *azureAdapter) GetUploadURL() *url.URL {
return a.presignURL(http.MethodPut, "")
}
func (a *azureAdapter) GetUploadHeaders() http.Header {
httpHeaders := http.Header{}
httpHeaders.Set("Content-Type", "application/octet-stream")
httpHeaders.Set("x-ms-blob-type", "BlockBlob")
return httpHeaders
}
func (a *azureAdapter) presignURL(method string, _ string) *url.URL {
if a.config.ContainerName == "" {
logrus.Error("ContainerName can't be empty")
return nil
}
err := a.credentialsResolver.Resolve()
if err != nil {
logrus.Errorf("error while resolving Azure credentials: %v", err)
return nil
}
credentials := a.credentialsResolver.Credentials()
url, err := a.generateSignedURL(a.objectName, &signedURLOptions{
ContainerName: a.config.ContainerName,
StorageDomain: a.config.StorageDomain,
Credentials: credentials,
Method: method,
Timeout: a.timeout,
})
if err != nil {
logrus.Errorf("error generating Azure pre-signed URL: %v", err)
return nil
}
return url
}
func New(config *common.CacheConfig, timeout time.Duration, objectName string) (cache.Adapter, error) {
azure := config.Azure
if azure == nil {
return nil, fmt.Errorf("missing Azure configuration")
}
cr, err := credentialsResolverInitializer(azure)
if err != nil {
return nil, fmt.Errorf("error while initializing Azure credentials resolver: %w", err)
}
a := &azureAdapter{
config: azure,
timeout: timeout,
objectName: objectName,
credentialsResolver: cr,
generateSignedURL: PresignedURL,
}
return a, nil
}
func init() {
err := cache.Factories().Register("azure", New)
if err != nil {
panic(err)
}
}
package azure
import (
"encoding/base64"
"errors"
"fmt"
"net/http"
"net/url"
"testing"
"time"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-runner/common"
)
var (
accountName = "azuretest"
accountKey = base64.StdEncoding.EncodeToString([]byte("12345"))
containerName = "test"
objectName = "key"
defaultTimeout = 1 * time.Hour
)
func defaultAzureCache() *common.CacheConfig {
return &common.CacheConfig{
Type: "azure",
Azure: &common.CacheAzureConfig{
ContainerName: containerName,
},
}
}
type adapterOperationInvalidConfigTestCase struct {
provideAzureConfig bool
errorOnCredentialsResolverInitialization bool
credentialsResolverResolveError bool
accountName string
accountKey string
containerName string
expectedErrorMsg string
}
func prepareMockedCredentialsResolverInitializer(tc adapterOperationInvalidConfigTestCase) func() {
oldCredentialsResolverInitializer := credentialsResolverInitializer
credentialsResolverInitializer = func(config *common.CacheAzureConfig) (*defaultCredentialsResolver, error) {
if tc.errorOnCredentialsResolverInitialization {
return nil, errors.New("test error")
}
return newDefaultCredentialsResolver(config)
}
return func() {
credentialsResolverInitializer = oldCredentialsResolverInitializer
}
}
func prepareMockedCredentialsResolverForInvalidConfig(adapter *azureAdapter, tc adapterOperationInvalidConfigTestCase) {
cr := &mockCredentialsResolver{}
resolveCall := cr.On("Resolve")
if tc.credentialsResolverResolveError {
resolveCall.Return(fmt.Errorf("test error"))
} else {
resolveCall.Return(nil)
}
cr.On("Credentials").Return(&common.CacheAzureCredentials{
AccountName: tc.accountName,
AccountKey: tc.accountKey,
})
adapter.credentialsResolver = cr
}
func testAdapterOperationWithInvalidConfig(
t *testing.T,
name string,
tc adapterOperationInvalidConfigTestCase,
adapter *azureAdapter,
operation func() *url.URL,
) {
t.Run(name, func(t *testing.T) {
prepareMockedCredentialsResolverForInvalidConfig(adapter, tc)
hook := test.NewGlobal()
u := operation()
assert.Nil(t, u)
message, err := hook.LastEntry().String()
require.NoError(t, err)
assert.Contains(t, message, tc.expectedErrorMsg)
})
}
func TestAdapterOperation_InvalidConfig(t *testing.T) {
tests := map[string]adapterOperationInvalidConfigTestCase{
"no-azure-config": {
containerName: containerName,
expectedErrorMsg: "Missing Azure configuration",
},
"error-on-credentials-resolver-initialization": {
provideAzureConfig: true,
errorOnCredentialsResolverInitialization: true,
},
"credentials-resolver-resolve-error": {
provideAzureConfig: true,
credentialsResolverResolveError: true,
containerName: containerName,
expectedErrorMsg: "error while resolving Azure credentials: test error",
},
"no-credentials": {
provideAzureConfig: true,
containerName: containerName,
expectedErrorMsg: "error generating Azure pre-signed URL: missing Azure storage account name",
},
"no-account-name": {
provideAzureConfig: true,
accountKey: accountKey,
containerName: containerName,
expectedErrorMsg: "error generating Azure pre-signed URL: missing Azure storage account name",
},
"no-account-key": {
provideAzureConfig: true,
accountName: accountName,
containerName: containerName,
expectedErrorMsg: "error generating Azure pre-signed URL: missing Azure storage account key",
},
"container-not-specified": {
provideAzureConfig: true,
accountName: "access-id",
accountKey: accountKey,
expectedErrorMsg: "ContainerName can't be empty",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cleanupCredentialsResolverInitializerMock := prepareMockedCredentialsResolverInitializer(tc)
defer cleanupCredentialsResolverInitializerMock()
config := defaultAzureCache()
if !tc.provideAzureConfig {
config.Azure = nil
} else {
config.Azure.ContainerName = tc.containerName
}
a, err := New(config, defaultTimeout, objectName)
if !tc.provideAzureConfig {
assert.Nil(t, a)
assert.EqualError(t, err, "missing Azure configuration")
return
}
if tc.errorOnCredentialsResolverInitialization {
assert.Nil(t, a)
assert.EqualError(t, err, "error while initializing Azure credentials resolver: test error")
return
}
require.NotNil(t, a)
assert.NoError(t, err)
adapter, ok := a.(*azureAdapter)
require.True(t, ok, "Adapter should be properly casted to *adapter type")
testAdapterOperationWithInvalidConfig(t, "GetDownloadURL", tc, adapter, a.GetDownloadURL)
testAdapterOperationWithInvalidConfig(t, "GetUploadURL", tc, adapter, a.GetUploadURL)
})
}
}
type adapterOperationTestCase struct {
returnedURL string
returnedError error
expectedError string
}
func prepareMockedCredentialsResolver(adapter *azureAdapter) func(t *testing.T) {
cr := &mockCredentialsResolver{}
cr.On("Resolve").Return(nil)
cr.On("Credentials").Return(&common.CacheAzureCredentials{
AccountName: accountName,
AccountKey: accountKey,
})
adapter.credentialsResolver = cr
return func(t *testing.T) {
cr.AssertExpectations(t)
}
}
func prepareMockedSignedURLGenerator(
t *testing.T,
tc adapterOperationTestCase,
expectedMethod string,
adapter *azureAdapter,
) {
adapter.generateSignedURL = func(name string, opts *signedURLOptions) (*url.URL, error) {
assert.Equal(t, containerName, opts.ContainerName)
assert.Equal(t, accountName, opts.Credentials.AccountName)
assert.Equal(t, accountKey, opts.Credentials.AccountKey)
assert.Equal(t, expectedMethod, opts.Method)
url, err := url.Parse(tc.returnedURL)
if err != nil {
return nil, err
}
return url, tc.returnedError
}
}
func testAdapterOperation(
t *testing.T,
tc adapterOperationTestCase,
name string,
expectedMethod string,
adapter *azureAdapter,
operation func() *url.URL,
) {
t.Run(name, func(t *testing.T) {
cleanupCredentialsResolverMock := prepareMockedCredentialsResolver(adapter)
defer cleanupCredentialsResolverMock(t)
prepareMockedSignedURLGenerator(t, tc, expectedMethod, adapter)
hook := test.NewGlobal()
u := operation()
if tc.expectedError != "" {
message, err := hook.LastEntry().String()
require.NoError(t, err)
assert.Contains(t, message, tc.expectedError)
return
}
assert.Empty(t, hook.AllEntries())
assert.Equal(t, tc.returnedURL, u.String())
})
}
func TestAdapterOperation(t *testing.T) {
tests := map[string]adapterOperationTestCase{
"error-on-URL-signing": {
returnedURL: "",
returnedError: fmt.Errorf("test error"),
expectedError: "error generating Azure pre-signed URL: test error",
},
"invalid-URL-returned": {
returnedURL: "://test",
returnedError: nil,
expectedError: "error generating Azure pre-signed URL: parse",
},
"valid-configuration": {
returnedURL: "https://myaccount.blob.core.windows.net/mycontainer/mydirectory/myfile.txt?sig=XYZ&sp=r",
returnedError: nil,
expectedError: "",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
config := defaultAzureCache()
a, err := New(config, defaultTimeout, objectName)
require.NoError(t, err)
adapter, ok := a.(*azureAdapter)
require.True(t, ok, "Adapter should be properly casted to *adapter type")
testAdapterOperation(
t,
tc,
"GetDownloadURL",
http.MethodGet,
adapter,
a.GetDownloadURL,
)
testAdapterOperation(
t,
tc,
"GetUploadURL",
http.MethodPut,
adapter,
a.GetUploadURL,
)
headers := adapter.GetUploadHeaders()
require.NotNil(t, headers)
assert.Len(t, headers, 2)
assert.Equal(t, "application/octet-stream", headers.Get("Content-Type"))
assert.Equal(t, "BlockBlob", headers.Get("x-ms-blob-type"))
})
}
}
package azure
import (
"fmt"
"net/http"
"net/url"
"time"
"github.com/Azure/azure-storage-blob-go/azblob"
"gitlab.com/gitlab-org/gitlab-runner/common"
)
const DefaultAzureServer = "blob.core.windows.net"
type signedURLOptions struct {
ContainerName string
StorageDomain string
Credentials *common.CacheAzureCredentials
Method string
Timeout time.Duration
}
func PresignedURL(name string, o *signedURLOptions) (*url.URL, error) {
if o.Credentials.AccountName == "" {
return nil, fmt.Errorf("missing Azure storage account name")
}
if o.Credentials.AccountKey == "" {
return nil, fmt.Errorf("missing Azure storage account key")
}
credential, err := azblob.NewSharedKeyCredential(o.Credentials.AccountName, o.Credentials.AccountKey)
if err != nil {
return nil, fmt.Errorf("creating Azure signature: %w", err)
}
var permissions azblob.AccountSASPermissions
if o.Method == http.MethodPut {
permissions = azblob.AccountSASPermissions{Write: true}
} else {
permissions = azblob.AccountSASPermissions{Read: true}
}
// Set the desired SAS signature values and sign them with the
// shared key credentials to get the SAS query parameters.
// See https://docs.microsoft.com/en-us/rest/api/storageservices/create-account-sas
sas, err := azblob.AccountSASSignatureValues{
Protocol: azblob.SASProtocolHTTPS, // Users MUST use HTTPS (not HTTP)
StartTime: time.Now().Add(-1 * time.Hour).UTC(),
ExpiryTime: time.Now().Add(o.Timeout).UTC(),
Permissions: permissions.String(),
Services: azblob.AccountSASServices{Blob: true}.String(),
ResourceTypes: azblob.AccountSASResourceTypes{Object: true}.String(),
}.NewSASQueryParameters(credential)
if err != nil {
return nil, fmt.Errorf("creating Azure signature: %w", err)
}
domain := DefaultAzureServer
if o.StorageDomain != "" {
domain = o.StorageDomain
}
parts := azblob.BlobURLParts{
Scheme: "https",
Host: fmt.Sprintf("%s.%s", o.Credentials.AccountName, domain),
ContainerName: o.ContainerName,
BlobName: name,
SAS: sas,
}
url := parts.URL()
return &url, nil
}
package azure
import (
"fmt"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-runner/common"
)
type azureURLGenerationTest struct {
accountName string
accountKey string
storageDomain string
method string
errorOnGeneration bool
}
func TestAzureClientURLGeneration(t *testing.T) {
tests := map[string]azureURLGenerationTest{
"missing account name": {
accountKey: accountKey,
method: http.MethodGet,
errorOnGeneration: true,
},
"missing account key": {
accountName: accountName,
method: http.MethodGet,
errorOnGeneration: true,
},
"GET request": {
accountName: accountName,
accountKey: accountKey,
method: http.MethodGet,
},
"GET request in custom storage domain": {
accountName: accountName,
accountKey: accountKey,
storageDomain: "blob.core.chinacloudapi.cn",
method: http.MethodGet,
},
"PUT request": {
accountName: accountName,
accountKey: accountKey,
method: http.MethodPut,
},
}
for testName, testCase := range tests {
t.Run(testName, func(t *testing.T) {
opts := &signedURLOptions{
ContainerName: containerName,
StorageDomain: testCase.storageDomain,
Credentials: &common.CacheAzureCredentials{
AccountName: testCase.accountName,
AccountKey: testCase.accountKey,
},
Method: testCase.method,
Timeout: 1 * time.Hour,
}
url, err := PresignedURL(objectName, opts)
if testCase.errorOnGeneration {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, "https", url.Scheme)
domain := DefaultAzureServer
if testCase.storageDomain != "" {
domain = testCase.storageDomain
}
assert.Equal(t, fmt.Sprintf("%s.%s", testCase.accountName, domain), url.Host)
assert.Equal(t, fmt.Sprintf("/%s/%s", containerName, objectName), url.Path)
require.NotNil(t, url)
q := url.Query()
// Sanity check query parameters from
// https://docs.microsoft.com/en-us/rest/api/storageservices/create-account-sas
assert.NotNil(t, q["sv"]) // SignedVersion
assert.Equal(t, []string{"b"}, q["ss"]) // SignedServices (blob)
assert.Equal(t, []string{"o"}, q["srt"]) // SignedResourcesType (object)
assert.NotNil(t, q["st"]) // SignedStart
assert.NotNil(t, q["se"]) // SignedExpiry
assert.NotNil(t, q["sig"]) // Signature
assert.Equal(t, []string{"https"}, q["spr"]) // SignedProtocol
// SignedPermission
if testCase.method == http.MethodGet {
assert.Equal(t, []string{"r"}, q["sp"])
} else {
assert.Equal(t, []string{"w"}, q["sp"])
}
})
}
}
package azure
import (
"fmt"
"gitlab.com/gitlab-org/gitlab-runner/common"
)
type credentialsResolver interface {
Credentials() *common.CacheAzureCredentials
Resolve() error
}
type defaultCredentialsResolver struct {
config *common.CacheAzureConfig
credentials *common.CacheAzureCredentials
}
func (cr *defaultCredentialsResolver) Credentials() *common.CacheAzureCredentials {
return cr.credentials
}
func (cr *defaultCredentialsResolver) Resolve() error {
return cr.readCredentialsFromConfig()
}
func (cr *defaultCredentialsResolver) readCredentialsFromConfig() error {
if cr.config.AccountName == "" || cr.config.AccountKey == "" {
return fmt.Errorf("config for Azure present, but credentials are not configured")
}
cr.credentials.AccountName = cr.config.AccountName
cr.credentials.AccountKey = cr.config.AccountKey
return nil
}
func newDefaultCredentialsResolver(config *common.CacheAzureConfig) (*defaultCredentialsResolver, error) {
if config == nil {
return nil, fmt.Errorf("config can't be nil")
}
credentials := &defaultCredentialsResolver{
config: config,
credentials: &common.CacheAzureCredentials{},
}
return credentials, nil
}
var credentialsResolverInitializer = newDefaultCredentialsResolver
// Code generated by mockery v1.1.0. DO NOT EDIT.
package azure
import (
mock "github.com/stretchr/testify/mock"
common "gitlab.com/gitlab-org/gitlab-runner/common"
)
// mockCredentialsResolver is an autogenerated mock type for the credentialsResolver type
type mockCredentialsResolver struct {
mock.Mock
}
// Credentials provides a mock function with given fields:
func (_m *mockCredentialsResolver) Credentials() *common.CacheAzureCredentials {
ret := _m.Called()
var r0 *common.CacheAzureCredentials
if rf, ok := ret.Get(0).(func() *common.CacheAzureCredentials); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*common.CacheAzureCredentials)
}
}
return r0
}
// Resolve provides a mock function with given fields:
func (_m *mockCredentialsResolver) Resolve() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
......@@ -373,14 +373,28 @@ type CacheS3Config struct {
Insecure bool `toml:"Insecure,omitempty" long:"insecure" env:"CACHE_S3_INSECURE" description:"Use insecure mode (without https)"`
}
//nolint:lll
type CacheAzureCredentials struct {
AccountName string `toml:"AccountName,omitempty" long:"account-name" env:"CACHE_AZURE_ACCOUNT_NAME" description:"Account name for Azure Blob Storage"`
AccountKey string `toml:"AccountKey,omitempty" long:"account-key" env:"CACHE_AZURE_ACCOUNT_KEY" description:"Access key for Azure Blob Storage"`
}
//nolint:lll
type CacheAzureConfig struct {
CacheAzureCredentials
ContainerName string `toml:"ContainerName,omitempty" long:"container-name" env:"CACHE_AZURE_CONTAINER_NAME" description:"Name of the Azure container where cache will be stored"`
StorageDomain string `toml:"StorageDomain,omitempty" long:"storage-domain" env:"CACHE_AZURE_STORAGE_DOMAIN" description:"Domain name of the Azure storage (e.g. blob.core.windows.net)"`
}
//nolint:lll
type CacheConfig struct {
Type string `toml:"Type,omitempty" long:"type" env:"CACHE_TYPE" description:"Select caching method"`
Path string `toml:"Path,omitempty" long:"path" env:"CACHE_PATH" description:"Name of the path to prepend to the cache URL"`
Shared bool `toml:"Shared,omitempty" long:"shared" env:"CACHE_SHARED" description:"Enable cache sharing between runners."`
S3 *CacheS3Config `toml:"s3,omitempty" json:"s3" namespace:"s3"`
GCS *CacheGCSConfig `toml:"gcs,omitempty" json:"gcs" namespace:"gcs"`
S3 *CacheS3Config `toml:"s3,omitempty" json:"s3" namespace:"s3"`
GCS *CacheGCSConfig `toml:"gcs,omitempty" json:"gcs" namespace:"gcs"`
Azure *CacheAzureConfig `toml:"azure,omitempty" json:"azure" namespace:"azure"`
}
//nolint:lll
......
......@@ -658,21 +658,25 @@ use the `CredentialsFile`, the file needs to be present on GitLab Runner's machi
Below is a table containing a summary of `config.toml`, cli options and ENV variables for `register`:
| Setting | TOML field | CLI option for `register` | ENV for `register` | Before 12.0.0 TOML field | Before 12.0.0 CLI option | Before 12.0.0 ENV |
|---------------------|------------------------------------------|--------------------------------|-----------------------------------|-------------------------------------|--------------------------|---------------------------|
| Type | `[runners.cache] -> Type` | `--cache-type` | `$CACHE_TYPE` | | | |
| Path | `[runners.cache] -> Path` | `--cache-path` | `$CACHE_PATH` | | `--cache-s3-cache-path` | `$S3_CACHE_PATH` |
| Shared | `[runners.cache] -> Shared` | `--cache-shared` | `$CACHE_SHARED` | | `--cache-cache-shared` | |
| S3.ServerAddress | `[runners.cache.s3] -> ServerAddress` | `--cache-s3-server-address` | `$CACHE_S3_SERVER_ADDRESS` | `[runners.cache] -> ServerAddress` | | `$S3_SERVER_ADDRESS` |
| S3.AccessKey | `[runners.cache.s3] -> AccessKey` | `--cache-s3-access-key` | `$CACHE_S3_ACCESS_KEY` | `[runners.cache] -> AccessKey` | | `$S3_ACCESS_KEY` |
| S3.SecretKey | `[runners.cache.s3] -> SecretKey` | `--cache-s3-secret-key` | `$CACHE_S3_SECRET_KEY` | `[runners.cache] -> SecretKey` | | `$S3_SECRET_KEY` |
| S3.BucketName | `[runners.cache.s3] -> BucketName` | `--cache-s3-bucket-name` | `$CACHE_S3_BUCKET_NAME` | `[runners.cache] -> BucketName` | | `$S3_BUCKET_NAME` |
| S3.BucketLocation | `[runners.cache.s3] -> BucketLocation` | `--cache-s3-bucket-location` | `$CACHE_S3_BUCKET_LOCATION` | `[runners.cache] -> BucketLocation` | | `$S3_BUCKET_LOCATION` |
| S3.Insecure | `[runners.cache.s3] -> Insecure` | `--cache-s3-insecure` | `$CACHE_S3_INSECURE` | `[runners.cache] -> Insecure` | | `$S3_INSECURE` |
| GCS.AccessID | `[runners.cache.gcs] -> AccessID` | `--cache-gcs-access-id` | `$CACHE_GCS_ACCESS_ID` | | | |
| GCS.PrivateKey | `[runners.cache.gcs] -> PrivateKey` | `--cache-gcs-private-key` | `$CACHE_GCS_PRIVATE_KEY` | | | |
| GCS.CredentialsFile | `[runners.cache.gcs] -> CredentialsFile` | `--cache-gcs-credentials-file` | `$GOOGLE_APPLICATION_CREDENTIALS` | | | |
| GCS.BucketName | `[runners.cache.gcs] -> BucketName` | `--cache-gcs-bucket-name` | `$CACHE_GCS_BUCKET_NAME` | | | |
| Setting | TOML field | CLI option for `register` | ENV for `register` | Before 12.0.0 TOML field | Before 12.0.0 CLI option | Before 12.0.0 ENV |
|-----------------------|------------------------------------------|--------------------------------|-----------------------------------|-------------------------------------|--------------------------|---------------------------|
| `Type` | `[runners.cache] -> Type` | `--cache-type` | `$CACHE_TYPE` | | | |
| `Path` | `[runners.cache] -> Path` | `--cache-path` | `$CACHE_PATH` | | `--cache-s3-cache-path` | `$S3_CACHE_PATH` |
| `Shared` | `[runners.cache] -> Shared` | `--cache-shared` | `$CACHE_SHARED` | | `--cache-cache-shared` | |
| `S3.ServerAddress` | `[runners.cache.s3] -> ServerAddress` | `--cache-s3-server-address` | `$CACHE_S3_SERVER_ADDRESS` | `[runners.cache] -> ServerAddress` | | `$S3_SERVER_ADDRESS` |
| `S3.AccessKey` | `[runners.cache.s3] -> AccessKey` | `--cache-s3-access-key` | `$CACHE_S3_ACCESS_KEY` | `[runners.cache] -> AccessKey` | | `$S3_ACCESS_KEY` |
| `S3.SecretKey` | `[runners.cache.s3] -> SecretKey` | `--cache-s3-secret-key` | `$CACHE_S3_SECRET_KEY` | `[runners.cache] -> SecretKey` | | `$S3_SECRET_KEY` |
| `S3.BucketName` | `[runners.cache.s3] -> BucketName` | `--cache-s3-bucket-name` | `$CACHE_S3_BUCKET_NAME` | `[runners.cache] -> BucketName` | | `$S3_BUCKET_NAME` |
| `S3.BucketLocation` | `[runners.cache.s3] -> BucketLocation` | `--cache-s3-bucket-location` | `$CACHE_S3_BUCKET_LOCATION` | `[runners.cache] -> BucketLocation` | | `$S3_BUCKET_LOCATION` |
| `S3.Insecure` | `[runners.cache.s3] -> Insecure` | `--cache-s3-insecure` | `$CACHE_S3_INSECURE` | `[runners.cache] -> Insecure` | | `$S3_INSECURE` |
| `GCS.AccessID` | `[runners.cache.gcs] -> AccessID` | `--cache-gcs-access-id` | `$CACHE_GCS_ACCESS_ID` | | | |
| `GCS.PrivateKey` | `[runners.cache.gcs] -> PrivateKey` | `--cache-gcs-private-key` | `$CACHE_GCS_PRIVATE_KEY` | | | |
| `GCS.CredentialsFile` | `[runners.cache.gcs] -> CredentialsFile` | `--cache-gcs-credentials-file` | `$GOOGLE_APPLICATION_CREDENTIALS` | | | |
| `GCS.BucketName` | `[runners.cache.gcs] -> BucketName` | `--cache-gcs-bucket-name` | `$CACHE_GCS_BUCKET_NAME` | | | |
| `Azure.AccountName` | `[runners.cache.azure] -> AccountName` | `--cache-azure-account-name` | `$CACHE_AZURE_ACCOUNT_NAME` | | | |
| `Azure.AccountKey` | `[runners.cache.azure] -> AccountKey` | `--cache-azure-account-key` | `$CACHE_AZURE_ACCOUNT_KEY` | | | |
| `Azure.ContainerName` | `[runners.cache.azure] -> ContainerName` | `--cache-azure-container-name` | `$CACHE_AZURE_CONTAINER_NAME` | | | |
| `Azure.StorageDomain` | `[runners.cache.azure] -> StorageDomain` | `--cache-azure-storage-domain` | `$CACHE_AZURE_STORAGE_DOMAIN` | | | |
### The `[runners.cache.s3]` section
......@@ -759,6 +763,36 @@ Examples:
BucketName = "runners-cache"
```
### The `[runners.cache.azure]` section
> Introduced in GitLab Runner 13.4.0.
Configure native support for Azure Blob Storage. Read the
[Azure Blob Storage documentation](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction)
to learn more. Note that while S3 and GCS use the word `bucket` for a collection of objects, Azure uses the word
`container` to denote a collection of blobs.
| Parameter | Type | Description |
|-------------------|------------------|-------------|
| `AccountName` | string | Name of the Azure Blob Storage account used to access the storage. |
| `AccountKey` | string | Storage account access key used to access the container. |
| `ContainerName` | string | Name of the storage container in which to save cache data. |
| `StorageDomain` | string | Domain name [used to service Azure storage endpoints](https://docs.microsoft.com/en-us/azure/china/resources-developer-guide#check-endpoints-in-azure) (optional). Default is `blob.core.windows.net`. |
Example:
```toml
[runners.cache]
Type = "azure"
Path = "path/to/prefix"
Shared = false
[runners.cache.azure]
AccountName = "<AZURE STORAGE ACCOUNT NAME>"
AccountKey = "<AZURE STORAGE ACCOUNT KEY>"
ContainerName = "runners-cache"
StorageDomain = "blob.core.windows.net"
```
## The `[runners.kubernetes]` section
> Introduced in GitLab Runner v1.6.0.
......
......@@ -5,9 +5,9 @@ go 1.13
require (
cloud.google.com/go v0.49.0 // indirect
cloud.google.com/go/storage v1.0.0
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/Azure/go-autorest/autorest v0.9.2 // indirect
github.com/Azure/go-autorest/autorest/adal v0.8.0 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/Microsoft/go-winio v0.4.12 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
......@@ -51,7 +51,7 @@ require (
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opencontainers/runc v1.0.0-rc6.0.20190115182101-c1e454b2a1bf // indirect
github.com/pkg/errors v0.8.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/common v0.6.0
......
......@@ -16,14 +16,18 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/storage v1.0.0 h1:VV2nUM3wwLLGh9lSABFgZMjInyUbJeaRSE64WuAIQ+4=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-pipeline-go v0.2.2 h1:6oiIS9yaG6XCCzhgAgKFfIWyo4LLCiDhZot6ltoThhY=
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
github.com/Azure/azure-storage-blob-go v0.10.0/go.mod h1:ep1edmW+kNQx4UfWM9heESNmQdijykocJ0YOxmMX8SE=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest v0.9.2 h1:6AWuh3uWrsZJcNoCHrCF/+g4aKPCU39kaMO6/qrnK/4=
github.com/Azure/go-autorest/autorest v0.9.2/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0=
github.com/Azure/go-autorest/autorest/adal v0.8.0 h1:CxTzQrySOxDnKpLjFJeZAS5Qrv/qFPkgLjx5bOAi//I=
github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S6HY6e6wcHn37qQMBQlvY3lc=
github.com/Azure/go-autorest/autorest/adal v0.8.3 h1:O1AGG9Xig71FxdX9HO5pGNyZ7TbSyHaVg+5eJO/jSGw=
github.com/Azure/go-autorest/autorest/adal v0.8.3/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA=
github.com/Azure/go-autorest/autorest/date v0.2.0 h1:yW+Zlqf26583pE43KhfnhFcdmSWlm5Ew6bxipnr/tbM=
github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g=
......@@ -131,6 +135,8 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
......@@ -219,6 +225,8 @@ github.com/markelog/trie v0.0.0-20171230083431-098fa99650c0 h1:Ru7cWvwrqy58mwlw2
github.com/markelog/trie v0.0.0-20171230083431-098fa99650c0/go.mod h1:bwqF/XEduuRDC/RtXIx5FDeE8K6ruQWqCb2B4ol+LH8=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d h1:oNAwILwmgWKFpuU+dXvI6dl9jG2mAWAZLX3r9s0PPiw=
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/minio/minio-go/v6 v6.0.49 h1:bU4kIa/qChTLC1jrWZ8F+8gOiw1MClubddAJVR4gW3w=
......@@ -263,6 +271,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
......@@ -330,6 +340,7 @@ golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d h1:1ZiEyfaQIg3Qh0EoqpwAakHVhecoE5wlSg5GjnafJGw=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
......
......@@ -11,6 +11,7 @@ import (
cli_helpers "gitlab.com/gitlab-org/gitlab-runner/helpers/cli"
"gitlab.com/gitlab-org/gitlab-runner/log"
_ "gitlab.com/gitlab-org/gitlab-runner/cache/azure"
_ "gitlab.com/gitlab-org/gitlab-runner/cache/gcs"
_ "gitlab.com/gitlab-org/gitlab-runner/cache/s3"
_ "gitlab.com/gitlab-org/gitlab-runner/commands"
......
MIT License
Copyright (c) Microsoft Corporation. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
\ No newline at end of file
package pipeline
import (
"context"
"github.com/mattn/go-ieproxy"
"net"
"net/http"
"os"
"time"
)
// The Factory interface represents an object that can create its Policy object. Each HTTP request sent
// requires that this Factory create a new instance of its Policy object.
type Factory interface {
New(next Policy, po *PolicyOptions) Policy
}
// FactoryFunc is an adapter that allows the use of an ordinary function as a Factory interface.
type FactoryFunc func(next Policy, po *PolicyOptions) PolicyFunc
// New calls f(next,po).
func (f FactoryFunc) New(next Policy, po *PolicyOptions) Policy {
return f(next, po)
}
// The Policy interface represents a mutable Policy object created by a Factory. The object can mutate/process
// the HTTP request and then forward it on to the next Policy object in the linked-list. The returned
// Response goes backward through the linked-list for additional processing.
// NOTE: Request is passed by value so changes do not change the caller's version of
// the request. However, Request has some fields that reference mutable objects (not strings).
// These references are copied; a deep copy is not performed. Specifically, this means that
// you should avoid modifying the objects referred to by these fields: URL, Header, Body,
// GetBody, TransferEncoding, Form, MultipartForm, Trailer, TLS, Cancel, and Response.
type Policy interface {
Do(ctx context.Context, request Request) (Response, error)
}
// PolicyFunc is an adapter that allows the use of an ordinary function as a Policy interface.
type PolicyFunc func(ctx context.Context, request Request) (Response, error)
// Do calls f(ctx, request).
func (f PolicyFunc) Do(ctx context.Context, request Request) (Response, error) {
return f(ctx, request)
}
// Options configures a Pipeline's behavior.
type Options struct {
HTTPSender Factory // If sender is nil, then the pipeline's default client is used to send the HTTP requests.
Log LogOptions
}
// LogLevel tells a logger the minimum level to log. When code reports a log entry,
// the LogLevel indicates the level of the log entry. The logger only records entries
// whose level is at least the level it was told to log. See the Log* constants.
// For example, if a logger is configured with LogError, then LogError, LogPanic,
// and LogFatal entries will be logged; lower level entries are ignored.
type LogLevel uint32
const (
// LogNone tells a logger not to log any entries passed to it.
LogNone LogLevel = iota
// LogFatal tells a logger to log all LogFatal entries passed to it.
LogFatal
// LogPanic tells a logger to log all LogPanic and LogFatal entries passed to it.
LogPanic
// LogError tells a logger to log all LogError, LogPanic and LogFatal entries passed to it.
LogError
// LogWarning tells a logger to log all LogWarning, LogError, LogPanic and LogFatal entries passed to it.
LogWarning
// LogInfo tells a logger to log all LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
LogInfo
// LogDebug tells a logger to log all LogDebug, LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
LogDebug
)
// LogOptions configures the pipeline's logging mechanism & level filtering.
type LogOptions struct {
Log func(level LogLevel, message string)
// ShouldLog is called periodically allowing you to return whether the specified LogLevel should be logged or not.
// An application can return different values over the its lifetime; this allows the application to dynamically
// alter what is logged. NOTE: This method can be called by multiple goroutines simultaneously so make sure
// you implement it in a goroutine-safe way. If nil, nothing is logged (the equivalent of returning LogNone).
// Usually, the function will be implemented simply like this: return level <= LogWarning
ShouldLog func(level LogLevel) bool
}
type pipeline struct {
factories []Factory
options Options
}
// The Pipeline interface represents an ordered list of Factory objects and an object implementing the HTTPSender interface.
// You construct a Pipeline by calling the pipeline.NewPipeline function. To send an HTTP request, call pipeline.NewRequest
// and then call Pipeline's Do method passing a context, the request, and a method-specific Factory (or nil). Passing a
// method-specific Factory allows this one call to Do to inject a Policy into the linked-list. The policy is injected where
// the MethodFactoryMarker (see the pipeline.MethodFactoryMarker function) is in the slice of Factory objects.
//
// When Do is called, the Pipeline object asks each Factory object to construct its Policy object and adds each Policy to a linked-list.
// THen, Do sends the Context and Request through all the Policy objects. The final Policy object sends the request over the network
// (via the HTTPSender object passed to NewPipeline) and the response is returned backwards through all the Policy objects.
// Since Pipeline and Factory objects are goroutine-safe, you typically create 1 Pipeline object and reuse it to make many HTTP requests.
type Pipeline interface {
Do(ctx context.Context, methodFactory Factory, request Request) (Response, error)
}
// NewPipeline creates a new goroutine-safe Pipeline object from the slice of Factory objects and the specified options.
func NewPipeline(factories []Factory, o Options) Pipeline {
if o.HTTPSender == nil {
o.HTTPSender = newDefaultHTTPClientFactory()
}
if o.Log.Log == nil {
o.Log.Log = func(LogLevel, string) {} // No-op logger
}
return &pipeline{factories: factories, options: o}
}
// Do is called for each and every HTTP request. It tells each Factory to create its own (mutable) Policy object
// replacing a MethodFactoryMarker factory (if it exists) with the methodFactory passed in. Then, the Context and Request
// are sent through the pipeline of Policy objects (which can transform the Request's URL/query parameters/headers) and
// ultimately sends the transformed HTTP request over the network.
func (p *pipeline) Do(ctx context.Context, methodFactory Factory, request Request) (Response, error) {
response, err := p.newPolicies(methodFactory).Do(ctx, request)
request.close()
return response, err
}
func (p *pipeline) newPolicies(methodFactory Factory) Policy {
// The last Policy is the one that actually sends the request over the wire and gets the response.
// It is overridable via the Options' HTTPSender field.
po := &PolicyOptions{pipeline: p} // One object shared by all policy objects
next := p.options.HTTPSender.New(nil, po)
// Walk over the slice of Factory objects in reverse (from wire to API)
markers := 0
for i := len(p.factories) - 1; i >= 0; i-- {
factory := p.factories[i]
if _, ok := factory.(methodFactoryMarker); ok {
markers++
if markers > 1 {
panic("MethodFactoryMarker can only appear once in the pipeline")
}
if methodFactory != nil {
// Replace MethodFactoryMarker with passed-in methodFactory
next = methodFactory.New(next, po)
}
} else {
// Use the slice's Factory to construct its Policy
next = factory.New(next, po)
}
}
// Each Factory has created its Policy
if markers == 0 && methodFactory != nil {
panic("Non-nil methodFactory requires MethodFactoryMarker in the pipeline")
}
return next // Return head of the Policy object linked-list
}
// A PolicyOptions represents optional information that can be used by a node in the
// linked-list of Policy objects. A PolicyOptions is passed to the Factory's New method
// which passes it (if desired) to the Policy object it creates. Today, the Policy object
// uses the options to perform logging. But, in the future, this could be used for more.
type PolicyOptions struct {
pipeline *pipeline
}
// ShouldLog returns true if the specified log level should be logged.
func (po *PolicyOptions) ShouldLog(level LogLevel) bool {
if po.pipeline.options.Log.ShouldLog != nil {
return po.pipeline.options.Log.ShouldLog(level)
}
return false
}
// Log logs a string to the Pipeline's Logger.
func (po *PolicyOptions) Log(level LogLevel, msg string) {
if !po.ShouldLog(level) {
return // Short circuit message formatting if we're not logging it
}
// We are logging it, ensure trailing newline
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg += "\n" // Ensure trailing newline
}
po.pipeline.options.Log.Log(level, msg)
// If logger doesn't handle fatal/panic, we'll do it here.
if level == LogFatal {
os.Exit(1)
} else if level == LogPanic {
panic(msg)
}
}
var pipelineHTTPClient = newDefaultHTTPClient()
func newDefaultHTTPClient() *http.Client {
// We want the Transport to have a large connection pool
return &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
// We use Dial instead of DialContext as DialContext has been reported to cause slower performance.
Dial /*Context*/ : (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial, /*Context*/
MaxIdleConns: 0, // No limit
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
//ResponseHeaderTimeout: time.Duration{},
//ExpectContinueTimeout: time.Duration{},
},
}
}
// newDefaultHTTPClientFactory creates a DefaultHTTPClientPolicyFactory object that sends HTTP requests to a Go's default http.Client.
func newDefaultHTTPClientFactory() Factory {
return FactoryFunc(func(next Policy, po *PolicyOptions) PolicyFunc {
return func(ctx context.Context, request Request) (Response, error) {
r, err := pipelineHTTPClient.Do(request.WithContext(ctx))
if err != nil {
err = NewError(err, "HTTP request failed")
}
return NewHTTPResponse(r), err
}
})
}
var mfm = methodFactoryMarker{} // Singleton
// MethodFactoryMarker returns a special marker Factory object. When Pipeline's Do method is called, any
// MethodMarkerFactory object is replaced with the specified methodFactory object. If nil is passed fro Do's
// methodFactory parameter, then the MethodFactoryMarker is ignored as the linked-list of Policy objects is created.
func MethodFactoryMarker() Factory {
return mfm
}
type methodFactoryMarker struct {
}
func (methodFactoryMarker) New(next Policy, po *PolicyOptions) Policy {
panic("methodFactoryMarker policy should have been replaced with a method policy")
}
// LogSanitizer can be implemented to clean secrets from lines logged by ForceLog
// By default no implemetation is provided here, because pipeline may be used in many different
// contexts, so the correct implementation is context-dependent
type LogSanitizer interface {
SanitizeLogMessage(raw string) string
}
var sanitizer LogSanitizer
var enableForceLog bool = true
// SetLogSanitizer can be called to supply a custom LogSanitizer.
// There is no threadsafety or locking on the underlying variable,
// so call this function just once at startup of your application
// (Don't later try to change the sanitizer on the fly).
func SetLogSanitizer(s LogSanitizer)(){
sanitizer = s
}
// SetForceLogEnabled can be used to disable ForceLog
// There is no threadsafety or locking on the underlying variable,
// so call this function just once at startup of your application
// (Don't later try to change the setting on the fly).
func SetForceLogEnabled(enable bool)() {
enableForceLog = enable
}
package pipeline
// ForceLog should rarely be used. It forceable logs an entry to the
// Windows Event Log (on Windows) or to the SysLog (on Linux)
func ForceLog(level LogLevel, msg string) {
if !enableForceLog {
return
}
if sanitizer != nil {
msg = sanitizer.SanitizeLogMessage(msg)
}
forceLog(level, msg)
}
// +build !windows,!nacl,!plan9
package pipeline
import (
"log"
"log/syslog"
)
// forceLog should rarely be used. It forceable logs an entry to the
// Windows Event Log (on Windows) or to the SysLog (on Linux)
func forceLog(level LogLevel, msg string) {
if defaultLogger == nil {
return // Return fast if we failed to create the logger.
}
// We are logging it, ensure trailing newline
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg += "\n" // Ensure trailing newline
}
switch level {
case LogFatal:
defaultLogger.Fatal(msg)
case LogPanic:
defaultLogger.Panic(msg)
case LogError, LogWarning, LogInfo:
defaultLogger.Print(msg)
}
}
var defaultLogger = func() *log.Logger {
l, _ := syslog.NewLogger(syslog.LOG_USER|syslog.LOG_WARNING, log.LstdFlags)
return l
}()
package pipeline
import (
"os"
"syscall"
"unsafe"
)
// forceLog should rarely be used. It forceable logs an entry to the
// Windows Event Log (on Windows) or to the SysLog (on Linux)
func forceLog(level LogLevel, msg string) {
var el eventType
switch level {
case LogError, LogFatal, LogPanic:
el = elError
case LogWarning:
el = elWarning
case LogInfo:
el = elInfo
}
// We are logging it, ensure trailing newline
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg += "\n" // Ensure trailing newline
}
reportEvent(el, 0, msg)
}
type eventType int16
const (
elSuccess eventType = 0
elError eventType = 1
elWarning eventType = 2
elInfo eventType = 4
)
var reportEvent = func() func(eventType eventType, eventID int32, msg string) {
advAPI32 := syscall.MustLoadDLL("advapi32.dll") // lower case to tie in with Go's sysdll registration
registerEventSource := advAPI32.MustFindProc("RegisterEventSourceW")
sourceName, _ := os.Executable()
sourceNameUTF16, _ := syscall.UTF16PtrFromString(sourceName)
handle, _, lastErr := registerEventSource.Call(uintptr(0), uintptr(unsafe.Pointer(sourceNameUTF16)))
if lastErr == nil { // On error, logging is a no-op
return func(eventType eventType, eventID int32, msg string) {}
}
reportEvent := advAPI32.MustFindProc("ReportEventW")
return func(eventType eventType, eventID int32, msg string) {
s, _ := syscall.UTF16PtrFromString(msg)
_, _, _ = reportEvent.Call(
uintptr(handle), // HANDLE hEventLog
uintptr(eventType), // WORD wType
uintptr(0), // WORD wCategory
uintptr(eventID), // DWORD dwEventID
uintptr(0), // PSID lpUserSid
uintptr(1), // WORD wNumStrings
uintptr(0), // DWORD dwDataSize
uintptr(unsafe.Pointer(&s)), // LPCTSTR *lpStrings
uintptr(0)) // LPVOID lpRawData
}
}()
// Copyright 2017 Microsoft Corporation. All rights reserved.
// Use of this source code is governed by an MIT
// license that can be found in the LICENSE file.
/*
Package pipeline implements an HTTP request/response middleware pipeline whose
policy objects mutate an HTTP request's URL, query parameters, and/or headers before
the request is sent over the wire.
Not all policy objects mutate an HTTP request; some policy objects simply impact the
flow of requests/responses by performing operations such as logging, retry policies,
timeouts, failure injection, and deserialization of response payloads.
Implementing the Policy Interface
To implement a policy, define a struct that implements the pipeline.Policy interface's Do method. Your Do
method is called when an HTTP request wants to be sent over the network. Your Do method can perform any
operation(s) it desires. For example, it can log the outgoing request, mutate the URL, headers, and/or query
parameters, inject a failure, etc. Your Do method must then forward the HTTP request to next Policy object
in a linked-list ensuring that the remaining Policy objects perform their work. Ultimately, the last Policy
object sends the HTTP request over the network (by calling the HTTPSender's Do method).
When an HTTP response comes back, each Policy object in the linked-list gets a chance to process the response
(in reverse order). The Policy object can log the response, retry the operation if due to a transient failure
or timeout, deserialize the response body, etc. Ultimately, the last Policy object returns the HTTP response
to the code that initiated the original HTTP request.
Here is a template for how to define a pipeline.Policy object:
type myPolicy struct {
node PolicyNode
// TODO: Add configuration/setting fields here (if desired)...
}
func (p *myPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
// TODO: Mutate/process the HTTP request here...
response, err := p.node.Do(ctx, request) // Forward HTTP request to next Policy & get HTTP response
// TODO: Mutate/process the HTTP response here...
return response, err // Return response/error to previous Policy
}
Implementing the Factory Interface
Each Policy struct definition requires a factory struct definition that implements the pipeline.Factory interface's New
method. The New method is called when application code wants to initiate a new HTTP request. Factory's New method is
passed a pipeline.PolicyNode object which contains a reference to the owning pipeline.Pipeline object (discussed later) and
a reference to the next Policy object in the linked list. The New method should create its corresponding Policy object
passing it the PolicyNode and any other configuration/settings fields appropriate for the specific Policy object.
Here is a template for how to define a pipeline.Policy object:
// NOTE: Once created & initialized, Factory objects should be goroutine-safe (ex: immutable);
// this allows reuse (efficient use of memory) and makes these objects usable by multiple goroutines concurrently.
type myPolicyFactory struct {
// TODO: Add any configuration/setting fields if desired...
}
func (f *myPolicyFactory) New(node pipeline.PolicyNode) Policy {
return &myPolicy{node: node} // TODO: Also initialize any configuration/setting fields here (if desired)...
}
Using your Factory and Policy objects via a Pipeline
To use the Factory and Policy objects, an application constructs a slice of Factory objects and passes
this slice to the pipeline.NewPipeline function.
func NewPipeline(factories []pipeline.Factory, sender pipeline.HTTPSender) Pipeline
This function also requires an object implementing the HTTPSender interface. For simple scenarios,
passing nil for HTTPSender causes a standard Go http.Client object to be created and used to actually
send the HTTP response over the network. For more advanced scenarios, you can pass your own HTTPSender
object in. This allows sharing of http.Client objects or the use of custom-configured http.Client objects
or other objects that can simulate the network requests for testing purposes.
Now that you have a pipeline.Pipeline object, you can create a pipeline.Request object (which is a simple
wrapper around Go's standard http.Request object) and pass it to Pipeline's Do method along with passing a
context.Context for cancelling the HTTP request (if desired).
type Pipeline interface {
Do(ctx context.Context, methodFactory pipeline.Factory, request pipeline.Request) (pipeline.Response, error)
}
Do iterates over the slice of Factory objects and tells each one to create its corresponding
Policy object. After the linked-list of Policy objects have been created, Do calls the first
Policy object passing it the Context & HTTP request parameters. These parameters now flow through
all the Policy objects giving each object a chance to look at and/or mutate the HTTP request.
The last Policy object sends the message over the network.
When the network operation completes, the HTTP response and error return values pass
back through the same Policy objects in reverse order. Most Policy objects ignore the
response/error but some log the result, retry the operation (depending on the exact
reason the operation failed), or deserialize the response's body. Your own Policy
objects can do whatever they like when processing outgoing requests or incoming responses.
Note that after an I/O request runs to completion, the Policy objects for that request
are garbage collected. However, Pipeline object (like Factory objects) are goroutine-safe allowing
them to be created once and reused over many I/O operations. This allows for efficient use of
memory and also makes them safely usable by multiple goroutines concurrently.
Inserting a Method-Specific Factory into the Linked-List of Policy Objects
While Pipeline and Factory objects can be reused over many different operations, it is
common to have special behavior for a specific operation/method. For example, a method
may need to deserialize the response's body to an instance of a specific data type.
To accommodate this, the Pipeline's Do method takes an additional method-specific
Factory object. The Do method tells this Factory to create a Policy object and
injects this method-specific Policy object into the linked-list of Policy objects.
When creating a Pipeline object, the slice of Factory objects passed must have 1
(and only 1) entry marking where the method-specific Factory should be injected.
The Factory marker is obtained by calling the pipeline.MethodFactoryMarker() function:
func MethodFactoryMarker() pipeline.Factory
Creating an HTTP Request Object
The HTTP request object passed to Pipeline's Do method is not Go's http.Request struct.
Instead, it is a pipeline.Request struct which is a simple wrapper around Go's standard
http.Request. You create a pipeline.Request object by calling the pipeline.NewRequest function:
func NewRequest(method string, url url.URL, options pipeline.RequestOptions) (request pipeline.Request, err error)
To this function, you must pass a pipeline.RequestOptions that looks like this:
type RequestOptions struct {
// The readable and seekable stream to be sent to the server as the request's body.
Body io.ReadSeeker
// The callback method (if not nil) to be invoked to report progress as the stream is uploaded in the HTTP request.
Progress ProgressReceiver
}
The method and struct ensure that the request's body stream is a read/seekable stream.
A seekable stream is required so that upon retry, the final Policy object can seek
the stream back to the beginning before retrying the network request and re-uploading the
body. In addition, you can associate a ProgressReceiver callback function which will be
invoked periodically to report progress while bytes are being read from the body stream
and sent over the network.
Processing the HTTP Response
When an HTTP response comes in from the network, a reference to Go's http.Response struct is
embedded in a struct that implements the pipeline.Response interface:
type Response interface {
Response() *http.Response
}
This interface is returned through all the Policy objects. Each Policy object can call the Response
interface's Response method to examine (or mutate) the embedded http.Response object.
A Policy object can internally define another struct (implementing the pipeline.Response interface)
that embeds an http.Response and adds additional fields and return this structure to other Policy
objects. This allows a Policy object to deserialize the body to some other struct and return the
original http.Response and the additional struct back through the Policy chain. Other Policy objects
can see the Response but cannot see the additional struct with the deserialized body. After all the
Policy objects have returned, the pipeline.Response interface is returned by Pipeline's Do method.
The caller of this method can perform a type assertion attempting to get back to the struct type
really returned by the Policy object. If the type assertion is successful, the caller now has
access to both the http.Response and the deserialized struct object.*/
package pipeline
package pipeline
import (
"fmt"
"runtime"
)
type causer interface {
Cause() error
}
func errorWithPC(msg string, pc uintptr) string {
s := ""
if fn := runtime.FuncForPC(pc); fn != nil {
file, line := fn.FileLine(pc)
s = fmt.Sprintf("-> %v, %v:%v\n", fn.Name(), file, line)
}
s += msg + "\n\n"
return s
}
func getPC(callersToSkip int) uintptr {
// Get the PC of Initialize method's caller.
pc := [1]uintptr{}
_ = runtime.Callers(callersToSkip, pc[:])
return pc[0]
}
// ErrorNode can be an embedded field in a private error object. This field
// adds Program Counter support and a 'cause' (reference to a preceding error).
// When initializing a error type with this embedded field, initialize the
// ErrorNode field by calling ErrorNode{}.Initialize(cause).
type ErrorNode struct {
pc uintptr // Represents a Program Counter that you can get symbols for.
cause error // Refers to the preceding error (or nil)
}
// Error returns a string with the PC's symbols or "" if the PC is invalid.
// When defining a new error type, have its Error method call this one passing
// it the string representation of the error.
func (e *ErrorNode) Error(msg string) string {
s := errorWithPC(msg, e.pc)
if e.cause != nil {
s += e.cause.Error() + "\n"
}
return s
}
// Cause returns the error that preceded this error.
func (e *ErrorNode) Cause() error { return e.cause }
// Temporary returns true if the error occurred due to a temporary condition.
func (e ErrorNode) Temporary() bool {
type temporary interface {
Temporary() bool
}
for err := e.cause; err != nil; {
if t, ok := err.(temporary); ok {
return t.Temporary()
}
if cause, ok := err.(causer); ok {
err = cause.Cause()
} else {
err = nil
}
}
return false
}
// Timeout returns true if the error occurred due to time expiring.
func (e ErrorNode) Timeout() bool {
type timeout interface {
Timeout() bool
}
for err := e.cause; err != nil; {
if t, ok := err.(timeout); ok {
return t.Timeout()
}
if cause, ok := err.(causer); ok {
err = cause.Cause()
} else {
err = nil
}
}
return false
}
// Initialize is used to initialize an embedded ErrorNode field.
// It captures the caller's program counter and saves the cause (preceding error).
// To initialize the field, use "ErrorNode{}.Initialize(cause, 3)". A callersToSkip
// value of 3 is very common; but, depending on your code nesting, you may need
// a different value.
func (ErrorNode) Initialize(cause error, callersToSkip int) ErrorNode {
pc := getPC(callersToSkip)
return ErrorNode{pc: pc, cause: cause}
}
// Cause walks all the preceding errors and return the originating error.
func Cause(err error) error {
for err != nil {
cause, ok := err.(causer)
if !ok {
break
}
err = cause.Cause()
}
return err
}
// ErrorNodeNoCause can be an embedded field in a private error object. This field
// adds Program Counter support.
// When initializing a error type with this embedded field, initialize the
// ErrorNodeNoCause field by calling ErrorNodeNoCause{}.Initialize().
type ErrorNodeNoCause struct {
pc uintptr // Represents a Program Counter that you can get symbols for.
}
// Error returns a string with the PC's symbols or "" if the PC is invalid.
// When defining a new error type, have its Error method call this one passing
// it the string representation of the error.
func (e *ErrorNodeNoCause) Error(msg string) string {
return errorWithPC(msg, e.pc)
}
// Temporary returns true if the error occurred due to a temporary condition.
func (e ErrorNodeNoCause) Temporary() bool {
return false
}
// Timeout returns true if the error occurred due to time expiring.
func (e ErrorNodeNoCause) Timeout() bool {
return false
}
// Initialize is used to initialize an embedded ErrorNode field.
// It captures the caller's program counter.
// To initialize the field, use "ErrorNodeNoCause{}.Initialize(3)". A callersToSkip
// value of 3 is very common; but, depending on your code nesting, you may need
// a different value.
func (ErrorNodeNoCause) Initialize(callersToSkip int) ErrorNodeNoCause {
pc := getPC(callersToSkip)
return ErrorNodeNoCause{pc: pc}
}
// NewError creates a simple string error (like Error.New). But, this
// error also captures the caller's Program Counter and the preceding error (if provided).
func NewError(cause error, msg string) error {
if cause != nil {
return &pcError{
ErrorNode: ErrorNode{}.Initialize(cause, 3),
msg: msg,
}
}
return &pcErrorNoCause{
ErrorNodeNoCause: ErrorNodeNoCause{}.Initialize(3),
msg: msg,
}
}
// pcError is a simple string error (like error.New) with an ErrorNode (PC & cause).
type pcError struct {
ErrorNode
msg string
}
// Error satisfies the error interface. It shows the error with Program Counter
// symbols and calls Error on the preceding error so you can see the full error chain.
func (e *pcError) Error() string { return e.ErrorNode.Error(e.msg) }
// pcErrorNoCause is a simple string error (like error.New) with an ErrorNode (PC).
type pcErrorNoCause struct {
ErrorNodeNoCause
msg string
}
// Error satisfies the error interface. It shows the error with Program Counter symbols.
func (e *pcErrorNoCause) Error() string { return e.ErrorNodeNoCause.Error(e.msg) }
package pipeline
import "io"
// ********** The following is common between the request body AND the response body.
// ProgressReceiver defines the signature of a callback function invoked as progress is reported.
type ProgressReceiver func(bytesTransferred int64)
// ********** The following are specific to the request body (a ReadSeekCloser)
// This struct is used when sending a body to the network
type requestBodyProgress struct {
requestBody io.ReadSeeker // Seeking is required to support retries
pr ProgressReceiver
}
// NewRequestBodyProgress adds progress reporting to an HTTP request's body stream.
func NewRequestBodyProgress(requestBody io.ReadSeeker, pr ProgressReceiver) io.ReadSeeker {
if pr == nil {
panic("pr must not be nil")
}
return &requestBodyProgress{requestBody: requestBody, pr: pr}
}
// Read reads a block of data from an inner stream and reports progress
func (rbp *requestBodyProgress) Read(p []byte) (n int, err error) {
n, err = rbp.requestBody.Read(p)
if err != nil {
return
}
// Invokes the user's callback method to report progress
position, err := rbp.requestBody.Seek(0, io.SeekCurrent)
if err != nil {
panic(err)
}
rbp.pr(position)
return
}
func (rbp *requestBodyProgress) Seek(offset int64, whence int) (offsetFromStart int64, err error) {
return rbp.requestBody.Seek(offset, whence)
}
// requestBodyProgress supports Close but the underlying stream may not; if it does, Close will close it.
func (rbp *requestBodyProgress) Close() error {
if c, ok := rbp.requestBody.(io.Closer); ok {
return c.Close()
}
return nil
}
// ********** The following are specific to the response body (a ReadCloser)
// This struct is used when sending a body to the network
type responseBodyProgress struct {
responseBody io.ReadCloser
pr ProgressReceiver
offset int64
}
// NewResponseBodyProgress adds progress reporting to an HTTP response's body stream.
func NewResponseBodyProgress(responseBody io.ReadCloser, pr ProgressReceiver) io.ReadCloser {
if pr == nil {
panic("pr must not be nil")
}
return &responseBodyProgress{responseBody: responseBody, pr: pr, offset: 0}
}
// Read reads a block of data from an inner stream and reports progress
func (rbp *responseBodyProgress) Read(p []byte) (n int, err error) {
n, err = rbp.responseBody.Read(p)
rbp.offset += int64(n)
// Invokes the user's callback method to report progress
rbp.pr(rbp.offset)
return
}
func (rbp *responseBodyProgress) Close() error {
return rbp.responseBody.Close()
}
package pipeline
import (
"io"
"net/http"
"net/url"
"strconv"
)
// Request is a thin wrapper over an http.Request. The wrapper provides several helper methods.
type Request struct {
*http.Request
}
// NewRequest initializes a new HTTP request object with any desired options.
func NewRequest(method string, url url.URL, body io.ReadSeeker) (request Request, err error) {
// Note: the url is passed by value so that any pipeline operations that modify it do so on a copy.
// This code to construct an http.Request is copied from http.NewRequest(); we intentionally omitted removeEmptyPort for now.
request.Request = &http.Request{
Method: method,
URL: &url,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(http.Header),
Host: url.Host,
}
if body != nil {
err = request.SetBody(body)
}
return
}
// SetBody sets the body and content length, assumes body is not nil.
func (r Request) SetBody(body io.ReadSeeker) error {
size, err := body.Seek(0, io.SeekEnd)
if err != nil {
return err
}
body.Seek(0, io.SeekStart)
r.ContentLength = size
r.Header["Content-Length"] = []string{strconv.FormatInt(size, 10)}
if size != 0 {
r.Body = &retryableRequestBody{body: body}
r.GetBody = func() (io.ReadCloser, error) {
_, err := body.Seek(0, io.SeekStart)
if err != nil {
return nil, err
}
return r.Body, nil
}
} else {
// in case the body is an empty stream, we need to use http.NoBody to explicitly provide no content
r.Body = http.NoBody
r.GetBody = func() (io.ReadCloser, error) {
return http.NoBody, nil
}
// close the user-provided empty body
if c, ok := body.(io.Closer); ok {
c.Close()
}
}
return nil
}
// Copy makes a copy of an http.Request. Specifically, it makes a deep copy
// of its Method, URL, Host, Proto(Major/Minor), Header. ContentLength, Close,
// RemoteAddr, RequestURI. Copy makes a shallow copy of the Body, GetBody, TLS,
// Cancel, Response, and ctx fields. Copy panics if any of these fields are
// not nil: TransferEncoding, Form, PostForm, MultipartForm, or Trailer.
func (r Request) Copy() Request {
if r.TransferEncoding != nil || r.Form != nil || r.PostForm != nil || r.MultipartForm != nil || r.Trailer != nil {
panic("Can't make a deep copy of the http.Request because at least one of the following is not nil:" +
"TransferEncoding, Form, PostForm, MultipartForm, or Trailer.")
}
copy := *r.Request // Copy the request
urlCopy := *(r.Request.URL) // Copy the URL
copy.URL = &urlCopy
copy.Header = http.Header{} // Copy the header
for k, vs := range r.Header {
for _, value := range vs {
copy.Header.Add(k, value)
}
}
return Request{Request: &copy} // Return the copy
}
func (r Request) close() error {
if r.Body != nil && r.Body != http.NoBody {
c, ok := r.Body.(*retryableRequestBody)
if !ok {
panic("unexpected request body type (should be *retryableReadSeekerCloser)")
}
return c.realClose()
}
return nil
}
// RewindBody seeks the request's Body stream back to the beginning so it can be resent when retrying an operation.
func (r Request) RewindBody() error {
if r.Body != nil && r.Body != http.NoBody {
s, ok := r.Body.(io.Seeker)
if !ok {
panic("unexpected request body type (should be io.Seeker)")
}
// Reset the stream back to the beginning
_, err := s.Seek(0, io.SeekStart)
return err
}
return nil
}
// ********** The following type/methods implement the retryableRequestBody (a ReadSeekCloser)
// This struct is used when sending a body to the network
type retryableRequestBody struct {
body io.ReadSeeker // Seeking is required to support retries
}
// Read reads a block of data from an inner stream and reports progress
func (b *retryableRequestBody) Read(p []byte) (n int, err error) {
return b.body.Read(p)
}
func (b *retryableRequestBody) Seek(offset int64, whence int) (offsetFromStart int64, err error) {
return b.body.Seek(offset, whence)
}
func (b *retryableRequestBody) Close() error {
// We don't want the underlying transport to close the request body on transient failures so this is a nop.
// The pipeline closes the request body upon success.
return nil
}
func (b *retryableRequestBody) realClose() error {
if c, ok := b.body.(io.Closer); ok {
return c.Close()
}
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment