diff --git a/lib/Controllers/DefaultController.php b/lib/Controllers/DefaultController.php index 83638d2..43ab463 100644 --- a/lib/Controllers/DefaultController.php +++ b/lib/Controllers/DefaultController.php @@ -42,23 +42,17 @@ class DefaultController extends ControllerAbstract { private const ERR_MISSING_PROVIDER = 'Missing parameter: provider'; private const ERR_MISSING_IDENTIFIER = 'Missing parameter: identifier'; private const ERR_MISSING_SERVICE = 'Missing parameter: service'; - private const ERR_MISSING_COLLECTION = 'Missing parameter: collection'; private const ERR_MISSING_DATA = 'Missing parameter: data'; - private const ERR_MISSING_SOURCE = 'Missing parameter: source'; private const ERR_MISSING_SOURCES = 'Missing parameter: sources'; private const ERR_MISSING_TARGET = 'Missing parameter: target'; private const ERR_MISSING_TARGETS = 'Missing parameter: targets'; - private const ERR_MISSING_IDENTIFIERS = 'Missing parameter: identifiers'; private const ERR_INVALID_OPERATION = 'Invalid operation: '; private const ERR_INVALID_PROVIDER = 'Invalid parameter: provider must be a string'; private const ERR_INVALID_SERVICE = 'Invalid parameter: service must be a string'; private const ERR_INVALID_IDENTIFIER = 'Invalid parameter: identifier must be a string'; - private const ERR_INVALID_COLLECTION = 'Invalid parameter: collection must be a string or integer'; private const ERR_INVALID_SOURCES = 'Invalid parameter: sources must be an array'; - private const ERR_INVALID_SOURCE = 'Invalid parameter: source must be a string'; private const ERR_INVALID_TARGET = 'Invalid parameter: target must be an array'; private const ERR_INVALID_TARGETS = 'Invalid parameter: targets must be an array'; - private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array'; private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array'; private const STREAM_FLUSH_INTERVAL = 1; @@ -162,8 +156,8 @@ class DefaultController extends ControllerAbstract { 'collection.move' => $this->collectionMove($tenantId, $userId, $data), // Entity operations - 'entity.list' => $this->entityList($tenantId, $userId, $data), - 'entity.stream' => $this->entityStream($tenantId, $userId, $data, $version, $transaction), + 'entity.listBulk' => $this->entityListBulk($tenantId, $userId, $data), + 'entity.listStream' => $this->entityListStream($tenantId, $userId, $data, $version, $transaction), 'entity.fetch' => $this->entityFetch($tenantId, $userId, $data), 'entity.extant' => $this->entityExtant($tenantId, $userId, $data), 'entity.delta' => $this->entityDelta($tenantId, $userId, $data), @@ -608,7 +602,7 @@ class DefaultController extends ControllerAbstract { // ==================== Entity Operations ==================== - private function entityList(string $tenantId, string $userId, array $data): mixed { + private function entityListBulk(string $tenantId, string $userId, array $data): mixed { if (!isset($data['sources'])) { throw new InvalidArgumentException(self::ERR_MISSING_SOURCES); } @@ -629,11 +623,11 @@ class DefaultController extends ControllerAbstract { $sort = $data['sort'] ?? null; $range = $data['range'] ?? null; - return $this->mailManager->entityList($tenantId, $userId, $sources, $filter, $sort, $range); + return $this->mailManager->entityListBulk($tenantId, $userId, $sources, $filter, $sort, $range); } - private function entityStream(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse { + private function entityListStream(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse { if (!isset($data['sources'])) { throw new InvalidArgumentException(self::ERR_MISSING_SOURCES); } @@ -654,7 +648,7 @@ class DefaultController extends ControllerAbstract { $sort = $data['sort'] ?? null; $range = $data['range'] ?? null; - $entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range); + $entityGenerator = $this->mailManager->entityListStream($tenantId, $userId, $sources, $filter, $sort, $range); $logger = $this->logger; $responseGenerator = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator { @@ -729,38 +723,24 @@ class DefaultController extends ControllerAbstract { } private function entityFetch(string $tenantId, string $userId, array $data): mixed { - if (!isset($data['provider'])) { - throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER); + if (!isset($data['targets'])) { + throw new InvalidArgumentException(self::ERR_MISSING_TARGETS); } - if (!is_string($data['provider'])) { - throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER); + if (!is_array($data['targets'])) { + throw new InvalidArgumentException(self::ERR_INVALID_TARGETS); } - if (!isset($data['service'])) { - throw new InvalidArgumentException(self::ERR_MISSING_SERVICE); - } - if (!is_string($data['service'])) { - throw new InvalidArgumentException(self::ERR_INVALID_SERVICE); - } - if (!isset($data['collection'])) { - throw new InvalidArgumentException(self::ERR_MISSING_COLLECTION); - } - if (!is_string($data['collection']) && !is_int($data['collection'])) { - throw new InvalidArgumentException(self::ERR_INVALID_COLLECTION); - } - if (!isset($data['identifiers'])) { - throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIERS); - } - if (!is_array($data['identifiers'])) { - throw new InvalidArgumentException(self::ERR_INVALID_IDENTIFIERS); + + $targets = ResourceIdentifiers::fromArray($data['targets']); + foreach ($targets as $target) { + if (!$target instanceof EntityIdentifier) { + throw new InvalidArgumentException('Invalid parameter: targets must contain provider:service:collection:entity identifiers'); + } } - return $this->mailManager->entityFetch( + return $this->mailManager->entityFetchBulk( $tenantId, $userId, - $data['provider'], - $data['service'], - $data['collection'], - $data['identifiers'] + ...$targets->all() ); } diff --git a/lib/Manager.php b/lib/Manager.php index 9dbd9f9..7e87034 100644 --- a/lib/Manager.php +++ b/lib/Manager.php @@ -731,7 +731,7 @@ class Manager { * * @return array>>> Messages grouped by provider/service/collection */ - public function entityList(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): array { + public function entityListBulk(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): array { // retrieve providers $providers = $this->providerList($tenantId, $userId, $sources); // retrieve services for each provider @@ -822,7 +822,7 @@ class Manager { * * @return \Generator Yields each entity as it is retrieved */ - public function entityStream(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): \Generator { + public function entityListStream(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): \Generator { // retrieve providers $providers = $this->providerList($tenantId, $userId, $sources); // retrieve services for each provider @@ -890,26 +890,65 @@ class Manager { } /** - * Fetch specific messages + * Fetch specific messages in bulk * * @since 2025.05.01 * * @param string $tenantId Tenant identifier * @param string|null $userId User identifier for context - * @param string $providerId Provider identifier - * @param string|int $serviceId Service identifier - * @param string|int $collectionId Collection identifier - * @param array $identifiers Message identifiers + * @param EntityIdentifier ...$identifiers Specific message identifiers to fetch * - * @return array Messages indexed by ID + * @return array */ - public function entityFetch(string $tenantId, ?string $userId, string $providerId, string|int $serviceId, string|int $collectionId, array $identifiers): array { - $service = $this->serviceFetch($tenantId, $userId, $providerId, $serviceId); - if ($service->getEnabled() === false) { - throw new InvalidArgumentException("Service '{$providerId}:{$serviceId}' not found or is disabled"); + public function entityFetchBulk(string $tenantId, ?string $userId, EntityIdentifier ...$identifiers): array { + // group identifiers by provider/service + $groupedIdentifiers = []; + foreach ($identifiers as $identifier) { + $groupedIdentifiers[$identifier->provider()][$identifier->service()][] = $identifier->entity(); + } + // retrieve each service and fetch entities + $list = []; + foreach ($groupedIdentifiers as $providerId => $services) { + foreach ($services as $serviceId => $entities) { + $service = $this->serviceFetch($tenantId, $userId, $providerId, $serviceId); + if ($service->getEnabled() === false) { + throw new InvalidArgumentException("Service '{$providerId}:{$serviceId}' not found or is disabled"); + } + // retrieve entities and merge into list + $list = array_merge($list, $service->entityFetchBulk(...$entities)); + } + } + return $list; + } + + /** + * Fetch specific messages as a stream + * + * @since 2025.05.01 + * + * @param string $tenantId Tenant identifier + * @param string|null $userId User identifier for context + * @param EntityIdentifier ...$identifiers Specific message identifiers to fetch + * + * @return \Generator + */ + public function entityFetchStream(string $tenantId, ?string $userId, EntityIdentifier ...$identifiers): \Generator { + // group identifiers by provider/service + $groupedIdentifiers = []; + foreach ($identifiers as $identifier) { + $groupedIdentifiers[$identifier->provider()][$identifier->service()][] = $identifier->entity(); + } + // retrieve each service and fetch entities + foreach ($groupedIdentifiers as $providerId => $services) { + foreach ($services as $serviceId => $entities) { + $service = $this->serviceFetch($tenantId, $userId, $providerId, $serviceId); + if ($service->getEnabled() === false) { + throw new InvalidArgumentException("Service '{$providerId}:{$serviceId}' not found or is disabled"); + } + // retrieve entities and yield each one + yield from $service->entityFetchStream(...$entities); + } } - // retrieve collection - return $service->entityFetch($collectionId, ...$identifiers); } /** diff --git a/src/services/entityService.ts b/src/services/entityService.ts index 055f2bd..4d81abb 100644 --- a/src/services/entityService.ts +++ b/src/services/entityService.ts @@ -4,8 +4,6 @@ import { transceivePost, transceiveStream } from './transceive'; import type { - EntityListRequest, - EntityListResponse, EntityFetchRequest, EntityFetchResponse, EntityExtantRequest, @@ -23,8 +21,10 @@ import type { EntityTransmitRequest, EntityTransmitResponse, EntityInterface, - EntityStreamRequest, - EntityStreamResponse, + EntityListStreamResponse, + EntityListStreamRequest, + EntityListBulkResponse, + EntityListBulkRequest, } from '../types/entity'; import { useIntegrationStore } from '@KTXC/stores/integrationStore'; import { EntityObject } from '../models'; @@ -51,8 +51,8 @@ export const entityService = { * * @returns Promise with entity object list grouped by provider, service, collection, and entity identifier */ - async list(request: EntityListRequest = {}): Promise>>>> { - const response = await transceivePost('entity.list', request); + async listBulk(request: EntityListBulkRequest = {}): Promise>>>> { + const response = await transceivePost('entity.listBulk', request); // Convert nested response to EntityObject instances const providerList: Record>>> = {}; @@ -75,6 +75,27 @@ export const entityService = { return providerList; }, + /** + * Stream entities as NDJSON, invoking onEntity for each entity as it arrives. + * + * The server emits one entity per line so the caller receives entities + * progressively rather than waiting for the full collection to load. + * + * @param request - stream request parameters (same shape as list) + * @param onEntity - called synchronously for each entity as it is received + * + * @returns Promise resolving to { total } when the stream completes + */ + async listStream(request: EntityListStreamRequest, onEntity: (entity: EntityObject) => void): Promise<{ total: number }> { + return await transceiveStream( + 'entity.listStream', + request, + (entity) => { + onEntity(createEntityObject(entity)); + } + ); + }, + /** * Retrieve a specific entity by provider and identifier * @@ -172,27 +193,6 @@ export const entityService = { async transmit(request: EntityTransmitRequest): Promise { return await transceivePost('entity.transmit', request); }, - - /** - * Stream entities as NDJSON, invoking onEntity for each entity as it arrives. - * - * The server emits one entity per line so the caller receives entities - * progressively rather than waiting for the full collection to load. - * - * @param request - stream request parameters (same shape as list) - * @param onEntity - called synchronously for each entity as it is received - * - * @returns Promise resolving to { total } when the stream completes - */ - async stream(request: EntityStreamRequest, onEntity: (entity: EntityObject) => void): Promise<{ total: number }> { - return await transceiveStream( - 'entity.stream', - request, - (entity) => { - onEntity(createEntityObject(entity)); - } - ); - }, }; export default entityService; diff --git a/src/stores/entitiesStore.ts b/src/stores/entitiesStore.ts index b32ec3e..ba9290a 100644 --- a/src/stores/entitiesStore.ts +++ b/src/stores/entitiesStore.ts @@ -100,7 +100,7 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => { try { const entities: Record = {} - await entityService.stream({ sources, filter, sort, range }, (entity: EntityObject) => { + await entityService.listStream({ sources, filter, sort, range }, (entity: EntityObject) => { _entities.value[entity.identifier] = entity entities[entity.identifier] = entity }) diff --git a/src/types/entity.ts b/src/types/entity.ts index 04b9004..4a4b539 100644 --- a/src/types/entity.ts +++ b/src/types/entity.ts @@ -29,16 +29,16 @@ export interface EntityInterface { export interface EntityModelInterface extends Omit, '@type' | 'version'> {} /** - * Entity list + * Entity list bulk */ -export interface EntityListRequest { +export interface EntityListBulkRequest { sources?: CollectionIdentifier[]; filter?: ListFilter; sort?: ListSort; range?: ListRange; } -export interface EntityListResponse { +export interface EntityListBulkResponse { [providerId: string]: { [serviceId: string]: { [collectionId: string]: { @@ -48,6 +48,18 @@ export interface EntityListResponse { }; } +/** + * Entity list stream + */ +export interface EntityListStreamRequest { + sources?: CollectionIdentifier[]; + filter?: ListFilter; + sort?: ListSort; + range?: ListRange; +} + +export interface EntityListStreamResponse extends EntityInterface {} + /** * Entity fetch */ @@ -178,16 +190,4 @@ export interface EntityTransmitRequest { export interface EntityTransmitResponse { id: string; status: 'queued' | 'sent'; -} - -/** - * Entity stream - */ -export interface EntityStreamRequest { - sources?: CollectionIdentifier[]; - filter?: ListFilter; - sort?: ListSort; - range?: ListRange; -} - -export interface EntityStreamResponse extends EntityInterface {} \ No newline at end of file +} \ No newline at end of file