refactor: list and fetch #23

Merged
Sebastian merged 1 commits from refactor/clean-up-list-and-fetch into main 2026-05-17 21:55:46 +00:00
5 changed files with 115 additions and 96 deletions

View File

@@ -42,23 +42,17 @@ class DefaultController extends ControllerAbstract {
private const ERR_MISSING_PROVIDER = 'Missing parameter: provider'; private const ERR_MISSING_PROVIDER = 'Missing parameter: provider';
private const ERR_MISSING_IDENTIFIER = 'Missing parameter: identifier'; private const ERR_MISSING_IDENTIFIER = 'Missing parameter: identifier';
private const ERR_MISSING_SERVICE = 'Missing parameter: service'; private const ERR_MISSING_SERVICE = 'Missing parameter: service';
private const ERR_MISSING_COLLECTION = 'Missing parameter: collection';
private const ERR_MISSING_DATA = 'Missing parameter: data'; private const ERR_MISSING_DATA = 'Missing parameter: data';
private const ERR_MISSING_SOURCE = 'Missing parameter: source';
private const ERR_MISSING_SOURCES = 'Missing parameter: sources'; private const ERR_MISSING_SOURCES = 'Missing parameter: sources';
private const ERR_MISSING_TARGET = 'Missing parameter: target'; private const ERR_MISSING_TARGET = 'Missing parameter: target';
private const ERR_MISSING_TARGETS = 'Missing parameter: targets'; private const ERR_MISSING_TARGETS = 'Missing parameter: targets';
private const ERR_MISSING_IDENTIFIERS = 'Missing parameter: identifiers';
private const ERR_INVALID_OPERATION = 'Invalid operation: '; private const ERR_INVALID_OPERATION = 'Invalid operation: ';
private const ERR_INVALID_PROVIDER = 'Invalid parameter: provider must be a string'; private const ERR_INVALID_PROVIDER = 'Invalid parameter: provider must be a string';
private const ERR_INVALID_SERVICE = 'Invalid parameter: service must be a string'; private const ERR_INVALID_SERVICE = 'Invalid parameter: service must be a string';
private const ERR_INVALID_IDENTIFIER = 'Invalid parameter: identifier must be a string'; private const ERR_INVALID_IDENTIFIER = 'Invalid parameter: identifier must be a string';
private const ERR_INVALID_COLLECTION = 'Invalid parameter: collection must be a string or integer';
private const ERR_INVALID_SOURCES = 'Invalid parameter: sources must be an array'; private const ERR_INVALID_SOURCES = 'Invalid parameter: sources must be an array';
private const ERR_INVALID_SOURCE = 'Invalid parameter: source must be a string';
private const ERR_INVALID_TARGET = 'Invalid parameter: target must be an array'; private const ERR_INVALID_TARGET = 'Invalid parameter: target must be an array';
private const ERR_INVALID_TARGETS = 'Invalid parameter: targets must be an array'; private const ERR_INVALID_TARGETS = 'Invalid parameter: targets must be an array';
private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array';
private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array'; private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array';
private const STREAM_FLUSH_INTERVAL = 1; private const STREAM_FLUSH_INTERVAL = 1;
@@ -162,8 +156,8 @@ class DefaultController extends ControllerAbstract {
'collection.move' => $this->collectionMove($tenantId, $userId, $data), 'collection.move' => $this->collectionMove($tenantId, $userId, $data),
// Entity operations // Entity operations
'entity.list' => $this->entityList($tenantId, $userId, $data), 'entity.listBulk' => $this->entityListBulk($tenantId, $userId, $data),
'entity.stream' => $this->entityStream($tenantId, $userId, $data, $version, $transaction), 'entity.listStream' => $this->entityListStream($tenantId, $userId, $data, $version, $transaction),
'entity.fetch' => $this->entityFetch($tenantId, $userId, $data), 'entity.fetch' => $this->entityFetch($tenantId, $userId, $data),
'entity.extant' => $this->entityExtant($tenantId, $userId, $data), 'entity.extant' => $this->entityExtant($tenantId, $userId, $data),
'entity.delta' => $this->entityDelta($tenantId, $userId, $data), 'entity.delta' => $this->entityDelta($tenantId, $userId, $data),
@@ -608,7 +602,7 @@ class DefaultController extends ControllerAbstract {
// ==================== Entity Operations ==================== // ==================== Entity Operations ====================
private function entityList(string $tenantId, string $userId, array $data): mixed { private function entityListBulk(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['sources'])) { if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES); throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
} }
@@ -629,11 +623,11 @@ class DefaultController extends ControllerAbstract {
$sort = $data['sort'] ?? null; $sort = $data['sort'] ?? null;
$range = $data['range'] ?? null; $range = $data['range'] ?? null;
return $this->mailManager->entityList($tenantId, $userId, $sources, $filter, $sort, $range); return $this->mailManager->entityListBulk($tenantId, $userId, $sources, $filter, $sort, $range);
} }
private function entityStream(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse { private function entityListStream(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse {
if (!isset($data['sources'])) { if (!isset($data['sources'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES); throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
} }
@@ -654,7 +648,7 @@ class DefaultController extends ControllerAbstract {
$sort = $data['sort'] ?? null; $sort = $data['sort'] ?? null;
$range = $data['range'] ?? null; $range = $data['range'] ?? null;
$entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range); $entityGenerator = $this->mailManager->entityListStream($tenantId, $userId, $sources, $filter, $sort, $range);
$logger = $this->logger; $logger = $this->logger;
$responseGenerator = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator { $responseGenerator = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator {
@@ -729,38 +723,24 @@ class DefaultController extends ControllerAbstract {
} }
private function entityFetch(string $tenantId, string $userId, array $data): mixed { private function entityFetch(string $tenantId, string $userId, array $data): mixed {
if (!isset($data['provider'])) { if (!isset($data['targets'])) {
throw new InvalidArgumentException(self::ERR_MISSING_PROVIDER); throw new InvalidArgumentException(self::ERR_MISSING_TARGETS);
} }
if (!is_string($data['provider'])) { if (!is_array($data['targets'])) {
throw new InvalidArgumentException(self::ERR_INVALID_PROVIDER); throw new InvalidArgumentException(self::ERR_INVALID_TARGETS);
}
if (!isset($data['service'])) {
throw new InvalidArgumentException(self::ERR_MISSING_SERVICE);
}
if (!is_string($data['service'])) {
throw new InvalidArgumentException(self::ERR_INVALID_SERVICE);
}
if (!isset($data['collection'])) {
throw new InvalidArgumentException(self::ERR_MISSING_COLLECTION);
}
if (!is_string($data['collection']) && !is_int($data['collection'])) {
throw new InvalidArgumentException(self::ERR_INVALID_COLLECTION);
}
if (!isset($data['identifiers'])) {
throw new InvalidArgumentException(self::ERR_MISSING_IDENTIFIERS);
}
if (!is_array($data['identifiers'])) {
throw new InvalidArgumentException(self::ERR_INVALID_IDENTIFIERS);
} }
return $this->mailManager->entityFetch( $targets = ResourceIdentifiers::fromArray($data['targets']);
foreach ($targets as $target) {
if (!$target instanceof EntityIdentifier) {
throw new InvalidArgumentException('Invalid parameter: targets must contain provider:service:collection:entity identifiers');
}
}
return $this->mailManager->entityFetchBulk(
$tenantId, $tenantId,
$userId, $userId,
$data['provider'], ...$targets->all()
$data['service'],
$data['collection'],
$data['identifiers']
); );
} }

View File

@@ -731,7 +731,7 @@ class Manager {
* *
* @return array<string, array<string|int, array<string|int, array<string|int, IMessageBase>>>> Messages grouped by provider/service/collection * @return array<string, array<string|int, array<string|int, array<string|int, IMessageBase>>>> Messages grouped by provider/service/collection
*/ */
public function entityList(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): array { public function entityListBulk(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): array {
// retrieve providers // retrieve providers
$providers = $this->providerList($tenantId, $userId, $sources); $providers = $this->providerList($tenantId, $userId, $sources);
// retrieve services for each provider // retrieve services for each provider
@@ -822,7 +822,7 @@ class Manager {
* *
* @return \Generator<EntityBaseInterface> Yields each entity as it is retrieved * @return \Generator<EntityBaseInterface> Yields each entity as it is retrieved
*/ */
public function entityStream(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): \Generator { public function entityListStream(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): \Generator {
// retrieve providers // retrieve providers
$providers = $this->providerList($tenantId, $userId, $sources); $providers = $this->providerList($tenantId, $userId, $sources);
// retrieve services for each provider // retrieve services for each provider
@@ -890,26 +890,65 @@ class Manager {
} }
/** /**
* Fetch specific messages * Fetch specific messages in bulk
* *
* @since 2025.05.01 * @since 2025.05.01
* *
* @param string $tenantId Tenant identifier * @param string $tenantId Tenant identifier
* @param string|null $userId User identifier for context * @param string|null $userId User identifier for context
* @param string $providerId Provider identifier * @param EntityIdentifier ...$identifiers Specific message identifiers to fetch
* @param string|int $serviceId Service identifier
* @param string|int $collectionId Collection identifier
* @param array<string|int> $identifiers Message identifiers
* *
* @return array<string|int, IMessageBase> Messages indexed by ID * @return array<int,IMessageBase>
*/ */
public function entityFetch(string $tenantId, ?string $userId, string $providerId, string|int $serviceId, string|int $collectionId, array $identifiers): array { public function entityFetchBulk(string $tenantId, ?string $userId, EntityIdentifier ...$identifiers): array {
$service = $this->serviceFetch($tenantId, $userId, $providerId, $serviceId); // group identifiers by provider/service
if ($service->getEnabled() === false) { $groupedIdentifiers = [];
throw new InvalidArgumentException("Service '{$providerId}:{$serviceId}' not found or is disabled"); foreach ($identifiers as $identifier) {
$groupedIdentifiers[$identifier->provider()][$identifier->service()][] = $identifier->entity();
}
// retrieve each service and fetch entities
$list = [];
foreach ($groupedIdentifiers as $providerId => $services) {
foreach ($services as $serviceId => $entities) {
$service = $this->serviceFetch($tenantId, $userId, $providerId, $serviceId);
if ($service->getEnabled() === false) {
throw new InvalidArgumentException("Service '{$providerId}:{$serviceId}' not found or is disabled");
}
// retrieve entities and merge into list
$list = array_merge($list, $service->entityFetchBulk(...$entities));
}
}
return $list;
}
/**
* Fetch specific messages as a stream
*
* @since 2025.05.01
*
* @param string $tenantId Tenant identifier
* @param string|null $userId User identifier for context
* @param EntityIdentifier ...$identifiers Specific message identifiers to fetch
*
* @return \Generator<string,IMessageBase>
*/
public function entityFetchStream(string $tenantId, ?string $userId, EntityIdentifier ...$identifiers): \Generator {
// group identifiers by provider/service
$groupedIdentifiers = [];
foreach ($identifiers as $identifier) {
$groupedIdentifiers[$identifier->provider()][$identifier->service()][] = $identifier->entity();
}
// retrieve each service and fetch entities
foreach ($groupedIdentifiers as $providerId => $services) {
foreach ($services as $serviceId => $entities) {
$service = $this->serviceFetch($tenantId, $userId, $providerId, $serviceId);
if ($service->getEnabled() === false) {
throw new InvalidArgumentException("Service '{$providerId}:{$serviceId}' not found or is disabled");
}
// retrieve entities and yield each one
yield from $service->entityFetchStream(...$entities);
}
} }
// retrieve collection
return $service->entityFetch($collectionId, ...$identifiers);
} }
/** /**

View File

@@ -4,8 +4,6 @@
import { transceivePost, transceiveStream } from './transceive'; import { transceivePost, transceiveStream } from './transceive';
import type { import type {
EntityListRequest,
EntityListResponse,
EntityFetchRequest, EntityFetchRequest,
EntityFetchResponse, EntityFetchResponse,
EntityExtantRequest, EntityExtantRequest,
@@ -23,8 +21,10 @@ import type {
EntityTransmitRequest, EntityTransmitRequest,
EntityTransmitResponse, EntityTransmitResponse,
EntityInterface, EntityInterface,
EntityStreamRequest, EntityListStreamResponse,
EntityStreamResponse, EntityListStreamRequest,
EntityListBulkResponse,
EntityListBulkRequest,
} from '../types/entity'; } from '../types/entity';
import { useIntegrationStore } from '@KTXC/stores/integrationStore'; import { useIntegrationStore } from '@KTXC/stores/integrationStore';
import { EntityObject } from '../models'; import { EntityObject } from '../models';
@@ -51,8 +51,8 @@ export const entityService = {
* *
* @returns Promise with entity object list grouped by provider, service, collection, and entity identifier * @returns Promise with entity object list grouped by provider, service, collection, and entity identifier
*/ */
async list(request: EntityListRequest = {}): Promise<Record<string, Record<string, Record<string, Record<string, EntityObject>>>>> { async listBulk(request: EntityListBulkRequest = {}): Promise<Record<string, Record<string, Record<string, Record<string, EntityObject>>>>> {
const response = await transceivePost<EntityListRequest, EntityListResponse>('entity.list', request); const response = await transceivePost<EntityListBulkRequest, EntityListBulkResponse>('entity.listBulk', request);
// Convert nested response to EntityObject instances // Convert nested response to EntityObject instances
const providerList: Record<string, Record<string, Record<string, Record<string, EntityObject>>>> = {}; const providerList: Record<string, Record<string, Record<string, Record<string, EntityObject>>>> = {};
@@ -75,6 +75,27 @@ export const entityService = {
return providerList; return providerList;
}, },
/**
* Stream entities as NDJSON, invoking onEntity for each entity as it arrives.
*
* The server emits one entity per line so the caller receives entities
* progressively rather than waiting for the full collection to load.
*
* @param request - stream request parameters (same shape as list)
* @param onEntity - called synchronously for each entity as it is received
*
* @returns Promise resolving to { total } when the stream completes
*/
async listStream(request: EntityListStreamRequest, onEntity: (entity: EntityObject) => void): Promise<{ total: number }> {
return await transceiveStream<EntityListStreamRequest, EntityListStreamResponse>(
'entity.listStream',
request,
(entity) => {
onEntity(createEntityObject(entity));
}
);
},
/** /**
* Retrieve a specific entity by provider and identifier * Retrieve a specific entity by provider and identifier
* *
@@ -172,27 +193,6 @@ export const entityService = {
async transmit(request: EntityTransmitRequest): Promise<EntityTransmitResponse> { async transmit(request: EntityTransmitRequest): Promise<EntityTransmitResponse> {
return await transceivePost<EntityTransmitRequest, EntityTransmitResponse>('entity.transmit', request); return await transceivePost<EntityTransmitRequest, EntityTransmitResponse>('entity.transmit', request);
}, },
/**
* Stream entities as NDJSON, invoking onEntity for each entity as it arrives.
*
* The server emits one entity per line so the caller receives entities
* progressively rather than waiting for the full collection to load.
*
* @param request - stream request parameters (same shape as list)
* @param onEntity - called synchronously for each entity as it is received
*
* @returns Promise resolving to { total } when the stream completes
*/
async stream(request: EntityStreamRequest, onEntity: (entity: EntityObject) => void): Promise<{ total: number }> {
return await transceiveStream<EntityStreamRequest, EntityStreamResponse>(
'entity.stream',
request,
(entity) => {
onEntity(createEntityObject(entity));
}
);
},
}; };
export default entityService; export default entityService;

View File

@@ -100,7 +100,7 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
try { try {
const entities: Record<string, EntityObject> = {} const entities: Record<string, EntityObject> = {}
await entityService.stream({ sources, filter, sort, range }, (entity: EntityObject) => { await entityService.listStream({ sources, filter, sort, range }, (entity: EntityObject) => {
_entities.value[entity.identifier] = entity _entities.value[entity.identifier] = entity
entities[entity.identifier] = entity entities[entity.identifier] = entity
}) })

View File

@@ -29,16 +29,16 @@ export interface EntityInterface<T = MessageInterface> {
export interface EntityModelInterface extends Omit<EntityInterface<MessageModelInterface>, '@type' | 'version'> {} export interface EntityModelInterface extends Omit<EntityInterface<MessageModelInterface>, '@type' | 'version'> {}
/** /**
* Entity list * Entity list bulk
*/ */
export interface EntityListRequest { export interface EntityListBulkRequest {
sources?: CollectionIdentifier[]; sources?: CollectionIdentifier[];
filter?: ListFilter; filter?: ListFilter;
sort?: ListSort; sort?: ListSort;
range?: ListRange; range?: ListRange;
} }
export interface EntityListResponse { export interface EntityListBulkResponse {
[providerId: string]: { [providerId: string]: {
[serviceId: string]: { [serviceId: string]: {
[collectionId: string]: { [collectionId: string]: {
@@ -48,6 +48,18 @@ export interface EntityListResponse {
}; };
} }
/**
* Entity list stream
*/
export interface EntityListStreamRequest {
sources?: CollectionIdentifier[];
filter?: ListFilter;
sort?: ListSort;
range?: ListRange;
}
export interface EntityListStreamResponse extends EntityInterface<MessageInterface> {}
/** /**
* Entity fetch * Entity fetch
*/ */
@@ -179,15 +191,3 @@ export interface EntityTransmitResponse {
id: string; id: string;
status: 'queued' | 'sent'; status: 'queued' | 'sent';
} }
/**
* Entity stream
*/
export interface EntityStreamRequest {
sources?: CollectionIdentifier[];
filter?: ListFilter;
sort?: ListSort;
range?: ListRange;
}
export interface EntityStreamResponse extends EntityInterface<MessageInterface> {}