Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Runner that watches for Kubernetes APIs #4036

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ linters-settings:
- pkg: github.com/crunchydata/postgres-operator/internal/testing/*
desc: The "internal/testing" packages should be used only in tests.

- pkg: k8s.io/client-go/discovery
desc: Use the "internal/kubernetes" package instead.

tests:
files: ['$test']
deny:
Expand Down Expand Up @@ -93,6 +96,11 @@ linters-settings:
issues:
exclude-generated: strict
exclude-rules:
# This internal package is the one place we want to do API discovery.
- linters: [depguard]
path: internal/kubernetes/discovery.go
text: k8s.io/client-go/discovery

# These value types have unmarshal methods.
# https://github.com/raeperd/recvcheck/issues/7
- linters: [recvcheck]
Expand Down
58 changes: 14 additions & 44 deletions cmd/postgres-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/healthz"

Expand All @@ -28,6 +27,7 @@ import (
"github.com/crunchydata/postgres-operator/internal/controller/standalone_pgadmin"
"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/kubernetes"
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/internal/registration"
Expand Down Expand Up @@ -146,6 +146,12 @@ func main() {
// deprecation warnings when using an older version of a resource for backwards compatibility).
rest.SetDefaultWarningHandler(rest.NoWarnings{})

k8s, err := kubernetes.NewDiscoveryRunner(cfg)
assertNoError(err)
assertNoError(k8s.Read(ctx))

log.Info("Connected to Kubernetes", "api", k8s.Version().String(), "openshift", k8s.IsOpenShift())

options, err := initManager()
assertNoError(err)

Expand All @@ -154,24 +160,21 @@ func main() {
options.BaseContext = func() context.Context {
ctx := context.Background()
ctx = feature.NewContext(ctx, features)
ctx = kubernetes.NewAPIContext(ctx, k8s)
return ctx
}

mgr, err := runtime.NewManager(cfg, options)
assertNoError(err)

openshift := isOpenshift(cfg)
if openshift {
log.Info("detected OpenShift environment")
}
assertNoError(mgr.Add(k8s))

registrar, err := registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), shutdown)
assertNoError(err)
assertNoError(mgr.Add(registrar))
token, _ := registrar.CheckToken()

// add all PostgreSQL Operator controllers to the runtime manager
addControllersToManager(mgr, openshift, log, registrar)
addControllersToManager(mgr, log, registrar)

if features.Enabled(feature.BridgeIdentifiers) {
constructor := func() *bridge.Client {
Expand All @@ -191,7 +194,6 @@ func main() {
assertNoError(
upgradecheck.ManagedScheduler(
mgr,
openshift,
os.Getenv("CHECK_FOR_UPGRADES_URL"),
versionString,
token,
Expand All @@ -212,10 +214,9 @@ func main() {

// addControllersToManager adds all PostgreSQL Operator controllers to the provided controller
// runtime manager.
func addControllersToManager(mgr runtime.Manager, openshift bool, log logging.Logger, reg registration.Registration) {
func addControllersToManager(mgr runtime.Manager, log logging.Logger, reg registration.Registration) {
pgReconciler := &postgrescluster.Reconciler{
Client: mgr.GetClient(),
IsOpenShift: openshift,
Owner: postgrescluster.ControllerName,
Recorder: mgr.GetEventRecorderFor(postgrescluster.ControllerName),
Registration: reg,
Expand All @@ -240,10 +241,9 @@ func addControllersToManager(mgr runtime.Manager, openshift bool, log logging.Lo
}

pgAdminReconciler := &standalone_pgadmin.PGAdminReconciler{
Client: mgr.GetClient(),
Owner: "pgadmin-controller",
Recorder: mgr.GetEventRecorderFor(naming.ControllerPGAdmin),
IsOpenShift: openshift,
Client: mgr.GetClient(),
Owner: "pgadmin-controller",
Recorder: mgr.GetEventRecorderFor(naming.ControllerPGAdmin),
}

if err := pgAdminReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -270,33 +270,3 @@ func addControllersToManager(mgr runtime.Manager, openshift bool, log logging.Lo
os.Exit(1)
}
}

func isOpenshift(cfg *rest.Config) bool {
const sccGroupName, sccKind = "security.openshift.io", "SecurityContextConstraints"

client, err := discovery.NewDiscoveryClientForConfig(cfg)
assertNoError(err)

groups, err := client.ServerGroups()
if err != nil {
assertNoError(err)
}
for _, g := range groups.Groups {
if g.Name != sccGroupName {
continue
}
for _, v := range g.Versions {
resourceList, err := client.ServerResourcesForGroupVersion(v.GroupVersion)
if err != nil {
assertNoError(err)
}
for _, r := range resourceList.APIResources {
if r.Kind == sccKind {
return true
}
}
}
}

return false
}
47 changes: 6 additions & 41 deletions internal/controller/postgrescluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ import (
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -33,6 +31,7 @@ import (
"github.com/crunchydata/postgres-operator/internal/config"
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/kubernetes"
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/pgaudit"
"github.com/crunchydata/postgres-operator/internal/pgbackrest"
Expand All @@ -51,11 +50,9 @@ const (

// Reconciler holds resources for the PostgresCluster reconciler
type Reconciler struct {
Client client.Client
DiscoveryClient *discovery.DiscoveryClient
IsOpenShift bool
Owner client.FieldOwner
PodExec func(
Client client.Client
Owner client.FieldOwner
PodExec func(
ctx context.Context, namespace, pod, container string,
stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error
Expand Down Expand Up @@ -94,8 +91,9 @@ func (r *Reconciler) Reconcile(
// from its cache.
cluster.Default()

// TODO(openshift): Separate this into more specific detections elsewhere.
if cluster.Spec.OpenShift == nil {
cluster.Spec.OpenShift = &r.IsOpenShift
cluster.Spec.OpenShift = initialize.Bool(kubernetes.IsOpenShift(ctx))
}

// Keep a copy of cluster prior to any manipulations.
Expand Down Expand Up @@ -482,14 +480,6 @@ func (r *Reconciler) SetupWithManager(mgr manager.Manager) error {
}
}

if r.DiscoveryClient == nil {
var err error
r.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
return err
}
}

return builder.ControllerManagedBy(mgr).
For(&v1beta1.PostgresCluster{}).
Owns(&corev1.ConfigMap{}).
Expand All @@ -510,28 +500,3 @@ func (r *Reconciler) SetupWithManager(mgr manager.Manager) error {
r.controllerRefHandlerFuncs()). // watch all StatefulSets
Complete(r)
}

// GroupVersionKindExists checks to see whether a given Kind for a given
// GroupVersion exists in the Kubernetes API Server.
func (r *Reconciler) GroupVersionKindExists(groupVersion, kind string) (*bool, error) {
if r.DiscoveryClient == nil {
return initialize.Bool(false), nil
}

resourceList, err := r.DiscoveryClient.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
if apierrors.IsNotFound(err) {
return initialize.Bool(false), nil
}

return nil, err
}

for _, resource := range resourceList.APIResources {
if resource.Kind == kind {
return initialize.Bool(true), nil
}
}

return initialize.Bool(false), nil
}
14 changes: 6 additions & 8 deletions internal/controller/postgrescluster/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/crunchydata/postgres-operator/internal/config"
"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/kubernetes"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/internal/pgbackrest"
"github.com/crunchydata/postgres-operator/internal/postgres"
Expand Down Expand Up @@ -56,14 +57,11 @@ func (r *Reconciler) reconcileVolumeSnapshots(ctx context.Context,
return nil
}

// Check if the Kube cluster has VolumeSnapshots installed. If VolumeSnapshots
// are not installed, we need to return early. If user is attempting to use
// VolumeSnapshots, return an error, otherwise return nil.
volumeSnapshotKindExists, err := r.GroupVersionKindExists("snapshot.storage.k8s.io/v1", "VolumeSnapshot")
if err != nil {
return err
}
if !*volumeSnapshotKindExists {
// Return early when VolumeSnapshots are not installed in Kubernetes.
// If user is attempting to use VolumeSnapshots, return an error.
if !kubernetes.Has(
ctx, volumesnapshotv1.SchemeGroupVersion.WithKind("VolumeSnapshot"),
) {
if postgrescluster.Spec.Backups.Snapshots != nil {
return errors.New("VolumeSnapshots are not installed/enabled in this Kubernetes cluster; cannot create snapshot.")
} else {
Expand Down
38 changes: 16 additions & 22 deletions internal/controller/postgrescluster/snapshots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/kubernetes"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/internal/testing/cmp"
"github.com/crunchydata/postgres-operator/internal/testing/events"
Expand All @@ -33,26 +33,26 @@ import (

func TestReconcileVolumeSnapshots(t *testing.T) {
ctx := context.Background()
cfg, cc := setupKubernetes(t)
_, cc := setupKubernetes(t)
require.ParallelCapacity(t, 1)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
assert.NilError(t, err)

recorder := events.NewRecorder(t, runtime.Scheme)
r := &Reconciler{
Client: cc,
Owner: client.FieldOwner(t.Name()),
DiscoveryClient: discoveryClient,
Recorder: recorder,
Client: cc,
Owner: client.FieldOwner(t.Name()),
Recorder: recorder,
}
ns := setupNamespace(t, cc)

// Enable snapshots feature gate
// Enable snapshots feature gate and API
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.VolumeSnapshots: true,
}))
ctx = feature.NewContext(ctx, gate)
ctx = kubernetes.NewAPIContext(ctx, kubernetes.NewAPISet(
volumesnapshotv1.SchemeGroupVersion.WithKind("VolumeSnapshot"),
))

t.Run("SnapshotsDisabledDeleteSnapshots", func(t *testing.T) {
// Create cluster (without snapshots spec)
Expand Down Expand Up @@ -348,16 +348,13 @@ func TestReconcileVolumeSnapshots(t *testing.T) {

func TestReconcileDedicatedSnapshotVolume(t *testing.T) {
ctx := context.Background()
cfg, cc := setupKubernetes(t)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
assert.NilError(t, err)
_, cc := setupKubernetes(t)

recorder := events.NewRecorder(t, runtime.Scheme)
r := &Reconciler{
Client: cc,
Owner: client.FieldOwner(t.Name()),
DiscoveryClient: discoveryClient,
Recorder: recorder,
Client: cc,
Owner: client.FieldOwner(t.Name()),
Recorder: recorder,
}

// Enable snapshots feature gate
Expand Down Expand Up @@ -1253,14 +1250,11 @@ func TestGetLatestReadySnapshot(t *testing.T) {

func TestDeleteSnapshots(t *testing.T) {
ctx := context.Background()
cfg, cc := setupKubernetes(t)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
assert.NilError(t, err)
_, cc := setupKubernetes(t)

r := &Reconciler{
Client: cc,
Owner: client.FieldOwner(t.Name()),
DiscoveryClient: discoveryClient,
Client: cc,
Owner: client.FieldOwner(t.Name()),
}
ns := setupNamespace(t, cc)

Expand Down
3 changes: 1 addition & 2 deletions internal/controller/standalone_pgadmin/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ type PGAdminReconciler struct {
ctx context.Context, namespace, pod, container string,
stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error
Recorder record.EventRecorder
IsOpenShift bool
Recorder record.EventRecorder
}

//+kubebuilder:rbac:groups="postgres-operator.crunchydata.com",resources="pgadmins",verbs={list,watch}
Expand Down
Loading
Loading