The Event Handler System is the central communication hub of the Trixy voice assistant. It implements a sophisticated event-driven architecture that allows all components to communicate asynchronously through events.
Location: ./trixy_core/events/
- `EventHandler`: Main event processing and management class.
- `@TrixyEvent`: Decorator for automatic event handler registration.
- Event data classes: Structured data containers for all event types.
from trixy_core.events import EventHandler
event_handler = EventHandler(
max_history=1000, # Maximum event history entries
enable_debug=True, # Enable debug logging
enable_filtering=True, # Allow event filtering
max_concurrent_handlers=10 # Max parallel event handlers
)
`trigger_event(event_type, **kwargs)`: Triggers an event with optional data.
from trixy_core.events import EventType
# Trigger system startup
event_handler.trigger_event(EventType.SYSTEM_STARTUP)
# Trigger wakeword with data
event_handler.trigger_event(
EventType.WAKEWORD_RECEIVED,
wakeword_id="trixy",
speaker_info=speaker_info,
satellite_info=satellite_info,
volume=0.8
)
`register_handler_object(obj)`: Registers all `@TrixyEvent`-decorated methods from an object.
class MyPlugin:
@TrixyEvent(["wakeword_received", "text_received"])
def handle_events(self, event_name, event_data):
pass
plugin = MyPlugin()
event_handler.register_handler_object(plugin)
`unregister_handler_object(obj)`: Removes all event handlers from an object.
event_handler.unregister_handler_object(plugin)
`get_event_history(limit=None, event_types=None)`: Retrieves event history with optional filtering.
# Get last 10 events
recent_events = event_handler.get_event_history(limit=10)
# Get only wakeword events
wakeword_events = event_handler.get_event_history(
event_types=[EventType.WAKEWORD_RECEIVED]
)
`get_statistics()`: Returns comprehensive event system statistics.
stats = event_handler.get_statistics()
print(f"Total events: {stats['total_events_triggered']}")
print(f"Registered handlers: {stats['total_handlers_registered']}")
from trixy_core.events import TrixyEvent
class VoicePlugin:
@TrixyEvent(["wakeword_received"])
def on_wakeword(self, event_name, event_data):
print(f"Wakeword detected: {event_data.wakeword_id}")
@TrixyEvent(["text_received", "intent_received"], priority=5)
def on_text_or_intent(self, event_name, event_data):
if event_name == "text_received":
print(f"Text: {event_data.text}")
elif event_name == "intent_received":
print(f"Intent: {event_data.intent}")
- `events`: List of event names to listen for
- `priority`: Handler priority (lower numbers = higher priority)

- `SATELLITE_CONNECTED`: When satellite establishes connection
- `SATELLITE_DISCONNECTED`: When satellite loses connection
- `SATELLITE_REGISTERED`: When new satellite is registered
- `WAKEWORD_RECEIVED`: When satellite detects wakeword
- `RAW_AUDIO_INPUT_RECEIVED`: When audio recording completes
- `TEXT_RECEIVED`: When STT converts audio to text
- `INTENT_RECEIVED`: When NLP extracts intent from text
- `TTS_RECEIVED`: When TTS generates response audio
- `SYSTEM_STARTUP`: System initialization complete
- `SYSTEM_SHUTDOWN`: System shutting down
- `PLUGIN_LOADED`: Plugin successfully loaded
- `PLUGIN_UNLOADED`: Plugin unloaded
- `CONVERSATION_STARTED`: New conversation session started
- `CONVERSATION_ENDED`: Conversation session ended
- `CONVERSATION_STATE_CHANGED`: Conversation state transition
- `ARBITRATION_STARTED`: Multi-satellite arbitration begins
- `ARBITRATION_COMPLETED`: Satellite selection completed
- `SATELLITE_SELECTED`: Specific satellite chosen
- `SATELLITE_IGNORED`: Satellite not selected

