238 lines
6.6 KiB
PHP
238 lines
6.6 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
/**
 * SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */
|
|
|
|
namespace KTXM\MailManager\Daemon;
|
|
|
|
use KTXM\MailManager\Manager;
|
|
use Psr\Log\LoggerInterface;
|
|
|
|
/**
 * Mail Queue Daemon
 *
 * Long-running worker process for processing queued mail messages.
 * Supports graceful shutdown via signals and configurable batch processing.
 *
 * Signal handling relies on the PCNTL extension. When it is not available the
 * daemon still runs, but no signal handlers are installed and no signals are
 * dispatched.
 *
 * @since 2025.05.01
 */
class MailDaemon {

    /** True while run() is executing. */
    private bool $running = false;

    /** Set to make the main loop (and any idle sleep) exit at the next check. */
    private bool $shutdown = false;

    /** Set by the SIGHUP handler; consumed at the top of the next loop iteration. */
    private bool $reload = false;

    /** True once PCNTL handlers were installed, i.e. dispatching signals is safe. */
    private bool $signalsEnabled = false;

    /**
     * @param Manager $manager Mail manager
     * @param LoggerInterface $logger Logger
     * @param int $pollInterval Upper bound, in seconds, for the idle sleep between queue polls;
     *                          values below 1 disable the idle sleep entirely
     * @param int $batchSize Messages to process per batch
     * @param int|null $maxMemory Maximum memory usage in bytes before restart (null = 128 MiB)
     * @param array<string>|null $tenants Specific tenants to process (null = all)
     */
    public function __construct(
        private Manager $manager,
        private LoggerInterface $logger,
        private int $pollInterval = 5,
        private int $batchSize = 50,
        private ?int $maxMemory = null,
        private ?array $tenants = null,
    ) {
        // Fall back to a 128 MiB ceiling when no explicit limit was given
        $this->maxMemory ??= 128 * 1024 * 1024;
    }

    /**
     * Run the daemon main loop
     *
     * Blocks until a shutdown is requested (signal or stop()) or the memory
     * limit is exceeded. Exceptions thrown while processing are not caught
     * here; the running flag is reset in every case.
     *
     * @since 2025.05.01
     */
    public function run(): void {
        $this->running = true;
        $this->shutdown = false;

        try {
            $this->setupSignalHandlers();

            $this->logger->info('Mail daemon starting', [
                'pollInterval' => $this->pollInterval,
                'batchSize' => $this->batchSize,
                'maxMemory' => $this->formatBytes($this->maxMemory),
                'tenants' => $this->tenants ?? 'all',
            ]);

            $consecutiveEmpty = 0;

            while (!$this->shutdown) {
                // Handle reload signal
                if ($this->reload) {
                    $this->handleReload();
                    $this->reload = false;
                }

                // Leave the loop so an external supervisor can restart the process
                if ($this->isMemoryExceeded()) {
                    $this->logger->warning('Memory limit exceeded, shutting down for restart', [
                        'current' => $this->formatBytes(memory_get_usage(true)),
                        'limit' => $this->formatBytes($this->maxMemory),
                    ]);
                    break;
                }

                // Process queues
                $processed = $this->processTenants();

                if ($processed === 0) {
                    // Linear backoff: one more second per consecutive empty poll,
                    // capped at the configured poll interval
                    $consecutiveEmpty++;
                    $this->sleep(min($consecutiveEmpty, $this->pollInterval));
                } else {
                    $consecutiveEmpty = 0;
                }

                // Dispatch pending signals
                $this->dispatchSignals();
            }

            $this->logger->info('Mail daemon stopped');
        } finally {
            // Keep isRunning() truthful even when the loop is left through an exception
            $this->running = false;
        }
    }

    /**
     * Request graceful shutdown
     *
     * @since 2025.05.01
     */
    public function stop(): void {
        $this->shutdown = true;
    }

    /**
     * Check if daemon is running
     *
     * @since 2025.05.01
     *
     * @return bool
     */
    public function isRunning(): bool {
        return $this->running;
    }

