class documentation

Undocumented

Method __init__ Undocumented
Method __repr__ Undocumented
Method begin_rpc Schedule a function for remote execution. (Alias for ctx.rfi).
Method begin_run Schedule a function for local execution. (Alias for ctx.lfi).
Method detached Schedule a function for remote (detached) execution.
Method get_dependency Retrieve a dependency by its name.
Method lfc Schedule a function for local execution and await its result.
Method lfi Schedule a function for local execution.
Method promise Get or create a promise that can be awaited on.
Method rfc Schedule a function for remote execution and await its result.
Method rfi Schedule a function for remote execution.
Method rpc Schedule a function for remote execution and await its result. (Alias for ctx.rfc).
Method run Schedule a function for local execution and await its result. (Alias for ctx.lfc).
Method sleep Sleep inside a function.
Method typesafe Optionally provide type safety for your coroutine definition.
Property id Id for the current execution.
Property info Information for the current execution.
Property logger Undocumented
Property random Deterministic random methods.
Property time Deterministic time methods.
Method _next Undocumented
Instance Variable _cid Undocumented
Instance Variable _counter Undocumented
Instance Variable _dependencies Undocumented
Instance Variable _id Undocumented
Instance Variable _info Undocumented
Instance Variable _logger Undocumented
Instance Variable _random Undocumented
Instance Variable _registry Undocumented
Instance Variable _time Undocumented
def __init__(self, id: str, cid: str, info: Info, registry: Registry, dependencies: Dependencies, logger: Logger):

Undocumented

def __repr__(self) -> str:

Undocumented

@overload
def begin_rpc(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> RFI[R]:
@overload
def begin_rpc(self, func: str, *args: Any, **kwargs: Any) -> RFI:

Schedule a function for remote execution. (Alias for ctx.rfi).

The function is scheduled on the global event loop and may be executed in a different process. The returned promise can be awaited for the final result.

  • Function must be registered
  • Function args and kwargs must be serializable
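The serializability requirement above can be checked before scheduling. The sketch below assumes a JSON encoding, which this page does not state; `is_json_serializable` is a hypothetical helper, not part of the SDK.

```python
import json
from typing import Any


def is_json_serializable(*args: Any, **kwargs: Any) -> bool:
    """Return True if args and kwargs survive json.dumps.

    Assumption: JSON is used here only as a stand-in encoding.
    """
    try:
        json.dumps({"args": list(args), "kwargs": kwargs})
    except (TypeError, ValueError):
        return False
    return True


ok = is_json_serializable(1, "two", flag=True)   # plain values encode fine
bad = is_json_serializable(object())             # arbitrary objects do not
```

If the encoder in use differs, the same pre-flight idea applies with that encoder substituted for `json.dumps`.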
def begin_run(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> LFI[R]:

Schedule a function for local execution. (Alias for ctx.lfi).

The function is executed in the current process, and the returned promise can be awaited for the final result.

By default, execution is durable; non-durable behavior can be configured if needed.

@overload
def detached(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> RFI[R]:
@overload
def detached(self, func: str, *args: Any, **kwargs: Any) -> RFI:

Schedule a function for remote (detached) execution.

The function is scheduled on the global event loop and potentially executed in a different process, and the returned promise can be awaited for the final result.

  • Function must be registered
  • Function args and kwargs must be serializable
def get_dependency(self, key: str, default: T = None) -> Any | T:

Retrieve a dependency by its name.

If the dependency identified by key exists, its value is returned; otherwise, the specified default value is returned. A TypeError is raised if key is not a string.
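The lookup rule above can be sketched with a stand-in class. `FakeContext` is hypothetical and mimics only the documented behavior; it is not the SDK's implementation.

```python
from __future__ import annotations

from typing import Any


class FakeContext:
    """Hypothetical stand-in that mimics only the documented lookup rule."""

    def __init__(self, dependencies: dict[str, Any]) -> None:
        self._dependencies = dependencies

    def get_dependency(self, key: str, default: Any = None) -> Any:
        # Documented behavior: a non-string key raises TypeError.
        if not isinstance(key, str):
            raise TypeError(f"key must be a string, got {type(key).__name__}")
        # Documented behavior: return the value if present, else the default.
        return self._dependencies.get(key, default)


ctx = FakeContext({"db": "sqlite://example"})
found = ctx.get_dependency("db")
missing = ctx.get_dependency("cache", default="no-cache")
```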

def lfc(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> LFC[R]:

Schedule a function for local execution and await its result.

The function is executed in the current process.

By default, execution is durable; non-durable behavior can be configured if needed.

def lfi(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> LFI[R]:

Schedule a function for local execution.

The function is executed in the current process, and the returned promise can be awaited for the final result.

By default, execution is durable; non-durable behavior can be configured if needed.
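The difference between invoking (lfi) and calling (lfc) can be illustrated with a toy driver. Everything below (`ToyContext`, `Invoke`, `Call`, `ToyPromise`, `drive`) is a hypothetical stand-in: it runs plain functions immediately and does not model durability or real scheduling.

```python
from dataclasses import dataclass
from typing import Any, Callable, Generator


@dataclass
class Invoke:
    """Hypothetical stand-in for an LFI command."""
    func: Callable[..., Any]
    args: tuple


@dataclass
class Call:
    """Hypothetical stand-in for an LFC command."""
    func: Callable[..., Any]
    args: tuple


@dataclass
class ToyPromise:
    """Hypothetical stand-in for a promise that is already resolved."""
    value: Any


class ToyContext:
    def lfi(self, func: Callable[..., Any], *args: Any) -> Invoke:
        return Invoke(func, args)

    def lfc(self, func: Callable[..., Any], *args: Any) -> Call:
        return Call(func, args)


def drive(ctx: ToyContext, gen: Generator[Any, Any, Any]) -> Any:
    """Toy driver: runs each yielded command immediately, in order."""
    to_send: Any = None
    while True:
        try:
            cmd = gen.send(to_send)
        except StopIteration as stop:
            return stop.value
        if isinstance(cmd, Invoke):
            # Invocation: hand back a promise rather than the value.
            to_send = ToyPromise(cmd.func(ctx, *cmd.args))
        elif isinstance(cmd, Call):
            # Call: hand back the value directly.
            to_send = cmd.func(ctx, *cmd.args)
        elif isinstance(cmd, ToyPromise):
            # Awaiting a promise: hand back its value.
            to_send = cmd.value
        else:
            raise TypeError(f"unexpected command: {cmd!r}")


def double(ctx: ToyContext, n: int) -> int:
    return n * 2


def workflow(ctx: ToyContext, n: int) -> Generator[Any, Any, int]:
    promise = yield ctx.lfi(double, n)     # start; get a promise back
    other = yield ctx.lfc(double, n + 1)   # start and wait in one step
    first = yield promise                  # wait for the earlier invocation
    return first + other


ctx = ToyContext()
total = drive(ctx, workflow(ctx, 3))
```

The invoke form suits starting several functions before waiting on any of them; the call form is the shorter choice when the result is needed right away.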

def promise(self, *, id: str | None = None, timeout: float | None = None, idempotency_key: str | None = None, data: Any = None, tags: dict[str, str] | None = None) -> RFI:

Get or create a promise that can be awaited on.

If no ID is provided, one is generated and a new promise is created. If an ID is provided and a promise already exists with that ID, then the existing promise is returned (if the idempotency keys match).

This is useful for human-in-the-loop (HITL) use cases, where progress should block until a person has taken an action or provided data. It works well in conjunction with Resonate's .promise.resolve() method.
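The get-or-create rule described above can be sketched with an in-memory store. `ToyPromiseStore` is hypothetical and not the SDK's implementation. Raising `ValueError` on an idempotency-key mismatch is an assumption, since this page does not say what happens in that case.

```python
from __future__ import annotations

from typing import Any


class ToyPromiseStore:
    """Hypothetical in-memory store; not the SDK's implementation."""

    def __init__(self) -> None:
        self._promises: dict[str, dict[str, Any]] = {}
        self._counter = 0

    def promise(self, *, id: str | None = None,
                idempotency_key: str | None = None,
                data: Any = None) -> dict[str, Any]:
        if id is None:
            # No ID given: generate an unused one and create a new promise.
            while True:
                self._counter += 1
                id = f"toy-{self._counter}"
                if id not in self._promises:
                    break
        existing = self._promises.get(id)
        if existing is not None:
            # ID already known: return the existing promise if the keys match.
            if existing["idempotency_key"] != idempotency_key:
                # Assumption: the source does not say what a mismatch does.
                raise ValueError(f"idempotency key mismatch for {id!r}")
            return existing
        created = {"id": id, "idempotency_key": idempotency_key, "data": data}
        self._promises[id] = created
        return created


store = ToyPromiseStore()
first = store.promise(id="approval-1", idempotency_key="k1")
again = store.promise(id="approval-1", idempotency_key="k1")  # same promise
fresh = store.promise()                                       # generated ID
```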

@overload
def rfc(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> RFC[R]:
@overload
def rfc(self, func: str, *args: Any, **kwargs: Any) -> RFC:

Schedule a function for remote execution and await its result.

The function is scheduled on the global event loop and potentially executed in a different process.

  • Function must be registered
  • Function args and kwargs must be serializable
@overload
def rfi(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> RFI[R]:
@overload
def rfi(self, func: str, *args: Any, **kwargs: Any) -> RFI:

Schedule a function for remote execution.

The function is scheduled on the global event loop and may be executed in a different process. The returned promise can be awaited for the final result.

  • Function must be registered
  • Function args and kwargs must be serializable
@overload
def rpc(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> RFC[R]:
@overload
def rpc(self, func: str, *args: Any, **kwargs: Any) -> RFC:

Schedule a function for remote execution and await its result. (Alias for ctx.rfc).

The function is scheduled on the global event loop and potentially executed in a different process.

  • Function must be registered
  • Function args and kwargs must be serializable
def run(self, func: Callable[Concatenate[Context, P], Generator[Any, Any, R] | R], *args: P.args, **kwargs: P.kwargs) -> LFC[R]:

Schedule a function for local execution and await its result. (Alias for ctx.lfc).

The function is executed in the current process.

By default, execution is durable; non-durable behavior can be configured if needed.

def sleep(self, secs: float) -> RFC[None]:

Sleep inside a function.

There is no limit to how long you can sleep. The sleep method accepts a float value in seconds.

@overload
def typesafe(self, cmd: LFI[T] | RFI[T]) -> Generator[LFI[T] | RFI[T], Promise[T], Promise[T]]:
@overload
def typesafe(self, cmd: LFC[T] | RFC[T] | Promise[T]) -> Generator[LFC[T] | RFC[T] | Promise[T], T, T]:

Optionally provide type safety for your coroutine definition.
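The overload signatures suggest a generator that yields the command and returns whatever is sent back, so that `yield from` gives type checkers a concrete result type. The sketch below illustrates that idea only; `Cmd`, this `typesafe`, and `drive` are hypothetical stand-ins, and the SDK's actual implementation may differ.

```python
from dataclasses import dataclass
from typing import Any, Generator, Generic, TypeVar

T = TypeVar("T")


@dataclass
class Cmd(Generic[T]):
    """Hypothetical stand-in for a call command such as LFC[T] or RFC[T]."""
    value: T


def typesafe(cmd: Cmd[T]) -> Generator[Cmd[T], T, T]:
    # Yield the command to the driver and return whatever it sends back.
    return (yield cmd)


def workflow() -> Generator[Cmd[int], int, int]:
    # With `yield from`, a type checker can infer that n is an int.
    n = yield from typesafe(Cmd(20))
    return n + 1


def drive(gen: Generator[Any, Any, Any]) -> Any:
    """Toy driver: answers each command with its stored value."""
    to_send: Any = None
    while True:
        try:
            cmd = gen.send(to_send)
        except StopIteration as stop:
            return stop.value
        to_send = cmd.value


result = drive(workflow())
```

A bare `yield` has no way to tie the sent value's type to the yielded command, which is why the wrapper is combined with `yield from`.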

@property
id: str

Id for the current execution.

@property
info: Info

Information for the current execution.

@property
logger: Logger

Undocumented

@property
random: Random

Deterministic random methods.

@property
time: Time

Deterministic time methods.

def _next(self) -> str:

Undocumented

_cid =

Undocumented

_counter: int

Undocumented

_dependencies =

Undocumented

_id =

Undocumented

_info =

Undocumented

_logger =

Undocumented

_random =

Undocumented

_registry =

Undocumented

_time =

Undocumented