Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions nri-kafka/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Unreleased

- Fix `sendSync` hanging forever when a message fails to deliver. A failed
delivery now returns a descriptive `Task.fail` instead of parking the caller.

# 0.4.0.1

- Require `nri-prelude >= 0.7.0.0`
Expand Down
13 changes: 12 additions & 1 deletion nri-kafka/docs/known-issues.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
# Known issues

## `sendSync` hangs forever on delivery failure
## ~~`sendSync` hangs forever on delivery failure~~ (RESOLVED)

**Resolved.** `Internal.deliveryReportToResult` now maps every delivery report
to a `Result Internal.Error ()`, the `sendSync` terminator carries that result
instead of a bare `Terminate`, and the callback signals it on both the success
and failure branches. A failed delivery now surfaces as a descriptive
`Task.fail` (`DeliveryFailed` for a broker-side failure, `NoMessageDelivered`
for the message-less `NoMessageError` report) instead of parking the caller
forever. `sendAsync` keeps its previous success-only callback contract. The
dispatch is unit-tested in `test/Spec/Kafka.hs`.

The original write-up is kept below for context.

### Where

Expand Down
1 change: 1 addition & 0 deletions nri-kafka/nri-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ test-suite tests
Kafka.Worker.Settings
Kafka.Worker.Stopping
Helpers
Spec.Kafka
Spec.Kafka.Worker.Integration
Spec.Kafka.Worker.Partition
Paths_nri_kafka
Expand Down
36 changes: 25 additions & 11 deletions nri-kafka/src/Kafka.hs
Original file line number Diff line number Diff line change
Expand Up @@ -215,18 +215,32 @@ mkHandler settings producer = do
Platform.tracingSpan "Async send Kafka messages" <| do
let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg'
Platform.setTracingSpanDetails details
sendHelperAsync producer doAnything onDeliveryCallback msg'
-- Preserve the existing async contract: the caller's callback only
-- fires once the broker has confirmed delivery. Failures are not
-- forwarded to async callers (they would have to be reported out of
-- band), only to sync callers below.
let onDeliveryReport deliveryReport =
case deliveryReport of
Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback
_ -> Task.succeed ()
sendHelperAsync producer doAnything onDeliveryReport msg'
Comment on lines +222 to +226

@omnibs omnibs Jun 15, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is unchanged from trunk. We do nothing on failure in the async path.

Fixing this is a tad larger than what we're aiming for here, and I'm not 100% sure how it would work. I've tried and failed to have a LogHandler report stuff in code that runs in a separate thread in our deployer service, for instance.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of scope of this pr, agreed

|> Task.mapError Internal.errorToText,
Internal.sendSync = \msg' ->
Platform.tracingSpan "Sync send Kafka messages" <| do
let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg'
Platform.setTracingSpanDetails details
terminator <- doSTM doAnything TMVar.newEmptyTMVar
let onDeliveryCallback = doSTM doAnything (TMVar.putTMVar terminator Terminate)
sendHelperAsync producer doAnything onDeliveryCallback msg'
-- The callback runs on every delivery report, so the terminator is
-- signalled exactly once whether delivery succeeds or fails. This
-- is what keeps a failed delivery from parking the caller forever.
let onDeliveryReport deliveryReport =
doSTM doAnything (TMVar.putTMVar terminator (Internal.deliveryReportToResult deliveryReport))
sendHelperAsync producer doAnything onDeliveryReport msg'
|> Task.mapError Internal.errorToText
Terminate <- doSTM doAnything (TMVar.readTMVar terminator)
Task.succeed ()
result <- doSTM doAnything (TMVar.readTMVar terminator)
case result of
Ok () -> Task.succeed ()
Err err -> Task.fail (Internal.errorToText err)
}

doSTM :: Platform.DoAnythingHandler -> STM.STM a -> Task e a
Expand Down Expand Up @@ -269,10 +283,10 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout
sendHelperAsync ::
Producer.KafkaProducer ->
Platform.DoAnythingHandler ->
Task Never () ->
(Producer.DeliveryReport -> Task Never ()) ->
Internal.Msg ->
Task Internal.Error ()
sendHelperAsync producer doAnything onDeliveryCallback msg' = do
sendHelperAsync producer doAnything onDeliveryReport msg' = do
record' <- record msg'
Exception.handleAny
(\exception -> Prelude.pure (Err (Internal.Uncaught exception)))
Expand All @@ -281,12 +295,12 @@ sendHelperAsync producer doAnything onDeliveryCallback msg' = do
Producer.produceMessage'
producer
record'
-- librdkafka invokes this callback exactly once per message, on
-- both success and failure, so handing it the whole delivery
-- report lets callers be notified of either outcome.
( \deliveryReport -> do
log <- Platform.silentHandler
Task.perform log <|
case deliveryReport of
Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback
_ -> Task.succeed ()
Task.perform log (onDeliveryReport deliveryReport)
Comment on lines +298 to +303

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

πŸ’œ

)
Prelude.pure <| case maybeFailedMessages of
Prelude.Right _ -> Ok ()
Expand Down
33 changes: 32 additions & 1 deletion nri-kafka/src/Kafka/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,44 @@ instance Show Encodable where
-- | Errors.
-- If you experience an 'Uncaught' exception, please wrap it here type here!
data Error
= SendingFailed (Producer.ProducerRecord, Producer.KafkaError)
= -- | A message could not be enqueued for sending. This is a pre-flight
-- failure surfaced synchronously by librdkafka (e.g. the local producer
-- queue is full, or the message exceeds the configured maximum size); the
-- message was never handed to the broker.
SendingFailed (Producer.ProducerRecord, Producer.KafkaError)
| -- | A message was enqueued and handed to the broker, but delivery
-- ultimately failed (e.g. @delivery.timeout.ms@ exceeded, retries
-- exhausted, a non-retriable broker error, or no available partition
-- leader). This is reported asynchronously through the delivery callback,
-- after a successful enqueue.
DeliveryFailed (Producer.ProducerRecord, Producer.KafkaError)
| -- | librdkafka invoked the delivery callback to report a failure but did
-- not attach the original message, so there is no 'Producer.ProducerRecord'
-- to report β€” only the 'Producer.KafkaError'. In hw-kafka-client this is
-- the @NoMessageError@ delivery report, which is produced when the C
-- delivery callback fires with a null message pointer and the error code is
-- read from @errno@. It is an exceptional, library-level condition rather
-- than a normal per-message broker rejection (those arrive as
-- 'DeliveryFailed', which carries the record).
NoMessageDelivered Producer.KafkaError
| Uncaught Exception.SomeException
deriving (Show)

errorToText :: Error -> Text
errorToText err = Text.fromList (Prelude.show err)
Comment on lines 74 to 75

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was already the case for SendingFailed. I think this is better addressed by ensuring our messages wrap PII in e.g. Log.Secret, so they're still debuggable, than omitting their contents completely.


-- | Translate a librdkafka 'Producer.DeliveryReport' into a 'Result' the
-- caller can act on: a success carries no payload, while the two failure
-- reports map to the corresponding 'Error' constructors. Kept pure (no
-- 'Producer.KafkaProducer', no IO) so the dispatch can be unit-tested without
-- a running broker.
deliveryReportToResult :: Producer.DeliveryReport -> Result Error ()
deliveryReportToResult deliveryReport =
case deliveryReport of
Producer.DeliverySuccess _record _offset -> Ok ()
Producer.DeliveryFailure record kafkaError -> Err (DeliveryFailed (record, kafkaError))
Producer.NoMessageError kafkaError -> Err (NoMessageDelivered kafkaError)

-- | A kafka topic
newtype Topic = Topic {unTopic :: Text} deriving (Aeson.ToJSON, Show)

Expand Down
4 changes: 3 additions & 1 deletion nri-kafka/test/Main.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Main (main) where

import qualified Spec.Kafka
import qualified Spec.Kafka.Worker.Integration
import qualified Spec.Kafka.Worker.Partition
import qualified System.Environment
Expand All @@ -16,6 +17,7 @@ tests :: Test.Test
tests =
Test.describe
"lib/kafka"
[ Spec.Kafka.Worker.Integration.tests,
[ Spec.Kafka.tests,
Spec.Kafka.Worker.Integration.tests,
Spec.Kafka.Worker.Partition.tests
]
45 changes: 45 additions & 0 deletions nri-kafka/test/Spec/Kafka.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module Spec.Kafka (tests) where

import qualified Expect
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Internal as Internal
import qualified Kafka.Producer as Producer
import qualified Test

tests :: Test.Test
tests =
Test.describe
"Kafka"
[ Test.describe
"deliveryReportToResult"
[ Test.test "a successful delivery becomes Ok ()" <| \() ->
case Internal.deliveryReportToResult
(Producer.DeliverySuccess exampleRecord (Consumer.Offset 0)) of
Ok () -> Expect.pass
other -> Expect.fail ("expected Ok (), got " ++ Debug.toString other),
Test.test "a broker delivery failure becomes DeliveryFailed carrying the record and error" <| \() ->
case Internal.deliveryReportToResult
(Producer.DeliveryFailure exampleRecord exampleError) of
Err (Internal.DeliveryFailed payload) ->
Expect.equal payload (exampleRecord, exampleError)
other -> Expect.fail ("expected Err (DeliveryFailed ...), got " ++ Debug.toString other),
Test.test "a message-less failure becomes NoMessageDelivered carrying the error" <| \() ->
case Internal.deliveryReportToResult
(Producer.NoMessageError exampleError) of
Err (Internal.NoMessageDelivered kafkaError) ->
Expect.equal kafkaError exampleError
other -> Expect.fail ("expected Err (NoMessageDelivered ...), got " ++ Debug.toString other)
]
]

exampleRecord :: Producer.ProducerRecord
exampleRecord =
Producer.ProducerRecord
{ Producer.prTopic = Producer.TopicName "the-topic",
Producer.prPartition = Producer.UnassignedPartition,
Producer.prKey = Nothing,
Producer.prValue = Nothing
}

exampleError :: Producer.KafkaError
exampleError = Producer.KafkaError "boom"
Loading