refactor: unify streaming
All checks were successful
Build Test / test (pull_request) Successful in 26s
JS Unit Tests / test (pull_request) Successful in 27s
PHP Unit Tests / test (pull_request) Successful in 1m8s

Signed-off-by: Sebastian Krupinski <krupinski01@gmail.com>
This commit is contained in:
2026-03-06 22:53:08 -05:00
parent 5bfe5dd249
commit cceaf809d9
10 changed files with 205 additions and 130 deletions

View File

@@ -16,6 +16,8 @@ use KTXC\Http\Response\StreamedNdJsonResponse;
use KTXC\SessionIdentity; use KTXC\SessionIdentity;
use KTXC\SessionTenant; use KTXC\SessionTenant;
use KTXF\Controller\ControllerAbstract; use KTXF\Controller\ControllerAbstract;
use KTXF\Json\JsonSerializable;
use KTXF\Resource\Provider\ResourceServiceLocationInterface;
use KTXF\Resource\Selector\SourceSelector; use KTXF\Resource\Selector\SourceSelector;
use KTXF\Routing\Attributes\AuthenticatedRoute; use KTXF\Routing\Attributes\AuthenticatedRoute;
use KTXM\MailManager\Manager; use KTXM\MailManager\Manager;
@@ -133,7 +135,7 @@ class DefaultController extends ControllerAbstract {
'service.create' => $this->serviceCreate($tenantId, $userId, $data), 'service.create' => $this->serviceCreate($tenantId, $userId, $data),
'service.update' => $this->serviceUpdate($tenantId, $userId, $data), 'service.update' => $this->serviceUpdate($tenantId, $userId, $data),
'service.delete' => $this->serviceDelete($tenantId, $userId, $data), 'service.delete' => $this->serviceDelete($tenantId, $userId, $data),
'service.discover' => $this->serviceDiscover($tenantId, $userId, $data), 'service.discover' => $this->serviceDiscover($tenantId, $userId, $data, $version, $transaction),
'service.test' => $this->serviceTest($tenantId, $userId, $data), 'service.test' => $this->serviceTest($tenantId, $userId, $data),
// Collection operations // Collection operations
@@ -346,18 +348,48 @@ class DefaultController extends ControllerAbstract {
); );
} }
private function serviceDiscover(string $tenantId, string $userId, array $data): mixed { private function serviceDiscover(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse {
if (!isset($data['identity']) || empty($data['identity']) || !is_string($data['identity'])) { if (!isset($data['identity']) || empty($data['identity']) || !is_string($data['identity'])) {
throw new InvalidArgumentException(self::ERR_INVALID_DATA); throw new InvalidArgumentException(self::ERR_INVALID_DATA);
} }
$provider = $data['provider'] ?? null; $provider = $data['provider'] ?? null;
$identity = $data['identity']; $identity = $data['identity'];
$location = $data['location'] ?? null; $location = $data['location'] ?? null;
$secret = $data['secret'] ?? null; $secret = $data['secret'] ?? null;
return $this->mailManager->serviceDiscover($tenantId, $userId, $provider, $identity, $location, $secret); $discoverGenerator = $this->mailManager->serviceDiscover($tenantId, $userId, $provider, $identity, $location, $secret);
$logger = $this->logger;
$response = (function () use ($discoverGenerator, $version, $transaction, $logger): \Generator {
yield ['type' => 'control', 'status' => 'start', 'version' => $version, 'transaction' => $transaction];
$total = 0;
try {
foreach ($discoverGenerator as $providerId => $serviceLocation) {
if (!$serviceLocation instanceof ResourceServiceLocationInterface) {
continue;
}
yield [
'type' => 'data',
'data' => [
'provider' => $providerId,
'location' => $serviceLocation->jsonSerialize()
]
];
$total++;
}
} catch (\Throwable $t) {
$logger->error('Error streaming service discovery', ['exception' => $t]);
yield ['type' => 'error', 'message' => $t->getMessage()];
return;
}
yield ['type' => 'control', 'status' => 'end', 'total' => $total];
})();
return new StreamedNdJsonResponse($response, 1, 200, ['Content-Type' => 'application/json']);
} }
// ==================== Collection Operations ==================== // ==================== Collection Operations ====================
@@ -647,16 +679,19 @@ class DefaultController extends ControllerAbstract {
$entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range); $entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range);
$logger = $this->logger; $logger = $this->logger;
$lines = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator { $responseGenerator = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator {
yield ['type' => 'meta', 'version' => $version, 'transaction' => $transaction, 'status' => 'success']; yield ['type' => 'control', 'status' => 'start', 'version' => $version, 'transaction' => $transaction];
$total = 0; $total = 0;
try { try {
foreach ($entityGenerator as $entity) { foreach ($entityGenerator as $entity) {
$entityData = is_array($entity) if (!$entity instanceof JsonSerializable) {
? $entity continue;
: (method_exists($entity, 'jsonSerialize') ? $entity->jsonSerialize() : (array) $entity); }
yield ['type' => 'entity'] + $entityData; yield [
'type' => 'data',
'data' => $entity->jsonSerialize()
];
$total++; $total++;
} }
} catch (\Throwable $t) { } catch (\Throwable $t) {
@@ -665,10 +700,10 @@ class DefaultController extends ControllerAbstract {
return; return;
} }
yield ['type' => 'done', 'total' => $total]; yield ['type' => 'control', 'status' => 'end', 'total' => $total];
})(); })();
return new StreamedNdJsonResponse($lines, self::STREAM_FLUSH_INTERVAL, 200, ['Content-Type' => 'application/json']); return new StreamedNdJsonResponse($responseGenerator, 1, 200, ['Content-Type' => 'application/json']);
} }
} }

View File

@@ -303,7 +303,7 @@ class Manager {
} }
/** /**
* Discover mail service settings from identity * Discover mail service settings from identity, yielding results as each provider completes
* *
* @since 2025.05.01 * @since 2025.05.01
* *
@@ -314,12 +314,7 @@ class Manager {
* @param string|null $location Optional hostname to test directly (bypasses DNS SRV lookup) * @param string|null $location Optional hostname to test directly (bypasses DNS SRV lookup)
* @param string|null $secret Optional password/token to validate discovered service * @param string|null $secret Optional password/token to validate discovered service
* *
* @return array<string,ResourceServiceLocationInterface> Array of discovered service locations keyed by provider ID * @return \Generator Yields providerId => ResourceServiceLocationInterface pairs as each provider completes
* [
* 'jmap' => ResourceServiceLocationInterface,
* 'smtp' => ResourceServiceLocationInterface,
* // Only providers that successfully discovered (non-null)
* ]
*/ */
public function serviceDiscover( public function serviceDiscover(
string $tenantId, string $tenantId,
@@ -328,32 +323,28 @@ class Manager {
string $identity, string $identity,
string|null $location = null, string|null $location = null,
string|null $secret = null string|null $secret = null
): array { ): \Generator {
$locations = [];
$providers = $this->providerList($tenantId, $userId, $providerId !== null ? new SourceSelector([$providerId => true]) : null); $providers = $this->providerList($tenantId, $userId, $providerId !== null ? new SourceSelector([$providerId => true]) : null);
foreach ($providers as $providerId => $provider) { foreach ($providers as $currentProviderId => $provider) {
if (!($provider instanceof ProviderServiceDiscoverInterface)) { if (!($provider instanceof ProviderServiceDiscoverInterface)) {
continue; continue;
} }
try { try {
$location = $provider->serviceDiscover($tenantId, $userId, $identity, $location, $secret); $result = $provider->serviceDiscover($tenantId, $userId, $identity, $location, $secret);
if ($location !== null) { if ($result !== null) {
$locations[$providerId] = $location; yield $currentProviderId => $result;
} }
} catch (\Throwable $e) { } catch (\Throwable $e) {
$this->logger->warning('Provider autodiscovery failed', [ $this->logger->warning('Provider autodiscovery failed', [
'provider' => $providerId, 'provider' => $currentProviderId,
'identity' => $identity, 'identity' => $identity,
'error' => $e->getMessage(), 'error' => $e->getMessage(),
]); ]);
} }
} }
return $locations;
} }
/** /**

View File

@@ -198,23 +198,24 @@ async function handleDiscover() {
discoveryStatus.value[identifier].status = 'discovering' discoveryStatus.value[identifier].status = 'discovering'
try { try {
const services = await servicesStore.discover( let discoveredService: any = undefined
await servicesStore.discover(
discoverAddress.value, discoverAddress.value,
discoverSecret.value || undefined, discoverSecret.value || undefined,
discoverHostname.value || undefined, discoverHostname.value || undefined,
identifier identifier,
(service) => { discoveredService = service }
) )
// Success - check if we got results for this provider // Success - check if we got results for this provider
const service = services.find(s => s.provider === identifier) if (discoveredService && discoveredService.location) {
if (service && service.location) {
discoveryStatus.value[identifier] = { discoveryStatus.value[identifier] = {
provider: identifier, provider: identifier,
status: 'success', status: 'success',
location: service.location, location: discoveredService.location,
metadata: extractLocationMetadata(service.location) metadata: extractLocationMetadata(discoveredService.location)
} }
discoveredServices.value.push(service) discoveredServices.value.push(discoveredService)
} else { } else {
// No configuration found for this provider // No configuration found for this provider
discoveryStatus.value[identifier].status = 'failed' discoveryStatus.value[identifier].status = 'failed'

View File

@@ -22,9 +22,7 @@ import type {
EntityTransmitResponse, EntityTransmitResponse,
EntityInterface, EntityInterface,
EntityStreamRequest, EntityStreamRequest,
EntityStreamLine, EntityStreamResponse,
EntityStreamEntityLine,
EntityStreamDoneLine,
} from '../types/entity'; } from '../types/entity';
import { useIntegrationStore } from '@KTXC/stores/integrationStore'; import { useIntegrationStore } from '@KTXC/stores/integrationStore';
import { EntityObject } from '../models'; import { EntityObject } from '../models';
@@ -177,24 +175,13 @@ export const entityService = {
request: EntityStreamRequest, request: EntityStreamRequest,
onEntity: (entity: EntityObject) => void onEntity: (entity: EntityObject) => void
): Promise<{ total: number }> { ): Promise<{ total: number }> {
let total = 0; return await transceiveStream<EntityStreamRequest, EntityStreamResponse>(
await transceiveStream<EntityStreamRequest, EntityStreamLine>(
'entity.stream', 'entity.stream',
request, request,
(line) => { (entity) => {
if (line.type === 'entity') { onEntity(createEntityObject(entity));
onEntity(createEntityObject(line as EntityStreamEntityLine));
} else if (line.type === 'done') {
total = (line as EntityStreamDoneLine).total;
} else if (line.type === 'error') {
throw new Error(`[entity.stream] ${line.message}`);
}
// 'meta' lines are silently consumed
} }
); );
return { total };
}, },
}; };

View File

@@ -16,13 +16,13 @@ import type {
ServiceDeleteResponse, ServiceDeleteResponse,
ServiceDeleteRequest, ServiceDeleteRequest,
ServiceDiscoverRequest, ServiceDiscoverRequest,
ServiceDiscoverResponse,
ServiceTestRequest, ServiceTestRequest,
ServiceTestResponse, ServiceTestResponse,
ServiceInterface, ServiceInterface,
ServiceDiscoverResponse,
} from '../types/service'; } from '../types/service';
import { useIntegrationStore } from '@KTXC/stores/integrationStore'; import { useIntegrationStore } from '@KTXC/stores/integrationStore';
import { transceivePost } from './transceive'; import { transceivePost, transceiveStream } from './transceive';
import { ServiceObject } from '../models/service'; import { ServiceObject } from '../models/service';
/** /**
@@ -87,31 +87,32 @@ export const serviceService = {
}, },
/** /**
* Retrieve discoverable services for a given source selector, sorted by provider * Discover services, streaming results as each provider responds
* *
* @param request - discover request parameters * @param request - discover request parameters
* @param onService - called for each discovered service as it arrives
* *
* @returns Promise with array of discovered services sorted by provider * @returns Promise resolving to { total } when the stream completes
*/ */
async discover(request: ServiceDiscoverRequest): Promise<ServiceObject[]> { async discover(
const response = await transceivePost<ServiceDiscoverRequest, ServiceDiscoverResponse>('service.discover', request); request: ServiceDiscoverRequest,
onService: (service: ServiceObject) => void
// Convert discovery results to ServiceObjects ): Promise<{ total: number }> {
const services: ServiceObject[] = []; return await transceiveStream<ServiceDiscoverRequest, ServiceDiscoverResponse>(
Object.entries(response).forEach(([providerId, location]) => { 'service.discover',
const serviceData: ServiceInterface = { request,
'@type': 'mail:service', (service) => {
provider: providerId, const serviceData: ServiceInterface = {
identifier: null, '@type': 'mail:service',
label: null, provider: service.provider,
enabled: false, identifier: null,
location: location, label: null,
}; enabled: false,
services.push(createServiceObject(serviceData)); location: service.location,
}); };
onService(createServiceObject(serviceData));
// Sort by provider }
return services.sort((a, b) => a.provider.localeCompare(b.provider)); );
}, },
/** /**

View File

@@ -4,7 +4,7 @@
*/ */
import { createFetchWrapper } from '@KTXC'; import { createFetchWrapper } from '@KTXC';
import type { ApiRequest, ApiResponse } from '../types/common'; import type { ApiRequest, ApiResponse, ApiStreamResponse } from '../types/common';
const fetchWrapper = createFetchWrapper(); const fetchWrapper = createFetchWrapper();
const API_URL = '/m/mail_manager/v1'; const API_URL = '/m/mail_manager/v1';
@@ -50,24 +50,25 @@ export async function transceivePost<TRequest, TResponse>(
} }
/** /**
* Stream an NDJSON API response, calling onLine for each parsed line. * Stream an NDJSON API response, unwrapping data frames for the caller.
* *
* The server emits one JSON object per line with a `type` discriminant * The server emits one JSON object per line with a transport-level `type`
* (meta / entity / done / error). The caller interprets each line via onLine. * discriminant. This helper consumes control and error frames, forwards only
* Throwing inside onLine aborts the stream. * unwrapped `data` payloads to the caller, and returns the final stream total.
* *
* @param operation - Operation name, e.g. 'entity.stream' * @param operation - Operation name, e.g. 'entity.stream'
* @param data - Operation-specific request data * @param data - Operation-specific request data
* @param onLine - Synchronous callback invoked for every parsed line. * @param onData - Synchronous callback invoked for every unwrapped data payload.
* May throw to abort the stream. * May throw to abort the stream.
* @param user - Optional user identifier override * @param user - Optional user identifier override
* @returns Promise resolving to the final stream total from the control/end frame
*/ */
export async function transceiveStream<TRequest, TLine = any>( export async function transceiveStream<TRequest, TData>(
operation: string, operation: string,
data: TRequest, data: TRequest,
onLine: (line: TLine) => void, onData: (data: TData) => void,
user?: string user?: string
): Promise<void> { ): Promise<{ total: number }> {
const request: ApiRequest<TRequest> = { const request: ApiRequest<TRequest> = {
version: API_VERSION, version: API_VERSION,
transaction: generateTransactionId(), transaction: generateTransactionId(),
@@ -76,10 +77,12 @@ export async function transceiveStream<TRequest, TLine = any>(
user, user,
}; };
let total = 0;
await fetchWrapper.post(API_URL, request, { await fetchWrapper.post(API_URL, request, {
//headers: { 'Accept': 'application/x-ndjson' }, //headers: { 'Accept': 'application/x-ndjson' },
headers: { 'Accept': 'application/json' }, headers: { 'Accept': 'application/json' },
onStream: async (response) => { onStream: async (response: Response) => {
if (!response.body) { if (!response.body) {
throw new Error(`[${operation}] Response body is not readable`); throw new Error(`[${operation}] Response body is not readable`);
} }
@@ -99,17 +102,42 @@ export async function transceiveStream<TRequest, TLine = any>(
for (const line of lines) { for (const line of lines) {
if (!line.trim()) continue; if (!line.trim()) continue;
onLine(JSON.parse(line) as TLine); const message = JSON.parse(line) as ApiStreamResponse<TData>;
if (message.type === 'control') {
if (message.status === 'end') {
total = message.total;
}
continue;
}
if (message.type === 'error') {
throw new Error(`[${operation}] ${message.message}`);
}
onData(message.data);
} }
} }
// flush any remaining bytes still in the buffer // flush any remaining bytes still in the buffer
if (buffer.trim()) { if (buffer.trim()) {
onLine(JSON.parse(buffer) as TLine); const message = JSON.parse(buffer) as ApiStreamResponse<TData>;
if (message.type === 'control') {
if (message.status === 'end') {
total = message.total;
}
} else if (message.type === 'error') {
throw new Error(`[${operation}] ${message.message}`);
} else {
onData(message.data);
}
} }
} finally { } finally {
reader.releaseLock(); reader.releaseLock();
} }
}, },
}); });
return { total };
} }

View File

@@ -268,22 +268,29 @@ export const useServicesStore = defineStore('mailServicesStore', () => {
* @param secret - optional secret for discovery * @param secret - optional secret for discovery
* @param location - optional location for discovery * @param location - optional location for discovery
* @param provider - optional provider identifier for discovery * @param provider - optional provider identifier for discovery
* @param onService - called for each discovered service as it arrives
* *
* @returns Promise with list of discovered service objects * @returns Promise resolving to { total } when the stream completes
*/ */
async function discover( async function discover(
identity: string, identity: string,
secret: string | undefined, secret: string | undefined,
location: string | undefined, location: string | undefined,
provider: string | undefined, provider: string | undefined,
): Promise<ServiceObject[]> { onService?: (service: ServiceObject) => void,
): Promise<{ total: number }> {
transceiving.value = true transceiving.value = true
try { try {
const services = await serviceService.discover({identity, secret, location, provider}) const result = await serviceService.discover(
{ identity, secret, location, provider },
(service: ServiceObject) => {
onService?.(service)
}
)
console.debug('[Mail Manager][Store] - Successfully discovered', services.length, 'services') console.debug('[Mail Manager][Store] - Successfully discovered', result.total, 'services')
return services return result
} catch (error: any) { } catch (error: any) {
console.error('[Mail Manager][Store] - Failed to discover service:', error) console.error('[Mail Manager][Store] - Failed to discover service:', error)
throw error throw error

View File

@@ -43,6 +43,47 @@ export interface ApiErrorResponse {
*/ */
export type ApiResponse<T = any> = ApiSuccessResponse<T> | ApiErrorResponse; export type ApiResponse<T = any> = ApiSuccessResponse<T> | ApiErrorResponse;
/**
* Stream control start line
*/
export interface ApiStreamStartResponse {
type: 'control';
status: 'start';
version: number;
transaction: string;
}
/**
* Stream control end line
*/
export interface ApiStreamEndResponse {
type: 'control';
status: 'end';
total: number;
}
/**
* Stream error line
*/
export interface ApiStreamErrorResponse {
type: 'error';
message: string;
}
export interface ApiStreamDataResponse<T = any> {
type: 'data';
data: T;
}
/**
* Shared stream control lines
*/
export type ApiStreamResponse<T = any> =
| ApiStreamStartResponse
| ApiStreamEndResponse
| ApiStreamErrorResponse
| ApiStreamDataResponse<T>;
/** /**
* Selector for targeting specific providers, services, collections, or entities in list or extant operations. * Selector for targeting specific providers, services, collections, or entities in list or extant operations.
* *

View File

@@ -1,7 +1,12 @@
/** /**
* Entity type definitions * Entity type definitions
*/ */
import type { SourceSelector, ListFilter, ListSort, ListRange } from './common'; import type {
ListFilter,
ListRange,
ListSort,
SourceSelector,
} from './common';
import type { MessageInterface } from './message'; import type { MessageInterface } from './message';
/** /**
@@ -169,29 +174,4 @@ export interface EntityStreamRequest {
range?: ListRange; range?: ListRange;
} }
export interface EntityStreamMetaLine { export interface EntityStreamResponse extends EntityInterface<MessageInterface> {}
type: 'meta';
version: number;
transaction: string;
status: 'success';
}
export interface EntityStreamEntityLine extends EntityInterface<MessageInterface> {
type: 'entity';
}
export interface EntityStreamDoneLine {
type: 'done';
total: number;
}
export interface EntityStreamErrorLine {
type: 'error';
message: string;
}
export type EntityStreamLine =
| EntityStreamMetaLine
| EntityStreamEntityLine
| EntityStreamDoneLine
| EntityStreamErrorLine;

View File

@@ -1,7 +1,10 @@
/** /**
* Service type definitions * Service type definitions
*/ */
import type { SourceSelector, ListFilterComparisonOperator } from './common'; import type {
ListFilterComparisonOperator,
SourceSelector,
} from './common';
/** /**
* Service capabilities * Service capabilities
@@ -130,7 +133,8 @@ export interface ServiceDiscoverRequest {
} }
export interface ServiceDiscoverResponse { export interface ServiceDiscoverResponse {
[provider: string]: ServiceLocation; // Uses existing ServiceLocation discriminated union provider: string;
location: ServiceLocation;
} }
/** /**