refactor: unify streaming #10
@@ -16,6 +16,8 @@ use KTXC\Http\Response\StreamedNdJsonResponse;
|
||||
use KTXC\SessionIdentity;
|
||||
use KTXC\SessionTenant;
|
||||
use KTXF\Controller\ControllerAbstract;
|
||||
use KTXF\Json\JsonSerializable;
|
||||
use KTXF\Resource\Provider\ResourceServiceLocationInterface;
|
||||
use KTXF\Resource\Selector\SourceSelector;
|
||||
use KTXF\Routing\Attributes\AuthenticatedRoute;
|
||||
use KTXM\MailManager\Manager;
|
||||
@@ -133,7 +135,7 @@ class DefaultController extends ControllerAbstract {
|
||||
'service.create' => $this->serviceCreate($tenantId, $userId, $data),
|
||||
'service.update' => $this->serviceUpdate($tenantId, $userId, $data),
|
||||
'service.delete' => $this->serviceDelete($tenantId, $userId, $data),
|
||||
'service.discover' => $this->serviceDiscover($tenantId, $userId, $data),
|
||||
'service.discover' => $this->serviceDiscover($tenantId, $userId, $data, $version, $transaction),
|
||||
'service.test' => $this->serviceTest($tenantId, $userId, $data),
|
||||
|
||||
// Collection operations
|
||||
@@ -346,18 +348,48 @@ class DefaultController extends ControllerAbstract {
|
||||
);
|
||||
}
|
||||
|
||||
private function serviceDiscover(string $tenantId, string $userId, array $data): mixed {
|
||||
private function serviceDiscover(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse {
|
||||
|
||||
if (!isset($data['identity']) || empty($data['identity']) || !is_string($data['identity'])) {
|
||||
throw new InvalidArgumentException(self::ERR_INVALID_DATA);
|
||||
}
|
||||
|
||||
$provider = $data['provider'] ?? null;
|
||||
$identity = $data['identity'];
|
||||
$location = $data['location'] ?? null;
|
||||
$secret = $data['secret'] ?? null;
|
||||
|
||||
return $this->mailManager->serviceDiscover($tenantId, $userId, $provider, $identity, $location, $secret);
|
||||
$provider = $data['provider'] ?? null;
|
||||
$identity = $data['identity'];
|
||||
$location = $data['location'] ?? null;
|
||||
$secret = $data['secret'] ?? null;
|
||||
|
||||
$discoverGenerator = $this->mailManager->serviceDiscover($tenantId, $userId, $provider, $identity, $location, $secret);
|
||||
$logger = $this->logger;
|
||||
|
||||
$response = (function () use ($discoverGenerator, $version, $transaction, $logger): \Generator {
|
||||
yield ['type' => 'control', 'status' => 'start', 'version' => $version, 'transaction' => $transaction];
|
||||
|
||||
$total = 0;
|
||||
try {
|
||||
foreach ($discoverGenerator as $providerId => $serviceLocation) {
|
||||
if (!$serviceLocation instanceof ResourceServiceLocationInterface) {
|
||||
continue;
|
||||
}
|
||||
yield [
|
||||
'type' => 'data',
|
||||
'data' => [
|
||||
'provider' => $providerId,
|
||||
'location' => $serviceLocation->jsonSerialize()
|
||||
]
|
||||
];
|
||||
$total++;
|
||||
}
|
||||
} catch (\Throwable $t) {
|
||||
$logger->error('Error streaming service discovery', ['exception' => $t]);
|
||||
yield ['type' => 'error', 'message' => $t->getMessage()];
|
||||
return;
|
||||
}
|
||||
|
||||
yield ['type' => 'control', 'status' => 'end', 'total' => $total];
|
||||
})();
|
||||
|
||||
return new StreamedNdJsonResponse($response, 1, 200, ['Content-Type' => 'application/json']);
|
||||
}
|
||||
|
||||
// ==================== Collection Operations ====================
|
||||
@@ -647,16 +679,19 @@ class DefaultController extends ControllerAbstract {
|
||||
$entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range);
|
||||
$logger = $this->logger;
|
||||
|
||||
$lines = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator {
|
||||
yield ['type' => 'meta', 'version' => $version, 'transaction' => $transaction, 'status' => 'success'];
|
||||
$responseGenerator = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator {
|
||||
yield ['type' => 'control', 'status' => 'start', 'version' => $version, 'transaction' => $transaction];
|
||||
|
||||
$total = 0;
|
||||
try {
|
||||
foreach ($entityGenerator as $entity) {
|
||||
$entityData = is_array($entity)
|
||||
? $entity
|
||||
: (method_exists($entity, 'jsonSerialize') ? $entity->jsonSerialize() : (array) $entity);
|
||||
yield ['type' => 'entity'] + $entityData;
|
||||
if (!$entity instanceof JsonSerializable) {
|
||||
continue;
|
||||
}
|
||||
yield [
|
||||
'type' => 'data',
|
||||
'data' => $entity->jsonSerialize()
|
||||
];
|
||||
$total++;
|
||||
}
|
||||
} catch (\Throwable $t) {
|
||||
@@ -665,10 +700,10 @@ class DefaultController extends ControllerAbstract {
|
||||
return;
|
||||
}
|
||||
|
||||
yield ['type' => 'done', 'total' => $total];
|
||||
yield ['type' => 'control', 'status' => 'end', 'total' => $total];
|
||||
})();
|
||||
|
||||
return new StreamedNdJsonResponse($lines, self::STREAM_FLUSH_INTERVAL, 200, ['Content-Type' => 'application/json']);
|
||||
return new StreamedNdJsonResponse($responseGenerator, 1, 200, ['Content-Type' => 'application/json']);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -303,7 +303,7 @@ class Manager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover mail service settings from identity
|
||||
* Discover mail service settings from identity, yielding results as each provider completes
|
||||
*
|
||||
* @since 2025.05.01
|
||||
*
|
||||
@@ -314,12 +314,7 @@ class Manager {
|
||||
* @param string|null $location Optional hostname to test directly (bypasses DNS SRV lookup)
|
||||
* @param string|null $secret Optional password/token to validate discovered service
|
||||
*
|
||||
* @return array<string,ResourceServiceLocationInterface> Array of discovered service locations keyed by provider ID
|
||||
* [
|
||||
* 'jmap' => ResourceServiceLocationInterface,
|
||||
* 'smtp' => ResourceServiceLocationInterface,
|
||||
* // Only providers that successfully discovered (non-null)
|
||||
* ]
|
||||
* @return \Generator Yields providerId => ResourceServiceLocationInterface pairs as each provider completes
|
||||
*/
|
||||
public function serviceDiscover(
|
||||
string $tenantId,
|
||||
@@ -328,32 +323,28 @@ class Manager {
|
||||
string $identity,
|
||||
string|null $location = null,
|
||||
string|null $secret = null
|
||||
): array {
|
||||
$locations = [];
|
||||
|
||||
): \Generator {
|
||||
$providers = $this->providerList($tenantId, $userId, $providerId !== null ? new SourceSelector([$providerId => true]) : null);
|
||||
|
||||
foreach ($providers as $providerId => $provider) {
|
||||
|
||||
foreach ($providers as $currentProviderId => $provider) {
|
||||
if (!($provider instanceof ProviderServiceDiscoverInterface)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
$location = $provider->serviceDiscover($tenantId, $userId, $identity, $location, $secret);
|
||||
|
||||
if ($location !== null) {
|
||||
$locations[$providerId] = $location;
|
||||
$result = $provider->serviceDiscover($tenantId, $userId, $identity, $location, $secret);
|
||||
|
||||
if ($result !== null) {
|
||||
yield $currentProviderId => $result;
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
$this->logger->warning('Provider autodiscovery failed', [
|
||||
'provider' => $providerId,
|
||||
'provider' => $currentProviderId,
|
||||
'identity' => $identity,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
return $locations;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -198,23 +198,24 @@ async function handleDiscover() {
|
||||
discoveryStatus.value[identifier].status = 'discovering'
|
||||
|
||||
try {
|
||||
const services = await servicesStore.discover(
|
||||
let discoveredService: any = undefined
|
||||
await servicesStore.discover(
|
||||
discoverAddress.value,
|
||||
discoverSecret.value || undefined,
|
||||
discoverHostname.value || undefined,
|
||||
identifier
|
||||
identifier,
|
||||
(service) => { discoveredService = service }
|
||||
)
|
||||
|
||||
// Success - check if we got results for this provider
|
||||
const service = services.find(s => s.provider === identifier)
|
||||
if (service && service.location) {
|
||||
if (discoveredService && discoveredService.location) {
|
||||
discoveryStatus.value[identifier] = {
|
||||
provider: identifier,
|
||||
status: 'success',
|
||||
location: service.location,
|
||||
metadata: extractLocationMetadata(service.location)
|
||||
location: discoveredService.location,
|
||||
metadata: extractLocationMetadata(discoveredService.location)
|
||||
}
|
||||
discoveredServices.value.push(service)
|
||||
discoveredServices.value.push(discoveredService)
|
||||
} else {
|
||||
// No configuration found for this provider
|
||||
discoveryStatus.value[identifier].status = 'failed'
|
||||
|
||||
@@ -22,9 +22,7 @@ import type {
|
||||
EntityTransmitResponse,
|
||||
EntityInterface,
|
||||
EntityStreamRequest,
|
||||
EntityStreamLine,
|
||||
EntityStreamEntityLine,
|
||||
EntityStreamDoneLine,
|
||||
EntityStreamResponse,
|
||||
} from '../types/entity';
|
||||
import { useIntegrationStore } from '@KTXC/stores/integrationStore';
|
||||
import { EntityObject } from '../models';
|
||||
@@ -177,24 +175,13 @@ export const entityService = {
|
||||
request: EntityStreamRequest,
|
||||
onEntity: (entity: EntityObject) => void
|
||||
): Promise<{ total: number }> {
|
||||
let total = 0;
|
||||
|
||||
await transceiveStream<EntityStreamRequest, EntityStreamLine>(
|
||||
return await transceiveStream<EntityStreamRequest, EntityStreamResponse>(
|
||||
'entity.stream',
|
||||
request,
|
||||
(line) => {
|
||||
if (line.type === 'entity') {
|
||||
onEntity(createEntityObject(line as EntityStreamEntityLine));
|
||||
} else if (line.type === 'done') {
|
||||
total = (line as EntityStreamDoneLine).total;
|
||||
} else if (line.type === 'error') {
|
||||
throw new Error(`[entity.stream] ${line.message}`);
|
||||
}
|
||||
// 'meta' lines are silently consumed
|
||||
(entity) => {
|
||||
onEntity(createEntityObject(entity));
|
||||
}
|
||||
);
|
||||
|
||||
return { total };
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -16,13 +16,13 @@ import type {
|
||||
ServiceDeleteResponse,
|
||||
ServiceDeleteRequest,
|
||||
ServiceDiscoverRequest,
|
||||
ServiceDiscoverResponse,
|
||||
ServiceTestRequest,
|
||||
ServiceTestResponse,
|
||||
ServiceInterface,
|
||||
ServiceDiscoverResponse,
|
||||
} from '../types/service';
|
||||
import { useIntegrationStore } from '@KTXC/stores/integrationStore';
|
||||
import { transceivePost } from './transceive';
|
||||
import { transceivePost, transceiveStream } from './transceive';
|
||||
import { ServiceObject } from '../models/service';
|
||||
|
||||
/**
|
||||
@@ -87,31 +87,32 @@ export const serviceService = {
|
||||
},
|
||||
|
||||
/**
|
||||
* Retrieve discoverable services for a given source selector, sorted by provider
|
||||
* Discover services, streaming results as each provider responds
|
||||
*
|
||||
* @param request - discover request parameters
|
||||
* @param request - discover request parameters
|
||||
* @param onService - called for each discovered service as it arrives
|
||||
*
|
||||
* @returns Promise with array of discovered services sorted by provider
|
||||
* @returns Promise resolving to { total } when the stream completes
|
||||
*/
|
||||
async discover(request: ServiceDiscoverRequest): Promise<ServiceObject[]> {
|
||||
const response = await transceivePost<ServiceDiscoverRequest, ServiceDiscoverResponse>('service.discover', request);
|
||||
|
||||
// Convert discovery results to ServiceObjects
|
||||
const services: ServiceObject[] = [];
|
||||
Object.entries(response).forEach(([providerId, location]) => {
|
||||
const serviceData: ServiceInterface = {
|
||||
'@type': 'mail:service',
|
||||
provider: providerId,
|
||||
identifier: null,
|
||||
label: null,
|
||||
enabled: false,
|
||||
location: location,
|
||||
};
|
||||
services.push(createServiceObject(serviceData));
|
||||
});
|
||||
|
||||
// Sort by provider
|
||||
return services.sort((a, b) => a.provider.localeCompare(b.provider));
|
||||
async discover(
|
||||
request: ServiceDiscoverRequest,
|
||||
onService: (service: ServiceObject) => void
|
||||
): Promise<{ total: number }> {
|
||||
return await transceiveStream<ServiceDiscoverRequest, ServiceDiscoverResponse>(
|
||||
'service.discover',
|
||||
request,
|
||||
(service) => {
|
||||
const serviceData: ServiceInterface = {
|
||||
'@type': 'mail:service',
|
||||
provider: service.provider,
|
||||
identifier: null,
|
||||
label: null,
|
||||
enabled: false,
|
||||
location: service.location,
|
||||
};
|
||||
onService(createServiceObject(serviceData));
|
||||
}
|
||||
);
|
||||
},
|
||||
|
||||
/**
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
*/
|
||||
|
||||
import { createFetchWrapper } from '@KTXC';
|
||||
import type { ApiRequest, ApiResponse } from '../types/common';
|
||||
import type { ApiRequest, ApiResponse, ApiStreamResponse } from '../types/common';
|
||||
|
||||
const fetchWrapper = createFetchWrapper();
|
||||
const API_URL = '/m/mail_manager/v1';
|
||||
@@ -50,24 +50,25 @@ export async function transceivePost<TRequest, TResponse>(
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream an NDJSON API response, calling onLine for each parsed line.
|
||||
* Stream an NDJSON API response, unwrapping data frames for the caller.
|
||||
*
|
||||
* The server emits one JSON object per line with a `type` discriminant
|
||||
* (meta / entity / done / error). The caller interprets each line via onLine.
|
||||
* Throwing inside onLine aborts the stream.
|
||||
* The server emits one JSON object per line with a transport-level `type`
|
||||
* discriminant. This helper consumes control and error frames, forwards only
|
||||
* unwrapped `data` payloads to the caller, and returns the final stream total.
|
||||
*
|
||||
* @param operation - Operation name, e.g. 'entity.stream'
|
||||
* @param data - Operation-specific request data
|
||||
* @param onLine - Synchronous callback invoked for every parsed line.
|
||||
* @param onData - Synchronous callback invoked for every unwrapped data payload.
|
||||
* May throw to abort the stream.
|
||||
* @param user - Optional user identifier override
|
||||
* @returns Promise resolving to the final stream total from the control/end frame
|
||||
*/
|
||||
export async function transceiveStream<TRequest, TLine = any>(
|
||||
export async function transceiveStream<TRequest, TData>(
|
||||
operation: string,
|
||||
data: TRequest,
|
||||
onLine: (line: TLine) => void,
|
||||
onData: (data: TData) => void,
|
||||
user?: string
|
||||
): Promise<void> {
|
||||
): Promise<{ total: number }> {
|
||||
const request: ApiRequest<TRequest> = {
|
||||
version: API_VERSION,
|
||||
transaction: generateTransactionId(),
|
||||
@@ -76,10 +77,12 @@ export async function transceiveStream<TRequest, TLine = any>(
|
||||
user,
|
||||
};
|
||||
|
||||
let total = 0;
|
||||
|
||||
await fetchWrapper.post(API_URL, request, {
|
||||
//headers: { 'Accept': 'application/x-ndjson' },
|
||||
headers: { 'Accept': 'application/json' },
|
||||
onStream: async (response) => {
|
||||
onStream: async (response: Response) => {
|
||||
if (!response.body) {
|
||||
throw new Error(`[${operation}] Response body is not readable`);
|
||||
}
|
||||
@@ -99,17 +102,42 @@ export async function transceiveStream<TRequest, TLine = any>(
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
onLine(JSON.parse(line) as TLine);
|
||||
const message = JSON.parse(line) as ApiStreamResponse<TData>;
|
||||
|
||||
if (message.type === 'control') {
|
||||
if (message.status === 'end') {
|
||||
total = message.total;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (message.type === 'error') {
|
||||
throw new Error(`[${operation}] ${message.message}`);
|
||||
}
|
||||
|
||||
onData(message.data);
|
||||
}
|
||||
}
|
||||
|
||||
// flush any remaining bytes still in the buffer
|
||||
if (buffer.trim()) {
|
||||
onLine(JSON.parse(buffer) as TLine);
|
||||
const message = JSON.parse(buffer) as ApiStreamResponse<TData>;
|
||||
|
||||
if (message.type === 'control') {
|
||||
if (message.status === 'end') {
|
||||
total = message.total;
|
||||
}
|
||||
} else if (message.type === 'error') {
|
||||
throw new Error(`[${operation}] ${message.message}`);
|
||||
} else {
|
||||
onData(message.data);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
return { total };
|
||||
}
|
||||
|
||||
@@ -268,22 +268,29 @@ export const useServicesStore = defineStore('mailServicesStore', () => {
|
||||
* @param secret - optional secret for discovery
|
||||
* @param location - optional location for discovery
|
||||
* @param provider - optional provider identifier for discovery
|
||||
* @param onService - called for each discovered service as it arrives
|
||||
*
|
||||
* @returns Promise with list of discovered service objects
|
||||
* @returns Promise resolving to { total } when the stream completes
|
||||
*/
|
||||
async function discover(
|
||||
identity: string,
|
||||
secret: string | undefined,
|
||||
location: string | undefined,
|
||||
provider: string | undefined,
|
||||
): Promise<ServiceObject[]> {
|
||||
onService?: (service: ServiceObject) => void,
|
||||
): Promise<{ total: number }> {
|
||||
transceiving.value = true
|
||||
|
||||
try {
|
||||
const services = await serviceService.discover({identity, secret, location, provider})
|
||||
const result = await serviceService.discover(
|
||||
{ identity, secret, location, provider },
|
||||
(service: ServiceObject) => {
|
||||
onService?.(service)
|
||||
}
|
||||
)
|
||||
|
||||
console.debug('[Mail Manager][Store] - Successfully discovered', services.length, 'services')
|
||||
return services
|
||||
console.debug('[Mail Manager][Store] - Successfully discovered', result.total, 'services')
|
||||
return result
|
||||
} catch (error: any) {
|
||||
console.error('[Mail Manager][Store] - Failed to discover service:', error)
|
||||
throw error
|
||||
|
||||
@@ -43,6 +43,47 @@ export interface ApiErrorResponse {
|
||||
*/
|
||||
export type ApiResponse<T = any> = ApiSuccessResponse<T> | ApiErrorResponse;
|
||||
|
||||
/**
|
||||
* Stream control start line
|
||||
*/
|
||||
export interface ApiStreamStartResponse {
|
||||
type: 'control';
|
||||
status: 'start';
|
||||
version: number;
|
||||
transaction: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream control end line
|
||||
*/
|
||||
export interface ApiStreamEndResponse {
|
||||
type: 'control';
|
||||
status: 'end';
|
||||
total: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream error line
|
||||
*/
|
||||
export interface ApiStreamErrorResponse {
|
||||
type: 'error';
|
||||
message: string;
|
||||
}
|
||||
|
||||
export interface ApiStreamDataResponse<T = any> {
|
||||
type: 'data';
|
||||
data: T;
|
||||
}
|
||||
|
||||
/**
|
||||
* Shared stream control lines
|
||||
*/
|
||||
export type ApiStreamResponse<T = any> =
|
||||
| ApiStreamStartResponse
|
||||
| ApiStreamEndResponse
|
||||
| ApiStreamErrorResponse
|
||||
| ApiStreamDataResponse<T>;
|
||||
|
||||
/**
|
||||
* Selector for targeting specific providers, services, collections, or entities in list or extant operations.
|
||||
*
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
/**
|
||||
* Entity type definitions
|
||||
*/
|
||||
import type { SourceSelector, ListFilter, ListSort, ListRange } from './common';
|
||||
import type {
|
||||
ListFilter,
|
||||
ListRange,
|
||||
ListSort,
|
||||
SourceSelector,
|
||||
} from './common';
|
||||
import type { MessageInterface } from './message';
|
||||
|
||||
/**
|
||||
@@ -169,29 +174,4 @@ export interface EntityStreamRequest {
|
||||
range?: ListRange;
|
||||
}
|
||||
|
||||
export interface EntityStreamMetaLine {
|
||||
type: 'meta';
|
||||
version: number;
|
||||
transaction: string;
|
||||
status: 'success';
|
||||
}
|
||||
|
||||
export interface EntityStreamEntityLine extends EntityInterface<MessageInterface> {
|
||||
type: 'entity';
|
||||
}
|
||||
|
||||
export interface EntityStreamDoneLine {
|
||||
type: 'done';
|
||||
total: number;
|
||||
}
|
||||
|
||||
export interface EntityStreamErrorLine {
|
||||
type: 'error';
|
||||
message: string;
|
||||
}
|
||||
|
||||
export type EntityStreamLine =
|
||||
| EntityStreamMetaLine
|
||||
| EntityStreamEntityLine
|
||||
| EntityStreamDoneLine
|
||||
| EntityStreamErrorLine;
|
||||
export interface EntityStreamResponse extends EntityInterface<MessageInterface> {}
|
||||
@@ -1,7 +1,10 @@
|
||||
/**
|
||||
* Service type definitions
|
||||
*/
|
||||
import type { SourceSelector, ListFilterComparisonOperator } from './common';
|
||||
import type {
|
||||
ListFilterComparisonOperator,
|
||||
SourceSelector,
|
||||
} from './common';
|
||||
|
||||
/**
|
||||
* Service capabilities
|
||||
@@ -130,7 +133,8 @@ export interface ServiceDiscoverRequest {
|
||||
}
|
||||
|
||||
export interface ServiceDiscoverResponse {
|
||||
[provider: string]: ServiceLocation; // Uses existing ServiceLocation discriminated union
|
||||
provider: string;
|
||||
location: ServiceLocation;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user