Skip to content

Commit

Permalink
fix: add internal gRPC server
Browse files Browse the repository at this point in the history
  • Loading branch information
hperl committed Mar 10, 2023
1 parent 619a306 commit a99a212
Show file tree
Hide file tree
Showing 31 changed files with 1,169 additions and 1,038 deletions.
8 changes: 4 additions & 4 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ plugins:
opt: ts_out=proto
path: node_modules/.bin/protoc-gen-ts

- remote: buf.build/grpc-ecosystem/plugins/grpc-gateway:v2.14.0-1
- plugin: buf.build/grpc-ecosystem/gateway
out: proto
opt: paths=source_relative

- remote: buf.build/grpc-ecosystem/plugins/openapiv2:v2.14.0-1
- plugin: buf.build/grpc-ecosystem/openapiv2
opt:
- allow_merge=true
- merge_file_name=api
Expand All @@ -41,6 +41,6 @@ plugins:
- disable_default_responses=true
out: spec

- remote: buf.build/sawadashota/plugins/protoc-gen-doc:v1.5.1
- plugin: buf.build/community/pseudomuto-doc
opt: markdown,docs.md
out: proto
opt: markdown,proto/buf.md
130 changes: 103 additions & 27 deletions internal/driver/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"runtime/debug"
"strings"
"syscall"
"time"

grpcLogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
Expand Down Expand Up @@ -40,6 +41,7 @@ import (
grpcHealthV1 "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

"github.com/ory/keto/internal/check"
"github.com/ory/keto/internal/driver/config"
Expand Down Expand Up @@ -125,6 +127,7 @@ func (r *RegistryDefault) ServeAll(ctx context.Context) error {
// We need to separate the setup (invoking the functions that return the serve functions) from running the serve
// functions to mitigate race contitions in the HTTP router.
for _, serve := range []func() error{
r.serveInternalGRPC(innerCtx),
r.serveRead(innerCtx, doneShutdown),
r.serveWrite(innerCtx, doneShutdown),
r.serveOPLSyntax(innerCtx, doneShutdown),
Expand All @@ -136,6 +139,57 @@ func (r *RegistryDefault) ServeAll(ctx context.Context) error {
return eg.Wait()
}

func (r *RegistryDefault) initInternalGRPC() {
r.internalGRPC.listener = bufconn.Listen(1024 * 1024 * 10)
r.internalGRPC.dialer = func() (*grpc.ClientConn, error) {
return grpc.Dial("bufnet",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return r.internalGRPC.listener.Dial() }),
)
}

}

func (r *RegistryDefault) serveInternalGRPC(ctx context.Context) func() error {
return func() error {
serverDone := make(chan struct{})
internalGRPCServer := r.newInternalGRPCServer(ctx)
eg := &errgroup.Group{}

eg.Go(func() error {
err := internalGRPCServer.Serve(r.internalGRPCListener())
close(serverDone)
return err
})

eg.Go(func() (err error) {
<-ctx.Done()

internalGRPCServer.GracefulStop()
select {
case <-serverDone:
return nil
case <-time.After(graceful.DefaultShutdownTimeout):
internalGRPCServer.Stop()
return errors.New("graceful stop of internal gRPC server canceled, had to force it")
}
})

err := eg.Wait()
return err
}
}

func (r *RegistryDefault) internalGRPCListener() net.Listener {
r.internalGRPC.initOnce.Do(r.initInternalGRPC)
return r.internalGRPC.listener
}

func (r *RegistryDefault) internalGRPCDialer() GRPCDialer {
r.internalGRPC.initOnce.Do(r.initInternalGRPC)
return r.internalGRPC.dialer
}

func (r *RegistryDefault) serveRead(ctx context.Context, done chan<- struct{}) func() error {
rt, s := r.ReadRouter(ctx), r.ReadGRPCServer(ctx)

Expand Down Expand Up @@ -333,7 +387,7 @@ func (r *RegistryDefault) ReadRouter(ctx context.Context) http.Handler {
}
n.Use(reqlog.NewMiddlewareFromLogger(r.l, "read#Ory Keto").ExcludePaths(healthx.AliveCheckPath, healthx.ReadyCheckPath))

conn, err := grpc.DialContext(ctx, r.Config(ctx).ReadAPIListenOn(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := r.internalGRPCDialer()()
if err != nil {
panic(err)
}
Expand All @@ -345,7 +399,7 @@ func (r *RegistryDefault) ReadRouter(ctx context.Context) http.Handler {
)...)
for _, h := range r.allHandlers() {
if h, ok := h.(ReadHandler); ok {
if err := h.RegisterReadGRPCGateway(ctx, mux, r.Config(ctx).ReadAPIListenOn(), grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
if err := h.RegisterReadGRPCGatewayConn(ctx, mux, conn); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -380,7 +434,7 @@ func (r *RegistryDefault) WriteRouter(ctx context.Context) http.Handler {
}
n.Use(reqlog.NewMiddlewareFromLogger(r.l, "write#Ory Keto").ExcludePaths(healthx.AliveCheckPath, healthx.ReadyCheckPath))

conn, err := grpc.DialContext(ctx, r.Config(ctx).WriteAPIListenOn(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := r.internalGRPCDialer()()
if err != nil {
panic(err)
}
Expand All @@ -392,7 +446,7 @@ func (r *RegistryDefault) WriteRouter(ctx context.Context) http.Handler {
)...)
for _, h := range r.allHandlers() {
if h, ok := h.(WriteHandler); ok {
if err := h.RegisterWriteGRPCGateway(ctx, mux, r.Config(ctx).WriteAPIListenOn(), grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
if err := h.RegisterWriteGRPCGatewayConn(ctx, mux, conn); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -435,7 +489,7 @@ func (r *RegistryDefault) OPLSyntaxRouter(ctx context.Context) http.Handler {
r.HealthHandler().SetHealthRoutes(pr.Router, false)
r.HealthHandler().SetVersionRoutes(pr.Router)

conn, err := grpc.DialContext(ctx, r.Config(ctx).OPLSyntaxAPIListenOn(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := r.internalGRPCDialer()()
if err != nil {
panic(err)
}
Expand All @@ -447,7 +501,7 @@ func (r *RegistryDefault) OPLSyntaxRouter(ctx context.Context) http.Handler {
)...)
for _, h := range r.allHandlers() {
if h, ok := h.(OPLSyntaxHandler); ok {
if err := h.RegisterSyntaxGRPCGateway(ctx, mux, r.Config(ctx).OPLSyntaxAPIListenOn(), grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
if err := h.RegisterSyntaxGRPCGatewayConn(ctx, mux, conn); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -512,6 +566,22 @@ func (r *RegistryDefault) streamInterceptors(ctx context.Context) []grpc.StreamS
return is
}

// newInternalGRPCServer creates a new gRPC server with the default
// interceptors, but without transport credentials, to be used internally.
func (r *RegistryDefault) newInternalGRPCServer(ctx context.Context) *grpc.Server {
s := grpc.NewServer(
grpc.ChainStreamInterceptor(r.streamInterceptors(ctx)...),
grpc.ChainUnaryInterceptor(r.unaryInterceptors(ctx)...),
)

r.registerCommonGRPCServices(s)
r.registerReadGRPCServices(s)
r.registerWriteGRPCServices(s)
r.registerOPLGRPCServices(s)

return s
}

func (r *RegistryDefault) newGrpcServer(ctx context.Context) *grpc.Server {
opts := []grpc.ServerOption{
grpc.ChainStreamInterceptor(r.streamInterceptors(ctx)...),
Expand All @@ -523,50 +593,56 @@ func (r *RegistryDefault) newGrpcServer(ctx context.Context) *grpc.Server {
return grpc.NewServer(opts...)
}

func (r *RegistryDefault) ReadGRPCServer(ctx context.Context) *grpc.Server {
s := r.newGrpcServer(ctx)

func (r *RegistryDefault) registerCommonGRPCServices(s *grpc.Server) {
grpcHealthV1.RegisterHealthServer(s, r.HealthServer())
rts.RegisterVersionServiceServer(s, r)
reflection.Register(s)
}

func (r *RegistryDefault) registerReadGRPCServices(s *grpc.Server) {
for _, h := range r.allHandlers() {
if h, ok := h.(ReadHandler); ok {
h.RegisterReadGRPC(s)
}
}

return s
}

func (r *RegistryDefault) WriteGRPCServer(ctx context.Context) *grpc.Server {
s := r.newGrpcServer(ctx)

grpcHealthV1.RegisterHealthServer(s, r.HealthServer())
rts.RegisterVersionServiceServer(s, r)
reflection.Register(s)

func (r *RegistryDefault) registerWriteGRPCServices(s *grpc.Server) {
for _, h := range r.allHandlers() {
if h, ok := h.(WriteHandler); ok {
h.RegisterWriteGRPC(s)
}
}

return s
}

func (r *RegistryDefault) OplGRPCServer(ctx context.Context) *grpc.Server {
s := r.newGrpcServer(ctx)

grpcHealthV1.RegisterHealthServer(s, r.HealthServer())
rts.RegisterVersionServiceServer(s, r)
reflection.Register(s)

func (r *RegistryDefault) registerOPLGRPCServices(s *grpc.Server) {
for _, h := range r.allHandlers() {
if h, ok := h.(OPLSyntaxHandler); ok {
h.RegisterSyntaxGRPC(s)
}
}
}

func (r *RegistryDefault) ReadGRPCServer(ctx context.Context) *grpc.Server {
s := r.newGrpcServer(ctx)
r.registerCommonGRPCServices(s)
r.registerReadGRPCServices(s)

return s
}

func (r *RegistryDefault) WriteGRPCServer(ctx context.Context) *grpc.Server {
s := r.newGrpcServer(ctx)
r.registerCommonGRPCServices(s)
r.registerWriteGRPCServices(s)

return s
}

func (r *RegistryDefault) OplGRPCServer(ctx context.Context) *grpc.Server {
s := r.newGrpcServer(ctx)
r.registerCommonGRPCServices(s)
r.registerOPLGRPCServices(s)

return s
}
Expand Down
2 changes: 2 additions & 0 deletions internal/driver/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type (
ServeAllSQA(cmd *cobra.Command) error
}

GRPCDialer func() (*grpc.ClientConn, error)

contextKeys string
)

Expand Down
7 changes: 7 additions & 0 deletions internal/driver/registry_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/test/bufconn"

"github.com/ory/keto/internal/check"
"github.com/ory/keto/internal/driver/config"
Expand Down Expand Up @@ -80,6 +81,12 @@ type (
grpcTransportCredentials credentials.TransportCredentials
defaultMigrationOptions []popx.MigrationBoxOption
healthReadyCheckers healthx.ReadyCheckers

internalGRPC struct {
initOnce sync.Once
listener *bufconn.Listener
dialer GRPCDialer
}
}
ReadHandler interface {
RegisterReadGRPC(s *grpc.Server)
Expand Down
3 changes: 2 additions & 1 deletion internal/namespace/namespacehandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ func (h *handler) RegisterReadGRPC(s *grpc.Server) {
func (h *handler) RegisterReadGRPCGateway(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts ...grpc.DialOption) error {
return rts.RegisterNamespacesServiceHandlerFromEndpoint(ctx, mux, endpoint, opts)
}

func (h *handler) RegisterReadGRPCGatewayConn(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return rts.RegisterReadServiceHandler(ctx, mux, conn)
return rts.RegisterNamespacesServiceHandler(ctx, mux, conn)
}

func (h *handler) ListNamespaces(ctx context.Context, _ *rts.ListNamespacesRequest) (*rts.ListNamespacesResponse, error) {
Expand Down
Loading

0 comments on commit a99a212

Please sign in to comment.