Skip to content

KAFKA-19214: Clean up use of Optionals in RequestManagers.entries() #19609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 7, 2025

Conversation

kirktrue
Copy link
Contributor

@kirktrue kirktrue commented May 1, 2025

Change:

public List<Optional<? extends RequestManager>> entries();

to:

public List<RequestManager> entries();

and clean up the callers.

Reviewers: TengYao Chi kitingiao@gmail.com, Andrew Schofield
aschofield@confluent.io, Chia-Ping Tsai chia7712@gmail.com

Change:

public List<Optional<? extends RequestManager>>> entries();

to:

public List<RequestManager> entries();

and clean up the callers.
@github-actions github-actions bot added triage PRs from the community consumer clients small Small PRs labels May 1, 2025
Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue: Thanks for the clean up!

List<Optional<? extends RequestManager>> list = new ArrayList<>();
list.add(Optional.of(coordinatorRequestManager));
list.add(Optional.of(heartbeatRequestManager));
List<RequestManager> list = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use List.of here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not easily, no. I made two different attempts (see commit history), and both just added a bunch of ugly code just to make this look cleaner.

The problem is that the list contains both RequestManagers and Optional<RequestManager>s, so the type of the list would have to be defined as List<Object>. The idea of this PR is to make the entries list only include the succinct set of RequestManagers that are actually in use.

I'm open to suggestions, though 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue I think I'm missing something here. Now that the list only contains RequestManager instances, can't you just write:

List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager);

I tried that locally and it looked good to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof! I was looking at the lists being built up in RequestManagers’ constructors 🤦‍♂️ I've updated ConsumerNetworkThreadTest’s list building to use List.of() as suggested.

list.add(Optional.of(coordinatorRequestManager));
list.add(Optional.of(heartbeatRequestManager));
list.add(Optional.of(offsetsRequestManager));
List<RequestManager> list = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager, offsetsRequestManager);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, fixed this as well. Thank you for your patience 😄

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. This piece of code was always a bit messy so this is a nice improvement.

@AndrewJSchofield AndrewJSchofield added ci-approved and removed triage PRs from the community labels May 1, 2025
@@ -154,16 +153,12 @@ void runOnce() {
lastPollTimeMs = currentTimeMs;

final long pollWaitTimeMs = requestManagers.entries().stream()
.filter(Optional::isPresent)
.map(Optional::get)
.map(rm -> rm.poll(currentTimeMs))
.map(networkClientDelegate::addAll)
.reduce(MAX_POLL_TIMEOUT_MS, Math::min);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a change here, but instead of:

        final long pollWaitTimeMs = requestManagers.entries().stream()
                .map(rm -> rm.poll(currentTimeMs))
                .mapToLong(networkClientDelegate::addAll)
                .min()
                .orElse(MAX_POLL_TIMEOUT_MS);

I had to add a call to filter():

        final long pollWaitTimeMs = requestManagers.entries().stream()
                .map(rm -> rm.poll(currentTimeMs))
                .mapToLong(networkClientDelegate::addAll)
                .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
                .min()
                .orElse(MAX_POLL_TIMEOUT_MS);

The stream must only contain values that are numerically less-than-or-equal-to the maximum (MAX_POLL_TIMEOUT_MS) when min() is invoked or else we could end up with value that's greater than our maximum. Fortunately there is a unit test that caught that little wrinkle 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the sharing. nice find!

BTW, do you know the purpose of max poll time? it seems we don't honor the timeUntilNextPollMs configured by the rm. If the value larger than 5 secs is invalid, should we throw exception in PollResult? Additionally, maybe the default timeUntilNextPollMs should be configured to 5 seconds (or optional.empty) as the default value is useless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the polling timeout logic is a little... "hard to understand" 😞

The idea was that we want to block in poll() so that we're not just looping in runOnce() continuously. The amount of time to block is dependent on the inflight requests. Some requests, like coordinator discovery, don't want to wait at all, but others don't want to impose any timeouts on blocking in runOnce().

But yes, this is an area that could use some clean up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run a simple benchmark

@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ReduceBenchmark {

    private final List<Long> values = IntStream.range(0, 100 * 1000).mapToObj(i -> (long) i).toList();
    private final long maxValue = 100;

    @Benchmark
    public void testReduce(Blackhole blackhole) {
        blackhole.consume(values.stream().reduce(maxValue, Math::min));
    }

    @Benchmark
    public void testMin(Blackhole blackhole) {
        blackhole.consume(values.stream().mapToLong(i -> i)
                        .filter(i -> i <= maxValue)
                        .min()
                        .orElse(maxValue));
    }
}
Benchmark                    Mode  Cnt  Score   Error   Units
ReduceBenchmark.testMin     thrpt   25  7.915 ± 0.087  ops/ms
ReduceBenchmark.testReduce  thrpt   25  3.256 ± 0.017  ops/ms

It seems the filter could optimize it a bit.

@github-actions github-actions bot removed the small Small PRs label May 2, 2025
@github-actions github-actions bot added the small Small PRs label May 2, 2025
@kirktrue kirktrue requested a review from AndrewJSchofield May 5, 2025 17:08
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a couple of final comments.

List<Optional<? extends RequestManager>> list = new ArrayList<>();
list.add(Optional.of(coordinatorRequestManager));
list.add(Optional.of(heartbeatRequestManager));
List<RequestManager> list = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue I think I'm missing something here. Now that the list only contains RequestManager instances, can't you just write:

List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager);

I tried that locally and it looked good to me.

list.add(Optional.of(coordinatorRequestManager));
list.add(Optional.of(heartbeatRequestManager));
list.add(Optional.of(offsetsRequestManager));
List<RequestManager> list = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List<RequestManager> list = List.of(coordinatorRequestManager, heartbeatRequestManager, offsetsRequestManager);

@AndrewJSchofield AndrewJSchofield merged commit d3707fc into apache:trunk May 7, 2025
27 checks passed
@kirktrue kirktrue deleted the KAFKA-19214-remove-optional branch May 7, 2025 16:35
shmily7829 pushed a commit to shmily7829/kafka that referenced this pull request May 7, 2025
…pache#19609)

Change:

`public List<Optional<? extends RequestManager>> entries();`

to:

`public List<RequestManager> entries();`

and clean up the callers.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Andrew Schofield
 <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants