Files
mail_manager/lib/Queue/MailQueueFile.php
2026-02-10 20:26:45 -05:00

493 lines
15 KiB
PHP

<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Queue;
use DateTimeImmutable;
use DI\Attribute\Inject;
use Psr\Log\LoggerInterface;
use KTXF\Mail\Entity\IMessageMutable;
use KTXF\Mail\Entity\Message;
use KTXF\Mail\Queue\SendOptions;
use RuntimeException;
/**
* File-Based Mail Queue
*
* Stores mail queue jobs on disk with atomic operations using file locks.
*
* Structure:
* storage/{tenantId}/mail/queue/
* pending/{jobId}/
* meta.json
* message.json
* processing/{jobId}/...
* complete/{jobId}/...
* failed/{jobId}/...
*
* @since 2025.05.01
*/
class MailQueueFile {
private const DIR_PENDING = 'pending';
private const DIR_PROCESSING = 'processing';
private const DIR_COMPLETE = 'complete';
private const DIR_FAILED = 'failed';
private string $storagePath;
public function __construct(
private LoggerInterface $logger,
#[Inject('rootDir')] private readonly string $rootDir,
) {
$this->storagePath = $this->rootDir . '/var/cache/mail_manager/queue';
}
/**
* @inheritDoc
*/
public function enqueue(
string $tenantId,
string $providerId,
string|int $serviceId,
IMessageMutable $message,
SendOptions $options
): string {
$jobId = $this->generateJobId();
$job = new MailJob(
id: $jobId,
tenantId: $tenantId,
providerId: $providerId,
serviceId: $serviceId,
message: $message,
options: $options,
);
$this->writeJob($job, self::DIR_PENDING);
return $jobId;
}
/**
* @inheritDoc
*/
public function dequeue(string $tenantId, int $limit = 50): array {
$pendingDir = $this->getQueueDir($tenantId, self::DIR_PENDING);
if (!is_dir($pendingDir)) {
return [];
}
$jobs = [];
$entries = scandir($pendingDir);
// Sort by priority (read meta files)
$jobsWithPriority = [];
foreach ($entries as $entry) {
if ($entry === '.' || $entry === '..') {
continue;
}
$jobDir = $pendingDir . '/' . $entry;
if (!is_dir($jobDir)) {
continue;
}
$metaFile = $jobDir . '/meta.json';
if (!file_exists($metaFile)) {
continue;
}
$meta = json_decode(file_get_contents($metaFile), true);
if ($meta === null) {
continue;
}
$scheduled = isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null;
if ($scheduled !== null && $scheduled > new DateTimeImmutable()) {
continue; // Not ready yet
}
$jobsWithPriority[] = [
'id' => $entry,
'priority' => $meta['options']['priority'] ?? 0,
'created' => $meta['created'] ?? '',
];
}
// Sort by priority (desc) then by created (asc)
usort($jobsWithPriority, function($a, $b) {
if ($a['priority'] !== $b['priority']) {
return $b['priority'] <=> $a['priority'];
}
return $a['created'] <=> $b['created'];
});
// Take up to limit and move to processing
$jobsWithPriority = array_slice($jobsWithPriority, 0, $limit);
foreach ($jobsWithPriority as $jobInfo) {
$job = $this->loadJob($tenantId, $jobInfo['id'], self::DIR_PENDING);
if ($job === null) {
continue;
}
// Move to processing
$this->moveJob($tenantId, $jobInfo['id'], self::DIR_PENDING, self::DIR_PROCESSING);
$job->status = JobStatus::Processing;
$job->lastAttempt = new DateTimeImmutable();
$job->attempts++;
$this->updateJobMeta($tenantId, $jobInfo['id'], $job, self::DIR_PROCESSING);
$jobs[] = $job;
}
return $jobs;
}
/**
* @inheritDoc
*/
public function acknowledge(string $jobId, string $messageId): void {
$job = $this->findJobById($jobId);
if ($job === null) {
return;
}
$job->status = JobStatus::Complete;
$job->messageId = $messageId;
$job->completed = new DateTimeImmutable();
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_COMPLETE);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_COMPLETE);
}
/**
* @inheritDoc
*/
public function reject(string $jobId, string $error, bool $retry = true): void {
$job = $this->findJobById($jobId);
if ($job === null) {
return;
}
$job->lastError = $error;
if ($retry && $job->canRetry()) {
// Move back to pending with delay
$job->status = JobStatus::Pending;
$job->scheduled = (new DateTimeImmutable())->modify('+' . $job->getRetryDelay() . ' seconds');
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_PENDING);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING);
} else {
// Move to failed
$job->status = JobStatus::Failed;
$job->completed = new DateTimeImmutable();
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_FAILED);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_FAILED);
}
}
/**
* @inheritDoc
*/
public function getJob(string $jobId): ?MailJob {
return $this->findJobById($jobId);
}
/**
* @inheritDoc
*/
public function listJobs(string $tenantId, ?JobStatus $status = null, int $limit = 100, int $offset = 0): array {
$dirs = $status !== null
? [$this->statusToDir($status)]
: [self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED];
$jobs = [];
foreach ($dirs as $dir) {
$queueDir = $this->getQueueDir($tenantId, $dir);
if (!is_dir($queueDir)) {
continue;
}
foreach (scandir($queueDir) as $entry) {
if ($entry === '.' || $entry === '..') {
continue;
}
$job = $this->loadJob($tenantId, $entry, $dir);
if ($job !== null) {
$jobs[] = $job;
}
}
}
// Sort by created desc
usort($jobs, fn($a, $b) => ($b->created?->getTimestamp() ?? 0) <=> ($a->created?->getTimestamp() ?? 0));
return array_slice($jobs, $offset, $limit);
}
/**
* @inheritDoc
*/
public function retry(string $jobId): bool {
$job = $this->findJobById($jobId);
if ($job === null || $job->status !== JobStatus::Failed) {
return false;
}
$job->status = JobStatus::Pending;
$job->attempts = 0;
$job->lastError = null;
$job->scheduled = new DateTimeImmutable();
$this->moveJob($job->tenantId, $jobId, self::DIR_FAILED, self::DIR_PENDING);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING);
return true;
}
/**
* @inheritDoc
*/
public function purge(string $tenantId, JobStatus $status, int $olderThanSeconds): int {
$dir = $this->statusToDir($status);
$queueDir = $this->getQueueDir($tenantId, $dir);
if (!is_dir($queueDir)) {
return 0;
}
$threshold = new DateTimeImmutable("-{$olderThanSeconds} seconds");
$purged = 0;
foreach (scandir($queueDir) as $entry) {
if ($entry === '.' || $entry === '..') {
continue;
}
$jobDir = $queueDir . '/' . $entry;
$metaFile = $jobDir . '/meta.json';
if (!file_exists($metaFile)) {
continue;
}
$meta = json_decode(file_get_contents($metaFile), true);
$completed = isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null;
if ($completed !== null && $completed < $threshold) {
$this->deleteJobDir($jobDir);
$purged++;
}
}
return $purged;
}
/**
* @inheritDoc
*/
public function stats(string $tenantId): array {
$stats = [
'pending' => 0,
'processing' => 0,
'complete' => 0,
'failed' => 0,
];
foreach ($stats as $status => $_) {
$dir = $this->getQueueDir($tenantId, $status);
if (is_dir($dir)) {
$stats[$status] = count(array_filter(
scandir($dir),
fn($e) => $e !== '.' && $e !== '..'
));
}
}
return $stats;
}
/**
* Generate a unique job ID
*/
private function generateJobId(): string {
return sprintf(
'%08x-%04x-%04x-%04x-%012x',
time(),
mt_rand(0, 0xffff),
mt_rand(0, 0x0fff) | 0x4000,
mt_rand(0, 0x3fff) | 0x8000,
mt_rand(0, 0xffffffffffff)
);
}
/**
* Get the queue directory path for a tenant and status
*/
private function getQueueDir(string $tenantId, string $status): string {
return $this->storagePath . '/' . $tenantId . '/mail/queue/' . $status;
}
/**
* Write a job to disk
*/
private function writeJob(MailJob $job, string $status): void {
$jobDir = $this->getQueueDir($job->tenantId, $status) . '/' . $job->id;
if (!is_dir($jobDir)) {
mkdir($jobDir, 0755, true);
}
// Write meta
$metaFile = $jobDir . '/meta.json';
file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT));
// Write message
$messageFile = $jobDir . '/message.json';
file_put_contents($messageFile, json_encode($job->message, JSON_PRETTY_PRINT));
}
/**
* Load a job from disk
*/
private function loadJob(string $tenantId, string $jobId, string $status): ?MailJob {
$jobDir = $this->getQueueDir($tenantId, $status) . '/' . $jobId;
$metaFile = $jobDir . '/meta.json';
$messageFile = $jobDir . '/message.json';
if (!file_exists($metaFile) || !file_exists($messageFile)) {
return null;
}
$meta = json_decode(file_get_contents($metaFile), true);
$messageData = json_decode(file_get_contents($messageFile), true);
if ($meta === null || $messageData === null) {
return null;
}
$message = Message::fromArray($messageData);
return new MailJob(
id: $meta['id'],
tenantId: $meta['tenantId'],
providerId: $meta['providerId'],
serviceId: $meta['serviceId'],
message: $message,
options: new SendOptions(
immediate: $meta['options']['immediate'] ?? false,
priority: $meta['options']['priority'] ?? 0,
retryCount: $meta['options']['retryCount'] ?? 3,
delaySeconds: $meta['options']['delaySeconds'] ?? null,
),
status: JobStatus::from($meta['status']),
attempts: $meta['attempts'] ?? 0,
lastError: $meta['lastError'] ?? null,
messageId: $meta['messageId'] ?? null,
created: isset($meta['created']) ? new DateTimeImmutable($meta['created']) : null,
scheduled: isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null,
lastAttempt: isset($meta['lastAttempt']) ? new DateTimeImmutable($meta['lastAttempt']) : null,
completed: isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null,
);
}
/**
* Move a job between status directories
*/
private function moveJob(string $tenantId, string $jobId, string $fromStatus, string $toStatus): void {
$fromDir = $this->getQueueDir($tenantId, $fromStatus) . '/' . $jobId;
$toDir = $this->getQueueDir($tenantId, $toStatus) . '/' . $jobId;
if (!is_dir($fromDir)) {
throw new RuntimeException("Job directory not found: $fromDir");
}
$toParent = dirname($toDir);
if (!is_dir($toParent)) {
mkdir($toParent, 0755, true);
}
rename($fromDir, $toDir);
}
/**
* Update job metadata
*/
private function updateJobMeta(string $tenantId, string $jobId, MailJob $job, string $status): void {
$metaFile = $this->getQueueDir($tenantId, $status) . '/' . $jobId . '/meta.json';
file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT));
}
/**
* Find a job by ID across all status directories
*/
private function findJobById(string $jobId): ?MailJob {
// We need to search across all tenants and statuses
// This is inefficient - in production, consider caching or indexing
$tenantsDir = $this->storagePath;
if (!is_dir($tenantsDir)) {
return null;
}
foreach (scandir($tenantsDir) as $tenantId) {
if ($tenantId === '.' || $tenantId === '..') {
continue;
}
foreach ([self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED] as $status) {
$job = $this->loadJob($tenantId, $jobId, $status);
if ($job !== null) {
return $job;
}
}
}
return null;
}
/**
* Delete a job directory recursively
*/
private function deleteJobDir(string $dir): void {
if (!is_dir($dir)) {
return;
}
foreach (scandir($dir) as $file) {
if ($file === '.' || $file === '..') {
continue;
}
$path = $dir . '/' . $file;
is_dir($path) ? $this->deleteJobDir($path) : unlink($path);
}
rmdir($dir);
}
/**
* Convert JobStatus to directory name
*/
private function statusToDir(JobStatus $status): string {
return match($status) {
JobStatus::Pending => self::DIR_PENDING,
JobStatus::Processing => self::DIR_PROCESSING,
JobStatus::Complete => self::DIR_COMPLETE,
JobStatus::Failed => self::DIR_FAILED,
};
}
}