
heartbeat for runners for better execution in environments with autoscaling #16142

Open
anthuswilliams opened this issue Nov 27, 2024 · 4 comments
Labels

  • enhancement: An improvement of an existing feature
  • great writeup: This is a wonderful example of our standards
  • performance: Related to an optimization or performance improvement

Comments

@anthuswilliams

anthuswilliams commented Nov 27, 2024

Describe the current behavior

Prefect does not seem to work well with a Kubernetes autoscaling strategy, or more generally in any context where compute resources are elastic and can be scaled up and down. Because we run our Prefect workloads on these highly volatile/scalable clusters, our jobs/pods get rescheduled frequently.

This graph is a view of our cluster autoscaler: it shows the Instance Group - Instance Group Size metric in GCP. The fluctuation is a direct result of the number of autoscale events in our production Kubernetes clusters.

[Screenshot: GCP Instance Group Size over time, 2024-11-20 3:30 PM]

What you're seeing here is a consequence of the rate of Prefect job submission. Essentially we have applications which submit a number of flows, and these flows in turn launch a number of subflows via run_deployment. This results in a spike in the number of jobs submitted to Kubernetes. In response, our autoscaler dramatically scales up the number of nodes. As these jobs start to finish, our autoscaler scales back down, evicting many of the jobs that are still in flight. Thus we end up with a lot more failures than we otherwise would, which is both frustrating for the developers of these flows and harmful to our bottom line as, after retries, we have essentially paid twice to process the same workload.

When this happens, Prefect often loses track of the workers that had been executing its workloads: either the run is marked Failed while the runner continues executing, or the runner dies but the Prefect server thinks it is still executing. These "ghost runs" can remain in the Prefect UI for hours until a human notices the issue and reschedules or cancels the run in question.

We have often been told by support that the way around this issue is to set up timeout automations that set the flow run state to Failed, schedule retries, etc. This is workable, but it doesn't address the fundamental problem. There is often nothing wrong with the developers' code, the resources they have requested, or anything else in their control; the problem is that the infrastructure has failed them. It's a lot to ask of developers to set up these kinds of timeouts to work around a problem that should be solved out of the box. At the very least, a timeout of this nature should be something more easily configurable than an entire automation block.

Describe the proposed behavior

We would like to suggest a sort of heartbeat for Prefect runners. This could either be a message the runner sends to the server at a predefined interval, or a healthcheck endpoint the runner exposes, with Prefect jobs including some sort of sidecar responsible for polling it. If the runner in question has not checked in for a certain amount of time, the Prefect server assumes the run has crashed and re-enqueues the work (or marks it Failed, etc.). Conversely, the runner could be configured to shut down gracefully if it has not been able to contact the Prefect server within a certain amount of time.
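
To make the proposal concrete, here is a minimal sketch of both halves. Everything in it is hypothetical: `send_heartbeat`, `mark_run_crashed`, `last_heartbeat`, the endpoint in the comment, and the interval/timeout values are illustrative, not part of Prefect's current API.

```python
import asyncio
from datetime import datetime, timedelta, timezone

# Illustrative values only; both would presumably be configurable.
HEARTBEAT_INTERVAL = timedelta(seconds=30)
HEARTBEAT_TIMEOUT = timedelta(minutes=5)


async def runner_heartbeat_loop(flow_run_id: str, send_heartbeat) -> None:
    """Runner side: phone home on a fixed interval, and shut down gracefully
    if the server has been unreachable for longer than HEARTBEAT_TIMEOUT."""
    last_success = datetime.now(timezone.utc)
    while True:
        try:
            # e.g. POST /flow_runs/{id}/heartbeat -- a hypothetical endpoint
            await send_heartbeat(flow_run_id)
            last_success = datetime.now(timezone.utc)
        except Exception:
            if datetime.now(timezone.utc) - last_success > HEARTBEAT_TIMEOUT:
                # From the runner's perspective the server is gone, so stop
                # executing rather than becoming an unobserved "ghost run".
                raise SystemExit("server unreachable, shutting down runner")
        await asyncio.sleep(HEARTBEAT_INTERVAL.total_seconds())


async def reap_stale_runs(list_running_runs, mark_run_crashed) -> None:
    """Server side: any RUNNING flow run whose last heartbeat is older than
    the timeout is presumed dead and is re-enqueued or marked Crashed."""
    cutoff = datetime.now(timezone.utc) - HEARTBEAT_TIMEOUT
    for run in await list_running_runs():
        if run.last_heartbeat is None or run.last_heartbeat < cutoff:
            await mark_run_crashed(run.id)
```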

Example Use

This would both reduce the flakiness of our Prefect runs and keep down some of what we pay for cloud compute, because it would let us manage the submission burst on the Kubernetes side. Our two best ideas here are:

  • Schedule flows to run as a Kubernetes job via a prefect-kubernetes worker. In this case we would add something like Kueue to control the rate at which these jobs are actually scheduled in Kubernetes, helping ease the spikiness of the graph you see above. Kueue also controls the scale-down and can add smoothness there, but ultimately flow runs scaled down in this way will still be subject to the same "ghost run" issue described above.
  • Eschew prefect-kubernetes entirely and deploy process workers. Here we can use KEDA to scale up and down based on the length of a work queue. The upside is that we get good scaling behavior and it can dramatically simplify the code our developers need to write, as they don't need to think as much about the size of the jobs they are ultimately submitting. The downside is that a) we have at least one process running even when there is no work to be done, and b) we have heard from our support team at Prefect that this can put us into situations where multiple process workers are executing the same flow run at the same time. But more importantly, this does not solve the "ghost run" issue, as when the processes are scaled down, the Prefect server is not notified that they are no longer working on the run in question.

As you can see, both of these approaches fall afoul of the same issue: the Prefect server does not see that the agents executing its inflight workflows have vanished. Both of these approaches would be viable if Prefect runners included some sort of heartbeat so that the server would know to mark them dead. (Note that Prefect workers seem already to behave this way, and the UI will report if it has not heard from a worker in a certain period of time. But we don't have the same observability when it comes to the runners actually executing the flows.)

Additional context

Note: we have tried some of Prefect's concurrency settings (per-deployment and global concurrency) here, but they actually make this problem worse. Because Prefect cannot see that some workers have vanished, it concludes that a concurrency slot is taken up by a long-dead worker and refuses to schedule anything else until some other agent forcibly sets the state of the flow run in question to Failed.
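
For illustration, forcibly failing a stale run with Prefect's Python client looks roughly like the sketch below (treat the exact filter wiring as approximate). The two-hour cutoff is arbitrary, and a script like this cannot tell a ghost run from a genuinely long-running one, which is exactly why a real heartbeat would help.

```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Failed

STALE_AFTER = timedelta(hours=2)  # arbitrary guess at "definitely dead"


async def fail_ghost_runs() -> None:
    """Force long-RUNNING flow runs past the cutoff into Failed so their
    concurrency slots are released."""
    cutoff = datetime.now(timezone.utc) - STALE_AFTER
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING])
                )
            )
        )
        for run in runs:
            if run.start_time and run.start_time < cutoff:
                await client.set_flow_run_state(
                    flow_run_id=run.id, state=Failed(), force=True
                )


if __name__ == "__main__":
    asyncio.run(fail_ghost_runs())
```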

@anthuswilliams anthuswilliams added the enhancement An improvement of an existing feature label Nov 27, 2024
@anthuswilliams anthuswilliams changed the title heartbeat for runners to provide for better execution in environments with autoscaling heartbeat for runners for better execution in environments with autoscaling Nov 28, 2024
@zzstoatzz zzstoatzz added great writeup This is a wonderful example of our standards performance Related to an optimization or performance improvement labels Nov 28, 2024
@desertaxle
Member

Thanks for the great writeup, @anthuswilliams! I really like your runner heartbeat idea! I have another idea that I'd like your take on: what if the runner could listen for SIGTERM and SIGKILL events from k8s and pause and reschedule the flow run when those signals are received? That way, a new pod could be scheduled for the flow run, and execution could pick up where it left off. It would require having result persistence set up and including tasks within your flows to act as checkpoints, but with both of those in place I think this would work quite well. Does that sound like it'd be useful for your use case?
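
For illustration, a minimal sketch of the runner-side piece, assuming a hypothetical `pause_and_reschedule` callback that checkpoints progress via result persistence and moves the run back to a Scheduled/Paused state (note that SIGTERM can be trapped but SIGKILL cannot, so the pod's termination grace period is the real window):

```python
import signal
import sys
from typing import Callable


def install_eviction_handler(pause_and_reschedule: Callable[[], None]) -> None:
    """Trap SIGTERM (what Kubernetes sends before evicting a pod) and hand off
    to a callback that persists progress and asks the server to reschedule the
    flow run on a new pod."""

    def _handle_sigterm(signum, frame):
        # Must finish within the pod's terminationGracePeriodSeconds;
        # SIGKILL arrives after that and cannot be trapped.
        pause_and_reschedule()
        sys.exit(0)

    signal.signal(signal.SIGTERM, _handle_sigterm)
```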

@anthuswilliams
Author

Yes, that would also be very useful! I imagine one challenge there would be whether we can distinguish such events from evictions that actually do require a code change (such as OOMKills), but a framework like that would be very useful when using pre-emptible compute.

@sm-moore

sm-moore commented Dec 2, 2024

Container lifecycle hooks might be a good way to orchestrate the behavior @desertaxle is talking about. That being said, there are inevitably going to be "catastrophic" failures that could result in those reschedules not happening. Having a heartbeat seems like the most foolproof way to ensure that runs marked as running are actually running.

@desertaxle
Member

That's a great callout @anthuswilliams! We'll probably need some sort of limit on retries in these failure cases. We could probably reuse the retries field on the flow, but categorize these as a different retry type.

@sm-moore using container lifecycle hooks is a great suggestion! Using heartbeats and listening for common signals will work with a lot of different infrastructure types, but we could definitely look to enhance Kubernetes-specific functionality in Prefect via lifecycle hooks.
