-
Notifications
You must be signed in to change notification settings - Fork 2
nri-kafka: fix sendSync hanging forever on delivery failure #159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -215,18 +215,32 @@ mkHandler settings producer = do | |
| Platform.tracingSpan "Async send Kafka messages" <| do | ||
| let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg' | ||
| Platform.setTracingSpanDetails details | ||
| sendHelperAsync producer doAnything onDeliveryCallback msg' | ||
| -- Preserve the existing async contract: the caller's callback only | ||
| -- fires once the broker has confirmed delivery. Failures are not | ||
| -- forwarded to async callers (they would have to be reported out of | ||
| -- band), only to sync callers below. | ||
| let onDeliveryReport deliveryReport = | ||
| case deliveryReport of | ||
| Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback | ||
| _ -> Task.succeed () | ||
| sendHelperAsync producer doAnything onDeliveryReport msg' | ||
| |> Task.mapError Internal.errorToText, | ||
| Internal.sendSync = \msg' -> | ||
| Platform.tracingSpan "Sync send Kafka messages" <| do | ||
| let details = Details (List.map Producer.unBrokerAddress (Settings.brokerAddresses settings)) msg' | ||
| Platform.setTracingSpanDetails details | ||
| terminator <- doSTM doAnything TMVar.newEmptyTMVar | ||
| let onDeliveryCallback = doSTM doAnything (TMVar.putTMVar terminator Terminate) | ||
| sendHelperAsync producer doAnything onDeliveryCallback msg' | ||
| -- The callback runs on every delivery report, so the terminator is | ||
| -- signalled exactly once whether delivery succeeds or fails. This | ||
| -- is what keeps a failed delivery from parking the caller forever. | ||
| let onDeliveryReport deliveryReport = | ||
| doSTM doAnything (TMVar.putTMVar terminator (Internal.deliveryReportToResult deliveryReport)) | ||
| sendHelperAsync producer doAnything onDeliveryReport msg' | ||
| |> Task.mapError Internal.errorToText | ||
| Terminate <- doSTM doAnything (TMVar.readTMVar terminator) | ||
| Task.succeed () | ||
| result <- doSTM doAnything (TMVar.readTMVar terminator) | ||
| case result of | ||
| Ok () -> Task.succeed () | ||
| Err err -> Task.fail (Internal.errorToText err) | ||
| } | ||
|
|
||
| doSTM :: Platform.DoAnythingHandler -> STM.STM a -> Task e a | ||
|
|
@@ -269,10 +283,10 @@ mkProducer Settings.Settings {Settings.brokerAddresses, Settings.deliveryTimeout | |
| sendHelperAsync :: | ||
| Producer.KafkaProducer -> | ||
| Platform.DoAnythingHandler -> | ||
| Task Never () -> | ||
| (Producer.DeliveryReport -> Task Never ()) -> | ||
| Internal.Msg -> | ||
| Task Internal.Error () | ||
| sendHelperAsync producer doAnything onDeliveryCallback msg' = do | ||
| sendHelperAsync producer doAnything onDeliveryReport msg' = do | ||
| record' <- record msg' | ||
| Exception.handleAny | ||
| (\exception -> Prelude.pure (Err (Internal.Uncaught exception))) | ||
|
|
@@ -281,12 +295,12 @@ sendHelperAsync producer doAnything onDeliveryCallback msg' = do | |
| Producer.produceMessage' | ||
| producer | ||
| record' | ||
| -- librdkafka invokes this callback exactly once per message, on | ||
| -- both success and failure, so handing it the whole delivery | ||
| -- report lets callers be notified of either outcome. | ||
| ( \deliveryReport -> do | ||
| log <- Platform.silentHandler | ||
| Task.perform log <| | ||
| case deliveryReport of | ||
| Producer.DeliverySuccess _producerRecord _offset -> onDeliveryCallback | ||
| _ -> Task.succeed () | ||
| Task.perform log (onDeliveryReport deliveryReport) | ||
|
Comment on lines
+298
to
+303
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. π |
||
| ) | ||
| Prelude.pure <| case maybeFailedMessages of | ||
| Prelude.Right _ -> Ok () | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,13 +48,44 @@ instance Show Encodable where | |
| -- | Errors. | ||
| -- If you experience an 'Uncaught' exception, please wrap it in this type here! | ||
| data Error | ||
| = SendingFailed (Producer.ProducerRecord, Producer.KafkaError) | ||
| = -- | A message could not be enqueued for sending. This is a pre-flight | ||
| -- failure surfaced synchronously by librdkafka (e.g. the local producer | ||
| -- queue is full, or the message exceeds the configured maximum size); the | ||
| -- message was never handed to the broker. | ||
| SendingFailed (Producer.ProducerRecord, Producer.KafkaError) | ||
| | -- | A message was enqueued and handed to the broker, but delivery | ||
| -- ultimately failed (e.g. @delivery.timeout.ms@ exceeded, retries | ||
| -- exhausted, a non-retriable broker error, or no available partition | ||
| -- leader). This is reported asynchronously through the delivery callback, | ||
| -- after a successful enqueue. | ||
| DeliveryFailed (Producer.ProducerRecord, Producer.KafkaError) | ||
| | -- | librdkafka invoked the delivery callback to report a failure but did | ||
| -- not attach the original message, so there is no 'Producer.ProducerRecord' | ||
| -- to report — only the 'Producer.KafkaError'. In hw-kafka-client this is | ||
| -- the @NoMessageError@ delivery report, which is produced when the C | ||
| -- delivery callback fires with a null message pointer and the error code is | ||
| -- read from @errno@. It is an exceptional, library-level condition rather | ||
| -- than a normal per-message broker rejection (those arrive as | ||
| -- 'DeliveryFailed', which carries the record). | ||
| NoMessageDelivered Producer.KafkaError | ||
| | Uncaught Exception.SomeException | ||
| deriving (Show) | ||
|
|
||
| errorToText :: Error -> Text | ||
| errorToText err = Text.fromList (Prelude.show err) | ||
|
Comment on lines
74
to
75
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was already the case for |
||
|
|
||
| -- | Translate a librdkafka 'Producer.DeliveryReport' into a 'Result' the | ||
| -- caller can act on: a success carries no payload, while the two failure | ||
| -- reports map to the corresponding 'Error' constructors. Kept pure (no | ||
| -- 'Producer.KafkaProducer', no IO) so the dispatch can be unit-tested without | ||
| -- a running broker. | ||
| deliveryReportToResult :: Producer.DeliveryReport -> Result Error () | ||
| deliveryReportToResult deliveryReport = | ||
| case deliveryReport of | ||
| Producer.DeliverySuccess _record _offset -> Ok () | ||
| Producer.DeliveryFailure record kafkaError -> Err (DeliveryFailed (record, kafkaError)) | ||
| Producer.NoMessageError kafkaError -> Err (NoMessageDelivered kafkaError) | ||
|
|
||
| -- | A kafka topic | ||
| newtype Topic = Topic {unTopic :: Text} deriving (Aeson.ToJSON, Show) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| module Spec.Kafka (tests) where | ||
|
|
||
| import qualified Expect | ||
| import qualified Kafka.Consumer as Consumer | ||
| import qualified Kafka.Internal as Internal | ||
| import qualified Kafka.Producer as Producer | ||
| import qualified Test | ||
|
|
||
| tests :: Test.Test | ||
| tests = | ||
| Test.describe | ||
| "Kafka" | ||
| [ Test.describe | ||
| "deliveryReportToResult" | ||
| [ Test.test "a successful delivery becomes Ok ()" <| \() -> | ||
| case Internal.deliveryReportToResult | ||
| (Producer.DeliverySuccess exampleRecord (Consumer.Offset 0)) of | ||
| Ok () -> Expect.pass | ||
| other -> Expect.fail ("expected Ok (), got " ++ Debug.toString other), | ||
| Test.test "a broker delivery failure becomes DeliveryFailed carrying the record and error" <| \() -> | ||
| case Internal.deliveryReportToResult | ||
| (Producer.DeliveryFailure exampleRecord exampleError) of | ||
| Err (Internal.DeliveryFailed payload) -> | ||
| Expect.equal payload (exampleRecord, exampleError) | ||
| other -> Expect.fail ("expected Err (DeliveryFailed ...), got " ++ Debug.toString other), | ||
| Test.test "a message-less failure becomes NoMessageDelivered carrying the error" <| \() -> | ||
| case Internal.deliveryReportToResult | ||
| (Producer.NoMessageError exampleError) of | ||
| Err (Internal.NoMessageDelivered kafkaError) -> | ||
| Expect.equal kafkaError exampleError | ||
| other -> Expect.fail ("expected Err (NoMessageDelivered ...), got " ++ Debug.toString other) | ||
| ] | ||
| ] | ||
|
|
||
| exampleRecord :: Producer.ProducerRecord | ||
| exampleRecord = | ||
| Producer.ProducerRecord | ||
| { Producer.prTopic = Producer.TopicName "the-topic", | ||
| Producer.prPartition = Producer.UnassignedPartition, | ||
| Producer.prKey = Nothing, | ||
| Producer.prValue = Nothing | ||
| } | ||
|
|
||
| exampleError :: Producer.KafkaError | ||
| exampleError = Producer.KafkaError "boom" |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that this is unchanged from
trunk. We do nothing on failure in the async path. Fixing this is a tad larger than what we're aiming for here, and I'm not 100% sure how it would work. I've tried and failed to have a
`LogHandler` report stuff in code that runs in a separate thread in our deployer service, for instance. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out of scope of this pr, agreed