test WAL cleanup if startup experiences errors

parent 86df5494
......@@ -73,7 +73,7 @@ test:
test-v:
	go test -race -v -short -tags='debug testing' -timeout=15s $(pkgs) -run=$(run)
test-long: clean fmt vet lint
	go test -v -race -tags='testing debug' -timeout=50s $(pkgs) -run=$(run)
	go test -v -race -tags='testing debug' -timeout=500s $(pkgs) -run=$(run)
bench: clean fmt
	go test -tags='debug testing' -timeout=500s -bench=$(run) $(pkgs)
cover: clean
......
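The test-long timeout is raised from 50s to 500s, presumably to leave room for the slower contract manager tests touched by this change. Assuming the Makefile's run and pkgs variables are overridable on the command line as usual, the new startup-error test can be exercised on its own with something like:

	make test-long run=TestNewContractManagerErroredStartup pkgs=./modules/host/contractmanager

(the package path here is an assumption; point pkgs at whichever directory holds the contractmanager package).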
......@@ -26,6 +26,7 @@ package contractmanager
// one.
import (
	"errors"
	"path/filepath"
	"github.com/NebulousLabs/Sia/build"
......@@ -186,6 +187,12 @@ func newContractManager(dependencies dependencies, persistDir string) (*Contract
		cm.log.Println("ERROR: Unable to spawn the contract manager synchronization loop:", err)
		return nil, build.ExtendErr("error while spawning contract manager sync loop", err)
	}
	// Simulate an error to make sure the cleanup code is triggered correctly.
	if cm.dependencies.disrupt("erroredStartup") {
		err = errors.New("startup disrupted")
		return nil, err
	}
	return cm, nil
}
......
......
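For readers new to the disrupt pattern, here is a minimal, self-contained sketch of the idea used above: the constructor consults an injected dependency at a named point, and a test double answers true to force the error path. All names below (faultInjector, productionFaults, erroredStartupFaults, server, newServer) are hypothetical stand-ins for Sia's dependencies, productionDependencies, dependencyErroredStartup and newContractManager; this is an illustration of the pattern, not the real implementation.

package main

import (
	"errors"
	"fmt"
)

// faultInjector is the hook that the constructor consults at named points.
type faultInjector interface {
	disrupt(s string) bool
}

// productionFaults is the production implementation: it never injects a fault.
type productionFaults struct{}

func (productionFaults) disrupt(string) bool { return false }

// erroredStartupFaults is a test double that forces the startup error path.
type erroredStartupFaults struct {
	productionFaults
}

func (erroredStartupFaults) disrupt(s string) bool {
	return s == "erroredStartup"
}

type server struct {
	deps faultInjector
}

// newServer mirrors the shape of newContractManager: after its normal setup,
// it asks the injected dependency whether to simulate a startup failure.
func newServer(deps faultInjector) (*server, error) {
	s := &server{deps: deps}
	if deps.disrupt("erroredStartup") {
		// In the real contract manager, the cleanup registered during startup
		// (closing and removing the temporary WAL and settings files) is
		// expected to run when startup fails; this sketch just returns the error.
		return nil, errors.New("startup disrupted")
	}
	return s, nil
}

func main() {
	if _, err := newServer(erroredStartupFaults{}); err != nil {
		fmt.Println("startup failed as requested:", err)
	}
}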
package contractmanager

import (
	"bytes"
	"errors"
	"os"
	"path/filepath"
	"sync"
	"testing"

	"github.com/NebulousLabs/Sia/build"
	"github.com/NebulousLabs/Sia/crypto"
	"github.com/NebulousLabs/Sia/modules"
)
......@@ -111,106 +108,55 @@ func TestNewContractManager(t *testing.T) {
	}
}

// dependencyErroredStartup is a mocked dependency that will cause the contract
// manager to be returned with an error upon startup.
type dependencyErroredStartup struct {
	productionDependencies
}

// disrupt will cause newContractManager to return an error during startup, so
// that the cleanup code can be exercised.
func (d *dependencyErroredStartup) disrupt(s string) bool {
	// Cause an error to be returned during startup.
	if s == "erroredStartup" {
		return true
	}
	return false
}

// TestNewContractManagerErroredStartup uses disruption to simulate an error
// during startup, allowing the test to verify that the cleanup code ran
// correctly.
func TestNewContractManagerErroredStartup(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	t.Parallel()

	// Create a new contract manager where the startup gets disrupted.
	d := new(dependencyErroredStartup)
	testdir := build.TempDir(modules.ContractManagerDir, "TestNewContractManagerErroredStartup")
	cmd := filepath.Join(testdir, modules.ContractManagerDir)
	_, err := newContractManager(d, cmd)
	if err == nil || err.Error() != "startup disrupted" {
		t.Fatal("expecting contract manager startup to be disrupted:", err)
	}

	// Verify that shutdown was triggered correctly - tmp files should be gone,
	// WAL file should also be gone.
	walFileName := filepath.Join(cmd, walFile)
	walFileTmpName := filepath.Join(cmd, walFileTmp)
	settingsFileTmpName := filepath.Join(cmd, settingsFileTmp)
	_, err = os.Stat(walFileName)
	if !os.IsNotExist(err) {
		t.Error("file should have been removed:", err)
	}
	_, err = os.Stat(walFileTmpName)
	if !os.IsNotExist(err) {
		t.Error("file should have been removed:", err)
	}
	_, err = os.Stat(settingsFileTmpName)
	if !os.IsNotExist(err) {
		t.Error("file should have been removed:", err)
	}
}

package contractmanager

import (
	"bytes"
	"os"
	"path/filepath"
	"sync"
	"testing"

	"github.com/NebulousLabs/Sia/build"
	"github.com/NebulousLabs/Sia/crypto"
	"github.com/NebulousLabs/Sia/modules"
)

// TestParallelFileAccess uses a single file handle + ReadAt and WriteAt to
// write to multiple locations on a file in parallel, verifying that it's a
// safe thing to do.
func TestParallelFileAccess(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	t.Parallel()

	// Create the file that will be used in parallel.
	testdir := build.TempDir(modules.ContractManagerDir, "TestParallelFileAccess")
	err := os.Mkdir(testdir, 0700)
	if err != nil {
		t.Fatal(err)
	}
	f, err := os.Create(filepath.Join(testdir, "parallelFile"))
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()

	// Create the data that will be written to the file, such that it can be
	// verified later.
	writesPerThread := 200
	numThreads := 500
	dataSize := 163 // Intentionally overlaps sector boundaries.
	datas := make([][]byte, numThreads*writesPerThread)
	for i := 0; i < numThreads*writesPerThread; i++ {
		datas[i] = make([]byte, dataSize)
		crypto.Read(datas[i])
	}

	// Spin up threads to make concurrent writes to the file in different
	// locations. Have some reads + writes that are trying to overlap.
	threadingModifier := 71
	var wg1 sync.WaitGroup
	var wg2 sync.WaitGroup
	for i := 0; i < numThreads; i++ {
		if i%threadingModifier == 0 {
			wg1.Add(1)
		} else {
			wg2.Add(1)
		}
		go func(i int) {
			if i%threadingModifier == 0 {
				defer wg1.Done()
			} else {
				defer wg2.Done()
			}
			for j := 0; j < writesPerThread; j++ {
				_, err := f.WriteAt(datas[i*j], int64(i*dataSize*j))
				if err != nil {
					t.Error(err)
				}
			}
		}(i)
	}

	// Wait for the smaller set of first writes to complete.
	wg1.Wait()

	// Verify the results for the smaller set of writes.
	for i := 0; i < numThreads; i++ {
		if i%threadingModifier != 0 {
			continue
		}
		wg1.Add(1)
		go func(i int) {
			defer wg1.Done()
			for j := 0; j < writesPerThread; j++ {
				data := make([]byte, dataSize)
				_, err := f.ReadAt(data, int64(i*dataSize))
				if err != nil {
					t.Error(err)
				}
				if !bytes.Equal(data, datas[i]) {
					t.Error("data mismatch for value", i)
				}
			}
		}(i)
	}
	wg1.Wait()
	wg2.Wait()

	// Verify the results for all of the writes.
	for i := 0; i < numThreads; i++ {
		wg1.Add(1)
		go func(i int) {
			defer wg1.Done()
			for j := 0; j < writesPerThread; j++ {
				data := make([]byte, dataSize)
				_, err := f.ReadAt(data, int64(i*dataSize))
				if err != nil {
					t.Error(err)
				}
				if !bytes.Equal(data, datas[i]) {
					t.Error("data mismatch for value", i)
				}
			}
		}(i)
	}
	wg1.Wait()
}
......@@ -262,11 +262,13 @@ func (wal *writeAheadLog) load() error {
		err := wal.fileWALTmp.Close()
		if err != nil {
			wal.cm.log.Println("Error closing wal file during contract manager shutdown:", err)
			wal.cm.log.Println("ERROR: error closing wal file during contract manager shutdown:", err)
			return
		}
		err = os.Remove(filepath.Join(wal.cm.persistDir, walFileTmp))
		if err != nil {
			wal.cm.log.Println("Error removing temporary WAL during contract manager shutdown:", err)
			wal.cm.log.Println("ERROR: error removing temporary WAL during contract manager shutdown:", err)
			return
		}
	})
......@@ -303,7 +305,19 @@ func (wal *writeAheadLog) load() error {
	if err != nil {
		build.ExtendErr("unable to write to settings temp file", err)
	}
	// Renaming process means that the settings tmp file does not need to be
	// removed upon shutdown.
	wal.cm.tg.AfterStop(func() {
		wal.mu.Lock()
		defer wal.mu.Unlock()
		err := wal.fileSettingsTmp.Close()
		if err != nil {
			wal.cm.log.Println("ERROR: unable to close settings temporary file")
			return
		}
		err = os.Remove(filepath.Join(wal.cm.persistDir, settingsFileTmp))
		if err != nil {
			wal.cm.log.Println("ERROR: unable to remove settings temporary file")
			return
		}
	})
	return nil
}
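To make the cleanup idiom above easier to see in isolation, here is a minimal, self-contained sketch: a cleanup callback is registered right when the temporary file is created, and running the callbacks at shutdown removes the file. The threadGroup type below is a simplified, hypothetical stand-in for the cm.tg thread group used in the real code; it only illustrates the AfterStop idea and is not Sia's API.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// threadGroup collects cleanup callbacks to run at shutdown.
type threadGroup struct {
	afterStop []func()
}

// AfterStop registers fn to run when Stop is called.
func (tg *threadGroup) AfterStop(fn func()) {
	tg.afterStop = append(tg.afterStop, fn)
}

// Stop runs the registered callbacks in reverse registration order.
func (tg *threadGroup) Stop() {
	for i := len(tg.afterStop) - 1; i >= 0; i-- {
		tg.afterStop[i]()
	}
}

func main() {
	var tg threadGroup
	tmpName := filepath.Join(os.TempDir(), "wal_tmp_sketch.dat")

	// Create a temporary file and immediately register its cleanup, just as
	// the WAL registers removal of walFileTmp and settingsFileTmp above.
	f, err := os.Create(tmpName)
	if err != nil {
		fmt.Println("ERROR: unable to create temp file:", err)
		return
	}
	tg.AfterStop(func() {
		if err := f.Close(); err != nil {
			fmt.Println("ERROR: unable to close temp file:", err)
			return
		}
		if err := os.Remove(tmpName); err != nil {
			fmt.Println("ERROR: unable to remove temp file:", err)
		}
	})

	// In the contract manager, a failed startup is expected to trigger these
	// callbacks (that is what the new test above verifies); here we simply
	// call Stop directly and check that the temp file is gone.
	tg.Stop()
	if _, err := os.Stat(tmpName); os.IsNotExist(err) {
		fmt.Println("temp file removed on shutdown")
	}
}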