bug/question: Subscription Callbacks don't seem to work with ApolloFederatedTracingHeaderForwarder #2077

Open
hassanzareef opened this issue Nov 26, 2024 · 0 comments
Labels
bug Something isn't working

hassanzareef commented Nov 26, 2024

Context

I am trying to implement subscriptions with Apollo Router and Netflix DGS (integration with Spring GraphQL), using multipart HTTP for client-router communication and HTTP callbacks for router-subgraph communication.

Using the Netflix DGS Spring GraphQL integration and the Apollo Federation subscription callback support, I was able to get this working. However, when I add the DGS configuration for the ApolloFederatedTracingHeaderForwarder, the request never reaches the subgraph data fetcher and an error is thrown instead.

Expected behavior

The data fetcher returns the desired subscription result.

Actual behavior

The request does not reach the subgraph data fetcher. Instead, the following error is logged:

{"@timestamp":"2024-11-26T15:36:37.642383-05:00","@version":"1","message":"Subscription terminated abnormally due to exception","logger_name":"com.apollographql.subscription.callback.SubscriptionCallbackHandler","thread_name":"loomBoundedElastic-2","level":"ERROR","level_value":40000,"stack_trace":"java.lang.NullPointerException: get(...) must not be null\n\tat com.netflix.graphql.dgs.context.DgsContext$Companion.from(DgsContext.kt:46)\n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nAssembly trace from producer [reactor.core.publisher.MonoDeferContextual] :\n\treactor.core.publisher.Mono.deferContextual(Mono.java:235)\n\torg.springframework.graphql.execution.DefaultExecutionGraphQlService.execute(DefaultExecutionGraphQlService.java:90)\nError has been observed at the following site(s):\n\t*__Mono.deferContextual ⇢ at org.springframework.graphql.execution.DefaultExecutionGraphQlService.execute(DefaultExecutionGraphQlService.java:90)\n\t|_ Mono.flatMapMany ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:111)\n\t|_ Flux.publishOn ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:145)\n\t|_ Flux.concatMap ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:146)\nOriginal Stack Trace:\n\t\tat com.netflix.graphql.dgs.context.DgsContext$Companion.from(DgsContext.kt:46)\n\t\tat com.netflix.graphql.dgs.context.GraphQLContextContributorInstrumentation.createState(GraphQLContextContributorInstrumentation.kt:42)\n\t\tat graphql.execution.instrumentation.SimplePerformantInstrumentation.createStateAsync(SimplePerformantInstrumentation.java:61)\n\t\tat graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationState.combineAll(ChainedInstrumentation.java:382)\n\t\tat 
graphql.execution.instrumentation.ChainedInstrumentation.createStateAsync(ChainedInstrumentation.java:96)\n\t\tat graphql.GraphQL.executeAsync(GraphQL.java:427)\n\t\tat org.springframework.graphql.execution.DefaultExecutionGraphQlService.lambda$execute$2(DefaultExecutionGraphQlService.java:104)\n\t\tat reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)\n\t\tat reactor.core.publisher.FluxPublish.connect(FluxPublish.java:106)\n\t\tat reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:88)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4512)\n\t\tat reactor.core.publisher.FluxTakeUntilOther.subscribeOrReturn(FluxTakeUntilOther.java:57)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8762)\n\t\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)\n\t\tat reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)\n\t\tat reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)\n\t\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)\n\t\tat reactor.core.publisher.FluxMerge.subscribe(FluxMerge.java:73)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8777)\n\t\tat reactor.core.publisher.Flux.subscribeWith(Flux.java:8898)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8742)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8666)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8584)\n\t\tat com.apollographql.subscription.callback.SubscriptionCallbackHandler.lambda$handleSubscriptionUsingCallback$0(SubscriptionCallbackHandler.java:80)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:169)\n\t\tat reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:49)\n\t\tat 
reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)\n\t\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)\n\t\tat reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)\n\t\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)\n\t\tat 
reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)\n\t\tat reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)\n\t\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:177)\n\t\tat reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176)\n\t\tat reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:435)\n\t\tat reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:715)\n\t\tat reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:205)\n\t\tat reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:466)\n\t\tat reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:725)\n\t\tat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\t\tat 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\tat java.base/java.lang.Thread.run(Thread.java:1583)\n"}
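
The top frames of the trace show a NullPointerException with the message "get(...) must not be null" raised in DgsContext.from, called from GraphQLContextContributorInstrumentation.createState. One possible reading is that a context lookup returned null on the callback execution path. The plain-Java sketch below only illustrates that failure shape; it is not the DGS implementation, and the class name, key, and map-based context are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ContextLookupSketch {
    // Hypothetical key; stands in for whatever key a framework would use internally.
    static final String CONTEXT_KEY = "hypothetical.dgs.context";

    // Mimics a lookup that asserts non-null: throws a NullPointerException
    // when no value was stored under the key.
    static Object from(Map<String, Object> graphQlContext) {
        return Objects.requireNonNull(
                graphQlContext.get(CONTEXT_KEY), "get(...) must not be null");
    }

    public static void main(String[] args) {
        // A context that was populated before execution: the lookup succeeds.
        Map<String, Object> populated = new HashMap<>();
        populated.put(CONTEXT_KEY, "request-scoped context");
        System.out.println("populated lookup: " + from(populated));

        // A context that was never populated: the lookup fails.
        Map<String, Object> empty = new HashMap<>();
        try {
            from(empty);
        } catch (NullPointerException e) {
            System.out.println("empty lookup failed: " + e.getMessage());
        }
    }
}
```

If this reading is right, the question for the maintainers is which component is expected to populate the context when the execution is started by SubscriptionCallbackHandler rather than by the usual request path.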

Steps to reproduce

Implement a simple federated subscription through Apollo Router and Netflix DGS, with HTTP callbacks for router-subgraph communication.

DataFetcher:

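import com.netflix.graphql.dgs.DgsComponent;
import com.netflix.graphql.dgs.DgsSubscription;
import com.netflix.graphql.dgs.InputArgument;
import java.time.Duration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@DgsComponent
public class SubscriptionDataFetcher {
  // Emits seven values (name + "1" through name + "7"), one every 200 ms.
  @DgsSubscription
  public Publisher<String> testSub(@InputArgument String name) {
    return Flux.just(name + "1", name + "2", name + "3", name + "4", name + "5", name + "6", name + "7")
            .delayElements(Duration.ofMillis(200));
  }
}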

GraphQL HTTP callback config:

@Configuration
public class GraphQLConfiguration {

  @Bean
  public SubscriptionCallbackHandler callbackHandler(ExecutionGraphQlService graphQlService) {
    return new SubscriptionCallbackHandler(graphQlService);
  }

  // This interceptor defaults to Ordered#LOWEST_PRECEDENCE order as it should run last in chain
  // to allow users to still apply other interceptors that handle common stuff (e.g. extracting
  // auth headers, etc).
  // You can override this behavior by specifying custom order.
  @Bean
  public CallbackWebGraphQLInterceptor callbackGraphQlInterceptor(
        SubscriptionCallbackHandler callbackHandler) {
    return new CallbackWebGraphQLInterceptor(callbackHandler);
  }
}

Question

Is this a bug in the DGS framework? If I remove the ApolloFederatedTracingHeaderForwarder file, everything works fine and I get the desired output.

hassanzareef added the bug label Nov 26, 2024