<?php

declare(strict_types=1);

namespace KTXF\Event;

/**
 * Simple event bus for decoupled pub/sub communication between services.
 *
 * Features:
 * - Priority-based listener ordering
 * - Synchronous and asynchronous (deferred) event handling
 * - Event propagation control
 *
 * The bus only relies on two methods of the published event object:
 * getName() and isPropagationStopped().
 */
class EventBus
{
    /**
     * Synchronous listeners per event name, kept ordered by descending priority.
     *
     * @var array<string, list<array{callback: callable, priority: int}>>
     */
    private array $listeners = [];

    /**
     * Deferred listeners per event name, in subscription order.
     *
     * @var array<string, list<callable>>
     */
    private array $asyncListeners = [];

    /**
     * Published events that are waiting for the next processDeferred() call.
     *
     * @var list<Event>
     */
    private array $deferredEvents = [];

    /**
     * Subscribe to an event with optional priority.
     *
     * Higher priority listeners are called first. Listeners sharing the same
     * priority are called in the order they were subscribed.
     *
     * @param string   $eventName Name of the event to listen for.
     * @param callable $listener  Invoked with the published event as its only argument.
     * @param int      $priority  Ordering weight; larger values run earlier.
     *
     * @return self This bus, for chaining.
     */
    public function subscribe(string $eventName, callable $listener, int $priority = 0): self
    {
        $queue = array_values($this->listeners[$eventName] ?? []);

        // The queue is already ordered, so place the new entry right before the
        // first listener with a strictly lower priority. This keeps subscription
        // order among equal priorities without depending on sort stability.
        $position = count($queue);
        foreach ($queue as $index => $existing) {
            if ($existing['priority'] < $priority) {
                $position = $index;
                break;
            }
        }

        array_splice($queue, $position, 0, [[
            'callback' => $listener,
            'priority' => $priority,
        ]]);

        $this->listeners[$eventName] = $queue;

        return $this;
    }

    /**
     * Subscribe to an event for async/deferred processing.
     *
     * These handlers do not run during publish(); they run when
     * processDeferred() is called (intended for the end of the request cycle).
     *
     * @param string   $eventName Name of the event to listen for.
     * @param callable $listener  Invoked with the published event as its only argument.
     *
     * @return self This bus, for chaining.
     */
    public function subscribeAsync(string $eventName, callable $listener): self
    {
        $this->asyncListeners[$eventName][] = $listener;

        return $this;
    }

    /**
     * Unsubscribe a listener from an event.
     *
     * Removes every matching registration, both synchronous and deferred.
     * Listeners are matched with strict comparison, so a closure must be the
     * same instance that was subscribed.
     *
     * @param string   $eventName Name of the event the listener was registered for.
     * @param callable $listener  The listener to remove.
     *
     * @return self This bus, for chaining.
     */
    public function unsubscribe(string $eventName, callable $listener): self
    {
        if (isset($this->listeners[$eventName])) {
            // array_filter preserves keys; re-index so the stored value stays a list.
            $remaining = array_values(array_filter(
                $this->listeners[$eventName],
                static fn (array $entry): bool => $entry['callback'] !== $listener
            ));

            if ($remaining === []) {
                unset($this->listeners[$eventName]);
            } else {
                $this->listeners[$eventName] = $remaining;
            }
        }

        if (isset($this->asyncListeners[$eventName])) {
            $remaining = array_values(array_filter(
                $this->asyncListeners[$eventName],
                static fn ($candidate): bool => $candidate !== $listener
            ));

            if ($remaining === []) {
                unset($this->asyncListeners[$eventName]);
            } else {
                $this->asyncListeners[$eventName] = $remaining;
            }
        }

        return $this;
    }

    /**
     * Publish an event to all subscribers.
     *
     * Synchronous listeners run immediately in priority order until one of
     * them stops propagation. A listener that throws is logged via error_log()
     * and does not prevent the remaining listeners from running. If the event
     * has deferred listeners, the event is queued for processDeferred()
     * regardless of whether propagation was stopped.
     *
     * @param Event $event The event to dispatch.
     *
     * @return self This bus, for chaining.
     */
    public function publish(Event $event): self
    {
        $eventName = $event->getName();

        foreach ($this->listeners[$eventName] ?? [] as $entry) {
            if ($event->isPropagationStopped()) {
                break;
            }

            $this->invokeListener(
                $entry['callback'],
                $event,
                $eventName,
                'Event listener error for %s: %s'
            );
        }

        // Queue for deferred processing only when someone is actually listening.
        if (!empty($this->asyncListeners[$eventName])) {
            $this->deferredEvents[] = $event;
        }

        return $this;
    }

    /**
     * Process deferred/async events.
     *
     * Call this at the end of the request cycle. The pending queue is detached
     * before any handler runs, so:
     * - events published by a deferred handler are kept in the queue for the
     *   next processDeferred() call instead of being discarded;
     * - a re-entrant call from inside a handler does not deliver the same
     *   events a second time.
     *
     * Handlers that throw are logged via error_log() and skipped.
     *
     * @return int Number of deferred listener calls that completed without throwing.
     */
    public function processDeferred(): int
    {
        $pending = $this->deferredEvents;
        $this->deferredEvents = [];

        $processed = 0;

        foreach ($pending as $event) {
            $eventName = $event->getName();

            foreach ($this->asyncListeners[$eventName] ?? [] as $listener) {
                $succeeded = $this->invokeListener(
                    $listener,
                    $event,
                    $eventName,
                    'Async event handler error for %s: %s'
                );

                if ($succeeded) {
                    $processed++;
                }
            }
        }

        return $processed;
    }

    /**
     * Check if an event has any listeners (synchronous or deferred).
     *
     * @param string $eventName Name of the event.
     */
    public function hasListeners(string $eventName): bool
    {
        return !empty($this->listeners[$eventName]) || !empty($this->asyncListeners[$eventName]);
    }

    /**
     * Get count of listeners for an event (synchronous plus deferred).
     *
     * @param string $eventName Name of the event.
     */
    public function getListenerCount(string $eventName): int
    {
        return count($this->listeners[$eventName] ?? [])
            + count($this->asyncListeners[$eventName] ?? []);
    }

    /**
     * Get count of pending deferred events.
     */
    public function getDeferredCount(): int
    {
        return count($this->deferredEvents);
    }

    /**
     * Clear all listeners and any pending deferred events (useful for testing).
     *
     * @return self This bus, for chaining.
     */
    public function clear(): self
    {
        $this->listeners = [];
        $this->asyncListeners = [];
        $this->deferredEvents = [];

        return $this;
    }

    /**
     * Call one listener, logging instead of propagating anything it throws.
     *
     * @param callable $listener    Listener to call with the event.
     * @param Event    $event       Event passed to the listener.
     * @param string   $eventName   Event name used in the log message.
     * @param string   $errorFormat sprintf() format taking the event name and the error message.
     *
     * @return bool True when the listener returned normally, false when it threw.
     */
    private function invokeListener(callable $listener, Event $event, string $eventName, string $errorFormat): bool
    {
        try {
            $listener($event);

            return true;
        } catch (\Throwable $e) {
            // Deliberately best-effort: one failing listener must not break the chain.
            error_log(sprintf($errorFormat, $eventName, $e->getMessage()));

            return false;
        }
    }
}