Initial commit

This commit is contained in:
root
2025-12-21 09:55:58 -05:00
committed by Sebastian Krupinski
commit 169b7b4c91
57 changed files with 10105 additions and 0 deletions

View File

@@ -0,0 +1,715 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Controllers;
use InvalidArgumentException;
use KTXC\Http\Response\JsonResponse;
use KTXC\SessionIdentity;
use KTXC\SessionTenant;
use KTXF\Controller\ControllerAbstract;
use KTXF\Mail\Entity\Message;
use KTXF\Mail\Queue\SendOptions;
use KTXF\Resource\Selector\SourceSelector;
use KTXF\Routing\Attributes\AuthenticatedRoute;
use KTXM\MailManager\Manager;
use Psr\Log\LoggerInterface;
use Throwable;
/**
* Default Controller - Unified Mail API
*
* Handles all mail operations in JMAP-style API pattern.
* Supports both single operations and batches with result references.
*/
class DefaultController extends ControllerAbstract {
// Error message constants
private const ERR_MISSING_PROVIDER = 'Missing parameter: provider';
private const ERR_MISSING_IDENTIFIER = 'Missing parameter: identifier';
private const ERR_MISSING_SERVICE = 'Missing parameter: service';
private const ERR_MISSING_COLLECTION = 'Missing parameter: collection';
private const ERR_MISSING_DATA = 'Missing parameter: data';
private const ERR_MISSING_SOURCES = 'Missing parameter: sources';
private const ERR_MISSING_IDENTIFIERS = 'Missing parameter: identifiers';
private const ERR_MISSING_MESSAGE = 'Missing parameter: message';
private const ERR_INVALID_PROVIDER = 'Invalid parameter: provider must be a string';
private const ERR_INVALID_SERVICE = 'Invalid parameter: service must be a string';
private const ERR_INVALID_IDENTIFIER = 'Invalid parameter: identifier must be a string';
private const ERR_INVALID_COLLECTION = 'Invalid parameter: collection must be a string or integer';
private const ERR_INVALID_SOURCES = 'Invalid parameter: sources must be an array';
private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array';
private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array';
private const ERR_INVALID_MESSAGE = 'Invalid parameter: message must be an array';
public function __construct(
private readonly SessionTenant $tenantIdentity,
private readonly SessionIdentity $userIdentity,
private Manager $mailManager,
private readonly LoggerInterface $logger
) {}
/**
* Main API endpoint for mail operations
*
* Single operation:
* { "version": 1, "transaction": "tx-1", "operation": "message.send", "data": {...} }
*
* Batch operations:
* { "version": 1, "transaction": "tx-1", "operations": [
* {"id": "op1", "operation": "message.send", "data": {...}},
* {"id": "op2", "operation": "message.destroy", "data": {"collection": "#op1.draftId"}}
* ]}
*
* @return JsonResponse
*/
#[AuthenticatedRoute('/v1', name: 'mail.manager.v1', methods: ['POST'])]
public function index(
int $version,
string $transaction,
string|null $operation = null,
array|null $data = null,
array|null $operations = null,
string|null $user = null
): JsonResponse {
// authorize request
$tenantId = $this->tenantIdentity->identifier();
$userId = $this->userIdentity->identifier();
try {
// Single operation mode
if ($operation !== null) {
$result = $this->processOperation($tenantId, $userId, $operation, $data ?? [], []);
return new JsonResponse([
'version' => $version,
'transaction' => $transaction,
'operation' => $operation,
'status' => 'success',
'data' => $result
], JsonResponse::HTTP_OK);
}
// Batch operations mode
if ($operations !== null && is_array($operations)) {
$results = $this->processBatch($tenantId, $userId, $operations);
return new JsonResponse([
'version' => $version,
'transaction' => $transaction,
'status' => 'success',
'operations' => $results
], JsonResponse::HTTP_OK);
}
throw new InvalidArgumentException('Either operation or operations must be provided');
} catch (Throwable $t) {
$this->logger->error('Error processing mail manager request', ['exception' => $t]);
return new JsonResponse([
'version' => $version,
'transaction' => $transaction,
'operation' => $operation,
'status' => 'error',
'data' => [
'code' => $t->getCode(),
'message' => $t->getMessage()
]
], JsonResponse::HTTP_INTERNAL_SERVER_ERROR);
}
}
/**
* Process batch operations with result references
*/
private function processBatch(string $tenantId, string $userId, array $operations): array {
$results = [];
$resultMap = []; // Store results by operation ID for references
foreach ($operations as $index => $op) {
$opId = $op['id'] ?? "op{$index}";
$operation = $op['operation'] ?? null;
$data = $op['data'] ?? [];
if ($operation === null) {
$results[] = [
'id' => $opId,
'status' => 'error',
'data' => ['message' => 'Missing operation name']
];
continue;
}
try {
// Resolve result references in data (e.g., "#op1.id")
$data = $this->resolveReferences($data, $resultMap);
$result = $this->processOperation($tenantId, $userId, $operation, $data, $resultMap);
$results[] = [
'id' => $opId,
'operation' => $operation,
'status' => 'success',
'data' => $result
];
// Store result for future references
$resultMap[$opId] = $result;
} catch (Throwable $t) {
$this->logger->warning('Batch operation failed', [
'operation' => $operation,
'opId' => $opId,
'error' => $t->getMessage()
]);
$results[] = [
'id' => $opId,
'operation' => $operation,
'status' => 'error',
'data' => [
'code' => $t->getCode(),
'message' => $t->getMessage()
]
];
}
}
return $results;
}
/**
* Resolve result references in operation data
*
* Transforms "#op1.id" into the actual value from previous operation results
*/
private function resolveReferences(mixed $data, array $resultMap): mixed {
if (is_string($data) && str_starts_with($data, '#')) {
// Parse reference like "#op1.id" or "#op1.collection.id"
$parts = explode('.', substr($data, 1));
$opId = array_shift($parts);
if (!isset($resultMap[$opId])) {
throw new InvalidArgumentException("Reference to undefined operation: #{$opId}");
}
$value = $resultMap[$opId];
foreach ($parts as $key) {
if (is_array($value) && isset($value[$key])) {
$value = $value[$key];
} elseif (is_object($value) && isset($value->$key)) {
$value = $value->$key;
} else {
throw new InvalidArgumentException("Invalid reference path: {$data}");
}
}
return $value;
}
if (is_array($data)) {
return array_map(fn($item) => $this->resolveReferences($item, $resultMap), $data);
}
return $data;
}
/**
* Process a single operation
*/
private function processOperation(string $tenantId, string $userId, string $operation, array $data, array $resultMap): mixed {
return match ($operation) {
// Provider operations
'provider.list' => $this->providerList($tenantId, $userId, $data),
'provider.extant' => $this->providerExtant($tenantId, $userId, $data),
// Service operations
'service.list' => $this->serviceList($tenantId, $userId, $data),
'service.extant' => $this->serviceExtant($tenantId, $userId, $data),
'service.fetch' => $this->serviceFetch($tenantId, $userId, $data),
'service.discover' => $this->serviceDiscover($tenantId, $userId, $data),
'service.test' => $this->serviceTest($tenantId, $userId, $data),
'service.create' => $this->serviceCreate($tenantId, $userId, $data),
'service.update' => $this->serviceUpdate($tenantId, $userId, $data),
'service.delete' => $this->serviceDelete($tenantId, $userId, $data),
// Collection operations
'collection.list' => $this->collectionList($tenantId, $userId, $data),
'collection.extant' => $this->collectionExtant($tenantId, $userId, $data),
'collection.fetch' => $this->collectionFetch($tenantId, $userId, $data),
'collection.create' => $this->collectionCreate($tenantId, $userId, $data),
'collection.modify' => $this->collectionModify($tenantId, $userId, $data),
'collection.destroy' => $this->collectionDestroy($tenantId, $userId, $data),
// Entity operations
'entity.list' => $this->entityList($tenantId, $userId, $data),
'entity.delta' => $this->entityDelta($tenantId, $userId, $data),
'entity.extant' => $this->entityExtant($tenantId, $userId, $data),
'entity.fetch' => $this->entityFetch($tenantId, $userId, $data),
'entity.create' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
'entity.update' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
'entity.delete' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
'entity.transmit' => $this->entityTransmit($tenantId, $userId, $data),
default => throw new InvalidArgumentException('Unknown operation: ' . $operation)
};
}
// ==================== Provider Operations ====================
private function providerList(string $tenantId, string $userId, array $data): mixed {
$sources = null;
if (isset($data['sources']) && is_array($data['sources'])) {
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
}
return $this->mailManager->providerList($tenantId, $userId, $sources);
}
private function providerExtant(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
}
if (!is_array($data['sources'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
}
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
return $this->mailManager->providerExtant($tenantId, $userId, $sources);
}
// ==================== Service Operations =====================
private function serviceList(string $tenantId, string $userId, array $data): mixed {
$sources = null;
if (isset($data['sources']) && is_array($data['sources'])) {
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
}
return $this->mailManager->serviceList($tenantId, $userId, $sources);
}
private function serviceExtant(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
}
if (!is_array($data['sources'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
}
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
return $this->mailManager->serviceExtant($tenantId, $userId, $sources);
}
private function serviceFetch(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIER);
}
if (!is_string($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_INVALID_IDENTIFIER);
}
return $this->mailManager->serviceFetch($tenantId, $userId, $data['provider'], $data['identifier']);
}
private function serviceDiscover(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['identity']) || empty($data['identity']) || !is_string($data['identity'])) {
throw new InvalidArgumentException(self::ERR_INVALID_DATA);
}
$provider = $data['provider'] ?? null;
$identity = $data['identity'];
$location = $data['location'] ?? null;
$secret = $data['secret'] ?? null;
return $this->mailManager->serviceDiscover($tenantId, $userId, $provider, $identity, $location, $secret);
}
private function serviceTest(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['identifier']) && !isset($data['location']) && !isset($data['identity'])) {
throw new InvalidArgumentException('Either a service identifier or location and identity must be provided for service test');
}
return $this->mailManager->serviceTest(
$tenantId,
$userId,
$data['provider'],
$data['identifier'] ?? null,
$data['location'] ?? null,
$data['identity'] ?? null,
);
}
private function serviceCreate(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['data'])) {
throw new InvalidArgumentException(self::ERR_MISSING_DATA);
}
if (!is_array($data['data'])) {
throw new InvalidArgumentException(self::ERR_INVALID_DATA);
}
return $this->mailManager->serviceCreate(
$tenantId,
$userId,
$data['provider'],
$data['data']
);
}
private function serviceUpdate(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIER);
}
if (!is_string($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_INVALID_IDENTIFIER);
}
if (!isset($data['data'])) {
throw new InvalidArgumentException(self::ERR_MISSING_DATA);
}
if (!is_array($data['data'])) {
throw new InvalidArgumentException(self::ERR_INVALID_DATA);
}
return $this->mailManager->serviceUpdate(
$tenantId,
$userId,
$data['provider'],
$data['identifier'],
$data['data']
);
}
private function serviceDelete(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIER);
}
if (!is_string($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_INVALID_IDENTIFIER);
}
return $this->mailManager->serviceDelete(
$tenantId,
$userId,
$data['provider'],
$data['identifier']
);
}
// ==================== Collection Operations ====================
private function collectionList(string $tenantId, string $userId, array $data): mixed {
$sources = null;
if (isset($data['sources']) && is_array($data['sources'])) {
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
}
$filter = $data['filter'] ?? null;
$sort = $data['sort'] ?? null;
return $this->mailManager->collectionList($tenantId, $userId, $sources, $filter, $sort);
}
private function collectionExtant(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
}
if (!is_array($data['sources'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
}
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
return $this->mailManager->collectionExtant($tenantId, $userId, $sources);
}
private function collectionFetch(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['service'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SERVICE);
}
if (!is_string($data['service'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SERVICE);
}
if (!isset($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIER);
}
if (!is_string($data['identifier']) && !is_int($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_INVALID_COLLECTION);
}
return $this->mailManager->collectionFetch(
$tenantId,
$userId,
$data['provider'],
$data['service'],
$data['identifier']
);
}
private function collectionCreate(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['service'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SERVICE);
}
if (!is_string($data['service'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SERVICE);
}
if (isset($data['collection']) && !is_string($data['collection']) && !is_int($data['collection'])) {
throw new InvalidArgumentException(self::ERR_INVALID_COLLECTION);
}
if (!isset($data['properties'])) {
throw new InvalidArgumentException(self::ERR_MISSING_DATA);
}
if (!is_array($data['properties'])) {
throw new InvalidArgumentException(self::ERR_INVALID_DATA);
}
return $this->mailManager->collectionCreate(
$tenantId,
$userId,
$data['provider'],
$data['service'],
$data['collection'] ?? null,
$data['properties']
);
}
private function collectionModify(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['service'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SERVICE);
}
if (!is_string($data['service'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SERVICE);
}
if (!isset($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIER);
}
if (!is_string($data['identifier']) && !is_int($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_INVALID_COLLECTION);
}
if (!isset($data['properties'])) {
throw new InvalidArgumentException(self::ERR_MISSING_DATA);
}
if (!is_array($data['properties'])) {
throw new InvalidArgumentException(self::ERR_INVALID_DATA);
}
return $this->mailManager->collectionModify(
$tenantId,
$userId,
$data['provider'],
$data['service'],
$data['identifier'],
$data['properties']
);
}
private function collectionDestroy(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['service'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SERVICE);
}
if (!is_string($data['service'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SERVICE);
}
if (!isset($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIER);
}
if (!is_string($data['identifier']) && !is_int($data['identifier'])) {
throw new InvalidArgumentException(self::ERR_INVALID_IDENTIFIER);
}
return $this->mailManager->collectionDestroy(
$tenantId,
$userId,
$data['provider'],
$data['service'],
$data['identifier'],
$data['options'] ?? []
);
}
// ==================== Entity Operations ====================
private function entityList(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
}
if (!is_array($data['sources'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
}
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
$filter = $data['filter'] ?? null;
$sort = $data['sort'] ?? null;
$range = $data['range'] ?? null;
return $this->mailManager->entityList($tenantId, $userId, $sources, $filter, $sort, $range);
}
private function entityDelta(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
}
if (!is_array($data['sources'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
}
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
return $this->mailManager->entityDelta($tenantId, $userId, $sources);
}
private function entityExtant(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
}
if (!is_array($data['sources'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
}
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
return $this->mailManager->entityExtant($tenantId, $userId, $sources);
}
private function entityFetch(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['service'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SERVICE);
}
if (!is_string($data['service'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SERVICE);
}
if (!isset($data['collection'])) {
throw new InvalidArgumentException(self::ERR_MISSING_COLLECTION);
}
if (!is_string($data['collection']) && !is_int($data['collection'])) {
throw new InvalidArgumentException(self::ERR_INVALID_COLLECTION);
}
if (!isset($data['identifiers'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIERS);
}
if (!is_array($data['identifiers'])) {
throw new InvalidArgumentException(self::ERR_INVALID_IDENTIFIERS);
}
return $this->mailManager->entityFetch(
$tenantId,
$userId,
$data['provider'],
$data['service'],
$data['collection'],
$data['identifiers']
);
}
private function entityTransmit(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER);
}
if (!is_string($data['provider'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER);
}
if (!isset($data['service'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SERVICE);
}
if (!is_string($data['service'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SERVICE);
}
$jobId = $this->mailManager->entityTransmit(
$tenantId,
$userId,
$data['provider'],
$data['service'],
$data['data']
);
return ['jobId' => $jobId];
}
}

237
lib/Daemon/MailDaemon.php Normal file
View File

@@ -0,0 +1,237 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Daemon;
use KTXM\MailManager\Manager;
use Psr\Log\LoggerInterface;
/**
* Mail Queue Daemon
*
* Long-running worker process for processing queued mail messages.
* Supports graceful shutdown via signals and configurable batch processing.
*
* @since 2025.05.01
*/
class MailDaemon {
private bool $running = false;
private bool $shutdown = false;
private bool $reload = false;
/**
* @param Manager $manager Mail manager
* @param LoggerInterface $logger Logger
* @param int $pollInterval Seconds between queue polls when idle
* @param int $batchSize Messages to process per batch
* @param int|null $maxMemory Maximum memory usage in bytes before restart
* @param array<string>|null $tenants Specific tenants to process (null = all)
*/
public function __construct(
private Manager $manager,
private LoggerInterface $logger,
private int $pollInterval = 5,
private int $batchSize = 50,
private ?int $maxMemory = null,
private ?array $tenants = null,
) {
// Set default max memory to 128MB
$this->maxMemory = $this->maxMemory ?? (128 * 1024 * 1024);
}
/**
* Run the daemon main loop
*
* @since 2025.05.01
*/
public function run(): void {
$this->running = true;
$this->shutdown = false;
$this->setupSignalHandlers();
$this->logger->info('Mail daemon starting', [
'pollInterval' => $this->pollInterval,
'batchSize' => $this->batchSize,
'maxMemory' => $this->formatBytes($this->maxMemory),
'tenants' => $this->tenants ?? 'all',
]);
$consecutiveEmpty = 0;
while (!$this->shutdown) {
// Handle reload signal
if ($this->reload) {
$this->handleReload();
$this->reload = false;
}
// Check memory usage
if ($this->isMemoryExceeded()) {
$this->logger->warning('Memory limit exceeded, shutting down for restart', [
'current' => $this->formatBytes(memory_get_usage(true)),
'limit' => $this->formatBytes($this->maxMemory),
]);
break;
}
// Process queues
$processed = $this->processTenants();
if ($processed === 0) {
$consecutiveEmpty++;
// Exponential backoff up to poll interval
$sleepTime = min($consecutiveEmpty, $this->pollInterval);
$this->sleep($sleepTime);
} else {
$consecutiveEmpty = 0;
}
// Dispatch pending signals
pcntl_signal_dispatch();
}
$this->logger->info('Mail daemon stopped');
$this->running = false;
}
/**
* Request graceful shutdown
*
* @since 2025.05.01
*/
public function stop(): void {
$this->shutdown = true;
}
/**
* Check if daemon is running
*
* @since 2025.05.01
*
* @return bool
*/
public function isRunning(): bool {
return $this->running;
}
/**
* Process all tenants and return total messages processed
*/
private function processTenants(): int {
$totalProcessed = 0;
$tenants = $this->tenants ?? $this->discoverTenants();
foreach ($tenants as $tenantId) {
if ($this->shutdown) {
break;
}
$result = $this->manager->queueProcess($tenantId, $this->batchSize);
$totalProcessed += $result['processed'] + $result['failed'];
if ($result['processed'] > 0 || $result['failed'] > 0) {
$this->logger->debug('Processed tenant queue', [
'tenant' => $tenantId,
'processed' => $result['processed'],
'failed' => $result['failed'],
]);
}
}
return $totalProcessed;
}
/**
* Discover all tenants with mail queues
*
* @return array<string>
*/
private function discoverTenants(): array {
// This would need to be implemented based on your tenant discovery mechanism
// For now, return empty array - specific tenants should be configured
return [];
}
/**
* Setup signal handlers for graceful shutdown
*/
private function setupSignalHandlers(): void {
if (!function_exists('pcntl_signal')) {
$this->logger->warning('PCNTL extension not available, signal handling disabled');
return;
}
pcntl_signal(SIGTERM, function() {
$this->logger->info('Received SIGTERM, initiating graceful shutdown');
$this->shutdown = true;
});
pcntl_signal(SIGINT, function() {
$this->logger->info('Received SIGINT, initiating graceful shutdown');
$this->shutdown = true;
});
pcntl_signal(SIGHUP, function() {
$this->logger->info('Received SIGHUP, will reload configuration');
$this->reload = true;
});
}
/**
* Handle configuration reload
*/
private function handleReload(): void {
$this->logger->info('Reloading configuration');
// Configuration reload logic would go here
}
/**
* Check if memory limit has been exceeded
*/
private function isMemoryExceeded(): bool {
if ($this->maxMemory === null) {
return false;
}
return memory_get_usage(true) > $this->maxMemory;
}
/**
* Sleep with signal dispatch
*/
private function sleep(int $seconds): void {
for ($i = 0; $i < $seconds; $i++) {
if ($this->shutdown) {
return;
}
sleep(1);
pcntl_signal_dispatch();
}
}
/**
* Format bytes to human readable string
*/
private function formatBytes(?int $bytes): string {
if ($bytes === null) {
return 'unlimited';
}
$units = ['B', 'KB', 'MB', 'GB'];
$i = 0;
while ($bytes >= 1024 && $i < count($units) - 1) {
$bytes /= 1024;
$i++;
}
return round($bytes, 2) . ' ' . $units[$i];
}
}

246
lib/Daemon/MailQueueCli.php Normal file
View File

@@ -0,0 +1,246 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Daemon;
use KTXM\MailManager\Queue\JobStatus;
use KTXM\MailManager\Queue\MailQueue;
use Psr\Log\LoggerInterface;
/**
* Mail Queue CLI
*
* Command-line interface for mail queue management operations.
*
* Usage:
* php mail-queue.php list <tenant> [--status=pending]
* php mail-queue.php retry <jobId>
* php mail-queue.php retry-all <tenant> --status=failed
* php mail-queue.php purge <tenant> --status=complete --older-than=7d
* php mail-queue.php stats <tenant>
*
* @since 2025.05.01
*/
class MailQueueCli {
public function __construct(
private MailQueue $queue,
private LoggerInterface $logger,
) {}
/**
* Run CLI command
*
* @param array<string> $args Command line arguments
*
* @return int Exit code
*/
public function run(array $args): int {
$command = $args[1] ?? 'help';
return match($command) {
'list' => $this->commandList($args),
'retry' => $this->commandRetry($args),
'retry-all' => $this->commandRetryAll($args),
'purge' => $this->commandPurge($args),
'stats' => $this->commandStats($args),
'help', '--help', '-h' => $this->commandHelp(),
default => $this->commandHelp(),
};
}
/**
* List jobs in queue
*/
private function commandList(array $args): int {
$tenantId = $args[2] ?? null;
if ($tenantId === null) {
echo "Error: tenant ID required\n";
return 1;
}
$status = $this->parseOption($args, 'status');
$statusEnum = $status !== null ? JobStatus::tryFrom($status) : null;
$limit = (int)($this->parseOption($args, 'limit') ?? 100);
$jobs = $this->queue->listJobs($tenantId, $statusEnum, $limit);
if (empty($jobs)) {
echo "No jobs found\n";
return 0;
}
echo sprintf("%-36s %-12s %-8s %-20s %s\n",
'JOB ID', 'STATUS', 'ATTEMPTS', 'CREATED', 'SUBJECT');
echo str_repeat('-', 100) . "\n";
foreach ($jobs as $job) {
$subject = substr($job->message->getSubject(), 0, 30);
echo sprintf("%-36s %-12s %-8d %-20s %s\n",
$job->id,
$job->status->value,
$job->attempts,
$job->created?->format('Y-m-d H:i:s') ?? '-',
$subject
);
}
return 0;
}
/**
* Retry a specific job
*/
private function commandRetry(array $args): int {
$jobId = $args[2] ?? null;
if ($jobId === null) {
echo "Error: job ID required\n";
return 1;
}
if ($this->queue->retry($jobId)) {
echo "Job $jobId queued for retry\n";
return 0;
}
echo "Failed to retry job $jobId (not found or not failed)\n";
return 1;
}
/**
* Retry all failed jobs for a tenant
*/
private function commandRetryAll(array $args): int {
$tenantId = $args[2] ?? null;
if ($tenantId === null) {
echo "Error: tenant ID required\n";
return 1;
}
$jobs = $this->queue->listJobs($tenantId, JobStatus::Failed);
$retried = 0;
foreach ($jobs as $job) {
if ($this->queue->retry($job->id)) {
$retried++;
}
}
echo "Retried $retried jobs\n";
return 0;
}
/**
* Purge old jobs
*/
private function commandPurge(array $args): int {
$tenantId = $args[2] ?? null;
if ($tenantId === null) {
echo "Error: tenant ID required\n";
return 1;
}
$status = $this->parseOption($args, 'status') ?? 'complete';
$statusEnum = JobStatus::tryFrom($status);
if ($statusEnum === null) {
echo "Error: invalid status '$status'\n";
return 1;
}
$olderThan = $this->parseOption($args, 'older-than') ?? '7d';
$seconds = $this->parseDuration($olderThan);
$purged = $this->queue->purge($tenantId, $statusEnum, $seconds);
echo "Purged $purged jobs\n";
return 0;
}
/**
* Show queue statistics
*/
private function commandStats(array $args): int {
$tenantId = $args[2] ?? null;
if ($tenantId === null) {
echo "Error: tenant ID required\n";
return 1;
}
$stats = $this->queue->stats($tenantId);
echo "Queue Statistics for $tenantId:\n";
echo " Pending: {$stats['pending']}\n";
echo " Processing: {$stats['processing']}\n";
echo " Complete: {$stats['complete']}\n";
echo " Failed: {$stats['failed']}\n";
return 0;
}
/**
* Show help message
*/
private function commandHelp(): int {
echo <<<HELP
Mail Queue CLI
Usage:
mail-queue <command> [options]
Commands:
list <tenant> List jobs in queue
--status=<status> Filter by status (pending, processing, complete, failed)
--limit=<n> Maximum jobs to show (default: 100)
retry <jobId> Retry a specific failed job
retry-all <tenant> Retry all failed jobs for a tenant
purge <tenant> Purge old jobs
--status=<status> Status to purge (default: complete)
--older-than=<duration> Age threshold (default: 7d, e.g., 1h, 30d)
stats <tenant> Show queue statistics
help Show this help message
HELP;
return 0;
}
/**
* Parse a command line option
*/
private function parseOption(array $args, string $name): ?string {
foreach ($args as $arg) {
if (str_starts_with($arg, "--$name=")) {
return substr($arg, strlen("--$name="));
}
}
return null;
}
/**
* Parse a duration string to seconds
*/
private function parseDuration(string $duration): int {
preg_match('/^(\d+)([smhd])?$/', $duration, $matches);
$value = (int)($matches[1] ?? 0);
$unit = $matches[2] ?? 's';
return match($unit) {
's' => $value,
'm' => $value * 60,
'h' => $value * 3600,
'd' => $value * 86400,
default => $value,
};
}
}

1036
lib/Manager.php Normal file

File diff suppressed because it is too large Load Diff

67
lib/Module.php Normal file
View File

@@ -0,0 +1,67 @@
<?php
namespace KTXM\MailManager;
use KTXF\Module\ModuleBrowserInterface;
use KTXF\Module\ModuleInstanceAbstract;
/**
* Mail Manager Module
*
* Provides unified mail sending and management across multiple providers
* with context-aware service discovery and queued delivery.
*/
class Module extends ModuleInstanceAbstract implements ModuleBrowserInterface
{
public function __construct()
{ }
public function handle(): string
{
return 'mail_manager';
}
public function label(): string
{
return 'Mail Manager';
}
public function author(): string
{
return 'Ktrix';
}
public function description(): string
{
return 'Mail management module for Ktrix - provides unified mail sending with provider abstraction and queue support';
}
public function version(): string
{
return '0.0.1';
}
public function permissions(): array
{
return [
'mail_manager' => [
'label' => 'Access Mail Manager',
'description' => 'View and access the mail manager module',
'group' => 'Mail Management'
],
];
}
public function registerBI(): array {
return [
'handle' => $this->handle(),
'namespace' => 'MailManager',
'version' => $this->version(),
'label' => $this->label(),
'author' => $this->author(),
'description' => $this->description(),
'boot' => 'static/module.mjs',
];
}
}

32
lib/Queue/JobStatus.php Normal file
View File

@@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Queue;
use JsonSerializable;
/**
* Mail Job Status
*
* Status states for queued mail jobs.
*
* @since 2025.05.01
*/
enum JobStatus: string implements JsonSerializable {
case Pending = 'pending';
case Processing = 'processing';
case Complete = 'complete';
case Failed = 'failed';
public function jsonSerialize(): string {
return $this->value;
}
}

115
lib/Queue/MailJob.php Normal file
View File

@@ -0,0 +1,115 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Queue;
use DateTimeImmutable;
use KTXF\Mail\Entity\IMessageMutable;
use KTXF\Mail\Queue\SendOptions;
/**
* Mail Job
*
* Represents a queued mail job with metadata and message content.
*
* @since 2025.05.01
*/
class MailJob {
public function __construct(
public readonly string $id,
public readonly string $tenantId,
public readonly string $providerId,
public readonly string|int $serviceId,
public readonly IMessageMutable $message,
public readonly SendOptions $options,
public JobStatus $status = JobStatus::Pending,
public int $attempts = 0,
public ?string $lastError = null,
public ?string $messageId = null,
public ?DateTimeImmutable $created = null,
public ?DateTimeImmutable $scheduled = null,
public ?DateTimeImmutable $lastAttempt = null,
public ?DateTimeImmutable $completed = null,
) {
$this->created = $this->created ?? new DateTimeImmutable();
$this->scheduled = $this->scheduled ?? $this->calculateScheduledTime();
}
/**
* Calculate when this job should be processed
*
* @return DateTimeImmutable
*/
private function calculateScheduledTime(): DateTimeImmutable {
$scheduled = $this->created ?? new DateTimeImmutable();
if ($this->options->delaySeconds !== null && $this->options->delaySeconds > 0) {
$scheduled = $scheduled->modify("+{$this->options->delaySeconds} seconds");
}
return $scheduled;
}
/**
* Check if the job is ready to be processed
*
* @return bool
*/
public function isReady(): bool {
if ($this->status !== JobStatus::Pending) {
return false;
}
return $this->scheduled === null || $this->scheduled <= new DateTimeImmutable();
}
/**
* Check if the job can be retried
*
* @return bool
*/
public function canRetry(): bool {
return $this->attempts < $this->options->retryCount;
}
/**
* Get retry delay in seconds based on attempt count (exponential backoff)
*
* @return int
*/
public function getRetryDelay(): int {
// Exponential backoff: 30s, 60s, 120s, 240s, ...
return min(30 * (2 ** $this->attempts), 3600);
}
/**
* Serialize job metadata for storage
*
* @return array
*/
public function toMetaArray(): array {
return [
'id' => $this->id,
'tenantId' => $this->tenantId,
'providerId' => $this->providerId,
'serviceId' => $this->serviceId,
'options' => $this->options->jsonSerialize(),
'status' => $this->status->value,
'attempts' => $this->attempts,
'lastError' => $this->lastError,
'messageId' => $this->messageId,
'created' => $this->created?->format('c'),
'scheduled' => $this->scheduled?->format('c'),
'lastAttempt' => $this->lastAttempt?->format('c'),
'completed' => $this->completed?->format('c'),
];
}
}

492
lib/Queue/MailQueueFile.php Normal file
View File

@@ -0,0 +1,492 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: Sebastian Krupinski <krupinski01@gmail.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace KTXM\MailManager\Queue;
use DateTimeImmutable;
use DI\Attribute\Inject;
use Psr\Log\LoggerInterface;
use KTXF\Mail\Entity\IMessageMutable;
use KTXF\Mail\Entity\Message;
use KTXF\Mail\Queue\SendOptions;
use RuntimeException;
/**
* File-Based Mail Queue
*
* Stores mail queue jobs on disk with atomic operations using file locks.
*
* Structure:
* storage/{tenantId}/mail/queue/
* pending/{jobId}/
* meta.json
* message.json
* processing/{jobId}/...
* complete/{jobId}/...
* failed/{jobId}/...
*
* @since 2025.05.01
*/
class MailQueueFile {
private const DIR_PENDING = 'pending';
private const DIR_PROCESSING = 'processing';
private const DIR_COMPLETE = 'complete';
private const DIR_FAILED = 'failed';
private string $storagePath;
public function __construct(
private LoggerInterface $logger,
#[Inject('rootDir')] private readonly string $rootDir,
) {
$this->storagePath = $this->rootDir . '/var/cache/mail_manager/queue';
}
/**
* @inheritDoc
*/
public function enqueue(
string $tenantId,
string $providerId,
string|int $serviceId,
IMessageMutable $message,
SendOptions $options
): string {
$jobId = $this->generateJobId();
$job = new MailJob(
id: $jobId,
tenantId: $tenantId,
providerId: $providerId,
serviceId: $serviceId,
message: $message,
options: $options,
);
$this->writeJob($job, self::DIR_PENDING);
return $jobId;
}
/**
* @inheritDoc
*/
public function dequeue(string $tenantId, int $limit = 50): array {
$pendingDir = $this->getQueueDir($tenantId, self::DIR_PENDING);
if (!is_dir($pendingDir)) {
return [];
}
$jobs = [];
$entries = scandir($pendingDir);
// Sort by priority (read meta files)
$jobsWithPriority = [];
foreach ($entries as $entry) {
if ($entry === '.' || $entry === '..') {
continue;
}
$jobDir = $pendingDir . '/' . $entry;
if (!is_dir($jobDir)) {
continue;
}
$metaFile = $jobDir . '/meta.json';
if (!file_exists($metaFile)) {
continue;
}
$meta = json_decode(file_get_contents($metaFile), true);
if ($meta === null) {
continue;
}
$scheduled = isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null;
if ($scheduled !== null && $scheduled > new DateTimeImmutable()) {
continue; // Not ready yet
}
$jobsWithPriority[] = [
'id' => $entry,
'priority' => $meta['options']['priority'] ?? 0,
'created' => $meta['created'] ?? '',
];
}
// Sort by priority (desc) then by created (asc)
usort($jobsWithPriority, function($a, $b) {
if ($a['priority'] !== $b['priority']) {
return $b['priority'] <=> $a['priority'];
}
return $a['created'] <=> $b['created'];
});
// Take up to limit and move to processing
$jobsWithPriority = array_slice($jobsWithPriority, 0, $limit);
foreach ($jobsWithPriority as $jobInfo) {
$job = $this->loadJob($tenantId, $jobInfo['id'], self::DIR_PENDING);
if ($job === null) {
continue;
}
// Move to processing
$this->moveJob($tenantId, $jobInfo['id'], self::DIR_PENDING, self::DIR_PROCESSING);
$job->status = JobStatus::Processing;
$job->lastAttempt = new DateTimeImmutable();
$job->attempts++;
$this->updateJobMeta($tenantId, $jobInfo['id'], $job, self::DIR_PROCESSING);
$jobs[] = $job;
}
return $jobs;
}
/**
* @inheritDoc
*/
public function acknowledge(string $jobId, string $messageId): void {
$job = $this->findJobById($jobId);
if ($job === null) {
return;
}
$job->status = JobStatus::Complete;
$job->messageId = $messageId;
$job->completed = new DateTimeImmutable();
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_COMPLETE);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_COMPLETE);
}
/**
* @inheritDoc
*/
public function reject(string $jobId, string $error, bool $retry = true): void {
$job = $this->findJobById($jobId);
if ($job === null) {
return;
}
$job->lastError = $error;
if ($retry && $job->canRetry()) {
// Move back to pending with delay
$job->status = JobStatus::Pending;
$job->scheduled = (new DateTimeImmutable())->modify('+' . $job->getRetryDelay() . ' seconds');
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_PENDING);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING);
} else {
// Move to failed
$job->status = JobStatus::Failed;
$job->completed = new DateTimeImmutable();
$this->moveJob($job->tenantId, $jobId, self::DIR_PROCESSING, self::DIR_FAILED);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_FAILED);
}
}
/**
* @inheritDoc
*/
public function getJob(string $jobId): ?MailJob {
return $this->findJobById($jobId);
}
/**
* @inheritDoc
*/
public function listJobs(string $tenantId, ?JobStatus $status = null, int $limit = 100, int $offset = 0): array {
$dirs = $status !== null
? [$this->statusToDir($status)]
: [self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED];
$jobs = [];
foreach ($dirs as $dir) {
$queueDir = $this->getQueueDir($tenantId, $dir);
if (!is_dir($queueDir)) {
continue;
}
foreach (scandir($queueDir) as $entry) {
if ($entry === '.' || $entry === '..') {
continue;
}
$job = $this->loadJob($tenantId, $entry, $dir);
if ($job !== null) {
$jobs[] = $job;
}
}
}
// Sort by created desc
usort($jobs, fn($a, $b) => ($b->created?->getTimestamp() ?? 0) <=> ($a->created?->getTimestamp() ?? 0));
return array_slice($jobs, $offset, $limit);
}
/**
* @inheritDoc
*/
public function retry(string $jobId): bool {
$job = $this->findJobById($jobId);
if ($job === null || $job->status !== JobStatus::Failed) {
return false;
}
$job->status = JobStatus::Pending;
$job->attempts = 0;
$job->lastError = null;
$job->scheduled = new DateTimeImmutable();
$this->moveJob($job->tenantId, $jobId, self::DIR_FAILED, self::DIR_PENDING);
$this->updateJobMeta($job->tenantId, $jobId, $job, self::DIR_PENDING);
return true;
}
/**
* @inheritDoc
*/
public function purge(string $tenantId, JobStatus $status, int $olderThanSeconds): int {
$dir = $this->statusToDir($status);
$queueDir = $this->getQueueDir($tenantId, $dir);
if (!is_dir($queueDir)) {
return 0;
}
$threshold = new DateTimeImmutable("-{$olderThanSeconds} seconds");
$purged = 0;
foreach (scandir($queueDir) as $entry) {
if ($entry === '.' || $entry === '..') {
continue;
}
$jobDir = $queueDir . '/' . $entry;
$metaFile = $jobDir . '/meta.json';
if (!file_exists($metaFile)) {
continue;
}
$meta = json_decode(file_get_contents($metaFile), true);
$completed = isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null;
if ($completed !== null && $completed < $threshold) {
$this->deleteJobDir($jobDir);
$purged++;
}
}
return $purged;
}
/**
* @inheritDoc
*/
public function stats(string $tenantId): array {
$stats = [
'pending' => 0,
'processing' => 0,
'complete' => 0,
'failed' => 0,
];
foreach ($stats as $status => $_) {
$dir = $this->getQueueDir($tenantId, $status);
if (is_dir($dir)) {
$stats[$status] = count(array_filter(
scandir($dir),
fn($e) => $e !== '.' && $e !== '..'
));
}
}
return $stats;
}
/**
* Generate a unique job ID
*/
private function generateJobId(): string {
return sprintf(
'%08x-%04x-%04x-%04x-%012x',
time(),
mt_rand(0, 0xffff),
mt_rand(0, 0x0fff) | 0x4000,
mt_rand(0, 0x3fff) | 0x8000,
mt_rand(0, 0xffffffffffff)
);
}
/**
* Get the queue directory path for a tenant and status
*/
private function getQueueDir(string $tenantId, string $status): string {
return $this->storagePath . '/' . $tenantId . '/mail/queue/' . $status;
}
/**
* Write a job to disk
*/
private function writeJob(MailJob $job, string $status): void {
$jobDir = $this->getQueueDir($job->tenantId, $status) . '/' . $job->id;
if (!is_dir($jobDir)) {
mkdir($jobDir, 0755, true);
}
// Write meta
$metaFile = $jobDir . '/meta.json';
file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT));
// Write message
$messageFile = $jobDir . '/message.json';
file_put_contents($messageFile, json_encode($job->message, JSON_PRETTY_PRINT));
}
/**
* Load a job from disk
*/
private function loadJob(string $tenantId, string $jobId, string $status): ?MailJob {
$jobDir = $this->getQueueDir($tenantId, $status) . '/' . $jobId;
$metaFile = $jobDir . '/meta.json';
$messageFile = $jobDir . '/message.json';
if (!file_exists($metaFile) || !file_exists($messageFile)) {
return null;
}
$meta = json_decode(file_get_contents($metaFile), true);
$messageData = json_decode(file_get_contents($messageFile), true);
if ($meta === null || $messageData === null) {
return null;
}
$message = Message::fromArray($messageData);
return new MailJob(
id: $meta['id'],
tenantId: $meta['tenantId'],
providerId: $meta['providerId'],
serviceId: $meta['serviceId'],
message: $message,
options: new SendOptions(
immediate: $meta['options']['immediate'] ?? false,
priority: $meta['options']['priority'] ?? 0,
retryCount: $meta['options']['retryCount'] ?? 3,
delaySeconds: $meta['options']['delaySeconds'] ?? null,
),
status: JobStatus::from($meta['status']),
attempts: $meta['attempts'] ?? 0,
lastError: $meta['lastError'] ?? null,
messageId: $meta['messageId'] ?? null,
created: isset($meta['created']) ? new DateTimeImmutable($meta['created']) : null,
scheduled: isset($meta['scheduled']) ? new DateTimeImmutable($meta['scheduled']) : null,
lastAttempt: isset($meta['lastAttempt']) ? new DateTimeImmutable($meta['lastAttempt']) : null,
completed: isset($meta['completed']) ? new DateTimeImmutable($meta['completed']) : null,
);
}
/**
* Move a job between status directories
*/
private function moveJob(string $tenantId, string $jobId, string $fromStatus, string $toStatus): void {
$fromDir = $this->getQueueDir($tenantId, $fromStatus) . '/' . $jobId;
$toDir = $this->getQueueDir($tenantId, $toStatus) . '/' . $jobId;
if (!is_dir($fromDir)) {
throw new RuntimeException("Job directory not found: $fromDir");
}
$toParent = dirname($toDir);
if (!is_dir($toParent)) {
mkdir($toParent, 0755, true);
}
rename($fromDir, $toDir);
}
/**
* Update job metadata
*/
private function updateJobMeta(string $tenantId, string $jobId, MailJob $job, string $status): void {
$metaFile = $this->getQueueDir($tenantId, $status) . '/' . $jobId . '/meta.json';
file_put_contents($metaFile, json_encode($job->toMetaArray(), JSON_PRETTY_PRINT));
}
/**
* Find a job by ID across all status directories
*/
private function findJobById(string $jobId): ?MailJob {
// We need to search across all tenants and statuses
// This is inefficient - in production, consider caching or indexing
$tenantsDir = $this->storagePath;
if (!is_dir($tenantsDir)) {
return null;
}
foreach (scandir($tenantsDir) as $tenantId) {
if ($tenantId === '.' || $tenantId === '..') {
continue;
}
foreach ([self::DIR_PENDING, self::DIR_PROCESSING, self::DIR_COMPLETE, self::DIR_FAILED] as $status) {
$job = $this->loadJob($tenantId, $jobId, $status);
if ($job !== null) {
return $job;
}
}
}
return null;
}
/**
* Delete a job directory recursively
*/
private function deleteJobDir(string $dir): void {
if (!is_dir($dir)) {
return;
}
foreach (scandir($dir) as $file) {
if ($file === '.' || $file === '..') {
continue;
}
$path = $dir . '/' . $file;
is_dir($path) ? $this->deleteJobDir($path) : unlink($path);
}
rmdir($dir);
}
/**
* Convert JobStatus to directory name
*/
private function statusToDir(JobStatus $status): string {
return match($status) {
JobStatus::Pending => self::DIR_PENDING,
JobStatus::Processing => self::DIR_PROCESSING,
JobStatus::Complete => self::DIR_COMPLETE,
JobStatus::Failed => self::DIR_FAILED,
};
}
}