Merge pull request 'feat/entity-streaming' (#7) from feat/entity-streaming into main
Some checks failed
Renovate / renovate (push) Failing after 1m47s
Some checks failed
Renovate / renovate (push) Failing after 1m47s
Reviewed-on: #7
This commit was merged in pull request #7.
This commit is contained in:
@@ -1,428 +0,0 @@
|
|||||||
# Mail Manager - Interface Relationships
|
|
||||||
|
|
||||||
This document visualizes all the interfaces in the mail_manager module and their relationships.
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
|
|
||||||
The mail manager uses a hierarchical structure where interfaces are organized by their domain responsibilities:
|
|
||||||
- **Common Types**: Base types and selectors
|
|
||||||
- **Providers**: Mail service providers (Gmail, IMAP, etc.)
|
|
||||||
- **Services**: Individual mail accounts/services
|
|
||||||
- **Collections**: Mailboxes and folders
|
|
||||||
- **Messages**: Email messages and their parts
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Complete Interface Diagram
|
|
||||||
|
|
||||||
```mermaid
|
|
||||||
classDiagram
|
|
||||||
%% Common/Base Types
|
|
||||||
class SourceSelector {
|
|
||||||
+string provider
|
|
||||||
+string service
|
|
||||||
+string collection
|
|
||||||
+string message
|
|
||||||
}
|
|
||||||
|
|
||||||
class ApiRequest~T~ {
|
|
||||||
+T data
|
|
||||||
}
|
|
||||||
|
|
||||||
class ApiResponse~T~ {
|
|
||||||
+T data
|
|
||||||
+Error error
|
|
||||||
}
|
|
||||||
|
|
||||||
class ListRange {
|
|
||||||
+number offset
|
|
||||||
+number limit
|
|
||||||
}
|
|
||||||
|
|
||||||
%% Provider Interfaces
|
|
||||||
class ProviderInterface {
|
|
||||||
+string @type
|
|
||||||
+string identifier
|
|
||||||
+string label
|
|
||||||
+ProviderCapabilitiesInterface capabilities
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProviderCapabilitiesInterface {
|
|
||||||
+boolean ServiceList
|
|
||||||
+boolean ServiceFetch
|
|
||||||
+boolean ServiceExtant
|
|
||||||
+boolean ServiceCreate
|
|
||||||
+boolean ServiceModify
|
|
||||||
+boolean ServiceDestroy
|
|
||||||
+boolean ServiceDiscover
|
|
||||||
+boolean ServiceTest
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProviderListRequest {
|
|
||||||
+SourceSelector sources
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProviderListResponse {
|
|
||||||
+ProviderInterface[identifier] providers
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProviderFetchRequest {
|
|
||||||
+string identifier
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProviderFetchResponse {
|
|
||||||
<<extends ProviderInterface>>
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProviderExtantRequest {
|
|
||||||
+SourceSelector sources
|
|
||||||
}
|
|
||||||
|
|
||||||
class ProviderExtantResponse {
|
|
||||||
+boolean[identifier] exists
|
|
||||||
}
|
|
||||||
|
|
||||||
%% Service Interfaces
|
|
||||||
class ServiceInterface {
|
|
||||||
+string @type
|
|
||||||
+string identifier
|
|
||||||
+string provider
|
|
||||||
+string label
|
|
||||||
+ServiceCapabilitiesInterface capabilities
|
|
||||||
+object configuration
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceCapabilitiesInterface {
|
|
||||||
+boolean CollectionList
|
|
||||||
+boolean CollectionFetch
|
|
||||||
+boolean CollectionExtant
|
|
||||||
+boolean CollectionCreate
|
|
||||||
+boolean CollectionModify
|
|
||||||
+boolean CollectionDestroy
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceListRequest {
|
|
||||||
+SourceSelector sources
|
|
||||||
+ListRange range
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceListResponse {
|
|
||||||
+ServiceInterface[identifier] services
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceFetchRequest {
|
|
||||||
+string provider
|
|
||||||
+string identifier
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceFetchResponse {
|
|
||||||
<<extends ServiceInterface>>
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceExtantRequest {
|
|
||||||
+SourceSelector sources
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceExtantResponse {
|
|
||||||
+boolean[identifier] exists
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceCreateRequest {
|
|
||||||
+string provider
|
|
||||||
+string label
|
|
||||||
+object configuration
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceCreateResponse {
|
|
||||||
<<extends ServiceInterface>>
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceModifyRequest {
|
|
||||||
+string provider
|
|
||||||
+string identifier
|
|
||||||
+string label
|
|
||||||
+object configuration
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceModifyResponse {
|
|
||||||
<<extends ServiceInterface>>
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceDestroyRequest {
|
|
||||||
+string provider
|
|
||||||
+string identifier
|
|
||||||
}
|
|
||||||
|
|
||||||
class ServiceDestroyResponse {
|
|
||||||
+boolean success
|
|
||||||
}
|
|
||||||
|
|
||||||
%% Collection Interfaces
|
|
||||||
class CollectionInterface {
|
|
||||||
+string @type
|
|
||||||
+string identifier
|
|
||||||
+string service
|
|
||||||
+string provider
|
|
||||||
+string label
|
|
||||||
+CollectionCapabilitiesInterface capabilities
|
|
||||||
+string[] flags
|
|
||||||
+number messageCount
|
|
||||||
}
|
|
||||||
|
|
||||||
class CollectionCapabilitiesInterface {
|
|
||||||
+boolean MessageList
|
|
||||||
+boolean MessageFetch
|
|
||||||
+boolean MessageExtant
|
|
||||||
+boolean MessageCreate
|
|
||||||
+boolean MessageModify
|
|
||||||
+boolean MessageDestroy
|
|
||||||
}
|
|
||||||
|
|
||||||
class CollectionListRequest {
|
|
||||||
+SourceSelector sources
|
|
||||||
+ListRange range
|
|
||||||
}
|
|
||||||
|
|
||||||
class CollectionListResponse {
|
|
||||||
+CollectionInterface[identifier] collections
|
|
||||||
}
|
|
||||||
|
|
||||||
class CollectionFetchRequest {
|
|
||||||
+string provider
|
|
||||||
+string service
|
|
||||||
+string identifier
|
|
||||||
}
|
|
||||||
|
|
||||||
class CollectionFetchResponse {
|
|
||||||
<<extends CollectionInterface>>
|
|
||||||
}
|
|
||||||
|
|
||||||
%% Message Interfaces
|
|
||||||
class MessageInterface {
|
|
||||||
+string @type
|
|
||||||
+string identifier
|
|
||||||
+string collection
|
|
||||||
+string service
|
|
||||||
+string provider
|
|
||||||
+string[] flags
|
|
||||||
+Date receivedDate
|
|
||||||
+Date internalDate
|
|
||||||
+MessageHeadersInterface headers
|
|
||||||
+MessagePartInterface[] parts
|
|
||||||
}
|
|
||||||
|
|
||||||
class MessageHeadersInterface {
|
|
||||||
+string from
|
|
||||||
+string[] to
|
|
||||||
+string[] cc
|
|
||||||
+string[] bcc
|
|
||||||
+string subject
|
|
||||||
+string messageId
|
|
||||||
+string[] references
|
|
||||||
+string inReplyTo
|
|
||||||
+Date date
|
|
||||||
}
|
|
||||||
|
|
||||||
class MessagePartInterface {
|
|
||||||
+string partId
|
|
||||||
+string mimeType
|
|
||||||
+string filename
|
|
||||||
+number size
|
|
||||||
+MessagePartInterface[] subParts
|
|
||||||
+object headers
|
|
||||||
+string body
|
|
||||||
}
|
|
||||||
|
|
||||||
class MessageListRequest {
|
|
||||||
+SourceSelector sources
|
|
||||||
+ListRange range
|
|
||||||
+string[] flags
|
|
||||||
}
|
|
||||||
|
|
||||||
class MessageListResponse {
|
|
||||||
+MessageInterface[identifier] messages
|
|
||||||
}
|
|
||||||
|
|
||||||
class MessageFetchRequest {
|
|
||||||
+string provider
|
|
||||||
+string service
|
|
||||||
+string collection
|
|
||||||
+string identifier
|
|
||||||
}
|
|
||||||
|
|
||||||
class MessageFetchResponse {
|
|
||||||
<<extends MessageInterface>>
|
|
||||||
}
|
|
||||||
|
|
||||||
%% Relationships
|
|
||||||
ProviderInterface --> ProviderCapabilitiesInterface
|
|
||||||
ProviderFetchResponse --|> ProviderInterface
|
|
||||||
ProviderListResponse --> ProviderInterface
|
|
||||||
|
|
||||||
ServiceInterface --> ServiceCapabilitiesInterface
|
|
||||||
ServiceFetchResponse --|> ServiceInterface
|
|
||||||
ServiceCreateResponse --|> ServiceInterface
|
|
||||||
ServiceModifyResponse --|> ServiceInterface
|
|
||||||
ServiceListResponse --> ServiceInterface
|
|
||||||
|
|
||||||
CollectionInterface --> CollectionCapabilitiesInterface
|
|
||||||
CollectionFetchResponse --|> CollectionInterface
|
|
||||||
CollectionListResponse --> CollectionInterface
|
|
||||||
|
|
||||||
MessageInterface --> MessageHeadersInterface
|
|
||||||
MessageInterface --> MessagePartInterface
|
|
||||||
MessagePartInterface --> MessagePartInterface : subParts
|
|
||||||
MessageFetchResponse --|> MessageInterface
|
|
||||||
MessageListResponse --> MessageInterface
|
|
||||||
|
|
||||||
%% Selector Usage
|
|
||||||
ProviderListRequest --> SourceSelector
|
|
||||||
ProviderExtantRequest --> SourceSelector
|
|
||||||
ServiceListRequest --> SourceSelector
|
|
||||||
ServiceExtantRequest --> SourceSelector
|
|
||||||
CollectionListRequest --> SourceSelector
|
|
||||||
MessageListRequest --> SourceSelector
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Hierarchical Structure
|
|
||||||
|
|
||||||
```mermaid
|
|
||||||
graph TD
|
|
||||||
A[SourceSelector] --> B[Provider Level]
|
|
||||||
B --> C[Service Level]
|
|
||||||
C --> D[Collection Level]
|
|
||||||
D --> E[Message Level]
|
|
||||||
|
|
||||||
B --> B1[ProviderInterface]
|
|
||||||
B --> B2[ProviderCapabilities]
|
|
||||||
|
|
||||||
C --> C1[ServiceInterface]
|
|
||||||
C --> C2[ServiceCapabilities]
|
|
||||||
|
|
||||||
D --> D1[CollectionInterface]
|
|
||||||
D --> D2[CollectionCapabilities]
|
|
||||||
|
|
||||||
E --> E1[MessageInterface]
|
|
||||||
E --> E2[MessageHeaders]
|
|
||||||
E --> E3[MessagePart]
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Request/Response Pattern
|
|
||||||
|
|
||||||
All operations follow a consistent request/response pattern:
|
|
||||||
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant Client
|
|
||||||
participant API
|
|
||||||
participant Provider
|
|
||||||
|
|
||||||
Client->>API: {Operation}Request
|
|
||||||
API->>Provider: Process Request
|
|
||||||
Provider->>API: Data
|
|
||||||
API->>Client: {Operation}Response
|
|
||||||
```
|
|
||||||
|
|
||||||
### Operations by Level:
|
|
||||||
|
|
||||||
**Provider Level:**
|
|
||||||
- List, Fetch, Extant
|
|
||||||
|
|
||||||
**Service Level:**
|
|
||||||
- List, Fetch, Extant, Create, Modify, Destroy
|
|
||||||
|
|
||||||
**Collection Level:**
|
|
||||||
- List, Fetch, Extant, Create, Modify, Destroy
|
|
||||||
|
|
||||||
**Message Level:**
|
|
||||||
- List, Fetch, Extant, Create, Modify, Destroy
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Capability Inheritance
|
|
||||||
|
|
||||||
```mermaid
|
|
||||||
graph LR
|
|
||||||
A[ProviderCapabilities] -->|enables| B[ServiceCapabilities]
|
|
||||||
B -->|enables| C[CollectionCapabilities]
|
|
||||||
C -->|enables| D[Message Operations]
|
|
||||||
```
|
|
||||||
|
|
||||||
Capabilities cascade down the hierarchy - if a provider doesn't support `ServiceList`, then no services can be listed for that provider.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Key Patterns
|
|
||||||
|
|
||||||
### 1. **Extends Pattern**
|
|
||||||
Response interfaces extend their base interface:
|
|
||||||
- `ProviderFetchResponse extends ProviderInterface`
|
|
||||||
- `ServiceFetchResponse extends ServiceInterface`
|
|
||||||
|
|
||||||
### 2. **Dictionary Pattern**
|
|
||||||
List responses use identifier as key:
|
|
||||||
```typescript
|
|
||||||
{
|
|
||||||
[identifier: string]: Interface
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 3. **SourceSelector Pattern**
|
|
||||||
Resources are selected hierarchically:
|
|
||||||
```typescript
|
|
||||||
{
|
|
||||||
provider: "gmail",
|
|
||||||
service: "user@example.com",
|
|
||||||
collection: "INBOX",
|
|
||||||
message: "msg123"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### 4. **Recursive Structure**
|
|
||||||
MessagePart can contain subParts:
|
|
||||||
```typescript
|
|
||||||
MessagePartInterface {
|
|
||||||
subParts?: MessagePartInterface[]
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Usage Examples
|
|
||||||
|
|
||||||
### Selecting a specific message:
|
|
||||||
```typescript
|
|
||||||
const selector: SourceSelector = {
|
|
||||||
provider: "gmail",
|
|
||||||
service: "user@example.com",
|
|
||||||
collection: "INBOX",
|
|
||||||
message: "12345"
|
|
||||||
};
|
|
||||||
```
|
|
||||||
|
|
||||||
### Listing all services for a provider:
|
|
||||||
```typescript
|
|
||||||
const request: ServiceListRequest = {
|
|
||||||
sources: {
|
|
||||||
provider: "gmail"
|
|
||||||
},
|
|
||||||
range: {
|
|
||||||
offset: 0,
|
|
||||||
limit: 50
|
|
||||||
}
|
|
||||||
};
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Interface Files
|
|
||||||
|
|
||||||
- `common.ts` - Base types and selectors
|
|
||||||
- `provider.ts` - Provider-level interfaces
|
|
||||||
- `service.ts` - Service-level interfaces
|
|
||||||
- `collection.ts` - Collection-level interfaces
|
|
||||||
- `message.ts` - Message-level interfaces
|
|
||||||
@@ -11,6 +11,8 @@ namespace KTXM\MailManager\Controllers;
|
|||||||
|
|
||||||
use InvalidArgumentException;
|
use InvalidArgumentException;
|
||||||
use KTXC\Http\Response\JsonResponse;
|
use KTXC\Http\Response\JsonResponse;
|
||||||
|
use KTXC\Http\Response\Response;
|
||||||
|
use KTXC\Http\Response\StreamedNdJsonResponse;
|
||||||
use KTXC\SessionIdentity;
|
use KTXC\SessionIdentity;
|
||||||
use KTXC\SessionTenant;
|
use KTXC\SessionTenant;
|
||||||
use KTXF\Controller\ControllerAbstract;
|
use KTXF\Controller\ControllerAbstract;
|
||||||
@@ -44,6 +46,8 @@ class DefaultController extends ControllerAbstract {
|
|||||||
private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array';
|
private const ERR_INVALID_IDENTIFIERS = 'Invalid parameter: identifiers must be an array';
|
||||||
private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array';
|
private const ERR_INVALID_DATA = 'Invalid parameter: data must be an array';
|
||||||
|
|
||||||
|
private const STREAM_FLUSH_INTERVAL = 1;
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
private readonly SessionTenant $tenantIdentity,
|
private readonly SessionTenant $tenantIdentity,
|
||||||
private readonly SessionIdentity $userIdentity,
|
private readonly SessionIdentity $userIdentity,
|
||||||
@@ -62,7 +66,7 @@ class DefaultController extends ControllerAbstract {
|
|||||||
* "data": {...}
|
* "data": {...}
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* @return JsonResponse
|
* @return Response
|
||||||
*/
|
*/
|
||||||
#[AuthenticatedRoute('/v1', name: 'mail.manager.v1', methods: ['POST'])]
|
#[AuthenticatedRoute('/v1', name: 'mail.manager.v1', methods: ['POST'])]
|
||||||
public function index(
|
public function index(
|
||||||
@@ -71,7 +75,7 @@ class DefaultController extends ControllerAbstract {
|
|||||||
string|null $operation = null,
|
string|null $operation = null,
|
||||||
array|null $data = null,
|
array|null $data = null,
|
||||||
string|null $user = null
|
string|null $user = null
|
||||||
): JsonResponse {
|
): Response {
|
||||||
|
|
||||||
// authorize request
|
// authorize request
|
||||||
$tenantId = $this->tenantIdentity->identifier();
|
$tenantId = $this->tenantIdentity->identifier();
|
||||||
@@ -80,7 +84,12 @@ class DefaultController extends ControllerAbstract {
|
|||||||
try {
|
try {
|
||||||
|
|
||||||
if ($operation !== null) {
|
if ($operation !== null) {
|
||||||
$result = $this->processOperation($tenantId, $userId, $operation, $data ?? [], []);
|
$result = $this->processOperation($tenantId, $userId, $operation, $data ?? [], $version, $transaction);
|
||||||
|
|
||||||
|
if ($result instanceof Response) {
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
return new JsonResponse([
|
return new JsonResponse([
|
||||||
'version' => $version,
|
'version' => $version,
|
||||||
'transaction' => $transaction,
|
'transaction' => $transaction,
|
||||||
@@ -110,7 +119,7 @@ class DefaultController extends ControllerAbstract {
|
|||||||
/**
|
/**
|
||||||
* Process a single operation
|
* Process a single operation
|
||||||
*/
|
*/
|
||||||
private function processOperation(string $tenantId, string $userId, string $operation, array $data): mixed {
|
private function processOperation(string $tenantId, string $userId, string $operation, array $data, int $version = 1, string $transaction = ''): mixed {
|
||||||
return match ($operation) {
|
return match ($operation) {
|
||||||
// Provider operations
|
// Provider operations
|
||||||
'provider.list' => $this->providerList($tenantId, $userId, $data),
|
'provider.list' => $this->providerList($tenantId, $userId, $data),
|
||||||
@@ -144,6 +153,7 @@ class DefaultController extends ControllerAbstract {
|
|||||||
'entity.create' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
'entity.create' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
||||||
'entity.update' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
'entity.update' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
||||||
'entity.delete' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
'entity.delete' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
||||||
|
'entity.stream' => $this->entityStream($tenantId, $userId, $data, $version, $transaction),
|
||||||
'entity.delta' => $this->entityDelta($tenantId, $userId, $data),
|
'entity.delta' => $this->entityDelta($tenantId, $userId, $data),
|
||||||
'entity.move' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
'entity.move' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
||||||
'entity.copy' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
'entity.copy' => throw new InvalidArgumentException('Operation not implemented: ' . $operation),
|
||||||
@@ -617,4 +627,48 @@ class DefaultController extends ControllerAbstract {
|
|||||||
return ['jobId' => $jobId];
|
return ['jobId' => $jobId];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ==================== Entity Stream ====================
|
||||||
|
|
||||||
|
private function entityStream(string $tenantId, string $userId, array $data, int $version, string $transaction): StreamedNdJsonResponse {
|
||||||
|
if (!isset($data['sources'])) {
|
||||||
|
throw new InvalidArgumentException(self::ERR_MISSING_SOURCES);
|
||||||
|
}
|
||||||
|
if (!is_array($data['sources'])) {
|
||||||
|
throw new InvalidArgumentException(self::ERR_INVALID_SOURCES);
|
||||||
|
}
|
||||||
|
|
||||||
|
$sources = new SourceSelector();
|
||||||
|
$sources->jsonDeserialize($data['sources']);
|
||||||
|
|
||||||
|
$filter = $data['filter'] ?? null;
|
||||||
|
$sort = $data['sort'] ?? null;
|
||||||
|
$range = $data['range'] ?? null;
|
||||||
|
|
||||||
|
$entityGenerator = $this->mailManager->entityStream($tenantId, $userId, $sources, $filter, $sort, $range);
|
||||||
|
$logger = $this->logger;
|
||||||
|
|
||||||
|
$lines = (function () use ($entityGenerator, $version, $transaction, $logger): \Generator {
|
||||||
|
yield ['type' => 'meta', 'version' => $version, 'transaction' => $transaction, 'status' => 'success'];
|
||||||
|
|
||||||
|
$total = 0;
|
||||||
|
try {
|
||||||
|
foreach ($entityGenerator as $entity) {
|
||||||
|
$entityData = is_array($entity)
|
||||||
|
? $entity
|
||||||
|
: (method_exists($entity, 'jsonSerialize') ? $entity->jsonSerialize() : (array) $entity);
|
||||||
|
yield ['type' => 'entity'] + $entityData;
|
||||||
|
$total++;
|
||||||
|
}
|
||||||
|
} catch (\Throwable $t) {
|
||||||
|
$logger->error('Error streaming entities', ['exception' => $t]);
|
||||||
|
yield ['type' => 'error', 'message' => $t->getMessage()];
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
yield ['type' => 'done', 'total' => $total];
|
||||||
|
})();
|
||||||
|
|
||||||
|
return new StreamedNdJsonResponse($lines, self::STREAM_FLUSH_INTERVAL, 200, ['Content-Type' => 'application/json']);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -764,6 +764,83 @@ class Manager {
|
|||||||
return $responseData;
|
return $responseData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream entities
|
||||||
|
*
|
||||||
|
* @since 2026.02.01
|
||||||
|
*
|
||||||
|
* @param string $tenantId Tenant identifier
|
||||||
|
* @param string $userId User identifier
|
||||||
|
* @param SourceSelector $sources Message sources with collection identifiers
|
||||||
|
* @param array|null $filter Message filter
|
||||||
|
* @param array|null $sort Message sort
|
||||||
|
* @param array|null $range Message range/pagination
|
||||||
|
*
|
||||||
|
* @return \Generator<EntityBaseInterface> Yields each entity as it is retrieved
|
||||||
|
*/
|
||||||
|
public function entityStream(string $tenantId, string $userId, SourceSelector $sources, array|null $filter = null, array|null $sort = null, array|null $range = null): \Generator {
|
||||||
|
// retrieve providers
|
||||||
|
$providers = $this->providerList($tenantId, $userId, $sources);
|
||||||
|
// retrieve services for each provider
|
||||||
|
foreach ($providers as $provider) {
|
||||||
|
$serviceSelector = $sources[$provider->identifier()];
|
||||||
|
$servicesSelected = $provider->serviceList($tenantId, $userId, $serviceSelector->identifiers());
|
||||||
|
/** @var ServiceBaseInterface $service */
|
||||||
|
foreach ($servicesSelected as $service) {
|
||||||
|
// retrieve collections for each service
|
||||||
|
$collectionSelector = $serviceSelector[$service->identifier()];
|
||||||
|
$collectionSelected = $collectionSelector instanceof CollectionSelector ? $collectionSelector->identifiers() : [];
|
||||||
|
if ($collectionSelected === []) {
|
||||||
|
$collections = $service->collectionList('');
|
||||||
|
$collectionSelected = array_map(
|
||||||
|
fn($collection) => $collection->identifier(),
|
||||||
|
$collections
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if ($collectionSelected === []) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// construct filter for entities
|
||||||
|
$entityFilter = null;
|
||||||
|
if ($filter !== null && $filter !== []) {
|
||||||
|
$entityFilter = $service->entityListFilter();
|
||||||
|
foreach ($filter as $attribute => $value) {
|
||||||
|
$entityFilter->condition($attribute, $value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// construct sort for entities
|
||||||
|
$entitySort = null;
|
||||||
|
if ($sort !== null && $sort !== []) {
|
||||||
|
$entitySort = $service->entityListSort();
|
||||||
|
foreach ($sort as $attribute => $direction) {
|
||||||
|
$entitySort->condition($attribute, $direction);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// construct range for entities
|
||||||
|
$entityRange = null;
|
||||||
|
if ($range !== null && $range !== [] && isset($range['type'])) {
|
||||||
|
$entityRange = $service->entityListRange(RangeType::from($range['type']));
|
||||||
|
if ($entityRange->type() === RangeType::TALLY) {
|
||||||
|
/** @var IRangeTally $entityRange */
|
||||||
|
if (isset($range['anchor'])) {
|
||||||
|
$entityRange->setAnchor(RangeAnchorType::from($range['anchor']));
|
||||||
|
}
|
||||||
|
if (isset($range['position'])) {
|
||||||
|
$entityRange->setPosition($range['position']);
|
||||||
|
}
|
||||||
|
if (isset($range['tally'])) {
|
||||||
|
$entityRange->setTally($range['tally']);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// yield entities for each collection individually
|
||||||
|
foreach ($collectionSelected as $collectionId) {
|
||||||
|
yield from $service->entityListStream($collectionId, $entityFilter, $entitySort, $entityRange, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch specific messages
|
* Fetch specific messages
|
||||||
*
|
*
|
||||||
|
|||||||
15
src/main.ts
15
src/main.ts
@@ -26,6 +26,21 @@ export { routes, integrations }
|
|||||||
// Export stores for external use if needed
|
// Export stores for external use if needed
|
||||||
export { useCollectionsStore, useEntitiesStore, useProvidersStore, useServicesStore }
|
export { useCollectionsStore, useEntitiesStore, useProvidersStore, useServicesStore }
|
||||||
|
|
||||||
|
// Export composables
|
||||||
|
export { useMailSync } from '@/composables/useMailSync'
|
||||||
|
|
||||||
|
// Export services
|
||||||
|
export { providerService, serviceService, collectionService, entityService } from '@/services'
|
||||||
|
|
||||||
|
// Export models
|
||||||
|
export { CollectionObject, CollectionPropertiesObject } from '@/models/collection'
|
||||||
|
export { MessageObject } from '@/models/message'
|
||||||
|
export { EntityObject } from '@/models/entity'
|
||||||
|
|
||||||
|
// Export components
|
||||||
|
export { default as AddAccountDialog } from '@/components/AddAccountDialog.vue'
|
||||||
|
export { default as EditAccountDialog } from '@/components/EditAccountDialog.vue'
|
||||||
|
|
||||||
// Default export for Vue plugin installation
|
// Default export for Vue plugin installation
|
||||||
export default {
|
export default {
|
||||||
install(app: Vue) {
|
install(app: Vue) {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
* Entity management service
|
* Entity management service
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { transceivePost } from './transceive';
|
import { transceivePost, transceiveStream } from './transceive';
|
||||||
import type {
|
import type {
|
||||||
EntityListRequest,
|
EntityListRequest,
|
||||||
EntityListResponse,
|
EntityListResponse,
|
||||||
@@ -21,6 +21,10 @@ import type {
|
|||||||
EntityTransmitRequest,
|
EntityTransmitRequest,
|
||||||
EntityTransmitResponse,
|
EntityTransmitResponse,
|
||||||
EntityInterface,
|
EntityInterface,
|
||||||
|
EntityStreamRequest,
|
||||||
|
EntityStreamLine,
|
||||||
|
EntityStreamEntityLine,
|
||||||
|
EntityStreamDoneLine,
|
||||||
} from '../types/entity';
|
} from '../types/entity';
|
||||||
import { useIntegrationStore } from '@KTXC/stores/integrationStore';
|
import { useIntegrationStore } from '@KTXC/stores/integrationStore';
|
||||||
import { EntityObject } from '../models';
|
import { EntityObject } from '../models';
|
||||||
@@ -157,6 +161,41 @@ export const entityService = {
|
|||||||
async transmit(request: EntityTransmitRequest): Promise<EntityTransmitResponse> {
|
async transmit(request: EntityTransmitRequest): Promise<EntityTransmitResponse> {
|
||||||
return await transceivePost<EntityTransmitRequest, EntityTransmitResponse>('entity.transmit', request);
|
return await transceivePost<EntityTransmitRequest, EntityTransmitResponse>('entity.transmit', request);
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream entities as NDJSON, invoking onEntity for each entity as it arrives.
|
||||||
|
*
|
||||||
|
* The server emits one entity per line so the caller receives entities
|
||||||
|
* progressively rather than waiting for the full collection to load.
|
||||||
|
*
|
||||||
|
* @param request - stream request parameters (same shape as list)
|
||||||
|
* @param onEntity - called synchronously for each entity as it is received
|
||||||
|
*
|
||||||
|
* @returns Promise resolving to { total } when the stream completes
|
||||||
|
*/
|
||||||
|
async stream(
|
||||||
|
request: EntityStreamRequest,
|
||||||
|
onEntity: (entity: EntityObject) => void
|
||||||
|
): Promise<{ total: number }> {
|
||||||
|
let total = 0;
|
||||||
|
|
||||||
|
await transceiveStream<EntityStreamRequest, EntityStreamLine>(
|
||||||
|
'entity.stream',
|
||||||
|
request,
|
||||||
|
(line) => {
|
||||||
|
if (line.type === 'entity') {
|
||||||
|
onEntity(createEntityObject(line as EntityStreamEntityLine));
|
||||||
|
} else if (line.type === 'done') {
|
||||||
|
total = (line as EntityStreamDoneLine).total;
|
||||||
|
} else if (line.type === 'error') {
|
||||||
|
throw new Error(`[entity.stream] ${line.message}`);
|
||||||
|
}
|
||||||
|
// 'meta' lines are silently consumed
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return { total };
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export default entityService;
|
export default entityService;
|
||||||
|
|||||||
@@ -48,3 +48,68 @@ export async function transceivePost<TRequest, TResponse>(
|
|||||||
|
|
||||||
return response.data;
|
return response.data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream an NDJSON API response, calling onLine for each parsed line.
|
||||||
|
*
|
||||||
|
* The server emits one JSON object per line with a `type` discriminant
|
||||||
|
* (meta / entity / done / error). The caller interprets each line via onLine.
|
||||||
|
* Throwing inside onLine aborts the stream.
|
||||||
|
*
|
||||||
|
* @param operation - Operation name, e.g. 'entity.stream'
|
||||||
|
* @param data - Operation-specific request data
|
||||||
|
* @param onLine - Synchronous callback invoked for every parsed line.
|
||||||
|
* May throw to abort the stream.
|
||||||
|
* @param user - Optional user identifier override
|
||||||
|
*/
|
||||||
|
export async function transceiveStream<TRequest, TLine = any>(
|
||||||
|
operation: string,
|
||||||
|
data: TRequest,
|
||||||
|
onLine: (line: TLine) => void,
|
||||||
|
user?: string
|
||||||
|
): Promise<void> {
|
||||||
|
const request: ApiRequest<TRequest> = {
|
||||||
|
version: API_VERSION,
|
||||||
|
transaction: generateTransactionId(),
|
||||||
|
operation,
|
||||||
|
data,
|
||||||
|
user,
|
||||||
|
};
|
||||||
|
|
||||||
|
await fetchWrapper.post(API_URL, request, {
|
||||||
|
//headers: { 'Accept': 'application/x-ndjson' },
|
||||||
|
headers: { 'Accept': 'application/json' },
|
||||||
|
onStream: async (response) => {
|
||||||
|
if (!response.body) {
|
||||||
|
throw new Error(`[${operation}] Response body is not readable`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const reader = response.body.getReader();
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
let buffer = '';
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
const lines = buffer.split('\n');
|
||||||
|
buffer = lines.pop()!; // retain any incomplete trailing chunk
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
if (!line.trim()) continue;
|
||||||
|
onLine(JSON.parse(line) as TLine);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// flush any remaining bytes still in the buffer
|
||||||
|
if (buffer.trim()) {
|
||||||
|
onLine(JSON.parse(buffer) as TLine);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
reader.releaseLock();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import { ref, computed, readonly } from 'vue'
|
|||||||
import { defineStore } from 'pinia'
|
import { defineStore } from 'pinia'
|
||||||
import { entityService } from '../services'
|
import { entityService } from '../services'
|
||||||
import { EntityObject } from '../models'
|
import { EntityObject } from '../models'
|
||||||
import type { EntityTransmitRequest, EntityTransmitResponse } from '../types/entity'
|
import type { EntityTransmitRequest, EntityTransmitResponse, EntityStreamRequest } from '../types/entity'
|
||||||
import type { SourceSelector, ListFilter, ListSort, ListRange } from '../types/common'
|
import type { SourceSelector, ListFilter, ListSort, ListRange } from '../types/common'
|
||||||
|
|
||||||
export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
|
export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
|
||||||
@@ -103,26 +103,16 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
|
|||||||
async function list(sources?: SourceSelector, filter?: ListFilter, sort?: ListSort, range?: ListRange): Promise<Record<string, EntityObject>> {
|
async function list(sources?: SourceSelector, filter?: ListFilter, sort?: ListSort, range?: ListRange): Promise<Record<string, EntityObject>> {
|
||||||
transceiving.value = true
|
transceiving.value = true
|
||||||
try {
|
try {
|
||||||
const response = await entityService.list({ sources, filter, sort, range })
|
const added: Record<string, EntityObject> = {}
|
||||||
|
|
||||||
// Flatten nested structure: provider:service:collection:entity -> "provider:service:collection:entity": object
|
await entityService.stream({ sources, filter, sort, range }, (entity: EntityObject) => {
|
||||||
const entities: Record<string, EntityObject> = {}
|
const key = identifierKey(entity.provider, entity.service, entity.collection, entity.identifier)
|
||||||
Object.entries(response).forEach(([providerId, providerServices]) => {
|
_entities.value[key] = entity
|
||||||
Object.entries(providerServices).forEach(([serviceId, serviceCollections]) => {
|
added[key] = entity
|
||||||
Object.entries(serviceCollections).forEach(([collectionId, collectionEntities]) => {
|
|
||||||
Object.entries(collectionEntities).forEach(([entityId, entityData]) => {
|
|
||||||
const key = identifierKey(providerId, serviceId, collectionId, entityId)
|
|
||||||
entities[key] = entityData
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// Merge retrieved entities into state
|
console.debug('[Mail Manager][Store] - Successfully retrieved', Object.keys(added).length, 'entities')
|
||||||
_entities.value = { ..._entities.value, ...entities }
|
return added
|
||||||
|
|
||||||
console.debug('[Mail Manager][Store] - Successfully retrieved', Object.keys(entities).length, 'entities')
|
|
||||||
return entities
|
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
console.error('[Mail Manager][Store] - Failed to retrieve entities:', error)
|
console.error('[Mail Manager][Store] - Failed to retrieve entities:', error)
|
||||||
throw error
|
throw error
|
||||||
@@ -346,6 +336,42 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stream entities progressively, merging each entity into the store as it arrives.
|
||||||
|
*
|
||||||
|
* Unlike list(), which waits for the full response before updating the store,
|
||||||
|
* stream() updates reactive state entity-by-entity so UI renders incrementally.
|
||||||
|
*
|
||||||
|
* @param sources - optional source selector
|
||||||
|
* @param filter - optional list filter
|
||||||
|
* @param sort - optional list sort
|
||||||
|
* @param range - optional list range
|
||||||
|
*
|
||||||
|
* @returns Promise resolving to { total } when the stream completes
|
||||||
|
*/
|
||||||
|
async function stream(
|
||||||
|
sources?: SourceSelector,
|
||||||
|
filter?: ListFilter,
|
||||||
|
sort?: ListSort,
|
||||||
|
range?: ListRange
|
||||||
|
): Promise<{ total: number }> {
|
||||||
|
transceiving.value = true
|
||||||
|
try {
|
||||||
|
const request: EntityStreamRequest = { sources, filter, sort, range }
|
||||||
|
const result = await entityService.stream(request, (entity: EntityObject) => {
|
||||||
|
const key = identifierKey(entity.provider, entity.service, entity.collection, entity.identifier)
|
||||||
|
_entities.value[key] = entity
|
||||||
|
})
|
||||||
|
console.debug('[Mail Manager][Store] - Successfully streamed', result.total, 'entities')
|
||||||
|
return result
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error('[Mail Manager][Store] - Failed to stream entities:', error)
|
||||||
|
throw error
|
||||||
|
} finally {
|
||||||
|
transceiving.value = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Return public API
|
// Return public API
|
||||||
return {
|
return {
|
||||||
// State (readonly)
|
// State (readonly)
|
||||||
@@ -365,5 +391,6 @@ export const useEntitiesStore = defineStore('mailEntitiesStore', () => {
|
|||||||
delete: remove,
|
delete: remove,
|
||||||
delta,
|
delta,
|
||||||
transmit,
|
transmit,
|
||||||
|
stream,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -158,3 +158,40 @@ export interface EntityTransmitResponse {
|
|||||||
id: string;
|
id: string;
|
||||||
status: 'queued' | 'sent';
|
status: 'queued' | 'sent';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Entity stream
|
||||||
|
*/
|
||||||
|
export interface EntityStreamRequest {
|
||||||
|
sources?: SourceSelector;
|
||||||
|
filter?: ListFilter;
|
||||||
|
sort?: ListSort;
|
||||||
|
range?: ListRange;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface EntityStreamMetaLine {
|
||||||
|
type: 'meta';
|
||||||
|
version: number;
|
||||||
|
transaction: string;
|
||||||
|
status: 'success';
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface EntityStreamEntityLine extends EntityInterface<MessageInterface> {
|
||||||
|
type: 'entity';
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface EntityStreamDoneLine {
|
||||||
|
type: 'done';
|
||||||
|
total: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface EntityStreamErrorLine {
|
||||||
|
type: 'error';
|
||||||
|
message: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type EntityStreamLine =
|
||||||
|
| EntityStreamMetaLine
|
||||||
|
| EntityStreamEntityLine
|
||||||
|
| EntityStreamDoneLine
|
||||||
|
| EntityStreamErrorLine;
|
||||||
Reference in New Issue
Block a user