# Source code for kaiju_tools.interfaces

import abc
import logging
import uuid
from collections.abc import AsyncGenerator, Awaitable, Callable, Collection, Hashable, Iterable, Mapping
from contextvars import ContextVar  # noqa: pycharm
from typing import Any, Dict, FrozenSet, Generic, List, Literal, NewType, Optional, TypedDict, TypeVar, Union

from aiohttp.web import Application

from kaiju_tools.types import Namespace, NSKey, RequestContext, Scope, Session


# Public names exported on a star-import of this module.
# '_Session' is listed explicitly: underscore-prefixed names are skipped by
# star-imports unless they appear in ``__all__``.
__all__ = [
    'Cache',
    'DataStore',
    'UserInterface',
    'TokenInterface',
    'AbstractRPCCompatible',
    'PublicInterface',
    'SessionInterface',
    'AuthenticationInterface',
    'Locks',
    'RPCServer',
    'RPCClient',
    'ServiceManagerInterface',
    'App',
    'TokenLoginInterface',
    'Stream',
    '_Session',
]


# Type variable for the session type used by the generic interfaces in this module;
# restricted to ``Session`` and its subclasses through ``bound``.
_Session = TypeVar('_Session', bound=Session)


class App(Application, Generic[_Session]):
    """Web application.

    Extends the aiohttp ``Application`` with the typed attributes declared below.
    The class is generic over the session type kept in ``request_session``.
    """

    id: str
    """Unique id of this instance (auto-generated)."""

    name: str
    """Name of the application (i.e. its type)."""

    version: str
    """Version of the app."""

    env: str
    """Environment the app runs in: dev, prod, etc."""

    debug: bool
    """Whether the app is running in debug mode (started with the --debug flag)."""

    loglevel: str
    """Log level of the root logger."""

    logger: logging.Logger
    """Root logger of the application."""

    services: 'ServiceManagerInterface'
    """Map of services."""

    settings: dict
    """Settings of the application."""

    namespace: 'Namespace'
    """Namespace of the application."""

    namespace_shared: 'Namespace'
    """Namespace shared within the current environment."""

    request_context: ContextVar[RequestContext | None]
    """Holds the client request context."""

    request_session: ContextVar[_Session | None]
    """Holds the client session context."""

    db_meta: Any
    """Metadata of the database."""

    cookie_key: str
    """Key of the HTTP cookie used for client sessions."""
[docs]class ServiceManagerInterface(abc.ABC): """Application service initializer interface.""" @abc.abstractmethod async def __aenter__(self): ... @abc.abstractmethod async def __aexit__(self, exc_type, exc_val, exc_tb): ... @abc.abstractmethod def __getitem__(self, item: str) -> object: ...
[docs] @abc.abstractmethod def add_service(self, service, required: bool = True, name: str = None) -> None: ...
[docs] @abc.abstractmethod def discover_service(self, name=None, cls=None, required: bool = True) -> Optional: ...
[docs] @abc.abstractmethod def items(self) -> Iterable: ...
class Cache(abc.ABC):
    """Data key-value store."""

    namespace: Namespace

    @abc.abstractmethod
    async def get(self, id: NSKey) -> Any:
        """Get the value stored under a key.

        :param id: key of the value
        """

    @abc.abstractmethod
    async def m_get(self, id: Collection[NSKey]) -> dict[NSKey, Any]:
        """Get the values of multiple keys at once.

        :param id: collection of keys
        :returns: mapping of keys to values
        """

    @abc.abstractmethod
    async def exists(self, id: NSKey) -> bool:
        """Check whether a key is present in the store."""

    @abc.abstractmethod
    async def m_exists(self, id: Collection[NSKey]) -> frozenset[NSKey]:
        """Check multiple keys at once.

        :param id: collection of keys
        :returns: a frozen set of keys -- presumably the ones that exist; confirm
            against the implementation
        """

    @abc.abstractmethod
    async def set(self, id: NSKey, data, ttl: int | None = None) -> None:
        """Store a value under a key.

        :param id: key of the value
        :param data: value to store
        :param ttl: optional time to live (units are defined by the implementation)
        """

    @abc.abstractmethod
    async def m_set(self, data: dict[NSKey, Any], ttl: int | None = None) -> None:
        """Store multiple values at once.

        :param data: mapping of keys to values
        :param ttl: optional time to live (units are defined by the implementation)
        """

    @abc.abstractmethod
    async def delete(self, id: NSKey) -> None:
        """Delete a key from the store."""

    @abc.abstractmethod
    async def m_delete(self, id: Collection[NSKey]) -> None:
        """Delete multiple keys from the store at once."""
class Stream(abc.ABC):
    """Streaming interface."""

    @abc.abstractmethod
    async def lock(self) -> None:
        """Lock the stream (what locking means is up to the implementation)."""

    @abc.abstractmethod
    async def unlock(self) -> None:
        """Unlock the stream."""

    @property
    @abc.abstractmethod
    def locked(self) -> bool:
        """Tell whether the stream is in the locked state."""
class Locks(abc.ABC):
    """Shared (between apps) locks management."""

    LockId = NewType('LockId', str)

    @abc.abstractmethod
    async def acquire(
        self,
        id: NSKey,
        identifier: LockId | None = None,
        ttl: int | None = None,
        wait: bool = True,
        timeout: float | None = None,
    ) -> LockId:
        """Acquire a lock.

        :param id: key of the lock
        :param identifier: optional lock identifier; presumably generated by the
            implementation when omitted -- confirm
        :param ttl: optional lock time to live (units are defined by the implementation)
        :param wait: wait flag; presumably whether to wait for a busy lock -- confirm
        :param timeout: optional timeout (units are defined by the implementation)
        :returns: identifier of the lock
        """

    @abc.abstractmethod
    async def release(self, id: NSKey, identifier: LockId) -> None:
        """Release a lock.

        :param id: key of the lock
        :param identifier: lock identifier, of the type returned by :meth:`acquire`
        """

    @abc.abstractmethod
    async def owner(self, id: NSKey) -> LockId | None:
        """Return the identifier owning the lock ``id`` or ``None``."""

    @abc.abstractmethod
    async def is_owner(self, id: NSKey) -> bool:
        """Check ownership of the lock ``id`` (the owner identity is implementation-defined)."""

    @abc.abstractmethod
    async def m_exists(self, id: Collection[NSKey]) -> frozenset[NSKey]:
        """Check multiple locks at once.

        :param id: collection of lock keys
        :returns: a frozen set of keys -- presumably the ones that exist; confirm
            against the implementation
        """
# Column selection accepted by ``DataStore`` methods: a collection of column names,
# the literal '*' or None (how each value is interpreted is up to the implementation).
_Columns = Union[Collection[str], Literal['*'], None]
class DataStore(abc.ABC):
    """Data row-column store.

    Most methods take ``columns`` (a column selection, see ``_Columns``) and
    ``_connection`` (an optional connection object of an implementation-defined type).
    """

    @abc.abstractmethod
    async def get(self, id: Hashable, columns: _Columns = '*', _connection=None):
        """Get a single object by its id."""

    @abc.abstractmethod
    async def m_get(self, id: Collection[Hashable], columns: _Columns = '*', _connection=None):
        """Get multiple objects by their ids."""

    @abc.abstractmethod
    async def exists(self, id: Hashable, _connection=None):
        """Check whether an object with this id exists."""

    @abc.abstractmethod
    async def m_exists(self, id: Collection[Hashable], _connection=None):
        """Check multiple ids at once."""

    @abc.abstractmethod
    async def delete(self, id: Hashable, columns: _Columns = None, _connection=None):
        """Delete a single object by its id."""

    @abc.abstractmethod
    async def m_delete(
        self,
        id: Collection[Hashable] | None = None,
        conditions: dict | None = None,
        columns: _Columns = None,
        _connection=None,
    ):
        """Delete multiple objects.

        :param id: optional collection of ids
        :param conditions: optional conditions (format is implementation-defined)
        """

    @abc.abstractmethod
    async def create(
        self,
        data: dict,
        columns: _Columns = '*',
        _connection=None,
        on_conflict: str | None = None,
        on_conflict_keys: Collection | None = None,
        on_conflict_values=None,
    ):
        """Create a single object.

        :param data: object data
        :param on_conflict: optional conflict policy (accepted values are implementation-defined)
        :param on_conflict_keys: optional keys used for the conflict handling
        :param on_conflict_values: optional values used for the conflict handling
        """

    @abc.abstractmethod
    async def m_create(
        self,
        data: Collection,
        columns: _Columns = '*',
        _connection=None,
        on_conflict: str | None = None,
        on_conflict_keys: Collection | None = None,
        on_conflict_values: dict | None = None,
    ):
        """Create multiple objects.

        :param data: collection of objects data
        :param on_conflict: optional conflict policy (accepted values are implementation-defined)
        :param on_conflict_keys: optional keys used for the conflict handling
        :param on_conflict_values: optional values used for the conflict handling
        """

    @abc.abstractmethod
    async def update(self, id: Hashable, data, columns: _Columns = '*', _connection=None):
        """Update a single object by its id with ``data``."""

    @abc.abstractmethod
    async def m_update(
        self,
        id: Collection[Hashable],
        data,
        conditions: dict | None = None,
        columns: _Columns = '*',
        _connection=None,
    ):
        """Update multiple objects with ``data``.

        :param id: collection of ids
        :param conditions: optional conditions (format is implementation-defined)
        """

    @abc.abstractmethod
    async def iter(
        self,
        conditions: dict | None = None,
        sort=None,
        offset: int = 0,
        limit: int = 10,
        columns: _Columns = '*',
    ) -> AsyncGenerator[list, None]:
        """Iterate over the stored data in lists.

        NOTE(review): declared ``async def`` without a ``yield`` but annotated as an
        async generator -- implementations are presumably async generators; confirm.

        :param conditions: optional conditions (format is implementation-defined)
        :param sort: optional sorting (format is implementation-defined)
        :param offset: initial offset, 0 by default
        :param limit: limit, 10 by default; presumably the size of each yielded list -- confirm
        """
# Unconstrained type variable for the user type of the generic user / token interfaces below.
_User = TypeVar('_User')
class UserInterface(Generic[_User], abc.ABC):
    """User login interface."""

    @abc.abstractmethod
    async def auth(self, username: str, password: str) -> _User | None:
        """User login and password check.

        Must return None if there is no such user or the password is invalid.
        """

    @abc.abstractmethod
    async def register(self, username: str, email: str, password: str, settings: dict | None = None) -> _User:
        """Register a new user.

        :param settings: optional user settings (schema is implementation-defined)
        :returns: the registered user
        """

    @abc.abstractmethod
    async def change_password(self, username: str, password: str, new_password: str):
        """Change user password.

        :param password: presumably the current password -- confirm against the implementation
        :param new_password: password to set
        """

    @abc.abstractmethod
    async def update_profile(self, id: uuid.UUID, settings: dict):
        """Update current user profile.

        :param id: user id
        :param settings: profile settings (schema is implementation-defined)
        """

    @abc.abstractmethod
    async def get_user_and_permissions(self, id: uuid.UUID):
        """Get user and user permissions info.

        :param id: user id
        """
class TokenInterface(Generic[_User], abc.ABC):
    """Auth token interface."""

    class TokenClaims(TypedDict):
        """JWT token claims data."""

        id: uuid.UUID
        permissions: Collection[str]

    class TokenInfo(TypedDict):
        """JWT methods output."""

        access: str
        refresh: str

    @abc.abstractmethod
    async def auth(self, token: str, /) -> TokenClaims | None:
        """Verify an auth token and return token user data.

        Must return None if no user or token is invalid.

        :param token: token string (positional-only)
        """

    @abc.abstractmethod
    async def get(self, claims: TokenClaims, /) -> TokenInfo:
        """Generate a token pair (access / refresh tokens).

        :param claims: claims data for the tokens (positional-only)
        """

    @abc.abstractmethod
    async def refresh(self, token: str, /) -> TokenInfo | None:
        """Generate a token pair (access / refresh tokens).

        :param token: token string (positional-only); presumably a refresh token -- confirm
        :returns: a token pair or None
        """
class SessionInterface(Generic[_Session], abc.ABC):
    """Session management interface."""

    @abc.abstractmethod
    def get_new_session(self, data: dict, *, user_agent: str | bytes | None = None) -> _Session:
        """Create a new session object (synchronous).

        :param data: session data
        :param user_agent: optional user agent value (keyword-only)
        """

    @abc.abstractmethod
    async def load_session(self, session_id: str, /, *, user_agent: str | None = None) -> _Session | None:
        """Load a session by its id.

        :param session_id: session id (positional-only)
        :param user_agent: optional user agent value (keyword-only)
        :returns: a session or None
        """

    @abc.abstractmethod
    async def session_exists(self, session_id: str, /) -> bool:
        """Check whether a session with this id exists."""

    @abc.abstractmethod
    async def save_session(self, session: _Session, /) -> None:
        """Save a session."""

    @abc.abstractmethod
    async def delete_session(self, session: _Session, /) -> None:
        """Delete a session."""
class AuthenticationInterface(Generic[_Session], abc.ABC):
    """Authentication from auth strings / headers / tokens."""

    @abc.abstractmethod
    async def header_auth(self, auth_string: str, /) -> _Session | None:
        """Authenticate by an auth string (positional-only); yields a session or None."""

    @abc.abstractmethod
    async def basic_auth(self, auth_string: str, /) -> _Session:
        """Authenticate by a basic auth string (positional-only) and return a session."""

    @abc.abstractmethod
    async def password_auth(self, session: _Session, username: str, password: str) -> _Session:
        """Authenticate by a username and a password for the given session and return a session."""

    @abc.abstractmethod
    async def token_auth(self, token: str, /) -> _Session:
        """Authenticate by a token (positional-only) and return a session."""
class TokenLoginInterface(abc.ABC):
    """Token client interface."""

    @abc.abstractmethod
    async def get_token(self) -> TokenInterface.TokenInfo:
        """Return token info (a ``TokenInterface.TokenInfo`` mapping)."""
class PublicInterface(abc.ABC):
    """Class with an RPC interface."""

    DEFAULT_PERMISSION = '*'

    class PermissionKeys:
        """Permission scopes."""

        GLOBAL_SYSTEM_PERMISSION = Scope.SYSTEM
        GLOBAL_USER_PERMISSION = Scope.USER
        GLOBAL_GUEST_PERMISSION = Scope.GUEST

    app: App  # service app

    def get_session(self):
        """Get current user session.

        :returns: the value currently stored in ``app.request_session``
        """
        return self.app.request_session.get()

    def get_request_context(self) -> RequestContext | None:
        """Get current user request context.

        :returns: the value currently stored in ``app.request_context``
        """
        return self.app.request_context.get()

    def get_user_id(self):
        """Return current session user id.

        :returns: ``session.user_id`` or None when there is no (truthy) session
        """
        session = self.get_session()
        return session.user_id if session else None

    def has_permission(self, permission: str) -> bool:
        """Check if a user session has a particular permission.

        :param permission: permission key
        :returns: True when there is no session; otherwise True if the permission is
            in the session permissions or the session has the system scope
        """
        session = self.get_session()
        if not session:
            # no session -- no restrictions are applied
            return True
        return permission in session.permissions or self.system_user()

    def system_user(self) -> bool:
        """Check if user session has the system scope.

        :returns: False when there is no session (kept falsy, but now a real bool
            to honor the declared return type); otherwise the result of comparing
            the system scope value against the session scope value
        """
        session = self.get_session()
        if not session:
            return False
        return self.PermissionKeys.GLOBAL_SYSTEM_PERMISSION.value >= session.scope.value

    @property
    def routes(self) -> dict:
        """List RPC routes (empty by default)."""
        return {}

    @property
    def permissions(self) -> dict:
        """List RPC routes permissions (empty by default)."""
        return {}

    @property
    def validators(self) -> dict:
        """List of RPC routes validation schemas (empty by default)."""
        return {}
# Second name for the same class; presumably kept for backward compatibility -- TODO confirm.
AbstractRPCCompatible = PublicInterface
[docs]class RPCServer(abc.ABC): """JSONRPC server interface."""
[docs] @abc.abstractmethod async def call( self, body, headers: Mapping, callback: Callable[..., Awaitable] = None, ) -> tuple: pass
[docs]class RPCClient(abc.ABC): """JSONRPC client interface."""
[docs] @abc.abstractmethod async def call( self, method: str, params: dict | None = None, nowait: bool = False, request_id: int = 0, max_timeout: int = None, use_context: bool = True, retries: int = None, headers: dict = None, ) -> Optional: ...
[docs] @abc.abstractmethod async def call_multiple( self, requests, raise_exception: bool = True, nowait: bool = False, max_timeout: int = None, use_context: bool = True, retries: int = None, abort_on_error: bool = None, use_template: bool = None, headers: dict = None, ) -> Optional: ...