-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() #19609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() #19609
Conversation
Change: public List<Optional<? extends RequestManager>>> entries(); to: public List<RequestManager> entries(); and clean up the callers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kirktrue: Thanks for the clean up!
List<Optional<? extends RequestManager>> list = new ArrayList<>(); | ||
list.add(Optional.of(coordinatorRequestManager)); | ||
list.add(Optional.of(heartbeatRequestManager)); | ||
List<RequestManager> list = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use List.of
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not easily, no. I made two different attempts (see commit history), and both just added a bunch of ugly code just to make this look cleaner.
The problem is that the list contains both RequestManager
s and Optional<RequestManager>
s, so the type of the list would have to be defined as List<Object>
. The idea of this PR is to make the entries
list only include the succinct set of RequestManager
s that are actually in use.
I'm open to suggestions, though 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kirktrue I think I'm missing something here. Now that the list only contains RequestManager
instances, can't you just write:
List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager);
I tried that locally and it looked good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oof! I was looking at the lists being built up in RequestManagers
’ constructors 🤦♂️ I've updated ConsumerNetworkThreadTest
’s list building to use List.of()
as suggested.
list.add(Optional.of(coordinatorRequestManager)); | ||
list.add(Optional.of(heartbeatRequestManager)); | ||
list.add(Optional.of(offsetsRequestManager)); | ||
List<RequestManager> list = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager, offsetsRequestManager);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, fixed this as well. Thank you for your patience 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. This piece of code was always a bit messy so this is a nice improvement.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
Outdated
Show resolved
Hide resolved
...nts/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
Outdated
Show resolved
Hide resolved
@@ -154,16 +153,12 @@ void runOnce() { | |||
lastPollTimeMs = currentTimeMs; | |||
|
|||
final long pollWaitTimeMs = requestManagers.entries().stream() | |||
.filter(Optional::isPresent) | |||
.map(Optional::get) | |||
.map(rm -> rm.poll(currentTimeMs)) | |||
.map(networkClientDelegate::addAll) | |||
.reduce(MAX_POLL_TIMEOUT_MS, Math::min); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a change here, but instead of:
final long pollWaitTimeMs = requestManagers.entries().stream()
.map(rm -> rm.poll(currentTimeMs))
.mapToLong(networkClientDelegate::addAll)
.min()
.orElse(MAX_POLL_TIMEOUT_MS);
I had to add a call to filter()
:
final long pollWaitTimeMs = requestManagers.entries().stream()
.map(rm -> rm.poll(currentTimeMs))
.mapToLong(networkClientDelegate::addAll)
.filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
.min()
.orElse(MAX_POLL_TIMEOUT_MS);
The stream must only contain values that are numerically less-than-or-equal-to the maximum (MAX_POLL_TIMEOUT_MS
) when min()
is invoked or else we could end up with value that's greater than our maximum. Fortunately there is a unit test that caught that little wrinkle 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the sharing. nice find!
BTW, do you know the purpose of max poll time? it seems we don't honor the timeUntilNextPollMs
configured by the rm. If the value larger than 5 secs is invalid, should we throw exception in PollResult
? Additionally, maybe the default timeUntilNextPollMs
should be configured to 5 seconds (or optional.empty) as the default value is useless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the polling timeout logic is a little... "hard to understand" 😞
The idea was that we want to block in poll()
so that we're not just looping in runOnce()
continuously. The amount of time to block is dependent on the inflight requests. Some requests, like coordinator discovery, don't want to wait at all, but others don't want to impose any timeouts on blocking in runOnce()
.
But yes, this is an area that could use some clean up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
run a simple benchmark
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ReduceBenchmark {
private final List<Long> values = IntStream.range(0, 100 * 1000).mapToObj(i -> (long) i).toList();
private final long maxValue = 100;
@Benchmark
public void testReduce(Blackhole blackhole) {
blackhole.consume(values.stream().reduce(maxValue, Math::min));
}
@Benchmark
public void testMin(Blackhole blackhole) {
blackhole.consume(values.stream().mapToLong(i -> i)
.filter(i -> i <= maxValue)
.min()
.orElse(maxValue));
}
}
Benchmark Mode Cnt Score Error Units
ReduceBenchmark.testMin thrpt 25 7.915 ± 0.087 ops/ms
ReduceBenchmark.testReduce thrpt 25 3.256 ± 0.017 ops/ms
It seems the filter could optimize it a bit.
…d clean up timeout determination logic in ConsumerNetworkThread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple of final comments.
List<Optional<? extends RequestManager>> list = new ArrayList<>(); | ||
list.add(Optional.of(coordinatorRequestManager)); | ||
list.add(Optional.of(heartbeatRequestManager)); | ||
List<RequestManager> list = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kirktrue I think I'm missing something here. Now that the list only contains RequestManager
instances, can't you just write:
List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager);
I tried that locally and it looked good to me.
list.add(Optional.of(coordinatorRequestManager)); | ||
list.add(Optional.of(heartbeatRequestManager)); | ||
list.add(Optional.of(offsetsRequestManager)); | ||
List<RequestManager> list = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager, offsetsRequestManager);
…pache#19609) Change: `public List<Optional<? extends RequestManager>> entries();` to: `public List<RequestManager> entries();` and clean up the callers. Reviewers: TengYao Chi <kitingiao@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Change:
public List<Optional<? extends RequestManager>> entries();
to:
public List<RequestManager> entries();
and clean up the callers.
Reviewers: TengYao Chi kitingiao@gmail.com, Andrew Schofield
aschofield@confluent.io, Chia-Ping Tsai chia7712@gmail.com