Select Git revision
-
Christopher Tran authoredChristopher Tran authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
alloc.go 9.50 KiB
package realize
import (
"context"
"fmt"
"io"
"strings"
"github.com/minio/minio-go/v7"
log "github.com/sirupsen/logrus"
api "gitlab.com/mergetb/api/portal/v1/go"
me "gitlab.com/mergetb/portal/services/pkg/merror"
"gitlab.com/mergetb/portal/services/pkg/storage"
xir "gitlab.com/mergetb/xir/v0.3/go"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
)
/*=============================================================================
* Allocation Data Mangement
* -------------------------
*
* The allocation service handles requests for multiple realization services
* that are competing for the same resources. Thus the allocation service must
* detect and return errors when collisions occur between realization service
* replicas competing for the same resources.
*
* The basic data layout in etcd is the following
*
* /alloc/resource/
* - id: (revision, allocation-list)
*
* When resource allocation requests come in from realization services, there
* is a revision tag on every ResourceAllocationList object. In the event that
* the revision of a ResourceAllocationList is less than what is currently in
* etcd, the request must be rejected as this indicates the data has changed
* since the realization service read the allocation table.
*
* The same basic situation exists for cables
*
* /alloc/cable/
* - id: (revision, allocation-list)
*
* The allocation service must
* - Stamp all resource lists with a revision on fetch requests
* - Ensure that lesser versions can never overwrite greater versions
*
* The implementation of this service uses the etcd key revision as the
* tracker. When data is fetched from etcd, the key revision is added to the
* protobuf. Likewise when data is pushed, the revision in the protobuf
* received is set as a key-revision precondition on the etcd transaction that
* mutates the data store.
*
* TODO: create secondary index keyed on realization id, so dealloc does not
* need to read the entire table.
*===========================================================================*/
/*=============================================================================
* Allocation Data Mangement
* -------------------------
*
* /alloc/resource/:mzid/:node
* /alloc/cable/:mzid/:link
*
*/
func ReadTable() (*api.AllocationTable, error) {
etcd := storage.EtcdClient
at := api.NewAllocationTable()
err := readResources(at, etcd)
if err != nil {
return nil, err
}
err = readCables(at, etcd)
if err != nil {
return nil, err
}
return at, nil
}
func readResources(at *api.AllocationTable, etcd *clientv3.Client) error {
resp, err := etcd.Get(
context.Background(), "/alloc/resource/", clientv3.WithPrefix())
if err != nil {
return me.DatabaseError("db error", err).Log()
}
//log.WithFields(log.Fields{"entries": len(resp.Kvs)}).Info("table read")
for _, x := range resp.Kvs {
l := new(api.ResourceAllocationList)
err := proto.Unmarshal(x.Value, l)
if err != nil {
log.WithFields(log.Fields{
"key": string(x.Key),
}).Warn("corrupt resource allocation encountered")
continue
}
l.Revision = x.Version
//log.Infof("%s @ %d", x.Key, x.Version)
rs := strings.Split(string(x.Key), "/")[3]
at.Resource[rs] = l
}
return nil
}
func readCables(at *api.AllocationTable, etcd *clientv3.Client) error {
resp, err := etcd.Get(
context.Background(), "/alloc/cable/", clientv3.WithPrefix())
if err != nil {
return me.DatabaseError("db error", err).Log()
}
//log.WithFields(log.Fields{"entries": len(resp.Kvs)}).Info("table read")
for _, x := range resp.Kvs {
l := new(api.CableAllocationList)
err := proto.Unmarshal(x.Value, l)
if err != nil {
log.WithFields(log.Fields{
"key": string(x.Key),
}).Warn("corrupt resource allocation encountered")
continue
}
l.Revision = x.Version
cb := strings.Split(string(x.Key), "/")[3]
at.Cable[cb] = l
}
return nil
}
func Alloc(new, current *api.AllocationTable) (*api.AllocationTable, error) {
// resources
for k, newV := range new.Resource {
currentV, ok := current.Resource[k]
if ok {
currentV.Value = append(currentV.Value, newV.Value...)
//currentV.Revision = newV.Revision
//newV.Revision = currentV.Revision
} else {
current.Resource[k] = newV
}
}
// cables
for k, newV := range new.Cable {
currentV, ok := current.Cable[k]
if ok {
currentV.Value = append(currentV.Value, newV.Value...)
//currentV.Revision = newV.Revision
//newV.Revision = currentV.Revision
} else {
current.Cable[k] = newV
}
}
return current, nil
}
func Dealloc(current *api.AllocationTable, mzid string) (*api.AllocationTable, error) {
a := api.NewAllocationTable()
// construct a new table leaving out this mzid
for name, alloc := range current.Resource {
a.Resource[name] = &api.ResourceAllocationList{
Revision: alloc.Revision,
}
}
for name, allocations := range current.Resource {
for _, allocation := range allocations.Value {
if allocation.Mzid != mzid {
a.AllocateResource(name, allocation)
}
}
}
for name, alloc := range current.Cable {
a.Cable[name] = &api.CableAllocationList{
Revision: alloc.Revision,
}
}
for name, allocations := range current.Cable {
for _, allocation := range allocations.Value {
if allocation.Mzid != mzid {
a.AllocateCable(name, allocation)
}
}
}
return a, nil
}
func rakey(s string) string {
return fmt.Sprintf("/alloc/resource/%s", s)
}
func cakey(s string) string {
return fmt.Sprintf("/alloc/cable/%s", s)
}
func WriteTable(a *api.AllocationTable) error {
etcd := storage.EtcdClient
var ops []clientv3.Op
var ifs []clientv3.Cmp
for resource, list := range a.Resource {
buf, err := proto.Marshal(list)
if err != nil {
return me.DataCorruptionError(resource, err).Log()
}
key := rakey(resource)
//log.Infof("write %s @ %d #%d", key, list.Revision, len(list.Value))
ifs = append(ifs, clientv3.Compare(clientv3.Version(key), "=", list.Revision))
ops = append(ops, clientv3.OpPut(key, string(buf)))
/*
for _, x := range list.Value {
log.Debug(x.Resource)
}
*/
}
for cable, list := range a.Cable {
buf, err := proto.Marshal(list)
if err != nil {
return me.DataCorruptionError(cable, err).Log()
}
key := cakey(cable)
ifs = append(ifs, clientv3.Compare(clientv3.Version(key), "=", list.Revision))
ops = append(ops, clientv3.OpPut(key, string(buf)))
/*
for _, x := range list.Value {
log.Debug(x.Cable)
}
*/
}
kvc := clientv3.NewKV(etcd)
resp, err := kvc.Txn(context.Background()).
If(ifs...).
Then(ops...).
Commit()
if err != nil {
return me.DatabaseError("transaction error", err).Log()
}
if !resp.Succeeded {
return me.DatabaseError("transaction failed", nil).Log()
}
return nil
}
func BuildResourceInternet() (*xir.Topology, error) {
fs, err := storage.ListFacilities()
if err != nil {
return nil, err
}
// bail if no facilities commissioned
if len(fs) == 0 {
return nil, fmt.Errorf("no facilities have been commissioned")
}
inet := &xir.Resource{
Id: "the-internet",
Roles: []xir.Role{xir.Role_Gateway},
Alloc: []xir.AllocMode{xir.AllocMode_NoAlloc},
NICs: []*xir.NIC{{
StartingIndex: 0,
}},
}
ri := &xir.Facility{
Id: "resource-internet",
Resources: []*xir.Resource{inet},
}
m := make(map[*storage.Facility]*xir.Facility)
// collect facility models from MinIO
for _, facility := range fs {
bucket := fmt.Sprintf("facility-%s", facility.Name)
obj, err := storage.MinIOClient.GetObject(
context.TODO(),
bucket,
"model",
minio.GetObjectOptions{},
)
if err != nil {
log.Warnf("failed to fetch model for %s", facility.Name)
continue
}
defer obj.Close()
buf, err := io.ReadAll(obj)
if err != nil {
log.Warnf("failed to read model for %s", facility.Name)
continue
}
model, err := xir.FacilityFromB64String(string(buf))
if err != nil {
log.Warnf("failed to parse model for %s", facility.Name)
continue
}
m[facility] = model
}
// construct resource internet from facility models
for _, facility := range fs {
fields := log.Fields{"facility": facility.Name}
model := m[facility]
if model == nil {
log.Warnf("Facility %s has no model, skipping", facility.Name)
continue
}
var gw []*xir.Resource
for _, r := range model.Resources {
r.Facility = facility.Name
ri.Resources = append(ri.Resources, r)
if r.HasRole(xir.Role_Gateway) {
gw = append(gw, r)
}
}
for _, c := range model.Cables {
c.Facility = facility.Name
ri.Cables = append(ri.Cables, c)
}
if len(gw) == 0 {
log.WithFields(fields).Warn("facility has no gateway")
}
gatewayLinks := 0
for _, g := range gw {
for _, n := range g.NICs {
for _, p := range n.Ports {
if p.Role != xir.LinkRole_GatewayLink {
continue
}
gatewayLinks++
ip := &xir.Port{
Parent: inet.Id,
Role: xir.LinkRole_GatewayLink,
}
inet.NICs[0].Ports = append(inet.NICs[0].Ports, ip)
capacity := uint64(0) //TODO
c := xir.GenericCable(
fmt.Sprintf("%s-%s", ip.Label(), p.Label()),
capacity,
xir.CableKind_CableKind_Undefined,
xir.ConnectorKind_ConnectorKind_Undefined,
xir.Layer1_Layer1_Undefined,
)
c.Facility = "resource-internet"
xir.Connect(ip, c.Ends[0].Connectors[0])
xir.Connect(p, c.Ends[1].Connectors[0])
ri.Cables = append(ri.Cables, c)
}
}
}
if gatewayLinks == 0 {
log.WithFields(fields).Warn("facility has no gateway links")
}
}
return ri.Lift(), nil
}