rpc - server and protocol

RPC classes and services.

Check RPC guide and RPC services HOWTO on how to use and implement RPC services.

JSONRPC = '2.0'

protocol version

debug_only(f)[source]

Decorate a debug-only method (will not be available in production).

class JSONRPCServer[source]

Bases: ContextableService, PublicInterface, RPCServer

A simple JSON RPC interface with method execution and management tasks.

service_name = 'rpc'

you may define a custom service name here

__init__(app, *, scheduler: Scheduler = None, session_service: SessionInterface = None, auth_service: AuthenticationInterface = None, max_parallel_tasks: int = 128, default_request_time: int = 90, max_request_time: int = 600, enable_permissions: bool = True, request_logs: bool = False, response_logs: bool = False, blacklist_routes: list[str] = None, blacklist_scope: int = Scope.SYSTEM.value - 1, use_annotation_parser: bool = False, logger=None)[source]

Initialize.

Parameters:
  • app – web app

  • scheduler – task scheduler

  • session_service – session backend

  • max_parallel_tasks – max number of tasks in the loop

  • default_request_time – default request time in seconds if not specified by a header

  • max_request_time – maximum allowed request time in seconds

  • enable_permissions – enable perm checks in requests

  • request_logs – log request body and also log responses (even successful ones)

  • blacklist_routes – wildcard patterns to blacklist certain RPC routes

  • blacklist_scope – integer value to blacklist permission scopes lower or equal to this value

  • use_annotation_parser – annotation parser for non-validated method will be used and will try to create a method validator from its annotations (currently it’s not very reliable)

  • logger – logger instance

async static get_tasks() list[source]

Get all current asyncio tasks.

async get_status() dict[source]

Get server status and current tasks.

async get_routes(pattern: str = '*') dict[source]

Get all RPC routes (you are here).

register_service(service_name: str, service: PublicInterface) None[source]

Register an RPC compatible service and its methods.

async call(body: Collection[RPCRequest], headers: Mapping, scope: Scope = Scope.GUEST, session_id: str = None, callback: Callable[[...], Awaitable] = None)[source]
async call(body: RPCRequest, headers: Mapping, scope: Scope = Scope.GUEST, session_id: str = None, callback: Callable[[...], Awaitable] = None)

Call a registered RPC method.

Parameters:
  • body – request body (RPC request or a batch)

  • headers – request headers containing request execution instructions for the server

  • scope – default scope, determines whether a user has a right to call a specific method

  • session_id – current user session id to acquire user permissions and scope (if None then the scope arg value will be used as default for this request)

  • callback – optional response callback which should contain (session, headers, result) input params

Returns:

headers and response body or headers and None if it’s a notification request

There are two ways of calling an RPC method:

  1. Direct with waiting for the result. It requires the request to have id other than None and callback

    argument must also be None. The request will be run on the server and the result will be returned containing response headers and response / error tuple. Example of the request body:

{"id": 0, "params": {}}
  1. Notify request with or without callback. It requires id set to None or callback argument provided

    to the method. The server will return the response headers and None result immediately after the request validation. The actual request will be processed in background and the result will be send to the callback function eventually (if it’s provided). Example of the request body:

{"id": None, "params": {}}

If a request id is not provided it’s assumed that this request has id=0.

class BaseRPCClient[source]

Bases: ContextableService, RPCClient, ABC

JSONRPC client.

__init__(*args, transport: str, request_logs: bool = True, response_logs: bool = False, auth_str: str = None, scheduler: Scheduler = None, token_client: TokenClientService = False, error_classes: ErrorRegistry | None = ERROR_CLASSES, **kws)[source]

Initialize.

Parameters:
  • app – aiohttp web application

  • logger – a logger instance (None - app logger)

async call(method: str, params: dict | None = None, nowait: bool = False, request_id: int = 0, max_timeout: int = None, use_context: bool = True, retries: int = None, headers: dict = None) Any | None[source]

Make an RPC call.

Parameters:
  • method – rpc method name

  • params – method call arguments

  • nowait – create a ‘notify’ request - do not wait for the result

  • request_id – optional request id (usually you don’t need to set it)

  • max_timeout – request timeout in sec

  • use_context – use app request context such as correlation id and request chain deadline

  • retries – optional number of retries (on the server side)

  • headers – additional headers

async call_multiple(requests: Collection[RPCRequest], raise_exception: bool = True, nowait: bool = False, max_timeout: int = None, use_context: bool = True, retries: int = None, abort_on_error: bool = None, use_template: bool = None, headers: dict = None) list | None[source]

Make an RPC batch call.

Parameters:
  • requests – list of request dicts

  • nowait – create a ‘notify’ request - do not wait for the result

  • max_timeout – request timeout in sec

  • use_context – use app request context such as correlation id and request chain deadline

  • raise_exception – raise exception instead of returning error objects in the list

  • retries – optional number of retries (on the server side)

  • abort_on_error – abort the whole batch on the first error

  • use_template – use templates in batch requests

  • headers – additional headers

class RPCRequest[source]

RPC request object.

id: int | None
method: str
params: dict | None
class RPCResponse[source]

RPC response object.

id: int | None
result: Any
class RPCError[source]

RPC error object.

id: int | None
error: APIException
class JSONRPCHeaders[source]

List of JSONRPC request / response headers.

AUTHORIZATION = 'Authorization'
CONTENT_TYPE_HEADER = 'Content-Type'
USER_AGENT = 'User-Agent'
APP_ID_HEADER = 'App-Id'
SERVER_ID_HEADER = 'Server-Id'
CORRELATION_ID_HEADER = 'Correlation-Id'
REQUEST_DEADLINE_HEADER = 'Deadline'
REQUEST_TIMEOUT_HEADER = 'Timeout'
RETRIES = 'RPC-Retries'
CALLBACK_ID = 'RPC-Callback'
ABORT_ON_ERROR = 'RPC-Batch-Abort-Error'
USE_TEMPLATE = 'RPC-Batch-Template'
SESSION_ID_HEADER = 'Session-Id'
class PermissionKeys[source]

Permission scopes.

GLOBAL_SYSTEM_PERMISSION = 0
GLOBAL_USER_PERMISSION = 100
GLOBAL_GUEST_PERMISSION = 1000
class RPCClientError[source]

Bases: APIException

JSON RPC Python exception class.

__init__(*args, response=None, **kws)[source]

Initialize.

Parameters:
  • message – error messages

  • id – request id

  • base_exc – base exception object (when used as a wrapper)

  • debug – debug mode error

  • data – additional data

  • extras – will be merged with data