@dataclass
class WakewordReceivedEventData:
wakeword_id: str # "trixy" or "system_command"
speaker_info: SpeakerInfo # Speaker identification
satellite_info: SatelliteInfo # Source satellite info
volume: float # Audio volume level
confidence: float = 1.0 # Detection confidence
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class TextReceivedEventData:
conversation_id: str # Conversation session ID
text: str # Recognized text
confidence: float # Recognition confidence
speaker_info: SpeakerInfo # Speaker identification
language: str = "en" # Detected language
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class IntentReceivedEventData:
conversation_id: str # Conversation session ID
intent: str # Recognized intent
entities: Dict[str, Any] # Extracted entities
confidence: float # Intent confidence
original_text: str # Source text
timestamp: datetime = field(default_factory=datetime.now)
from trixy_core.plugins import TrixyPlugin
from trixy_core.events import TrixyEvent, EventType
class WeatherPlugin(TrixyPlugin):
@TrixyEvent(["intent_received"])
def handle_weather_intent(self, event_name, event_data):
if event_data.intent == "get_weather":
location = event_data.entities.get("location", "default")
weather_info = self.get_weather(location)
# Trigger TTS response
self.application.get_event_handler().trigger_event(
EventType.TTS_RECEIVED,
conversation_id=event_data.conversation_id,
text=f"The weather in {location} is {weather_info}",
voice_settings={"speed": 1.0, "pitch": 1.0}
)
from trixy_core.events import TrixyEvent
class SatelliteHandler:
@TrixyEvent(["satellite_connected"])
def on_satellite_connected(self, event_name, event_data):
satellite = event_data.satellite_info
print(f"Satellite {satellite.alias} connected from {satellite.room_id}")
# Send welcome message
satellite_manager = self.application.get_satellite_manager()
satellite_manager.say(
satellite.satellite_id,
"Welcome! I'm ready to assist you."
)
event_config = {
"max_history": 1000, # Event history limit
"enable_debug": True, # Debug logging
"enable_filtering": True, # Event filtering
"max_concurrent_handlers": 10, # Parallel handlers
"enable_async_processing": True, # Async event processing
"handler_timeout": 30.0, # Handler timeout seconds
"enable_event_persistence": False # Persist events to disk
}
try:
event_handler.trigger_event(EventType.WAKEWORD_RECEIVED, invalid_param=True)
except EventValidationError as e:
print(f"Invalid event data: {e}")
except EventHandlerError as e:
print(f"Handler error: {e}")
try:
event_handler.register_handler_object(invalid_object)
except HandlerRegistrationError as e:
print(f"Registration failed: {e}")
class GoodEventHandler:
@TrixyEvent(["specific_event"], priority=10)
def handle_specific_event(self, event_name, event_data):
# Keep handlers focused and fast
try:
self.process_quickly(event_data)
except Exception as e:
self.logger.error(f"Handler error: {e}")
# Don't let exceptions propagate
@TrixyEvent(["wakeword_received"])
def on_wakeword(self, event_name, event_data):
# Always validate event data
if not event_data.wakeword_id:
return
if event_data.confidence < 0.5:
return # Ignore low-confidence detections
self.process_wakeword(event_data)
import asyncio
@TrixyEvent(["raw_audio_input_received"])
def on_audio_input(self, event_name, event_data):
# Don't block the event loop
asyncio.create_task(self.process_audio_async(event_data))
async def process_audio_async(self, event_data):
# Heavy processing here
result = await self.transcribe_audio(event_data.audio_data)
# Trigger follow-up event
self.application.get_event_handler().trigger_event(
EventType.TEXT_RECEIVED,
conversation_id=event_data.conversation_id,
text=result.text,
confidence=result.confidence
)
- Use `max_history` to limit the number of retained event history entries
- Use `max_concurrent_handlers` to control parallelism

The EventHandler is fully thread-safe and can be used concurrently from multiple threads. All operations use appropriate locking mechanisms.
event_handler = EventHandler(enable_debug=True)
# Analyze event patterns
history = event_handler.get_event_history(limit=100)
event_types = [event.event_type for event in history]
most_common = Counter(event_types).most_common(5)
print(f"Most common events: {most_common}")
stats = event_handler.get_statistics()
if stats['failed_handlers'] > 0:
print(f"Warning: {stats['failed_handlers']} handler failures detected")