Skip to content
Snippets Groups Projects
nodes.go 4.13 KiB
package gateway

import (
	"errors"
	"net"
	"time"

	"github.com/NebulousLabs/Sia/crypto"
	"github.com/NebulousLabs/Sia/encoding"
	"github.com/NebulousLabs/Sia/modules"
)

const (
	maxSharedNodes = 10
	maxAddrLength  = 100
	minPeers       = 3
)

// addNode adds an address to the set of nodes on the network.
func (g *Gateway) addNode(addr modules.NetAddress) error {
	if _, exists := g.nodes[addr]; exists {
		return errors.New("node already added")
	} else if net.ParseIP(addr.Host()) == nil {
		return errors.New("address is not routable: " + string(addr))
	} else if net.ParseIP(addr.Host()).IsLoopback() {
		return errors.New("cannot add loopback address")
	}
	g.nodes[addr] = struct{}{}
	return nil
}

func (g *Gateway) removeNode(addr modules.NetAddress) error {
	if _, exists := g.nodes[addr]; !exists {
		return errors.New("no record of that node")
	}
	delete(g.nodes, addr)
	g.log.Println("INFO: removed node", addr)
	return nil
}

func (g *Gateway) randomNode() (modules.NetAddress, error) {
	if len(g.nodes) > 0 {
		r, _ := crypto.RandIntn(len(g.nodes))
		for node := range g.nodes {
			if r <= 0 {
				return node, nil
			}
			r--
		}
	}

	return "", errNoPeers
}

// shareNodes is the receiving end of the ShareNodes RPC. It writes up to 10
// randomly selected nodes to the caller.
func (g *Gateway) shareNodes(conn modules.PeerConn) error {
	id := g.mu.RLock()
	var nodes []modules.NetAddress
	for node := range g.nodes {
		if len(nodes) == maxSharedNodes {
			break
		}
		nodes = append(nodes, node)
	}
	g.mu.RUnlock(id)
	return encoding.WriteObject(conn, nodes)
}

// requestNodes is the calling end of the ShareNodes RPC.
func (g *Gateway) requestNodes(conn modules.PeerConn) error {
	var nodes []modules.NetAddress
	if err := encoding.ReadObject(conn, &nodes, maxSharedNodes*maxAddrLength); err != nil {
		return err
	}
	id := g.mu.Lock()
	for _, node := range nodes {
		g.addNode(node)
	}
	g.save()
	g.mu.Unlock(id)
	return nil
}

// relayNode is the recipient end of the RelayNode RPC. It reads a node, adds
// it to the Gateway's node list, and relays it to each of the Gateway's
// peers. If the node is already in the node list, it is not relayed.
func (g *Gateway) relayNode(conn modules.PeerConn) error {
	// read address
	var addr modules.NetAddress
	if err := encoding.ReadObject(conn, &addr, maxAddrLength); err != nil {
		return err
	}
	// add node
	id := g.mu.Lock()
	defer g.mu.Unlock(id)
	if err := g.addNode(addr); err != nil {
		return err
	}
	g.save()
	// relay
	go g.Broadcast("RelayNode", addr)
	return nil
}

// sendAddress is the calling end of the RelayNode RPC.
func (g *Gateway) sendAddress(conn modules.PeerConn) error {
	// don't send if we aren't connectible
	if g.Address().Host() == "::1" {
		return errors.New("can't send address without knowing external IP")
	}
	return encoding.WriteObject(conn, g.Address())
}

// threadedNodeManager tries to keep the Gateway's node list healthy. As long
// as the Gateway has fewer than minNodeListSize nodes, it asks a random peer
// for more nodes. It also continually pings nodes in order to establish their
// connectivity. Unresponsive nodes are aggressively removed.
func (g *Gateway) threadedNodeManager() {
	for {
		select {
		case <-time.After(5 * time.Second):
		case <-g.closeChan:
			return
		}

		id := g.mu.RLock()
		numNodes := len(g.nodes)
		peer, err := g.randomPeer()
		g.mu.RUnlock(id)
		if err != nil {
			// can't do much until we have peers
			continue
		}

		if numNodes < minNodeListLen {
			g.RPC(peer, "ShareNodes", g.requestNodes)
		}

		// find an untested node to check
		id = g.mu.RLock()
		node, err := g.randomNode()
		g.mu.RUnlock(id)
		if err != nil {
			continue
		}

		// try to connect
		conn, err := net.DialTimeout("tcp", string(node), dialTimeout)
		if err != nil {
			id = g.mu.Lock()
			g.removeNode(node)
			g.save()
			g.mu.Unlock(id)
			continue
		}
		// if connection succeeds, supply an unacceptable version to ensure
		// they won't try to add us as a peer
		encoding.WriteObject(conn, "0.0.0")
		conn.Close()
		// sleep for an extra 10 minutes after success; we don't want to spam
		// connectable nodes
		select {
		case <-time.After(10 * time.Minute):
		case <-g.closeChan:
			return
		}
	}
}