Source code for src.core.kernel

from typing import Dict, Any, Optional

[docs] class ChatbotKernel: """ Core orchestrator for the chatbot framework. Manages component registration and message flow. """ def __init__(self, config: Optional[Dict[str, Any]] = None):
[docs] self.config = config or {}
[docs] self.components = {}
[docs] def register_component(self, name: str, component: Any) -> None: """ Register a component to the framework. Args: name: Unique identifier for the component component: The component instance """ self.components[name] = component
[docs] def get_component(self, name: str) -> Any: """ Retrieve a registered component. Args: name: Component identifier Returns: The component instance if found, None otherwise """ return self.components.get(name, None)
[docs] def process_message(self, message: str) -> str: """ Process a user message through the pipeline. Args: message: User input text Returns: Response text """ # Basic pipeline implementation if 'input_handler' not in self.components: raise ValueError("Required component 'input_handler' not registered") if 'nlu' not in self.components: raise ValueError("Required component 'nlu' not registered") if 'response_generator' not in self.components: raise ValueError("Required component 'response_generator' not registered") # Process through pipeline normalized_input = self.components['input_handler'].normalize(message) # Handle both SimpleNLU and SpacyNLU intent_result = self.components['nlu'].get_intent(normalized_input) # Extract intent name (compatible with both NLU implementations) intent = intent_result['name'] if isinstance(intent_result, dict) else intent_result response = self.components['response_generator'].generate(intent) return response