Files
mail_manager/lib/Daemon/MailDaemon.php
2026-02-10 20:26:45 -05:00

238 lines
6.6 KiB
PHP

<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Daemon;
use KTXM\MailManager\Manager;
use Psr\Log\LoggerInterface;
/**
* Mail Queue Daemon
*
* Long-running worker process for processing queued mail messages.
* Supports graceful shutdown via signals and configurable batch processing.
*
* @since 2025.05.01
*/
class MailDaemon {
    /** Memory ceiling in bytes (128 MiB) applied when the caller passes no explicit limit. */
    private const DEFAULT_MAX_MEMORY = 128 * 1024 * 1024;

    /** True while run() is executing its main loop. */
    private bool $running = false;

    /** Set by stop() or by SIGTERM/SIGINT; makes the main loop exit at the next check. */
    private bool $shutdown = false;

    /** Set by SIGHUP; consumed at the top of the next loop iteration. */
    private bool $reload = false;

    /**
     * @param Manager $manager Mail manager
     * @param LoggerInterface $logger Logger
     * @param int $pollInterval Upper bound, in seconds, for the idle sleep between queue polls.
     *                          A value of 0 or less disables the idle sleep entirely.
     * @param int $batchSize Messages to process per batch
     * @param int|null $maxMemory Maximum memory usage in bytes before the loop exits for a
     *                            restart (null = use the 128 MiB default)
     * @param array<string>|null $tenants Specific tenants to process (null = all)
     */
    public function __construct(
        private Manager $manager,
        private LoggerInterface $logger,
        private int $pollInterval = 5,
        private int $batchSize = 50,
        private ?int $maxMemory = null,
        private ?array $tenants = null,
    ) {
        $this->maxMemory ??= self::DEFAULT_MAX_MEMORY;
    }

    /**
     * Run the daemon main loop
     *
     * Loops until a shutdown is requested (stop(), SIGTERM, SIGINT) or the memory
     * limit is exceeded. Exceptions thrown while processing are not caught here;
     * they propagate to the caller after the running flag has been cleared.
     *
     * @since 2025.05.01
     */
    public function run(): void {
        $this->running = true;
        $this->shutdown = false;

        try {
            $this->setupSignalHandlers();
            $this->logger->info('Mail daemon starting', [
                'pollInterval' => $this->pollInterval,
                'batchSize' => $this->batchSize,
                'maxMemory' => $this->formatBytes($this->maxMemory),
                'tenants' => $this->tenants ?? 'all',
            ]);

            $consecutiveEmpty = 0;
            while (!$this->shutdown) {
                // Handle reload signal
                if ($this->reload) {
                    $this->handleReload();
                    $this->reload = false;
                }

                // Leave the loop once memory grows past the limit so a supervisor can restart us
                if ($this->isMemoryExceeded()) {
                    $this->logger->warning('Memory limit exceeded, shutting down for restart', [
                        'current' => $this->formatBytes(memory_get_usage(true)),
                        'limit' => $this->formatBytes($this->maxMemory),
                    ]);
                    break;
                }

                // Process queues
                $processed = $this->processTenants();
                if ($processed === 0) {
                    $consecutiveEmpty++;
                    // Linear backoff: one extra second per consecutive empty poll,
                    // capped at the configured poll interval
                    $this->sleep(min($consecutiveEmpty, $this->pollInterval));
                } else {
                    $consecutiveEmpty = 0;
                }

                // Dispatch pending signals
                $this->dispatchSignals();
            }

            $this->logger->info('Mail daemon stopped');
        } finally {
            // Always clear the flag, even when the loop is left through an exception
            $this->running = false;
        }
    }

    /**
     * Request graceful shutdown
     *
     * @since 2025.05.01
     */
    public function stop(): void {
        $this->shutdown = true;
    }

    /**
     * Check if daemon is running
     *
     * @since 2025.05.01
     *
     * @return bool
     */
    public function isRunning(): bool {
        return $this->running;
    }

