Files
server/shared/lib/Event/EventBus.php
2026-02-10 18:46:11 -05:00

187 lines
5.2 KiB
PHP

<?php
declare(strict_types=1);
namespace KTXF\Event;
/**
* Simple event bus for decoupled pub/sub communication between services
*
* Features:
* - Priority-based listener ordering
* - Synchronous and asynchronous (deferred) event handling
* - Event propagation control
*/
class EventBus
{
/** @var array<string, array<array{callback: callable, priority: int}>> */
private array $listeners = [];
/** @var array<string, array<callable>> */
private array $asyncListeners = [];
/** @var Event[] */
private array $deferredEvents = [];
/**
* Subscribe to an event with optional priority
* Higher priority listeners are called first
*/
public function subscribe(string $eventName, callable $listener, int $priority = 0): self
{
$this->listeners[$eventName][] = [
'callback' => $listener,
'priority' => $priority,
];
// Sort by priority (higher first)
usort(
$this->listeners[$eventName],
fn($a, $b) => $b['priority'] <=> $a['priority']
);
return $this;
}
/**
* Subscribe to an event for async/deferred processing
* These handlers run at the end of the request cycle
*/
public function subscribeAsync(string $eventName, callable $listener): self
{
$this->asyncListeners[$eventName][] = $listener;
return $this;
}
/**
* Unsubscribe a listener from an event
*/
public function unsubscribe(string $eventName, callable $listener): self
{
if (isset($this->listeners[$eventName])) {
$this->listeners[$eventName] = array_filter(
$this->listeners[$eventName],
fn($item) => $item['callback'] !== $listener
);
}
if (isset($this->asyncListeners[$eventName])) {
$this->asyncListeners[$eventName] = array_filter(
$this->asyncListeners[$eventName],
fn($item) => $item !== $listener
);
}
return $this;
}
/**
* Publish an event to all subscribers
*/
public function publish(Event $event): self
{
$eventName = $event->getName();
// Execute synchronous listeners
if (isset($this->listeners[$eventName])) {
foreach ($this->listeners[$eventName] as $listenerData) {
if ($event->isPropagationStopped()) {
break;
}
try {
call_user_func($listenerData['callback'], $event);
} catch (\Throwable $e) {
// Log error but don't break the chain
error_log(sprintf(
'Event listener error for %s: %s',
$eventName,
$e->getMessage()
));
}
}
}
// Queue for async processing if there are async listeners
if (isset($this->asyncListeners[$eventName]) && !empty($this->asyncListeners[$eventName])) {
$this->deferredEvents[] = $event;
}
return $this;
}
/**
* Process deferred/async events
* Call this at the end of the request cycle
*/
public function processDeferred(): int
{
$processed = 0;
foreach ($this->deferredEvents as $event) {
$eventName = $event->getName();
if (!isset($this->asyncListeners[$eventName])) {
continue;
}
foreach ($this->asyncListeners[$eventName] as $listener) {
try {
call_user_func($listener, $event);
$processed++;
} catch (\Throwable $e) {
// Log but don't fail - these are non-critical
error_log(sprintf(
'Async event handler error for %s: %s',
$eventName,
$e->getMessage()
));
}
}
}
$this->deferredEvents = [];
return $processed;
}
/**
* Check if an event has any listeners
*/
public function hasListeners(string $eventName): bool
{
return !empty($this->listeners[$eventName]) || !empty($this->asyncListeners[$eventName]);
}
/**
* Get count of listeners for an event
*/
public function getListenerCount(string $eventName): int
{
$sync = isset($this->listeners[$eventName]) ? count($this->listeners[$eventName]) : 0;
$async = isset($this->asyncListeners[$eventName]) ? count($this->asyncListeners[$eventName]) : 0;
return $sync + $async;
}
/**
* Get count of pending deferred events
*/
public function getDeferredCount(): int
{
return count($this->deferredEvents);
}
/**
* Clear all listeners (useful for testing)
*/
public function clear(): self
{
$this->listeners = [];
$this->asyncListeners = [];
$this->deferredEvents = [];
return $this;
}
}