diff --git a/pykafka/connection.py b/pykafka/connection.py index 5fd119288..cc751eebf 100644 --- a/pykafka/connection.py +++ b/pykafka/connection.py @@ -168,8 +168,9 @@ def connect(self, timeout): timeout / 1000, (self.source_host, self.source_port) )) - except (self._handler.SockErr, self._handler.GaiError): + except (self._handler.SockErr, self._handler.GaiError) as e: log.info("Failed to connect to %s:%s", self.host, self.port) + log.info(e) raise SocketDisconnectedError("".format(self.host, self.port)) log.debug("Successfully connected to %s:%s", self.host, self.port) diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index a54397e4c..d5c0136e4 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -27,6 +27,8 @@ import tempfile import time +from pkg_resources import parse_version + from testinstances import utils from testinstances.exceptions import ProcessNotStartingError from testinstances.managed_instance import ManagedInstance @@ -116,7 +118,8 @@ def delete_topic(self, topic_name): def flush(self): """Delete all topics.""" for topic in self.list_topics(): - if not topic.startswith(b'__'): # leave internal topics alone + if topic and not topic.startswith(b'__') and \ + "marked for deletion" not in topic: # leave internal topics alone self.delete_topic(topic) def list_topics(self): @@ -233,7 +236,8 @@ def _gen_ssl_certs(self): :returns: :class:`CertManager` or None upon failure """ - if self._kafka_version >= "0.9": # no SSL support in earlier versions + # no SSL support in earlier versions + if parse_version(self._kafka_version) >= parse_version("0.9"): try: return CertManager(self._bin_dir) except: # eg. because openssl or other tools not installed