diff --git a/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart b/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart index 3436fa6..74318e5 100644 --- a/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart +++ b/packages/common_client/lib/src/data_sources/fdv2/protocol_handler.dart @@ -263,7 +263,9 @@ final class FDv2ProtocolHandler { } ProtocolAction _processError(ServerErrorEvent data) { - _logger.info('Server error encountered receiving updates: ${data.reason}'); + _logger.info('An issue was encountered receiving updates for payload ' + "'${data.payloadId ?? ''}' with reason: '${data.reason}'. " + 'Automatic retry will occur.'); _resetAfterError(); return ActionServerError(data.reason, id: data.payloadId); } diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart new file mode 100644 index 0000000..d1205de --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_base.dart @@ -0,0 +1,310 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:http/http.dart' as http; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; + +import 'flag_eval_mapper.dart'; +import 'protocol_handler.dart'; +import 'protocol_types.dart'; +import 'source.dart'; +import 'source_result.dart'; + +/// Long-lived streaming data source over SSE. +/// +/// Wraps an [SSEClient] with FDv2 protocol semantics. Each named SSE +/// event is parsed as JSON, wrapped in an [FDv2Event], and fed to a +/// fresh [FDv2ProtocolHandler]. The first emitted [ProtocolAction] +/// per event is translated into an [FDv2SourceResult]: +/// +/// - [ActionPayload] --> [ChangeSetResult] with `persist: true`. +/// - [ActionGoodbye] --> goodbye [StatusResult]; the SSE connection is +/// closed. +/// - [ActionServerError] / [ActionError] --> interrupted +/// [StatusResult]; the SSE client's built-in retry handles the +/// reconnect. +/// - [ActionNone] --> no emission (waiting for more events). +/// +/// Legacy `ping` events are routed to the injected [PingHandler] (which +/// performs a one-shot poll) and the result is forwarded to the +/// stream. This is the streaming-to-polling bridge for older servers +/// that pre-date FDv2. +/// +/// The `x-ld-fd-fallback` header on the initial connection's response +/// is detected and produces a terminal-error result with +/// `fdv1Fallback: true`. The connection is closed. +/// +/// Lifecycle: a single-subscription stream. [results] starts the SSE +/// connection on subscribe; cancelling the subscription tears it down +/// without emitting a shutdown. [close] both stops the source and +/// emits a shutdown [StatusResult] before closing the stream. Both +/// paths funnel through a `Completer _stoppedSignal` so async +/// callbacks short-circuit safely. +/// +/// `SSEClient.restart` is intentionally not surfaced here. The +/// orchestrator drives connection lifecycle by tearing down a +/// streaming source and constructing a fresh one (e.g. on credential +/// rotation or basis change), not by reconnecting an existing one. +final class FDv2StreamingBase { + final SSEClient _sseClient; + final PingHandler _pingHandler; + final DateTime Function() _now; + final LDLogger _logger; + + late final StreamController _controller; + final Completer _stoppedSignal = Completer(); + StreamSubscription? _sseSubscription; + FDv2ProtocolHandler? _handler; + String? _environmentId; + bool _pingInFlight = false; + + FDv2StreamingBase({ + required SSEClient sseClient, + required PingHandler pingHandler, + required LDLogger logger, + DateTime Function()? now, + }) : _sseClient = sseClient, + _pingHandler = pingHandler, + _logger = logger.subLogger('FDv2StreamingBase'), + _now = now ?? DateTime.now { + _controller = StreamController( + onListen: _onListen, + onCancel: _onCancel, + ); + } + + /// Single-subscription stream of results. The SSE connection is + /// established lazily on the first [Stream.listen] call. + Stream get results => _controller.stream; + + /// Stops the source, emits a shutdown [StatusResult], and closes the + /// stream. Idempotent. + void close() { + _terminate( + finalResult: + FDv2SourceResults.shutdown(message: 'Streaming source closed')); + } + + /// Terminal-path helper used by [close] and by the in-stream + /// terminal paths (goodbye event, fdv1-fallback header). Completes + /// [_stoppedSignal] *first* so any subsequent [close] call -- e.g. + /// from inside an `onData` listener reacting to the [finalResult] + /// we are about to emit -- short-circuits at its guard instead of + /// racing into a closed controller. Idempotent. + void _terminate({FDv2SourceResult? finalResult}) { + if (_stoppedSignal.isCompleted) return; + _stoppedSignal.complete(); + _tearDownConnection(); + if (!_controller.isClosed) { + if (finalResult != null) { + _controller.add(finalResult); + } + _controller.close(); + } + } + + void _onListen() { + _resetHandler(); + _sseSubscription = _sseClient.stream.listen( + _handleEvent, + onError: _handleSseError, + ); + } + + /// Builds a fresh [FDv2ProtocolHandler]. Called on initial connect + /// and on every subsequent [OpenEvent] (SSE auto-reconnect), so a + /// partial transfer from the previous connection cannot bleed into + /// the new one regardless of whether the server re-sends + /// `server-intent` after a Last-Event-ID resumption. Also called + /// after a mid-event throw inside [processEvent] so any + /// half-accumulated `_tempUpdates` are discarded. + void _resetHandler() { + _handler = FDv2ProtocolHandler( + objProcessors: {flagEvalKind: processFlagEval}, + logger: _logger, + ); + } + + Future _onCancel() async { + if (_stoppedSignal.isCompleted) return; + _stoppedSignal.complete(); + _tearDownConnection(); + // No shutdown emission -- the subscriber asked us to stop. + } + + void _tearDownConnection() { + _sseSubscription?.cancel(); + _sseSubscription = null; + // Best-effort close. The SSE client may already be closed if it + // emitted an error; that's fine -- the operation is documented as + // safe in any state. + _sseClient.close(); + } + + void _handleEvent(Event event) { + if (_stoppedSignal.isCompleted) return; + switch (event) { + case OpenEvent open: + _handleOpen(open); + case MessageEvent message: + _handleMessage(message); + } + } + + void _handleOpen(OpenEvent event) { + // Every OpenEvent represents a (re)established connection. Rebuild + // the protocol handler so a partial transfer from the prior + // connection cannot bleed into this one -- the SDK must defend + // against this regardless of whether the server respects the + // protocol's "re-send server-intent on resume" semantic. + _resetHandler(); + + final headers = event.headers; + if (headers == null) return; + + final envId = headers['x-ld-envid']; + if (envId != null) { + _environmentId = envId; + } + + final fallback = headers['x-ld-fd-fallback']?.toLowerCase() == 'true'; + if (fallback) { + // Server told us to fall back; route through the terminal helper + // so a close() from the listener's onData -- a natural reaction + // to a fallback signal -- doesn't race with our own close. + _terminate( + finalResult: FDv2SourceResults.terminalError( + message: 'Server requested FDv1 fallback', + fdv1Fallback: true, + )); + } + } + + Future _handleMessage(MessageEvent event) async { + if (event.type == 'ping') { + // Legacy bridge: older servers may still send `ping` instead of + // FDv2 events. Defer to the injected handler for a one-shot poll. + await _handlePing(); + return; + } + + final ProtocolAction action; + try { + final decoded = jsonDecode(event.data); + if (decoded is! Map) { + _logger.warn('Ignoring SSE event with non-object data: ' + 'event=${event.type}'); + _emit(FDv2SourceResults.interrupted( + message: 'Streaming event payload was not a JSON object')); + return; + } + // Wrap the protocol-handler dispatch in the same try/catch as the + // jsonDecode: the structural casts inside the per-event fromJson + // factories (e.g. PayloadIntent, PutObjectEvent) throw TypeError + // on shape mismatch and would otherwise become unhandled async + // exceptions. + action = + _handler!.processEvent(FDv2Event(event: event.type, data: decoded)); + } catch (err) { + _logger.warn('Failed to parse or process SSE event (${err.runtimeType})'); + // Reset the handler -- a mid-event throw can leave it with stale + // _tempUpdates from the partially-processed payload. + _resetHandler(); + _emit(FDv2SourceResults.interrupted( + message: 'Streaming event payload was malformed')); + return; + } + + if (_stoppedSignal.isCompleted) return; + + switch (action) { + case ActionPayload(:final payload): + _emit(ChangeSetResult( + payload: payload, + environmentId: _environmentId, + freshness: _now(), + persist: true, + )); + case ActionGoodbye(:final reason): + // Server told us to disconnect; route through the terminal + // helper so a close() from the listener's onData -- a natural + // reaction to a goodbye -- doesn't race with our own close. + _terminate( + finalResult: FDv2SourceResults.goodbyeResult(message: reason)); + case ActionServerError(:final reason): + _emit(FDv2SourceResults.interrupted(message: reason)); + case ActionError(:final message): + _emit(FDv2SourceResults.interrupted(message: message)); + case ActionNone(): + // No emission; continue accumulating events until the handler + // reaches a terminal action. + break; + } + } + + Future _handlePing() async { + // The FDv2 ping semantic is "go re-poll". A single in-flight poll + // already satisfies any number of pings that arrive while it is + // running, so drop excess pings rather than spawning concurrent + // polls (which would race on emit-order and amplify load on the + // polling endpoint). + if (_pingInFlight) return; + _pingInFlight = true; + try { + final FDv2SourceResult result; + try { + result = await _pingHandler(); + } catch (err) { + _logger.warn('Ping handler threw unexpectedly: ${err.runtimeType}'); + _emit(FDv2SourceResults.interrupted( + message: 'Ping handler raised error unexpectedly')); + return; + } + if (_stoppedSignal.isCompleted) return; + _emit(result); + } finally { + _pingInFlight = false; + } + } + + void _handleSseError(Object err, StackTrace stack) { + if (_stoppedSignal.isCompleted) return; + // The SSE client's built-in backoff handles reconnection. Surface + // the disruption as interrupted; the orchestrator decides whether + // to fall through to a different source after enough time. + // + // Don't log the raw exception. http.ClientException's toString + // formats as 'ClientException: , uri=', and in GET + // mode the URL embeds the base64-encoded context. Only the + // category and a synthetic stack header go to the log. + _logger.warn('SSE error (${err.runtimeType}); will retry'); + _logger.debug('SSE error stack:\n$stack'); + _emit(FDv2SourceResults.interrupted(message: _describeError(err))); + } + + /// Categorizes an exception surfaced on the SSE stream into a fixed + /// sanitized message. Mirrors the polling base's helper so neither + /// surface (the public StatusResult.message nor the warn log) ever + /// echoes a raw http.ClientException -- whose toString carries the + /// full request URL. + String _describeError(Object err) { + if (err is TimeoutException) { + return 'Streaming request timed out'; + } + if (err is http.ClientException) { + return 'Network error during streaming request'; + } + final type = err.runtimeType.toString(); + if (type.contains('Tls') || type.contains('Handshake')) { + return 'TLS error during streaming request'; + } + return 'Streaming connection error'; + } + + void _emit(FDv2SourceResult result) { + if (_stoppedSignal.isCompleted) return; + if (_controller.isClosed) return; + _controller.add(result); + } +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_initializer.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_initializer.dart new file mode 100644 index 0000000..9e9629c --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_initializer.dart @@ -0,0 +1,59 @@ +import 'dart:async'; + +import 'source.dart'; +import 'source_result.dart'; +import 'streaming_base.dart'; + +/// One-shot streaming initializer. +/// +/// Subscribes to the underlying [FDv2StreamingBase], returns the first +/// emitted [FDv2SourceResult], and tears the connection down. Used at +/// SDK init time to bring the SDK to a usable state from the streaming +/// path before handing off to the long-lived synchronizer. +/// +/// Calling [close] before the first emission resolves the pending +/// [run] future with a [SourceState.shutdown] result. +final class FDv2StreamingInitializer implements Initializer { + final FDv2StreamingBase _base; + final Completer _completer = Completer(); + StreamSubscription? _subscription; + bool _closed = false; + + FDv2StreamingInitializer({required FDv2StreamingBase base}) : _base = base; + + @override + Future run() { + if (_closed) { + return Future.value(_shutdownResult()); + } + _subscription = _base.results.listen((result) { + if (_completer.isCompleted) return; + _completer.complete(result); + // First emission received; tear down. + _subscription?.cancel(); + _subscription = null; + _base.close(); + }, onDone: () { + if (_completer.isCompleted) return; + // The base closed before producing a result. Surface as shutdown. + _completer.complete(_shutdownResult()); + }); + return _completer.future; + } + + @override + void close() { + if (_closed) return; + _closed = true; + _subscription?.cancel(); + _subscription = null; + _base.close(); + if (!_completer.isCompleted) { + _completer.complete(_shutdownResult()); + } + } + + StatusResult _shutdownResult() => FDv2SourceResults.shutdown( + message: 'Streaming initializer closed before first emission', + ); +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart b/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart new file mode 100644 index 0000000..12dde88 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/streaming_synchronizer.dart @@ -0,0 +1,22 @@ +import 'source.dart'; +import 'source_result.dart'; +import 'streaming_base.dart'; + +/// Long-lived streaming synchronizer. +/// +/// A thin adapter that exposes [FDv2StreamingBase.results] as a +/// [Synchronizer]. The base class already implements all of the +/// connection lifecycle, protocol parsing, and error handling; this +/// wrapper exists only to satisfy the [Synchronizer] interface so the +/// orchestrator can treat polling and streaming uniformly. +final class FDv2StreamingSynchronizer implements Synchronizer { + final FDv2StreamingBase _base; + + FDv2StreamingSynchronizer({required FDv2StreamingBase base}) : _base = base; + + @override + Stream get results => _base.results; + + @override + void close() => _base.close(); +} diff --git a/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart b/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart index 2532151..af2c7b6 100644 --- a/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart +++ b/packages/common_client/test/data_sources/fdv2/protocol_handler_test.dart @@ -1,10 +1,21 @@ -import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/protocol_handler.dart'; import 'package:launchdarkly_common_client/src/data_sources/fdv2/protocol_types.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:mocktail/mocktail.dart'; import 'package:test/test.dart'; +class MockLogAdapter extends Mock implements LDLogAdapter {} + void main() { + setUpAll(() { + registerFallbackValue(LDLogRecord( + level: LDLogLevel.debug, + message: '', + time: DateTime.now(), + logTag: '')); + }); + final logger = LDLogger(); FDv2ProtocolHandler makeHandler() { @@ -433,6 +444,52 @@ void main() { // but temp updates should be cleared. New data can follow. expect(handler.state, equals(ProtocolState.full)); }); + + test('logs the spec-mandated message format with payload id and reason', + () { + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final loggerWithAdapter = LDLogger(adapter: adapter); + final handler = FDv2ProtocolHandler( + objProcessors: {'flag-eval': (obj) => obj}, + logger: loggerWithAdapter, + ); + handler.processEvent(serverIntent('xfer-full')); + + handler.processEvent(FDv2Event( + event: FDv2EventTypes.error, + data: {'payload_id': 'p-7', 'reason': 'oops'})); + + final records = verify(() => adapter.log(captureAny())).captured; + final messages = records.map((r) => (r as LDLogRecord).message).toList(); + expect( + messages, + contains("An issue was encountered receiving updates for payload 'p-7' " + "with reason: 'oops'. Automatic retry will occur."), + ); + }); + + test('uses a placeholder for the payload id when the field is missing', () { + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final loggerWithAdapter = LDLogger(adapter: adapter); + final handler = FDv2ProtocolHandler( + objProcessors: {'flag-eval': (obj) => obj}, + logger: loggerWithAdapter, + ); + handler.processEvent(serverIntent('xfer-full')); + + handler.processEvent( + FDv2Event(event: FDv2EventTypes.error, data: {'reason': 'oops'})); + + final records = verify(() => adapter.log(captureAny())).captured; + final messages = records.map((r) => (r as LDLogRecord).message).toList(); + expect( + messages.any( + (m) => m.contains("for payload '' with reason: 'oops'")), + isTrue, + ); + }); }); group('heartbeat', () { diff --git a/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart new file mode 100644 index 0000000..e6f1ee8 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_base_test.dart @@ -0,0 +1,596 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; + +import 'package:http/http.dart' as http; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:test/test.dart'; + +class MockLogAdapter extends Mock implements LDLogAdapter {} + +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); + +void emitOpen(TestSseClient sse, {Map? headers}) { + sse.emitEvent(OpenEvent( + headers: headers == null ? null : UnmodifiableMapView(headers), + )); +} + +void emitMessage(TestSseClient sse, String type, String data, {String? id}) { + sse.emitEvent(MessageEvent(type, data, id)); +} + +String serverIntent({String intentCode = 'xfer-full', int target = 1}) => + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': target, + 'intentCode': intentCode, + 'reason': 'test', + } + ] + }); + +String putObject({ + String key = 'flag-a', + int version = 1, +}) => + jsonEncode({ + 'kind': 'flag-eval', + 'key': key, + 'version': version, + 'object': {'value': true, 'version': version, 'variation': 0}, + }); + +String payloadTransferred({String state = 'sel-1', int version = 1}) => + jsonEncode({ + 'state': state, + 'version': version, + }); + +void emitFullPayload(TestSseClient sse, + {String state = 'sel-1', String flagKey = 'flag-a'}) { + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'put-object', putObject(key: flagKey)); + emitMessage(sse, 'payload-transferred', payloadTransferred(state: state)); +} + +FDv2StreamingBase makeBase( + TestSseClient sse, { + Future Function()? pingHandler, + DateTime Function()? now, +}) { + return FDv2StreamingBase( + sseClient: sse, + pingHandler: pingHandler ?? + () async => FDv2SourceResults.interrupted(message: 'no ping handler'), + logger: LDLogger(level: LDLogLevel.error), + now: now, + ); +} + +void main() { + setUpAll(() { + registerFallbackValue(LDLogRecord( + level: LDLogLevel.debug, + message: '', + time: DateTime.now(), + logTag: '')); + }); + + group('connection lifecycle', () { + test('opens the SSE stream on first listen', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + // No emission yet -- nothing has come in over the SSE stream. + expect(emissions, isEmpty); + + // Drive a full xfer-full sequence and confirm the resulting + // ChangeSetResult is emitted. + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + expect(emissions.single, isA()); + + await sub.cancel(); + }); + + test( + 'subscription cancel tears down the SSE client without emitting ' + 'shutdown', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + await sub.cancel(); + await Future.delayed(Duration.zero); + + expect(sse.isClosed, isTrue); + expect(emissions.whereType(), isEmpty); + }); + + test('close() emits shutdown then closes the stream', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + base.close(); + await done.future; + + expect(emissions, hasLength(1)); + expect((emissions.single as StatusResult).state, + equals(SourceState.shutdown)); + expect(sse.isClosed, isTrue); + }); + + test('close() is idempotent', () async { + final sse = makeSse(); + final base = makeBase(sse); + base.close(); + expect(() => base.close(), returnsNormally); + }); + + test('close() is safe to call from a listener reacting to a goodbye', + () async { + // The orchestrator can legitimately react to a goodbye result + // by calling base.close(). That path must not race with the + // self-close the goodbye branch already does internally. + final sse = makeSse(); + final base = makeBase(sse); + Object? caught; + runZonedGuarded(() { + base.results.listen((event) { + if (event is StatusResult && event.state == SourceState.goodbye) { + base.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + // Yield enough times for the listener and any async work to settle. + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: + 'close() from onData on a goodbye must not throw, got $caught'); + expect(sse.isClosed, isTrue); + }); + + test( + 'close() is safe to call from a listener reacting to an FDv1 ' + 'fallback', () async { + // Same race risk as the goodbye case, but for the fdv1-fallback + // terminal branch. + final sse = makeSse(); + final base = makeBase(sse); + Object? caught; + runZonedGuarded(() { + base.results.listen((event) { + if (event is StatusResult && event.fdv1Fallback) { + base.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: + 'close() from onData on fallback must not throw, got $caught'); + expect(sse.isClosed, isTrue); + }); + }); + + group('event handling', () { + test('xfer-full sequence produces ChangeSetResult with full payload', + () async { + final sse = makeSse(); + final fixedNow = DateTime.utc(2026, 1, 1); + final base = makeBase(sse, now: () => fixedNow); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-99', flagKey: 'k1'); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + final cs = emissions.single as ChangeSetResult; + expect(cs.payload.type, equals(PayloadType.full)); + expect(cs.payload.selector.state, equals('sel-99')); + expect(cs.payload.updates.single.key, equals('k1')); + expect(cs.persist, isTrue); + expect(cs.freshness, equals(fixedNow)); + + await sub.cancel(); + }); + + test('environmentId from x-ld-envid header rides on the ChangeSetResult', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-envid': 'env-abc'}); + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect((emissions.single as ChangeSetResult).environmentId, + equals('env-abc')); + + await sub.cancel(); + }); + + test('goodbye event closes the source and emits a goodbye result', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + await done.future; + + expect(emissions, hasLength(1)); + expect((emissions.single as StatusResult).state, + equals(SourceState.goodbye)); + expect(sse.isClosed, isTrue); + }); + + test('unparseable event data is reported as interrupted, no throw', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'put-object', 'not json'); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test( + 'malformed-shape event data that passes the Map check but fails ' + 'inside processEvent is reported as interrupted, no unhandled async', + () async { + // The data is a JSON object, but its inner structure violates + // the FDv2 protocol: payloads must be a list, not a string. The + // List cast inside protocol_types.dart throws TypeError + // synchronously from processEvent. The streaming source must + // catch it and surface as interrupted, not let it become an + // unhandled async exception. + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + Object? caughtAsync; + late final StreamSubscription sub; + runZonedGuarded(() { + sub = base.results.listen(emissions.add); + }, (err, _) => caughtAsync = err); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'server-intent', jsonEncode({'payloads': 'not-a-list'})); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caughtAsync, isNull, + reason: 'malformed inner shape must not become unhandled async, ' + 'got $caughtAsync'); + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test('non-object event data is reported as interrupted, no throw', + () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'server-intent', '[1,2,3]'); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test( + 'an SSE reconnect resets the protocol handler so partial transfer ' + 'state from the previous connection does not bleed into the new one', + () async { + // Connection 1: server-intent + put-object, then the connection + // drops BEFORE payload-transferred. The handler is mid-payload + // with one accumulated update. + // + // Connection 2 (auto-reconnect, fresh OpenEvent on the same SSE + // subscriber): a Last-Event-ID resumption could have the server + // skip server-intent and continue with put-object directly. + // Without a per-OpenEvent handler reset, the new put-object + // accumulates on top of the stale buffer. The eventual + // payload-transferred would emit BOTH puts as one payload. + // + // With the reset, the new OpenEvent rebuilds the handler. Absent + // a server-intent the new put-object lands on an inactive + // handler and is rejected; the payload-transferred surfaces as a + // protocol error rather than as a corrupted ChangeSet. + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse); + emitMessage(sse, 'server-intent', serverIntent()); + emitMessage(sse, 'put-object', putObject(key: 'old-flag', version: 1)); + + // Reconnect. Server skips server-intent (Last-Event-ID resume). + emitOpen(sse); + emitMessage(sse, 'put-object', putObject(key: 'new-flag', version: 2)); + emitMessage(sse, 'payload-transferred', payloadTransferred()); + + await Future.delayed(Duration.zero); + + for (final result in emissions.whereType()) { + final keys = result.payload.updates.map((u) => u.key).toSet(); + expect(keys, isNot(contains('old-flag')), + reason: 'old-flag from the previous connection bled into ' + "the new connection's payload"); + } + + await sub.cancel(); + }); + + test('SSE transport error is reported as interrupted', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + sse.emitError(error: Exception('connection dropped')); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + + test( + 'SSE transport error log records do not echo the request URL ' + 'or any other detail of the underlying exception', () async { + // http.ClientException's toString format is + // 'ClientException: , uri='. The URL embeds the + // base64-encoded context in GET mode, which is reversible. + // The streaming source must categorize the error and log only + // the sanitized form, like the polling sibling does. + final adapter = MockLogAdapter(); + when(() => adapter.log(any())).thenReturn(null); + final logger = LDLogger(adapter: adapter, level: LDLogLevel.debug); + + final sse = makeSse(); + final base = FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: logger, + ); + final sub = base.results.listen((_) {}); + await Future.delayed(Duration.zero); + + const secret = 'SECRET-ENCODED-CONTEXT'; + sse.emitError( + error: http.ClientException('Connection refused', + Uri.parse('https://example.test/sdk/stream/eval/$secret')), + ); + await Future.delayed(Duration.zero); + + final records = verify(() => adapter.log(captureAny())).captured; + for (final record in records) { + expect((record as LDLogRecord).message, isNot(contains(secret))); + } + + await sub.cancel(); + }); + }); + + group('FDv1 fallback header on connect', () { + test( + 'x-ld-fd-fallback: true on the OpenEvent emits terminalError and ' + 'closes', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final done = Completer(); + base.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); + await done.future; + + expect(emissions, hasLength(1)); + final status = emissions.single as StatusResult; + expect(status.state, equals(SourceState.terminalError)); + expect(status.fdv1Fallback, isTrue); + expect(sse.isClosed, isTrue); + }); + + test('fallback header is matched case-insensitively', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-fd-fallback': 'TRUE'}); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).fdv1Fallback, isTrue); + + await sub.cancel(); + }); + + test('fallback header value other than true is ignored', () async { + final sse = makeSse(); + final base = makeBase(sse); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-fd-fallback': 'false'}); + emitFullPayload(sse); + await Future.delayed(Duration.zero); + + expect(emissions.single, isA()); + expect(emissions.single.fdv1Fallback, isFalse); + + await sub.cancel(); + }); + }); + + group('legacy ping bridge', () { + test( + 'ping event invokes the PingHandler and forwards its result to ' + 'the stream', () async { + var pingCallCount = 0; + final pingResult = ChangeSetResult( + payload: const Payload(type: PayloadType.full, updates: []), + persist: true, + freshness: DateTime.utc(2026, 1, 1), + ); + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + pingCallCount++; + return pingResult; + }, + ); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + + expect(pingCallCount, equals(1)); + expect(emissions, hasLength(1)); + expect(identical(emissions.single, pingResult), isTrue); + + await sub.cancel(); + }); + + test( + 'consecutive ping events do not spawn concurrent PingHandler ' + 'invocations -- excess pings are dropped while one is in flight', + () async { + // Two pings back-to-back must not result in two concurrent polls. + // The slow poll's result could otherwise overwrite the fast one's + // (out-of-order) and the polling endpoint sees DoS amplification. + // FDv2 ping semantic is "go re-poll" -- a single in-flight poll + // already satisfies it. + var concurrent = 0; + var maxConcurrent = 0; + var totalCalls = 0; + final firstCallGate = Completer(); + final firstCallReleased = Completer(); + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + totalCalls++; + concurrent++; + if (concurrent > maxConcurrent) maxConcurrent = concurrent; + if (!firstCallGate.isCompleted) firstCallGate.complete(); + // Hold the first call open until the test releases it. + await firstCallReleased.future; + concurrent--; + return FDv2SourceResults.interrupted(message: 'ok'); + }, + ); + final sub = base.results.listen((_) {}); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + // Wait for the first call to enter the handler. + await firstCallGate.future; + // Fire the second ping while the first is still in flight. + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + // Release the held call so it can return. + firstCallReleased.complete(); + await Future.delayed(Duration.zero); + + expect(maxConcurrent, equals(1), + reason: 'concurrent ping handler invocations are not allowed'); + expect(totalCalls, equals(1), + reason: 'the second ping must be dropped while the first is ' + 'still in flight'); + + await sub.cancel(); + }); + + test('PingHandler throwing is treated as interrupted, no propagation', + () async { + final sse = makeSse(); + final base = makeBase( + sse, + pingHandler: () async { + throw StateError('boom'); + }, + ); + final emissions = []; + final sub = base.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitMessage(sse, 'ping', ''); + await Future.delayed(Duration.zero); + + expect((emissions.single as StatusResult).state, + equals(SourceState.interrupted)); + + await sub.cancel(); + }); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart new file mode 100644 index 0000000..1d3bfd8 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_initializer_test.dart @@ -0,0 +1,158 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_initializer.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:test/test.dart'; + +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); + +void emitMessage(TestSseClient sse, String type, String data) { + sse.emitEvent(MessageEvent(type, data, null)); +} + +void emitOpen(TestSseClient sse, {Map? headers}) { + sse.emitEvent(OpenEvent( + headers: headers == null ? null : UnmodifiableMapView(headers))); +} + +void emitFullPayload(TestSseClient sse, {String state = 'sel-1'}) { + emitMessage( + sse, + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + emitMessage( + sse, + 'put-object', + jsonEncode({ + 'kind': 'flag-eval', + 'key': 'k', + 'version': 1, + 'object': {'value': true, 'version': 1, 'variation': 0}, + }), + ); + emitMessage( + sse, + 'payload-transferred', + jsonEncode({'state': state, 'version': 1}), + ); +} + +FDv2StreamingBase makeBase(TestSseClient sse) => FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: LDLogger(level: LDLogLevel.error), + ); + +void main() { + test('returns the first ChangeSetResult and tears the connection down', + () async { + final sse = makeSse(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + + final future = init.run(); + // Yield once so run subscribes to the base. + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-init'); + final result = await future; + + expect(result, isA()); + expect( + (result as ChangeSetResult).payload.selector.state, equals('sel-init')); + expect(sse.isClosed, isTrue); + }); + + test('surfaces a goodbye result as the first emission', () async { + final sse = makeSse(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + emitMessage( + sse, + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + + final result = await future; + expect((result as StatusResult).state, equals(SourceState.goodbye)); + expect(sse.isClosed, isTrue); + }); + + test('surfaces FDv1 fallback as terminalError', () async { + final sse = makeSse(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + emitOpen(sse, headers: {'x-ld-fd-fallback': 'true'}); + + final result = await future; + final status = result as StatusResult; + expect(status.state, equals(SourceState.terminalError)); + expect(status.fdv1Fallback, isTrue); + }); + + test('close before any emission resolves with a shutdown result', () async { + final sse = makeSse(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + init.close(); + + final result = await future; + expect((result as StatusResult).state, equals(SourceState.shutdown)); + expect(sse.isClosed, isTrue); + }); + + test('close after run() returns is idempotent', () async { + final sse = makeSse(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + final future = init.run(); + await Future.delayed(Duration.zero); + + emitFullPayload(sse); + await future; + + expect(() => init.close(), returnsNormally); + expect(() => init.close(), returnsNormally); + }); + + test('close() before run() yields a shutdown result without a subscription', + () async { + final sse = makeSse(); + final init = FDv2StreamingInitializer(base: makeBase(sse)); + + init.close(); + final result = await init.run(); + + expect((result as StatusResult).state, equals(SourceState.shutdown)); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart new file mode 100644 index 0000000..d563a54 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/streaming_synchronizer_test.dart @@ -0,0 +1,133 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_base.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/streaming_synchronizer.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; +import 'package:test/test.dart'; + +TestSseClient makeSse() => SSEClient.testClient(Uri.parse('/test'), const {}); + +void emitMessage(TestSseClient sse, String type, String data) { + sse.emitEvent(MessageEvent(type, data, null)); +} + +void emitFullPayload(TestSseClient sse, {String state = 'sel-1'}) { + emitMessage( + sse, + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + emitMessage( + sse, + 'put-object', + jsonEncode({ + 'kind': 'flag-eval', + 'key': 'k', + 'version': 1, + 'object': {'value': true, 'version': 1, 'variation': 0}, + }), + ); + emitMessage( + sse, + 'payload-transferred', + jsonEncode({'state': state, 'version': 1}), + ); +} + +FDv2StreamingBase makeBase(TestSseClient sse) => FDv2StreamingBase( + sseClient: sse, + pingHandler: () async => + FDv2SourceResults.interrupted(message: 'no ping'), + logger: LDLogger(level: LDLogLevel.error), + ); + +void main() { + test('forwards results from the underlying base', () async { + final sse = makeSse(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + final emissions = []; + final sub = sync.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + emitFullPayload(sse, state: 'sel-1'); + emitFullPayload(sse, state: 'sel-2'); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(2)); + expect((emissions[0] as ChangeSetResult).payload.selector.state, + equals('sel-1')); + expect((emissions[1] as ChangeSetResult).payload.selector.state, + equals('sel-2')); + + await sub.cancel(); + }); + + test('close forwards to the base, emitting shutdown', () async { + final sse = makeSse(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + final emissions = []; + final done = Completer(); + sync.results.listen(emissions.add, onDone: done.complete); + await Future.delayed(Duration.zero); + + sync.close(); + await done.future; + + expect( + (emissions.last as StatusResult).state, equals(SourceState.shutdown)); + expect(sse.isClosed, isTrue); + }); + + test('close() is safe to call from an onData listener reacting to goodbye', + () async { + // The Synchronizer interface contract documents close() as + // idempotent. A close() call from inside the listener's onData + // when reacting to a goodbye must not throw. + final sse = makeSse(); + final sync = FDv2StreamingSynchronizer(base: makeBase(sse)); + Object? caught; + runZonedGuarded(() { + sync.results.listen((event) { + if (event is StatusResult && event.state == SourceState.goodbye) { + sync.close(); + } + }); + }, (err, _) => caught = err); + + await Future.delayed(Duration.zero); + emitMessage( + sse, + 'server-intent', + jsonEncode({ + 'payloads': [ + { + 'id': 'p1', + 'target': 1, + 'intentCode': 'xfer-full', + 'reason': 'test', + } + ] + }), + ); + emitMessage(sse, 'goodbye', jsonEncode({'reason': 'maintenance'})); + for (var i = 0; i < 5; i++) { + await Future.delayed(Duration.zero); + } + + expect(caught, isNull, + reason: 'sync.close() from onData on goodbye must not throw'); + expect(sse.isClosed, isTrue); + }); +} diff --git a/packages/event_source_client/lib/src/test_sse_client.dart b/packages/event_source_client/lib/src/test_sse_client.dart index d65df43..c8f497e 100644 --- a/packages/event_source_client/lib/src/test_sse_client.dart +++ b/packages/event_source_client/lib/src/test_sse_client.dart @@ -37,6 +37,12 @@ final class TestSseClient implements SSEClient { bool hasCapability(SSECapability capability) => _capabilities.contains(capability); + /// Whether [close] has been called on this client. Test-only -- + /// production [SSEClient] implementations do not expose this state, + /// and tests asserting against it are inherently white-box. Use to + /// verify that code under test correctly tears the connection down. + bool get isClosed => _messageEventsController.isClosed; + /// Emit an event on the stream. /// Has no effect if the client has been closed. ///