AioClock Application

To create an application, import the AioClock class from the aioclock module. An AioClock instance represents the application and manages the tasks and groups that it runs.

Another way to modularize your code is to use Group, which plays much the same role as a router in web frameworks.

AioClock

AioClock(
    *,
    lifespan: Optional[
        Callable[
            [AioClock],
            AsyncContextManager[AioClock]
            | ContextManager[AioClock],
        ]
    ] = None,
    limiter: Optional[CapacityLimiter] = None
)

AioClock is the main class that will be used to run the tasks. It will be responsible for running the tasks in the right order.

Example
from aioclock import AioClock, Once
app = AioClock()

@app.task(trigger=Once())
async def main():
    print("Hello World")

To run the finished aioclock app, pass app.serve() to asyncio.run:

Example
from aioclock import AioClock, Once
import asyncio

app = AioClock()

# whatever next comes here
asyncio.run(app.serve())

Lifespan

You can define startup and shutdown logic using the lifespan parameter of the AioClock instance. It should be an AsyncContextManager that receives the AioClock application as its argument. You can find an example below.

Example
    import asyncio
    from contextlib import asynccontextmanager

    from aioclock import AioClock

    ML_MODEL = [] # just some imaginary component that needs to be started and stopped


    @asynccontextmanager
    async def lifespan(app: AioClock):
        ML_MODEL.append(2)
        print("UP!")
        yield app
        ML_MODEL.clear()
        print("DOWN!")


    app = AioClock(lifespan=lifespan)


    if __name__ == "__main__":
        asyncio.run(app.serve())

Here we simulate the expensive startup operation of loading a model by appending a (fake) model to the ML_MODEL list before the yield. This code is executed during startup, before the application starts operating.

And then, right after the yield, we unload the model. This code is executed after the application finishes running its tasks, right before shutdown. This could, for example, release resources such as memory, a GPU or a database connection.

The same happens when you stop your application gracefully, for example when you shut down its container.
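The ordering described above can be sketched with the standard library alone. The snippet below is a minimal illustration, not aioclock's implementation: `serve_like` and `run_demo` are hypothetical stand-ins, and the `try`/`finally` around the `yield` is an addition that the original example does not have. It assumes you want the shutdown code to run even when a task raises.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan(app: object):
    events.append("startup")  # runs before any task is started
    try:
        yield app
    finally:
        events.append("shutdown")  # runs even if the body raised


async def serve_like(app: object, fail: bool = False) -> None:
    # Hypothetical stand-in for AioClock.serve(): enter the lifespan,
    # run the "tasks", then leave the lifespan.
    async with lifespan(app):
        events.append("tasks")
        if fail:
            raise RuntimeError("task crashed")


def run_demo(fail: bool = False) -> list[str]:
    events.clear()
    try:
        asyncio.run(serve_like(object(), fail=fail))
    except RuntimeError:
        events.append("error propagated")
    return list(events)


if __name__ == "__main__":
    print(run_demo())
    print(run_demo(fail=True))
```

Without the `try`/`finally`, an exception raised inside the `async with` body is re-raised at the `yield`, so the statements after it would be skipped.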

The lifespan can also be a synchronous context manager. Check the example below.

Example
    import asyncio
    from contextlib import contextmanager

    from aioclock import AioClock

    ML_MODEL = []  # just some imaginary component that needs to be started and stopped

    @contextmanager
    def lifespan_sync(sync_app: AioClock):
        ML_MODEL.append(2)
        print("UP!")
        yield sync_app
        ML_MODEL.clear()
        print("DOWN!")

    sync_app = AioClock(lifespan=lifespan_sync)

    if __name__ == "__main__":
        # serve the app that was created with the sync lifespan
        asyncio.run(sync_app.serve())

Attributes:

Name Type Description
lifespan

A context manager that will be used to handle the startup and shutdown of the application. If not provided, the application will run without any startup and shutdown logic. To understand it better, check the examples and documentation above.

limiter

AnyIO CapacityLimiter that limits the total number of threads running at the same time, and therefore the number of sync tasks that can run concurrently. If not provided, it falls back to the default limiter set on the application level. If no limiter is set on the application level, it falls back to the default limiter set by AnyIO.

Source code in aioclock/app.py
def __init__(
    self,
    *,
    lifespan: Optional[
        Callable[[AioClock], AsyncContextManager[AioClock] | ContextManager[AioClock]]
    ] = None,
    limiter: Optional[anyio.CapacityLimiter] = None,
):
    """
    Initialize AioClock instance.
    No parameters are needed.

    Attributes:
        lifespan:
            A context manager that will be used to handle the startup and shutdown of the application.
            If not provided, the application will run without any startup and shutdown logic.
            To understand it better, check the examples and documentation above.

        limiter:
            Anyio CapacityLimiter. capacity limiter to use to limit the total amount of threads running
            Limiter that will be used to limit the number of tasks that are running at the same time.
            If not provided, it will fallback to the default limiter set on Application level.
            If no limiter is set on Application level, it will fallback to the default limiter set by AnyIO.

    """
    self._groups: list[Group] = []
    self._app_tasks: list[Task] = []
    self._limiter = limiter
    self.lifespan = lifespan

dependencies property

dependencies

Dependencies provider that will be used to inject dependencies in tasks.

override_dependencies

override_dependencies(
    original: Callable[..., Any],
    override: Callable[..., Any],
) -> None

Override a dependency with a new one.

Parameters:

Name | Type | Description | Default
original | Callable[..., Any] | Original dependency that will be overridden. | required
override | Callable[..., Any] | New dependency that will override the original one. | required

Example
from aioclock import AioClock

def original_dependency():
    return 1

def new_dependency():
    return 2

app = AioClock()
app.override_dependencies(original=original_dependency, override=new_dependency)
Source code in aioclock/app.py
def override_dependencies(
    self, original: Callable[..., Any], override: Callable[..., Any]
) -> None:
    """Override a dependency with a new one.

    params:
        original:
            Original dependency that will be overridden.
        override:
            New dependency that will override the original one.

    Example:
        ```python
        from aioclock import AioClock

        def original_dependency():
            return 1

        def new_dependency():
            return 2

        app = AioClock()
        app.override_dependencies(original=original_dependency, override=new_dependency)
        ```

    """
    self.dependencies.override(original, override)
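Conceptually, an override is a lookup from the original callable to its replacement, consulted whenever a dependency is resolved. The sketch below shows that idea only. `TinyProvider` and its `resolve` method are hypothetical and are not the provider aioclock uses internally.

```python
from typing import Any, Callable


class TinyProvider:
    """Hypothetical, simplified stand-in for a dependency provider."""

    def __init__(self) -> None:
        self._overrides: dict[Callable[..., Any], Callable[..., Any]] = {}

    def override(
        self, original: Callable[..., Any], override: Callable[..., Any]
    ) -> None:
        # Remember which callable should replace the original one.
        self._overrides[original] = override

    def resolve(self, dependency: Callable[..., Any]) -> Any:
        # Call the override if one is registered, else the original.
        return self._overrides.get(dependency, dependency)()


def original_dependency() -> int:
    return 1


def new_dependency() -> int:
    return 2


provider = TinyProvider()
before = provider.resolve(original_dependency)
provider.override(original_dependency, new_dependency)
after = provider.resolve(original_dependency)
```

A typical use is in tests, where a dependency that talks to a real service is swapped for a fake without touching the task code.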

include_group

include_group(group: Group) -> None

Include a group of tasks that will be run by AioClock.

Parameters:

Name | Type | Description | Default
group | Group | Group of tasks that will be run together. | required

Example
from aioclock import AioClock, Group, Once

app = AioClock()

group = Group()
@group.task(trigger=Once())
async def main():
    print("Hello World")

app.include_group(group)
Source code in aioclock/app.py
def include_group(self, group: Group) -> None:
    """Include a group of tasks that will be run by AioClock.

    params:
        group:
            Group of tasks that will be run together.

    Example:
        ```python
        from aioclock import AioClock, Group, Once

        app = AioClock()

        group = Group()
        @group.task(trigger=Once())
        async def main():
            print("Hello World")

        app.include_group(group)
        ```
    """
    self._groups.append(group)
    return None

task

task(*, trigger: BaseTrigger)

Decorator that adds a task to the AioClock instance. If the decorated function is sync, aioclock runs it in a thread pool executor, using AnyIO. If you call the decorated function yourself, however, it runs in the calling thread, which blocks the event loop when called from async code. This is intentional: you do not have to convert all your sync functions to coroutine functions, and they remain usable outside of aioclock if needed.

Parameters:

Name | Type | Description | Default
trigger | BaseTrigger | Trigger that determines when the task runs. | required

Example
from aioclock import AioClock, Once

app = AioClock()

@app.task(trigger=Once())
async def main():
    print("Hello World")
Source code in aioclock/app.py
def task(self, *, trigger: BaseTrigger):
    """
    Decorator to add a task to the AioClock instance.
    If decorated function is sync, aioclock will run it in a thread pool executor, using AnyIO.
    But if you try to run the decorated function, it will run in the same thread, blocking the event loop.
    It is intended to not change all your `sync functions` to coroutine functions,
        and they can be used outside of aioclock, if needed.

    params:
        trigger: BaseTrigger
            Trigger that will trigger the task to be running.

    Example:
        ```python

        from aioclock import AioClock, Once

        app = AioClock()

        @app.task(trigger=Once())
        async def main():
            print("Hello World")
        ```
    """

    def decorator(func):
        @wraps(func)
        async def wrapped_funciton(*args, **kwargs):
            if asyncio.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:  # run in threadpool to make sure it's not blocking the event loop
                return await asyncify(func, limiter=self._limiter)(*args, **kwargs)

        self._app_tasks.append(
            Task(
                func=inject(wrapped_funciton, dependency_overrides_provider=get_provider()),
                trigger=trigger,
            )
        )
        if asyncio.iscoroutinefunction(func):
            return wrapped_funciton
        else:

            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return wrapper

    return decorator
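The two faces of a decorated sync function can be seen in a reduced form of the decorator above. This is a sketch under simplifying assumptions: there is no trigger and no dependency injection, `asyncio.to_thread` stands in for `asyncify`, and `task` and `registered` are hypothetical names.

```python
import asyncio
import inspect
from functools import wraps

registered = []  # stand-in for the app's internal task list


def task(func):
    """Simplified sketch: no trigger, no dependency injection."""

    @wraps(func)
    async def scheduled(*args, **kwargs):
        if inspect.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        # Sync function: run it in a worker thread so the loop stays free.
        return await asyncio.to_thread(func, *args, **kwargs)

    registered.append(scheduled)
    # Coroutine functions are handed back wrapped; sync functions are
    # handed back as plain sync callables.
    return scheduled if inspect.iscoroutinefunction(func) else func


@task
def add(a: int, b: int) -> int:
    return a + b


direct = add(1, 2)  # plain call, runs in the current thread
via_scheduler = asyncio.run(registered[0](1, 2))  # runs in a worker thread
```

Both calls return the same value. The difference is where the function body executes: the direct call runs in the caller's thread, while the registered wrapper hands the work to a thread.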

serve async

serve() -> None

Serves AioClock by running the tasks in the right order: first the startup tasks, then the regular tasks, and finally the shutdown tasks.

Source code in aioclock/app.py
async def serve(self) -> None:
    """
    Serves AioClock
    Run the tasks in the right order.
    First, run the startup tasks, then run the tasks, and finally run the shutdown tasks.
    """
    group = Group()
    group._tasks = self._app_tasks
    self.include_group(group)

    if self.lifespan is None:
        await self._run_tasks()
        return

    ctx = self.lifespan(self)

    if isinstance(ctx, AsyncContextManager):
        async with ctx:
            await self._run_tasks()

    elif isinstance(ctx, ContextManager):
        with ctx:
            await self._run_tasks()

    else:
        assert_never(ctx)
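The branch on the kind of context manager can be tried in isolation. The following sketch mirrors the dispatch in the listing above using the abstract base classes from `contextlib`. `serve_like` and both lifespan functions are hypothetical, and `None` stands in for the application instance.

```python
import asyncio
from contextlib import (
    AbstractAsyncContextManager,
    AbstractContextManager,
    asynccontextmanager,
    contextmanager,
)

log: list[str] = []


@asynccontextmanager
async def async_lifespan(app):
    log.append("async up")
    yield app
    log.append("async down")


@contextmanager
def sync_lifespan(app):
    log.append("sync up")
    yield app
    log.append("sync down")


async def serve_like(lifespan) -> None:
    ctx = lifespan(None)  # None stands in for the app instance
    if isinstance(ctx, AbstractAsyncContextManager):
        async with ctx:
            log.append("tasks")
    elif isinstance(ctx, AbstractContextManager):
        with ctx:
            log.append("tasks")
    else:
        raise TypeError("lifespan must return a context manager")


asyncio.run(serve_like(async_lifespan))
asyncio.run(serve_like(sync_lifespan))
```

Note that with a synchronous lifespan the setup and teardown code runs directly on the event loop thread, so long blocking work there delays everything else.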

Group

Group(*, limiter: Optional[CapacityLimiter] = None)

Group of tasks that will be run together. The best use case is modularity and separation of concerns. For example, you can have one group of tasks that is responsible for sending emails and another group that is responsible for sending notifications.

Parameters:

Name | Type | Description | Default
limiter | Optional[CapacityLimiter] | AnyIO CapacityLimiter that limits the total number of threads running at the same time, and therefore the number of sync tasks that can run concurrently. If not provided, it falls back to the default limiter set on the application level. If no limiter is set on the application level, it falls back to the default limiter set by AnyIO. | None

Example
from aioclock import Group, AioClock, Forever

email_group = Group()

# consider this as different file
@email_group.task(trigger=Forever())
async def send_email():
    ...

# app.py
aio_clock = AioClock()
aio_clock.include_group(email_group)
Source code in aioclock/group.py
def __init__(
    self,
    *,
    limiter: Optional[anyio.CapacityLimiter] = None,
):
    """
    Group of tasks that will be run together.

    Best use case is to have a good modularity and separation of concerns.
    For example, you can have a group of tasks that are responsible for sending emails.
    And another group of tasks that are responsible for sending notifications.

    Params:
        limiter:
            Anyio CapacityLimiter. capacity limiter to use to limit the total amount of threads running
            Limiter that will be used to limit the number of tasks that are running at the same time.
            If not provided, it will fallback to the default limiter set on Application level.
            If no limiter is set on Application level, it will fallback to the default limiter set by AnyIO.

    Example:
        ```python

        from aioclock import Group, AioClock, Forever

        email_group = Group()

        # consider this as different file
        @email_group.task(trigger=Forever())
        async def send_email():
            ...

        # app.py
        aio_clock = AioClock()
        aio_clock.include_group(email_group)
        ```

    """
    self._tasks: list[Task] = []
    self._limiter = limiter

task

task(*, trigger: BaseTrigger)

Decorator that adds a task to the Group. If the decorated function is sync, aioclock runs it in a thread pool executor, using AnyIO. If you call the decorated function yourself, however, it runs in the calling thread, which blocks the event loop when called from async code. This is intentional: you do not have to convert all your sync functions to coroutine functions, and they remain usable outside of aioclock if needed.

Parameters:

Name | Type | Description | Default
trigger | BaseTrigger | Trigger that determines when the task runs. | required

Example
from aioclock import Group, Once

group = Group()

# register the task on the group rather than on the app
@group.task(trigger=Once())
async def main():
    print("Hello World")
Source code in aioclock/group.py
def task(self, *, trigger: BaseTrigger):
    """
    Decorator to add a task to the AioClock instance.
    If decorated function is sync, aioclock will run it in a thread pool executor, using AnyIO.
    But if you try to run the decorated function, it will run in the same thread, blocking the event loop.
    It is intended to not change all your `sync functions` to coroutine functions,
        and they can be used outside of aioclock, if needed.

    params:
        trigger: BaseTrigger
            Trigger that will trigger the task to be running.

    Example:
        ```python

        from aioclock import AioClock, Once

        app = AioClock()

        @app.task(trigger=Once())
        async def main():
            print("Hello World")
        ```
    """

    def decorator(func):
        @wraps(func)
        async def wrapped_funciton(*args, **kwargs):
            if asyncio.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:  # run in threadpool to make sure it's not blocking the event loop
                return await asyncify(func, limiter=self._limiter)(*args, **kwargs)

        self._tasks.append(
            Task(
                func=inject(wrapped_funciton, dependency_overrides_provider=get_provider()),
                trigger=trigger,
            )
        )

        if asyncio.iscoroutinefunction(func):
            return wrapped_funciton

        else:

            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return wrapper

    return decorator