Commit 9581c861 authored by Nikolay Zamulov's avatar Nikolay Zamulov
Browse files

Remove Kubernetes dependency.

parent 57004bcf
Pipeline #342855699 failed with stage
in 1 minute and 28 seconds
......@@ -74,9 +74,9 @@ $ curl -sL http://127.0.0.1:8080/metrics
# HELP DCGM_FI_DEV_MEMORY_TEMP Memory temperature (in C).
# TYPE DCGM_FI_DEV_MEMORY_TEMP gauge
...
DCGM_FI_DEV_SM_CLOCK{gpu="0", UUID="GPU-604ac76c-d9cf-fef3-62e9-d92044ab6e52",container="",namespace="",pod=""} 139
DCGM_FI_DEV_MEM_CLOCK{gpu="0", UUID="GPU-604ac76c-d9cf-fef3-62e9-d92044ab6e52",container="",namespace="",pod=""} 405
DCGM_FI_DEV_MEMORY_TEMP{gpu="0", UUID="GPU-604ac76c-d9cf-fef3-62e9-d92044ab6e52",container="",namespace="",pod=""} 9223372036854775794
DCGM_FI_DEV_SM_CLOCK{gpu="0", UUID="GPU-604ac76c-d9cf-fef3-62e9-d92044ab6e52"} 139
DCGM_FI_DEV_MEM_CLOCK{gpu="0", UUID="GPU-604ac76c-d9cf-fef3-62e9-d92044ab6e52"} 405
DCGM_FI_DEV_MEMORY_TEMP{gpu="0", UUID="GPU-604ac76c-d9cf-fef3-62e9-d92044ab6e52"} 9223372036854775794
...
```
......
......@@ -8,30 +8,4 @@ require (
github.com/stretchr/testify v1.5.1
github.com/urfave/cli/v2 v2.2.0
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4 // indirect
google.golang.org/grpc v1.28.1
k8s.io/kubernetes v1.18.2
)
replace (
k8s.io/api => k8s.io/api v0.18.2
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.18.2
k8s.io/apimachinery => k8s.io/apimachinery v0.18.2
k8s.io/apiserver => k8s.io/apiserver v0.18.2
k8s.io/cli-runtime => k8s.io/cli-runtime v0.18.2
k8s.io/client-go => k8s.io/client-go v0.18.2
k8s.io/cloud-provider => k8s.io/cloud-provider v0.18.2
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.18.2
k8s.io/code-generator => k8s.io/code-generator v0.18.2
k8s.io/component-base => k8s.io/component-base v0.18.2
k8s.io/cri-api => k8s.io/cri-api v0.18.2
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.18.2
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.18.2
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.18.2
k8s.io/kube-proxy => k8s.io/kube-proxy v0.18.2
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.18.2
k8s.io/kubectl => k8s.io/kubectl v0.18.2
k8s.io/kubelet => k8s.io/kubelet v0.18.2
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.18.2
k8s.io/metrics => k8s.io/metrics v0.18.2
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.18.2
)
This diff is collapsed.
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"context"
"fmt"
"net"
"os"
"time"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
)
// Settings used to reach the kubelet pod-resources gRPC endpoint.
var (
	// socketDir is the directory that holds the kubelet socket.
	socketDir = "/var/lib/kubelet/pod-resources"

	// socketPath is the socket file inside socketDir; Process checks for it
	// and dials it.
	socketPath = socketDir + "/kubelet.sock"

	// connectionTimeout is the deadline applied to dialing the socket and to
	// the pod listing request.
	connectionTimeout = 10 * time.Second
)
// NewPodMapper builds a PodMapper bound to the given configuration and logs
// that Kubernetes metrics collection is enabled.
func NewPodMapper(c *Config) *PodMapper {
	mapper := &PodMapper{Config: c}
	logrus.Infof("Kubernetes metrics collection enabled!")
	return mapper
}
// Name returns the fixed identifier of this transformation, "podMapper".
func (p *PodMapper) Name() string {
	const name = "podMapper"
	return name
}
// Process labels every metric with the pod, namespace and container that the
// kubelet pod-resources endpoint reports for the metric's GPU.
//
// metrics is indexed as [device][metric] and is updated in place. When the
// kubelet socket file does not exist, the call logs that fact and returns nil
// without touching metrics. A GPU that has no entry in the kubelet response
// receives empty strings for all three attributes.
//
// An error is returned when connecting to the socket, listing the pods, or
// resolving a metric's GPU ID fails; in the last case the metrics handled
// before the failure keep the attributes already written to them.
func (p *PodMapper) Process(metrics [][]Metric) error {
	if _, statErr := os.Stat(socketPath); os.IsNotExist(statErr) {
		logrus.Infof("No Kubelet socket, ignoring")
		return nil
	}

	// TODO: This needs to be moved out of the critical path.
	conn, closeConn, err := connectToServer(socketPath)
	if err != nil {
		return err
	}
	defer closeConn()

	resp, err := ListPods(conn)
	if err != nil {
		return err
	}
	owners := ToDeviceToPod(resp)

	// Note: a range loop hands out copies of the elements. Each metric is
	// therefore addressed through its indexes, so that the element being
	// changed is the one stored in the caller's slice.
	for i := range metrics {
		for j := range metrics[i] {
			m := &metrics[i][j]

			id, err := m.getIDOfType(p.Config.KubernetesGPUIdType)
			if err != nil {
				return err
			}

			// A missing key yields the zero PodInfo, i.e. empty labels.
			owner := owners[id]
			m.Attributes[podAttribute] = owner.Name
			m.Attributes[namespaceAttribute] = owner.Namespace
			m.Attributes[containerAttribute] = owner.Container
		}
	}

	return nil
}
// connectToServer dials the gRPC server listening on the Unix domain socket
// at path socket. The dial blocks until the connection is established or
// connectionTimeout has elapsed.
//
// It returns the connection together with a cleanup function that closes it.
// On failure the connection is nil, the cleanup function does nothing, and the
// error names the socket and wraps the underlying cause.
func connectToServer(socket string) (*grpc.ClientConn, func(), error) {
	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, socket, grpc.WithInsecure(), grpc.WithBlock(),
		// WithContextDialer supersedes the deprecated WithDialer: the dial
		// receives the context itself, so it honours both the deadline and
		// cancellation instead of a duration derived from the deadline.
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", addr)
		}),
	)
	if err != nil {
		// %w keeps the cause available to errors.Is / errors.As.
		return nil, func() {}, fmt.Errorf("failure connecting to %s: %w", socket, err)
	}

	return conn, func() { conn.Close() }, nil
}
// ListPods asks the pod-resources service reachable through conn for the
// resources of all pods. The request is bounded by connectionTimeout.
//
// It returns the service response, or a nil response and an error wrapping
// the failure reported by the List call.
func ListPods(conn *grpc.ClientConn) (*podresourcesapi.ListPodResourcesResponse, error) {
	client := podresourcesapi.NewPodResourcesListerClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	resp, err := client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
	if err != nil {
		// %w (rather than %v) keeps the cause available to errors.Is / errors.As.
		return nil, fmt.Errorf("failure getting pod resources %w", err)
	}

	return resp, nil
}
// ToDeviceToPod turns a pod-resources listing into a lookup table keyed by
// device ID. Each value names the pod, namespace and container the device is
// assigned to. Only devices whose resource name equals nvidiaResourceName are
// included. If a device ID occurs more than once, the last occurrence wins.
func ToDeviceToPod(devicePods *podresourcesapi.ListPodResourcesResponse) map[string]PodInfo {
	byDevice := map[string]PodInfo{}

	for _, p := range devicePods.GetPodResources() {
		podName, namespace := p.GetName(), p.GetNamespace()

		for _, c := range p.GetContainers() {
			owner := PodInfo{
				Name:      podName,
				Namespace: namespace,
				Container: c.GetName(),
			}

			for _, dev := range c.GetDevices() {
				if dev.GetResourceName() == nvidiaResourceName {
					for _, id := range dev.GetDeviceIds() {
						byDevice[id] = owner
					}
				}
			}
		}
	}

	return byDevice
}
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/larsn777/gpu-monitoring-tools/bindings/go/dcgm"
"google.golang.org/grpc"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// tmpDir is the scratch directory set by CreateTmpDir; TestProcessPodMapper
// places the mock kubelet socket inside it.
var tmpDir string
// TestProcessPodMapper collects metrics from an embedded DCGM instance, serves
// a mock pod-resources API on a temporary Unix socket, and checks that
// PodMapper.Process adds the pod, namespace and container attributes the mock
// reports to every metric.
func TestProcessPodMapper(t *testing.T) {
	cleanup := CreateTmpDir(t)
	defer cleanup()

	cleanup, err := dcgm.Init(dcgm.Embedded)
	require.NoError(t, err)
	defer cleanup()

	c, cleanup := testDCGMCollector(t, sampleCounters)
	defer cleanup()

	out, err := c.GetMetrics()
	require.NoError(t, err)

	// Copy of the outer slice, used to derive the GPU list and to check that
	// Process does not change the number of devices.
	original := append(out[:0:0], out...)

	// socketPath is package-level state; put the previous value back so this
	// test does not leak its temporary path into other tests.
	previousSocketPath := socketPath
	defer func() { socketPath = previousSocketPath }()
	socketPath = tmpDir + "/kubelet.sock"

	server := grpc.NewServer()
	gpus := GetGPUUUIDs(original)
	podresourcesapi.RegisterPodResourcesListerServer(server, NewPodResourcesMockServer(gpus))

	cleanup = StartMockServer(t, server, socketPath)
	defer cleanup()

	podMapper := NewPodMapper(&Config{})
	err = podMapper.Process(out)
	require.NoError(t, err)

	require.Len(t, out, len(original))
	for i, dev := range out {
		for _, metric := range dev {
			require.Contains(t, metric.Attributes, podAttribute)
			require.Contains(t, metric.Attributes, namespaceAttribute)
			require.Contains(t, metric.Attributes, containerAttribute)

			// TODO currently we rely on ordering and implicit expectations of the mock implementation
			// This should be a table comparison
			// require.Equal takes (expected, actual); this order keeps the
			// failure output labelled correctly.
			require.Equal(t, fmt.Sprintf("gpu-pod-%d", i), metric.Attributes[podAttribute])
			require.Equal(t, "default", metric.Attributes[namespaceAttribute])
			require.Equal(t, "default", metric.Attributes[containerAttribute])
		}
	}
}
// GetGPUUUIDs returns one GPU UUID per device in metrics, in device order,
// taken from the first metric of each device. Every device must therefore
// hold at least one metric; an empty device slice makes the index panic.
func GetGPUUUIDs(metrics [][]Metric) []string {
	uuids := make([]string, 0, len(metrics))
	for _, deviceMetrics := range metrics {
		uuids = append(uuids, deviceMetrics[0].GPUUUID)
	}
	return uuids
}
// StartMockServer makes server listen on the Unix socket at path socket and
// serves it from a background goroutine. The test fails immediately if the
// listener cannot be created.
//
// The returned function stops the server and waits up to one second for the
// serving goroutine to finish, failing the test if it does not.
func StartMockServer(t *testing.T, server *grpc.Server, socket string) func() {
	listener, err := util.CreateListener("unix://" + socket)
	require.NoError(t, err)

	done := make(chan interface{})
	go func() {
		defer close(done)
		server.Serve(listener)
	}()

	stop := func() {
		server.Stop()

		timeout := time.After(1 * time.Second)
		select {
		case <-done:
		case <-timeout:
			t.Fatal("Failed waiting for gRPC server to stop")
		}
	}
	return stop
}
// CreateTmpDir creates a fresh temporary directory, records its path in the
// package-level tmpDir, and returns a function that removes whatever tmpDir
// points at when it is called. The test fails if creating or removing the
// directory returns an error.
func CreateTmpDir(t *testing.T) func() {
	dir, err := ioutil.TempDir("", "gpu-monitoring-tools")
	require.NoError(t, err)
	tmpDir = dir

	remove := func() {
		err := os.RemoveAll(tmpDir)
		require.NoError(t, err)
	}
	return remove
}
// PodResourcesMockServer is a stand-in pod-resources lister for tests. Its
// List method reports one pod per entry of gpus.
type PodResourcesMockServer struct {
	// gpus holds the GPU UUIDs handed out as device IDs.
	gpus []string
}
// NewPodResourcesMockServer returns a mock server that reports the GPU UUIDs
// in used. The slice is stored as given, not copied.
func NewPodResourcesMockServer(used []string) *PodResourcesMockServer {
	server := new(PodResourcesMockServer)
	server.gpus = used
	return server
}
// List answers a pod-resources listing request with one pod per configured
// GPU. Pod i is named "gpu-pod-<i>", lives in namespace "default", and has a
// single container "default" that owns exactly that GPU under
// nvidiaResourceName. ctx and req are not consulted; the error is always nil.
func (s *PodResourcesMockServer) List(ctx context.Context, req *podresourcesapi.ListPodResourcesRequest) (*podresourcesapi.ListPodResourcesResponse, error) {
	resp := &podresourcesapi.ListPodResourcesResponse{
		PodResources: make([]*podresourcesapi.PodResources, 0, len(s.gpus)),
	}

	for i, uuid := range s.gpus {
		devices := &podresourcesapi.ContainerDevices{
			ResourceName: nvidiaResourceName,
			DeviceIds:    []string{uuid},
		}
		container := &podresourcesapi.ContainerResources{
			Name:    "default",
			Devices: []*podresourcesapi.ContainerDevices{devices},
		}
		pod := &podresourcesapi.PodResources{
			Name:       fmt.Sprintf("gpu-pod-%d", i),
			Namespace:  "default",
			Containers: []*podresourcesapi.ContainerResources{container},
		}
		resp.PodResources = append(resp.PodResources, pod)
	}

	return resp, nil
}
......@@ -17,7 +17,6 @@
package main
import (
"fmt"
"os"
"sync"
"syscall"
......@@ -34,8 +33,6 @@ var (
CLIFieldsFile = "collectors"
CLIAddress = "address"
CLICollectInterval = "collect-interval"
CLIKubernetes = "kubernetes"
CLIKubernetesGPUIDType = "kubernetes-gpu-id-type"
)
func main() {
......@@ -66,19 +63,6 @@ func main() {
Usage: "Interval of time at which point metrics are collected. Unit is milliseconds (ms).",
EnvVars: []string{"DCGM_EXPORTER_INTERVAL"},
},
&cli.BoolFlag{
Name: CLIKubernetes,
Aliases: []string{"k"},
Value: false,
Usage: "Enable kubernetes mapping metrics to kubernetes pods",
EnvVars: []string{"DCGM_EXPORTER_KUBERNETES"},
},
&cli.StringFlag{
Name: CLIKubernetesGPUIDType,
Value: string(GPUUID),
Usage: fmt.Sprintf("Choose Type of GPU ID to use to map kubernetes resources to pods. Possible values: '%s', '%s'", GPUUID, DeviceName),
EnvVars: []string{"DCGM_EXPORTER_KUBERNETES_GPU_ID_TYPE"},
},
}
c.Action = func(c *cli.Context) error {
......@@ -150,8 +134,6 @@ restart:
return nil
}
}
return nil
}
func contextToConfig(c *cli.Context) *Config {
......@@ -159,8 +141,6 @@ func contextToConfig(c *cli.Context) *Config {
CollectorsFile: c.String(CLIFieldsFile),
Address: c.String(CLIAddress),
CollectInterval: c.Int(CLICollectInterval),
Kubernetes: c.Bool(CLIKubernetes),
KubernetesGPUIdType: KubernetesGPUIDType(c.String(CLIKubernetesGPUIDType)),
CollectDCP: true,
}
}
......@@ -44,11 +44,6 @@ func NewMetricsPipeline(c *Config) (*MetricsPipeline, func(), error) {
return nil, func() {}, err
}
transformations := []Transform{}
if c.Kubernetes {
transformations = append(transformations, NewPodMapper(c))
}
return &MetricsPipeline{
config: c,
......@@ -56,7 +51,6 @@ func NewMetricsPipeline(c *Config) (*MetricsPipeline, func(), error) {
countersText: countersText,
gpuCollector: gpuCollector,
transformations: transformations,
}, func() {
cleanup()
}, nil
......
......@@ -28,28 +28,17 @@ import (
var (
SkipDCGMValue = "SKIPPING DCGM VALUE"
FailedToConvert = "ERROR - FAILED TO CONVERT TO STRING"
nvidiaResourceName = "nvidia.com/gpu"
// Note standard resource attributes
podAttribute = "pod"
namespaceAttribute = "namespace"
containerAttribute = "container"
)
type KubernetesGPUIDType string
const (
GPUUID KubernetesGPUIDType = "uid"
DeviceName KubernetesGPUIDType = "device-name"
GPUUID string = "uid"
DeviceName string = "device-name"
)
// Config carries the exporter settings. contextToConfig fills it from the
// command-line context.
// NOTE(review): this struct is shown inside a diff hunk; the two Kubernetes
// fields appear to be the ones the commit removes — confirm against the
// actual file before relying on them.
type Config struct {
// CollectorsFile is read from the "collectors" CLI option.
CollectorsFile string
// Address is read from the "address" CLI option.
Address string
// CollectInterval is read from the "collect-interval" CLI option;
// presumably milliseconds, per the flag usage text — confirm.
CollectInterval int
// Kubernetes, when true, makes NewMetricsPipeline add a PodMapper
// transformation.
Kubernetes bool
// KubernetesGPUIdType is the ID type PodMapper.Process passes to
// Metric.getIDOfType when matching metrics to pods.
KubernetesGPUIdType KubernetesGPUIDType
// CollectDCP is set to true by contextToConfig; its effect is not visible
// here.
CollectDCP bool
}
......@@ -92,7 +81,7 @@ type Metric struct {
Attributes map[string]string
}
func (m Metric) getIDOfType(idType KubernetesGPUIDType) (string, error) {
func (m Metric) getIDOfType(idType string) (string, error) {
switch idType {
case GPUUID:
return m.GPUUUID, nil
......@@ -116,13 +105,3 @@ type MetricsServer struct {
metrics string
metricsChan chan string
}
// PodMapper is the transformation that attaches pod, namespace and container
// attributes to metrics.
type PodMapper struct {
	// Config supplies KubernetesGPUIdType, which Process uses to pick the GPU
	// identifier matched against the kubelet's device IDs.
	Config *Config
}
// PodInfo identifies the Kubernetes workload a device is assigned to.
type PodInfo struct {
	// Name is the pod name.
	Name string
	// Namespace is the namespace of the pod.
	Namespace string
	// Container is the name of the container within the pod.
	Container string
}
The MIT License (MIT)
Copyright (c) 2015 Microsoft
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# go-winio
This repository contains utilities for efficiently performing Win32 IO operations in
Go. Currently, this is focused on accessing named pipes and other file handles, and
for using named pipes as a net transport.
This code relies on IO completion ports to avoid blocking IO on system threads, allowing Go
to reuse the thread to schedule another goroutine. This limits support to Windows Vista and
newer operating systems. This is similar to the implementation of network sockets in Go's net
package.
Please see the LICENSE file for licensing information.
This project has adopted the [Microsoft Open Source Code of
Conduct](https://opensource.microsoft.com/codeofconduct/). For more information
see the [Code of Conduct
FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact
[opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional
questions or comments.
Thanks to natefinch for the inspiration for this library. See https://github.com/natefinch/npipe
for another named pipe implementation.
// +build windows
package winio
import (
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
"syscall"
"unicode/utf16"
)
//sys backupRead(h syscall.Handle, b []byte, bytesRead *uint32, abort bool, processSecurity bool, context *uintptr) (err error) = BackupRead
//sys backupWrite(h syscall.Handle, b []byte, bytesWritten *uint32, abort bool, processSecurity bool, context *uintptr) (err error) = BackupWrite
// Backup stream IDs, numbered consecutively from 1 (BackupData) to 10
// (BackupTxfsData). They are the values carried in BackupHeader.Id.
// NOTE(review): presumably these mirror the Win32 BACKUP_* stream identifiers
// — confirm against the Windows SDK.
const (
	BackupData uint32 = iota + 1
	BackupEaData
	BackupSecurity
	BackupAlternateData
	BackupLink
	BackupPropertyData
	BackupObjectId
	BackupReparseData
	BackupSparseBlock
	BackupTxfsData
)
// StreamSparseAttributes is the stream attribute value 8.
// NOTE(review): presumably the flag marking a sparse stream in
// BackupHeader.Attributes — confirm against the Win32 documentation.
const StreamSparseAttributes uint32 = 8
// Single-bit flag constants, written as shifts to show which bit each sets.
// NOTE(review): the names match Windows access-right flags — confirm their
// intended use against the callers.
const (
	WRITE_DAC              = 1 << 18 // 0x40000
	WRITE_OWNER            = 1 << 19 // 0x80000
	ACCESS_SYSTEM_SECURITY = 1 << 24 // 0x1000000
)
// BackupHeader describes one stream within the backup representation of a
// file.
type BackupHeader struct {
	// Id is the backup stream ID.
	Id uint32
	// Attributes holds the stream attributes.
	Attributes uint32
	// Size is the size of the stream in bytes.
	Size int64
	// Name is the name of the stream (for BackupAlternateData only).
	Name string
	// Offset is the offset of the stream in the file (for BackupSparseBlock
	// only).
	Offset int64
}
// win32StreamId is the fixed-size record that BackupStreamReader.Next decodes
// from the underlying reader in little-endian byte order. The decode is
// positional, so the field order and widths must not change.
type win32StreamId struct {
	StreamId   uint32
	Attributes uint32
	Size       uint64
	NameSize   uint32
}
// BackupStreamReader consumes a stream in the format produced by the
// BackupRead Win32 API and yields it as a sequence of BackupHeader values.
type BackupStreamReader struct {
	// r is the underlying source of backup data.
	r io.Reader
	// bytesLeft counts the bytes of the current stream that have not been
	// consumed yet; Next skips them before decoding the following header.
	bytesLeft int64
}
// NewBackupStreamReader wraps any io.Reader in a BackupStreamReader that
// starts with no pending stream bytes.
func NewBackupStreamReader(r io.Reader) *BackupStreamReader {
	return &BackupStreamReader{
		r:         r,
		bytesLeft: 0,
	}
}
// Next returns the next backup stream and prepares for calls to Read(). It skips the remainder of the current stream if
// it was not completely read.
func (r *BackupStreamReader) Next() (*BackupHeader, error) {
if r.bytesLeft > 0 {
if s, ok := r.r.(io.Seeker); ok {
// Make sure Seek on io.SeekCurrent sometimes succeeds
// before trying the actual seek.
if _, err := s.Seek(0, io.SeekCurrent); err == nil {
if _, err = s.Seek(r.bytesLeft, io.SeekCurrent); err != nil {
return nil, err
}
r.bytesLeft = 0
}
}
if _, err := io.Copy(ioutil.Discard, r); err != nil {
return nil, err
}
}
var wsi win32StreamId
if err := binary.Read(r.r, binary.LittleEndian, &wsi); err != nil {
return nil, err
}