* SPDX-License-Identifier: AGPL-3.0-or-later */ namespace KTXM\MailManager\Queue; use DateTimeImmutable; use DI\Attribute\Inject; use Psr\Log\LoggerInterface; use KTXF\Mail\Entity\IMessageMutable; use KTXF\Mail\Entity\Message; use KTXF\Mail\Queue\SendOptions; use RuntimeException; /** * File-Based Mail Queue * * Stores mail queue jobs on disk with atomic operations using file locks. * * Structure: * storage/{tenantId}/mail/queue/ * pending/{jobId}/ * meta.json * message.json * processing/{jobId}/... * complete/{jobId}/... * failed/{jobId}/... * * @since 2025.05.01 */ class MailQueueFile { private const DIR_PENDING = 'pending'; private const DIR_PROCESSING = 'processing'; private const DIR_COMPLETE = 'complete'; private const DIR_FAILED = 'failed'; private string $storagePath; public function __construct( private LoggerInterface $logger, #[Inject('rootDir')] private readonly string $rootDir, ) { $this->storagePath = $this->rootDir . '/var/cache/mail_manager/queue'; } /** * @inheritDoc */ public function enqueue( string $tenantId, string $providerId, string|int $serviceId, IMessageMutable $message, SendOptions $options ): string { $jobId = $this->generateJobId(); $job = new MailJob( id: $jobId, tenantId: $tenantId, providerId: $providerId, serviceId: $serviceId, message: $message, options: $options, ); $this->writeJob($job, self::DIR_PENDING); return $jobId; } /** * @inheritDoc */ public function dequeue(string $tenantId, int $limit = 50): array { $pendingDir = $this->getQueueDir($tenantId, self::DIR_PENDING); if (!is_dir($pendingDir)) { return []; } $jobs = []; $entries = scandir($pendingDir); // Sort by priority (read meta files) $jobsWithPriority = []; foreach ($entries as $entry) { if ($entry === '.' || $entry === '..') { continue; } $jobDir = $pendingDir . '/' . $entry; if (!is_dir($jobDir)) { continue; } $metaFile = $jobDir . '/meta.json'; if (!file_exists($metaFile)) { continue; } $meta = json_decode(file_get_contents($metaFile), true); if ($meta === null) { continue; } $scheduled = isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null; if ($scheduled !== null && $scheduled > new DateTimeImmutable()) { continue; // Not ready yet } $jobsWithPriority[] = [ 'id' => $entry, 'priority' => $meta['options']['priority'] ?? 0, 'created' => $meta['created'] ?? '', ]; } // Sort by priority (desc) then by created (asc) usort($jobsWithPriority, function($a, $b) { if ($a['priority'] !== $b['priority']) { return $b['priority'] <=> $a['priority']; } return $a['created'] <=> $b['created']; }); // Take up to limit and move to processing $jobsWithPriority = array_slice($jobsWithPriority, 0, $limit); foreach ($jobsWithPriority as $jobInfo) { $job = $this->loadJob($tenantId, $jobInfo['id'], self::DIR_PENDING); if ($job === null) { continue; } // Move to processing $this->moveJob($tenantId, $jobInfo['id'], self::DIR_PENDING, self::DIR_PROCESSING); $job->status = JobStatus::Processing; $job->lastAttempt = new DateTimeImmutable(); $job->attempts++; $this->updateJobMeta($tenantId, $jobInfo['id'], $job, self::DIR_PROCESSING); $jobs[] = $job; } return $jobs; } /** * @inheritDoc */ public function acknowledge(string $jobId, string $messageId): void { $job = $this->findJobById($jobId); if ($job === null) { return; } $job->status = JobStatus::Complete; $job->messageId = $messageId; $job->completed = new DateTimeImmutable(); $this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_COMPLETE); $this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_COMPLETE); } /** * @inheritDoc */ public function reject(string $jobId, string $error, bool $retry = true): void { $job = $this->findJobById($jobId); if ($job === null) { return; } $job->lastError = $error; if ($retry && $job->canRetry()) { // Move back to pending with delay $job->status = JobStatus::Pending; $job->scheduled = (new DateTimeImmutable())->modify('+' . $job->getRetryDelay() . ' seconds'); $this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_PENDING); $this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING); } else { // Move to failed $job->status = JobStatus::Failed; $job->completed = new DateTimeImmutable(); $this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_FAILED); $this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_FAILED); } } /** * @inheritDoc */ public function getJob(string $jobId): ?MailJob { return $this->findJobById($jobId); } /** * @inheritDoc */ public function listJobs(string $tenantId, ?JobStatus $status = null, int $limit = 100, int $offset = 0): array { $dirs = $status !== null ? [$this->statusToDir($status)] : [self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED]; $jobs = []; foreach ($dirs as $dir) { $queueDir = $this->getQueueDir($tenantId, $dir); if (!is_dir($queueDir)) { continue; } foreach (scandir($queueDir) as $entry) { if ($entry === '.' || $entry === '..') { continue; } $job = $this->loadJob($tenantId, $entry, $dir); if ($job !== null) { $jobs[] = $job; } } } // Sort by created desc usort($jobs, fn($a, $b) => ($b->created?->getTimestamp() ?? 0) <=> ($a->created?->getTimestamp() ?? 0)); return array_slice($jobs, $offset, $limit); } /** * @inheritDoc */ public function retry(string $jobId): bool { $job = $this->findJobById($jobId); if ($job === null || $job->status !== JobStatus::Failed) { return false; } $job->status = JobStatus::Pending; $job->attempts = 0; $job->lastError = null; $job->scheduled = new DateTimeImmutable(); $this->moveJob($job->tenantId, $jobId, self::DIR_FAILED, self::DIR_PENDING); $this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING); return true; } /** * @inheritDoc */ public function purge(string $tenantId, JobStatus $status, int $olderThanSeconds): int { $dir = $this->statusToDir($status); $queueDir = $this->getQueueDir($tenantId, $dir); if (!is_dir($queueDir)) { return 0; } $threshold = new DateTimeImmutable("-{$olderThanSeconds} seconds"); $purged = 0; foreach (scandir($queueDir) as $entry) { if ($entry === '.' || $entry === '..') { continue; } $jobDir = $queueDir . '/' . $entry; $metaFile = $jobDir . '/meta.json'; if (!file_exists($metaFile)) { continue; } $meta = json_decode(file_get_contents($metaFile), true); $completed = isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null; if ($completed !== null && $completed < $threshold) { $this->deleteJobDir($jobDir); $purged++; } } return $purged; } /** * @inheritDoc */ public function stats(string $tenantId): array { $stats = [ 'pending' => 0, 'processing' => 0, 'complete' => 0, 'failed' => 0, ]; foreach ($stats as $status => $_) { $dir = $this->getQueueDir($tenantId, $status); if (is_dir($dir)) { $stats[$status] = count(array_filter( scandir($dir), fn($e) => $e !== '.' && $e !== '..' )); } } return $stats; } /** * Generate a unique job ID */ private function generateJobId(): string { return sprintf( '%08x-%04x-%04x-%04x-%012x', time(), mt_rand(0, 0xffff), mt_rand(0, 0x0fff) | 0x4000, mt_rand(0, 0x3fff) | 0x8000, mt_rand(0, 0xffffffffffff) ); } /** * Get the queue directory path for a tenant and status */ private function getQueueDir(string $tenantId, string $status): string { return $this->storagePath . '/' . $tenantId . '/mail/queue/' . $status; } /** * Write a job to disk */ private function writeJob(MailJob $job, string $status): void { $jobDir = $this->getQueueDir($job->tenantId, $status) . '/' . $job->id; if (!is_dir($jobDir)) { mkdir($jobDir, 0755, true); } // Write meta $metaFile = $jobDir . '/meta.json'; file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT)); // Write message $messageFile = $jobDir . '/message.json'; file_put_contents($messageFile, json_encode($job->message, JSON_PRETTY_PRINT)); } /** * Load a job from disk */ private function loadJob(string $tenantId, string $jobId, string $status): ?MailJob { $jobDir = $this->getQueueDir($tenantId, $status) . '/' . $jobId; $metaFile = $jobDir . '/meta.json'; $messageFile = $jobDir . '/message.json'; if (!file_exists($metaFile) || !file_exists($messageFile)) { return null; } $meta = json_decode(file_get_contents($metaFile), true); $messageData = json_decode(file_get_contents($messageFile), true); if ($meta === null || $messageData === null) { return null; } $message = Message::fromArray($messageData); return new MailJob( id: $meta['id'], tenantId: $meta['tenantId'], providerId: $meta['providerId'], serviceId: $meta['serviceId'], message: $message, options: new SendOptions( immediate: $meta['options']['immediate'] ?? false, priority: $meta['options']['priority'] ?? 0, retryCount: $meta['options']['retryCount'] ?? 3, delaySeconds: $meta['options']['delaySeconds'] ?? null, ), status: JobStatus::from($meta['status']), attempts: $meta['attempts'] ?? 0, lastError: $meta['lastError'] ?? null, messageId: $meta['messageId'] ?? null, created: isset($meta['created']) ? new DateTimeImmutable($meta['created']) : null, scheduled: isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null, lastAttempt: isset($meta['lastAttempt']) ? new DateTimeImmutable($meta['lastAttempt']) : null, completed: isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null, ); } /** * Move a job between status directories */ private function moveJob(string $tenantId, string $jobId, string $fromStatus, string $toStatus): void { $fromDir = $this->getQueueDir($tenantId, $fromStatus) . '/' . $jobId; $toDir = $this->getQueueDir($tenantId, $toStatus) . '/' . $jobId; if (!is_dir($fromDir)) { throw new RuntimeException("Job directory not found: $fromDir"); } $toParent = dirname($toDir); if (!is_dir($toParent)) { mkdir($toParent, 0755, true); } rename($fromDir, $toDir); } /** * Update job metadata */ private function updateJobMeta(string $tenantId, string $jobId, MailJob $job, string $status): void { $metaFile = $this->getQueueDir($tenantId, $status) . '/' . $jobId . '/meta.json'; file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT)); } /** * Find a job by ID across all status directories */ private function findJobById(string $jobId): ?MailJob { // We need to search across all tenants and statuses // This is inefficient - in production, consider caching or indexing $tenantsDir = $this->storagePath; if (!is_dir($tenantsDir)) { return null; } foreach (scandir($tenantsDir) as $tenantId) { if ($tenantId === '.' || $tenantId === '..') { continue; } foreach ([self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED] as $status) { $job = $this->loadJob($tenantId, $jobId, $status); if ($job !== null) { return $job; } } } return null; } /** * Delete a job directory recursively */ private function deleteJobDir(string $dir): void { if (!is_dir($dir)) { return; } foreach (scandir($dir) as $file) { if ($file === '.' || $file === '..') { continue; } $path = $dir . '/' . $file; is_dir($path) ? $this->deleteJobDir($path) : unlink($path); } rmdir($dir); } /** * Convert JobStatus to directory name */ private function statusToDir(JobStatus $status): string { return match($status) { JobStatus::Pending => self::DIR_PENDING, JobStatus::Processing => self::DIR_PROCESSING, JobStatus::Complete => self::DIR_COMPLETE, JobStatus::Failed => self::DIR_FAILED, }; } }