feat: streaming entities

Signed-off-by: Sebastian Krupinski <krupinski01@gmail.com>
This commit is contained in:
2026-02-21 10:06:07 -05:00
parent e5eeeeb546
commit f520b8e5ac
6 changed files with 323 additions and 24 deletions

View File

@@ -11,6 +11,8 @@ namespace KTXM\MailManager\Controllers;
use InvalidArgumentException; use InvalidArgumentException;
use KTXC\Http\Response\JsonResponse; use KTXC\Http\Response\JsonResponse;
use KTXC\Http\Response\Response;
use KTXC\Http\Response\StreamedNdJsonResponse;
use KTXC\SessionIdentity; use KTXC\SessionIdentity;
use KTXC\SessionTenant; use KTXC\SessionTenant;
use KTXF\Controller\ControllerAbstract; use KTXF\Controller\ControllerAbstract;
@@ -44,6 +46,8 @@ class DefaultController extends ControllerAbstract {
private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array'; private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array';
private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array'; private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array';
private const STREAM_FLUSH_INTERVAL = 1;
public function __construct( public function __construct(
private readonly SessionTenant $tenantIdentity, private readonly SessionTenant $tenantIdentity,
private readonly SessionIdentity $userIdentity, private readonly SessionIdentity $userIdentity,
@@ -62,7 +66,7 @@ class DefaultController extends ControllerAbstract {
* "data": {...} * "data": {...}
* } * }
* *
* @return JsonResponse * @return Response
*/ */
#[AuthenticatedRoute('/v1', name: 'mail.manager.v1', methods: ['POST'])] #[AuthenticatedRoute('/v1', name: 'mail.manager.v1', methods: ['POST'])]
public function index( public function index(
@@ -71,7 +75,7 @@ class DefaultController extends ControllerAbstract {
string|null $operation = null, string|null $operation = null,
array|null $data = null, array|null $data = null,
string|null $user = null string|null $user = null
): JsonResponse { ): Response {
// authorize request // authorize request
$tenantId = $this->tenantIdentity->identifier(); $tenantId = $this->tenantIdentity->identifier();
@@ -80,7 +84,12 @@ class DefaultController extends ControllerAbstract {
try { try {
if ($operation !== null) { if ($operation !== null) {
$result = $this->processOperation($tenantId, $userId, $operation, $data ?? [], []); $result = $this->processOperation($tenantId, $userId, $operation, $data ?? [], $version, $transaction);
if ($result instanceof Response) {
return $result;
}
return new JsonResponse([ return new JsonResponse([
'version' => $version, 'version' => $version,
'transaction' => $transaction, 'transaction' => $transaction,
@@ -110,7 +119,7 @@ class DefaultController extends ControllerAbstract {
/** /**
* Process a single operation * Process a single operation
*/ */
private function processOperation(string $tenantId, string $userId, string $operation, array $data): mixed { private function processOperation(string $tenantId, string $userId, string $operation, array $data, int $version = 1, string $transaction = ''): mixed {
return match ($operation) { return match ($operation) {
// Provider operations // Provider operations
'provider.list' => $this->providerList($tenantId, $userId, $data), 'provider.list' => $this->providerList($tenantId, $userId, $data),
@@ -144,6 +153,7 @@ class DefaultController extends ControllerAbstract {
'entity.create' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.create' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
'entity.update' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.update' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
'entity.delete' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.delete' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
'entity.stream' => $this->entityStream($tenantId, $userId, $data, $version, $transaction),
'entity.delta' => $this->entityDelta($tenantId, $userId, $data), 'entity.delta' => $this->entityDelta($tenantId, $userId, $data),
'entity.move' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.move' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
'entity.copy' => throw new InvalidArgumentException('Operation not implemented: ' . $operation), 'entity.copy' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
@@ -617,4 +627,48 @@ class DefaultController extends ControllerAbstract {
return ['jobId' => $jobId]; return ['jobId' => $jobId];
} }
// ==================== Entity Stream ====================
private function entityStream(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse {
if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
}
if (!is_array($data['sources'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
}
$sources = new SourceSelector();
$sources->jsonDeserialize($data['sources']);
$filter = $data['filter'] ?? null;
$sort = $data['sort'] ?? null;
$range = $data['range'] ?? null;
$entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range);
$logger = $this->logger;
$lines = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator {
yield ['type' => 'meta', 'version' => $version, 'transaction' => $transaction, 'status' => 'success'];
$total = 0;
try {
foreach ($entityGenerator as $entity) {
$entityData = is_array($entity)
? $entity
: (method_exists($entity, 'jsonSerialize') ? $entity->jsonSerialize() : (array) $entity);
yield ['type' => 'entity'] + $entityData;
$total++;
}
} catch (\Throwable $t) {
$logger->error('Error streaming entities', ['exception' => $t]);
yield ['type' => 'error', 'message' => $t->getMessage()];
return;
}
yield ['type' => 'done', 'total' => $total];
})();
return new StreamedNdJsonResponse($lines, self::STREAM_FLUSH_INTERVAL, 200, ['Content-Type' => 'application/json']);
}
} }

View File

@@ -763,6 +763,83 @@ class Manager {
return $responseData; return $responseData;
} }
/**
* Stream entities
*
* @since 2026.02.01
*
* @param string $tenantId Tenant identifier
* @param string $userId User identifier
* @param SourceSelector $sources Message sources with collection identifiers
* @param array|null $filter Message filter
* @param array|null $sort Message sort
* @param array|null $range Message range/pagination
*
* @return \Generator<EntityBaseInterface> Yields each entity as it is retrieved
*/
public function entityStream(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): \Generator {
// retrieve providers
$providers = $this->providerList($tenantId, $userId, $sources);
// retrieve services for each provider
foreach ($providers as $provider) {
$serviceSelector = $sources[$provider->identifier()];
$servicesSelected = $provider->serviceList($tenantId, $userId, $serviceSelector->identifiers());
/** @var ServiceBaseInterface $service */
foreach ($servicesSelected as $service) {
// retrieve collections for each service
$collectionSelector = $serviceSelector[$service->identifier()];
$collectionSelected = $collectionSelector instanceof CollectionSelector ? $collectionSelector->identifiers() : [];
if ($collectionSelected === []) {
$collections = $service->collectionList('');
$collectionSelected = array_map(
fn($collection) => $collection->identifier(),
$collections
);
}
if ($collectionSelected === []) {
continue;
}
// construct filter for entities
$entityFilter = null;
if ($filter !== null && $filter !== []) {
$entityFilter = $service->entityListFilter();
foreach ($filter as $attribute => $value) {
$entityFilter->condition($attribute, $value);
}
}
// construct sort for entities
$entitySort = null;
if ($sort !== null && $sort !== []) {
$entitySort = $service->entityListSort();
foreach ($sort as $attribute => $direction) {
$entitySort->condition($attribute, $direction);
}
}
// construct range for entities
$entityRange = null;
if ($range !== null && $range !== [] && isset($range['type'])) {
$entityRange = $service->entityListRange(RangeType::from($range['type']));
if ($entityRange->type() === RangeType::TALLY) {
/** @var IRangeTally $entityRange */
if (isset($range['anchor'])) {
$entityRange->setAnchor(RangeAnchorType::from($range['anchor']));
}
if (isset($range['position'])) {
$entityRange->setPosition($range['position']);
}
if (isset($range['tally'])) {
$entityRange->setTally($range['tally']);
}
}
}
// yield entities for each collection individually
foreach ($collectionSelected as $collectionId) {
yield from $service->entityListStream($collectionId, $entityFilter, $entitySort, $entityRange, null);
}
}
}
}
/** /**
* Fetch specific messages * Fetch specific messages

View File

@@ -2,7 +2,7 @@
* Entity management service * Entity management service
*/ */
import { transceivePost } from './transceive'; import { transceivePost, transceiveStream } from './transceive';
import type { import type {
EntityListRequest, EntityListRequest,
EntityListResponse, EntityListResponse,
@@ -21,6 +21,10 @@ import type {
EntityTransmitRequest, EntityTransmitRequest,
EntityTransmitResponse, EntityTransmitResponse,
EntityInterface, EntityInterface,
EntityStreamRequest,
EntityStreamLine,
EntityStreamEntityLine,
EntityStreamDoneLine,
} from '../types/entity'; } from '../types/entity';
import { useIntegrationStore } from '@KTXC/stores/integrationStore'; import { useIntegrationStore } from '@KTXC/stores/integrationStore';
import { EntityObject } from '../models'; import { EntityObject } from '../models';
@@ -157,6 +161,41 @@ export const entityService = {
async transmit(request: EntityTransmitRequest): Promise<EntityTransmitResponse> { async transmit(request: EntityTransmitRequest): Promise<EntityTransmitResponse> {
return await transceivePost<EntityTransmitRequest, EntityTransmitResponse>('entity.transmit', request); return await transceivePost<EntityTransmitRequest, EntityTransmitResponse>('entity.transmit', request);
}, },
/**
* Stream entities as NDJSON, invoking onEntity for each entity as it arrives.
*
* The server emits one entity per line so the caller receives entities
* progressively rather than waiting for the full collection to load.
*
* @param request - stream request parameters (same shape as list)
* @param onEntity - called synchronously for each entity as it is received
*
* @returns Promise resolving to { total } when the stream completes
*/
async stream(
request: EntityStreamRequest,
onEntity: (entity: EntityObject) => void
): Promise<{ total: number }> {
let total = 0;
await transceiveStream<EntityStreamRequest, EntityStreamLine>(
'entity.stream',
request,
(line) => {
if (line.type === 'entity') {
onEntity(createEntityObject(line as EntityStreamEntityLine));
} else if (line.type === 'done') {
total = (line as EntityStreamDoneLine).total;
} else if (line.type === 'error') {
throw new Error(`[entity.stream] ${line.message}`);
}
// 'meta' lines are silently consumed
}
);
return { total };
},
}; };
export default entityService; export default entityService;

View File

@@ -48,3 +48,68 @@ export async function transceivePost<TRequest, TResponse>(
return response.data; return response.data;
} }
/**
* Stream an NDJSON API response, calling onLine for each parsed line.
*
* The server emits one JSON object per line with a `type` discriminant
* (meta / entity / done / error). The caller interprets each line via onLine.
* Throwing inside onLine aborts the stream.
*
* @param operation - Operation name, e.g. 'entity.stream'
* @param data - Operation-specific request data
* @param onLine - Synchronous callback invoked for every parsed line.
* May throw to abort the stream.
* @param user - Optional user identifier override
*/
export async function transceiveStream<TRequest, TLine = any>(
operation: string,
data: TRequest,
onLine: (line: TLine) => void,
user?: string
): Promise<void> {
const request: ApiRequest<TRequest> = {
version: API_VERSION,
transaction: generateTransactionId(),
operation,
data,
user,
};
await fetchWrapper.post(API_URL, request, {
//headers: { 'Accept': 'application/x-ndjson' },
headers: { 'Accept': 'application/json' },
onStream: async (response) => {
if (!response.body) {
throw new Error(`[${operation}] Response body is not readable`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop()!; // retain any incomplete trailing chunk
for (const line of lines) {
if (!line.trim()) continue;
onLine(JSON.parse(line) as TLine);
}
}
// flush any remaining bytes still in the buffer
if (buffer.trim()) {
onLine(JSON.parse(buffer) as TLine);
}
} finally {
reader.releaseLock();
}
},
});
}

View File

@@ -6,7 +6,7 @@ import { ref, computed, readonly } from 'vue'
import { defineStore } from 'pinia' import { defineStore } from 'pinia'
import { entityService } from '../services' import { entityService } from '../services'
import { EntityObject } from '../models' import { EntityObject } from '../models'
import type { EntityTransmitRequest, EntityTransmitResponse } from '../types/entity' import type { EntityTransmitRequest, EntityTransmitResponse, EntityStreamRequest } from '../types/entity'
import type { SourceSelector, ListFilter, ListSort, ListRange } from '../types/common' import type { SourceSelector, ListFilter, ListSort, ListRange } from '../types/common'
export const useEntitiesStore = defineStore('mailEntitiesStore', () => { export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
@@ -103,26 +103,16 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
async function list(sources?: SourceSelector, filter?: ListFilter, sort?: ListSort, range?: ListRange): Promise<Record<string, EntityObject>> { async function list(sources?: SourceSelector, filter?: ListFilter, sort?: ListSort, range?: ListRange): Promise<Record<string, EntityObject>> {
transceiving.value = true transceiving.value = true
try { try {
const response = await entityService.list({ sources, filter, sort, range }) const added: Record<string, EntityObject> = {}
// Flatten nested structure: provider:service:collection:entity -> "provider:service:collection:entity": object await entityService.stream({ sources, filter, sort, range }, (entity: EntityObject) => {
const entities: Record<string, EntityObject> = {} const key = identifierKey(entity.provider, entity.service, entity.collection, entity.identifier)
Object.entries(response).forEach(([providerId, providerServices]) => { _entities.value[key] = entity
Object.entries(providerServices).forEach(([serviceId, serviceCollections]) => { added[key] = entity
Object.entries(serviceCollections).forEach(([collectionId, collectionEntities]) => {
Object.entries(collectionEntities).forEach(([entityId, entityData]) => {
const key = identifierKey(providerId, serviceId, collectionId, entityId)
entities[key] = entityData
})
})
})
}) })
// Merge retrieved entities into state console.debug('[Mail Manager][Store] - Successfully retrieved', Object.keys(added).length, 'entities')
_entities.value = { ..._entities.value, ...entities } return added
console.debug('[Mail Manager][Store] - Successfully retrieved', Object.keys(entities).length, 'entities')
return entities
} catch (error: any) { } catch (error: any) {
console.error('[Mail Manager][Store] - Failed to retrieve entities:', error) console.error('[Mail Manager][Store] - Failed to retrieve entities:', error)
throw error throw error
@@ -346,6 +336,42 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
} }
} }
/**
* Stream entities progressively, merging each entity into the store as it arrives.
*
* Unlike list(), which waits for the full response before updating the store,
* stream() updates reactive state entity-by-entity so UI renders incrementally.
*
* @param sources - optional source selector
* @param filter - optional list filter
* @param sort - optional list sort
* @param range - optional list range
*
* @returns Promise resolving to { total } when the stream completes
*/
async function stream(
sources?: SourceSelector,
filter?: ListFilter,
sort?: ListSort,
range?: ListRange
): Promise<{ total: number }> {
transceiving.value = true
try {
const request: EntityStreamRequest = { sources, filter, sort, range }
const result = await entityService.stream(request, (entity: EntityObject) => {
const key = identifierKey(entity.provider, entity.service, entity.collection, entity.identifier)
_entities.value[key] = entity
})
console.debug('[Mail Manager][Store] - Successfully streamed', result.total, 'entities')
return result
} catch (error: any) {
console.error('[Mail Manager][Store] - Failed to stream entities:', error)
throw error
} finally {
transceiving.value = false
}
}
// Return public API // Return public API
return { return {
// State (readonly) // State (readonly)
@@ -365,5 +391,6 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
delete: remove, delete: remove,
delta, delta,
transmit, transmit,
stream,
} }
}) })

View File

@@ -157,4 +157,41 @@ export interface EntityTransmitRequest {
export interface EntityTransmitResponse { export interface EntityTransmitResponse {
id: string; id: string;
status: 'queued' | 'sent'; status: 'queued' | 'sent';
} }
/**
* Entity stream
*/
export interface EntityStreamRequest {
sources?: SourceSelector;
filter?: ListFilter;
sort?: ListSort;
range?: ListRange;
}
export interface EntityStreamMetaLine {
type: 'meta';
version: number;
transaction: string;
status: 'success';
}
export interface EntityStreamEntityLine extends EntityInterface<MessageInterface> {
type: 'entity';
}
export interface EntityStreamDoneLine {
type: 'done';
total: number;
}
export interface EntityStreamErrorLine {
type: 'error';
message: string;
}
export type EntityStreamLine =
| EntityStreamMetaLine
| EntityStreamEntityLine
| EntityStreamDoneLine
| EntityStreamErrorLine;