493 lines
15 KiB
PHP
493 lines
15 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
/**
|
|
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
|
|
* SPDX-License-Identifier: AGPL-3.0-or-later
|
|
*/
|
|
|
|
namespace KTXM\MailManager\Queue;
|
|
|
|
use DateTimeImmutable;
|
|
use DI\Attribute\Inject;
|
|
use Psr\Log\LoggerInterface;
|
|
use KTXF\Mail\Entity\IMessageMutable;
|
|
use KTXF\Mail\Entity\Message;
|
|
use KTXF\Mail\Queue\SendOptions;
|
|
use RuntimeException;
|
|
|
|
/**
|
|
* File-Based Mail Queue
|
|
*
|
|
* Stores mail queue jobs on disk with atomic operations using file locks.
|
|
*
|
|
* Structure:
|
|
* storage/{tenantId}/mail/queue/
|
|
* pending/{jobId}/
|
|
* meta.json
|
|
* message.json
|
|
* processing/{jobId}/...
|
|
* complete/{jobId}/...
|
|
* failed/{jobId}/...
|
|
*
|
|
* @since 2025.05.01
|
|
*/
|
|
class MailQueueFile {
|
|
|
|
private const DIR_PENDING = 'pending';
|
|
private const DIR_PROCESSING = 'processing';
|
|
private const DIR_COMPLETE = 'complete';
|
|
private const DIR_FAILED = 'failed';
|
|
private string $storagePath;
|
|
|
|
public function __construct(
|
|
private LoggerInterface $logger,
|
|
#[Inject('rootDir')] private readonly string $rootDir,
|
|
) {
|
|
$this->storagePath = $this->rootDir . '/var/cache/mail_manager/queue';
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function enqueue(
|
|
string $tenantId,
|
|
string $providerId,
|
|
string|int $serviceId,
|
|
IMessageMutable $message,
|
|
SendOptions $options
|
|
): string {
|
|
$jobId = $this->generateJobId();
|
|
|
|
$job = new MailJob(
|
|
id: $jobId,
|
|
tenantId: $tenantId,
|
|
providerId: $providerId,
|
|
serviceId: $serviceId,
|
|
message: $message,
|
|
options: $options,
|
|
);
|
|
|
|
$this->writeJob($job, self::DIR_PENDING);
|
|
|
|
return $jobId;
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function dequeue(string $tenantId, int $limit = 50): array {
|
|
$pendingDir = $this->getQueueDir($tenantId, self::DIR_PENDING);
|
|
|
|
if (!is_dir($pendingDir)) {
|
|
return [];
|
|
}
|
|
|
|
$jobs = [];
|
|
$entries = scandir($pendingDir);
|
|
|
|
// Sort by priority (read meta files)
|
|
$jobsWithPriority = [];
|
|
foreach ($entries as $entry) {
|
|
if ($entry === '.' || $entry === '..') {
|
|
continue;
|
|
}
|
|
|
|
$jobDir = $pendingDir . '/' . $entry;
|
|
if (!is_dir($jobDir)) {
|
|
continue;
|
|
}
|
|
|
|
$metaFile = $jobDir . '/meta.json';
|
|
if (!file_exists($metaFile)) {
|
|
continue;
|
|
}
|
|
|
|
$meta = json_decode(file_get_contents($metaFile), true);
|
|
if ($meta === null) {
|
|
continue;
|
|
}
|
|
|
|
$scheduled = isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null;
|
|
if ($scheduled !== null && $scheduled > new DateTimeImmutable()) {
|
|
continue; // Not ready yet
|
|
}
|
|
|
|
$jobsWithPriority[] = [
|
|
'id' => $entry,
|
|
'priority' => $meta['options']['priority'] ?? 0,
|
|
'created' => $meta['created'] ?? '',
|
|
];
|
|
}
|
|
|
|
// Sort by priority (desc) then by created (asc)
|
|
usort($jobsWithPriority, function($a, $b) {
|
|
if ($a['priority'] !== $b['priority']) {
|
|
return $b['priority'] <=> $a['priority'];
|
|
}
|
|
return $a['created'] <=> $b['created'];
|
|
});
|
|
|
|
// Take up to limit and move to processing
|
|
$jobsWithPriority = array_slice($jobsWithPriority, 0, $limit);
|
|
|
|
foreach ($jobsWithPriority as $jobInfo) {
|
|
$job = $this->loadJob($tenantId, $jobInfo['id'], self::DIR_PENDING);
|
|
if ($job === null) {
|
|
continue;
|
|
}
|
|
|
|
// Move to processing
|
|
$this->moveJob($tenantId, $jobInfo['id'], self::DIR_PENDING, self::DIR_PROCESSING);
|
|
$job->status = JobStatus::Processing;
|
|
$job->lastAttempt = new DateTimeImmutable();
|
|
$job->attempts++;
|
|
$this->updateJobMeta($tenantId, $jobInfo['id'], $job, self::DIR_PROCESSING);
|
|
|
|
$jobs[] = $job;
|
|
}
|
|
|
|
return $jobs;
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function acknowledge(string $jobId, string $messageId): void {
|
|
$job = $this->findJobById($jobId);
|
|
if ($job === null) {
|
|
return;
|
|
}
|
|
|
|
$job->status = JobStatus::Complete;
|
|
$job->messageId = $messageId;
|
|
$job->completed = new DateTimeImmutable();
|
|
|
|
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_COMPLETE);
|
|
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_COMPLETE);
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function reject(string $jobId, string $error, bool $retry = true): void {
|
|
$job = $this->findJobById($jobId);
|
|
if ($job === null) {
|
|
return;
|
|
}
|
|
|
|
$job->lastError = $error;
|
|
|
|
if ($retry && $job->canRetry()) {
|
|
// Move back to pending with delay
|
|
$job->status = JobStatus::Pending;
|
|
$job->scheduled = (new DateTimeImmutable())->modify('+' . $job->getRetryDelay() . ' seconds');
|
|
|
|
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_PENDING);
|
|
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING);
|
|
} else {
|
|
// Move to failed
|
|
$job->status = JobStatus::Failed;
|
|
$job->completed = new DateTimeImmutable();
|
|
|
|
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_FAILED);
|
|
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_FAILED);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function getJob(string $jobId): ?MailJob {
|
|
return $this->findJobById($jobId);
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function listJobs(string $tenantId, ?JobStatus $status = null, int $limit = 100, int $offset = 0): array {
|
|
$dirs = $status !== null
|
|
? [$this->statusToDir($status)]
|
|
: [self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED];
|
|
|
|
$jobs = [];
|
|
|
|
foreach ($dirs as $dir) {
|
|
$queueDir = $this->getQueueDir($tenantId, $dir);
|
|
if (!is_dir($queueDir)) {
|
|
continue;
|
|
}
|
|
|
|
foreach (scandir($queueDir) as $entry) {
|
|
if ($entry === '.' || $entry === '..') {
|
|
continue;
|
|
}
|
|
|
|
$job = $this->loadJob($tenantId, $entry, $dir);
|
|
if ($job !== null) {
|
|
$jobs[] = $job;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sort by created desc
|
|
usort($jobs, fn($a, $b) => ($b->created?->getTimestamp() ?? 0) <=> ($a->created?->getTimestamp() ?? 0));
|
|
|
|
return array_slice($jobs, $offset, $limit);
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function retry(string $jobId): bool {
|
|
$job = $this->findJobById($jobId);
|
|
if ($job === null || $job->status !== JobStatus::Failed) {
|
|
return false;
|
|
}
|
|
|
|
$job->status = JobStatus::Pending;
|
|
$job->attempts = 0;
|
|
$job->lastError = null;
|
|
$job->scheduled = new DateTimeImmutable();
|
|
|
|
$this->moveJob($job->tenantId, $jobId, self::DIR_FAILED, self::DIR_PENDING);
|
|
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING);
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function purge(string $tenantId, JobStatus $status, int $olderThanSeconds): int {
|
|
$dir = $this->statusToDir($status);
|
|
$queueDir = $this->getQueueDir($tenantId, $dir);
|
|
|
|
if (!is_dir($queueDir)) {
|
|
return 0;
|
|
}
|
|
|
|
$threshold = new DateTimeImmutable("-{$olderThanSeconds} seconds");
|
|
$purged = 0;
|
|
|
|
foreach (scandir($queueDir) as $entry) {
|
|
if ($entry === '.' || $entry === '..') {
|
|
continue;
|
|
}
|
|
|
|
$jobDir = $queueDir . '/' . $entry;
|
|
$metaFile = $jobDir . '/meta.json';
|
|
|
|
if (!file_exists($metaFile)) {
|
|
continue;
|
|
}
|
|
|
|
$meta = json_decode(file_get_contents($metaFile), true);
|
|
$completed = isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null;
|
|
|
|
if ($completed !== null && $completed < $threshold) {
|
|
$this->deleteJobDir($jobDir);
|
|
$purged++;
|
|
}
|
|
}
|
|
|
|
return $purged;
|
|
}
|
|
|
|
/**
|
|
* @inheritDoc
|
|
*/
|
|
public function stats(string $tenantId): array {
|
|
$stats = [
|
|
'pending' => 0,
|
|
'processing' => 0,
|
|
'complete' => 0,
|
|
'failed' => 0,
|
|
];
|
|
|
|
foreach ($stats as $status => $_) {
|
|
$dir = $this->getQueueDir($tenantId, $status);
|
|
if (is_dir($dir)) {
|
|
$stats[$status] = count(array_filter(
|
|
scandir($dir),
|
|
fn($e) => $e !== '.' && $e !== '..'
|
|
));
|
|
}
|
|
}
|
|
|
|
return $stats;
|
|
}
|
|
|
|
/**
|
|
* Generate a unique job ID
|
|
*/
|
|
private function generateJobId(): string {
|
|
return sprintf(
|
|
'%08x-%04x-%04x-%04x-%012x',
|
|
time(),
|
|
mt_rand(0, 0xffff),
|
|
mt_rand(0, 0x0fff) | 0x4000,
|
|
mt_rand(0, 0x3fff) | 0x8000,
|
|
mt_rand(0, 0xffffffffffff)
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Get the queue directory path for a tenant and status
|
|
*/
|
|
private function getQueueDir(string $tenantId, string $status): string {
|
|
return $this->storagePath . '/' . $tenantId . '/mail/queue/' . $status;
|
|
}
|
|
|
|
/**
|
|
* Write a job to disk
|
|
*/
|
|
private function writeJob(MailJob $job, string $status): void {
|
|
$jobDir = $this->getQueueDir($job->tenantId, $status) . '/' . $job->id;
|
|
|
|
if (!is_dir($jobDir)) {
|
|
mkdir($jobDir, 0755, true);
|
|
}
|
|
|
|
// Write meta
|
|
$metaFile = $jobDir . '/meta.json';
|
|
file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT));
|
|
|
|
// Write message
|
|
$messageFile = $jobDir . '/message.json';
|
|
file_put_contents($messageFile, json_encode($job->message, JSON_PRETTY_PRINT));
|
|
}
|
|
|
|
/**
|
|
* Load a job from disk
|
|
*/
|
|
private function loadJob(string $tenantId, string $jobId, string $status): ?MailJob {
|
|
$jobDir = $this->getQueueDir($tenantId, $status) . '/' . $jobId;
|
|
$metaFile = $jobDir . '/meta.json';
|
|
$messageFile = $jobDir . '/message.json';
|
|
|
|
if (!file_exists($metaFile) || !file_exists($messageFile)) {
|
|
return null;
|
|
}
|
|
|
|
$meta = json_decode(file_get_contents($metaFile), true);
|
|
$messageData = json_decode(file_get_contents($messageFile), true);
|
|
|
|
if ($meta === null || $messageData === null) {
|
|
return null;
|
|
}
|
|
|
|
$message = Message::fromArray($messageData);
|
|
|
|
return new MailJob(
|
|
id: $meta['id'],
|
|
tenantId: $meta['tenantId'],
|
|
providerId: $meta['providerId'],
|
|
serviceId: $meta['serviceId'],
|
|
message: $message,
|
|
options: new SendOptions(
|
|
immediate: $meta['options']['immediate'] ?? false,
|
|
priority: $meta['options']['priority'] ?? 0,
|
|
retryCount: $meta['options']['retryCount'] ?? 3,
|
|
delaySeconds: $meta['options']['delaySeconds'] ?? null,
|
|
),
|
|
status: JobStatus::from($meta['status']),
|
|
attempts: $meta['attempts'] ?? 0,
|
|
lastError: $meta['lastError'] ?? null,
|
|
messageId: $meta['messageId'] ?? null,
|
|
created: isset($meta['created']) ? new DateTimeImmutable($meta['created']) : null,
|
|
scheduled: isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null,
|
|
lastAttempt: isset($meta['lastAttempt']) ? new DateTimeImmutable($meta['lastAttempt']) : null,
|
|
completed: isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null,
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Move a job between status directories
|
|
*/
|
|
private function moveJob(string $tenantId, string $jobId, string $fromStatus, string $toStatus): void {
|
|
$fromDir = $this->getQueueDir($tenantId, $fromStatus) . '/' . $jobId;
|
|
$toDir = $this->getQueueDir($tenantId, $toStatus) . '/' . $jobId;
|
|
|
|
if (!is_dir($fromDir)) {
|
|
throw new RuntimeException("Job directory not found: $fromDir");
|
|
}
|
|
|
|
$toParent = dirname($toDir);
|
|
if (!is_dir($toParent)) {
|
|
mkdir($toParent, 0755, true);
|
|
}
|
|
|
|
rename($fromDir, $toDir);
|
|
}
|
|
|
|
/**
|
|
* Update job metadata
|
|
*/
|
|
private function updateJobMeta(string $tenantId, string $jobId, MailJob $job, string $status): void {
|
|
$metaFile = $this->getQueueDir($tenantId, $status) . '/' . $jobId . '/meta.json';
|
|
file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT));
|
|
}
|
|
|
|
/**
|
|
* Find a job by ID across all status directories
|
|
*/
|
|
private function findJobById(string $jobId): ?MailJob {
|
|
// We need to search across all tenants and statuses
|
|
// This is inefficient - in production, consider caching or indexing
|
|
$tenantsDir = $this->storagePath;
|
|
|
|
if (!is_dir($tenantsDir)) {
|
|
return null;
|
|
}
|
|
|
|
foreach (scandir($tenantsDir) as $tenantId) {
|
|
if ($tenantId === '.' || $tenantId === '..') {
|
|
continue;
|
|
}
|
|
|
|
foreach ([self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED] as $status) {
|
|
$job = $this->loadJob($tenantId, $jobId, $status);
|
|
if ($job !== null) {
|
|
return $job;
|
|
}
|
|
}
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Delete a job directory recursively
|
|
*/
|
|
private function deleteJobDir(string $dir): void {
|
|
if (!is_dir($dir)) {
|
|
return;
|
|
}
|
|
|
|
foreach (scandir($dir) as $file) {
|
|
if ($file === '.' || $file === '..') {
|
|
continue;
|
|
}
|
|
$path = $dir . '/' . $file;
|
|
is_dir($path) ? $this->deleteJobDir($path) : unlink($path);
|
|
}
|
|
|
|
rmdir($dir);
|
|
}
|
|
|
|
/**
|
|
* Convert JobStatus to directory name
|
|
*/
|
|
private function statusToDir(JobStatus $status): string {
|
|
return match($status) {
|
|
JobStatus::Pending => self::DIR_PENDING,
|
|
JobStatus::Processing => self::DIR_PROCESSING,
|
|
JobStatus::Complete => self::DIR_COMPLETE,
|
|
JobStatus::Failed => self::DIR_FAILED,
|
|
};
|
|
}
|
|
|
|
}
|