fix(agent-module): correct MqttRelay availability topic emission #142
ottobolyos wants to merge 31 commits
|
@PatrickRitchie, I've put this PR back in draft and reordered the queue; here's the reasoning. The only red check here is the catalogued Windows … I've moved #140 and #150 to the front of the remaining queue, ahead of this PR. Together they close that flake: #140 carries the server-side … Nothing in this PR's scope changed: the remaining order is #140 → #150 → #142 → #148 → #149 → #155 → #160. |
WaitForSampleShouldSucceedAfterFirstItemIsSent passed on Ubuntu CI but
intermittently failed on the Windows leg with Assert.True(false) at the
final observationEvt.WaitOne(10000), because the in-process agent's
ShdrAdapterClient could not reliably hand the test's adapter data point
across to the agent within the 10 s budget.
The Adapter side of this test binds an IPv4-only TCP listener through
AgentClientConnectionListener (TcpListener(IPAddress.Any, port)). The
client side, ShdrAdapterClient via ShdrClient.ListenForAdapter,
constructs the TcpClient with AddressFamily.InterNetwork when the
hostname does not parse as an IPAddress (and "localhost" never does, so
the helper falls through to InterNetwork by default), then calls
TcpClient.Connect(hostname, port). On Windows CI runners,
Dns.GetHostAddresses("localhost") returns the IPv6 "::1" entry before
"127.0.0.1", so .NET walks the address list and tries the IPv6 entry
against the IPv4 socket. That attempt can intermittently leave the
connection in an unestablished state without raising a fatal exception
on the caller, and subsequent writes from the adapter's worker silently
never reach the agent. Linux Dns.GetHostAddresses returns the IPv4
entry first and hits the working path, which is why Ubuntu CI is green
on the same commit.
Bind the adapter client's Hostname to IPAddress.Loopback.ToString()
("127.0.0.1") so the SHDR client side parses an IPAddress straight
away, the family check returns InterNetwork unambiguously, and the
Connect walks a single-entry IPv4 address list that matches the
adapter's listener. This mirrors the server-side bind on line 168 of
the same fixture, which already pins the embedded HTTP listener to
IPAddress.Loopback for the same reason.
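A minimal sketch of the family check described above, using only System.Net calls. The class and method names are hypothetical and are not taken from ShdrClient; the sketch only illustrates why a literal address removes the dependency on resolver ordering.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper, not the ShdrClient implementation.
public static class LoopbackHostDemo
{
    // Picks an address family for a host string: an IP literal decides the
    // family itself, anything else (for example "localhost") falls back to
    // InterNetwork and leaves address selection to DNS resolution at connect.
    public static AddressFamily FamilyFor(string host)
    {
        return IPAddress.TryParse(host, out IPAddress address)
            ? address.AddressFamily
            : AddressFamily.InterNetwork;
    }
}
```

With IPAddress.Loopback.ToString() as the host, the parse succeeds and no name resolution is involved, so the connect has exactly one IPv4 candidate.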
Local repeats on Ubuntu (the green-on-Linux baseline rather than a
reproducer for the Windows race): the target test passes 5/5 and the
full ClientAgentCommunicationTests class passes 3/3.
Out of scope for the MqttRelay focus of this PR but added here to
unblock the queue.
Refs TrakHound#142.
|
@PatrickRitchie, is there anything you’d like me to change in this PR? 🤔 |
|
@ottobolyos Looks good! I was just waiting on #150 before I merged this as that was the sequence you mentioned |
|
You’re right, my bad. You can review #150 now. 😉 |
Adds docs/testing/issue-135.md with the per-phase writeup index for the MqttRelay availability topic fix and seeds the docs/testing/issue-135/ subfolder with the phase-00 foundation writeup.
Records the publish-surface investigation, the cppagent reference posture, and the strategy decision (relocate out of the Probe wildcard) for the issue-135 fix.
Extracts the agent availability topic-construction logic in the
MqttRelay module to a new public static helper class
AvailabilityTopic.Build(topicPrefix, agentUuid). Module.cs's
GetAgentAvailableTopic delegates to the helper. No behavior change:
the helper still emits the existing
{TopicPrefix}/Probe/{AgentUuid}/Available shape so the broken topic
contract reported in TrakHound#135 is preserved for the
incoming red-test commit. The subsequent fix flips the helper's
output to the corrected shape.
Adds tests/MTConnect.NET-AgentModule-MqttRelay-Tests/ NUnit test project covering the agent module's availability topic-construction helper. The fixture pins the corrected topic shape that resolves TrakHound#135: {TopicPrefix}/Agent/{AgentUuid}/Available rather than the broken {TopicPrefix}/Probe/{AgentUuid}/Available shape under the Probe wildcard. The fixture also covers null and empty inputs plus the public AvailableSegment constant so coverage of AvailabilityTopic.cs is 100 percent once the fix flips the Probe segment to Agent. Three of the eight tests are red against the prior commit (the extracted helper still emits the broken topic shape). The fix in the following commit makes them green. Registers the new project in MTConnect.NET.sln so the solution-level build picks it up.
Relocates the MqttRelay agent module's MQTT Last Will and Testament
topic plus its on-connect retained Available message out of the
{TopicPrefix}/Probe/# wildcard. AvailabilityTopic.Build now emits
{TopicPrefix}/Agent/{AgentUuid}/Available instead of
{TopicPrefix}/Probe/{AgentUuid}/Available. The Probe wildcard
therefore carries only JSON document envelopes, restoring the
contract a subscriber wildcarding Probe/# relies on (the prior shape
crashed JSON parsers on the raw "AVAILABLE" / "UNAVAILABLE" UTF-8
string payload).
Closes TrakHound#135
BREAKING CHANGE: operators that subscribed directly to
{TopicPrefix}/Probe/{AgentUuid}/Available for the raw availability
state must move their subscription to
{TopicPrefix}/Agent/{AgentUuid}/Available.
Adds a regression fixture that guards against re-introduction of the broken availability topic shape resolved in TrakHound#135. The fixture covers a parametric matrix of topic prefixes and agent uuids (including adversarial inputs where the prefix or the uuid is literally the string "Probe") and asserts that the dedicated availability segment stays the AvailabilityTopic.AgentSegment constant rather than collapsing back to the Probe topic constant.
Adds per-phase writeups for the red tests (phase 02), library fix (phase 03), regression pins (phase 04), end-to-end validation posture (phase 05), and the closing summary (phase 06) of the MqttRelay availability topic fix. Each writeup records executed work, metrics deltas, deviations from the original plan, and follow-ups.
Removes a stray reference to a gitignored plans path from the phase 04 regression-pins writeup. The reference resolved only off the public tree.
The docs/testing/issue-135/ subtree carried phase-by-phase campaign writeups that referenced internal tooling (CONVENTIONS rule-book, internal section numbers, extra-files.user/ paths, internal tracker terminology). Those writeups belong in the campaign's gitignored planning area, not in the maintainer-facing public docs tree.
Add NUnit coverage for the MQTT 3.1.1 reserved-character rules (section 4.7.1.1) on AvailabilityTopic.Build inputs, plus canonicalisation of leading and trailing slashes in the topicPrefix. The current Build implementation does not validate or canonicalise its inputs, so the new tests fail and pin the corrected contract.
Reject MQTT-reserved characters ('+', '#', '\0') in topicPrefix and
agentUuid per MQTT 3.1.1 §4.7.1.1; reject '/' inside agentUuid since
it is a single topic segment; canonicalise leading and trailing '/'
in topicPrefix so the resulting topic stays canonical and never
emits a stray empty segment.
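The validation and canonicalisation rules above could look roughly like the following sketch. It is not the committed helper: the choice of ArgumentException, the rejection of blank inputs, and the private RejectReserved method are assumptions made for illustration.

```csharp
using System;

// Sketch only; the exception type and blank-input handling are assumptions.
public static class AvailabilityTopic
{
    public const string AgentSegment = "Agent";
    public const string AvailableSegment = "Available";

    public static string Build(string topicPrefix, string agentUuid)
    {
        if (string.IsNullOrWhiteSpace(topicPrefix))
            throw new ArgumentException("Topic prefix is required.", nameof(topicPrefix));
        if (string.IsNullOrWhiteSpace(agentUuid))
            throw new ArgumentException("Agent UUID is required.", nameof(agentUuid));

        RejectReserved(topicPrefix, nameof(topicPrefix));
        RejectReserved(agentUuid, nameof(agentUuid));

        // The UUID is a single topic segment, so a level separator is invalid.
        if (agentUuid.IndexOf('/') >= 0)
            throw new ArgumentException("Agent UUID must not contain '/'.", nameof(agentUuid));

        // Trim leading and trailing '/' so no empty segment is emitted.
        string prefix = topicPrefix.Trim('/');
        if (prefix.Length == 0)
            throw new ArgumentException("Topic prefix is empty after trimming.", nameof(topicPrefix));

        return prefix + "/" + AgentSegment + "/" + agentUuid + "/" + AvailableSegment;
    }

    // '+' and '#' are MQTT wildcards and '\0' is forbidden in topic names.
    private static void RejectReserved(string value, string paramName)
    {
        if (value.IndexOfAny(new[] { '+', '#', '\0' }) >= 0)
            throw new ArgumentException("Value contains an MQTT-reserved character.", paramName);
    }
}
```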
Add NUnit coverage for a RelayBufferDiagnostics.ComputeMissed helper that exposes the missed-observations diagnostic computation in RelayBufferedObservations. The Module currently inlines long missed = (long)(to - lastSent); which underflows when lastSent > to. The new helper does not yet exist so the tests fail and pin the corrected contract.
Extract the missed-observation diagnostic into RelayBufferDiagnostics.ComputeMissed and route the Module through the helper. ComputeMissed returns 0 when lastSent >= to so a stale last-sent-sequence file or a rolled broker sequence no longer produces a huge spurious "missed" figure in the diagnostic log.
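As an illustration of the guard described above, a sketch under the assumption that both sequences are ulong (suggested by the inlined cast, but not confirmed here). The clamp to long.MaxValue is an addition of this sketch, not something the commit states.

```csharp
using System;

// Sketch only; parameter types and the upper clamp are assumptions.
public static class RelayBufferDiagnostics
{
    public static long ComputeMissed(ulong lastSent, ulong to)
    {
        // A stale or rolled sequence must not wrap into a huge figure.
        if (lastSent >= to) return 0;

        ulong missed = to - lastSent;
        return missed > (ulong)long.MaxValue ? long.MaxValue : (long)missed;
    }
}
```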
The MqttRelay agent module used to publish agent Availability under the
{TopicPrefix}/Probe/# wildcard (specifically
{TopicPrefix}/Probe/{AgentUuid}/Available with a raw UTF-8 string
payload). That broke the wildcard's pure-JSON contract for any
subscriber binding {TopicPrefix}/Probe/#. The module now publishes that
same Availability state on a dedicated
{TopicPrefix}/Agent/{AgentUuid}/Available topic, matching the cppagent
reference implementation.
Add a Migration section to the NuGet README so operators upgrading the
package see the topic move and the subscriber action required, rather
than only the raw API-shape contract pinned by the unit tests.
Module.OnStop() unconditionally invoked _documentServer.Stop(). In Entity-mode the constructor only initialises _entityServer, so _documentServer was null and the shutdown raised a NullReferenceException, masking the real shutdown reason in the host service event log.
Pin the lifecycle policy via a focused helper:
* StopServers must be a total function over (null, null).
* StopServers must invoke whichever stop action is provided.
* A throwing document-server stop must not prevent the entity-server stop from running, so a partial shutdown does not leak handlers.
This commit only adds the failing tests against MqttRelayLifecycle so that the helper is introduced under TDD in the follow-up fix commit.
OnStop() unconditionally called _documentServer.Stop(). When the module was constructed with TopicStructure=Entity only _entityServer was initialised, so the call raised NullReferenceException during agent shutdown and masked the real shutdown reason in the host service event log. Introduce MqttRelayLifecycle.StopServers, a small policy helper that takes optional per-server stop actions, tolerates null targets, and isolates each stop in its own try/catch so a throw on one path cannot leave the other server running. Route OnStop() through the helper so the null-guard is enforced uniformly and is unit-testable without standing up an MTConnect agent broker. Pinned by MqttRelayLifecycleStopTests.
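One possible shape for such a policy helper, as a sketch: the parameter names are hypothetical, and silently swallowing a stop failure is a simplification of this example (the committed helper may log it).

```csharp
using System;

// Sketch only; parameter names and the silent catch are assumptions.
public static class MqttRelayLifecycle
{
    public static void StopServers(Action stopDocumentServer, Action stopEntityServer)
    {
        TryStop(stopDocumentServer);
        TryStop(stopEntityServer);
    }

    // Null means the server was never constructed for this topic structure.
    private static void TryStop(Action stop)
    {
        if (stop == null) return;
        try
        {
            stop();
        }
        catch (Exception)
        {
            // Isolated so one failing stop cannot skip the other server.
        }
    }
}
```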
Module.OnStop() previously invoked _mqttClient.DisconnectAsync(...) as a fire-and-forget task: the returned Task was never awaited, any fault on the disconnect path was silently dropped, and the host process risked exiting before the disconnect actually completed. That hid broker errors at shutdown from the operator.
Pin the lifecycle disconnect policy via a focused helper:
* Successful disconnect must not invoke the fault logger.
* A faulted disconnect Task must surface its exception to the logger.
* A disconnect that never completes must not hang shutdown; the helper bounds the wait and treats the timeout as best-effort success.
* A factory that throws synchronously must be caught and routed to the fault logger.
* A null disconnect factory must no-op (worker never ran path).
Tests fail to compile until DisconnectWithTimeout is introduced in the follow-up fix commit.
OnStop() called _mqttClient.DisconnectAsync(...) as a fire-and-forget task. The returned Task was never awaited, faults were silently dropped, and the host process risked exiting before the disconnect completed. Operators diagnosing a hung shutdown could not see the real broker error. Add MqttRelayLifecycle.DisconnectWithTimeout: a bounded synchronous wait on the disconnect Task with a fault-logging callback. Synchronous throws and Task faults route to the logger; a wait that elapses bails out best-effort and attaches an OnlyOnFaulted continuation so a late fault is not swallowed. Route OnStop() through the helper with a five second bound and a Warning-level log line. Pinned by MqttRelayLifecycleDisconnectTests.
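A sketch of a bounded synchronous wait with fault logging, built on Task.Wait(TimeSpan) and an OnlyOnFaulted continuation. The signature (a Func<Task> factory plus an Action<Exception> logger) is an assumption of this example rather than the committed API.

```csharp
using System;
using System.Threading.Tasks;

// Sketch only; the signature is assumed for illustration.
public static class MqttRelayLifecycle
{
    public static void DisconnectWithTimeout(
        Func<Task> disconnect, TimeSpan timeout, Action<Exception> logFault)
    {
        if (disconnect == null) return; // worker never ran

        Task task;
        try
        {
            task = disconnect();
        }
        catch (Exception ex)
        {
            logFault?.Invoke(ex); // synchronous throw from the factory
            return;
        }
        if (task == null) return;

        try
        {
            if (!task.Wait(timeout))
            {
                // Timed out: do not block shutdown, but observe a late fault.
                task.ContinueWith(
                    t => logFault?.Invoke(t.Exception.GetBaseException()),
                    TaskContinuationOptions.OnlyOnFaulted);
            }
        }
        catch (AggregateException ex)
        {
            logFault?.Invoke(ex.GetBaseException()); // faulted or cancelled
        }
    }
}
```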
Module.cs read and wrote the 64-bit _lastSentSequence field without Interlocked. On 32-bit hosts that is not atomic: a concurrent reader can observe a torn value (one half from the previous value, one from the new). MqttRelay reads the field from observation event handlers (multiple ThreadPool threads) while the durable-relay Worker writes it. A torn read can log a wildly wrong "unsent" figure and propagate to the persisted last-sent-sequence file, causing buffered observations to be skipped or re-sent on the next reconnect.
Pin the atomic-access policy via a focused tracker:
* Fresh tracker reads zero.
* Write round-trips through Read.
* Full ulong range (including high-bit values that map to negative long on Interlocked.Read) round-trips losslessly.
* Last write wins: the tracker is not a max-watermark.
* Concurrent writer/reader smoke test: the reader never observes a half-torn value (high half not as written by the writer).
Tests fail to compile until LastSentSequenceTracker is introduced in the follow-up fix commit.
Module.cs read and wrote the 64-bit _lastSentSequence field without Interlocked. On 32-bit hosts a 64-bit field is read and written as two 32-bit halves and a concurrent reader can observe a torn value. MqttRelay reads the counter from observation event handlers (multiple ThreadPool threads) while the durable-relay Worker writes it. A torn read could log a wildly wrong "unsent" figure and propagate to the persisted last-sent-sequence file. Introduce LastSentSequenceTracker, a small class that wraps every read/write in Interlocked.Read / Interlocked.Exchange so the access is atomic on every supported runtime. Replace the bare _lastSentSequence field with the tracker and route all three call sites (RelayBufferedObservations diagnostic compute and AgentObservationAdded durable-relay path) through Read/Write. Pinned by LastSentSequenceTrackerTests including a concurrent writer/reader smoke check that asserts no torn high-half values are observed.
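A minimal sketch of such a tracker, assuming the value is stored as a long and reinterpreted as ulong at the boundary (consistent with the high-bit round-trip test described above). Member names follow the commit text; everything else is illustrative.

```csharp
using System.Threading;

// Sketch only; the backing-field layout is an assumption.
public sealed class LastSentSequenceTracker
{
    private long _value; // bit pattern of the ulong sequence

    // Atomic 64-bit read, also on 32-bit runtimes.
    public ulong Read()
    {
        return unchecked((ulong)Interlocked.Read(ref _value));
    }

    // Atomic 64-bit write; the last writer wins, there is no max-watermark.
    public void Write(ulong sequence)
    {
        Interlocked.Exchange(ref _value, unchecked((long)sequence));
    }
}
```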
Module.cs exposes seven event handlers the agent broker invokes as async void. Three (AgentDeviceAdded, AgentObservationAdded, AgentAssetAdded) had no top-level try/catch; the other four guarded only the inner publish. An async-void method that throws routes its exception to the synchronization context, which on the ThreadPool tears down the host process. A formatter throw or a synchronous broker call (DataItem null-deref, broker shutting down) crashed the agent.
Pin the safety policy via a focused guard helper:
* Successful body must not invoke the fault logger.
* Synchronous throw must route to the logger.
* Async throw (Task fault) must route to the logger.
* Null logger must not rethrow: the guard absorbs the fault either way.
* Throwing logger must not corrupt the guard contract.
* Null body must no-op (defensive against a mis-wired handler).
Tests fail to compile until AsyncVoidGuard is introduced in the follow-up fix commit.
Module.cs exposes seven event handlers the agent broker invokes as async void. Three of them (AgentDeviceAdded, AgentObservationAdded, AgentAssetAdded) had no top-level try/catch; the other four (ProbeReceived, CurrentReceived, SampleReceived, AssetReceived) guarded only the inner publish but not the formatter call. An async void method that throws routes its exception to the synchronization context, and on the ThreadPool that tears down the host process. So a formatter throw, a synchronous broker call (DataItem null-deref, broker shutting down), or an unexpected publish error crashed the agent. Add AsyncVoidGuard.Run, a tiny helper that awaits a delegate and routes any synchronous or async exception to a logging callback, never rethrowing. Wrap every async void handler with it. The four formatter handlers are split into a slim async void shell plus an async Task *Core method so the existing inner try/catch logic stays intact while the Core call itself is now guarded. Pinned by AsyncVoidGuardTests.
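The guard could be sketched as below. The Task return type and the Func<Task> / Action<Exception> parameters are assumptions of this example; the commit text only states that Run awaits a delegate and routes faults to a logging callback without rethrowing.

```csharp
using System;
using System.Threading.Tasks;

// Sketch only; the exact signature is assumed.
public static class AsyncVoidGuard
{
    public static async Task Run(Func<Task> body, Action<Exception> logFault)
    {
        if (body == null) return; // mis-wired handler: nothing to run

        try
        {
            // A synchronous throw from body() and a faulted Task both land
            // in the catch below because the call sits inside the try.
            await body().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            try
            {
                logFault?.Invoke(ex);
            }
            catch (Exception)
            {
                // A throwing logger must not escape the guard either.
            }
        }
    }
}
```

An async void shell would then contain a single awaited call to Run wrapping its Core method, so nothing can escape to the thread pool.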
The Worker do/while loop in Module.cs previously swallowed any
unexpected exception escaping the inner try/catch with a bare
"catch (Exception) { }". The relay quietly entered the reconnect
delay branch and the underlying defect (a throw inside the inner
finally, or an oversight in connection handling) went undiagnosed for
the lifetime of the agent.
Pin the policy via WorkerLoopExceptionLogger:
* TaskCanceledException is the orderly-shutdown signal; do not log.
* OperationCanceledException (parent type) is also a shutdown signal.
* Any other exception is genuinely unexpected and must be logged with
the exception type name and message.
* Null exception is a no-op; null callback is tolerated.
Tests fail to compile until the helper is introduced in the follow-up
fix commit.
The concurrent writer/reader smoke test asserted observed > 0 sample hits as a sanity check, but on a fast machine the writer completed before the reader took its first sample, failing the assertion without indicating a real defect. The atomicity assertions inside the reader loop are the load-bearing checks; the sample-count sanity was not. Drop the brittle observed-count assertion. Increase the iteration count (5_000 -> 100_000) so even a fast writer cannot beat the reader, and add a ManualResetEventSlim startGate so both tasks begin under the same observation window. The torn-read assertions inside the reader loop are unchanged and still load-bearing.
The Worker do/while outer-catch in Module.cs was a bare empty
"catch (Exception) { }". Any unexpected exception escaping the inner
try/catch (a throw inside the inner finally, an oversight in
connection handling) was silently swallowed. The relay quietly
entered the reconnect-delay branch and the underlying defect went
undiagnosed for the lifetime of the agent.
Introduce WorkerLoopExceptionLogger.Log: log any non-cancellation
exception at Warning with the exception type name and message.
TaskCanceledException and OperationCanceledException stay silent
because they are the orderly-shutdown signal. Route the outer-catch
through the helper so the policy is unit-testable. Pinned by
WorkerLoopExceptionLoggerTests.
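A sketch of the filtering policy, assuming the callback receives a preformatted string; the message format shown is illustrative. Because TaskCanceledException derives from OperationCanceledException, one type check covers both shutdown signals.

```csharp
using System;

// Sketch only; the callback shape and message format are assumptions.
public static class WorkerLoopExceptionLogger
{
    public static void Log(Exception exception, Action<string> logWarning)
    {
        if (exception == null) return;

        // Orderly shutdown: covers TaskCanceledException as a subclass.
        if (exception is OperationCanceledException) return;

        logWarning?.Invoke(exception.GetType().Name + ": " + exception.Message);
    }
}
```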
Module.PublishObservations iterated the input enumerable up to three times per distinct DataItemId (Distinct, Where, FirstOrDefault). For n observations across k distinct data items the cost is O(n*k), and each iteration may re-execute a deferred upstream query (the broker's observation enumerator). On large agents (thousands of observations across hundreds of data items) the catch-up after a reconnect was materially slowed.
Pin the grouping policy via a focused helper:
* Null and empty input return empty.
* Distinct DataItemId values become distinct groups.
* The source enumerable is iterated exactly once (smoke-tested with a counting iterator).
* Encounter order is preserved within each group so callers relying on sequence-monotonic ordering are not broken.
* Null DataItemId is tolerated as its own (null-keyed) group rather than crashing the enumeration.
Tests fail to compile until ObservationGrouper is introduced in the follow-up fix commit.
Module.PublishObservations iterated the input enumerable up to three times per distinct DataItemId (Distinct, Where, FirstOrDefault). For n observations across k distinct data items the cost was O(n*k) and each iteration could re-execute a deferred upstream broker query. On large agents that throttled the catch-up after a reconnect. Introduce ObservationGrouper.GroupByDataItem: materialise the source once and group by DataItemId, returning IGrouping<string, T> entries that preserve encounter order within each group. Refactor PublishObservations to use a foreach over groups, materialise each group once into a List, and dispatch CONDITION versus single-value data items off the first item's category. Extract the IObservationOutput -> Observation copy into a static CloneAsObservation helper so the condition and non-condition paths share the projection. Pinned by ObservationGrouperTests.
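The single-pass grouping can be sketched with LINQ's GroupBy, which enumerates its source once, keeps elements in encounter order within each group, and accepts null keys. The generic parameter and the key-selector argument are assumptions of this example, introduced so the sketch does not depend on the library's observation types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch only; the generic signature is assumed for illustration.
public static class ObservationGrouper
{
    public static IReadOnlyList<IGrouping<string, T>> GroupByDataItem<T>(
        IEnumerable<T> source, Func<T, string> dataItemId)
    {
        if (source == null) return Array.Empty<IGrouping<string, T>>();

        // ToList forces the one and only enumeration of the source.
        return source.GroupBy(dataItemId).ToList();
    }
}
```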
Module.cs synchronously wrote to disk from every successful
observation publish in AgentObservationAdded:
File.WriteAllText(path, seq.ToString());
With DurableRelay enabled, a high observation rate put a synchronous
disk write on every event-handler invocation, throttling the relay.
The fix is to track the value in memory and flush only on a timer, on
shutdown, and at batch boundaries.
Pin the persister policy:
* Update marks dirty and round-trips through Read.
* Read does not clear dirty: only TryFlush establishes durable state.
* TryFlush emits exactly one writer call when dirty, and clears dirty
on success. Returns true if a write happened.
* TryFlush no-ops when clean (avoids burning IOPS on idle ticks).
* TryFlush keeps dirty when the writer throws so the next tick
retries.
* Initialize seeds the in-memory value from disk without marking
dirty (no redundant flush after startup).
* Last write wins (not a max-watermark).
* Null writer is tolerated.
Tests fail to compile until LastSentSequencePersister is introduced
in the follow-up fix commit.
Module.cs synchronously read and wrote the last-sent-sequence file from the AgentObservationAdded handler on every observation under DurableRelay. On a high-rate stream that put a synchronous disk read plus a synchronous disk write on every ThreadPool callback, serialising the relay behind disk IO and exhausting the host's IOPS budget.
Wire LastSentSequencePersister into Module:
* OnStartAfterLoad seeds the persister from ReadLastSentSequenceFromDisk (the only remaining disk read, now off the hot path) and starts a one-second flush timer that calls FlushLastSentSequence.
* RecordLastSentSequence (replacing SetLastSentSequence) just updates the in-memory persister; no IO on the hot path.
* FlushLastSentSequence runs the actual File.WriteAllText under _lastSentSequenceLock and only when the persister is dirty.
* RelayBufferedObservations reads in memory, calls RecordLastSentSequence per observation, and flushes once at the batch boundary so a crash before the next timer tick does not lose the batch's progress.
* OnStop disposes the timer first, then flushes one last time, so a clean shutdown does not lose pending progress.
* AgentObservationAdded no longer issues File.ReadAllText per event; the durable-relay path becomes pure in-memory work.
Pinned by LastSentSequencePersisterTests.
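A sketch of the dirty-flag persister policy described above. The lock-based implementation, the IsDirty property, and the choice to return false for a null writer are assumptions of this example; only the method names and the behaviours listed in the test commit come from the text.

```csharp
using System;

// Sketch only; locking strategy and null-writer result are assumptions.
public sealed class LastSentSequencePersister
{
    private readonly object _gate = new object();
    private ulong _value;
    private bool _dirty;

    public bool IsDirty { get { lock (_gate) { return _dirty; } } }

    // Seeds from disk at startup without scheduling a redundant flush.
    public void Initialize(ulong fromDisk)
    {
        lock (_gate) { _value = fromDisk; _dirty = false; }
    }

    // Hot path: memory only, last write wins.
    public void Update(ulong sequence)
    {
        lock (_gate) { _value = sequence; _dirty = true; }
    }

    public ulong Read()
    {
        lock (_gate) { return _value; }
    }

    // Calls the writer at most once, and only when there is unsaved state.
    public bool TryFlush(Action<ulong> writer)
    {
        lock (_gate)
        {
            if (!_dirty || writer == null) return false;
            try
            {
                writer(_value);
            }
            catch (Exception)
            {
                return false; // stay dirty so the next tick retries
            }
            _dirty = false;
            return true;
        }
    }
}
```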
…ved Current
The Ubuntu CI flake of
ClientAgentCommunicationTests.WaitForSampleShouldSucceedAfterFirstItemIsSent
was a stream-liveness race the previous fix left uncovered. The test waited
for CurrentReceived ("Current has been served") and then published the
sample under test, on the assumption that the client's worker would have the
Sample stream open and the server-side stream worker polling by the time the
sample's sequence was assigned. Neither assumption is guaranteed:
CurrentReceived fires synchronously inside ProcessCurrentDocument, well
before the worker constructs the Sample stream client, and StreamStarted is
raised before the HTTP GET is even sent. On a loaded Linux runner the
server's MTConnectHttpServerStream worker can still be spinning up when the
sample is added, and if the agent's Heartbeat lands inside the assertion
window the stream emits only an empty heartbeat chunk that
ProcessSampleDocument drops, never raising SampleReceived. The 24s WaitOne
then elapses with the AutoResetEvent unset.
The fix introduces an explicit sample-stream-ready primitive. After
CurrentReceived, the test publishes a benign sentinel observation
("avail" = AVAILABLE, a data item the devices template defines as
UNAVAILABLE so the agent's duplicate filter accepts it), then awaits the
first SampleReceived with a non-empty Streams element. That is the only
client-side signal that proves the HTTP GET reached the server, the
server's stream worker is polling, and a multipart chunk has been parsed
back into a document. The agent assigns the sentinel a sequence at or above
the Sample stream's from=, so the next server polling tick is guaranteed to
emit it. Only then does the test publish servotemp1=120; the now-live
stream's next tick is guaranteed to include it, so the assertion is
decided on stream behaviour, not on startup timing.
The CurrentReceived wait is kept as the first sequence point (it pins the
Sample stream's starting sequence) and the test-wide CancellationToken
still bounds every stage so a genuine hang fails fast with a clear message.
Production code is untouched.
Summary
Fixes #135: the MqttRelay agent module published Agent Availability on {TopicPrefix}/Probe/{agent-uuid}/Available with a raw AVAILABLE/UNAVAILABLE string, breaking the contract that {TopicPrefix}/Probe/# carries only JSON envelopes.
* … {TopicPrefix}/Agent/{agent-uuid}/Available, outside the Probe wildcard.
* … AvailabilityTopic configuration inputs and reject empty / whitespace / wildcard-segment values before the relay starts.
* … MqttRelay.OnStop against a NullReferenceException when the relay shuts down in Entity mode.
* … Interlocked.Read / Interlocked.Exchange, so concurrent observation publishing cannot tear the 64-bit value or thrash the I/O path.
* … MTConnect.NET-AgentModule-MqttRelay-Tests with NUnit coverage of the topic-construction helper, a structural guard refusing any future availability topic containing /Probe/, and unit pins for the shutdown / fault-handling / atomic-counter / single-pass-grouping / in-memory-persister contracts.
* … agent/Modules/MTConnect.NET-AgentModule-MqttRelay/README-Nuget.md (and the in-tree README.md).
* … {TopicPrefix}/Probe/{agent-uuid}/Available for the raw-string availability state must move their subscription to {TopicPrefix}/Agent/{agent-uuid}/Available. Subscribers of the Probe/# wildcard now receive only JSON envelopes.