class documentation

class Context:

Constructor: Context(deps)

View In Hierarchy

Undocumented

Method __init__ Undocumented
Method detached Detached invocation.
Method get_dependency Undocumented
Method lfc Local function call.
Method lfi Local function invocation.
Method rfc Undocumented
Method rfi Undocumented
Method sleep Undocumented
Instance Variable _deps Undocumented
def __init__(self, deps: Dependencies):

Undocumented

@overload
def detached(self, id: str, coro: Callable[Concatenate[Context, P], Generator[Yieldable, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> DI:
@overload
def detached(self, id: str, coro: Callable[Concatenate[Context, P], Any | Coroutine[Any, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> DI:

Detached invocation.

Invoke as a root invocation. Is equivalent to do Scheduler.run(...) invoked execution will be retried and managed from the server.

def get_dependency(self, key: str) -> Any:

Undocumented

@overload
def lfc(self, func: RegisteredFn[P, Any], /, *args: P.args, **kwargs: P.kwargs) -> LFC:
@overload
def lfc(self, func: Callable[Concatenate[Context, P], Generator[Yieldable, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> LFC:
@overload
def lfc(self, func: Callable[Concatenate[Context, P], Any | Coroutine[Any, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> LFC:

Local function call.

LFC and await for the result of the execution. It's syntax sugar for yield (yield ctx.lfi(...))

@overload
def lfi(self, func: RegisteredFn[P, Any], /, *args: P.args, **kwargs: P.kwargs) -> LFI:
@overload
def lfi(self, func: Callable[Concatenate[Context, P], Generator[Yieldable, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> LFI:
@overload
def lfi(self, func: Callable[Concatenate[Context, P], Any | Coroutine[Any, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> LFI:

Local function invocation.

Invoke and immediatelly receive a Promise[T] that represents the future result of the execution.

The Promise can be yielded later in the execution to await for the result.

@overload
def rfc(self, cmd: DurablePromise, /) -> RFC:
@overload
def rfc(self, func: RegisteredFn[P, Any], /, *args: P.args, **kwargs: P.kwargs) -> RFC:
@overload
def rfc(self, func: str, /, *args: Any, **kwargs: Any) -> RFC:
@overload
def rfc(self, func: Callable[Concatenate[Context, P], Generator[Yieldable, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> RFC:
@overload
def rfc(self, func: Callable[Concatenate[Context, P], Any | Coroutine[Any, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> RFC:

Undocumented

@overload
def rfi(self, cmd: DurablePromise, /) -> RFI:
@overload
def rfi(self, func: RegisteredFn[P, Any], /, *args: P.args, **kwargs: P.kwargs) -> RFI:
@overload
def rfi(self, func: str, /, *args: Any, **kwargs: Any) -> RFI:
@overload
def rfi(self, func: Callable[Concatenate[Context, P], Generator[Yieldable, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> RFI:
@overload
def rfi(self, func: Callable[Concatenate[Context, P], Any | Coroutine[Any, Any, Any]], /, *args: P.args, **kwargs: P.kwargs) -> RFI:

Undocumented

def sleep(self, secs: int) -> RFC:

Undocumented

_deps =

Undocumented