alloc.go

package realize

import (
	"context"
	"fmt"
	"io"
	"strings"

	"github.com/minio/minio-go/v7"
	log "github.com/sirupsen/logrus"
	api "gitlab.com/mergetb/api/portal/v1/go"
	me "gitlab.com/mergetb/portal/services/pkg/merror"
	"gitlab.com/mergetb/portal/services/pkg/storage"
	xir "gitlab.com/mergetb/xir/v0.3/go"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/protobuf/proto"
)

/*=============================================================================
 * Allocation Data Management
 * --------------------------
 *
 *  The allocation service handles requests from multiple realization service
 *  replicas competing for the same resources. It must therefore detect
 *  collisions between replicas and return errors when they occur.
 *
 *  The basic data layout in etcd is the following:
 *
 *  /alloc/resource/
 *    - id: (revision, allocation-list)
 *
 *  Every ResourceAllocationList carries a revision tag. If the revision of an
 *  incoming ResourceAllocationList is less than what is currently in etcd,
 *  the request must be rejected: the data has changed since the realization
 *  service read the allocation table.
 *
 *  The same situation exists for cables:
 *
 *  /alloc/cable/
 *    - id: (revision, allocation-list)
 *
 *  The allocation service must
 *     - stamp all resource lists with a revision on fetch requests
 *     - ensure that lesser revisions can never overwrite greater revisions
 *
 *  The implementation uses the etcd key version as the tracker. When data is
 *  fetched from etcd, the key version is copied into the protobuf. Likewise,
 *  when data is pushed, the revision in the received protobuf is set as a
 *  key-version precondition on the etcd transaction that mutates the data
 *  store. (See the allocWithRetry sketch after WriteTable for how these
 *  pieces compose.)
 *
 *  TODO: create secondary index keyed on realization id, so dealloc does not
 *        need to read the entire table.
 *===========================================================================*/

/*=============================================================================
 * Allocation Key Layout
 * ---------------------
 *
 *  /alloc/resource/:mzid/:node
 *  /alloc/cable/:mzid/:link
 *===========================================================================*/

// ReadTable reads the full allocation table (resources and cables) from etcd.
func ReadTable() (*api.AllocationTable, error) {

	etcd := storage.EtcdClient
	at := api.NewAllocationTable()

	err := readResources(at, etcd)
	if err != nil {
		return nil, err
	}

	err = readCables(at, etcd)
	if err != nil {
		return nil, err
	}

	return at, nil

}

// readResources loads every resource allocation list under /alloc/resource/,
// stamping each list with its etcd key version.
func readResources(at *api.AllocationTable, etcd *clientv3.Client) error {

	resp, err := etcd.Get(
		context.Background(), "/alloc/resource/", clientv3.WithPrefix())
	if err != nil {
		return me.DatabaseError("db error", err).Log()
	}

	for _, x := range resp.Kvs {

		l := new(api.ResourceAllocationList)
		err := proto.Unmarshal(x.Value, l)
		if err != nil {
			log.WithFields(log.Fields{
				"key": string(x.Key),
			}).Warn("corrupt resource allocation encountered")
			continue
		}
		l.Revision = x.Version
		rs := strings.Split(string(x.Key), "/")[3]
		at.Resource[rs] = l

	}

	return nil

}

// readCables loads every cable allocation list under /alloc/cable/, stamping
// each list with its etcd key version.
func readCables(at *api.AllocationTable, etcd *clientv3.Client) error {

	resp, err := etcd.Get(
		context.Background(), "/alloc/cable/", clientv3.WithPrefix())
	if err != nil {
		return me.DatabaseError("db error", err).Log()
	}

	for _, x := range resp.Kvs {

		l := new(api.CableAllocationList)
		err := proto.Unmarshal(x.Value, l)
		if err != nil {
			log.WithFields(log.Fields{
				"key": string(x.Key),
			}).Warn("corrupt cable allocation encountered")
			continue
		}
		l.Revision = x.Version
		cb := strings.Split(string(x.Key), "/")[3]
		at.Cable[cb] = l

	}

	return nil

}

// Alloc merges the incoming allocation requests into the current table. Lists
// that already exist keep the revision stamped on them at read time, so a
// subsequent WriteTable still detects concurrent modification.
func Alloc(incoming, current *api.AllocationTable) (*api.AllocationTable, error) {

	// resources

	for k, v := range incoming.Resource {

		currentV, ok := current.Resource[k]
		if ok {
			currentV.Value = append(currentV.Value, v.Value...)
		} else {
			current.Resource[k] = v
		}

	}

	// cables

	for k, v := range incoming.Cable {

		currentV, ok := current.Cable[k]
		if ok {
			currentV.Value = append(currentV.Value, v.Value...)
		} else {
			current.Cable[k] = v
		}

	}

	return current, nil

}

// Dealloc returns a copy of the current table with every allocation belonging
// to the given materialization (mzid) removed.
func Dealloc(current *api.AllocationTable, mzid string) (*api.AllocationTable, error) {

	a := api.NewAllocationTable()

	// construct a new table leaving out this mzid
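	// Revisions are copied into the fresh table first, so WriteTable's
	// preconditions still guard against concurrent writers, and lists emptied
	// by this dealloc are written back empty rather than silently dropped.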

	for name, alloc := range current.Resource {
		a.Resource[name] = &api.ResourceAllocationList{
			Revision: alloc.Revision,
		}
	}

	for name, allocations := range current.Resource {
		for _, allocation := range allocations.Value {
			if allocation.Mzid != mzid {
				a.AllocateResource(name, allocation)
			}
		}
	}

	for name, alloc := range current.Cable {
		a.Cable[name] = &api.CableAllocationList{
			Revision: alloc.Revision,
		}
	}

	for name, allocations := range current.Cable {
		for _, allocation := range allocations.Value {
			if allocation.Mzid != mzid {
				a.AllocateCable(name, allocation)
			}
		}
	}

	return a, nil

}
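
// Deallocation follows the same optimistic cycle as allocation: read the
// table, remove the mzid's entries with Dealloc, then write the result back
// with WriteTable; a failed revision check means another replica committed in
// between, and the caller re-reads and retries.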

// rakey returns the etcd key for a resource allocation list.
func rakey(s string) string {
	return fmt.Sprintf("/alloc/resource/%s", s)
}

// cakey returns the etcd key for a cable allocation list.
func cakey(s string) string {
	return fmt.Sprintf("/alloc/cable/%s", s)
}

// WriteTable writes the allocation table back to etcd in a single
// transaction, guarded so that no list can overwrite data that changed after
// it was read.
func WriteTable(a *api.AllocationTable) error {

	etcd := storage.EtcdClient

	var ops []clientv3.Op
	var ifs []clientv3.Cmp

	for resource, list := range a.Resource {

		buf, err := proto.Marshal(list)
		if err != nil {
			return me.DataCorruptionError(resource, err).Log()
		}
		key := rakey(resource)
		// The put commits only if the key's version in etcd still equals the
		// revision stamped at read time. A nonexistent key has version 0, so
		// a brand new list (Revision 0) creates its key atomically.
		ifs = append(ifs, clientv3.Compare(clientv3.Version(key), "=", list.Revision))
		ops = append(ops, clientv3.OpPut(key, string(buf)))
	}

	for cable, list := range a.Cable {

		buf, err := proto.Marshal(list)
		if err != nil {
			return me.DataCorruptionError(cable, err).Log()
		}
		key := cakey(cable)
		ifs = append(ifs, clientv3.Compare(clientv3.Version(key), "=", list.Revision))
		ops = append(ops, clientv3.OpPut(key, string(buf)))
	}

	kvc := clientv3.NewKV(etcd)
	resp, err := kvc.Txn(context.Background()).
		If(ifs...).
		Then(ops...).
		Commit()

	if err != nil {
		return me.DatabaseError("transaction error", err).Log()
	}
	if !resp.Succeeded {
		return me.DatabaseError("transaction failed", nil).Log()
	}

	return nil

}
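
// What follows is a minimal usage sketch, not part of the service proper: it
// shows how a caller composes ReadTable, Alloc, and WriteTable into an
// optimistic read-modify-write cycle. The retry bound is an assumption made
// for illustration, not a service constant.
func allocWithRetry(req *api.AllocationTable) error {

	const maxAttempts = 5 // assumed bound; tune for expected contention

	var lastErr error
	for i := 0; i < maxAttempts; i++ {

		// read the table; each list is stamped with its etcd key version
		current, err := ReadTable()
		if err != nil {
			return err
		}

		// merge the requested allocations into the current table
		merged, err := Alloc(req, current)
		if err != nil {
			return err
		}

		// the write fails if any revision moved underneath us; re-read and retry
		lastErr = WriteTable(merged)
		if lastErr == nil {
			return nil
		}
	}

	return lastErr

}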

// BuildResourceInternet stitches the models of all commissioned facilities
// into a single topology, adding a synthetic "the-internet" resource that is
// cabled to every facility gateway link.
func BuildResourceInternet() (*xir.Topology, error) {

	fs, err := storage.ListFacilities()
	if err != nil {
		return nil, err
	}

	// bail if no facilities commissioned
	if len(fs) == 0 {
		return nil, fmt.Errorf("no facilities have been commissioned")
	}

	// A synthetic resource standing in for the outside world. It is NoAlloc,
	// so it never consumes capacity; its single NIC accumulates one mirror
	// port per facility gateway link in the loop below.
	inet := &xir.Resource{
		Id:    "the-internet",
		Roles: []xir.Role{xir.Role_Gateway},
		Alloc: []xir.AllocMode{xir.AllocMode_NoAlloc},
		NICs: []*xir.NIC{{
			StartingIndex: 0,
		}},
	}

	ri := &xir.Facility{
		Id:        "resource-internet",
		Resources: []*xir.Resource{inet},
	}

	// facility models keyed by facility pointer; both passes below range over
	// the same fs slice, so lookups by pointer are consistent
	m := make(map[*storage.Facility]*xir.Facility)

	// collect facility models from MinIO

	for _, facility := range fs {

		bucket := fmt.Sprintf("facility-%s", facility.Name)

		obj, err := storage.MinIOClient.GetObject(
			context.TODO(),
			bucket,
			"model",
			minio.GetObjectOptions{},
		)

		if err != nil {
			log.Warnf("failed to fetch model for %s: %v", facility.Name, err)
			continue
		}

		buf, err := io.ReadAll(obj)
		obj.Close() // close here; a defer inside the loop would pile up until return
		if err != nil {
			log.Warnf("failed to read model for %s: %v", facility.Name, err)
			continue
		}

		model, err := xir.FacilityFromB64String(string(buf))
		if err != nil {
			log.Warnf("failed to parse model for %s", facility.Name)
			continue
		}

		m[facility] = model
	}

	// construct resource internet from facility models

	for _, facility := range fs {

		fields := log.Fields{"facility": facility.Name}

		model := m[facility]

		if model == nil {
			log.Warnf("Facility %s has no model, skipping", facility.Name)
			continue
		}

		var gw []*xir.Resource
		for _, r := range model.Resources {
			r.Facility = facility.Name
			ri.Resources = append(ri.Resources, r)
			if r.HasRole(xir.Role_Gateway) {
				gw = append(gw, r)
			}
		}
		for _, c := range model.Cables {
			c.Facility = facility.Name
			ri.Cables = append(ri.Cables, c)
		}

		if len(gw) == 0 {
			log.WithFields(fields).Warn("facility has no gateway")
		}

		gatewayLinks := 0
		for _, g := range gw {

			for _, n := range g.NICs {
				for _, p := range n.Ports {

					if p.Role != xir.LinkRole_GatewayLink {
						continue
					}
					gatewayLinks++

					// mirror the gateway port on the internet resource, and
					// join the pair with a synthetic cable below
					ip := &xir.Port{
						Parent: inet.Id,
						Role:   xir.LinkRole_GatewayLink,
					}
					inet.NICs[0].Ports = append(inet.NICs[0].Ports, ip)

					capacity := uint64(0) //TODO
					c := xir.GenericCable(
						fmt.Sprintf("%s-%s", ip.Label(), p.Label()),
						capacity,
						xir.CableKind_CableKind_Undefined,
						xir.ConnectorKind_ConnectorKind_Undefined,
						xir.Layer1_Layer1_Undefined,
					)
					c.Facility = "resource-internet"
					xir.Connect(ip, c.Ends[0].Connectors[0])
					xir.Connect(p, c.Ends[1].Connectors[0])

					ri.Cables = append(ri.Cables, c)

				}
			}

		}

		if gatewayLinks == 0 {
			log.WithFields(fields).Warn("facility has no gateway links")
		}

	}

	return ri.Lift(), nil

}