* SPDX-License-Identifier: AGPL-3.0-or-later */ namespace KTXM\MailManager\Daemon; use KTXM\MailManager\Manager; use Psr\Log\LoggerInterface; /** * Mail Queue Daemon * * Long-running worker process for processing queued mail messages. * Supports graceful shutdown via signals and configurable batch processing. * * @since 2025.05.01 */ class MailDaemon { private bool $running = false; private bool $shutdown = false; private bool $reload = false; /** * @param Manager $manager Mail manager * @param LoggerInterface $logger Logger * @param int $pollInterval Seconds between queue polls when idle * @param int $batchSize Messages to process per batch * @param int|null $maxMemory Maximum memory usage in bytes before restart * @param array|null $tenants Specific tenants to process (null = all) */ public function __construct( private Manager $manager, private LoggerInterface $logger, private int $pollInterval = 5, private int $batchSize = 50, private ?int $maxMemory = null, private ?array $tenants = null, ) { // Set default max memory to 128MB $this->maxMemory = $this->maxMemory ?? (128 * 1024 * 1024); } /** * Run the daemon main loop * * @since 2025.05.01 */ public function run(): void { $this->running = true; $this->shutdown = false; $this->setupSignalHandlers(); $this->logger->info('Mail daemon starting', [ 'pollInterval' => $this->pollInterval, 'batchSize' => $this->batchSize, 'maxMemory' => $this->formatBytes($this->maxMemory), 'tenants' => $this->tenants ?? 'all', ]); $consecutiveEmpty = 0; while (!$this->shutdown) { // Handle reload signal if ($this->reload) { $this->handleReload(); $this->reload = false; } // Check memory usage if ($this->isMemoryExceeded()) { $this->logger->warning('Memory limit exceeded, shutting down for restart', [ 'current' => $this->formatBytes(memory_get_usage(true)), 'limit' => $this->formatBytes($this->maxMemory), ]); break; } // Process queues $processed = $this->processTenants(); if ($processed === 0) { $consecutiveEmpty++; // Exponential backoff up to poll interval $sleepTime = min($consecutiveEmpty, $this->pollInterval); $this->sleep($sleepTime); } else { $consecutiveEmpty = 0; } // Dispatch pending signals pcntl_signal_dispatch(); } $this->logger->info('Mail daemon stopped'); $this->running = false; } /** * Request graceful shutdown * * @since 2025.05.01 */ public function stop(): void { $this->shutdown = true; } /** * Check if daemon is running * * @since 2025.05.01 * * @return bool */ public function isRunning(): bool { return $this->running; } /** * Process all tenants and return total messages processed */ private function processTenants(): int { $totalProcessed = 0; $tenants = $this->tenants ?? $this->discoverTenants(); foreach ($tenants as $tenantId) { if ($this->shutdown) { break; } $result = $this->manager->queueProcess($tenantId, $this->batchSize); $totalProcessed += $result['processed'] + $result['failed']; if ($result['processed'] > 0 || $result['failed'] > 0) { $this->logger->debug('Processed tenant queue', [ 'tenant' => $tenantId, 'processed' => $result['processed'], 'failed' => $result['failed'], ]); } } return $totalProcessed; } /** * Discover all tenants with mail queues * * @return array */ private function discoverTenants(): array { // This would need to be implemented based on your tenant discovery mechanism // For now, return empty array - specific tenants should be configured return []; } /** * Setup signal handlers for graceful shutdown */ private function setupSignalHandlers(): void { if (!function_exists('pcntl_signal')) { $this->logger->warning('PCNTL extension not available, signal handling disabled'); return; } pcntl_signal(SIGTERM, function() { $this->logger->info('Received SIGTERM, initiating graceful shutdown'); $this->shutdown = true; }); pcntl_signal(SIGINT, function() { $this->logger->info('Received SIGINT, initiating graceful shutdown'); $this->shutdown = true; }); pcntl_signal(SIGHUP, function() { $this->logger->info('Received SIGHUP, will reload configuration'); $this->reload = true; }); } /** * Handle configuration reload */ private function handleReload(): void { $this->logger->info('Reloading configuration'); // Configuration reload logic would go here } /** * Check if memory limit has been exceeded */ private function isMemoryExceeded(): bool { if ($this->maxMemory === null) { return false; } return memory_get_usage(true) > $this->maxMemory; } /** * Sleep with signal dispatch */ private function sleep(int $seconds): void { for ($i = 0; $i < $seconds; $i++) { if ($this->shutdown) { return; } sleep(1); pcntl_signal_dispatch(); } } /** * Format bytes to human readable string */ private function formatBytes(?int $bytes): string { if ($bytes === null) { return 'unlimited'; } $units = ['B', 'KB', 'MB', 'GB']; $i = 0; while ($bytes >= 1024 && $i < count($units) - 1) { $bytes /= 1024; $i++; } return round($bytes, 2) . ' ' . $units[$i]; } }