Files
mail_manager/src/services/transceive.ts
Sebastian Krupinski ad0a20613e
All checks were successful
JS Unit Tests / test (pull_request) Successful in 50s
Build Test / test (pull_request) Successful in 54s
PHP Unit Tests / test (pull_request) Successful in 57s
refactor: module federation
Signed-off-by: Sebastian Krupinski <krupinski01@gmail.com>
2026-02-22 16:54:00 -05:00

116 lines
3.4 KiB
TypeScript

/**
* API Client for Mail Manager
* Provides a centralized way to make API calls with envelope wrapping/unwrapping
*/
import { createFetchWrapper } from '@KTXC';
import type { ApiRequest, ApiResponse } from '../types/common';
// Single fetch wrapper instance shared by every request issued from this module.
const fetchWrapper = createFetchWrapper();
// Endpoint that transceivePost and transceiveStream post their request envelopes to.
const API_URL = '/m/mail_manager/v1';
// Value written to the `version` field of every request envelope.
const API_VERSION = 1;
/**
 * Generate a transaction ID for a request envelope.
 *
 * The ID has the form `<epoch-milliseconds>-<suffix>`, where the suffix is
 * always seven base-36 characters derived from Math.random(). It is not
 * cryptographically random; collisions are unlikely but not impossible.
 *
 * @returns The generated transaction ID
 */
export function generateTransactionId(): string {
  // Math.random() can yield values whose base-36 form has fewer than seven
  // fractional digits (0 -> "0", 0.5 -> "0.i"). Pad the suffix so it never
  // collapses to a short or empty string.
  const suffix = Math.random().toString(36).substring(2, 9).padEnd(7, '0');
  return `${Date.now()}-${suffix}`;
}
/**
 * Post a single operation to the Mail Manager API and return its payload.
 *
 * The request data is wrapped in an envelope (version, transaction ID,
 * operation name, data, user) before being sent; the `data` member of the
 * response envelope is handed back to the caller.
 *
 * @param operation - Operation name (e.g., 'provider.list', 'service.autodiscover')
 * @param data - Operation-specific request data
 * @param user - Optional user identifier override
 * @returns Promise resolving to the unwrapped response data
 * @throws Error when the response envelope carries the status 'error'; the
 *         message contains the operation name, the server message and, when
 *         present, the server error code
 */
export async function transceivePost<TRequest, TResponse>(
  operation: string,
  data: TRequest,
  user?: string
): Promise<TResponse> {
  const envelope: ApiRequest<TRequest> = {
    version: API_VERSION,
    transaction: generateTransactionId(),
    operation,
    data,
    user
  };

  const reply: ApiResponse<TResponse> = await fetchWrapper.post(API_URL, envelope);

  // Happy path first: anything that is not an error envelope is returned as-is.
  if (reply.status !== 'error') {
    return reply.data;
  }

  // Error envelope: build a message tagged with the operation, appending the
  // code only when the server supplied one.
  const failure = reply.data;
  const text = failure.message;
  const codeSuffix = failure.code ? ` (code: ${failure.code})` : '';
  throw new Error(`[${operation}] ${text}${codeSuffix}`);
}
/**
 * Stream an NDJSON API response, calling onLine for each parsed line.
 *
 * The server emits one JSON object per line with a `type` discriminant
 * (meta / entity / done / error). The caller interprets each line via onLine.
 * Blank lines are skipped. Throwing inside onLine aborts the stream: the
 * response body is cancelled before the reader lock is released, so the
 * remainder of the response is not kept in flight.
 *
 * @param operation - Operation name, e.g. 'entity.stream'
 * @param data - Operation-specific request data
 * @param onLine - Synchronous callback invoked for every parsed line.
 *                 May throw to abort the stream.
 * @param user - Optional user identifier override
 * @throws Error if the response has no readable body
 * @throws SyntaxError if a non-blank line is not valid JSON
 * @throws Whatever onLine throws (assuming the fetch wrapper propagates
 *         errors raised inside onStream — TODO confirm)
 */
export async function transceiveStream<TRequest, TLine = any>(
  operation: string,
  data: TRequest,
  onLine: (line: TLine) => void,
  user?: string
): Promise<void> {
  const request: ApiRequest<TRequest> = {
    version: API_VERSION,
    transaction: generateTransactionId(),
    operation,
    data,
    user,
  };

  await fetchWrapper.post(API_URL, request, {
    // NOTE(review): 'application/x-ndjson' is intentionally left disabled here;
    // the reason is not visible in this file.
    //headers: { 'Accept': 'application/x-ndjson' },
    headers: { 'Accept': 'application/json' },
    onStream: async (response) => {
      if (!response.body) {
        throw new Error(`[${operation}] Response body is not readable`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      let completed = false;

      // Parse one line and hand it to the caller; blank lines are ignored.
      const emit = (line: string): void => {
        if (!line.trim()) return;
        onLine(JSON.parse(line) as TLine);
      };

      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;

          buffer += decoder.decode(value, { stream: true });
          const lines = buffer.split('\n');
          // Retain any incomplete trailing chunk for the next read.
          buffer = lines.pop() ?? '';
          for (const line of lines) {
            emit(line);
          }
        }

        // Flush bytes the decoder is still holding (an incomplete multi-byte
        // sequence at end of stream), then emit whatever is left unterminated.
        buffer += decoder.decode();
        emit(buffer);
        completed = true;
      } finally {
        if (!completed) {
          // Aborted by onLine, a parse error or a failed read: cancel the body
          // so the rest of the response is discarded instead of left pending.
          // A cancel failure is deliberately ignored so the original error is
          // the one that propagates.
          await reader.cancel().catch(() => undefined);
        }
        reader.releaseLock();
      }
    },
  });
}