From 3475ea5f0709e8672ea7ad317ac92a55dcc4b37a Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 7 May 2026 15:14:51 -0700 Subject: [PATCH 1/4] feat: add isClosed observer to TestSseClient Lets test code assert that the SUT correctly tears the SSE connection down. Test-only addition; the production SSEClient implementations do not expose this state, and the TestSseClient class is already documented as test-only with no semver guarantee. --- packages/event_source_client/lib/src/test_sse_client.dart | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/event_source_client/lib/src/test_sse_client.dart b/packages/event_source_client/lib/src/test_sse_client.dart index d65df43..c8f497e 100644 --- a/packages/event_source_client/lib/src/test_sse_client.dart +++ b/packages/event_source_client/lib/src/test_sse_client.dart @@ -37,6 +37,12 @@ final class TestSseClient implements SSEClient { bool hasCapability(SSECapability capability) => _capabilities.contains(capability); + /// Whether [close] has been called on this client. Test-only -- + /// production [SSEClient] implementations do not expose this state, + /// and tests asserting against it are inherently white-box. Use to + /// verify that code under test correctly tears the connection down. + bool get isClosed => _messageEventsController.isClosed; + /// Emit an event on the stream. /// Has no effect if the client has been closed. /// From 2798db33111d560fd42687a061d2aa454ce1c389 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 6 May 2026 16:29:28 -0700 Subject: [PATCH 2/4] feat(SDK-2185): add FDv2 streaming base, initializer, and synchronizer - streaming_base.dart: wraps an SSEClient. Single-subscription StreamController. On subscribe, opens the SSE stream and creates a fresh FDv2ProtocolHandler. Each named SSE event is parsed as JSON, wrapped in an FDv2Event, and fed to the handler. ActionPayload becomes a ChangeSetResult (persist: true); ActionGoodbye becomes a goodbye StatusResult and closes the connection; ActionServerError / ActionError become interrupted StatusResults; ActionNone does not emit. Legacy `ping` events invoke the injected PingHandler and forward its result. The `x-ld-fd-fallback` header on the OpenEvent emits terminalError with fdv1Fallback=true and closes. SSE transport errors surface as interrupted; the SSE client's built-in backoff handles reconnect. Closed-ness is tracked via a single Completer _stoppedSignal matching the polling synchronizer. - streaming_initializer.dart: implements Initializer. Subscribes to the base, completes run() with the first emission, then closes the connection. close() before the first emission yields a shutdown StatusResult. - streaming_synchronizer.dart: implements Synchronizer. Thin adapter forwarding the base's stream so the orchestrator can treat polling and streaming uniformly. Tests cover: lifecycle (open on subscribe, cancel teardown, close+shutdown, idempotency), event handling (xfer-full payload, environmentId from header, goodbye, malformed data, non-object data, transport error), FDv1 fallback header (true / case- insensitive / false ignored), legacy ping bridge (forwards result, handles thrower), and synchronizer forwarding. The orchestrator (SDK-2186) wires the SSEClient with the right URL and auth strategy based on SSEClient.hasCapability(requestHeaders); the streaming source consumes whatever client it's given. --- .../src/data_sources/fdv2/streaming_base.dart | 231 +++++++++++ .../fdv2/streaming_initializer.dart | 59 +++ .../fdv2/streaming_synchronizer.dart | 22 + .../fdv2/streaming_base_test.dart | 388 ++++++++++++++++++ .../fdv2/streaming_initializer_test.dart | 174 ++++++++ .../fdv2/streaming_synchronizer_test.dart | 109 +++++ 6 files changed, 983 insertions(+) create mode 100644 packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart create mode 100644 packages/common_client/lib/src/data_sources/fdv2/streaming_initializer.dart create mode 100644 packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart create mode 100644 packages/common_client/test/data_sources/fdv2/streaming_base_test.dart create mode 100644 packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart create mode 100644 packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart new file mode 100644 index 0000000..049db49 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart @@ -0,0 +1,231 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; + +import 'flag_eval_mapper.dart'; +import 'protocol_handler.dart'; +import 'protocol_types.dart'; +import 'source.dart'; +import 'source_result.dart'; + +/// Long-lived streaming data source over SSE. +/// +/// Wraps an [SSEClient] with FDv2 protocol semantics. Each named SSE +/// event is parsed as JSON, wrapped in an [FDv2Event], and fed to a +/// fresh [FDv2ProtocolHandler]. The first emitted [ProtocolAction] +/// per event is translated into an [FDv2SourceResult]: +/// +/// - [ActionPayload] --> [ChangeSetResult] with `persist: true`. +/// - [ActionGoodbye] --> goodbye [StatusResult]; the SSE connection is +/// closed. +/// - [ActionServerError] / [ActionError] --> interrupted +/// [StatusResult]; the SSE client's built-in retry handles the +/// reconnect. +/// - [ActionNone] --> no emission (waiting for more events). +/// +/// Legacy `ping` events are routed to the injected [PingHandler] (which +/// performs a one-shot poll) and the result is forwarded to the +/// stream. This is the streaming-to-polling bridge for older servers +/// that pre-date FDv2. +/// +/// The `x-ld-fd-fallback` header on the initial connection's response +/// is detected and produces a terminal-error result with +/// `fdv1Fallback: true`. The connection is closed. +/// +/// Lifecycle: a single-subscription stream. [results] starts the SSE +/// connection on subscribe; cancelling the subscription tears it down +/// without emitting a shutdown. [close] both stops the source and +/// emits a shutdown [StatusResult] before closing the stream. Both +/// paths funnel through a `Completer _stoppedSignal` so async +/// callbacks short-circuit safely. +final class FDv2StreamingBase { + final SSEClient _sseClient; + final PingHandler _pingHandler; + final DateTime Function() _now; + final LDLogger _logger; + + late final StreamController _controller; + final Completer _stoppedSignal = Completer(); + StreamSubscription? _sseSubscription; + FDv2ProtocolHandler? _handler; + String? _environmentId; + + FDv2StreamingBase({ + required SSEClient sseClient, + required PingHandler pingHandler, + required LDLogger logger, + DateTime Function()? now, + }) : _sseClient = sseClient, + _pingHandler = pingHandler, + _logger = logger.subLogger('FDv2StreamingBase'), + _now = now ?? DateTime.now { + _controller = StreamController( + onListen: _onListen, + onCancel: _onCancel, + ); + } + + /// Single-subscription stream of results. The SSE connection is + /// established lazily on the first [Stream.listen] call. + Stream get results => _controller.stream; + + /// Stops the source, emits a shutdown [StatusResult], and closes the + /// stream. Idempotent. + void close() { + if (_stoppedSignal.isCompleted) return; + _stoppedSignal.complete(); + _tearDownConnection(); + _controller + .add(FDv2SourceResults.shutdown(message: 'Streaming source closed')); + _controller.close(); + } + + void _onListen() { + // Build the protocol handler fresh for each connection so a + // partial transfer from a previous connection cannot bleed into + // the new one. + _handler = FDv2ProtocolHandler( + objProcessors: {flagEvalKind: processFlagEval}, + logger: _logger, + ); + _sseSubscription = _sseClient.stream.listen( + _handleEvent, + onError: _handleSseError, + ); + } + + Future _onCancel() async { + if (_stoppedSignal.isCompleted) return; + _stoppedSignal.complete(); + _tearDownConnection(); + // No shutdown emission -- the subscriber asked us to stop. + } + + void _tearDownConnection() { + _sseSubscription?.cancel(); + _sseSubscription = null; + // Best-effort close. The SSE client may already be closed if it + // emitted an error; that's fine -- the operation is documented as + // safe in any state. + _sseClient.close(); + } + + void _handleEvent(Event event) { + if (_stoppedSignal.isCompleted) return; + switch (event) { + case OpenEvent open: + _handleOpen(open); + case MessageEvent message: + _handleMessage(message); + } + } + + void _handleOpen(OpenEvent event) { + final headers = event.headers; + if (headers == null) return; + + final envId = headers['x-ld-envid']; + if (envId != null) { + _environmentId = envId; + } + + final fallback = headers['x-ld-fd-fallback']?.toLowerCase() == 'true'; + if (fallback) { + _emit(FDv2SourceResults.terminalError( + message: 'Server requested FDv1 fallback', + fdv1Fallback: true, + )); + // Server told us to fall back; don't keep the connection open. + _tearDownConnection(); + _controller.close(); + } + } + + Future _handleMessage(MessageEvent event) async { + if (event.type == 'ping') { + // Legacy bridge: older servers may still send `ping` instead of + // FDv2 events. Defer to the injected handler for a one-shot poll. + await _handlePing(); + return; + } + + final Map data; + try { + final decoded = jsonDecode(event.data); + if (decoded is! Map) { + _logger.warn('Ignoring SSE event with non-object data: ' + 'event=${event.type}'); + _emit(FDv2SourceResults.interrupted( + message: 'Streaming event payload was not a JSON object')); + return; + } + data = decoded; + } catch (err) { + _logger + .warn('Failed to parse SSE event data as JSON (${err.runtimeType})'); + _emit(FDv2SourceResults.interrupted( + message: 'Streaming event payload was not valid JSON')); + return; + } + + final action = + _handler!.processEvent(FDv2Event(event: event.type, data: data)); + if (_stoppedSignal.isCompleted) return; + + switch (action) { + case ActionPayload(:final payload): + _emit(ChangeSetResult( + payload: payload, + environmentId: _environmentId, + freshness: _now(), + persist: true, + )); + case ActionGoodbye(:final reason): + _emit(FDv2SourceResults.goodbyeResult(message: reason)); + // Server told us to disconnect; close instead of letting the + // SSE client retry into a closed channel. + _tearDownConnection(); + _controller.close(); + case ActionServerError(:final reason): + _emit(FDv2SourceResults.interrupted(message: reason)); + case ActionError(:final message): + _emit(FDv2SourceResults.interrupted(message: message)); + case ActionNone(): + // No emission; continue accumulating events until the handler + // reaches a terminal action. + break; + } + } + + Future _handlePing() async { + final FDv2SourceResult result; + try { + result = await _pingHandler(); + } catch (err) { + _logger.warn('Ping handler threw unexpectedly: ${err.runtimeType}'); + _emit(FDv2SourceResults.interrupted( + message: 'Ping handler raised error unexpectedly')); + return; + } + if (_stoppedSignal.isCompleted) return; + _emit(result); + } + + void _handleSseError(Object err, StackTrace stack) { + if (_stoppedSignal.isCompleted) return; + // The SSE client's built-in backoff handles reconnection. Surface + // the disruption as interrupted; the orchestrator decides whether + // to fall through to a different source after enough time. + _logger.warn('SSE error (${err.runtimeType}); will retry'); + _logger.debug('SSE error detail: $err\n$stack'); + _emit(FDv2SourceResults.interrupted(message: 'Streaming connection error')); + } + + void _emit(FDv2SourceResult result) { + if (_stoppedSignal.isCompleted) return; + if (_controller.isClosed) return; + _controller.add(result); + } +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_initializer.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_initializer.dart new file mode 100644 index 0000000..9e9629c --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_initializer.dart @@ -0,0 +1,59 @@ +import 'dart:async'; + +import 'source.dart'; +import 'source_result.dart'; +import 'streaming_base.dart'; + +/// One-shot streaming initializer. +/// +/// Subscribes to the underlying [FDv2StreamingBase], returns the first +/// emitted [FDv2SourceResult], and tears the connection down. Used at +/// SDK init time to bring the SDK to a usable state from the streaming +/// path before handing off to the long-lived synchronizer. +/// +/// Calling [close] before the first emission resolves the pending +/// [run] future with a [SourceState.shutdown] result. +final class FDv2StreamingInitializer implements Initializer { + final FDv2StreamingBase _base; + final Completer _completer = Completer(); + StreamSubscription? _subscription; + bool _closed = false; + + FDv2StreamingInitializer({required FDv2StreamingBase base}) : _base = base; + + @override + Future run() { + if (_closed) { + return Future.value(_shutdownResult()); + } + _subscription = _base.results.listen((result) { + if (_completer.isCompleted) return; + _completer.complete(result); + // First emission received; tear down. + _subscription?.cancel(); + _subscription = null; + _base.close(); + }, onDone: () { + if (_completer.isCompleted) return; + // The base closed before producing a result. Surface as shutdown. + _completer.complete(_shutdownResult()); + }); + return _completer.future; + } + + @override + void close() { + if (_closed) return; + _closed = true; + _subscription?.cancel(); + _subscription = null; + _base.close(); + if (!_completer.isCompleted) { + _completer.complete(_shutdownResult()); + } + } + + StatusResult _shutdownResult() => FDv2SourceResults.shutdown( + message: 'Streaming initializer closed before first emission', + ); +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart new file mode 100644 index 0000000..12dde88 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart @@ -0,0 +1,22 @@ +import 'source.dart'; +import 'source_result.dart'; +import 'streaming_base.dart'; + +/// Long-lived streaming synchronizer. +/// +/// A thin adapter that exposes [FDv2StreamingBase.results] as a +/// [Synchronizer]. The base class already implements all of the +/// connection lifecycle, protocol parsing, and error handling; this +/// wrapper exists only to satisfy the [Synchronizer] interface so the +/// orchestrator can treat polling and streaming uniformly. +final class FDv2StreamingSynchronizer implements Synchronizer { + final FDv2StreamingBase _base; + + FDv2StreamingSynchronizer({required FDv2StreamingBase base}) : _base = base; + + @override + Stream get results => _base.results; + + @override + void close() => _base.close(); +} diff --git a/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart new file mode 100644 index 0000000..a4f6390 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart @@ -0,0 +1,388 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:test/test.dart'; + +/// Fake SSE client backed by a controllable [StreamController]. Tests +/// drive the SSE stream by calling [emitOpen], [emitMessage], or +/// [emitError]. Calls to [close] complete the [closed] future so tests +/// can assert teardown happened. +class FakeSseClient implements SSEClient { + final StreamController _controller = StreamController(); + final Completer closed = Completer(); + int restartCount = 0; + + void emitOpen({Map? headers}) { + _controller.add(OpenEvent( + headers: headers == null ? null : UnmodifiableMapView(headers), + )); + } + + void emitMessage(String type, String data, {String? id}) { + _controller.add(MessageEvent(type, data, id)); + } + + void emitError(Object err) { + _controller.addError(err); + } + + @override + Stream get stream => _controller.stream; + + @override + Future close() async { + if (!closed.isCompleted) closed.complete(); + if (!_controller.isClosed) await _controller.close(); + } + + @override + void restart() { + restartCount++; + } + + @override + bool hasCapability(SSECapability capability) => true; +} + +String serverIntent({String intentCode = 'xfer-full', int target = 1}) => + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': target, + 'intentCode': intentCode, + 'reason': 'test', + } + ] + }); + +String putObject({ + String key = 'flag-a', + int version = 1, +}) => + jsonEncode({ + 'kind': 'flag-eval', + 'key': key, + 'version': version, + 'object': {'value': true, 'version': version, 'variation': 0}, + }); + +String payloadTransferred({String state = 'sel-1', int version = 1}) => + jsonEncode({ + 'state': state, + 'version': version, + }); + +void emitFullPayload(FakeSseClient sse, + {String state = 'sel-1', String flagKey = 'flag-a'}) { + sse.emitMessage('server-intent', serverIntent()); + sse.emitMessage('put-object', putObject(key: flagKey)); + sse.emitMessage('payload-transferred', payloadTransferred(state: state)); +} + +FDv2StreamingBase makeBase( + FakeSseClient sse, { + Future Function()? pingHandler, + DateTime Function()? now, +}) { + return FDv2StreamingBase( + sseClient: sse, + pingHandler: pingHandler ?? + () async => FDv2SourceResults.interrupted(message: 'no ping handler'), + logger: LDLogger(level: LDLogLevel.error), + now: now, + ); +} + +void main() { + group('connection lifecycle', () { + test('opens the SSE stream on first listen', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + // No emission yet -- nothing has come in over the SSE stream. + expect(emissions, isEmpty); + + // Drive a full xfer-full sequence and confirm the resulting + // ChangeSetResult is emitted. + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + expect(emissions.single, isA()); + + await sub.cancel(); + }); + + test( + 'subscription cancel tears down the SSE client without emitting ' + 'shutdown', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + await sub.cancel(); + await Future.delayed(Duration.zero); + + expect(sse.closed.isCompleted, isTrue); + expect(emissions.whereType(), isEmpty); + }); + + test('close() emits shutdown then closes the stream', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + base.close(); + await done.future; + + expect(emissions, hasLength(1)); + expect((emissions.single as StatusResult).state, + equals(SourceState.shutdown)); + expect(sse.closed.isCompleted, isTrue); + }); + + test('close() is idempotent', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + base.close(); + expect(() => base.close(), returnsNormally); + }); + }); + + group('event handling', () { + test('xfer-full sequence produces ChangeSetResult with full payload', + () async { + final sse = FakeSseClient(); + final fixedNow = DateTime.utc(2026, 1, 1); + final base = makeBase(sse, now: () => fixedNow); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-99', flagKey: 'k1'); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + final cs = emissions.single as ChangeSetResult; + expect(cs.payload.type, equals(PayloadType.full)); + expect(cs.payload.selector.state, equals('sel-99')); + expect(cs.payload.updates.single.key, equals('k1')); + expect(cs.persist, isTrue); + expect(cs.freshness, equals(fixedNow)); + + await sub.cancel(); + }); + + test('environmentId from x-ld-envid header rides on the ChangeSetResult', + () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitOpen(headers: {'x-ld-envid': 'env-abc'}); + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect((emissions.single as ChangeSetResult).environmentId, + equals('env-abc')); + + await sub.cancel(); + }); + + test('goodbye event closes the source and emits a goodbye result', + () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + sse.emitMessage('server-intent', serverIntent()); + sse.emitMessage('goodbye', jsonEncode({'reason': 'maintenance'})); + await done.future; + + expect(emissions, hasLength(1)); + expect((emissions.single as StatusResult).state, + equals(SourceState.goodbye)); + expect(sse.closed.isCompleted, isTrue); + }); + + test('unparseable event data is reported as interrupted, no throw', + () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitMessage('put-object', 'not json'); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test('non-object event data is reported as interrupted, no throw', + () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitMessage('server-intent', '[1,2,3]'); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test('SSE transport error is reported as interrupted', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitError(Exception('connection dropped')); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + }); + + group('FDv1 fallback header on connect', () { + test( + 'x-ld-fd-fallback: true on the OpenEvent emits terminalError and ' + 'closes', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + sse.emitOpen(headers: {'x-ld-fd-fallback': 'true'}); + await done.future; + + expect(emissions, hasLength(1)); + final status = emissions.single as StatusResult; + expect(status.state, equals(SourceState.terminalError)); + expect(status.fdv1Fallback, isTrue); + expect(sse.closed.isCompleted, isTrue); + }); + + test('fallback header is matched case-insensitively', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitOpen(headers: {'x-ld-fd-fallback': 'TRUE'}); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).fdv1Fallback, isTrue); + + await sub.cancel(); + }); + + test('fallback header value other than true is ignored', () async { + final sse = FakeSseClient(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitOpen(headers: {'x-ld-fd-fallback': 'false'}); + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect(emissions.single, isA()); + expect(emissions.single.fdv1Fallback, isFalse); + + await sub.cancel(); + }); + }); + + group('legacy ping bridge', () { + test( + 'ping event invokes the PingHandler and forwards its result to ' + 'the stream', () async { + var pingCallCount = 0; + final pingResult = ChangeSetResult( + payload: const Payload(type: PayloadType.full, updates: []), + persist: true, + freshness: DateTime.utc(2026, 1, 1), + ); + final sse = FakeSseClient(); + final base = makeBase( + sse, + pingHandler: () async { + pingCallCount++; + return pingResult; + }, + ); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitMessage('ping', ''); + await Future.delayed(Duration.zero); + + expect(pingCallCount, equals(1)); + expect(emissions, hasLength(1)); + expect(identical(emissions.single, pingResult), isTrue); + + await sub.cancel(); + }); + + test('PingHandler throwing is treated as interrupted, no propagation', + () async { + final sse = FakeSseClient(); + final base = makeBase( + sse, + pingHandler: () async { + throw StateError('boom'); + }, + ); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitMessage('ping', ''); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart new file mode 100644 index 0000000..d2fad40 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart @@ -0,0 +1,174 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_initializer.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:test/test.dart'; + +class FakeSseClient implements SSEClient { + final StreamController _controller = StreamController(); + bool _closed = false; + + void emitMessage(String type, String data) { + _controller.add(MessageEvent(type, data, null)); + } + + void emitOpen({Map? headers}) { + _controller.add(OpenEvent( + headers: headers == null ? null : UnmodifiableMapView(headers))); + } + + bool get sseClosed => _closed; + + @override + Stream get stream => _controller.stream; + + @override + Future close() async { + _closed = true; + if (!_controller.isClosed) await _controller.close(); + } + + @override + void restart() {} + + @override + bool hasCapability(SSECapability capability) => true; +} + +void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { + sse.emitMessage( + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + sse.emitMessage( + 'put-object', + jsonEncode({ + 'kind': 'flag-eval', + 'key': 'k', + 'version': 1, + 'object': {'value': true, 'version': 1, 'variation': 0}, + }), + ); + sse.emitMessage( + 'payload-transferred', + jsonEncode({'state': state, 'version': 1}), + ); +} + +FDv2StreamingBase makeBase(FakeSseClient sse) => FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: LDLogger(level: LDLogLevel.error), + ); + +void main() { + test('returns the first ChangeSetResult and tears the connection down', + () async { + final sse = FakeSseClient(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + + final future = init.run(); + // Yield once so run subscribes to the base. + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-init'); + final result = await future; + + expect(result, isA()); + expect( + (result as ChangeSetResult).payload.selector.state, equals('sel-init')); + expect(sse.sseClosed, isTrue); + }); + + test('surfaces a goodbye result as the first emission', () async { + final sse = FakeSseClient(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + sse.emitMessage( + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + sse.emitMessage('goodbye', jsonEncode({'reason': 'maintenance'})); + + final result = await future; + expect((result as StatusResult).state, equals(SourceState.goodbye)); + expect(sse.sseClosed, isTrue); + }); + + test('surfaces FDv1 fallback as terminalError', () async { + final sse = FakeSseClient(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + sse.emitOpen(headers: {'x-ld-fd-fallback': 'true'}); + + final result = await future; + final status = result as StatusResult; + expect(status.state, equals(SourceState.terminalError)); + expect(status.fdv1Fallback, isTrue); + }); + + test('close before any emission resolves with a shutdown result', () async { + final sse = FakeSseClient(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + init.close(); + + final result = await future; + expect((result as StatusResult).state, equals(SourceState.shutdown)); + expect(sse.sseClosed, isTrue); + }); + + test('close after run() returns is idempotent', () async { + final sse = FakeSseClient(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + emitFullPayload(sse); + await future; + + expect(() => init.close(), returnsNormally); + expect(() => init.close(), returnsNormally); + }); + + test('close() before run() yields a shutdown result without a subscription', + () async { + final sse = FakeSseClient(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + + init.close(); + final result = await init.run(); + + expect((result as StatusResult).state, equals(SourceState.shutdown)); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart new file mode 100644 index 0000000..b40b666 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart @@ -0,0 +1,109 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_synchronizer.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:test/test.dart'; + +class FakeSseClient implements SSEClient { + final StreamController _controller = StreamController(); + bool _closed = false; + + bool get sseClosed => _closed; + + void emitMessage(String type, String data) { + _controller.add(MessageEvent(type, data, null)); + } + + @override + Stream get stream => _controller.stream; + + @override + Future close() async { + _closed = true; + if (!_controller.isClosed) await _controller.close(); + } + + @override + void restart() {} + + @override + bool hasCapability(SSECapability capability) => true; +} + +void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { + sse.emitMessage( + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + sse.emitMessage( + 'put-object', + jsonEncode({ + 'kind': 'flag-eval', + 'key': 'k', + 'version': 1, + 'object': {'value': true, 'version': 1, 'variation': 0}, + }), + ); + sse.emitMessage( + 'payload-transferred', + jsonEncode({'state': state, 'version': 1}), + ); +} + +FDv2StreamingBase makeBase(FakeSseClient sse) => FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: LDLogger(level: LDLogLevel.error), + ); + +void main() { + test('forwards results from the underlying base', () async { + final sse = FakeSseClient(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + final emissions = []; + final sub = sync.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-1'); + emitFullPayload(sse, state: 'sel-2'); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(2)); + expect((emissions[0] as ChangeSetResult).payload.selector.state, + equals('sel-1')); + expect((emissions[1] as ChangeSetResult).payload.selector.state, + equals('sel-2')); + + await sub.cancel(); + }); + + test('close forwards to the base, emitting shutdown', () async { + final sse = FakeSseClient(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + final emissions = []; + final done = Completer(); + sync.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + sync.close(); + await done.future; + + expect( + (emissions.last as StatusResult).state, equals(SourceState.shutdown)); + expect(sse.sseClosed, isTrue); + }); +} From 24264afcea1a8fcc21cf227c293031311d1bd8ec Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 7 May 2026 15:17:55 -0700 Subject: [PATCH 3/4] test(SDK-2185): consolidate streaming test fakes onto TestSseClient Drops the per-file FakeSseClient stub and replaces it with the shared TestSseClient from launchdarkly_event_source_client, driven via the existing emitEvent / emitError API plus the new isClosed observer. Net: ~50 lines of duplicated stub code removed across three test files. --- .../fdv2/streaming_base_test.dart | 119 +++++++----------- .../fdv2/streaming_initializer_test.dart | 72 +++++------ .../fdv2/streaming_synchronizer_test.dart | 45 +++---- 3 files changed, 87 insertions(+), 149 deletions(-) diff --git a/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart index a4f6390..ab56323 100644 --- a/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart +++ b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart @@ -9,45 +9,16 @@ import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; import 'package:test/test.dart'; -/// Fake SSE client backed by a controllable [StreamController]. Tests -/// drive the SSE stream by calling [emitOpen], [emitMessage], or -/// [emitError]. Calls to [close] complete the [closed] future so tests -/// can assert teardown happened. -class FakeSseClient implements SSEClient { - final StreamController _controller = StreamController(); - final Completer closed = Completer(); - int restartCount = 0; - - void emitOpen({Map? headers}) { - _controller.add(OpenEvent( - headers: headers == null ? null : UnmodifiableMapView(headers), - )); - } - - void emitMessage(String type, String data, {String? id}) { - _controller.add(MessageEvent(type, data, id)); - } - - void emitError(Object err) { - _controller.addError(err); - } - - @override - Stream get stream => _controller.stream; - - @override - Future close() async { - if (!closed.isCompleted) closed.complete(); - if (!_controller.isClosed) await _controller.close(); - } - - @override - void restart() { - restartCount++; - } - - @override - bool hasCapability(SSECapability capability) => true; +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); + +void emitOpen(TestSseClient sse, {Map? headers}) { + sse.emitEvent(OpenEvent( + headers: headers == null ? null : UnmodifiableMapView(headers), + )); +} + +void emitMessage(TestSseClient sse, String type, String data, {String? id}) { + sse.emitEvent(MessageEvent(type, data, id)); } String serverIntent({String intentCode = 'xfer-full', int target = 1}) => @@ -79,15 +50,15 @@ String payloadTransferred({String state = 'sel-1', int version = 1}) => 'version': version, }); -void emitFullPayload(FakeSseClient sse, +void emitFullPayload(TestSseClient sse, {String state = 'sel-1', String flagKey = 'flag-a'}) { - sse.emitMessage('server-intent', serverIntent()); - sse.emitMessage('put-object', putObject(key: flagKey)); - sse.emitMessage('payload-transferred', payloadTransferred(state: state)); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'put-object', putObject(key: flagKey)); + emitMessage(sse, 'payload-transferred', payloadTransferred(state: state)); } FDv2StreamingBase makeBase( - FakeSseClient sse, { + TestSseClient sse, { Future Function()? pingHandler, DateTime Function()? now, }) { @@ -103,7 +74,7 @@ FDv2StreamingBase makeBase( void main() { group('connection lifecycle', () { test('opens the SSE stream on first listen', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; @@ -127,7 +98,7 @@ void main() { test( 'subscription cancel tears down the SSE client without emitting ' 'shutdown', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final sub = base.results.listen(emissions.add); @@ -136,12 +107,12 @@ void main() { await sub.cancel(); await Future.delayed(Duration.zero); - expect(sse.closed.isCompleted, isTrue); + expect(sse.isClosed, isTrue); expect(emissions.whereType(), isEmpty); }); test('close() emits shutdown then closes the stream', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final done = Completer(); @@ -154,11 +125,11 @@ void main() { expect(emissions, hasLength(1)); expect((emissions.single as StatusResult).state, equals(SourceState.shutdown)); - expect(sse.closed.isCompleted, isTrue); + expect(sse.isClosed, isTrue); }); test('close() is idempotent', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); base.close(); expect(() => base.close(), returnsNormally); @@ -168,7 +139,7 @@ void main() { group('event handling', () { test('xfer-full sequence produces ChangeSetResult with full payload', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final fixedNow = DateTime.utc(2026, 1, 1); final base = makeBase(sse, now: () => fixedNow); final emissions = []; @@ -191,13 +162,13 @@ void main() { test('environmentId from x-ld-envid header rides on the ChangeSetResult', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitOpen(headers: {'x-ld-envid': 'env-abc'}); + emitOpen(sse, headers: {'x-ld-envid': 'env-abc'}); emitFullPayload(sse); await Future.delayed(Duration.zero); @@ -209,32 +180,32 @@ void main() { test('goodbye event closes the source and emits a goodbye result', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final done = Completer(); base.results.listen(emissions.add, onDone: done.complete); await Future.delayed(Duration.zero); - sse.emitMessage('server-intent', serverIntent()); - sse.emitMessage('goodbye', jsonEncode({'reason': 'maintenance'})); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); await done.future; expect(emissions, hasLength(1)); expect((emissions.single as StatusResult).state, equals(SourceState.goodbye)); - expect(sse.closed.isCompleted, isTrue); + expect(sse.isClosed, isTrue); }); test('unparseable event data is reported as interrupted, no throw', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitMessage('put-object', 'not json'); + emitMessage(sse, 'put-object', 'not json'); await Future.delayed(Duration.zero); expect((emissions.single as StatusResult).state, @@ -245,13 +216,13 @@ void main() { test('non-object event data is reported as interrupted, no throw', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitMessage('server-intent', '[1,2,3]'); + emitMessage(sse, 'server-intent', '[1,2,3]'); await Future.delayed(Duration.zero); expect((emissions.single as StatusResult).state, @@ -261,13 +232,13 @@ void main() { }); test('SSE transport error is reported as interrupted', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitError(Exception('connection dropped')); + sse.emitError(error: Exception('connection dropped')); await Future.delayed(Duration.zero); expect((emissions.single as StatusResult).state, @@ -281,31 +252,31 @@ void main() { test( 'x-ld-fd-fallback: true on the OpenEvent emits terminalError and ' 'closes', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final done = Completer(); base.results.listen(emissions.add, onDone: done.complete); await Future.delayed(Duration.zero); - sse.emitOpen(headers: {'x-ld-fd-fallback': 'true'}); + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); await done.future; expect(emissions, hasLength(1)); final status = emissions.single as StatusResult; expect(status.state, equals(SourceState.terminalError)); expect(status.fdv1Fallback, isTrue); - expect(sse.closed.isCompleted, isTrue); + expect(sse.isClosed, isTrue); }); test('fallback header is matched case-insensitively', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitOpen(headers: {'x-ld-fd-fallback': 'TRUE'}); + emitOpen(sse, headers: {'x-ld-fd-fallback': 'TRUE'}); await Future.delayed(Duration.zero); expect((emissions.single as StatusResult).fdv1Fallback, isTrue); @@ -314,13 +285,13 @@ void main() { }); test('fallback header value other than true is ignored', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase(sse); final emissions = []; final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitOpen(headers: {'x-ld-fd-fallback': 'false'}); + emitOpen(sse, headers: {'x-ld-fd-fallback': 'false'}); emitFullPayload(sse); await Future.delayed(Duration.zero); @@ -341,7 +312,7 @@ void main() { persist: true, freshness: DateTime.utc(2026, 1, 1), ); - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase( sse, pingHandler: () async { @@ -353,7 +324,7 @@ void main() { final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitMessage('ping', ''); + emitMessage(sse, 'ping', ''); await Future.delayed(Duration.zero); expect(pingCallCount, equals(1)); @@ -365,7 +336,7 @@ void main() { test('PingHandler throwing is treated as interrupted, no propagation', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final base = makeBase( sse, pingHandler: () async { @@ -376,7 +347,7 @@ void main() { final sub = base.results.listen(emissions.add); await Future.delayed(Duration.zero); - sse.emitMessage('ping', ''); + emitMessage(sse, 'ping', ''); await Future.delayed(Duration.zero); expect((emissions.single as StatusResult).state, diff --git a/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart index d2fad40..1d3bfd8 100644 --- a/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart +++ b/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart @@ -9,39 +9,20 @@ import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; import 'package:test/test.dart'; -class FakeSseClient implements SSEClient { - final StreamController _controller = StreamController(); - bool _closed = false; +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); - void emitMessage(String type, String data) { - _controller.add(MessageEvent(type, data, null)); - } - - void emitOpen({Map? headers}) { - _controller.add(OpenEvent( - headers: headers == null ? null : UnmodifiableMapView(headers))); - } - - bool get sseClosed => _closed; - - @override - Stream get stream => _controller.stream; - - @override - Future close() async { - _closed = true; - if (!_controller.isClosed) await _controller.close(); - } - - @override - void restart() {} +void emitMessage(TestSseClient sse, String type, String data) { + sse.emitEvent(MessageEvent(type, data, null)); +} - @override - bool hasCapability(SSECapability capability) => true; +void emitOpen(TestSseClient sse, {Map? headers}) { + sse.emitEvent(OpenEvent( + headers: headers == null ? null : UnmodifiableMapView(headers))); } -void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { - sse.emitMessage( +void emitFullPayload(TestSseClient sse, {String state = 'sel-1'}) { + emitMessage( + sse, 'server-intent', jsonEncode({ 'payloads': [ @@ -54,7 +35,8 @@ void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { ] }), ); - sse.emitMessage( + emitMessage( + sse, 'put-object', jsonEncode({ 'kind': 'flag-eval', @@ -63,13 +45,14 @@ void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { 'object': {'value': true, 'version': 1, 'variation': 0}, }), ); - sse.emitMessage( + emitMessage( + sse, 'payload-transferred', jsonEncode({'state': state, 'version': 1}), ); } -FDv2StreamingBase makeBase(FakeSseClient sse) => FDv2StreamingBase( +FDv2StreamingBase makeBase(TestSseClient sse) => FDv2StreamingBase( sseClient: sse, pingHandler: () async => FDv2SourceResults.interrupted(message: 'no ping'), @@ -79,7 +62,7 @@ FDv2StreamingBase makeBase(FakeSseClient sse) => FDv2StreamingBase( void main() { test('returns the first ChangeSetResult and tears the connection down', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final init = FDv2StreamingInitializer(base: makeBase(sse)); final future = init.run(); @@ -92,16 +75,17 @@ void main() { expect(result, isA()); expect( (result as ChangeSetResult).payload.selector.state, equals('sel-init')); - expect(sse.sseClosed, isTrue); + expect(sse.isClosed, isTrue); }); test('surfaces a goodbye result as the first emission', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final init = FDv2StreamingInitializer(base: makeBase(sse)); final future = init.run(); await Future.delayed(Duration.zero); - sse.emitMessage( + emitMessage( + sse, 'server-intent', jsonEncode({ 'payloads': [ @@ -114,20 +98,20 @@ void main() { ] }), ); - sse.emitMessage('goodbye', jsonEncode({'reason': 'maintenance'})); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); final result = await future; expect((result as StatusResult).state, equals(SourceState.goodbye)); - expect(sse.sseClosed, isTrue); + expect(sse.isClosed, isTrue); }); test('surfaces FDv1 fallback as terminalError', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final init = FDv2StreamingInitializer(base: makeBase(sse)); final future = init.run(); await Future.delayed(Duration.zero); - sse.emitOpen(headers: {'x-ld-fd-fallback': 'true'}); + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); final result = await future; final status = result as StatusResult; @@ -136,7 +120,7 @@ void main() { }); test('close before any emission resolves with a shutdown result', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final init = FDv2StreamingInitializer(base: makeBase(sse)); final future = init.run(); await Future.delayed(Duration.zero); @@ -145,11 +129,11 @@ void main() { final result = await future; expect((result as StatusResult).state, equals(SourceState.shutdown)); - expect(sse.sseClosed, isTrue); + expect(sse.isClosed, isTrue); }); test('close after run() returns is idempotent', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final init = FDv2StreamingInitializer(base: makeBase(sse)); final future = init.run(); await Future.delayed(Duration.zero); @@ -163,7 +147,7 @@ void main() { test('close() before run() yields a shutdown result without a subscription', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final init = FDv2StreamingInitializer(base: makeBase(sse)); init.close(); diff --git a/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart index b40b666..712b181 100644 --- a/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart +++ b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart @@ -8,34 +8,15 @@ import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; import 'package:test/test.dart'; -class FakeSseClient implements SSEClient { - final StreamController _controller = StreamController(); - bool _closed = false; +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); - bool get sseClosed => _closed; - - void emitMessage(String type, String data) { - _controller.add(MessageEvent(type, data, null)); - } - - @override - Stream get stream => _controller.stream; - - @override - Future close() async { - _closed = true; - if (!_controller.isClosed) await _controller.close(); - } - - @override - void restart() {} - - @override - bool hasCapability(SSECapability capability) => true; +void emitMessage(TestSseClient sse, String type, String data) { + sse.emitEvent(MessageEvent(type, data, null)); } -void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { - sse.emitMessage( +void emitFullPayload(TestSseClient sse, {String state = 'sel-1'}) { + emitMessage( + sse, 'server-intent', jsonEncode({ 'payloads': [ @@ -48,7 +29,8 @@ void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { ] }), ); - sse.emitMessage( + emitMessage( + sse, 'put-object', jsonEncode({ 'kind': 'flag-eval', @@ -57,13 +39,14 @@ void emitFullPayload(FakeSseClient sse, {String state = 'sel-1'}) { 'object': {'value': true, 'version': 1, 'variation': 0}, }), ); - sse.emitMessage( + emitMessage( + sse, 'payload-transferred', jsonEncode({'state': state, 'version': 1}), ); } -FDv2StreamingBase makeBase(FakeSseClient sse) => FDv2StreamingBase( +FDv2StreamingBase makeBase(TestSseClient sse) => FDv2StreamingBase( sseClient: sse, pingHandler: () async => FDv2SourceResults.interrupted(message: 'no ping'), @@ -72,7 +55,7 @@ FDv2StreamingBase makeBase(FakeSseClient sse) => FDv2StreamingBase( void main() { test('forwards results from the underlying base', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); final emissions = []; final sub = sync.results.listen(emissions.add); @@ -92,7 +75,7 @@ void main() { }); test('close forwards to the base, emitting shutdown', () async { - final sse = FakeSseClient(); + final sse = makeSse(); final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); final emissions = []; final done = Completer(); @@ -104,6 +87,6 @@ void main() { expect( (emissions.last as StatusResult).state, equals(SourceState.shutdown)); - expect(sse.sseClosed, isTrue); + expect(sse.isClosed, isTrue); }); } From 892fc8b038aa823aa36df7ee9769f8d30a41b242 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 10:06:56 -0700 Subject: [PATCH 4/4] fix(SDK-2185): close races, malformed-event safety, reconnect reset, ping serialization, sanitized error log Six fixes in the streaming vertical, each with a regression test that fails against the prior commit: - close-after-self-close. Goodbye and FDv1-fallback branches closed the controller without completing the stop signal, so a close() call from inside the listener's onData (the natural orchestrator reaction to a goodbye) would pass the signal-based guard and crash inside _controller.add(...) on a closed controller. Consolidated the three terminal paths (close, goodbye, fdv1-fallback) through a single _terminate() helper that orders operations safely. Synchronizer-level close-after-goodbye is fixed automatically. - malformed-shape event data. The previous try/catch wrapped only jsonDecode; the structural casts inside per-event fromJson factories could throw TypeError synchronously from processEvent and become an unhandled async exception. Wrapped processEvent in the same try/catch and added handler reset on the failure path. - protocol handler not reset on SSE reconnect. _onListen rebuilt the handler once per stream subscription, not per connection. SSE auto-reconnect emits a fresh OpenEvent on the same subscription; if the server resumes via Last-Event-ID without re-sending server-intent, stale _tempUpdates from the prior connection bleed into the new payload. Reset the handler on every OpenEvent so the SDK defends against the bleed regardless of server behavior. - unbounded concurrent ping handlers. Two consecutive ping events spawned two concurrent polls (out-of-order races + DoS amplification). Serialized via a _pingInFlight flag; excess pings drop while one is in flight (one in-flight poll already satisfies the FDv2 ping semantic). - SSE error log leaked the full request URL. _handleSseError logged the raw exception at debug, whose toString embeds the URL (and the base64-encoded context in GET mode). Mirrored the polling base's _describeError to categorize without echoing the underlying exception. - ActionServerError log format. _processError in the protocol handler now emits the spec-mandated "An issue was encountered receiving updates for payload '{id}' with reason: '{reason}'. Automatic retry will occur." (was free-form text without payload id). Pre-existing from SDK-2182, surfaces externally for the first time in this PR. Plus a one-line clarification in the streaming-base class doc that restart() is intentionally not exposed -- the orchestrator handles connection lifecycle by tearing down a streaming source and constructing a fresh one, not by reconnecting an existing one. --- .../data_sources/fdv2/protocol_handler.dart | 4 +- .../src/data_sources/fdv2/streaming_base.dart | 153 ++++++++--- .../fdv2/protocol_handler_test.dart | 59 ++++- .../fdv2/streaming_base_test.dart | 237 ++++++++++++++++++ .../fdv2/streaming_synchronizer_test.dart | 41 +++ 5 files changed, 455 insertions(+), 39 deletions(-) diff --git a/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart b/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart index 3436fa6..74318e5 100644 --- a/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart +++ b/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart @@ -263,7 +263,9 @@ final class FDv2ProtocolHandler { } ProtocolAction _processError(ServerErrorEvent data) { - _logger.info('Server error encountered receiving updates: ${data.reason}'); + _logger.info('An issue was encountered receiving updates for payload ' + "'${data.payloadId ?? ''}' with reason: '${data.reason}'. " + 'Automatic retry will occur.'); _resetAfterError(); return ActionServerError(data.reason, id: data.payloadId); } diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart index 049db49..d1205de 100644 --- a/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:convert'; +import 'package:http/http.dart' as http; import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; @@ -40,6 +41,11 @@ import 'source_result.dart'; /// emits a shutdown [StatusResult] before closing the stream. Both /// paths funnel through a `Completer _stoppedSignal` so async /// callbacks short-circuit safely. +/// +/// `SSEClient.restart` is intentionally not surfaced here. The +/// orchestrator drives connection lifecycle by tearing down a +/// streaming source and constructing a fresh one (e.g. on credential +/// rotation or basis change), not by reconnecting an existing one. final class FDv2StreamingBase { final SSEClient _sseClient; final PingHandler _pingHandler; @@ -51,6 +57,7 @@ final class FDv2StreamingBase { StreamSubscription? _sseSubscription; FDv2ProtocolHandler? _handler; String? _environmentId; + bool _pingInFlight = false; FDv2StreamingBase({ required SSEClient sseClient, @@ -74,28 +81,51 @@ final class FDv2StreamingBase { /// Stops the source, emits a shutdown [StatusResult], and closes the /// stream. Idempotent. void close() { + _terminate( + finalResult: + FDv2SourceResults.shutdown(message: 'Streaming source closed')); + } + + /// Terminal-path helper used by [close] and by the in-stream + /// terminal paths (goodbye event, fdv1-fallback header). Completes + /// [_stoppedSignal] *first* so any subsequent [close] call -- e.g. + /// from inside an `onData` listener reacting to the [finalResult] + /// we are about to emit -- short-circuits at its guard instead of + /// racing into a closed controller. Idempotent. + void _terminate({FDv2SourceResult? finalResult}) { if (_stoppedSignal.isCompleted) return; _stoppedSignal.complete(); _tearDownConnection(); - _controller - .add(FDv2SourceResults.shutdown(message: 'Streaming source closed')); - _controller.close(); + if (!_controller.isClosed) { + if (finalResult != null) { + _controller.add(finalResult); + } + _controller.close(); + } } void _onListen() { - // Build the protocol handler fresh for each connection so a - // partial transfer from a previous connection cannot bleed into - // the new one. - _handler = FDv2ProtocolHandler( - objProcessors: {flagEvalKind: processFlagEval}, - logger: _logger, - ); + _resetHandler(); _sseSubscription = _sseClient.stream.listen( _handleEvent, onError: _handleSseError, ); } + /// Builds a fresh [FDv2ProtocolHandler]. Called on initial connect + /// and on every subsequent [OpenEvent] (SSE auto-reconnect), so a + /// partial transfer from the previous connection cannot bleed into + /// the new one regardless of whether the server re-sends + /// `server-intent` after a Last-Event-ID resumption. Also called + /// after a mid-event throw inside [processEvent] so any + /// half-accumulated `_tempUpdates` are discarded. + void _resetHandler() { + _handler = FDv2ProtocolHandler( + objProcessors: {flagEvalKind: processFlagEval}, + logger: _logger, + ); + } + Future _onCancel() async { if (_stoppedSignal.isCompleted) return; _stoppedSignal.complete(); @@ -123,6 +153,13 @@ final class FDv2StreamingBase { } void _handleOpen(OpenEvent event) { + // Every OpenEvent represents a (re)established connection. Rebuild + // the protocol handler so a partial transfer from the prior + // connection cannot bleed into this one -- the SDK must defend + // against this regardless of whether the server respects the + // protocol's "re-send server-intent on resume" semantic. + _resetHandler(); + final headers = event.headers; if (headers == null) return; @@ -133,13 +170,14 @@ final class FDv2StreamingBase { final fallback = headers['x-ld-fd-fallback']?.toLowerCase() == 'true'; if (fallback) { - _emit(FDv2SourceResults.terminalError( + // Server told us to fall back; route through the terminal helper + // so a close() from the listener's onData -- a natural reaction + // to a fallback signal -- doesn't race with our own close. + _terminate( + finalResult: FDv2SourceResults.terminalError( message: 'Server requested FDv1 fallback', fdv1Fallback: true, )); - // Server told us to fall back; don't keep the connection open. - _tearDownConnection(); - _controller.close(); } } @@ -151,7 +189,7 @@ final class FDv2StreamingBase { return; } - final Map data; + final ProtocolAction action; try { final decoded = jsonDecode(event.data); if (decoded is! Map) { @@ -161,17 +199,23 @@ final class FDv2StreamingBase { message: 'Streaming event payload was not a JSON object')); return; } - data = decoded; + // Wrap the protocol-handler dispatch in the same try/catch as the + // jsonDecode: the structural casts inside the per-event fromJson + // factories (e.g. PayloadIntent, PutObjectEvent) throw TypeError + // on shape mismatch and would otherwise become unhandled async + // exceptions. + action = + _handler!.processEvent(FDv2Event(event: event.type, data: decoded)); } catch (err) { - _logger - .warn('Failed to parse SSE event data as JSON (${err.runtimeType})'); + _logger.warn('Failed to parse or process SSE event (${err.runtimeType})'); + // Reset the handler -- a mid-event throw can leave it with stale + // _tempUpdates from the partially-processed payload. + _resetHandler(); _emit(FDv2SourceResults.interrupted( - message: 'Streaming event payload was not valid JSON')); + message: 'Streaming event payload was malformed')); return; } - final action = - _handler!.processEvent(FDv2Event(event: event.type, data: data)); if (_stoppedSignal.isCompleted) return; switch (action) { @@ -183,11 +227,11 @@ final class FDv2StreamingBase { persist: true, )); case ActionGoodbye(:final reason): - _emit(FDv2SourceResults.goodbyeResult(message: reason)); - // Server told us to disconnect; close instead of letting the - // SSE client retry into a closed channel. - _tearDownConnection(); - _controller.close(); + // Server told us to disconnect; route through the terminal + // helper so a close() from the listener's onData -- a natural + // reaction to a goodbye -- doesn't race with our own close. + _terminate( + finalResult: FDv2SourceResults.goodbyeResult(message: reason)); case ActionServerError(:final reason): _emit(FDv2SourceResults.interrupted(message: reason)); case ActionError(:final message): @@ -200,17 +244,28 @@ final class FDv2StreamingBase { } Future _handlePing() async { - final FDv2SourceResult result; + // The FDv2 ping semantic is "go re-poll". A single in-flight poll + // already satisfies any number of pings that arrive while it is + // running, so drop excess pings rather than spawning concurrent + // polls (which would race on emit-order and amplify load on the + // polling endpoint). + if (_pingInFlight) return; + _pingInFlight = true; try { - result = await _pingHandler(); - } catch (err) { - _logger.warn('Ping handler threw unexpectedly: ${err.runtimeType}'); - _emit(FDv2SourceResults.interrupted( - message: 'Ping handler raised error unexpectedly')); - return; + final FDv2SourceResult result; + try { + result = await _pingHandler(); + } catch (err) { + _logger.warn('Ping handler threw unexpectedly: ${err.runtimeType}'); + _emit(FDv2SourceResults.interrupted( + message: 'Ping handler raised error unexpectedly')); + return; + } + if (_stoppedSignal.isCompleted) return; + _emit(result); + } finally { + _pingInFlight = false; } - if (_stoppedSignal.isCompleted) return; - _emit(result); } void _handleSseError(Object err, StackTrace stack) { @@ -218,9 +273,33 @@ final class FDv2StreamingBase { // The SSE client's built-in backoff handles reconnection. Surface // the disruption as interrupted; the orchestrator decides whether // to fall through to a different source after enough time. + // + // Don't log the raw exception. http.ClientException's toString + // formats as 'ClientException: , uri=', and in GET + // mode the URL embeds the base64-encoded context. Only the + // category and a synthetic stack header go to the log. _logger.warn('SSE error (${err.runtimeType}); will retry'); - _logger.debug('SSE error detail: $err\n$stack'); - _emit(FDv2SourceResults.interrupted(message: 'Streaming connection error')); + _logger.debug('SSE error stack:\n$stack'); + _emit(FDv2SourceResults.interrupted(message: _describeError(err))); + } + + /// Categorizes an exception surfaced on the SSE stream into a fixed + /// sanitized message. Mirrors the polling base's helper so neither + /// surface (the public StatusResult.message nor the warn log) ever + /// echoes a raw http.ClientException -- whose toString carries the + /// full request URL. + String _describeError(Object err) { + if (err is TimeoutException) { + return 'Streaming request timed out'; + } + if (err is http.ClientException) { + return 'Network error during streaming request'; + } + final type = err.runtimeType.toString(); + if (type.contains('Tls') || type.contains('Handshake')) { + return 'TLS error during streaming request'; + } + return 'Streaming connection error'; } void _emit(FDv2SourceResult result) { diff --git a/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart b/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart index 2532151..af2c7b6 100644 --- a/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart +++ b/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart @@ -1,10 +1,21 @@ -import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/protocol_handler.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/protocol_types.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; +class MockLogAdapter extends Mock implements LDLogAdapter {} + void main() { + setUpAll(() { + registerFallbackValue(LDLogRecord( + level: LDLogLevel.debug, + message: '', + time: DateTime.now(), + logTag: '')); + }); + final logger = LDLogger(); FDv2ProtocolHandler makeHandler() { @@ -433,6 +444,52 @@ void main() { // but temp updates should be cleared. New data can follow. expect(handler.state, equals(ProtocolState.full)); }); + + test('logs the spec-mandated message format with payload id and reason', + () { + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final loggerWithAdapter = LDLogger(adapter: adapter); + final handler = FDv2ProtocolHandler( + objProcessors: {'flag-eval': (obj) => obj}, + logger: loggerWithAdapter, + ); + handler.processEvent(serverIntent('xfer-full')); + + handler.processEvent(FDv2Event( + event: FDv2EventTypes.error, + data: {'payload_id': 'p-7', 'reason': 'oops'})); + + final records = verify(() => adapter.log(captureAny())).captured; + final messages = records.map((r) => (r as LDLogRecord).message).toList(); + expect( + messages, + contains("An issue was encountered receiving updates for payload 'p-7' " + "with reason: 'oops'. Automatic retry will occur."), + ); + }); + + test('uses a placeholder for the payload id when the field is missing', () { + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final loggerWithAdapter = LDLogger(adapter: adapter); + final handler = FDv2ProtocolHandler( + objProcessors: {'flag-eval': (obj) => obj}, + logger: loggerWithAdapter, + ); + handler.processEvent(serverIntent('xfer-full')); + + handler.processEvent( + FDv2Event(event: FDv2EventTypes.error, data: {'reason': 'oops'})); + + final records = verify(() => adapter.log(captureAny())).captured; + final messages = records.map((r) => (r as LDLogRecord).message).toList(); + expect( + messages.any( + (m) => m.contains("for payload '' with reason: 'oops'")), + isTrue, + ); + }); }); group('heartbeat', () { diff --git a/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart index ab56323..e6f1ee8 100644 --- a/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart +++ b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart @@ -2,13 +2,17 @@ import 'dart:async'; import 'dart:collection'; import 'dart:convert'; +import 'package:http/http.dart' as http; import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; +class MockLogAdapter extends Mock implements LDLogAdapter {} + TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); void emitOpen(TestSseClient sse, {Map? headers}) { @@ -72,6 +76,14 @@ FDv2StreamingBase makeBase( } void main() { + setUpAll(() { + registerFallbackValue(LDLogRecord( + level: LDLogLevel.debug, + message: '', + time: DateTime.now(), + logTag: '')); + }); + group('connection lifecycle', () { test('opens the SSE stream on first listen', () async { final sse = makeSse(); @@ -134,6 +146,64 @@ void main() { base.close(); expect(() => base.close(), returnsNormally); }); + + test('close() is safe to call from a listener reacting to a goodbye', + () async { + // The orchestrator can legitimately react to a goodbye result + // by calling base.close(). That path must not race with the + // self-close the goodbye branch already does internally. + final sse = makeSse(); + final base = makeBase(sse); + Object? caught; + runZonedGuarded(() { + base.results.listen((event) { + if (event is StatusResult && event.state == SourceState.goodbye) { + base.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + // Yield enough times for the listener and any async work to settle. + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: + 'close() from onData on a goodbye must not throw, got $caught'); + expect(sse.isClosed, isTrue); + }); + + test( + 'close() is safe to call from a listener reacting to an FDv1 ' + 'fallback', () async { + // Same race risk as the goodbye case, but for the fdv1-fallback + // terminal branch. + final sse = makeSse(); + final base = makeBase(sse); + Object? caught; + runZonedGuarded(() { + base.results.listen((event) { + if (event is StatusResult && event.fdv1Fallback) { + base.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: + 'close() from onData on fallback must not throw, got $caught'); + expect(sse.isClosed, isTrue); + }); }); group('event handling', () { @@ -214,6 +284,40 @@ void main() { await sub.cancel(); }); + test( + 'malformed-shape event data that passes the Map check but fails ' + 'inside processEvent is reported as interrupted, no unhandled async', + () async { + // The data is a JSON object, but its inner structure violates + // the FDv2 protocol: payloads must be a list, not a string. The + // List cast inside protocol_types.dart throws TypeError + // synchronously from processEvent. The streaming source must + // catch it and surface as interrupted, not let it become an + // unhandled async exception. + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + Object? caughtAsync; + late final StreamSubscription sub; + runZonedGuarded(() { + sub = base.results.listen(emissions.add); + }, (err, _) => caughtAsync = err); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'server-intent', jsonEncode({'payloads': 'not-a-list'})); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caughtAsync, isNull, + reason: 'malformed inner shape must not become unhandled async, ' + 'got $caughtAsync'); + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + test('non-object event data is reported as interrupted, no throw', () async { final sse = makeSse(); @@ -231,6 +335,52 @@ void main() { await sub.cancel(); }); + test( + 'an SSE reconnect resets the protocol handler so partial transfer ' + 'state from the previous connection does not bleed into the new one', + () async { + // Connection 1: server-intent + put-object, then the connection + // drops BEFORE payload-transferred. The handler is mid-payload + // with one accumulated update. + // + // Connection 2 (auto-reconnect, fresh OpenEvent on the same SSE + // subscriber): a Last-Event-ID resumption could have the server + // skip server-intent and continue with put-object directly. + // Without a per-OpenEvent handler reset, the new put-object + // accumulates on top of the stale buffer. The eventual + // payload-transferred would emit BOTH puts as one payload. + // + // With the reset, the new OpenEvent rebuilds the handler. Absent + // a server-intent the new put-object lands on an inactive + // handler and is rejected; the payload-transferred surfaces as a + // protocol error rather than as a corrupted ChangeSet. + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'put-object', putObject(key: 'old-flag', version: 1)); + + // Reconnect. Server skips server-intent (Last-Event-ID resume). + emitOpen(sse); + emitMessage(sse, 'put-object', putObject(key: 'new-flag', version: 2)); + emitMessage(sse, 'payload-transferred', payloadTransferred()); + + await Future.delayed(Duration.zero); + + for (final result in emissions.whereType()) { + final keys = result.payload.updates.map((u) => u.key).toSet(); + expect(keys, isNot(contains('old-flag')), + reason: 'old-flag from the previous connection bled into ' + "the new connection's payload"); + } + + await sub.cancel(); + }); + test('SSE transport error is reported as interrupted', () async { final sse = makeSse(); final base = makeBase(sse); @@ -246,6 +396,43 @@ void main() { await sub.cancel(); }); + + test( + 'SSE transport error log records do not echo the request URL ' + 'or any other detail of the underlying exception', () async { + // http.ClientException's toString format is + // 'ClientException: , uri='. The URL embeds the + // base64-encoded context in GET mode, which is reversible. + // The streaming source must categorize the error and log only + // the sanitized form, like the polling sibling does. + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final logger = LDLogger(adapter: adapter, level: LDLogLevel.debug); + + final sse = makeSse(); + final base = FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: logger, + ); + final sub = base.results.listen((_) {}); + await Future.delayed(Duration.zero); + + const secret = 'SECRET-ENCODED-CONTEXT'; + sse.emitError( + error: http.ClientException('Connection refused', + Uri.parse('https://example.test/sdk/stream/eval/$secret')), + ); + await Future.delayed(Duration.zero); + + final records = verify(() => adapter.log(captureAny())).captured; + for (final record in records) { + expect((record as LDLogRecord).message, isNot(contains(secret))); + } + + await sub.cancel(); + }); }); group('FDv1 fallback header on connect', () { @@ -334,6 +521,56 @@ void main() { await sub.cancel(); }); + test( + 'consecutive ping events do not spawn concurrent PingHandler ' + 'invocations -- excess pings are dropped while one is in flight', + () async { + // Two pings back-to-back must not result in two concurrent polls. + // The slow poll's result could otherwise overwrite the fast one's + // (out-of-order) and the polling endpoint sees DoS amplification. + // FDv2 ping semantic is "go re-poll" -- a single in-flight poll + // already satisfies it. + var concurrent = 0; + var maxConcurrent = 0; + var totalCalls = 0; + final firstCallGate = Completer(); + final firstCallReleased = Completer(); + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + totalCalls++; + concurrent++; + if (concurrent > maxConcurrent) maxConcurrent = concurrent; + if (!firstCallGate.isCompleted) firstCallGate.complete(); + // Hold the first call open until the test releases it. + await firstCallReleased.future; + concurrent--; + return FDv2SourceResults.interrupted(message: 'ok'); + }, + ); + final sub = base.results.listen((_) {}); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + // Wait for the first call to enter the handler. + await firstCallGate.future; + // Fire the second ping while the first is still in flight. + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + // Release the held call so it can return. + firstCallReleased.complete(); + await Future.delayed(Duration.zero); + + expect(maxConcurrent, equals(1), + reason: 'concurrent ping handler invocations are not allowed'); + expect(totalCalls, equals(1), + reason: 'the second ping must be dropped while the first is ' + 'still in flight'); + + await sub.cancel(); + }); + test('PingHandler throwing is treated as interrupted, no propagation', () async { final sse = makeSse(); diff --git a/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart index 712b181..d563a54 100644 --- a/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart +++ b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart @@ -89,4 +89,45 @@ void main() { (emissions.last as StatusResult).state, equals(SourceState.shutdown)); expect(sse.isClosed, isTrue); }); + + test('close() is safe to call from an onData listener reacting to goodbye', + () async { + // The Synchronizer interface contract documents close() as + // idempotent. A close() call from inside the listener's onData + // when reacting to a goodbye must not throw. + final sse = makeSse(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + Object? caught; + runZonedGuarded(() { + sync.results.listen((event) { + if (event is StatusResult && event.state == SourceState.goodbye) { + sync.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitMessage( + sse, + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: 'sync.close() from onData on goodbye must not throw'); + expect(sse.isClosed, isTrue); + }); }