fix(ingest-limits): Use stripe locking for metadata #17150


Open · periklis wants to merge 5 commits into main from ingest-limits-stripe-locks

Conversation

@periklis (Collaborator) commented Apr 11, 2025

What this PR does / why we need it:
This pull request implements a stripe-lock approach for reading and writing the internal ingest-limits stream metadata structure. The stripes are distributed across the total number of partitions (64 by default), which allows the structure to be read and updated with less lock contention.
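
For context, a minimal sketch of the stripe-locked layout discussed in this PR (field names follow the snippets quoted in the review below; the constructor and the lockIndex/update helpers are hypothetical, not the PR's exact code):

package limits

import "sync"

type streamMetadata struct{ /* hash, total size, last seen, ... */ }

// streamMetadataStripes shards the tenant -> partition -> streams map into
// stripes, each guarded by its own RWMutex, so writes to different
// partitions rarely contend on the same lock.
type streamMetadataStripes struct {
    locks   []sync.RWMutex                           // one lock per stripe
    stripes []map[string]map[int32][]*streamMetadata // tenant -> partition -> streams
}

// newStreamMetadataStripes allocates n stripes; n must be a power of two
// for the mask in lockIndex to act as a modulo.
func newStreamMetadataStripes(n int) *streamMetadataStripes {
    return &streamMetadataStripes{
        locks:   make([]sync.RWMutex, n),
        stripes: make([]map[string]map[int32][]*streamMetadata, n),
    }
}

// lockIndex maps a partition to its stripe.
func (m *streamMetadataStripes) lockIndex(partition int32) uint64 {
    return uint64(partition) & uint64(len(m.locks)-1)
}

// update records stream metadata under the owning stripe's exclusive lock.
func (m *streamMetadataStripes) update(tenant string, partition int32, sm *streamMetadata) {
    i := m.lockIndex(partition)
    m.locks[i].Lock()
    defer m.locks[i].Unlock()
    if m.stripes[i] == nil {
        m.stripes[i] = make(map[string]map[int32][]*streamMetadata)
    }
    if m.stripes[i][tenant] == nil {
        m.stripes[i][tenant] = make(map[int32][]*streamMetadata)
    }
    m.stripes[i][tenant][partition] = append(m.stripes[i][tenant][partition], sm)
}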

                                                                                  │   old.txt    │                new.txt                │
                                                                                  │    sec/op    │    sec/op     vs base                 │
IngestLimits_updateMetadata/4_partitions_small_streams_single_tenant-14             293.7n ± ∞ ¹   247.7n ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/4_partitions_small_streams_single_tenant_parallel-14    460.7n ± ∞ ¹   366.1n ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/8_partitions_medium_streams_multi_tenant-14             541.9n ± ∞ ¹   458.8n ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/8_partitions_medium_streams_multi_tenant_parallel-14    748.4n ± ∞ ¹   291.3n ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/16_partitions_large_streams_multi_tenant-14             1.906µ ± ∞ ¹   3.479µ ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/16_partitions_large_streams_multi_tenant_parallel-14    2.562µ ± ∞ ¹   1.082µ ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/32_partitions_xlarge_streams_multi_tenant-14            3.816µ ± ∞ ¹   8.444µ ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/32_partitions_xlarge_streams_multi_tenant_parallel-14   4.734µ ± ∞ ¹   2.205µ ± ∞ ¹        ~ (p=1.000 n=1) ²
geomean                                                                             1.218µ         979.8n        -19.55%
¹ need >= 6 samples for confidence interval at level 0.95
² need >= 4 samples to detect a difference at alpha level 0.05

                                                                                  │   old.txt   │                new.txt                │
                                                                                  │    B/op     │     B/op      vs base                 │
IngestLimits_updateMetadata/4_partitions_small_streams_single_tenant-14             56.00 ± ∞ ¹   104.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/4_partitions_small_streams_single_tenant_parallel-14    90.00 ± ∞ ¹   137.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/8_partitions_medium_streams_multi_tenant-14             56.00 ± ∞ ¹   104.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/8_partitions_medium_streams_multi_tenant_parallel-14    88.00 ± ∞ ¹   135.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/16_partitions_large_streams_multi_tenant-14             56.00 ± ∞ ¹   104.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/16_partitions_large_streams_multi_tenant_parallel-14    63.00 ± ∞ ¹   111.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/32_partitions_xlarge_streams_multi_tenant-14            56.00 ± ∞ ¹   104.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/32_partitions_xlarge_streams_multi_tenant_parallel-14   56.00 ± ∞ ¹   104.00 ± ∞ ¹        ~ (p=1.000 n=1) ²
geomean                                                                             63.81          112.1        +75.72%
¹ need >= 6 samples for confidence interval at level 0.95
² need >= 4 samples to detect a difference at alpha level 0.05

                                                                                  │   old.txt   │               new.txt                │
                                                                                  │  allocs/op  │  allocs/op   vs base                 │
IngestLimits_updateMetadata/4_partitions_small_streams_single_tenant-14             2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/4_partitions_small_streams_single_tenant_parallel-14    2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/8_partitions_medium_streams_multi_tenant-14             2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/8_partitions_medium_streams_multi_tenant_parallel-14    2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/16_partitions_large_streams_multi_tenant-14             2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/16_partitions_large_streams_multi_tenant_parallel-14    2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/32_partitions_xlarge_streams_multi_tenant-14            2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
IngestLimits_updateMetadata/32_partitions_xlarge_streams_multi_tenant_parallel-14   2.000 ± ∞ ¹   3.000 ± ∞ ¹        ~ (p=1.000 n=1) ²
geomean                                                                             2.000         3.000        +50.00%
¹ need >= 6 samples for confidence interval at level 0.95
² need >= 4 samples to detect a difference at alpha level 0.05

Which issue(s) this PR fixes:
Fixes #

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@periklis self-assigned this Apr 11, 2025
@periklis force-pushed the ingest-limits-stripe-locks branch from 7222994 to c6b8c9c on April 15, 2025 13:47
@periklis marked this pull request as ready for review on April 15, 2025 18:16
@periklis requested a review from a team as a code owner on April 15, 2025 18:16
@periklis force-pushed the ingest-limits-stripe-locks branch from c6b8c9c to 805f573 on April 15, 2025 18:17
Comment on lines 628 to 657
func (s *IngestLimits) streams(tenant string) <-chan *streamMetadata {
    ch := make(chan *streamMetadata)

    go func() {
        for i := range s.metadata.size {
            s.metadata.locks[i].RLock()

            if s.metadata.stripes[i][tenant] == nil {
                s.metadata.locks[i].RUnlock()
                continue
            }

            for partitionID, streams := range s.metadata.stripes[i][tenant] {
                if assigned := s.partitionManager.Has(partitionID); !assigned {
                    continue
                }

                for _, stream := range streams {
                    ch <- stream
                }
            }

            s.metadata.locks[i].RUnlock()
        }

        close(ch)
    }()

    return ch
}
@periklis (Collaborator, Author):

@grobinson-grafana Could it be that we need to pass the context here to catch cancellation, so we don't leak this goroutine and, in turn, keep holding locks?
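
One possible shape for that, as a sketch rather than the PR's eventual code: replace the bare ch <- stream send with a select that also watches ctx, so a cancelled caller unblocks the goroutine instead of leaving it parked on the channel while holding a stripe lock:

select {
case ch <- stream:
case <-ctx.Done():
    s.metadata.locks[i].RUnlock() // don't exit while still holding the stripe lock
    close(ch)
    return
}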

@grobinson-grafana (Contributor) left a comment:

In general this looks good! I would love to see a better abstraction around the stripes and the lock acquisition, though. I think if you do this you'll find the code easier to reason about and easier to test. You could consider using https://pkg.go.dev/iter for some of the operations.
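
As a hypothetical sketch of that suggestion (building on the streamMetadataStripes sketch above; All is an assumed name, and iter.Seq requires Go 1.23+), an iterator can stop and release the current stripe lock the moment the caller breaks out of the loop, which the channel version cannot do:

import "iter"

// All yields every stream for a tenant, taking stripe read locks in
// ascending order and releasing each before moving to the next. If the
// caller breaks out of the range loop, yield returns false and the lock
// is released immediately.
func (m *streamMetadataStripes) All(tenant string) iter.Seq[*streamMetadata] {
    return func(yield func(*streamMetadata) bool) {
        for i := range m.locks {
            m.locks[i].RLock()
            for _, streams := range m.stripes[i][tenant] {
                for _, sm := range streams {
                    if !yield(sm) {
                        m.locks[i].RUnlock()
                        return
                    }
                }
            }
            m.locks[i].RUnlock()
        }
    }
}

// Usage: for sm := range m.All(tenant) { ... }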

for i := range s.stripes {
    s.stripes[i] = make(map[string]map[int32][]*streamMetadata)

    for j := range s.stripes[i] {
@grobinson-grafana (Contributor):

Isn't this a no-op since s.stripes[i] is a new map with no entries?

}

type streamMetadataStripes struct {
    size int
@grobinson-grafana (Contributor):

I think size is not required here? We know the number of locks from len(locks) (for when we iterate over the stripes). Because it just reads the slice header, len(locks) is constant time.

@periklis (Collaborator, Author):

Yes, introduced initially to avoid using len(locks) in the following op:

i := uint64(partition) & uint64(len(s.metadata.locks)-1)

Since this op is done once, we can skip the extra size field.
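
Worth noting: the mask is only equivalent to a modulo when the number of locks is a power of two, e.g. partition & 63 == partition % 64 for the default 64 partitions. If the stripe count could ever be arbitrary, the general form is:

i := uint64(partition) % uint64(len(s.metadata.locks))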

    partitionsPerTenant = make(map[string]map[int32]struct{})
)

for i := range s.metadata.size {
@grobinson-grafana (Contributor):

Suggested change:
-   for i := range s.metadata.size {
+   for i := range s.metadata.locks {

}

func (s *IngestLimits) onPartitionsAssigned(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
    s.partitionManager.Assign(ctx, client, partitions)
}

func (s *IngestLimits) onPartitionsRevoked(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
    s.mtx.Lock()
@grobinson-grafana (Contributor):

Just be careful here because removing this mutex means the method is no longer linearizable. I'm not sure if it matters much, but please just take some time to confirm this.

@periklis (Collaborator, Author):

You're saying we need the extra mutex because it is not the same as taking the exclusive locks in the callee:

https://github.com/grafana/loki/pull/17150/files#diff-ca846946ac537045b180cd9bb04424c4d454ac70b15980f43d2fa275ca9feecaR293-R306


// Initialize stripe map if it doesn't exist
if s.metadata.stripes[i] == nil {
    s.metadata.stripes[i] = make(map[string]map[int32][]*streamMetadata)
@grobinson-grafana (Contributor):

So we are now storing a slice of *streamMetadata instead of streamMetadata, but I'm not sure I understand why. If it's for performance, this will be quite a bit slower as we'll need to do a lot more memory allocations and garbage collection, and suffer more cache misses on read as the entries will be all over the heap.
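
For illustration, the value-typed layout alluded to here would look like the following (a sketch, not PR code); values stay contiguous in each slice's backing array, improving cache locality and giving the GC fewer individual objects to track:

stripes []map[string]map[int32][]streamMetadata // values, not pointers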

@@ -611,3 +624,50 @@ func (s *IngestLimits) GetStreamUsage(_ context.Context, req *logproto.GetStream
        Rate: uint64(rate),
    }, nil
}

func (s *IngestLimits) streams(ctx context.Context, tenant string) <-chan *streamMetadata {
@grobinson-grafana (Contributor) commented Apr 16, 2025:

I'm not really following what the problem is here, but I feel this is very complicated for what is supposed to just be producing a result. I know you had a comment that you were concerned about deadlocks, and I'm also a bit worried. If a reader decides not to consume all of the channel, then we cannot make any system-wide progress.

I would take a step back here and re-analyze what the problem is, as I think you will end up with a better solution. If it were me, I would just iterate over the stripes, since we know the stripes are always acquired in ascending order whenever we iterate, which should mean we have reasonably good pipelining. I would recommend starting simple and getting complex only when we have to, and not earlier.
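
Read literally, that suggestion could look like this hypothetical sketch (forEachStream is an assumed name; the fields mirror the quoted PR code). Each stripe lock is released before the next is taken and nothing is sent on a channel under a lock, so a slow or abandoned consumer cannot wedge the stripes:

// forEachStream calls fn for every stream of the tenant on partitions this
// instance owns. fn runs under the stripe's read lock, so it must not block.
func (s *IngestLimits) forEachStream(tenant string, fn func(*streamMetadata)) {
    for i := range s.metadata.locks {
        s.metadata.locks[i].RLock()
        for partitionID, streams := range s.metadata.stripes[i][tenant] {
            if !s.partitionManager.Has(partitionID) {
                continue
            }
            for _, sm := range streams {
                fn(sm)
            }
        }
        s.metadata.locks[i].RUnlock()
    }
}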

select {
case <-acquired:
    // Success - lock was available
case <-time.After(100 * time.Millisecond):
@grobinson-grafana (Contributor):

This will flake a lot.

    t.Fatal("test timed out - possible goroutine leak or deadlock")
}

// Verify locks are released
@grobinson-grafana (Contributor):

This is super difficult to test robustly; I would strongly advise against it.