    /**
     * Process one batch for every tenant and return the total messages handled
     *
     * Stops early when a shutdown has been requested. Both successfully
     * processed and failed messages count towards the returned total.
     *
     * @return int Sum of 'processed' and 'failed' counts reported by the manager
     */
    private function processTenants(): int {
        $totalProcessed = 0;
        $tenants = $this->tenants ?? $this->discoverTenants();

        foreach ($tenants as $tenantId) {
            if ($this->shutdown) {
                break;
            }

            $result = $this->manager->queueProcess($tenantId, $this->batchSize);
            $handled = $result['processed'] + $result['failed'];
            $totalProcessed += $handled;

            // Only log tenants that actually had work, to keep idle polls quiet
            if ($result['processed'] > 0 || $result['failed'] > 0) {
                $this->logger->debug('Processed tenant queue', [
                    'tenant' => $tenantId,
                    'processed' => $result['processed'],
                    'failed' => $result['failed'],
                ]);
            }
        }

        return $totalProcessed;
    }

    /**
     * Discover all tenants with mail queues
     *
     * Placeholder: currently always returns an empty list, so tenants must be
     * passed to the constructor for any queue to be processed.
     *
     * @return array<string>
     */
    private function discoverTenants(): array {
        // This would need to be implemented based on your tenant discovery mechanism
        // For now, return empty array - specific tenants should be configured
        return [];
    }

    /**
     * Setup signal handlers for graceful shutdown
     *
     * SIGTERM and SIGINT request a shutdown, SIGHUP requests a reload. When the
     * PCNTL extension is not available a warning is logged and no handlers are
     * installed.
     */
    private function setupSignalHandlers(): void {
        if (!function_exists('pcntl_signal')) {
            $this->logger->warning('PCNTL extension not available, signal handling disabled');
            return;
        }

        pcntl_signal(SIGTERM, function () {
            $this->logger->info('Received SIGTERM, initiating graceful shutdown');
            $this->shutdown = true;
        });
        pcntl_signal(SIGINT, function () {
            $this->logger->info('Received SIGINT, initiating graceful shutdown');
            $this->shutdown = true;
        });
        pcntl_signal(SIGHUP, function () {
            $this->logger->info('Received SIGHUP, will reload configuration');
            $this->reload = true;
        });
    }

    /**
     * Run handlers for any pending signals
     *
     * Does nothing when the PCNTL extension is not available, so the daemon
     * keeps working (without signal support) instead of failing on an
     * undefined function.
     */
    private function dispatchSignals(): void {
        if (function_exists('pcntl_signal_dispatch')) {
            pcntl_signal_dispatch();
        }
    }

    /**
     * Handle configuration reload
     *
     * Currently only logs the request; no configuration is re-read.
     */
    private function handleReload(): void {
        $this->logger->info('Reloading configuration');
        // Configuration reload logic would go here
    }

    /**
     * Check if memory limit has been exceeded
     *
     * @return bool True when the memory reported by memory_get_usage(true) is above the limit
     */
    private function isMemoryExceeded(): bool {
        if ($this->maxMemory === null) {
            return false;
        }

        return memory_get_usage(true) > $this->maxMemory;
    }

    /**
     * Sleep with signal dispatch
     *
     * Sleeps in one-second slices so that a shutdown request is noticed within
     * roughly a second instead of after the full interval.
     *
     * @param int $seconds Number of one-second slices to sleep; 0 or less returns immediately
     */
    private function sleep(int $seconds): void {
        for ($i = 0; $i < $seconds; $i++) {
            if ($this->shutdown) {
                return;
            }

            // Unqualified call resolves to PHP's global sleep(), not this method
            sleep(1);
            $this->dispatchSignals();
        }
    }

    /**
     * Format bytes to human readable string
     *
     * Uses 1024-based steps and the units B, KB, MB and GB; values beyond GB
     * stay expressed in GB. The number is rounded to two decimals.
     *
     * @param int|null $bytes Byte count, or null for "no limit"
     *
     * @return string Formatted value such as "128 MB", or "unlimited" for null
     */
    private function formatBytes(?int $bytes): string {
        if ($bytes === null) {
            return 'unlimited';
        }

        $units = ['B', 'KB', 'MB', 'GB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }

        return round($bytes, 2) . ' ' . $units[$i];
    }
}