Skip to content
Snippets Groups Projects
Commit b8c29a8a authored by Pawel Rozlach's avatar Pawel Rozlach Committed by João Pereira
Browse files

test: Add missing storage drivers tests for Close, Commit, Cancel to the shared testsuite

parent b5b0cb4f
No related branches found
No related tags found
1 merge request!2053test: Add missing storage drivers tests for Close, Commit, Cancel to the shared testsuite
......@@ -538,7 +538,19 @@ func (w *writer) Close() error {
return storagedriver.ErrAlreadyClosed
}
w.closed = true
return w.bw.Flush()
if w.canceled {
// NOTE(prozlach): If the writer has already been canceled, then there
// is nothing to flush to the backend as the target file has already
// been deleted.
return nil
}
err := w.bw.Flush()
if err != nil {
return fmt.Errorf("flushing while closing writer: %w", err)
}
return nil
}
func (w *writer) Cancel() error {
......@@ -548,8 +560,16 @@ func (w *writer) Cancel() error {
return storagedriver.ErrAlreadyCommited
}
w.canceled = true
blobRef := w.driver.client.GetContainerReference(w.driver.container).GetBlobReference(w.path)
return blobRef.Delete(nil)
err := blobRef.Delete(nil)
if err != nil {
if is404(err) {
return nil
}
return fmt.Errorf("removing canceled blob: %w", err)
}
return nil
}
func (w *writer) Commit() error {
......@@ -562,7 +582,11 @@ func (w *writer) Commit() error {
return storagedriver.ErrAlreadyCanceled
}
w.committed = true
return w.bw.Flush()
err := w.bw.Flush()
if err != nil {
return fmt.Errorf("flushing while committing writer: %w", err)
}
return nil
}
type blockWriter struct {
......
......@@ -592,6 +592,14 @@ func (w *writer) Close() error {
return storagedriver.ErrAlreadyClosed
}
w.closed = true
if w.canceled {
// NOTE(prozlach): If the writer has already been canceled, then there
// is nothing to flush to the backend as the target file has already
// been deleted.
return nil
}
err := w.bw.Flush()
if err != nil {
return fmt.Errorf("flushing while closing writer: %w", err)
......@@ -606,9 +614,13 @@ func (w *writer) Cancel() error {
return storagedriver.ErrAlreadyCommited
}
w.canceled = true
blobRef := w.driver.client.NewBlobClient(w.path)
_, err := blobRef.Delete(w.ctx, nil)
if err != nil {
if Is404(err) {
return nil
}
return fmt.Errorf("removing canceled blob: %w", err)
}
return nil
......@@ -624,6 +636,7 @@ func (w *writer) Commit() error {
return storagedriver.ErrAlreadyCanceled
}
w.committed = true
err := w.bw.Flush()
if err != nil {
return fmt.Errorf("flushing while committing writer: %w", err)
......
......@@ -441,17 +441,23 @@ func (fw *fileWriter) Close() error {
if fw.closed {
return storagedriver.ErrAlreadyClosed
}
if fw.canceled {
// NOTE(prozlach): If the writer has already been canceled, then there
// is nothing to flush to the backend as the target file has already
// been deleted.
return nil
}
if err := fw.bw.Flush(); err != nil {
return err
return fmt.Errorf("flushing file while closing writer: %w", err)
}
if err := fw.file.Sync(); err != nil {
return err
return fmt.Errorf("syncing file while closing writer: %w", err)
}
if err := fw.file.Close(); err != nil {
return err
return fmt.Errorf("closing file while closing writer: %w", err)
}
fw.closed = true
return nil
......@@ -460,11 +466,20 @@ func (fw *fileWriter) Close() error {
func (fw *fileWriter) Cancel() error {
if fw.closed {
return storagedriver.ErrAlreadyClosed
} else if fw.committed {
return storagedriver.ErrAlreadyCommited
}
fw.canceled = true
_ = fw.file.Close()
return os.Remove(fw.file.Name())
err := os.Remove(fw.file.Name())
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
}
return fmt.Errorf("removing file %q: %w", fw.file.Name(), err)
}
return nil
}
func (fw *fileWriter) Commit() error {
......
......@@ -442,6 +442,8 @@ type writer struct {
size int64
offset int64
closed bool
committed bool
canceled bool
sessionURI string
buffer []byte
buffSize int
......@@ -449,28 +451,47 @@ type writer struct {
// Cancel removes any written content from this FileWriter.
func (w *writer) Cancel() error {
w.closed = true
if w.closed {
return storagedriver.ErrAlreadyClosed
} else if w.committed {
return storagedriver.ErrAlreadyCommited
}
w.canceled = true
err := storageDeleteObject(context.Background(), w.storageClient, w.bucket, w.name)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil
}
var gerr *googleapi.Error
if errors.As(err, &gerr) {
if gerr.Code == http.StatusNotFound {
err = nil
}
if errors.As(err, &gerr) && gerr.Code == http.StatusNotFound {
return nil
}
}
return err
return fmt.Errorf("deleting object while canceling writer: %w", err)
}
func (w *writer) Close() error {
if w.closed {
return nil
return storagedriver.ErrAlreadyClosed
}
w.closed = true
if w.canceled {
// NOTE(prozlach): If the writer has already been canceled, then there
// is nothing to flush to the backend as the target file has already
// been deleted.
return nil
}
if w.committed {
// we are done already, return early
return nil
}
err := w.writeChunk()
if err != nil {
return err
return fmt.Errorf("writing chunk: %w", err)
}
// Copy the remaining bytes from the buffer to the upload session
......@@ -493,7 +514,7 @@ func (w *writer) Close() error {
return putContentsClose(wc, w.buffer[0:w.buffSize])
})
if err != nil {
return err
return fmt.Errorf("writing contents while closing writer: %w", err)
}
w.size = w.offset + int64(w.buffSize)
w.buffSize = 0
......@@ -523,10 +544,15 @@ func putContentsClose(wc *storage.Writer, contents []byte) error {
// available for future calls to StorageDriver.GetContent and
// StorageDriver.Reader.
func (w *writer) Commit() error {
if err := w.checkClosed(); err != nil {
return err
switch {
case w.closed:
return storagedriver.ErrAlreadyClosed
case w.committed:
return storagedriver.ErrAlreadyCommited
case w.canceled:
return storagedriver.ErrAlreadyCanceled
}
w.closed = true
w.committed = true
// no session started yet just perform a simple upload
if w.sessionURI == "" {
......@@ -564,13 +590,6 @@ func (w *writer) Commit() error {
return nil
}
func (w *writer) checkClosed() error {
if w.closed {
return fmt.Errorf("Writer already closed")
}
return nil
}
func (w *writer) writeChunk() error {
var err error
// chunks can be uploaded only in multiples of minChunkSize
......@@ -598,12 +617,18 @@ func (w *writer) writeChunk() error {
}
func (w *writer) Write(p []byte) (int, error) {
err := w.checkClosed()
if err != nil {
return 0, err
switch {
case w.closed:
return 0, storagedriver.ErrAlreadyClosed
case w.committed:
return 0, storagedriver.ErrAlreadyCommited
case w.canceled:
return 0, storagedriver.ErrAlreadyCanceled
}
var err error
var nn int
for nn < len(p) {
n := copy(w.buffer[w.buffSize:], p[nn:])
w.buffSize += n
......
......@@ -315,6 +315,7 @@ func (w *writer) Close() error {
return storagedriver.ErrAlreadyClosed
}
w.closed = true
return nil
}
......@@ -328,8 +329,14 @@ func (w *writer) Cancel() error {
w.d.mutex.Lock()
defer w.d.mutex.Unlock()
return w.d.root.delete(w.f.path())
err := w.d.root.delete(w.f.path())
if err != nil {
if errors.As(err, new(storagedriver.PathNotFoundError)) {
return nil
}
return fmt.Errorf("removing canceled blob: %w", err)
}
return nil
}
func (w *writer) Commit() error {
......
......@@ -1666,7 +1666,19 @@ func (w *writer) Close() error {
return storagedriver.ErrAlreadyClosed
}
w.closed = true
return w.flushPart()
if w.canceled {
// NOTE(prozlach): If the writer has been already canceled, then there
// is nothing to flush to the backend as the target file has already
// been deleted.
return nil
}
err := w.flushPart()
if err != nil {
return fmt.Errorf("fluxing buffers while closing writer: %w", err)
}
return nil
}
func (w *writer) Cancel() error {
......@@ -1676,6 +1688,7 @@ func (w *writer) Cancel() error {
return storagedriver.ErrAlreadyCommited
}
w.canceled = true
_, err := w.driver.S3.AbortMultipartUploadWithContext(
context.Background(),
&s3.AbortMultipartUploadInput{
......@@ -1683,7 +1696,10 @@ func (w *writer) Cancel() error {
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
})
return err
if err != nil {
return fmt.Errorf("aborting s3 multipart upload: %w", err)
}
return nil
}
func (w *writer) Commit() error {
......
......@@ -773,6 +773,184 @@ func (s *DriverSuite) TestAppendInexistentBlob() {
require.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
}
func (s *DriverSuite) TestWriterDoubleClose() {
destPath := randomPath(1, 32)
s.T().Logf("destination path for blob: %s", destPath)
defer s.deletePath(s.T(), firstPart(destPath), false)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
err = writer.Close()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterDoubleCommit() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
s.T().Logf("destination path for blob: %s", destPath)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Commit()
require.NoError(s.T(), err)
err = writer.Commit()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCommited)
}
func (s *DriverSuite) TestWriterWriteAfterClose() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
s.T().Logf("destination path for blob: %s", destPath)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterCancelAfterClose() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
err = writer.Cancel()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterCommitAfterClose() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
err = writer.Commit()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyClosed)
}
func (s *DriverSuite) TestWriterCommitAfterCancel() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Commit()
require.NoError(s.T(), err)
err = writer.Cancel()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCommited)
}
func (s *DriverSuite) TestWriterCancelAfterCommit() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
err = writer.Cancel()
require.NoError(s.T(), err)
err = writer.Commit()
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCanceled)
}
func (s *DriverSuite) TestWriterWriteAfterCommit() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Commit()
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCommited)
}
func (s *DriverSuite) TestWriterWriteAfterCancel() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Cancel()
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.ErrorIs(s.T(), err, storagedriver.ErrAlreadyCanceled)
}
func (s *DriverSuite) TestWriterCancel() {
destPath := randomPath(1, 32)
defer s.deletePath(s.T(), firstPart(destPath), false)
contents := s.blobberFactory.GetBlobber(96).GetAllBytes()
s.T().Logf("destination path for blob: %s", destPath)
writer, err := s.StorageDriver.Writer(s.ctx, destPath, false)
require.NoError(s.T(), err)
_, err = writer.Write(contents)
require.NoError(s.T(), err)
err = writer.Cancel()
require.NoError(s.T(), err)
err = writer.Close()
require.NoError(s.T(), err)
_, err = s.StorageDriver.Stat(s.ctx, destPath)
require.ErrorAs(s.T(), err, new(storagedriver.PathNotFoundError))
}
// TestOverwriteAppendBlob checks that driver can overwrite blob created using
// Write() call with PutContent() call. In case of e.g. Azure, Write() creates
// AppendBlob and PutContent() creates a BlockBlob and there is no in-place
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment