From f520b8e5accca4208a05726ed5a89ad39d93f6f7 Mon Sep 17 00:00:00 2001 From: Sebastian Krupinski Date: Sat, 21 Feb 2026 10:06:07 -0500 Subject: [PATCH] feat: streaming entities Signed-off-by: Sebastian Krupinski --- lib/Controllers/DefaultController.php | 62 +++++++++++++++++++-- lib/Manager.php | 77 +++++++++++++++++++++++++++ src/services/entityService.ts | 41 +++++++++++++- src/services/transceive.ts | 65 ++++++++++++++++++++++ src/stores/entitiesStore.ts | 63 +++++++++++++++------- src/types/entity.ts | 39 +++++++++++++- 6 files changed, 323 insertions(+), 24 deletions(-) diff --git a/lib/Controllers/DefaultController.php b/lib/Controllers/DefaultController.php index bca46b9..8340a28 100644 --- a/lib/Controllers/DefaultController.php +++ b/lib/Controllers/DefaultController.php @@ -11,6 +11,8 @@ namespace KTXM\MailManager\Controllers; use InvalidArgumentException; use KTXC\Http\Response\JsonResponse; +use KTXC\Http\Response\Response; +use KTXC\Http\Response\StreamedNdJsonResponse; use KTXC\SessionIdentity; use KTXC\SessionTenant; use KTXF\Controller\ControllerAbstract; @@ -44,6 +46,8 @@ class DefaultController extends ControllerAbstract { private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array'; private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array'; + private const STREAM_FLUSH_INTERVAL = 1; + public function __construct( private readonly SessionTenant $tenantIdentity, private readonly SessionIdentity $userIdentity, @@ -62,7 +66,7 @@ class DefaultController extends ControllerAbstract { * "data": {...} * } * - * @return JsonResponse + * @return Response */ #[AuthenticatedRoute('/v1', name: 'mail.manager.v1', methods: ['POST'])] public function index( @@ -71,7 +75,7 @@ class DefaultController extends ControllerAbstract { string|null $operation = null, array|null $data = null, string|null $user = null - ): JsonResponse { + ): Response { // authorize request $tenantId = $this->tenantIdentity->identifier(); @@ -80,7 +84,12 @@ class DefaultController extends ControllerAbstract { try { if ($operation !== null) { - $result = $this->processOperation($tenantId, $userId, $operation, $data ?? [], []); + $result = $this->processOperation($tenantId, $userId, $operation, $data ?? [], $version, $transaction); + + if ($result instanceof Response) { + return $result; + } + return new JsonResponse([ 'version' => $version, 'transaction' => $transaction, @@ -110,7 +119,7 @@ class DefaultController extends ControllerAbstract { /** * Process a single operation */ - private function processOperation(string $tenantId, string $userId, string $operation, array $data): mixed { + private function processOperation(string $tenantId, string $userId, string $operation, array $data, int $version = 1, string $transaction = ''): mixed { return match ($operation) { // Provider operations 'provider.list' => $this->providerList($tenantId, $userId, $data), @@ -144,6 +153,7 @@ class DefaultController extends ControllerAbstract { 'entity.create' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.update' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.delete' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), + 'entity.stream' => $this->entityStream($tenantId, $userId, $data, $version, $transaction), 'entity.delta' => $this->entityDelta($tenantId, $userId, $data), 'entity.move' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.copy' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), @@ -617,4 +627,48 @@ class DefaultController extends ControllerAbstract { return ['jobId' => $jobId]; } + // ==================== Entity Stream ==================== + + private function entityStream(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse { + if (!isset($data['sources'])) { + throw new InvalidArgumentException(self::ERR_MISSING_SOURCES); + } + if (!is_array($data['sources'])) { + throw new InvalidArgumentException(self::ERR_INVALID_SOURCES); + } + + $sources = new SourceSelector(); + $sources->jsonDeserialize($data['sources']); + + $filter = $data['filter'] ?? null; + $sort = $data['sort'] ?? null; + $range = $data['range'] ?? null; + + $entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range); + $logger = $this->logger; + + $lines = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator { + yield ['type' => 'meta', 'version' => $version, 'transaction' => $transaction, 'status' => 'success']; + + $total = 0; + try { + foreach ($entityGenerator as $entity) { + $entityData = is_array($entity) + ? $entity + : (method_exists($entity, 'jsonSerialize') ? $entity->jsonSerialize() : (array) $entity); + yield ['type' => 'entity'] + $entityData; + $total++; + } + } catch (\Throwable $t) { + $logger->error('Error streaming entities', ['exception' => $t]); + yield ['type' => 'error', 'message' => $t->getMessage()]; + return; + } + + yield ['type' => 'done', 'total' => $total]; + })(); + + return new StreamedNdJsonResponse($lines, self::STREAM_FLUSH_INTERVAL, 200, ['Content-Type' => 'application/json']); + } + } diff --git a/lib/Manager.php b/lib/Manager.php index c6beb1f..cf864ee 100644 --- a/lib/Manager.php +++ b/lib/Manager.php @@ -763,6 +763,83 @@ class Manager { return $responseData; } + + /** + * Stream entities + * + * @since 2026.02.01 + * + * @param string $tenantId Tenant identifier + * @param string $userId User identifier + * @param SourceSelector $sources Message sources with collection identifiers + * @param array|null $filter Message filter + * @param array|null $sort Message sort + * @param array|null $range Message range/pagination + * + * @return \Generator Yields each entity as it is retrieved + */ + public function entityStream(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): \Generator { + // retrieve providers + $providers = $this->providerList($tenantId, $userId, $sources); + // retrieve services for each provider + foreach ($providers as $provider) { + $serviceSelector = $sources[$provider->identifier()]; + $servicesSelected = $provider->serviceList($tenantId, $userId, $serviceSelector->identifiers()); + /** @var ServiceBaseInterface $service */ + foreach ($servicesSelected as $service) { + // retrieve collections for each service + $collectionSelector = $serviceSelector[$service->identifier()]; + $collectionSelected = $collectionSelector instanceof CollectionSelector ? $collectionSelector->identifiers() : []; + if ($collectionSelected === []) { + $collections = $service->collectionList(''); + $collectionSelected = array_map( + fn($collection) => $collection->identifier(), + $collections + ); + } + if ($collectionSelected === []) { + continue; + } + // construct filter for entities + $entityFilter = null; + if ($filter !== null && $filter !== []) { + $entityFilter = $service->entityListFilter(); + foreach ($filter as $attribute => $value) { + $entityFilter->condition($attribute, $value); + } + } + // construct sort for entities + $entitySort = null; + if ($sort !== null && $sort !== []) { + $entitySort = $service->entityListSort(); + foreach ($sort as $attribute => $direction) { + $entitySort->condition($attribute, $direction); + } + } + // construct range for entities + $entityRange = null; + if ($range !== null && $range !== [] && isset($range['type'])) { + $entityRange = $service->entityListRange(RangeType::from($range['type'])); + if ($entityRange->type() === RangeType::TALLY) { + /** @var IRangeTally $entityRange */ + if (isset($range['anchor'])) { + $entityRange->setAnchor(RangeAnchorType::from($range['anchor'])); + } + if (isset($range['position'])) { + $entityRange->setPosition($range['position']); + } + if (isset($range['tally'])) { + $entityRange->setTally($range['tally']); + } + } + } + // yield entities for each collection individually + foreach ($collectionSelected as $collectionId) { + yield from $service->entityListStream($collectionId, $entityFilter, $entitySort, $entityRange, null); + } + } + } + } /** * Fetch specific messages diff --git a/src/services/entityService.ts b/src/services/entityService.ts index bbefddb..6e59515 100644 --- a/src/services/entityService.ts +++ b/src/services/entityService.ts @@ -2,7 +2,7 @@ * Entity management service */ -import { transceivePost } from './transceive'; +import { transceivePost, transceiveStream } from './transceive'; import type { EntityListRequest, EntityListResponse, @@ -21,6 +21,10 @@ import type { EntityTransmitRequest, EntityTransmitResponse, EntityInterface, + EntityStreamRequest, + EntityStreamLine, + EntityStreamEntityLine, + EntityStreamDoneLine, } from '../types/entity'; import { useIntegrationStore } from '@KTXC/stores/integrationStore'; import { EntityObject } from '../models'; @@ -157,6 +161,41 @@ export const entityService = { async transmit(request: EntityTransmitRequest): Promise { return await transceivePost('entity.transmit', request); }, + + /** + * Stream entities as NDJSON, invoking onEntity for each entity as it arrives. + * + * The server emits one entity per line so the caller receives entities + * progressively rather than waiting for the full collection to load. + * + * @param request - stream request parameters (same shape as list) + * @param onEntity - called synchronously for each entity as it is received + * + * @returns Promise resolving to { total } when the stream completes + */ + async stream( + request: EntityStreamRequest, + onEntity: (entity: EntityObject) => void + ): Promise<{ total: number }> { + let total = 0; + + await transceiveStream( + 'entity.stream', + request, + (line) => { + if (line.type === 'entity') { + onEntity(createEntityObject(line as EntityStreamEntityLine)); + } else if (line.type === 'done') { + total = (line as EntityStreamDoneLine).total; + } else if (line.type === 'error') { + throw new Error(`[entity.stream] ${line.message}`); + } + // 'meta' lines are silently consumed + } + ); + + return { total }; + }, }; export default entityService; diff --git a/src/services/transceive.ts b/src/services/transceive.ts index 1fe51cf..7744751 100644 --- a/src/services/transceive.ts +++ b/src/services/transceive.ts @@ -48,3 +48,68 @@ export async function transceivePost( return response.data; } + +/** + * Stream an NDJSON API response, calling onLine for each parsed line. + * + * The server emits one JSON object per line with a `type` discriminant + * (meta / entity / done / error). The caller interprets each line via onLine. + * Throwing inside onLine aborts the stream. + * + * @param operation - Operation name, e.g. 'entity.stream' + * @param data - Operation-specific request data + * @param onLine - Synchronous callback invoked for every parsed line. + * May throw to abort the stream. + * @param user - Optional user identifier override + */ +export async function transceiveStream( + operation: string, + data: TRequest, + onLine: (line: TLine) => void, + user?: string +): Promise { + const request: ApiRequest = { + version: API_VERSION, + transaction: generateTransactionId(), + operation, + data, + user, + }; + + await fetchWrapper.post(API_URL, request, { + //headers: { 'Accept': 'application/x-ndjson' }, + headers: { 'Accept': 'application/json' }, + onStream: async (response) => { + if (!response.body) { + throw new Error(`[${operation}] Response body is not readable`); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop()!; // retain any incomplete trailing chunk + + for (const line of lines) { + if (!line.trim()) continue; + onLine(JSON.parse(line) as TLine); + } + } + + // flush any remaining bytes still in the buffer + if (buffer.trim()) { + onLine(JSON.parse(buffer) as TLine); + } + } finally { + reader.releaseLock(); + } + }, + }); +} diff --git a/src/stores/entitiesStore.ts b/src/stores/entitiesStore.ts index 4740392..323dcb4 100644 --- a/src/stores/entitiesStore.ts +++ b/src/stores/entitiesStore.ts @@ -6,7 +6,7 @@ import { ref, computed, readonly } from 'vue' import { defineStore } from 'pinia' import { entityService } from '../services' import { EntityObject } from '../models' -import type { EntityTransmitRequest, EntityTransmitResponse } from '../types/entity' +import type { EntityTransmitRequest, EntityTransmitResponse, EntityStreamRequest } from '../types/entity' import type { SourceSelector, ListFilter, ListSort, ListRange } from '../types/common' export const useEntitiesStore = defineStore('mailEntitiesStore', () => { @@ -103,26 +103,16 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => { async function list(sources?: SourceSelector, filter?: ListFilter, sort?: ListSort, range?: ListRange): Promise> { transceiving.value = true try { - const response = await entityService.list({ sources, filter, sort, range }) + const added: Record = {} - // Flatten nested structure: provider:service:collection:entity -> "provider:service:collection:entity": object - const entities: Record = {} - Object.entries(response).forEach(([providerId, providerServices]) => { - Object.entries(providerServices).forEach(([serviceId, serviceCollections]) => { - Object.entries(serviceCollections).forEach(([collectionId, collectionEntities]) => { - Object.entries(collectionEntities).forEach(([entityId, entityData]) => { - const key = identifierKey(providerId, serviceId, collectionId, entityId) - entities[key] = entityData - }) - }) - }) + await entityService.stream({ sources, filter, sort, range }, (entity: EntityObject) => { + const key = identifierKey(entity.provider, entity.service, entity.collection, entity.identifier) + _entities.value[key] = entity + added[key] = entity }) - // Merge retrieved entities into state - _entities.value = { ..._entities.value, ...entities } - - console.debug('[Mail Manager][Store] - Successfully retrieved', Object.keys(entities).length, 'entities') - return entities + console.debug('[Mail Manager][Store] - Successfully retrieved', Object.keys(added).length, 'entities') + return added } catch (error: any) { console.error('[Mail Manager][Store] - Failed to retrieve entities:', error) throw error @@ -346,6 +336,42 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => { } } + /** + * Stream entities progressively, merging each entity into the store as it arrives. + * + * Unlike list(), which waits for the full response before updating the store, + * stream() updates reactive state entity-by-entity so UI renders incrementally. + * + * @param sources - optional source selector + * @param filter - optional list filter + * @param sort - optional list sort + * @param range - optional list range + * + * @returns Promise resolving to { total } when the stream completes + */ + async function stream( + sources?: SourceSelector, + filter?: ListFilter, + sort?: ListSort, + range?: ListRange + ): Promise<{ total: number }> { + transceiving.value = true + try { + const request: EntityStreamRequest = { sources, filter, sort, range } + const result = await entityService.stream(request, (entity: EntityObject) => { + const key = identifierKey(entity.provider, entity.service, entity.collection, entity.identifier) + _entities.value[key] = entity + }) + console.debug('[Mail Manager][Store] - Successfully streamed', result.total, 'entities') + return result + } catch (error: any) { + console.error('[Mail Manager][Store] - Failed to stream entities:', error) + throw error + } finally { + transceiving.value = false + } + } + // Return public API return { // State (readonly) @@ -365,5 +391,6 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => { delete: remove, delta, transmit, + stream, } }) diff --git a/src/types/entity.ts b/src/types/entity.ts index 87690c5..1cb8b9e 100644 --- a/src/types/entity.ts +++ b/src/types/entity.ts @@ -157,4 +157,41 @@ export interface EntityTransmitRequest { export interface EntityTransmitResponse { id: string; status: 'queued' | 'sent'; -} \ No newline at end of file +} + +/** + * Entity stream + */ +export interface EntityStreamRequest { + sources?: SourceSelector; + filter?: ListFilter; + sort?: ListSort; + range?: ListRange; +} + +export interface EntityStreamMetaLine { + type: 'meta'; + version: number; + transaction: string; + status: 'success'; +} + +export interface EntityStreamEntityLine extends EntityInterface { + type: 'entity'; +} + +export interface EntityStreamDoneLine { + type: 'done'; + total: number; +} + +export interface EntityStreamErrorLine { + type: 'error'; + message: string; +} + +export type EntityStreamLine = + | EntityStreamMetaLine + | EntityStreamEntityLine + | EntityStreamDoneLine + | EntityStreamErrorLine; \ No newline at end of file