From cceaf809d9f4af52677adf6c316dd73e190957cf Mon Sep 17 00:00:00 2001 From: Sebastian Krupinski Date: Fri, 6 Mar 2026 22:53:08 -0500 Subject: [PATCH] refactor: unify streaming Signed-off-by: Sebastian Krupinski --- lib/Controllers/DefaultController.php | 67 ++++++++++++++++++++------- lib/Manager.php | 31 +++++-------- src/components/AddAccountDialog.vue | 15 +++--- src/services/entityService.ts | 21 ++------- src/services/serviceService.ts | 49 ++++++++++---------- src/services/transceive.ts | 52 ++++++++++++++++----- src/stores/servicesStore.ts | 17 +++++-- src/types/common.ts | 41 ++++++++++++++++ src/types/entity.ts | 34 +++----------- src/types/service.ts | 8 +++- 10 files changed, 205 insertions(+), 130 deletions(-) diff --git a/lib/Controllers/DefaultController.php b/lib/Controllers/DefaultController.php index 8340a28..7d7862e 100644 --- a/lib/Controllers/DefaultController.php +++ b/lib/Controllers/DefaultController.php @@ -16,6 +16,8 @@ use KTXC\Http\Response\StreamedNdJsonResponse; use KTXC\SessionIdentity; use KTXC\SessionTenant; use KTXF\Controller\ControllerAbstract; +use KTXF\Json\JsonSerializable; +use KTXF\Resource\Provider\ResourceServiceLocationInterface; use KTXF\Resource\Selector\SourceSelector; use KTXF\Routing\Attributes\AuthenticatedRoute; use KTXM\MailManager\Manager; @@ -133,7 +135,7 @@ class DefaultController extends ControllerAbstract { 'service.create' => $this->serviceCreate($tenantId, $userId, $data), 'service.update' => $this->serviceUpdate($tenantId, $userId, $data), 'service.delete' => $this->serviceDelete($tenantId, $userId, $data), - 'service.discover' => $this->serviceDiscover($tenantId, $userId, $data), + 'service.discover' => $this->serviceDiscover($tenantId, $userId, $data, $version, $transaction), 'service.test' => $this->serviceTest($tenantId, $userId, $data), // Collection operations @@ -346,18 +348,48 @@ class DefaultController extends ControllerAbstract { ); } - private function serviceDiscover(string $tenantId, string $userId, array $data): mixed { + private function serviceDiscover(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse { if (!isset($data['identity']) || empty($data['identity']) || !is_string($data['identity'])) { throw new InvalidArgumentException(self::ERR_INVALID_DATA); } - $provider = $data['provider'] ?? null; - $identity = $data['identity']; - $location = $data['location'] ?? null; - $secret = $data['secret'] ?? null; - - return $this->mailManager->serviceDiscover($tenantId, $userId, $provider, $identity, $location, $secret); + $provider = $data['provider'] ?? null; + $identity = $data['identity']; + $location = $data['location'] ?? null; + $secret = $data['secret'] ?? null; + + $discoverGenerator = $this->mailManager->serviceDiscover($tenantId, $userId, $provider, $identity, $location, $secret); + $logger = $this->logger; + + $response = (function () use ($discoverGenerator, $version, $transaction, $logger): \Generator { + yield ['type' => 'control', 'status' => 'start', 'version' => $version, 'transaction' => $transaction]; + + $total = 0; + try { + foreach ($discoverGenerator as $providerId => $serviceLocation) { + if (!$serviceLocation instanceof ResourceServiceLocationInterface) { + continue; + } + yield [ + 'type' => 'data', + 'data' => [ + 'provider' => $providerId, + 'location' => $serviceLocation->jsonSerialize() + ] + ]; + $total++; + } + } catch (\Throwable $t) { + $logger->error('Error streaming service discovery', ['exception' => $t]); + yield ['type' => 'error', 'message' => $t->getMessage()]; + return; + } + + yield ['type' => 'control', 'status' => 'end', 'total' => $total]; + })(); + + return new StreamedNdJsonResponse($response, 1, 200, ['Content-Type' => 'application/json']); } // ==================== Collection Operations ==================== @@ -647,16 +679,19 @@ class DefaultController extends ControllerAbstract { $entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range); $logger = $this->logger; - $lines = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator { - yield ['type' => 'meta', 'version' => $version, 'transaction' => $transaction, 'status' => 'success']; + $responseGenerator = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator { + yield ['type' => 'control', 'status' => 'start', 'version' => $version, 'transaction' => $transaction]; $total = 0; try { foreach ($entityGenerator as $entity) { - $entityData = is_array($entity) - ? $entity - : (method_exists($entity, 'jsonSerialize') ? $entity->jsonSerialize() : (array) $entity); - yield ['type' => 'entity'] + $entityData; + if (!$entity instanceof JsonSerializable) { + continue; + } + yield [ + 'type' => 'data', + 'data' => $entity->jsonSerialize() + ]; $total++; } } catch (\Throwable $t) { @@ -665,10 +700,10 @@ class DefaultController extends ControllerAbstract { return; } - yield ['type' => 'done', 'total' => $total]; + yield ['type' => 'control', 'status' => 'end', 'total' => $total]; })(); - return new StreamedNdJsonResponse($lines, self::STREAM_FLUSH_INTERVAL, 200, ['Content-Type' => 'application/json']); + return new StreamedNdJsonResponse($responseGenerator, 1, 200, ['Content-Type' => 'application/json']); } } diff --git a/lib/Manager.php b/lib/Manager.php index cf864ee..b959603 100644 --- a/lib/Manager.php +++ b/lib/Manager.php @@ -303,7 +303,7 @@ class Manager { } /** - * Discover mail service settings from identity + * Discover mail service settings from identity, yielding results as each provider completes * * @since 2025.05.01 * @@ -314,12 +314,7 @@ class Manager { * @param string|null $location Optional hostname to test directly (bypasses DNS SRV lookup) * @param string|null $secret Optional password/token to validate discovered service * - * @return array Array of discovered service locations keyed by provider ID - * [ - * 'jmap' => ResourceServiceLocationInterface, - * 'smtp' => ResourceServiceLocationInterface, - * // Only providers that successfully discovered (non-null) - * ] + * @return \Generator Yields providerId => ResourceServiceLocationInterface pairs as each provider completes */ public function serviceDiscover( string $tenantId, @@ -328,32 +323,28 @@ class Manager { string $identity, string|null $location = null, string|null $secret = null - ): array { - $locations = []; - + ): \Generator { $providers = $this->providerList($tenantId, $userId, $providerId !== null ? new SourceSelector([$providerId => true]) : null); - - foreach ($providers as $providerId => $provider) { + + foreach ($providers as $currentProviderId => $provider) { if (!($provider instanceof ProviderServiceDiscoverInterface)) { continue; } - + try { - $location = $provider->serviceDiscover($tenantId, $userId, $identity, $location, $secret); - - if ($location !== null) { - $locations[$providerId] = $location; + $result = $provider->serviceDiscover($tenantId, $userId, $identity, $location, $secret); + + if ($result !== null) { + yield $currentProviderId => $result; } } catch (\Throwable $e) { $this->logger->warning('Provider autodiscovery failed', [ - 'provider' => $providerId, + 'provider' => $currentProviderId, 'identity' => $identity, 'error' => $e->getMessage(), ]); } } - - return $locations; } /** diff --git a/src/components/AddAccountDialog.vue b/src/components/AddAccountDialog.vue index 7ceaa44..ca08267 100644 --- a/src/components/AddAccountDialog.vue +++ b/src/components/AddAccountDialog.vue @@ -198,23 +198,24 @@ async function handleDiscover() { discoveryStatus.value[identifier].status = 'discovering' try { - const services = await servicesStore.discover( + let discoveredService: any = undefined + await servicesStore.discover( discoverAddress.value, discoverSecret.value || undefined, discoverHostname.value || undefined, - identifier + identifier, + (service) => { discoveredService = service } ) // Success - check if we got results for this provider - const service = services.find(s => s.provider === identifier) - if (service && service.location) { + if (discoveredService && discoveredService.location) { discoveryStatus.value[identifier] = { provider: identifier, status: 'success', - location: service.location, - metadata: extractLocationMetadata(service.location) + location: discoveredService.location, + metadata: extractLocationMetadata(discoveredService.location) } - discoveredServices.value.push(service) + discoveredServices.value.push(discoveredService) } else { // No configuration found for this provider discoveryStatus.value[identifier].status = 'failed' diff --git a/src/services/entityService.ts b/src/services/entityService.ts index 6e59515..32dc923 100644 --- a/src/services/entityService.ts +++ b/src/services/entityService.ts @@ -22,9 +22,7 @@ import type { EntityTransmitResponse, EntityInterface, EntityStreamRequest, - EntityStreamLine, - EntityStreamEntityLine, - EntityStreamDoneLine, + EntityStreamResponse, } from '../types/entity'; import { useIntegrationStore } from '@KTXC/stores/integrationStore'; import { EntityObject } from '../models'; @@ -177,24 +175,13 @@ export const entityService = { request: EntityStreamRequest, onEntity: (entity: EntityObject) => void ): Promise<{ total: number }> { - let total = 0; - - await transceiveStream( + return await transceiveStream( 'entity.stream', request, - (line) => { - if (line.type === 'entity') { - onEntity(createEntityObject(line as EntityStreamEntityLine)); - } else if (line.type === 'done') { - total = (line as EntityStreamDoneLine).total; - } else if (line.type === 'error') { - throw new Error(`[entity.stream] ${line.message}`); - } - // 'meta' lines are silently consumed + (entity) => { + onEntity(createEntityObject(entity)); } ); - - return { total }; }, }; diff --git a/src/services/serviceService.ts b/src/services/serviceService.ts index ef29cda..064d1f7 100644 --- a/src/services/serviceService.ts +++ b/src/services/serviceService.ts @@ -16,13 +16,13 @@ import type { ServiceDeleteResponse, ServiceDeleteRequest, ServiceDiscoverRequest, - ServiceDiscoverResponse, ServiceTestRequest, ServiceTestResponse, ServiceInterface, + ServiceDiscoverResponse, } from '../types/service'; import { useIntegrationStore } from '@KTXC/stores/integrationStore'; -import { transceivePost } from './transceive'; +import { transceivePost, transceiveStream } from './transceive'; import { ServiceObject } from '../models/service'; /** @@ -87,31 +87,32 @@ export const serviceService = { }, /** - * Retrieve discoverable services for a given source selector, sorted by provider + * Discover services, streaming results as each provider responds * - * @param request - discover request parameters + * @param request - discover request parameters + * @param onService - called for each discovered service as it arrives * - * @returns Promise with array of discovered services sorted by provider + * @returns Promise resolving to { total } when the stream completes */ - async discover(request: ServiceDiscoverRequest): Promise { - const response = await transceivePost('service.discover', request); - - // Convert discovery results to ServiceObjects - const services: ServiceObject[] = []; - Object.entries(response).forEach(([providerId, location]) => { - const serviceData: ServiceInterface = { - '@type': 'mail:service', - provider: providerId, - identifier: null, - label: null, - enabled: false, - location: location, - }; - services.push(createServiceObject(serviceData)); - }); - - // Sort by provider - return services.sort((a, b) => a.provider.localeCompare(b.provider)); + async discover( + request: ServiceDiscoverRequest, + onService: (service: ServiceObject) => void + ): Promise<{ total: number }> { + return await transceiveStream( + 'service.discover', + request, + (service) => { + const serviceData: ServiceInterface = { + '@type': 'mail:service', + provider: service.provider, + identifier: null, + label: null, + enabled: false, + location: service.location, + }; + onService(createServiceObject(serviceData)); + } + ); }, /** diff --git a/src/services/transceive.ts b/src/services/transceive.ts index 9690dc8..7b72413 100644 --- a/src/services/transceive.ts +++ b/src/services/transceive.ts @@ -4,7 +4,7 @@ */ import { createFetchWrapper } from '@KTXC'; -import type { ApiRequest, ApiResponse } from '../types/common'; +import type { ApiRequest, ApiResponse, ApiStreamResponse } from '../types/common'; const fetchWrapper = createFetchWrapper(); const API_URL = '/m/mail_manager/v1'; @@ -50,24 +50,25 @@ export async function transceivePost( } /** - * Stream an NDJSON API response, calling onLine for each parsed line. + * Stream an NDJSON API response, unwrapping data frames for the caller. * - * The server emits one JSON object per line with a `type` discriminant - * (meta / entity / done / error). The caller interprets each line via onLine. - * Throwing inside onLine aborts the stream. + * The server emits one JSON object per line with a transport-level `type` + * discriminant. This helper consumes control and error frames, forwards only + * unwrapped `data` payloads to the caller, and returns the final stream total. * * @param operation - Operation name, e.g. 'entity.stream' * @param data - Operation-specific request data - * @param onLine - Synchronous callback invoked for every parsed line. + * @param onData - Synchronous callback invoked for every unwrapped data payload. * May throw to abort the stream. * @param user - Optional user identifier override + * @returns Promise resolving to the final stream total from the control/end frame */ -export async function transceiveStream( +export async function transceiveStream( operation: string, data: TRequest, - onLine: (line: TLine) => void, + onData: (data: TData) => void, user?: string -): Promise { +): Promise<{ total: number }> { const request: ApiRequest = { version: API_VERSION, transaction: generateTransactionId(), @@ -76,10 +77,12 @@ export async function transceiveStream( user, }; + let total = 0; + await fetchWrapper.post(API_URL, request, { //headers: { 'Accept': 'application/x-ndjson' }, headers: { 'Accept': 'application/json' }, - onStream: async (response) => { + onStream: async (response: Response) => { if (!response.body) { throw new Error(`[${operation}] Response body is not readable`); } @@ -99,17 +102,42 @@ export async function transceiveStream( for (const line of lines) { if (!line.trim()) continue; - onLine(JSON.parse(line) as TLine); + const message = JSON.parse(line) as ApiStreamResponse; + + if (message.type === 'control') { + if (message.status === 'end') { + total = message.total; + } + continue; + } + + if (message.type === 'error') { + throw new Error(`[${operation}] ${message.message}`); + } + + onData(message.data); } } // flush any remaining bytes still in the buffer if (buffer.trim()) { - onLine(JSON.parse(buffer) as TLine); + const message = JSON.parse(buffer) as ApiStreamResponse; + + if (message.type === 'control') { + if (message.status === 'end') { + total = message.total; + } + } else if (message.type === 'error') { + throw new Error(`[${operation}] ${message.message}`); + } else { + onData(message.data); + } } } finally { reader.releaseLock(); } }, }); + + return { total }; } diff --git a/src/stores/servicesStore.ts b/src/stores/servicesStore.ts index 11d3859..a497e09 100644 --- a/src/stores/servicesStore.ts +++ b/src/stores/servicesStore.ts @@ -268,22 +268,29 @@ export const useServicesStore = defineStore('mailServicesStore', () => { * @param secret - optional secret for discovery * @param location - optional location for discovery * @param provider - optional provider identifier for discovery + * @param onService - called for each discovered service as it arrives * - * @returns Promise with list of discovered service objects + * @returns Promise resolving to { total } when the stream completes */ async function discover( identity: string, secret: string | undefined, location: string | undefined, provider: string | undefined, - ): Promise { + onService?: (service: ServiceObject) => void, + ): Promise<{ total: number }> { transceiving.value = true try { - const services = await serviceService.discover({identity, secret, location, provider}) + const result = await serviceService.discover( + { identity, secret, location, provider }, + (service: ServiceObject) => { + onService?.(service) + } + ) - console.debug('[Mail Manager][Store] - Successfully discovered', services.length, 'services') - return services + console.debug('[Mail Manager][Store] - Successfully discovered', result.total, 'services') + return result } catch (error: any) { console.error('[Mail Manager][Store] - Failed to discover service:', error) throw error diff --git a/src/types/common.ts b/src/types/common.ts index ab912d4..e236f47 100644 --- a/src/types/common.ts +++ b/src/types/common.ts @@ -43,6 +43,47 @@ export interface ApiErrorResponse { */ export type ApiResponse = ApiSuccessResponse | ApiErrorResponse; +/** + * Stream control start line + */ +export interface ApiStreamStartResponse { + type: 'control'; + status: 'start'; + version: number; + transaction: string; +} + +/** + * Stream control end line + */ +export interface ApiStreamEndResponse { + type: 'control'; + status: 'end'; + total: number; +} + +/** + * Stream error line + */ +export interface ApiStreamErrorResponse { + type: 'error'; + message: string; +} + +export interface ApiStreamDataResponse { + type: 'data'; + data: T; +} + +/** + * Shared stream control lines + */ +export type ApiStreamResponse = + | ApiStreamStartResponse + | ApiStreamEndResponse + | ApiStreamErrorResponse + | ApiStreamDataResponse; + /** * Selector for targeting specific providers, services, collections, or entities in list or extant operations. * diff --git a/src/types/entity.ts b/src/types/entity.ts index 1cb8b9e..8b299c7 100644 --- a/src/types/entity.ts +++ b/src/types/entity.ts @@ -1,7 +1,12 @@ /** * Entity type definitions */ -import type { SourceSelector, ListFilter, ListSort, ListRange } from './common'; +import type { + ListFilter, + ListRange, + ListSort, + SourceSelector, +} from './common'; import type { MessageInterface } from './message'; /** @@ -169,29 +174,4 @@ export interface EntityStreamRequest { range?: ListRange; } -export interface EntityStreamMetaLine { - type: 'meta'; - version: number; - transaction: string; - status: 'success'; -} - -export interface EntityStreamEntityLine extends EntityInterface { - type: 'entity'; -} - -export interface EntityStreamDoneLine { - type: 'done'; - total: number; -} - -export interface EntityStreamErrorLine { - type: 'error'; - message: string; -} - -export type EntityStreamLine = - | EntityStreamMetaLine - | EntityStreamEntityLine - | EntityStreamDoneLine - | EntityStreamErrorLine; \ No newline at end of file +export interface EntityStreamResponse extends EntityInterface {} \ No newline at end of file diff --git a/src/types/service.ts b/src/types/service.ts index ff91626..9733e77 100644 --- a/src/types/service.ts +++ b/src/types/service.ts @@ -1,7 +1,10 @@ /** * Service type definitions */ -import type { SourceSelector, ListFilterComparisonOperator } from './common'; +import type { + ListFilterComparisonOperator, + SourceSelector, +} from './common'; /** * Service capabilities @@ -130,7 +133,8 @@ export interface ServiceDiscoverRequest { } export interface ServiceDiscoverResponse { - [provider: string]: ServiceLocation; // Uses existing ServiceLocation discriminated union + provider: string; + location: ServiceLocation; } /**