diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 7fa7c3d..2db4935 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -828,6 +828,8 @@ async def close(self) -> None: def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): def complete(): + if future.done(): + return if result == _pulsar.Result.Ok: future.set_result(value) else: diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 8a441c4..3cc1078 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -39,6 +39,7 @@ Consumer, Producer, PulsarException, + _set_future, ) from pulsar.schema import ( # pylint: disable=import-error AvroSchema, @@ -484,5 +485,25 @@ class ExampleRecord(Record): # pylint: disable=too-few-public-methods self.assertEqual(msg.value().int_field, 42) +class AsyncioSetFutureTest(IsolatedAsyncioTestCase): + """Tests for asyncio bridge helpers (no live Pulsar broker).""" + + async def test_set_future_noop_when_future_cancelled(self): + loop = asyncio.get_running_loop() + fut = loop.create_future() + fut.cancel() + _set_future(fut, _pulsar.Result.Ok, None) + await asyncio.sleep(0) + self.assertTrue(fut.cancelled()) + + async def test_set_future_noop_when_future_already_resolved(self): + loop = asyncio.get_running_loop() + fut = loop.create_future() + fut.set_result("first") + _set_future(fut, _pulsar.Result.Ok, "late") + await asyncio.sleep(0) + self.assertEqual(fut.result(), "first") + + if __name__ == '__main__': main()