    /**
     * Process all tenants and return total messages processed
     *
     * Failed messages count towards the total, so a batch consisting only of
     * failures is not treated as an idle poll. Stops early between tenants once
     * a shutdown has been requested. Exceptions from the manager propagate.
     *
     * @return int Number of messages handled (processed plus failed) across all tenants
     */
    private function processTenants(): int {
        $totalProcessed = 0;
        $tenants = $this->tenants ?? $this->discoverTenants();

        foreach ($tenants as $tenantId) {
            if ($this->shutdown) {
                break;
            }

            // NOTE(review): assumes queueProcess() returns integer counts under the
            // 'processed' and 'failed' keys - confirm against Manager
            $result = $this->manager->queueProcess($tenantId, $this->batchSize);
            $handled = $result['processed'] + $result['failed'];
            $totalProcessed += $handled;

            if ($result['processed'] > 0 || $result['failed'] > 0) {
                $this->logger->debug('Processed tenant queue', [
                    'tenant' => $tenantId,
                    'processed' => $result['processed'],
                    'failed' => $result['failed'],
                ]);
            }
        }

        return $totalProcessed;
    }

    /**
     * Discover all tenants with mail queues
     *
     * @return array<string>
     */
    private function discoverTenants(): array {
        // This would need to be implemented based on your tenant discovery mechanism
        // For now, return empty array - specific tenants should be configured
        return [];
    }

    /**
     * Setup signal handlers for graceful shutdown
     *
     * SIGTERM and SIGINT request a shutdown, SIGHUP requests a reload. When the
     * PCNTL functions are missing a warning is logged and nothing is installed.
     */
    private function setupSignalHandlers(): void {
        $this->signalsEnabled = function_exists('pcntl_signal')
            && function_exists('pcntl_signal_dispatch');

        if (!$this->signalsEnabled) {
            $this->logger->warning('PCNTL extension not available, signal handling disabled');
            return;
        }

        pcntl_signal(SIGTERM, function (): void {
            $this->logger->info('Received SIGTERM, initiating graceful shutdown');
            $this->shutdown = true;
        });

        pcntl_signal(SIGINT, function (): void {
            $this->logger->info('Received SIGINT, initiating graceful shutdown');
            $this->shutdown = true;
        });

        pcntl_signal(SIGHUP, function (): void {
            $this->logger->info('Received SIGHUP, will reload configuration');
            $this->reload = true;
        });
    }

    /**
     * Run pending signal handlers, if signal handling is enabled
     *
     * Guards pcntl_signal_dispatch() so the daemon keeps working on hosts
     * without the PCNTL extension instead of failing on an undefined function.
     */
    private function dispatchSignals(): void {
        if ($this->signalsEnabled) {
            pcntl_signal_dispatch();
        }
    }

    /**
     * Handle configuration reload
     */
    private function handleReload(): void {
        $this->logger->info('Reloading configuration');
        // Configuration reload logic would go here
    }

    /**
     * Check if memory limit has been exceeded
     *
     * @return bool True when the memory allocated from the system is above the limit
     */
    private function isMemoryExceeded(): bool {
        if ($this->maxMemory === null) {
            return false;
        }

        return memory_get_usage(true) > $this->maxMemory;
    }

    /**
     * Sleep with signal dispatch
     *
     * Sleeps in one-second slices so a shutdown request is noticed within about
     * a second. A non-positive duration returns immediately.
     *
     * @param int $seconds Maximum number of seconds to sleep
     */
    private function sleep(int $seconds): void {
        for ($elapsed = 0; $elapsed < $seconds; $elapsed++) {
            if ($this->shutdown) {
                return;
            }

            \sleep(1);
            $this->dispatchSignals();
        }
    }

    /**
     * Format bytes to human readable string
     *
     * @param int|null $bytes Byte count, or null for "no limit"
     *
     * @return string Value rounded to two decimals with a B/KB/MB/GB suffix, or 'unlimited'
     */
    private function formatBytes(?int $bytes): string {
        if ($bytes === null) {
            return 'unlimited';
        }

        $units = ['B', 'KB', 'MB', 'GB'];
        $lastUnit = count($units) - 1;

        // Work on a copy: the scaled value becomes a float once it stops dividing evenly
        $value = $bytes;
        $unit = 0;
        while ($value >= 1024 && $unit < $lastUnit) {
            $value /= 1024;
            $unit++;
        }

        return round($value, 2) . ' ' . $units[$unit];
    }

}
|