fix(ingest-limits): Use stripe locking for metadata #17150
base: main
Conversation
The branch was force-pushed from 7222994 to c6b8c9c, and then from c6b8c9c to 805f573.
pkg/limits/ingest_limits.go
Outdated
func (s *IngestLimits) streams(tenant string) <-chan *streamMetadata {
	ch := make(chan *streamMetadata)

	go func() {
		for i := range s.metadata.size {
			s.metadata.locks[i].RLock()

			if s.metadata.stripes[i][tenant] == nil {
				s.metadata.locks[i].RUnlock()
				continue
			}

			for partitionID, streams := range s.metadata.stripes[i][tenant] {
				if assigned := s.partitionManager.Has(partitionID); !assigned {
					continue
				}

				for _, stream := range streams {
					ch <- stream
				}
			}

			s.metadata.locks[i].RUnlock()
		}

		close(ch)
	}()

	return ch
}
@grobinson-grafana Could it be that we need to pass a context here to catch the cancellation event, so that we don't leak this goroutine and, in turn, keep holding the locks?
@grobinson-grafana Thinking about something like this:
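A minimal sketch of what that could look like (my reading of the suggestion; the select on ctx.Done() and the exact signature are assumptions, not the reviewer's actual code; field names follow the snippet above):

func (s *IngestLimits) streams(ctx context.Context, tenant string) <-chan *streamMetadata {
	ch := make(chan *streamMetadata)
	go func() {
		defer close(ch)
		for i := range s.metadata.size {
			s.metadata.locks[i].RLock()
			for partitionID, streams := range s.metadata.stripes[i][tenant] {
				if !s.partitionManager.Has(partitionID) {
					continue
				}
				for _, stream := range streams {
					select {
					case ch <- stream:
					case <-ctx.Done():
						// The caller went away: release the stripe lock and exit
						// instead of blocking forever on an unread channel.
						s.metadata.locks[i].RUnlock()
						return
					}
				}
			}
			s.metadata.locks[i].RUnlock()
		}
	}()
	return ch
}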
In general looks good! I would love to see a better abstraction around the stripes and the lock acquisition though. I think if you do this you'll find the code easier to reason about and also easier to write tests. You could consider using https://pkg.go.dev/iter for some of the operations.
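For illustration only, one possible shape using the iter package from Go 1.23+ (the function name and details are mine, not the PR's; it assumes the same metadata fields as the earlier snippets):

// streamsIter yields a tenant's streams one at a time. Each stripe's read
// lock is held only while that stripe is being yielded, and is released
// early if the caller stops iterating.
func (s *IngestLimits) streamsIter(tenant string) iter.Seq[*streamMetadata] {
	return func(yield func(*streamMetadata) bool) {
		for i := range s.metadata.locks {
			s.metadata.locks[i].RLock()
			for partitionID, streams := range s.metadata.stripes[i][tenant] {
				if !s.partitionManager.Has(partitionID) {
					continue
				}
				for _, stream := range streams {
					if !yield(stream) {
						s.metadata.locks[i].RUnlock()
						return
					}
				}
			}
			s.metadata.locks[i].RUnlock()
		}
	}
}

Callers would then write for stream := range s.streamsIter(tenant) { ... }, with no extra goroutine or channel involved.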
pkg/limits/ingest_limits.go
Outdated
for i := range s.stripes {
	s.stripes[i] = make(map[string]map[int32][]*streamMetadata)

	for j := range s.stripes[i] {
Isn't this a no-op, since s.stripes[i] is a new map with no entries?
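To illustrate the point (a standalone example using the PR's types, not code from the PR): ranging over a map that was just created runs zero iterations, so the inner loop body can never execute.

m := make(map[string]map[int32][]*streamMetadata)
for j := range m {
	_ = j // never reached: m was created moments ago and has no entries
}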
pkg/limits/ingest_limits.go
Outdated
}

type streamMetadataStripes struct {
	size int
I think size is not required here? We know the number of locks from len(locks) (for when we iterate over the stripes), and since len(locks) just reads the slice header, it's constant time.
Yes, it was introduced initially to avoid calling len(locks) in the following op:
i := uint64(partition) & uint64(len(s.metadata.locks)-1)
Since this op is done only once, we can skip the extra size field.
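For context, a small illustration of why that expression works (the helper name is mine): masking with len(locks)-1 only behaves like a modulo when the number of locks is a power of two, e.g. the default of 64.

// stripeFor maps a partition to a stripe index without a division.
// Assumes numStripes is a power of two, so for non-negative partitions
// x & (numStripes-1) equals x % numStripes.
func stripeFor(partition int32, numStripes int) uint64 {
	return uint64(partition) & uint64(numStripes-1)
}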
pkg/limits/ingest_limits.go
Outdated
	partitionsPerTenant = make(map[string]map[int32]struct{})
)

for i := range s.metadata.size {
Suggested change:
-	for i := range s.metadata.size {
+	for i := range s.metadata.locks {
}

func (s *IngestLimits) onPartitionsAssigned(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
	s.partitionManager.Assign(ctx, client, partitions)
}

func (s *IngestLimits) onPartitionsRevoked(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
	s.mtx.Lock()
Just be careful here because removing this mutex means the method is no longer linearizable. I'm not sure if it matters much, but please just take some time to confirm this.
You're saying we need the extra mutex because it is not the same as taking the exclusive locks in the callee:
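To make the distinction concrete, a sketch of the two locking shapes under discussion (the helper and its body are hypothetical, not the PR's actual code):

// With the outer mutex, the entire revocation is one critical section, so
// concurrent callers observe it as a single atomic step (linearizable).
func (s *IngestLimits) onPartitionsRevoked(ctx context.Context, client *kgo.Client, partitions map[string][]int32) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.removeRevokedPartitions(partitions) // hypothetical helper
}

// With stripe locks only, each stripe is cleaned up under its own lock in
// turn, so a concurrent reader can see some stripes already cleaned and
// others not yet - the revocation is no longer observed as a single step.
func (s *IngestLimits) removeRevokedPartitions(partitions map[string][]int32) {
	for i := range s.metadata.locks {
		s.metadata.locks[i].Lock()
		// delete the revoked partitions from s.metadata.stripes[i] here
		s.metadata.locks[i].Unlock()
	}
}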
pkg/limits/ingest_limits.go
Outdated
// Initialize stripe map if it doesn't exist
if s.metadata.stripes[i] == nil {
	s.metadata.stripes[i] = make(map[string]map[int32][]*streamMetadata)
So we are now storing a slice of *streamMetadata instead of streamMetadata, but I'm not sure I understand why. If it's for performance, this will be quite a bit slower, as we'll need to do a lot more memory allocations and garbage collection, and suffer more cache misses on read since the entries will be scattered all over the heap.
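A side-by-side of the two layouts being compared (illustration only, using the PR's streamMetadata type):

// []streamMetadata stores the values inline in one contiguous backing array:
// a single allocation and good cache locality when scanning.
byValue := make([]streamMetadata, 0, 1024)

// []*streamMetadata stores pointers: each element is typically its own heap
// allocation, the GC has more objects to track, and a scan chases pointers
// that may land anywhere on the heap.
byPointer := make([]*streamMetadata, 0, 1024)

_, _ = byValue, byPointer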
pkg/limits/ingest_limits.go
Outdated
@@ -611,3 +624,50 @@ func (s *IngestLimits) GetStreamUsage(_ context.Context, req *logproto.GetStream
		Rate: uint64(rate),
	}, nil
}

func (s *IngestLimits) streams(ctx context.Context, tenant string) <-chan *streamMetadata {
I'm not really following what the problem is here, but I feel this is very complicated for something that is supposed to just produce a result. I know you had a comment saying you were concerned about deadlocks, and I'm also a bit worried: if a reader decides not to consume all of the channel, then we cannot make any system-wide progress.
I would take a step back here and re-analyze what the problem is, as I think you will end up with a better solution. If it were me, I would just iterate over the stripes, since we know the stripes are always acquired in ascending order whenever we iterate, which should mean we get reasonably good pipelining. I would recommend starting simple and getting complex only when we have to, and not earlier.
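A minimal sketch of the simpler shape this describes (the function name and the aggregation are mine; the point is taking the stripe read locks inline, in ascending order, with no producer goroutine or channel):

// usageForTenant walks every stripe in ascending order, holding each read
// lock only for the duration of that stripe, and returns an aggregate
// instead of streaming results over a channel.
func (s *IngestLimits) usageForTenant(tenant string) (activeStreams int) {
	for i := range s.metadata.locks {
		s.metadata.locks[i].RLock()
		for partitionID, streams := range s.metadata.stripes[i][tenant] {
			if !s.partitionManager.Has(partitionID) {
				continue
			}
			activeStreams += len(streams)
		}
		s.metadata.locks[i].RUnlock()
	}
	return activeStreams
}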
pkg/limits/ingest_limits_test.go
Outdated
select {
case <-acquired:
	// Success - lock was available
case <-time.After(100 * time.Millisecond):
This will flake a lot.
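One common way to make this less timing-sensitive (a suggestion, not code from the thread) is to poll with a generous deadline, e.g. testify's require.Eventually, instead of racing a fixed 100ms timer:

// Poll for up to 5s and only fail if the lock never becomes available.
// Slow CI machines get plenty of headroom without slowing the happy path.
require.Eventually(t, func() bool {
	select {
	case <-acquired:
		return true
	default:
		return false
	}
}, 5*time.Second, 10*time.Millisecond)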
pkg/limits/ingest_limits_test.go
Outdated
	t.Fatal("test timed out - possible goroutine leak or deadlock")
}

// Verify locks are released
This is super difficult to test robustly; I would strongly advise against it.
What this PR does / why we need it:
This pull request implements a stripe-lock approach for reading and writing the internal ingest-limits stream metadata structure. The stripes are distributed over the total number of partitions (e.g. the default is 64). This allows the structure to be updated and read with less lock contention.
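As a rough sketch of the idea (field names follow the review snippets above; the store helper is illustrative, not the final code):

type streamMetadataStripes struct {
	locks   []sync.RWMutex                           // one lock per stripe
	stripes []map[string]map[int32][]*streamMetadata // stripe -> tenant -> partition -> streams
}

// A write only takes the lock of the stripe its partition maps to, so writes
// for different partitions rarely contend with each other or with readers.
func (m *streamMetadataStripes) store(tenant string, partition int32, sm *streamMetadata) {
	i := uint64(partition) & uint64(len(m.locks)-1) // len(locks) is a power of two
	m.locks[i].Lock()
	defer m.locks[i].Unlock()
	if m.stripes[i] == nil {
		m.stripes[i] = make(map[string]map[int32][]*streamMetadata)
	}
	if m.stripes[i][tenant] == nil {
		m.stripes[i][tenant] = make(map[int32][]*streamMetadata)
	}
	m.stripes[i][tenant][partition] = append(m.stripes[i][tenant][partition], sm)
}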
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
- Reviewed the CONTRIBUTING.md guide (required)
- feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
- If the change is deprecating or deleting a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR