From e786d72a68c4d989b90272b3f2abcee403d54993 Mon Sep 17 00:00:00 2001 From: Milas Bowman Date: Fri, 12 May 2023 13:16:26 -0400 Subject: [PATCH] kube: controller lifecycle improvements * Optimize the `Dockerfile` to use cache mounts * Use the `context.Context` during HTTP operations * Add an internal context for normal controller operations * Sequence shutdown to wait for informer to stop and then attempt to unexpose any ports before exiting * Unexpose ports in parallel, we just log out all the errors regardless Signed-off-by: Milas Bowman --- go/Dockerfile.kube-forwarder | 18 +- go/cmd/kube-vpnkit-forwarder/main.go | 50 +++- go/pkg/controller/controller.go | 37 ++- go/pkg/controller/controller_test.go | 348 +++++++++++++++------------ go/pkg/vpnkit/client.go | 24 +- 5 files changed, 291 insertions(+), 186 deletions(-) diff --git a/go/Dockerfile.kube-forwarder b/go/Dockerfile.kube-forwarder index 25076d56f..bdd133e88 100644 --- a/go/Dockerfile.kube-forwarder +++ b/go/Dockerfile.kube-forwarder @@ -1,11 +1,21 @@ # syntax=docker/dockerfile:1 FROM golang:1.19-alpine AS builder -WORKDIR /go/src/github.com/moby/vpnkit/go -COPY . /go/src/github.com/moby/vpnkit - RUN apk add --no-cache musl-dev build-base -RUN GOPATH=/go CGO_ENABLED=1 go build -buildmode pie -ldflags "-linkmode=external -s -extldflags \"-fno-PIC -static\"" -o /kube-vpnkit-forwarder /go/src/github.com/moby/vpnkit/go/cmd/kube-vpnkit-forwarder/main.go + +# no separate go mod download step because vendoring is in use +COPY . /src +WORKDIR /src + +RUN --mount=type=bind,target=. \ + --mount=type=cache,target=/root/.cache \ + --mount=type=cache,target=/go/pkg/mod \ + CGO_ENABLED=1 go build \ + -mod=vendor \ + -buildmode pie \ + -ldflags '-linkmode=external -s -extldflags "-fno-PIC -static"' \ + -o /kube-vpnkit-forwarder \ + ./go/cmd/kube-vpnkit-forwarder FROM scratch COPY --link --from=builder /kube-vpnkit-forwarder /kube-vpnkit-forwarder diff --git a/go/cmd/kube-vpnkit-forwarder/main.go b/go/cmd/kube-vpnkit-forwarder/main.go index 8e08c245c..6050fab7e 100644 --- a/go/cmd/kube-vpnkit-forwarder/main.go +++ b/go/cmd/kube-vpnkit-forwarder/main.go @@ -1,8 +1,8 @@ package main import ( + "context" "flag" - "log" "os" "os/signal" "syscall" @@ -10,19 +10,33 @@ import ( "github.com/moby/vpnkit/go/pkg/controller" "github.com/moby/vpnkit/go/pkg/vpnkit" + log "github.com/sirupsen/logrus" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) +const defaultLogLevel = log.InfoLevel + var path string +var logLevelName string func main() { flag.StringVar(&path, "path", "", "unix socket to vpnkit port forward API") + flag.StringVar(&logLevelName, "log-level", defaultLogLevel.String(), "log output level (error, warn, info, debug)") flag.Parse() + if logLevel, err := log.ParseLevel(logLevelName); err == nil { + log.SetLevel(logLevel) + } else { + log.SetLevel(defaultLogLevel) + log.Warnf("Using default log level (%s): %v", defaultLogLevel.String(), err) + } + log.Println("Starting kube-vpnkit-forwarder...") + rootCtx := context.Background() + clusterConfig, err := rest.InClusterConfig() if err != nil { log.Fatal(err.Error()) @@ -40,18 +54,38 @@ func main() { if err != nil { log.Fatal(err) } - controller := controller.New(vpnkitClient, clientset.CoreV1()) - defer controller.Dispose() - - informer.AddEventHandler(controller) + vpnkitController := controller.New(rootCtx, vpnkitClient, clientset.CoreV1()) + if _, err := informer.AddEventHandler(vpnkitController); err != nil { + log.Fatal(err) + } + // stop signals to the informer to stop the controllers + // informerDone signals that the informer has actually stopped running stop := make(chan struct{}) - go informer.Run(stop) + informerDone := make(chan struct{}) + go func() { + defer close(informerDone) + informer.Run(stop) + }() signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) <-signalChan - - log.Println("Shutdown signal received, exiting...") + log.Println("Shutdown signal received") close(stop) + + // allow the informer a chance to stop cleanly + log.Println("Waiting for controller to finish") + select { + case <-time.After(10 * time.Second): + log.Warn("Controller shutdown timed out") + case <-informerDone: + } + + // always attempt cleanup, even if the informer didn't stop nicely, + // we can still hopefully unexpose any open ports + log.Println("Cleaning up controller") + cleanupCtx, cancel := context.WithTimeout(rootCtx, 15*time.Second) + defer cancel() + vpnkitController.Dispose(cleanupCtx) } diff --git a/go/pkg/controller/controller.go b/go/pkg/controller/controller.go index d41bf51ce..a4d613f24 100644 --- a/go/pkg/controller/controller.go +++ b/go/pkg/controller/controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sync" "github.com/moby/vpnkit/go/pkg/vpnkit" "github.com/pkg/errors" @@ -20,34 +21,50 @@ const annotation = "vpnkit-k8s-controller" type Controller struct { services corev1client.ServicesGetter client vpnkit.Client + + internalCtx context.Context + cancel context.CancelFunc } // New creates a new controller -func New(client vpnkit.Client, services corev1client.ServicesGetter) *Controller { +func New(rootCtx context.Context, client vpnkit.Client, services corev1client.ServicesGetter) *Controller { + internalCtx, cancel := context.WithCancel(rootCtx) return &Controller{ - services: services, - client: client, + internalCtx: internalCtx, + cancel: cancel, + services: services, + client: client, } } var _ cache.ResourceEventHandler = &Controller{} -// Dispose unexpose all ports previously exposed by this controller -func (c *Controller) Dispose() { - ctx := context.Background() +// Dispose unexposes all ports previously exposed by this controller +func (c *Controller) Dispose(ctx context.Context) { + // stop any ongoing operations using the internalCtx + c.cancel() + ports, err := c.client.ListExposed(ctx) if err != nil { log.Infof("Cannot list exposed ports: %v", err) return } - for _, port := range ports { + var wg sync.WaitGroup + for i := range ports { + port := ports[i] if port.Annotation != annotation { continue } - if err := c.client.Unexpose(ctx, &port); err != nil { - log.Infof("cannot unexpose port: %v", err) - } + wg.Add(1) + go func() { + defer wg.Done() + log.Infof("Unexposing port: %s", port.String()) + if err := c.client.Unexpose(ctx, &port); err != nil { + log.Infof("cannot unexpose port: %v", err) + } + }() } + wg.Wait() } // OnAdd exposes port if necessary diff --git a/go/pkg/controller/controller_test.go b/go/pkg/controller/controller_test.go index 90dd7f581..3ac759cb7 100644 --- a/go/pkg/controller/controller_test.go +++ b/go/pkg/controller/controller_test.go @@ -17,9 +17,10 @@ import ( ) func TestNodePortService(t *testing.T) { + ctx := testContext(t) client := mockVpnKitClient{} kubeClient := kubernetes.NewSimpleClientset() - controller := New(&client, kubeClient.CoreV1()) + controller := New(ctx, &client, kubeClient.CoreV1()) service := v1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: "ns1", @@ -37,50 +38,53 @@ func TestNodePortService(t *testing.T) { ClusterIP: "10.0.0.1", }, } - - controller.OnAdd(&service) - - assert.EqualValues(t, client.exposed, []vpnkit.Port{ - { - Proto: vpnkit.TCP, - OutIP: net.ParseIP("0.0.0.0"), - OutPort: 8080, - InIP: net.ParseIP("10.0.0.1"), - InPort: 8080, - Annotation: annotation, - }, - }) - assert.Contains(t, kubeClient.Fake.Actions(), core.NewUpdateSubresourceAction( - schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}, - "status", - "ns1", - &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns1", - Name: "service1", + controller.OnAdd(&service, true) + + assert.EqualValues( + t, client.exposed, []vpnkit.Port{ + { + Proto: vpnkit.TCP, + OutIP: net.ParseIP("0.0.0.0"), + OutPort: 8080, + InIP: net.ParseIP("10.0.0.1"), + InPort: 8080, + Annotation: annotation, }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeNodePort, - Ports: []v1.ServicePort{ - { - Protocol: v1.ProtocolTCP, - Port: 8080, - NodePort: 8080, - }, + }, + ) + assert.Contains( + t, kubeClient.Fake.Actions(), core.NewUpdateSubresourceAction( + schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}, + "status", + "ns1", + &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "service1", }, - ClusterIP: "10.0.0.1", - }, - Status: v1.ServiceStatus{ - LoadBalancer: v1.LoadBalancerStatus{ - Ingress: []v1.LoadBalancerIngress{ + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeNodePort, + Ports: []v1.ServicePort{ { - Hostname: "localhost", + Protocol: v1.ProtocolTCP, + Port: 8080, + NodePort: 8080, + }, + }, + ClusterIP: "10.0.0.1", + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{ + { + Hostname: "localhost", + }, }, }, }, }, - }, - )) + ), + ) controller.OnDelete(&service) assert.Len(t, client.exposed, 0) @@ -88,7 +92,8 @@ func TestNodePortService(t *testing.T) { func TestLoadBalancerService(t *testing.T) { client := mockVpnKitClient{} - controller := New(&client, kubernetes.NewSimpleClientset().CoreV1()) + ctx := testContext(t) + controller := New(ctx, &client, kubernetes.NewSimpleClientset().CoreV1()) service := v1.Service{ Spec: v1.ServiceSpec{ @@ -105,24 +110,27 @@ func TestLoadBalancerService(t *testing.T) { ClusterIP: "10.96.48.189", }, } - - controller.OnAdd(&service) - assert.EqualValues(t, client.exposed, []vpnkit.Port{ - { - Proto: vpnkit.TCP, - OutIP: net.ParseIP("0.0.0.0"), - OutPort: 80, - InIP: net.ParseIP("10.96.48.189"), - InPort: 80, - Annotation: annotation, + controller.OnAdd(&service, true) + + assert.EqualValues( + t, client.exposed, []vpnkit.Port{ + { + Proto: vpnkit.TCP, + OutIP: net.ParseIP("0.0.0.0"), + OutPort: 80, + InIP: net.ParseIP("10.96.48.189"), + InPort: 80, + Annotation: annotation, + }, }, - }) + ) } func TestAddTwice(t *testing.T) { + ctx := testContext(t) client := mockVpnKitClient{} kubeClient := kubernetes.NewSimpleClientset() - controller := New(&client, kubeClient.CoreV1()) + controller := New(ctx, &client, kubeClient.CoreV1()) service := v1.Service{ Spec: v1.ServiceSpec{ @@ -139,81 +147,91 @@ func TestAddTwice(t *testing.T) { }, } - controller.OnAdd(&service) + controller.OnAdd(&service, true) controller.OnUpdate(&service, &service) assert.Len(t, client.exposed, 1) assert.Len(t, kubeClient.Fake.Actions(), 1) } func TestOverlappingPorts(t *testing.T) { + ctx := testContext(t) client := mockVpnKitClient{} - controller := New(&client, kubernetes.NewSimpleClientset().CoreV1()) + controller := New(ctx, &client, kubernetes.NewSimpleClientset().CoreV1()) - controller.OnAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns1", - Name: "service1", - }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeLoadBalancer, - Ports: []v1.ServicePort{ - { - Name: "web", - Protocol: v1.ProtocolTCP, - Port: 80, - NodePort: 30185, + controller.OnAdd( + &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "service1", + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeLoadBalancer, + Ports: []v1.ServicePort{ + { + Name: "web", + Protocol: v1.ProtocolTCP, + Port: 80, + NodePort: 30185, + }, }, + ClusterIP: "10.96.48.189", }, - ClusterIP: "10.96.48.189", }, - }) + true, + ) - controller.OnAdd(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns1", - Name: "service2", - }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeLoadBalancer, - Ports: []v1.ServicePort{ - { - Name: "http", - Protocol: v1.ProtocolTCP, - Port: 80, - NodePort: 12345, - }, - { - Name: "https", - Protocol: v1.ProtocolTCP, - Port: 443, - NodePort: 12346, + controller.OnAdd( + &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "service2", + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeLoadBalancer, + Ports: []v1.ServicePort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + Port: 80, + NodePort: 12345, + }, + { + Name: "https", + Protocol: v1.ProtocolTCP, + Port: 443, + NodePort: 12346, + }, }, + ClusterIP: "10.96.48.190", }, - ClusterIP: "10.96.48.190", }, - }) - - assert.EqualValues(t, client.exposed, []vpnkit.Port{ - { - Proto: vpnkit.TCP, - OutIP: net.ParseIP("0.0.0.0"), - OutPort: 80, - InIP: net.ParseIP("10.96.48.189"), - InPort: 80, - Annotation: annotation, - }, - { - Proto: vpnkit.TCP, - OutIP: net.ParseIP("0.0.0.0"), - OutPort: 443, - InIP: net.ParseIP("10.96.48.190"), - InPort: 443, - Annotation: annotation, + false, + ) + + assert.EqualValues( + t, client.exposed, []vpnkit.Port{ + { + Proto: vpnkit.TCP, + OutIP: net.ParseIP("0.0.0.0"), + OutPort: 80, + InIP: net.ParseIP("10.96.48.189"), + InPort: 80, + Annotation: annotation, + }, + { + Proto: vpnkit.TCP, + OutIP: net.ParseIP("0.0.0.0"), + OutPort: 443, + InIP: net.ParseIP("10.96.48.190"), + InPort: 443, + Annotation: annotation, + }, }, - }) + ) } func TestControllerDispose(t *testing.T) { + ctx := testContext(t) client := mockVpnKitClient{} otherPort := vpnkit.Port{ Proto: "unix", @@ -221,55 +239,63 @@ func TestControllerDispose(t *testing.T) { OutPath: "/var/run/docker.sock", } client.Expose(context.Background(), &otherPort) - controller := New(&client, kubernetes.NewSimpleClientset().CoreV1()) + controller := New(ctx, &client, kubernetes.NewSimpleClientset().CoreV1()) - controller.OnAdd(&v1.Service{ - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeNodePort, - Ports: []v1.ServicePort{ - { - Protocol: v1.ProtocolTCP, - Port: 8080, - NodePort: 8080, + controller.OnAdd( + &v1.Service{ + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeNodePort, + Ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolTCP, + Port: 8080, + NodePort: 8080, + }, }, + ClusterIP: "10.0.0.1", }, - ClusterIP: "10.0.0.1", }, - }) + true, + ) assert.Equal(t, 2, len(client.exposed)) - controller.Dispose() + controller.Dispose(ctx) assert.Equal(t, 1, len(client.exposed)) assert.EqualValues(t, client.exposed, []vpnkit.Port{otherPort}) } func TestDiscardClusterIPService(t *testing.T) { + ctx := testContext(t) client := mockVpnKitClient{} - controller := New(&client, kubernetes.NewSimpleClientset().CoreV1()) + controller := New(ctx, &client, kubernetes.NewSimpleClientset().CoreV1()) - controller.OnAdd(&v1.Service{ - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeClusterIP, - Ports: []v1.ServicePort{ - { - Name: "web", - Protocol: v1.ProtocolTCP, - Port: 8080, + controller.OnAdd( + &v1.Service{ + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeClusterIP, + Ports: []v1.ServicePort{ + { + Name: "web", + Protocol: v1.ProtocolTCP, + Port: 8080, + }, }, + ClusterIP: "10.0.0.1", }, - ClusterIP: "10.0.0.1", }, - }) + true, + ) assert.Len(t, client.exposed, 0) } func TestCloseUnusedPortsAfterUpdate(t *testing.T) { + ctx := testContext(t) client := mockVpnKitClient{} kubeClient := kubernetes.NewSimpleClientset() - controller := New(&client, kubeClient.CoreV1()) + controller := New(ctx, &client, kubeClient.CoreV1()) source := v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -288,36 +314,40 @@ func TestCloseUnusedPortsAfterUpdate(t *testing.T) { ClusterIP: "10.0.0.1", }, } - controller.OnAdd(&source) + controller.OnAdd(&source, true) - controller.OnUpdate(&source, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns1", - Name: "service1", - }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeNodePort, - Ports: []v1.ServicePort{ - { - Protocol: v1.ProtocolTCP, - Port: 9090, - NodePort: 9090, + controller.OnUpdate( + &source, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "service1", + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeNodePort, + Ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolTCP, + Port: 9090, + NodePort: 9090, + }, }, + ClusterIP: "10.0.0.2", }, - ClusterIP: "10.0.0.2", }, - }) - - assert.EqualValues(t, client.exposed, []vpnkit.Port{ - { - Proto: vpnkit.TCP, - OutIP: net.ParseIP("0.0.0.0"), - OutPort: 9090, - InIP: net.ParseIP("10.0.0.2"), - InPort: 9090, - Annotation: annotation, + ) + + assert.EqualValues( + t, client.exposed, []vpnkit.Port{ + { + Proto: vpnkit.TCP, + OutIP: net.ParseIP("0.0.0.0"), + OutPort: 9090, + InIP: net.ParseIP("10.0.0.2"), + InPort: 9090, + Annotation: annotation, + }, }, - }) + ) } type mockVpnKitClient struct { @@ -347,3 +377,9 @@ func (c *mockVpnKitClient) ListExposed(_ context.Context) ([]vpnkit.Port, error) func (c *mockVpnKitClient) DumpState(_ context.Context, _ io.Writer) error { return nil } + +func testContext(t testing.TB) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + return ctx +} diff --git a/go/pkg/vpnkit/client.go b/go/pkg/vpnkit/client.go index ecdc00b87..a7568a260 100644 --- a/go/pkg/vpnkit/client.go +++ b/go/pkg/vpnkit/client.go @@ -50,7 +50,7 @@ func (e *ExposeError) Error() string { return e.Message } -func (h *httpClient) Expose(_ context.Context, port *Port) error { +func (h *httpClient) Expose(ctx context.Context, port *Port) error { var buf bytes.Buffer enc := json.NewEncoder(&buf) if err := enc.Encode(port); err != nil { @@ -60,7 +60,7 @@ func (h *httpClient) Expose(_ context.Context, port *Port) error { if port.Proto == Unix { path = ExposePipePath } - request, err := http.NewRequest("PUT", "http://unix"+path, &buf) + request, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://unix"+path, &buf) if err != nil { return err } @@ -85,7 +85,7 @@ func (h *httpClient) Expose(_ context.Context, port *Port) error { return nil } -func (h *httpClient) Unexpose(_ context.Context, port *Port) error { +func (h *httpClient) Unexpose(ctx context.Context, port *Port) error { var buf bytes.Buffer enc := json.NewEncoder(&buf) if err := enc.Encode(port); err != nil { @@ -95,7 +95,7 @@ func (h *httpClient) Unexpose(_ context.Context, port *Port) error { if port.Proto == Unix { path = UnexposePipePath } - request, err := http.NewRequest("DELETE", "http://unix"+path, &buf) + request, err := http.NewRequestWithContext(ctx, http.MethodDelete, "http://unix"+path, &buf) if err != nil { return err } @@ -111,8 +111,12 @@ func (h *httpClient) Unexpose(_ context.Context, port *Port) error { return nil } -func (h *httpClient) ListExposed(context.Context) ([]Port, error) { - res, err := h.client.Get("http://unix" + ListPath) +func (h *httpClient) ListExposed(ctx context.Context) ([]Port, error) { + request, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unix"+ListPath, nil) + if err != nil { + return nil, err + } + res, err := h.client.Do(request) if err != nil { fmt.Printf("GET failed with %v\n", err) return nil, err @@ -129,8 +133,12 @@ func (h *httpClient) ListExposed(context.Context) ([]Port, error) { return ports, nil } -func (h *httpClient) DumpState(_ context.Context, w io.Writer) error { - res, err := h.client.Get("http://unix" + DumpStatePath) +func (h *httpClient) DumpState(ctx context.Context, w io.Writer) error { + request, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unix"+DumpStatePath, nil) + if err != nil { + return err + } + res, err := h.client.Do(request) if err != nil { fmt.Printf("GET failed with %v\n", err) return err