Commit 1d968168 authored by David Vorick's avatar David Vorick Committed by GitHub

Merge pull request #1587 from NebulousLabs/log

Contractor journal
parents 49e4c12d fd0a2410
......@@ -1011,3 +1011,126 @@ func TestRenterAllowance(t *testing.T) {
}
}
}
// TestHostAndRentReload sets up an integration test where a host and renter
// do basic uploads and downloads, with an intervening shutdown+startup.
func TestHostAndRentReload(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
st, err := createServerTester("TestHostAndRentReload")
if err != nil {
t.Fatal(err)
}
// Announce the host and start accepting contracts.
err = st.announceHost()
if err != nil {
t.Fatal(err)
}
err = st.acceptContracts()
if err != nil {
t.Fatal(err)
}
err = st.setHostStorage()
if err != nil {
t.Fatal(err)
}
// Set an allowance for the renter, allowing a contract to be formed.
allowanceValues := url.Values{}
testFunds := "10000000000000000000000000000" // 10k SC
testPeriod := "10"
allowanceValues.Set("funds", testFunds)
allowanceValues.Set("period", testPeriod)
err = st.stdPostAPI("/renter", allowanceValues)
if err != nil {
t.Fatal(err)
}
// Create a file.
path := filepath.Join(st.dir, "test.dat")
err = createRandFile(path, 1024)
if err != nil {
t.Fatal(err)
}
// Upload the file to the renter.
uploadValues := url.Values{}
uploadValues.Set("source", path)
err = st.stdPostAPI("/renter/upload/test", uploadValues)
if err != nil {
t.Fatal(err)
}
// Only one piece will be uploaded (10% at current redundancy).
var rf RenterFiles
for i := 0; i < 200 && (len(rf.Files) != 1 || rf.Files[0].UploadProgress < 10); i++ {
st.getAPI("/renter/files", &rf)
time.Sleep(100 * time.Millisecond)
}
if len(rf.Files) != 1 || rf.Files[0].UploadProgress < 10 {
t.Fatal("the uploading is not succeeding for some reason:", rf.Files[0])
}
// Try downloading the file.
downpath := filepath.Join(st.dir, "testdown.dat")
err = st.stdGetAPI("/renter/download/test?destination=" + downpath)
if err != nil {
t.Fatal(err)
}
// Check that the download has the right contents.
orig, err := ioutil.ReadFile(path)
if err != nil {
t.Fatal(err)
}
download, err := ioutil.ReadFile(downpath)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(orig, download) != 0 {
t.Fatal("data mismatch when downloading a file")
}
// The renter's downloads queue should have 1 entry now.
var queue RenterDownloadQueue
if err = st.getAPI("/renter/downloads", &queue); err != nil {
t.Fatal(err)
}
if len(queue.Downloads) != 1 {
t.Fatalf("expected renter to have 1 download in the queue; got %v", len(queue.Downloads))
}
// close and reopen the server
err = st.server.Close()
if err != nil {
t.Fatal(err)
}
st, err = assembleServerTester(st.walletKey, st.dir)
if err != nil {
t.Fatal(err)
}
defer st.server.Close()
err = st.announceHost()
if err != nil {
t.Fatal(err)
}
// Try downloading the file.
err = st.stdGetAPI("/renter/download/test?destination=" + downpath)
if err != nil {
t.Fatal(err)
}
// Check that the download has the right contents.
orig, err = ioutil.ReadFile(path)
if err != nil {
t.Fatal(err)
}
download, err = ioutil.ReadFile(downpath)
if err != nil {
t.Fatal(err)
}
if bytes.Compare(orig, download) != 0 {
t.Fatal("data mismatch when downloading a file")
}
}
......@@ -28,8 +28,8 @@ var (
// as a safeguard against desynchronizing with the host.
// TODO: save a diff of the Merkle roots instead of all of them.
type cachedRevision struct {
Revision types.FileContractRevision
MerkleRoots modules.MerkleRootSet
Revision types.FileContractRevision `json:"revision"`
MerkleRoots modules.MerkleRootSet `json:"merkleroots"`
}
// A Contractor negotiates, revises, renews, and provides access to file
......@@ -124,6 +124,12 @@ func (c *Contractor) ResolveID(id types.FileContractID) types.FileContractID {
return id
}
// Close closes the Contractor.
func (c *Contractor) Close() error {
c.log.Close()
return c.persist.Close()
}
// New returns a new Contractor.
func New(cs consensusSet, wallet walletShim, tpool transactionPool, hdb hostDB, persistDir string) (*Contractor, error) {
// Check for nil inputs.
......
package contractor
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
......@@ -73,13 +70,6 @@ func TestNew(t *testing.T) {
if !os.IsNotExist(err) {
t.Fatalf("expected invalid directory, got %v", err)
}
// Corrupted persist file.
ioutil.WriteFile(filepath.Join(dir, "contractor.json"), []byte{1, 2, 3}, 0666)
_, err = New(stub, stub, stub, stub, dir)
if _, ok := err.(*json.SyntaxError); !ok {
t.Fatalf("expected invalid json, got %v", err)
}
}
// TestContract tests the Contract method.
......@@ -124,7 +114,7 @@ func TestContract(t *testing.T) {
// TestContracts tests the Contracts method.
func TestContracts(t *testing.T) {
var stub newStub
dir := build.TempDir("contractor", "TestNew")
dir := build.TempDir("contractor", "TestContracts")
c, err := New(stub, stub, stub, stub, dir)
if err != nil {
t.Fatalf("expected nil, got %v", err)
......
......@@ -4,7 +4,6 @@ import (
"path/filepath"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/persist"
"github.com/NebulousLabs/Sia/types"
)
......@@ -54,8 +53,9 @@ type (
persister interface {
save(contractorPersist) error
saveSync(contractorPersist) error
update(...journalUpdate) error
load(*contractorPersist) error
Close() error
}
)
......@@ -68,32 +68,46 @@ type walletBridge struct {
func (ws *walletBridge) NextAddress() (types.UnlockConditions, error) { return ws.w.NextAddress() }
func (ws *walletBridge) StartTransaction() transactionBuilder { return ws.w.StartTransaction() }
// stdPersist implements the persister interface via persist.SaveFile and
// persist.LoadFile. The metadata and filename required by these functions is
// internal to stdPersist.
// stdPersist implements the persister interface via the journal type. The
// filename required by these functions is internal to stdPersist.
type stdPersist struct {
meta persist.Metadata
journal *journal
filename string
}
func (p *stdPersist) save(data contractorPersist) error {
return persist.SaveFile(p.meta, data, p.filename)
if p.journal == nil {
var err error
p.journal, err = newJournal(p.filename, data)
return err
}
return p.journal.checkpoint(data)
}
func (p *stdPersist) saveSync(data contractorPersist) error {
return persist.SaveFileSync(p.meta, data, p.filename)
func (p *stdPersist) update(us ...journalUpdate) error {
return p.journal.update(us)
}
func (p *stdPersist) load(data *contractorPersist) error {
return persist.LoadFile(p.meta, data, p.filename)
var err error
p.journal, err = openJournal(p.filename, data)
if err != nil {
// try loading old persist
err = loadv110persist(filepath.Dir(p.filename), data)
if err != nil {
return err
}
p.journal, err = newJournal(p.filename, *data)
}
return err
}
func (p stdPersist) Close() error {
return p.journal.Close()
}
func newPersist(dir string) *stdPersist {
return &stdPersist{
meta: persist.Metadata{
Header: "Contractor Persistence",
Version: "0.5.2",
},
filename: filepath.Join(dir, "contractor.json"),
filename: filepath.Join(dir, "contractor.journal"),
}
}
......@@ -79,7 +79,10 @@ func (hd *hostDownloader) Sector(root crypto.Hash) ([]byte, error) {
hd.contractor.mu.Lock()
hd.contractor.contracts[contract.ID] = contract
hd.contractor.saveSync()
hd.contractor.persist.update(updateDownloadRevision{
NewRevisionTxn: contract.LastRevisionTxn,
NewDownloadSpending: contract.DownloadSpending,
})
hd.contractor.mu.Unlock()
return sector, nil
......@@ -190,7 +193,7 @@ func (c *Contractor) Downloader(id types.FileContractID) (_ Downloader, err erro
}
// supply a SaveFn that saves the revision to the contractor's persist
// (the existing revision will be overwritten when SaveFn is called)
d.SaveFn = c.saveRevision(contract.ID)
d.SaveFn = c.saveDownloadRevision(contract.ID)
// cache downloader
hd := &hostDownloader{
......
......@@ -113,7 +113,13 @@ func (he *hostEditor) Upload(data []byte) (crypto.Hash, error) {
}
he.contractor.mu.Lock()
he.contractor.contracts[contract.ID] = contract
he.contractor.saveSync()
he.contractor.persist.update(updateUploadRevision{
NewRevisionTxn: contract.LastRevisionTxn,
NewSectorRoot: sectorRoot,
NewSectorIndex: len(contract.MerkleRoots) - 1,
NewUploadSpending: contract.UploadSpending,
NewStorageSpending: contract.StorageSpending,
})
he.contractor.mu.Unlock()
he.contract = contract
......@@ -254,7 +260,7 @@ func (c *Contractor) Editor(id types.FileContractID) (_ Editor, err error) {
}
// supply a SaveFn that saves the revision to the contractor's persist
// (the existing revision will be overwritten when SaveFn is called)
e.SaveFn = c.saveRevision(contract.ID)
e.SaveFn = c.saveUploadRevision(contract.ID)
// cache editor
he := &hostEditor{
......
......@@ -113,7 +113,6 @@ func (c *Contractor) managedNewContract(host modules.HostDBEntry, numSectors uin
contractValue := contract.RenterFunds()
c.log.Printf("Formed contract with %v for %v SC", host.NetAddress, contractValue.Div(types.SiacoinPrecision))
return contract, nil
}
......
......@@ -187,6 +187,7 @@ func TestIntegrationFormContract(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -214,6 +215,7 @@ func TestIntegrationReviseContract(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -262,6 +264,7 @@ func TestIntegrationUploadDownload(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -317,16 +320,15 @@ func TestIntegrationUploadDownload(t *testing.T) {
// TestIntegrationDelete tests that the contractor can delete a sector from a
// contract previously formed with a host.
func TestIntegrationDelete(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
t.Skip("deletion is deprecated")
// create testing trio
h, c, _, err := newTestingTrio("TestIntegrationDelete")
if err != nil {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -382,16 +384,15 @@ func TestIntegrationDelete(t *testing.T) {
// TestIntegrationInsertDelete tests that the contractor can insert and delete
// a sector during the same revision.
func TestIntegrationInsertDelete(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
t.Skip("deletion is deprecated")
// create testing trio
h, c, _, err := newTestingTrio("TestIntegrationInsertDelete")
if err != nil {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -442,16 +443,15 @@ func TestIntegrationInsertDelete(t *testing.T) {
// TestIntegrationModify tests that the contractor can modify a previously-
// uploaded sector.
func TestIntegrationModify(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
t.Skip("modification is deprecated")
// create testing trio
h, c, _, err := newTestingTrio("TestIntegrationModify")
if err != nil {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -519,6 +519,7 @@ func TestIntegrationRenew(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -646,6 +647,7 @@ func TestIntegrationResync(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -784,6 +786,7 @@ func TestIntegrationDownloaderCaching(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -877,6 +880,7 @@ func TestIntegrationEditorCaching(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......@@ -969,6 +973,7 @@ func TestIntegrationCachedRenew(t *testing.T) {
t.Fatal(err)
}
defer h.Close()
defer c.Close()
// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
......
This diff is collapsed.
This diff is collapsed.
package contractor
import (
"path/filepath"
"github.com/NebulousLabs/Sia/crypto"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/persist"
"github.com/NebulousLabs/Sia/types"
)
// contractorPersist defines what Contractor data persists across sessions.
type contractorPersist struct {
Allowance modules.Allowance
BlockHeight types.BlockHeight
CachedRevisions []cachedRevision
Contracts []modules.RenterContract
CurrentPeriod types.BlockHeight
LastChange modules.ConsensusChangeID
OldContracts []modules.RenterContract
RenewedIDs map[string]string
// COMPATv1.0.4-lts
FinancialMetrics struct {
ContractSpending types.Currency `json:"contractspending"`
DownloadSpending types.Currency `json:"downloadspending"`
StorageSpending types.Currency `json:"storagespending"`
UploadSpending types.Currency `json:"uploadspending"`
} `json:",omitempty"`
Allowance modules.Allowance `json:"allowance"`
BlockHeight types.BlockHeight `json:"blockheight"`
CachedRevisions map[string]cachedRevision `json:"cachedrevisions"`
Contracts map[string]modules.RenterContract `json:"contracts"`
CurrentPeriod types.BlockHeight `json:"currentperiod"`
LastChange modules.ConsensusChangeID `json:"lastchange"`
OldContracts []modules.RenterContract `json:"oldcontracts"`
RenewedIDs map[string]string `json:"renewedids"`
}
// persistData returns the data in the Contractor that will be saved to disk.
func (c *Contractor) persistData() contractorPersist {
data := contractorPersist{
Allowance: c.allowance,
BlockHeight: c.blockHeight,
CurrentPeriod: c.currentPeriod,
LastChange: c.lastChange,
RenewedIDs: make(map[string]string),
Allowance: c.allowance,
BlockHeight: c.blockHeight,
CachedRevisions: make(map[string]cachedRevision),
Contracts: make(map[string]modules.RenterContract),
CurrentPeriod: c.currentPeriod,
LastChange: c.lastChange,
RenewedIDs: make(map[string]string),
}
for _, rev := range c.cachedRevisions {
data.CachedRevisions = append(data.CachedRevisions, rev)
data.CachedRevisions[rev.Revision.ParentID.String()] = rev
}
for _, contract := range c.contracts {
data.Contracts = append(data.Contracts, contract)
data.Contracts[contract.ID.String()] = contract
}
for _, contract := range c.oldContracts {
contract.MerkleRoots = []crypto.Hash{} // prevent roots from being saved to disk twice
data.OldContracts = append(data.OldContracts, contract)
}
for oldID, newID := range c.renewedIDs {
......@@ -110,31 +108,6 @@ func (c *Contractor) load() error {
c.renewedIDs[types.FileContractID(oldHash)] = types.FileContractID(newHash)
}
// COMPATv1.0.4-lts
//
// If loading old persist, only aggregate metrics are known. Store these
// in a special contract under a special identifier.
if fm := data.FinancialMetrics; !fm.ContractSpending.Add(fm.DownloadSpending).Add(fm.StorageSpending).Add(fm.UploadSpending).IsZero() {
c.oldContracts[metricsContractID] = modules.RenterContract{
ID: metricsContractID,
TotalCost: fm.ContractSpending,
DownloadSpending: fm.DownloadSpending,
StorageSpending: fm.StorageSpending,
UploadSpending: fm.UploadSpending,
// Give the contract a fake startheight so that it will included
// with the other contracts in the current period. Note that in
// update.go, the special contract is specifically deleted when a
// new period begins.
StartHeight: c.currentPeriod + 1,
// We also need to add a ValidProofOutput so that the RenterFunds
// method will not panic. The value should be 0, i.e. "all funds
// were spent."
LastRevision: types.FileContractRevision{
NewValidProofOutputs: make([]types.SiacoinOutput, 2),
},
}
}
return nil
}
......@@ -145,30 +118,52 @@ func (c *Contractor) save() error {
// saveSync saves the Contractor persistence data to disk and then syncs to disk.
func (c *Contractor) saveSync() error {
return c.persist.saveSync(c.persistData())
return c.persist.save(c.persistData())
}
// saveRevision returns a function that saves a revision. It is used by the
// Editor and Downloader types to prevent desynchronizing with their host.
func (c *Contractor) saveRevision(id types.FileContractID) func(types.FileContractRevision, []crypto.Hash) error {
// saveUploadRevision returns a function that saves an upload revision. It is
// used by the Editor type to prevent desynchronizing with the host.
func (c *Contractor) saveUploadRevision(id types.FileContractID) func(types.FileContractRevision, []crypto.Hash) error {
return func(rev types.FileContractRevision, newRoots []crypto.Hash) error {
c.mu.Lock()
defer c.mu.Unlock()
c.cachedRevisions[id] = cachedRevision{rev, newRoots}
return c.saveSync()
return c.persist.update(updateCachedUploadRevision{
Revision: rev,
// only the last root is new
SectorRoot: newRoots[len(newRoots)-1],
SectorIndex: len(newRoots) - 1,
})
}
}
// saveDownloadRevision returns a function that saves an upload revision. It
// is used by the Downloader type to prevent desynchronizing with the host.
func (c *Contractor) saveDownloadRevision(id types.FileContractID) func(types.FileContractRevision, []crypto.Hash) error {
return func(rev types.FileContractRevision, _ []crypto.Hash) error {
c.mu.Lock()
defer c.mu.Unlock()
// roots have not changed
cr := c.cachedRevisions[id]
cr.Revision = rev
c.cachedRevisions[id] = cr
return c.persist.update(updateCachedDownloadRevision{
Revision: rev,
})
}
}
// addPubKeys rescans the blockchain to fill in the HostPublicKey of
// contracts, identified by their NetAddress.
func addPubKeys(cs consensusSet, contracts []modules.RenterContract) []modules.RenterContract {
func addPubKeys(cs consensusSet, contracts map[string]modules.RenterContract) map[string]modules.RenterContract {
pubkeys := make(pubkeyScanner)
for _, c := range contracts {
pubkeys[c.NetAddress] = types.SiaPublicKey{}
}
cs.ConsensusSetSubscribe(pubkeys, modules.ConsensusChangeBeginning)
for i, c := range contracts {
contracts[i].HostPublicKey = pubkeys[c.NetAddress]
for id, c := range contracts {
c.HostPublicKey = pubkeys[c.NetAddress]
contracts[id] = c
}
return contracts
}
......@@ -196,3 +191,76 @@ func (pubkeys pubkeyScanner) ProcessConsensusChange(cc modules.ConsensusChange)
}
}
}
// COMPATv1.1.0
func loadv110persist(dir string, data *contractorPersist) error {
var oldPersist struct {
Allowance modules.Allowance
BlockHeight types.BlockHeight
CachedRevisions []cachedRevision
Contracts []modules.RenterContract
CurrentPeriod types.BlockHeight
LastChange modules.ConsensusChangeID
OldContracts []modules.RenterContract
RenewedIDs map[string]string
FinancialMetrics struct {
ContractSpending types.Currency
DownloadSpending types.Currency
StorageSpending types.Currency
UploadSpending types.Currency
}
}
err := persist.LoadFile(persist.Metadata{
Header: "Contractor Persistence",
Version: "0.5.2",
}, &oldPersist, filepath.Join(dir, "contractor.json"))
if err != nil {
return err
}
cachedRevisions := make(map[string]cachedRevision)
for _, rev := range oldPersist.CachedRevisions {
cachedRevisions[rev.Revision.ParentID.String()] = rev
}
contracts := make(map[string]modules.RenterContract)
for _, c := range oldPersist.Contracts {
contracts[c.ID.String()] = c
}
// COMPATv1.0.4-lts
//
// If loading old persist, only aggregate metrics are known. Store these
// in a special contract under a special identifier.
if fm := oldPersist.FinancialMetrics; !fm.ContractSpending.Add(fm.DownloadSpending).Add(fm.StorageSpending).Add(fm.UploadSpending).IsZero() {
oldPersist.OldContracts = append(oldPersist.OldContracts, modules.RenterContract{
ID: metricsContractID,
TotalCost: fm.ContractSpending,
DownloadSpending: fm.DownloadSpending,
StorageSpending: fm.StorageSpending,
UploadSpending: fm.UploadSpending,
// Give the contract a fake startheight so that it will included
// with the other contracts in the current period. Note that in
// update.go, the special contract is specifically deleted when a
// new period begins.
StartHeight: oldPersist.CurrentPeriod + 1,
// We also need to add a ValidProofOutput so that the RenterFunds
// method will not panic. The value should be 0, i.e. "all funds
// were spent."
LastRevision: types.FileContractRevision{
NewValidProofOutputs: make([]types.SiacoinOutput, 2),
},
})
}
*data = contractorPersist{
Allowance: oldPersist.Allowance,
BlockHeight: oldPersist.BlockHeight,
CachedRevisions: cachedRevisions,
Contracts: contracts,
CurrentPeriod: oldPersist.CurrentPeriod,
LastChange: oldPersist.LastChange,
OldContracts: oldPersist.OldContracts,
RenewedIDs: oldPersist.RenewedIDs,
}
return nil
}
......@@ -15,9 +15,10 @@ import (
// memPersist implements the persister interface in-memory.
type memPersist contractorPersist
func (m *memPersist) save(data contractorPersist) error { *m = memPersist(data); return nil }
func (m *memPersist) saveSync(data contractorPersist) error { *m = memPersist(data); return nil }
func (m memPersist) load(data *contractorPersist) error { *data = contractorPersist(m); return nil }
func (m *memPersist) save(data contractorPersist) error { *m = memPersist(data); return nil }
func (m *memPersist) update(...journalUpdate) error { return nil }
func (m memPersist) load(data *contractorPersist) error { *data = contractorPersist(m); return nil }
func (m memPersist) Close() error { return nil }
// TestSaveLoad tests that the contractor can save and load itself.
func TestSaveLoad(t *testing.T) {
......
......@@ -180,6 +180,9 @@ func (c *Contractor) managedRenewContracts() error {
c.contracts[contract.ID] = contract
// add a mapping from old->new contract
c.renewedIDs[oldID] = contract.ID
// move the cachedRevision entry to the new ID
c.cachedRevisions[contract.ID] = c.cachedRevisions[oldID]
delete(c.cachedRevisions, oldID)
}
err = c.saveSync()
c.mu.Unlock()
......
......@@ -148,7 +148,7 @@ func TestIsOffline(t *testing.T) {