Implement timeout mechanism for tasks & groups (#26)
* fix typos in names, comments and docstrings

* Implement `timeout` for tasks

* Implement `timeout` for a group of tasks

* fix type hinting errors for 3.9
mahdihaghverdi authored Aug 29, 2024
1 parent 744a512 commit ce16cb2
Showing 9 changed files with 143 additions and 61 deletions.
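Taken together, the diff below adds a `timeout` parameter to the task and group decorators; a task that outlives its budget is cancelled and a `TaskTimeoutError` is raised. A minimal usage sketch assembled from the docstring examples in this commit (the `app.serve()` entrypoint and the sleep body are assumptions, not part of the diff):

```python
import asyncio

from aioclock import AioClock, Once

app = AioClock()

# cancel this task and raise TaskTimeoutError if it runs past 3 seconds
@app.task(trigger=Once(), timeout=3)
async def main():
    await asyncio.sleep(1)  # placeholder for real I/O work

if __name__ == "__main__":
    asyncio.run(app.serve())
```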
2 changes: 1 addition & 1 deletion .gitignore
@@ -159,4 +159,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
- #.idea/
+ .idea/
20 changes: 12 additions & 8 deletions aioclock/api.py
@@ -5,10 +5,13 @@
Other tools and extension are written from this tool.
!!! danger "Note when writing to aioclock API and changing its state."
- Right now the state of AioClock instance is on the memory level, so if you write an API and change a task's trigger time, it will not persist.
- In future we might store the state of AioClock instance in a database, so that it always remains same.
- But this is a bit tricky and implicit because then your code gets ignored and database is preferred over the codebase.
- For now you may consider it as a way to change something without redeploying the application, but it is not very recommended to write.
+ Right now the state of AioClock instance is on the memory level,
+ so if you write an API and change a task's trigger time, it will not persist.
+ In the future, we might store the state of AioClock instance in a database, so that it always remains same.
+ But this is a bit tricky and implicit because then your code gets ignored
+ and database is preferred over the codebase.
+ For now, you may consider it as a way to change something without redeploying the application,
+ but it is not very recommended to write.
"""

import sys
@@ -37,8 +40,9 @@ class TaskMetadata(BaseModel):
Attributes:
id: UUID: Task ID that is unique for each task, and changes every time you run the aioclock app.
- In future we might store task ID in a database, so that it always remains same.
- trigger: Union[TriggerT, Any]: Trigger that is used to run the task, type is also any to ease implementing new triggers.
+ In the future, we might store task ID in a database, so that it always remains same.
+ trigger: Union[TriggerT, Any]: Trigger that is used to run the task,
+     type is also any to ease implementing new triggers.
task_name: str: Name of the task function.
"""

Expand All @@ -54,7 +58,7 @@ async def run_specific_task(task_id: UUID, app: AioClock):
params:
task_id: Task ID that is unique for each task, and changes every time you run the aioclock app.
- In future we might store task ID in a database, so that it always remains same.
+ In the future, we might store task ID in a database, so that it always remains same.
app: AioClock instance to run the task from.
Example:
@@ -114,7 +118,7 @@ async def some_other_func():
async def get_metadata_of_all_tasks(app: AioClock) -> list[TaskMetadata]:
"""Get metadata of all tasks that are included in the AioClock instance.
- This function can be used to mutate the `TaskMetadata` object, i.e to change the trigger of a task.
+ This function can be used to mutate the `TaskMetadata` object, i.e. to change the trigger of a task.
But for now it is yet not recommended to do this, as you might experience some unexpected behavior.
But in next versions, I'd like to make it more stable and reliable on mutating the data.
51 changes: 31 additions & 20 deletions aioclock/app.py
@@ -2,7 +2,7 @@
To initialize the AioClock instance, you need to import the AioClock class from the aioclock module.
AioClock class represent the aioclock, and handle the tasks and groups that will be run by the aioclock.
- Another way to modulize your code is to use `Group` which is kinda the same idea as router in web frameworks.
+ Another way to modularize your code is to use `Group` which is kinda the same idea as router in web frameworks.
"""

from __future__ import annotations
@@ -76,7 +76,7 @@ async def main():
## Lifespan
You can define this startup and shutdown logic using the lifespan parameter of the AioClock instance.
- It should be as an AsyncContextManager which get AioClock application as arguement.
+ It should be as an AsyncContextManager which get AioClock application as argument.
You can find the example below.
Example:
@@ -107,15 +107,16 @@ async def lifespan(app: AioClock):
Here we are simulating the expensive startup operation of loading the model by putting the (fake)
model function in the dictionary with machine learning models before the yield.
- This code will be executed before the application starts operationg, during the startup.
+ This code will be executed before the application starts operating, during the startup.
And then, right after the yield, we unload the model.
This code will be executed after the application finishes handling requests, right before the shutdown.
This could, for example, release resources like memory, a GPU or some database connection.
- It would also happen when you're stopping your application gracefully, for example, when you're shutting down your container.
+ It would also happen when you're stopping your application gracefully,
+ for example, when you're shutting down your container.
- Lifespan could also be synchronus context manager. Check the example below.
+ Lifespan could also be synchronous context manager. Check the example below.
Example:
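The example itself sits in a collapsed region of this diff view. As a stand-in, here is a minimal sketch of what a synchronous lifespan could look like, assuming a `contextlib.contextmanager`-style callable that receives the app (whether it must yield the app back is an assumption; the model registry is the fake one described above):

```python
from contextlib import contextmanager

from aioclock import AioClock

ml_models = {}  # registry for the (fake) machine learning model

@contextmanager
def lifespan(app: AioClock):
    # startup: load the expensive model before the app starts operating
    ml_models["answer_to_everything"] = lambda x: x * 42
    yield app  # assumption: the manager yields the app back
    # shutdown: release the model when the app stops
    ml_models.clear()

app = AioClock(lifespan=lifespan)
```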
Expand Down Expand Up @@ -163,8 +164,8 @@ def __init__(
limiter:
Anyio CapacityLimiter. capacity limiter to use to limit the total amount of threads running
Limiter that will be used to limit the number of tasks that are running at the same time.
- If not provided, it will fallback to the default limiter set on Application level.
- If no limiter is set on Application level, it will fallback to the default limiter set by AnyIO.
+ If not provided, it will fall back to the default limiter set on Application level.
+ If no limiter is set on Application level, it will fall back to the default limiter set by AnyIO.
"""
self._groups: list[Group] = []
@@ -235,18 +236,23 @@ async def main():
self._groups.append(group)
return None

- def task(self, *, trigger: BaseTrigger):
+ def task(self, *, trigger: BaseTrigger, timeout: Optional[float] = None):
"""
Decorator to add a task to the AioClock instance.
If decorated function is sync, aioclock will run it in a thread pool executor, using AnyIO.
But if you try to run the decorated function, it will run in the same thread, blocking the event loop.
It is intended to not change all your `sync functions` to coroutine functions,
- and they can be used outside of aioclock, if needed.
+ and they can be used outside aioclock, if needed.
params:
trigger: BaseTrigger
Trigger that will trigger the task to be running.
+ timeout: float | None (defaults to None)
+     Set a timeout for the task.
+     If the task completion took longer than timeout,
+     it will be cancelled and a `TaskTimeoutError` be raised by the Application.
Example:
```python
@@ -258,31 +264,36 @@ def task(self, *, trigger: BaseTrigger):
async def main():
print("Hello World")
```
+ Example:
+ ```python
+ from aioclock import AioClock, Once
+ app = AioClock()
+ @app.task(trigger=Once(), timeout=3)
+ async def main():
+     await some_io_task()
+ ```
"""

def decorator(func):
@wraps(func)
- async def wrapped_funciton(*args, **kwargs):
+ async def wrapped_function(*args, **kwargs):
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else: # run in threadpool to make sure it's not blocking the event loop
return await asyncify(func, limiter=self._limiter)(*args, **kwargs)

self._app_tasks.append(
Task(
- func=inject(wrapped_funciton, dependency_overrides_provider=get_provider()),
+ func=inject(wrapped_function, dependency_overrides_provider=get_provider()),
trigger=trigger,
+ timeout=timeout,
)
)
- if asyncio.iscoroutinefunction(func):
-     return wrapped_funciton
- else:
-
-     @wraps(func)
-     def wrapper(*args, **kwargs):
-         return func(*args, **kwargs)
-
-     return wrapper
+ return wrapped_function

return decorator

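The decorator above routes sync functions through `asyncify` so they run in a worker thread instead of blocking the event loop. A standalone sketch of that pattern, assuming asyncer's `asyncify` (which matches the `asyncify(func, limiter=...)` call shape used in the diff):

```python
import time

import anyio
from asyncer import asyncify

def blocking_io() -> str:
    time.sleep(1)  # simulates a blocking call
    return "done"

async def main() -> None:
    # limiters must be created inside an async context
    limiter = anyio.CapacityLimiter(10)
    # run the sync function in a worker thread, keeping the loop free
    result = await asyncify(blocking_io, limiter=limiter)()
    print(result)

anyio.run(main)
```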
4 changes: 4 additions & 0 deletions aioclock/exceptions.py
@@ -4,3 +4,7 @@ class BaseAioClockException(Exception):

class TaskIdNotFound(BaseAioClockException):
"""Task not found in the AioClock app."""


+ class TaskTimeoutError(BaseAioClockException, TimeoutError):
+     """A task took longer than its timeout"""
86 changes: 66 additions & 20 deletions aioclock/group.py
@@ -26,6 +26,7 @@ def __init__(
self,
*,
limiter: Optional[anyio.CapacityLimiter] = None,
+ timeout: Optional[float] = None
):
"""
Group of tasks that will be run together.
@@ -38,8 +39,13 @@ def __init__(
limiter:
Anyio CapacityLimiter. capacity limiter to use to limit the total amount of threads running
Limiter that will be used to limit the number of tasks that are running at the same time.
- If not provided, it will fallback to the default limiter set on Application level.
- If no limiter is set on Application level, it will fallback to the default limiter set by AnyIO.
+ If not provided, it will fall back to the default limiter set on Application level.
+ If no limiter is set on Application level, it will fall back to the default limiter set by AnyIO.
+ timeout:
+     General timeout for the group's tasks.
+     If a task overrides this value, the new value will be used
+     for the task.
Example:
```python
@@ -58,67 +64,107 @@ async def send_email():
aio_clock.include_group(email_group)
```
+ Example:
+ ```python
+ from aioclock import Group, AioClock, Forever
+ email_group = Group(timeout=5)
+ # consider this as different file
+ @email_group.task(trigger=Forever())
+ async def send_email():
+     ...
+ # app.py
+ aio_clock = AioClock()
+ aio_clock.include_group(email_group)
+ ```
"""
self._tasks: list[Task] = []
self._limiter = limiter
+ self._timeout = timeout

- def task(self, *, trigger: BaseTrigger):
+ def task(self, *, trigger: BaseTrigger, timeout: Optional[float] = None):
"""
- Decorator to add a task to the AioClock instance.
+ Decorator to add a task to the group.
If decorated function is sync, aioclock will run it in a thread pool executor, using AnyIO.
But if you try to run the decorated function, it will run in the same thread, blocking the event loop.
It is intended to not change all your `sync functions` to coroutine functions,
- and they can be used outside of aioclock, if needed.
+ and they can be used outside aioclock, if needed.
params:
trigger: BaseTrigger
Trigger that will trigger the task to be running.
+ timeout: float | None (defaults to None)
+     Set a timeout for the task.
+     If the task completion took longer than timeout,
+     it will be cancelled and a `TaskTimeoutError` be raised by the Application.
Example:
```python
- from aioclock import AioClock, Once
+ from aioclock import AioClock, Group, Once
+ group = Group()
+ @group.task(trigger=Once())
+ async def main():
+     print("Hello World")
+ app = AioClock()
+ app.include_group(group)
+ ```
+ Example:
+ ```python
+ from aioclock import AioClock, Group, Once, Every
- @app.task(trigger=Once())
+ group = Group(timeout=5)
+ @group.task(trigger=Every(seconds=5))
async def main():
    print("Hello World")
+ @group.task(trigger=Once(), timeout=4) # this task will get 4 as timeout
+ async def main():
+     print("Hello World")
+ app = AioClock()
+ app.include_group(group)
```
"""

def decorator(func):
@wraps(func)
- async def wrapped_funciton(*args, **kwargs):
+ async def wrapped_function(*args, **kwargs):
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else: # run in threadpool to make sure it's not blocking the event loop
return await asyncify(func, limiter=self._limiter)(*args, **kwargs)

+ to = self._timeout
+ if timeout is not None:
+     to = timeout
self._tasks.append(
Task(
- func=inject(wrapped_funciton, dependency_overrides_provider=get_provider()),
+ func=inject(wrapped_function, dependency_overrides_provider=get_provider()),
trigger=trigger,
+ timeout=to,
)
)

- if asyncio.iscoroutinefunction(func):
-     return wrapped_funciton
-
- else:
-
-     @wraps(func)
-     def wrapper(*args, **kwargs):
-         return func(*args, **kwargs)
-
-     return wrapper
+ return wrapped_function

return decorator

async def _run(self):
"""
Just for purpose of being able to run all task in group
- Private method, should not be used outside of the library
+ Private method, should not be used outside the library
"""
await asyncio.gather(
*(task.run() for task in self._tasks),
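The `to = self._timeout` / `if timeout is not None` step above is plain None-coalescing: a task-level timeout, when given, beats the group default. A tiny illustrative helper (hypothetical, not part of the library):

```python
from typing import Optional

def effective_timeout(
    group_timeout: Optional[float], task_timeout: Optional[float]
) -> Optional[float]:
    # the task-level value wins whenever it is explicitly set
    return task_timeout if task_timeout is not None else group_timeout

assert effective_timeout(5.0, None) == 5.0    # group default applies
assert effective_timeout(5.0, 4.0) == 4.0     # per-task override wins
assert effective_timeout(None, None) is None  # no timeout at all
```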
29 changes: 23 additions & 6 deletions aioclock/task.py
@@ -1,15 +1,19 @@
"""
- Aioclock wrap your functions with a task object, and append the task to the list of tasks in the AioClock instance.
- After collecting all the tasks from decorated functions, aioclock serve them in order it has to be (startup, normal, shutdown).
+ Aioclock wrap your functions with a task object,
+ and append the task to the list of tasks in the AioClock instance.
+ After collecting all the tasks from decorated functions,
+ aioclock serve them in order it has to be (startup, normal, shutdown).
These tasks keep running forever until the trigger's method `should_trigger` returns False.
"""

import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
- from typing import Any, Awaitable, Callable
+ from typing import Any, Awaitable, Callable, Optional
from uuid import UUID, uuid4

+ from aioclock.exceptions import TaskTimeoutError
from aioclock.logger import logger
from aioclock.triggers import BaseTrigger

@@ -26,14 +30,15 @@ class Task:
func: Callable[..., Awaitable[Any]]: Decorated function that will be run by AioClock.
trigger: BaseTrigger: Trigger that will be used to run the function.
id: UUID: Task ID that is unique for each task, and changes every time you run the aioclock app.
- In future we might store task ID in a database, so that it always remains same.
+ In the future, we might store task ID in a database, so that it always remains same.
"""

func: Callable[..., Awaitable[Any]]

trigger: BaseTrigger

+ timeout: Optional[float] = None
id: UUID = field(default_factory=uuid4)

async def run(self):
@@ -50,8 +55,20 @@ async def run(self):
seconds=next_trigger
)
await self.trigger.trigger_next()
- logger.debug(f"Running task {self.func.__name__}")
- await self.func()
+
+ if self.timeout is not None:
+     logger.debug(
+         f"Running task {self.func.__name__} with timeout of {self.timeout}"
+     )
+     try:
+         await asyncio.wait_for(self.func(), self.timeout)
+     except asyncio.TimeoutError:
+         raise TaskTimeoutError(
+             f"Task {self.func.__name__!r} took longer than {self.timeout} seconds to run!"
+         ) from None
+ else:
+     logger.debug(f"Running task {self.func.__name__}")
+     await self.func()
except Exception as error:
# Log the error, but keep running the tasks.
# don't crash the whole application.
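The core mechanism is the stock `asyncio.wait_for` pattern: run the coroutine against a budget, then translate the stdlib timeout into the library's own error. A self-contained sketch of the same idea (all names local to this example, not aioclock's API):

```python
import asyncio

class TaskTimeoutError(TimeoutError):
    """Stand-in for aioclock.exceptions.TaskTimeoutError."""

async def slow() -> None:
    await asyncio.sleep(2)

async def run_with_timeout(timeout: float) -> None:
    try:
        # wait_for cancels the coroutine once the budget is exhausted
        await asyncio.wait_for(slow(), timeout)
    except asyncio.TimeoutError:
        # re-raise as the library's type, hiding the chained traceback
        raise TaskTimeoutError(
            f"Task 'slow' took longer than {timeout} seconds to run!"
        ) from None

asyncio.run(run_with_timeout(0.5))  # raises TaskTimeoutError
```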