Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blocking call with spring webflux #2192

Closed
Vichukano opened this issue Jun 30, 2021 · 4 comments
Closed

blocking call with spring webflux #2192

Vichukano opened this issue Jun 30, 2021 · 4 comments

Comments

@Vichukano
Copy link

spring-cloud-stream with the reactive programming model makes the blocking call on the reactor netty thread while fetching metadata from kafka.

You can see this with BlockHound tool.

BlockHound output is:

reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait
	at java.base/java.lang.Object.wait(Object.java)
	at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)
	at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)
	at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1048)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:864)
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:580)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:406)
	at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:500)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1049)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:519)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:184)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:971)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1797)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
	at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)

To reproduce this issue run the test from DemoAppTest placed here demo project

spring-boot version - 2.5.2

spring-cloud-stream version - 2020.0.3

@olegz
Copy link
Contributor

olegz commented Jun 30, 2021

I think you are miss-interpreting reactive support in spring-cloud-stream. At the moment it's API only. We can't avoid blocking calls since underlying binders are relying on non-reactive APIs (Kafka, Rabbit etc).

So effectively we provide a reactive bridge to an established messaging frameworks which is what you see in the above stack trace.
Not much can be done at the moment until the underlying tech changes.

@cliffdurden
Copy link

cliffdurden commented Jun 30, 2021

I think this is related issue about how kafka-client update metadata

and KIP -286 in Kafka

@Vichukano
Copy link
Author

I did a workaround with separate threads for netty and reactor when sending a message to kafka.

return client.post()
           .uri("/bar")
           .bodyValue(body)
           .retrieve()
           .bodyToMono(String.class)
           .doOnNext(response -> LOGGER.warn("Receive response {} in netty thread", response))
           .publishOn(Schedulers.boundedElastic());

The error is not reproduced on this configuration. How good is this solution?

@olegz
Copy link
Contributor

olegz commented Jan 4, 2023

This appears top be not related to s-c-stream so I am going to close this. Feel free to raise a new issue with an updated version of the framework

@olegz olegz closed this as completed Jan 4, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants