feat: add entity streaming

Signed-off-by: Sebastian Krupinski <krupinski01@gmail.com>
This commit is contained in:
2026-02-20 16:23:43 -05:00
parent a576d85d74
commit 8e931f6650
2 changed files with 92 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
<?php
declare(strict_types = 1);
namespace KTXC\Http\Response;
/**
* StreamedNdJsonResponse streams an HTTP response as Newline Delimited JSON (NDJSON).
*
* Each item yielded by the provided iterable is serialized as a single JSON value
* followed by a newline character (\n). The response is flushed to the client every
* $flushInterval items so consumers can process records incrementally without waiting
* for the full payload.
*
* Content-Type is set to `application/x-ndjson` and `X-Accel-Buffering: no` is added
* by default to disable nginx proxy buffering.
*
* Example usage:
*
* function records(): \Generator {
* yield ['id' => 1, 'name' => 'Alice'];
* yield ['id' => 2, 'name' => 'Bob'];
* }
*
* return new StreamedNdJsonResponse(records());
*/
class StreamedNdJsonResponse extends StreamedResponse
{
/**
* @param iterable<mixed> $items Items to serialize; each becomes one JSON line
* @param int $flushInterval Flush to client after this many items (default 10)
* @param int $status HTTP status code (default 200)
* @param array<string, string|string[]> $headers Additional HTTP headers
* @param int $encodingOptions Flags passed to json_encode()
*/
public function __construct(
iterable $items,
int $flushInterval = 10,
int $status = 200,
array $headers = [],
private readonly int $encodingOptions = JsonResponse::DEFAULT_ENCODING_OPTIONS,
) {
parent::__construct(null, $status, $headers);
if (!$this->headers->get('Content-Type')) {
$this->headers->set('Content-Type', 'application/x-ndjson');
}
if (!$this->headers->has('X-Accel-Buffering')) {
$this->headers->set('X-Accel-Buffering', 'no');
}
$encodingOptions = $this->encodingOptions;
$this->setCallback(static function () use ($items, $flushInterval, $encodingOptions): void {
$count = 0;
foreach ($items as $item) {
echo json_encode($item, \JSON_THROW_ON_ERROR | $encodingOptions) . "\n";
$count++;
if ($count >= $flushInterval) {
@ob_flush();
flush();
$count = 0;
}
}
// final flush for any remaining buffered items
if ($count > 0) {
@ob_flush();
flush();
}
});
}
}

View File

@@ -9,6 +9,7 @@ declare(strict_types=1);
namespace KTXF\Mail\Service; namespace KTXF\Mail\Service;
use Generator;
use KTXF\Mail\Collection\CollectionBaseInterface; use KTXF\Mail\Collection\CollectionBaseInterface;
use KTXF\Mail\Object\AddressInterface; use KTXF\Mail\Object\AddressInterface;
use KTXF\Resource\Delta\Delta; use KTXF\Resource\Delta\Delta;
@@ -168,6 +169,21 @@ interface ServiceBaseInterface extends ResourceServiceBaseInterface {
*/ */
public function entityList(string|int $collection, ?IFilter $filter = null, ?ISort $sort = null, ?IRange $range = null, ?array $properties = null): array; public function entityList(string|int $collection, ?IFilter $filter = null, ?ISort $sort = null, ?IRange $range = null, ?array $properties = null): array;
/**
* Lists messages in a collection
*
* @since 2025.05.01
*
* @param string|int $collection Collection ID
* @param IFilter|null $filter Optional filter criteria
* @param ISort|null $sort Optional sort order
* @param IRange|null $range Optional pagination
* @param array|null $properties Optional message properties to fetch
*
* @return Generator Yields messages one by one as EntityBaseInterface
*/
public function entityListStream(string|int $collection, ?IFilter $filter = null, ?ISort $sort = null, ?IRange $range = null, ?array $properties = null): Generator;
/** /**
* Creates a filter builder for messages * Creates a filter builder for messages
* *