Initial Version
This commit is contained in:
186
shared/lib/Event/EventBus.php
Normal file
186
shared/lib/Event/EventBus.php
Normal file
@@ -0,0 +1,186 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace KTXF\Event;
|
||||
|
||||
/**
|
||||
* Simple event bus for decoupled pub/sub communication between services
|
||||
*
|
||||
* Features:
|
||||
* - Priority-based listener ordering
|
||||
* - Synchronous and asynchronous (deferred) event handling
|
||||
* - Event propagation control
|
||||
*/
|
||||
class EventBus
|
||||
{
|
||||
/** @var array<string, array<array{callback: callable, priority: int}>> */
|
||||
private array $listeners = [];
|
||||
|
||||
/** @var array<string, array<callable>> */
|
||||
private array $asyncListeners = [];
|
||||
|
||||
/** @var Event[] */
|
||||
private array $deferredEvents = [];
|
||||
|
||||
/**
|
||||
* Subscribe to an event with optional priority
|
||||
* Higher priority listeners are called first
|
||||
*/
|
||||
public function subscribe(string $eventName, callable $listener, int $priority = 0): self
|
||||
{
|
||||
$this->listeners[$eventName][] = [
|
||||
'callback' => $listener,
|
||||
'priority' => $priority,
|
||||
];
|
||||
|
||||
// Sort by priority (higher first)
|
||||
usort(
|
||||
$this->listeners[$eventName],
|
||||
fn($a, $b) => $b['priority'] <=> $a['priority']
|
||||
);
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to an event for async/deferred processing
|
||||
* These handlers run at the end of the request cycle
|
||||
*/
|
||||
public function subscribeAsync(string $eventName, callable $listener): self
|
||||
{
|
||||
$this->asyncListeners[$eventName][] = $listener;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe a listener from an event
|
||||
*/
|
||||
public function unsubscribe(string $eventName, callable $listener): self
|
||||
{
|
||||
if (isset($this->listeners[$eventName])) {
|
||||
$this->listeners[$eventName] = array_filter(
|
||||
$this->listeners[$eventName],
|
||||
fn($item) => $item['callback'] !== $listener
|
||||
);
|
||||
}
|
||||
|
||||
if (isset($this->asyncListeners[$eventName])) {
|
||||
$this->asyncListeners[$eventName] = array_filter(
|
||||
$this->asyncListeners[$eventName],
|
||||
fn($item) => $item !== $listener
|
||||
);
|
||||
}
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish an event to all subscribers
|
||||
*/
|
||||
public function publish(Event $event): self
|
||||
{
|
||||
$eventName = $event->getName();
|
||||
|
||||
// Execute synchronous listeners
|
||||
if (isset($this->listeners[$eventName])) {
|
||||
foreach ($this->listeners[$eventName] as $listenerData) {
|
||||
if ($event->isPropagationStopped()) {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
call_user_func($listenerData['callback'], $event);
|
||||
} catch (\Throwable $e) {
|
||||
// Log error but don't break the chain
|
||||
error_log(sprintf(
|
||||
'Event listener error for %s: %s',
|
||||
$eventName,
|
||||
$e->getMessage()
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Queue for async processing if there are async listeners
|
||||
if (isset($this->asyncListeners[$eventName]) && !empty($this->asyncListeners[$eventName])) {
|
||||
$this->deferredEvents[] = $event;
|
||||
}
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process deferred/async events
|
||||
* Call this at the end of the request cycle
|
||||
*/
|
||||
public function processDeferred(): int
|
||||
{
|
||||
$processed = 0;
|
||||
|
||||
foreach ($this->deferredEvents as $event) {
|
||||
$eventName = $event->getName();
|
||||
|
||||
if (!isset($this->asyncListeners[$eventName])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach ($this->asyncListeners[$eventName] as $listener) {
|
||||
try {
|
||||
call_user_func($listener, $event);
|
||||
$processed++;
|
||||
} catch (\Throwable $e) {
|
||||
// Log but don't fail - these are non-critical
|
||||
error_log(sprintf(
|
||||
'Async event handler error for %s: %s',
|
||||
$eventName,
|
||||
$e->getMessage()
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->deferredEvents = [];
|
||||
|
||||
return $processed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an event has any listeners
|
||||
*/
|
||||
public function hasListeners(string $eventName): bool
|
||||
{
|
||||
return !empty($this->listeners[$eventName]) || !empty($this->asyncListeners[$eventName]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of listeners for an event
|
||||
*/
|
||||
public function getListenerCount(string $eventName): int
|
||||
{
|
||||
$sync = isset($this->listeners[$eventName]) ? count($this->listeners[$eventName]) : 0;
|
||||
$async = isset($this->asyncListeners[$eventName]) ? count($this->asyncListeners[$eventName]) : 0;
|
||||
|
||||
return $sync + $async;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of pending deferred events
|
||||
*/
|
||||
public function getDeferredCount(): int
|
||||
{
|
||||
return count($this->deferredEvents);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all listeners (useful for testing)
|
||||
*/
|
||||
public function clear(): self
|
||||
{
|
||||
$this->listeners = [];
|
||||
$this->asyncListeners = [];
|
||||
$this->deferredEvents = [];
|
||||
|
||||
return $this;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user