Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WindowOperatorQueryFrameProcessor: Avoid writing multiple frames to output channel in runIncrementally() #17373

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

Akshat-Jain
Copy link
Contributor

Description

With the changes made in #17038, we missed a flow where multiple frames could be written to the output channel in a single iteration of WindowOperatorQueryFrameProcessor#runIncrementally. This violates the contract of runIncrementally() and leads to the following error: Channel has no capacity

Sample stacktrace
2024-10-18T00:06:27,410 WARN [MultiStageQuery-test-controller-client] org.apache.druid.msq.exec.WorkerImpl - Work failed; stage 2; task query-dummy-worker0_0; host 123:8080: UnknownError: java.lang.RuntimeException: org.apache.druid.java.util.common.ISE: Channel has no capacity
java.lang.RuntimeException: org.apache.druid.java.util.common.ISE: Channel has no capacity
	at org.apache.druid.java.util.common.Either.valueOrThrow(Either.java:95)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:271)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.run(FrameProcessorExecutor.java:141)
	at org.apache.druid.msq.exec.WorkerImpl$2$2.run(WorkerImpl.java:900)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.druid.java.util.common.ISE: Channel has no capacity
	at org.apache.druid.frame.channel.BlockingQueueFrameChannel$Writable.write(BlockingQueueFrameChannel.java:139)
	at org.apache.druid.msq.indexing.CountingWritableFrameChannel.write(CountingWritableFrameChannel.java:50)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushFrameWriter(WindowOperatorQueryFrameProcessor.java:302)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.writeRacToFrame(WindowOperatorQueryFrameProcessor.java:262)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.flushAllRowsAndCols(WindowOperatorQueryFrameProcessor.java:232)
	at org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessor.runIncrementally(WindowOperatorQueryFrameProcessor.java:150)
	at org.apache.druid.msq.counters.CpuTimeAccumulatingFrameProcessor.runIncrementally(CpuTimeAccumulatingFrameProcessor.java:66)
	at org.apache.druid.frame.processor.FrameProcessors$1FrameProcessorWithBaggage.runIncrementally(FrameProcessors.java:72)
	at org.apache.druid.frame.processor.FrameProcessorExecutor$1ExecutorRunnable.runProcessorNow(FrameProcessorExecutor.java:239)
	... 5 more

We missed this flow:

  1. We read the input channel
  2. We call runAllOpsOnBatch()
  3. If after step (1), the input channel was finished, then the operatorChain's receiver's completed() method will get called, which would call flushAllRowsAndCols(), hence writing to the output channel.
  4. After this, we end up calling flushAllRowsAndCols() again, missing the fact that there's a chance that the input channel might have finished. This ends up attempting to write another frame to the output channel, causing the Channel has no capacity error.

This PR fixes the above problematic flow by checking if the input channel is finished. If it's finished, we re-run runIncrementally() instead of step 4.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Oct 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants