Source code for hdmi.containers.default
"""Container - Root container for dependency injection.
The Container is an immutable, validated dependency graph that resolves
service instances lazily (just-in-time) when requested.
"""
import asyncio
import inspect
from contextlib import AsyncExitStack
from typing import TYPE_CHECKING, Type, TypeVar, get_type_hints
from anyio import to_thread
from hdmi.utils.typing import extract_type_from_optional
if TYPE_CHECKING:
from hdmi.types.definitions import ServiceDefinition
from hdmi.containers.scoped import ScopedContainer
T = TypeVar("T")
[docs]
class Container:
"""Immutable root container for resolving service instances at runtime.
The Container is produced by ContainerBuilder.build() and is:
- Immutable: cannot be modified after creation
- Pre-validated: all configuration errors caught during build
- Lazy: services instantiated only when first requested via get()
- Async: all resolution and lifecycle management is async
Implements IContainer protocol to provide a consistent interface with
ScopedContainer.
"""
[docs]
def __init__(self, definitions: dict[Type, "ServiceDefinition"]):
"""Initialize Container with validated service definitions.
This should only be called by ContainerBuilder.build().
Args:
definitions: Validated service definitions from builder
"""
self._definitions = definitions
self._singletons: dict[Type, object] = {}
self._pending_tasks: dict[Type, asyncio.Task] = {}
self._exit_stack: AsyncExitStack | None = None
async def __aenter__(self) -> "Container":
"""Enter the async context manager.
Returns:
Self to enable 'async with builder.build() as container:' syntax
"""
self._exit_stack = AsyncExitStack()
await self._exit_stack.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit the async context manager and cleanup all managed services.
This triggers all registered finalizers and closes all async context managers.
"""
if self._exit_stack is not None:
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
self._exit_stack = None
[docs]
def scope(self) -> "ScopedContainer":
"""Create a new scoped container for resolving scoped services.
Returns:
A new ScopedContainer instance
"""
from hdmi.containers.scoped import ScopedContainer
return ScopedContainer(self)
[docs]
async def get(self, service_type: Type[T]) -> T:
"""Resolve a service instance (lazy instantiation).
Args:
service_type: The service type to resolve
Returns:
An instance of the service type
Raises:
UnresolvableDependencyError: If the service type is not registered
ScopeViolationError: If trying to resolve a scoped service outside a scope
"""
from hdmi.exceptions import ScopeViolationError, UnresolvableDependencyError
try:
definition = self._definitions[service_type]
except KeyError:
raise UnresolvableDependencyError(
f"{service_type.__name__} is not registered in the container. "
f"Use ContainerBuilder.register({service_type.__name__}) to register it."
) from None
# Scoped services cannot be resolved directly from Container
if definition.scoped:
raise ScopeViolationError(
f"{service_type.__name__} is a scoped service (scoped=True) and cannot be resolved "
f"directly from Container. Use Container.scope() to create a scoped context."
)
# Handle non-scoped services
if definition.transient:
# Transient (scoped=False, transient=True): new instance every time, no task sharing
return await self._create_instance(service_type) # type: ignore
else:
# Singleton (scoped=False, transient=False): cached with task sharing
# Check if already cached
if service_type in self._singletons:
return self._singletons[service_type] # type: ignore
# Check if task is already pending (task sharing)
if service_type in self._pending_tasks:
# Reuse existing task
return await self._pending_tasks[service_type] # type: ignore
# Create new task and store it
task = asyncio.create_task(self._create_instance(service_type))
self._pending_tasks[service_type] = task
try:
# Await the task
instance = await task
# Cache the result
self._singletons[service_type] = instance
return instance # type: ignore
finally:
# Remove from pending tasks (cleanup)
self._pending_tasks.pop(service_type, None)
async def _create_instance(self, service_type: Type[T]) -> T:
"""Create an instance of a service, resolving dependencies and managing lifecycle.
Args:
service_type: The service type to instantiate
Returns:
An instance with all dependencies resolved and lifecycle hooks executed
"""
# Get the __init__ signature
try:
sig = inspect.signature(service_type.__init__)
except ValueError:
# If we can't get signature, try without parameters
instance = service_type() # type: ignore
await self._manage_lifecycle(service_type, instance)
return instance
# Get type hints for the __init__ method
try:
hints = get_type_hints(service_type.__init__)
except Exception:
hints = {}
# Collect dependencies to resolve concurrently
dependency_tasks: dict[str, asyncio.Task] = {}
for param_name, param in sig.parameters.items():
if param_name == "self":
continue
# Get the type annotation for this parameter
if param_name not in hints:
continue
type_hint = hints[param_name]
has_default = param.default is not inspect.Parameter.empty
# Extract actual type from Optional/Union types (e.g., Config | None -> Config)
dependency_type = extract_type_from_optional(type_hint)
if dependency_type is None:
# Can't determine single type (e.g., Union[A, B] or just None)
continue
# Check if dependency is registered
is_registered = dependency_type in self._definitions
if has_default:
# Optional dependency - only inject if registered AND autowire=True
if is_registered:
dep_definition = self._definitions[dependency_type]
if dep_definition.autowire:
# Create task for concurrent resolution
dependency_tasks[param_name] = asyncio.create_task(self.get(dependency_type))
# else: skip (autowire=False, let class use default)
# else: skip (not registered, let class use default)
else:
# Required dependency - create task for concurrent resolution
dependency_tasks[param_name] = asyncio.create_task(self.get(dependency_type))
# Resolve all dependencies concurrently
if dependency_tasks:
# Wait for all dependency tasks to complete
await asyncio.gather(*dependency_tasks.values())
# Collect results into kwargs
kwargs = {param_name: task.result() for param_name, task in dependency_tasks.items()}
else:
kwargs = {}
instance = service_type(**kwargs) # type: ignore
# Manage lifecycle (initializer, context manager, finalizer)
await self._manage_lifecycle(service_type, instance)
return instance
async def _manage_lifecycle(self, service_type: Type[T], instance: T) -> None:
"""Manage the lifecycle of a service instance.
This includes:
- Calling initializer (if provided)
- Registering finalizer with exit stack (if provided)
Note: Services that are context managers are NOT automatically entered.
The user is responsible for managing their context themselves.
Args:
service_type: The service type
instance: The service instance
"""
definition = self._definitions[service_type]
# Call initializer if provided
if definition.initializer is not None:
if inspect.iscoroutinefunction(definition.initializer):
await definition.initializer(instance)
else:
# Run sync initializer in thread pool
await to_thread.run_sync(definition.initializer, instance)
# Register finalizer with exit stack if provided
if definition.finalizer is not None and self._exit_stack is not None:
if inspect.iscoroutinefunction(definition.finalizer):
# Async finalizer
self._exit_stack.push_async_callback(definition.finalizer, instance)
else:
# Sync finalizer - wrap in async callback that runs in thread pool
finalizer = definition.finalizer # Capture to satisfy type checker
async def _run_sync_finalizer():
await to_thread.run_sync(finalizer, instance)
self._exit_stack.push_async_callback(_run_sync_finalizer)