Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show subscriptions to event types in Backstage #24

Merged
merged 25 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions backends/config/100-eventmesh/100-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rules:
resources:
- brokers
- eventtypes
- triggers
verbs:
- get
- list
Expand All @@ -43,3 +44,14 @@ rules:
- delete
- patch
- watch


# permissions to get subscribers for triggers
# as subscribers can be any resource, we need to give access to all resources
# we fetch subscribers one by one, we only need `get` verb
- apiGroups:
- "*"
resources:
- "*"
verbs:
- get
3 changes: 3 additions & 0 deletions backends/pkg/reconciler/eventmesh/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
eventtypereconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1beta2/eventtype"

brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1beta2/eventtype"

eventinglistersv1 "knative.dev/eventing/pkg/client/listers/eventing/v1"
Expand All @@ -22,6 +23,7 @@ import (
type Listers struct {
EventTypeLister eventinglistersv1beta2.EventTypeLister
BrokerLister eventinglistersv1.BrokerLister
TriggerLister eventinglistersv1.TriggerLister
}

func NewController(ctx context.Context) *controller.Impl {
Expand All @@ -40,6 +42,7 @@ func NewController(ctx context.Context) *controller.Impl {
listers := Listers{
EventTypeLister: eventtypeinformer.Get(ctx).Lister(),
BrokerLister: brokerinformer.Get(ctx).Lister(),
TriggerLister: triggerinformer.Get(ctx).Lister(),
}

go startWebServer(ctx, listers)
Expand Down
6 changes: 6 additions & 0 deletions backends/pkg/reconciler/eventmesh/eventtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@ type EventType struct {
Labels map[string]string `json:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
Reference string `json:"reference,omitempty"`
ConsumedBy []string `json:"consumedBy,omitempty"`
}

func (et EventType) NameAndNamespace() string {
return NameAndNamespace(et.Namespace, et.Name)
}

func (et EventType) NamespaceAndType() string {
return NameAndNamespace(et.Namespace, et.Type)
}

func convertEventType(et *v1beta2.EventType) EventType {
return EventType{
Name: et.Name,
Expand All @@ -33,5 +38,6 @@ func convertEventType(et *v1beta2.EventType) EventType {
Labels: et.Labels,
Annotations: FilterAnnotations(et.Annotations),
Reference: RefNameAndNamespace(et.Spec.Reference),
ConsumedBy: make([]string, 0),
}
}
154 changes: 151 additions & 3 deletions backends/pkg/reconciler/eventmesh/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@ import (
"context"
"net/http"
"sort"
"strings"

"knative.dev/pkg/injection/clients/dynamicclient"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventinglistersv1beta2 "knative.dev/eventing/pkg/client/listers/eventing/v1beta2"

"go.uber.org/zap"
Expand All @@ -23,15 +31,22 @@ type EventMesh struct {
Brokers []*Broker `json:"brokers"`
}

type EventTypeMap = map[string]*EventType
type Subscription struct {
BackstageIds map[string]struct{}
}

// SubscriptionMap key: "<namespace>/<eventType.spec.type>"
type SubscriptionMap map[string]Subscription
aliok marked this conversation as resolved.
Show resolved Hide resolved

const BackstageLabel = "backstage.io/kubernetes-id"
aliok marked this conversation as resolved.
Show resolved Hide resolved

func EventMeshHandler(ctx context.Context, listers Listers) func(w http.ResponseWriter, req *http.Request) {
logger := logging.FromContext(ctx)

return func(w http.ResponseWriter, req *http.Request) {
logger.Debugw("Handling request", "method", req.Method, "url", req.URL)

eventMesh, err := BuildEventMesh(listers, logger)
eventMesh, err := BuildEventMesh(ctx, listers, logger)
if err != nil {
logger.Errorw("Error building event mesh", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -47,13 +62,14 @@ func EventMeshHandler(ctx context.Context, listers Listers) func(w http.Response
}
}

func BuildEventMesh(listers Listers, logger *zap.SugaredLogger) (EventMesh, error) {
func BuildEventMesh(ctx context.Context, listers Listers, logger *zap.SugaredLogger) (EventMesh, error) {
convertedBrokers, err := fetchBrokers(listers.BrokerLister, logger)
if err != nil {
logger.Errorw("Error fetching and converting brokers", "error", err)
return EventMesh{}, err
}

// map key: "<namespace>/<name>"
brokerMap := make(map[string]*Broker)
for _, cbr := range convertedBrokers {
brokerMap[cbr.GetNameAndNamespace()] = cbr
Expand All @@ -73,6 +89,33 @@ func BuildEventMesh(listers Listers, logger *zap.SugaredLogger) (EventMesh, erro
}
}

// build 2 maps for event types for easier access

// map key: "<namespace>/<eventType.spec.type>"
etTypeMap := make(map[string]*EventType)
aliok marked this conversation as resolved.
Show resolved Hide resolved
// map key: "<namespace>/<eventType.name>"
etNameMap := make(map[string]*EventType)
aliok marked this conversation as resolved.
Show resolved Hide resolved

for _, et := range convertedEventTypes {
etTypeMap[et.NamespaceAndType()] = et
etNameMap[et.NameAndNamespace()] = et
}

subscriptionMap, err := buildSubscriptions(ctx, listers.TriggerLister, brokerMap, etNameMap, logger)
if err != nil {
logger.Errorw("Error building subscriptions", "error", err)
return EventMesh{}, err
}

for key, sub := range *subscriptionMap {
for backstageId := range sub.BackstageIds {
// find the event type and add the subscriber to the ConsumedBy list
if et, ok := etTypeMap[key]; ok {
et.ConsumedBy = append(et.ConsumedBy, backstageId)
}
}
}

eventMesh := EventMesh{
EventTypes: convertedEventTypes,
Brokers: convertedBrokers,
Expand Down Expand Up @@ -118,3 +161,108 @@ func fetchEventTypes(eventTypeLister eventinglistersv1beta2.EventTypeLister, log

return convertedEventTypes, err
}

func buildSubscriptions(ctx context.Context, triggerLister eventinglistersv1.TriggerLister, brokerMap map[string]*Broker, etNameMap map[string]*EventType, logger *zap.SugaredLogger) (*SubscriptionMap, error) {
// map key: "<namespace>/<eventType.spec.type>"
subscriptionMap := make(SubscriptionMap)

dynamicClient := dynamicclient.Get(ctx)

triggers, err := triggerLister.List(labels.Everything())
if err != nil {
logger.Errorw("Error listing triggers", "error", err)
return nil, err
}

for _, trigger := range triggers {
// if the trigger's broker is not set or if we haven't processed the broker, we can skip the trigger
if trigger.Spec.Broker == "" {
continue
}
aliok marked this conversation as resolved.
Show resolved Hide resolved
brokerRef := NameAndNamespace(trigger.Namespace, trigger.Spec.Broker)
if _, ok := brokerMap[brokerRef]; !ok {
return nil, nil
}

// if the trigger has no subscriber, we can skip it, there's no relation to show on Backstage side
if trigger.Spec.Subscriber.Ref == nil {
return nil, nil
}

subscriberBackstageId, err := getSubscriberBackstageId(ctx, dynamicClient, trigger, logger)
if err != nil {
// do not stop the Backstage plugin from rendering the rest of the data, e.g. because
// there are no permissions to get a single subscriber resource
continue
aliok marked this conversation as resolved.
Show resolved Hide resolved
}

// we only care about subscribers that are in Backstage
if len(subscriberBackstageId) == 0 {
continue
}
aliok marked this conversation as resolved.
Show resolved Hide resolved

// build the list of event types that the subscriber is subscribed to
subscribedEventTypes := buildSubscribedEventTypes(trigger, brokerMap[brokerRef], etNameMap, logger)

// go over the event types and add the subscriber to the subscription map
for _, eventType := range subscribedEventTypes {
key := NameAndNamespace(trigger.Namespace, eventType)
if _, ok := subscriptionMap[key]; !ok {
subscriptionMap[key] = Subscription{
BackstageIds: make(map[string]struct{}),
}
}
subscriptionMap[key].BackstageIds[subscriberBackstageId] = struct{}{}
}

}

return &subscriptionMap, nil
}

func buildSubscribedEventTypes(trigger *eventingv1.Trigger, broker *Broker, etNameMap map[string]*EventType, logger *zap.SugaredLogger) []string {
// TODO: we don't handle the CESQL yet
if trigger.Spec.Filter != nil && len(trigger.Spec.Filter.Attributes) > 0 {
// check if "type" attribute is present
if subscribedEventType, ok := trigger.Spec.Filter.Attributes["type"]; ok {
// it can be present but empty
// in that case, we assume the trigger is subscribed to all event types
if subscribedEventType != eventingv1.TriggerAnyFilter {
// if type is present and not empty, that means the trigger is subscribed to a specific event type
return []string{subscribedEventType}
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/knative/eventing/blob/main/pkg/eventfilter/subscriptionsapi/
https://github.com/knative/eventing/blob/main/pkg/eventfilter/attributes/

subscriptionsapi.NewAllFilter(...)
attributes.NewFilter()

Example:

# trigger filter
random: foo
# Event type spec
spec:
  attributes:
    random:
       type: string
e := cloudevents.NewEvent()
e.SetExtension("random", <random-string>)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#26


// if no filter or type is specified, we assume the resource is interested in all event types that the broker provides
subscribedEventTypes := make([]string, 0, len(broker.ProvidedEventTypes))
for _, eventType := range broker.ProvidedEventTypes {
if et, ok := etNameMap[eventType]; ok {
subscribedEventTypes = append(subscribedEventTypes, et.Type)
}
}

return subscribedEventTypes
}

func getSubscriberBackstageId(ctx context.Context, client dynamic.Interface, trigger *eventingv1.Trigger, logger *zap.SugaredLogger) (string, error) {
refGvr := schema.GroupVersionResource{
Group: trigger.Spec.Subscriber.Ref.Group,
Version: trigger.Spec.Subscriber.Ref.APIVersion,
// TODO: couldn't remember the elegant way to do this
Resource: strings.ToLower(trigger.Spec.Subscriber.Ref.Kind) + "s",
aliok marked this conversation as resolved.
Show resolved Hide resolved
}

resource, err := client.Resource(refGvr).Namespace(trigger.Spec.Subscriber.Ref.Namespace).Get(ctx, trigger.Spec.Subscriber.Ref.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
logger.Debugw("Subscriber resource not found", "resource", trigger.Spec.Subscriber.Ref.Name)
return "", nil
}
if err != nil {
logger.Errorw("Error fetching resource", "error", err)
return "", err
}

// check if the resource has the Backstage label
return resource.GetLabels()[BackstageLabel], nil
aliok marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading