From 72a12b38f57e6eb375ea791c0c45daffc75d4f5b Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Sun, 17 Mar 2019 23:02:40 +0800 Subject: [PATCH 1/2] Fix tests for librdkafka 0.11.3+ Limit queued.max.messages.kbytes according to librdkafka cc43f4 --- pykafka/rdkafka/simple_consumer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pykafka/rdkafka/simple_consumer.py b/pykafka/rdkafka/simple_consumer.py index d3444463d..687158483 100644 --- a/pykafka/rdkafka/simple_consumer.py +++ b/pykafka/rdkafka/simple_consumer.py @@ -248,9 +248,10 @@ def _mk_rdkafka_config_lists(self): # queued.max.messages.kbytes so for now we infer the implied # maximum (which, with default settings, is ~2GB per partition): "queued.min.messages": self._queued_max_messages, - "queued.max.messages.kbytes": str( + "queued.max.messages.kbytes": str(min( self._queued_max_messages - * self._fetch_message_max_bytes // 1024), + * self._fetch_message_max_bytes // 1024, + 2097151)), # queued.max.messages.kbytes is 1..2097151 according to librdkafka cc43f4 "fetch.wait.max.ms": self._fetch_wait_max_ms, "fetch.message.max.bytes": self._fetch_message_max_bytes, From 04d3c2e6d7028c20c2e05b43c8ccfa68ae549f45 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 22 Mar 2019 01:09:08 +0800 Subject: [PATCH 2/2] Move constant to top of file --- pykafka/rdkafka/simple_consumer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pykafka/rdkafka/simple_consumer.py b/pykafka/rdkafka/simple_consumer.py index 687158483..7e71cd854 100644 --- a/pykafka/rdkafka/simple_consumer.py +++ b/pykafka/rdkafka/simple_consumer.py @@ -13,6 +13,8 @@ log = logging.getLogger(__name__) +LIBRDKAFKA_MAX_ALLOWED_QUEUED_MAX_MESSAGE_BYTES = 2097151 # see librdkafka commit cc43f4 + class RdKafkaSimpleConsumer(SimpleConsumer): """A pykafka.SimpleConsumer with librdkafka-based fetchers @@ -251,7 +253,7 @@ def _mk_rdkafka_config_lists(self): "queued.max.messages.kbytes": str(min( self._queued_max_messages * self._fetch_message_max_bytes // 1024, - 2097151)), # queued.max.messages.kbytes is 1..2097151 according to librdkafka cc43f4 + LIBRDKAFKA_MAX_ALLOWED_QUEUED_MAX_MESSAGE_BYTES)), "fetch.wait.max.ms": self._fetch_wait_max_ms, "fetch.message.max.bytes": self._fetch_message_max_bytes,