AioClock Application
To initialize an AioClock instance, import the AioClock class from the aioclock module. The AioClock class represents the application and manages the tasks and groups that it runs.
Another way to modularize your code is to use Group, which follows the same idea as a router in web frameworks.
AioClock ¶
AioClock(
    *,
    lifespan: Optional[
        Callable[
            [AioClock],
            AsyncContextManager[AioClock]
            | ContextManager[AioClock],
        ]
    ] = None,
    limiter: Optional[CapacityLimiter] = None
)
AioClock is the main class that will be used to run the tasks. It will be responsible for running the tasks in the right order.
Example
To run the aioclock final app simply do:
Example
Lifespan¶
You can define startup and shutdown logic using the lifespan parameter of the AioClock instance. It should be an AsyncContextManager that receives the AioClock application as its argument. You can find an example below.
Example
import asyncio
from contextlib import asynccontextmanager

from aioclock import AioClock

ML_MODEL = []  # just some imaginary component that needs to be started and stopped


@asynccontextmanager
async def lifespan(app: AioClock):
    ML_MODEL.append(2)
    print("UP!")
    yield app
    ML_MODEL.clear()
    print("DOWN!")


app = AioClock(lifespan=lifespan)

if __name__ == "__main__":
    asyncio.run(app.serve())
Here we simulate an expensive startup operation, such as loading a model, by appending a placeholder value to the ML_MODEL list before the yield. This code is executed during startup, before the application starts operating.
Then, right after the yield, we clear the list to simulate unloading the model. This code is executed after the application finishes running its tasks, right before shutdown. It could, for example, release resources such as memory, a GPU, or a database connection.
This also happens when you stop your application gracefully, for example when shutting down your container.
Lifespan can also be a synchronous context manager. Check the example below.
Example
import asyncio
from contextlib import contextmanager

from aioclock import AioClock

ML_MODEL = []


@contextmanager
def lifespan_sync(sync_app: AioClock):
    ML_MODEL.append(2)
    print("UP!")
    yield sync_app
    ML_MODEL.clear()
    print("DOWN!")


sync_app = AioClock(lifespan=lifespan_sync)

if __name__ == "__main__":
    # serve() is a coroutine, so it is driven with asyncio.run
    asyncio.run(sync_app.serve())
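To make the startup and shutdown ordering concrete, here is a minimal standard-library sketch of how a runner could accept either kind of lifespan. It is not aioclock's implementation: `run_with_lifespan`, `body`, and the `events` list are hypothetical names used only for illustration.

```python
import asyncio
from contextlib import (
    AbstractAsyncContextManager,
    AsyncExitStack,
    asynccontextmanager,
    contextmanager,
)

events = []  # records the order in which things happen


@asynccontextmanager
async def async_lifespan(app):
    events.append("async up")
    yield app
    events.append("async down")


@contextmanager
def sync_lifespan(app):
    events.append("sync up")
    yield app
    events.append("sync down")


async def run_with_lifespan(app, lifespan, body):
    """Hypothetical runner: enter the lifespan, run the body, then exit."""
    async with AsyncExitStack() as stack:
        manager = lifespan(app)
        if isinstance(manager, AbstractAsyncContextManager):
            await stack.enter_async_context(manager)
        else:
            stack.enter_context(manager)
        await body(app)


async def body(app):
    events.append(f"serving {app}")


asyncio.run(run_with_lifespan("app", async_lifespan, body))
asyncio.run(run_with_lifespan("app", sync_lifespan, body))
print(events)
```

In both cases the code before the yield runs first, the body runs next, and the code after the yield runs last.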
Attributes:
Name | Type | Description |
---|---|---|
lifespan | | A context manager that handles the startup and shutdown of the application. If not provided, the application runs without any startup and shutdown logic. To understand it better, check the examples and documentation above. |
limiter | | AnyIO CapacityLimiter used to limit the total number of threads running, which caps how many tasks run at the same time. If not provided, it falls back to the default limiter set at the application level. If no limiter is set at the application level, it falls back to the default limiter set by AnyIO. |
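The effect of a capacity limit can be sketched with the standard library alone. The example below uses `asyncio.Semaphore` as a stand-in for AnyIO's CapacityLimiter, which is an assumption made so the snippet runs without extra packages. It does not show how aioclock applies the limiter internally, and `job` and `peak` are illustrative names.

```python
import asyncio


async def main():
    limiter = asyncio.Semaphore(2)  # stand-in for a limiter with 2 tokens
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        async with limiter:  # at most 2 jobs get past this line at once
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)
            running -= 1

    await asyncio.gather(*(job() for _ in range(6)))
    return peak


peak = asyncio.run(main())
print(peak)
```

Six jobs are started, but the highest number observed running at the same time never exceeds the limit of two.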
Source code in aioclock/app.py
dependencies property ¶
Dependencies provider that will be used to inject dependencies in tasks.
override_dependencies ¶
Override a dependency with a new one.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
original | Callable[..., Any] | Original dependency that will be overridden. | required |
override | Callable[..., Any] | New dependency that will override the original one. | required |
Example
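The idea of overriding a dependency can be illustrated with a small standalone sketch. It is a conceptual model only, not aioclock's code: `SketchDependencies`, `resolve`, `production_db`, and `fake_db` are hypothetical names, and only the `original` and `override` parameter names come from the table above.

```python
from typing import Any, Callable, Dict


class SketchDependencies:
    """Hypothetical provider that maps an original callable to its override."""

    def __init__(self) -> None:
        self._overrides: Dict[Callable[..., Any], Callable[..., Any]] = {}

    def override(self, original: Callable[..., Any], override: Callable[..., Any]) -> None:
        self._overrides[original] = override

    def resolve(self, dependency: Callable[..., Any]) -> Any:
        # Call the override if one was registered, else the original.
        return self._overrides.get(dependency, dependency)()


def production_db() -> str:
    return "production"


def fake_db() -> str:
    return "in-memory"


deps = SketchDependencies()
before = deps.resolve(production_db)
deps.override(original=production_db, override=fake_db)
after = deps.resolve(production_db)
print(before, after)
```

A typical use for this pattern is swapping a real resource for a lightweight replacement in tests, without touching the task code that asks for the original dependency.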
Source code in aioclock/app.py
include_group ¶
include_group(group: Group) -> None
Include a group of tasks that will be run by AioClock.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
group | Group | Group of tasks that will be run together. | required |
Example
Source code in aioclock/app.py
task ¶
task(*, trigger: BaseTrigger)
Decorator to add a task to the AioClock instance.
If the decorated function is synchronous, aioclock runs it in a thread pool executor, using AnyIO.
But if you call the decorated function directly, it runs in the calling thread and blocks the event loop.
This is intentional: you do not have to convert all your sync functions to coroutine functions, and they remain usable outside of aioclock if needed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger | BaseTrigger | Trigger that determines when the task runs. | required |
Example
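The difference between a scheduler running a sync function and you calling it directly can be shown with a standard-library sketch. It uses `asyncio.to_thread` in place of AnyIO's thread helpers, which is an assumption made to keep the snippet dependency-free. The `task` decorator here is a hypothetical stand-in that takes no trigger and is not aioclock's decorator.

```python
import asyncio
import inspect
import threading

registry = []


def task(func):
    """Hypothetical decorator: register the function and return it unchanged."""
    registry.append(func)
    return func


@task
def sync_job():
    return threading.get_ident()


@task
async def async_job():
    return threading.get_ident()


async def run_registered():
    results = []
    for func in registry:
        if inspect.iscoroutinefunction(func):
            results.append(await func())  # runs on the event loop thread
        else:
            results.append(await asyncio.to_thread(func))  # runs in a worker thread
    return results


main_thread = threading.get_ident()
direct_call = sync_job()  # a plain call runs in the caller's thread
via_runner = asyncio.run(run_registered())
print(direct_call == main_thread, via_runner[0] != main_thread)
```

Because the decorator returns the function unchanged, a direct call behaves like any ordinary function call, while the runner moves sync functions off the event loop thread.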
Source code in aioclock/app.py
serve async ¶
Serve AioClock: run the tasks in the right order. First the startup tasks run, then the regular tasks, and finally the shutdown tasks.
Source code in aioclock/app.py
Group ¶
Groups are best used for modularity and separation of concerns. For example, you can have one group of tasks responsible for sending emails and another group responsible for sending notifications.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
limiter | Optional[CapacityLimiter] | AnyIO CapacityLimiter used to limit the total number of threads running, which caps how many tasks run at the same time. If not provided, it falls back to the default limiter set at the application level. If no limiter is set at the application level, it falls back to the default limiter set by AnyIO. | None |
Example
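The router-like role of a group can be sketched without the library. The classes below are hypothetical stand-ins (`SketchGroup`, `SketchApp`, `all_tasks`) that only collect functions. They ignore triggers and scheduling entirely, so they show the modular structure and not aioclock's behavior.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class SketchGroup:
    """Hypothetical group: collects task functions for one concern."""

    tasks: List[Callable[[], str]] = field(default_factory=list)

    def task(self, func: Callable[[], str]) -> Callable[[], str]:
        self.tasks.append(func)
        return func


@dataclass
class SketchApp:
    """Hypothetical app: gathers the groups it was asked to include."""

    groups: List[SketchGroup] = field(default_factory=list)

    def include_group(self, group: SketchGroup) -> None:
        self.groups.append(group)

    def all_tasks(self) -> List[Callable[[], str]]:
        return [task for group in self.groups for task in group.tasks]


email_group = SketchGroup()
notification_group = SketchGroup()


@email_group.task
def send_email() -> str:
    return "email sent"


@notification_group.task
def send_notification() -> str:
    return "notification sent"


app = SketchApp()
app.include_group(email_group)
app.include_group(notification_group)
results = [task() for task in app.all_tasks()]
print(results)
```

Each group can live in its own module, and the application only needs to include the groups it wants to run.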
Source code in aioclock/group.py
task ¶
task(*, trigger: BaseTrigger)
Decorator to add a task to the AioClock instance.
If the decorated function is synchronous, aioclock runs it in a thread pool executor, using AnyIO.
But if you call the decorated function directly, it runs in the calling thread and blocks the event loop.
This is intentional: you do not have to convert all your sync functions to coroutine functions, and they remain usable outside of aioclock if needed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger | BaseTrigger | Trigger that determines when the task runs. | required |