Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kube: controller lifecycle improvements #636

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions go/Dockerfile.kube-forwarder
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
# syntax=docker/dockerfile:1
FROM golang:1.19-alpine AS builder

WORKDIR /go/src/github.com/moby/vpnkit/go
COPY . /go/src/github.com/moby/vpnkit

RUN apk add --no-cache musl-dev build-base
RUN GOPATH=/go CGO_ENABLED=1 go build -buildmode pie -ldflags "-linkmode=external -s -extldflags \"-fno-PIC -static\"" -o /kube-vpnkit-forwarder /go/src/github.com/moby/vpnkit/go/cmd/kube-vpnkit-forwarder/main.go

# no separate go mod download step because vendoring is in use
COPY . /src
WORKDIR /src

RUN --mount=type=bind,target=. \
--mount=type=cache,target=/root/.cache \
--mount=type=cache,target=/go/pkg/mod \
CGO_ENABLED=1 go build \
-mod=vendor \
-buildmode pie \
-ldflags '-linkmode=external -s -extldflags "-fno-PIC -static"' \
-o /kube-vpnkit-forwarder \
./go/cmd/kube-vpnkit-forwarder

FROM scratch
COPY --link --from=builder /kube-vpnkit-forwarder /kube-vpnkit-forwarder
Expand Down
50 changes: 42 additions & 8 deletions go/cmd/kube-vpnkit-forwarder/main.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
package main

import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/moby/vpnkit/go/pkg/controller"
"github.com/moby/vpnkit/go/pkg/vpnkit"
log "github.com/sirupsen/logrus"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logrus was being used in the controller but not here - switched for consistency and to add ability to config log level via flag

"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

const defaultLogLevel = log.InfoLevel

var path string
var logLevelName string

func main() {
flag.StringVar(&path, "path", "", "unix socket to vpnkit port forward API")
flag.StringVar(&logLevelName, "log-level", defaultLogLevel.String(), "log output level (error, warn, info, debug)")
flag.Parse()

if logLevel, err := log.ParseLevel(logLevelName); err == nil {
log.SetLevel(logLevel)
} else {
log.SetLevel(defaultLogLevel)
log.Warnf("Using default log level (%s): %v", defaultLogLevel.String(), err)
}

log.Println("Starting kube-vpnkit-forwarder...")

rootCtx := context.Background()

clusterConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err.Error())
Expand All @@ -40,18 +54,38 @@ func main() {
if err != nil {
log.Fatal(err)
}
controller := controller.New(vpnkitClient, clientset.CoreV1())
defer controller.Dispose()

informer.AddEventHandler(controller)
vpnkitController := controller.New(rootCtx, vpnkitClient, clientset.CoreV1())
if _, err := informer.AddEventHandler(vpnkitController); err != nil {
log.Fatal(err)
}

// stop signals to the informer to stop the controllers
// informerDone signals that the informer has actually stopped running
stop := make(chan struct{})
go informer.Run(stop)
informerDone := make(chan struct{})
go func() {
defer close(informerDone)
informer.Run(stop)
}()

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
<-signalChan

log.Println("Shutdown signal received, exiting...")
log.Println("Shutdown signal received")
close(stop)

// allow the informer a chance to stop cleanly
log.Println("Waiting for controller to finish")
select {
case <-time.After(10 * time.Second):
log.Warn("Controller shutdown timed out")
case <-informerDone:
}

// always attempt cleanup, even if the informer didn't stop nicely,
// we can still hopefully unexpose any open ports
log.Println("Cleaning up controller")
cleanupCtx, cancel := context.WithTimeout(rootCtx, 15*time.Second)
defer cancel()
vpnkitController.Dispose(cleanupCtx)
}
37 changes: 27 additions & 10 deletions go/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"

"github.com/moby/vpnkit/go/pkg/vpnkit"
"github.com/pkg/errors"
Expand All @@ -20,34 +21,50 @@ const annotation = "vpnkit-k8s-controller"
type Controller struct {
services corev1client.ServicesGetter
client vpnkit.Client

internalCtx context.Context
cancel context.CancelFunc
}

// New creates a new controller
func New(client vpnkit.Client, services corev1client.ServicesGetter) *Controller {
func New(rootCtx context.Context, client vpnkit.Client, services corev1client.ServicesGetter) *Controller {
internalCtx, cancel := context.WithCancel(rootCtx)
return &Controller{
services: services,
client: client,
internalCtx: internalCtx,
cancel: cancel,
services: services,
client: client,
}
}

var _ cache.ResourceEventHandler = &Controller{}

// Dispose unexpose all ports previously exposed by this controller
func (c *Controller) Dispose() {
ctx := context.Background()
// Dispose unexposes all ports previously exposed by this controller
func (c *Controller) Dispose(ctx context.Context) {
// stop any ongoing operations using the internalCtx
c.cancel()

ports, err := c.client.ListExposed(ctx)
if err != nil {
log.Infof("Cannot list exposed ports: %v", err)
return
}
for _, port := range ports {
var wg sync.WaitGroup
for i := range ports {
port := ports[i]
if port.Annotation != annotation {
continue
}
if err := c.client.Unexpose(ctx, &port); err != nil {
log.Infof("cannot unexpose port: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
log.Infof("Unexposing port: %s", port.String())
if err := c.client.Unexpose(ctx, &port); err != nil {
log.Infof("cannot unexpose port: %v", err)
}
}()
}
wg.Wait()
}

// OnAdd exposes port if necessary
Expand Down
Loading