Skip to content

Commit

Permalink
Add kvm plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Sim <[email protected]>
  • Loading branch information
ihcsim committed Sep 2, 2024
1 parent 84ab031 commit 529e843
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 0 deletions.
46 changes: 46 additions & 0 deletions cmd/kvm/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"context"
"errors"
"os"
"os/signal"
"syscall"
"time"

"github.com/ihcsim/kubelet-plugin/pkg/plugins/kvm"
"github.com/rs/zerolog"
)

func main() {
var (
log = logger()
plugin = kvm.NewPlugin(log)
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
log.Info().Msg("received signal, attempting graceful shutdown")
cancel()
}()

if err := plugin.Run(ctx); err != nil {
if !errors.Is(ctx.Err(), context.Canceled) {
log.Error().Err(err).Send()
return
}
}

log.Info().Msg("shutdown completed successfully")
}

func logger() *zerolog.Logger {
w := zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}
l := zerolog.New(w).With().Timestamp().Logger()
return &l
}
78 changes: 78 additions & 0 deletions pkg/plugins/kvm/device.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package kvm

import (
"context"
"time"

"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

const hostDevicePath = "/dev/kvm"

var watchIntervalDuration = 10 * time.Second

func (p *DevicePlugin) ListAndWatch(empty *v1beta1.Empty, stream v1beta1.DevicePlugin_ListAndWatchServer) error {
p.log.Debug().Msg("calling DevicePlugin.ListAndWatch()")
tick := time.NewTicker(watchIntervalDuration)
defer tick.Stop()

for range tick.C {
hasChanged, err := p.discoverDevices()
if err != nil {
return err
}

if !hasChanged {
continue
}

changed := p.cache
resp := &v1beta1.ListAndWatchResponse{
Devices: []*v1beta1.Device{
&v1beta1.Device{
ID: changed.ID,
Health: changed.Health.String(),
},
},
}

p.log.Debug().Any("changeSet", changed).Msg("sending ListAndWatch response")
if err := stream.Send(resp); err != nil {
p.log.Err(err).Msg("failed to send ListAndWatch response")
return err
}
}

return nil
}

func (p *DevicePlugin) Allocate(ctx context.Context, r *v1beta1.AllocateRequest) (*v1beta1.AllocateResponse, error) {
p.log.Debug().Msg("calling DevicePlugin.Allocate()")
resp := &v1beta1.AllocateResponse{}
for _, allocateRequest := range r.ContainerRequests {
car := &v1beta1.ContainerAllocateResponse{}
for _, id := range allocateRequest.DevicesIDs {
p.log.Info().Str("name", id).Msg("allocating CDI device")
car.CDIDevices = append(car.CDIDevices, &v1beta1.CDIDevice{
Name: id,
})
}
resp.ContainerResponses = append(resp.ContainerResponses, car)
}
return resp, nil
}

func (p *DevicePlugin) GetPreferredAllocation(ctx context.Context, r *v1beta1.PreferredAllocationRequest) (*v1beta1.PreferredAllocationResponse, error) {
p.log.Debug().Msg("calling DevicePlugin.GetPreferredAllocation()")
return &v1beta1.PreferredAllocationResponse{}, nil
}

func (p *DevicePlugin) PreStartContainer(ctx context.Context, r *v1beta1.PreStartContainerRequest) (*v1beta1.PreStartContainerResponse, error) {
p.log.Debug().Msg("calling DevicePlugin.PrestartContainer()")
return &v1beta1.PreStartContainerResponse{}, nil
}

func (p *DevicePlugin) GetDevicePluginOptions(context.Context, *v1beta1.Empty) (*v1beta1.DevicePluginOptions, error) {
p.log.Debug().Msg("calling DevicePlugin.GetDevicePluginOptions()")
return &v1beta1.DevicePluginOptions{}, nil
}
56 changes: 56 additions & 0 deletions pkg/plugins/kvm/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package kvm

import (
"os"
"path/filepath"
"time"

"github.com/ihcsim/kubelet-plugin/pkg/plugins"
)

func (p *DevicePlugin) discoverDevices() (bool, error) {
id := resourceName
device := &plugins.Device{
ID: id,
Health: plugins.Healthy,
}
hasChanged := false

// always update cache
defer func() {
p.cache = &plugins.DeviceState{
LastSeenTimestamp: time.Now().Unix(),
Device: device,
}
}()

if _, err := os.Open(hostDevicePath); err != nil {
device.Health = plugins.Unhealthy
hasChanged = true
return hasChanged, err
}

log := p.log.With().
Str("device", id).
Str("health", device.Health.String()).
Str("path", filepath.Join(hostDevicePath, device.ID)).
Logger()

// add new device's state to cache
if p.cache == nil {
log.Info().Msg("found new device")
hasChanged = true
return hasChanged, nil
}

lastSeenState := p.cache
if lastSeenState.Health == device.Health {
log.Info().
Str("before", lastSeenState.Health.String()).
Time("last seen", time.Unix(lastSeenState.LastSeenTimestamp, 0)).
Msg("device health changed")
hasChanged = true
}

return hasChanged, nil
}
91 changes: 91 additions & 0 deletions pkg/plugins/kvm/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package kvm

import (
"context"
"errors"
"fmt"

"github.com/ihcsim/kubelet-plugin/pkg/plugins"
"github.com/rs/zerolog"
"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

const (
socketName = "kvm.sock"
resourceName = "github.com.ihcsim/kvm"
)

var (
_ v1beta1.DevicePluginServer = &DevicePlugin{}
socketPath = v1beta1.DevicePluginPath + socketName
)

type DevicePlugin struct {
cache *plugins.DeviceState
server *plugins.Server
log *zerolog.Logger
}

func NewPlugin(log *zerolog.Logger) *DevicePlugin {
server := plugins.NewServer(socketPath)
plugin := &DevicePlugin{
server: server,
log: log,
}

v1beta1.RegisterDevicePluginServer(server.GRPC, plugin)

plugin.log.Info().Msg("plugin initialized")
return plugin
}

func (p *DevicePlugin) Run(ctx context.Context) error {
// errCh is used to collect errors from goroutines and
// handle them in Run().
var (
runErr error
errCh = make(chan error)
)
defer close(errCh)
go func() {
for err := range errCh {
if err != nil {
p.log.Err(err).Msg("error reported by goroutine")
runErr = errors.Join(runErr, err)
}
}
}()

if err := p.server.GRPCServe(ctx, errCh); err != nil {
return err
}

if err := p.server.GRPCReady(ctx); err != nil {
return err
}
p.log.Info().Str("addr", socketPath).Msg("grpc server ready")

kubeletAddr := fmt.Sprintf("unix://%s", v1beta1.KubeletSocket)
if err := plugins.RegisterWithKubelet(ctx, socketName, resourceName, kubeletAddr); err != nil {
return err
}
p.log.Info().Str("addr", socketPath).Msg("plugin registered with kubelet")

restart := func() error {
if err := p.server.GRPCServe(ctx, errCh); err != nil {
return err
}

return plugins.RegisterWithKubelet(ctx, socketName, resourceName, kubeletAddr)
}

closeHandler, err := plugins.RegisterWithRestartHandler(ctx, restart, socketPath, p.log, errCh)
if err != nil {
return err
}
defer closeHandler()
p.log.Info().Str("addr", socketPath).Msg("restart handler configured")

<-ctx.Done()
return runErr
}
13 changes: 13 additions & 0 deletions yaml/butane.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# use to generate ignition config of flatcar linux
variant: flatcar
version: 1.0.0
storage:
files:
- path: /etc/hostname
contents:
inline: "flatcar-01"
passwd:
users:
- name: core
ssh_authorized_keys:
- "${SSH_PUB_KEY}"

0 comments on commit 529e843

Please sign in to comment.