Commit 3e92b42

fix: skip streams over limits in dry-run mode
This commit fixes a bug where distributors running with IngestLimitsDryRunEnabled would still write streams that exceeded the max stream limit to the metadata topic. As a result, those streams were counted among the existing streams instead of being treated as over the max stream limit.
1 parent 364ba26 commit 3e92b42
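In short, whatever consumes the metadata topic treats every advertised stream hash as a known stream, so advertising over-limit streams in dry-run mode makes them look like they were accepted within the limit. A toy sketch of that accounting (the consumer below is hypothetical and much simpler than the real limits service):

package main

import "fmt"

func main() {
	// Hypothetical toy consumer of the metadata topic: every advertised
	// stream hash becomes a "known" stream for limit accounting.
	known := map[uint64]struct{}{}
	advertised := []uint64{1, 2, 3} // hashes written to the metadata topic
	for _, h := range advertised {
		known[h] = struct{}{}
	}
	// If over-limit hashes are advertised as well, they inflate this count,
	// and the limits view no longer matches what was actually rejected.
	fmt.Println("known streams:", len(known))
}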

1 file changed: pkg/distributor/distributor.go (+35, -23)
@@ -721,6 +721,7 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
 		return &logproto.PushResponse{}, validationErr
 	}
 
+	var skipMetadataHashes map[uint64]struct{}
 	if d.cfg.IngestLimitsEnabled {
 		var reasonsForStreams map[uint64][]string
 		streams, reasonsForStreams, err = d.ingestLimits.enforceLimits(ctx, tenantID, streams)
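Note that skipMetadataHashes is declared up front but only meant to be populated in dry-run mode; otherwise it stays nil. Reading from a nil map is safe in Go and never finds a key, so a nil skip set handed down the Kafka write path simply means "skip nothing". A small illustrative snippet:

package main

import "fmt"

func main() {
	// A nil map behaves like an empty map for lookups, so a nil skip set
	// passed to the Kafka write path skips no streams.
	var skipMetadataHashes map[uint64]struct{}
	_, ok := skipMetadataHashes[42]
	fmt.Println(ok) // false: no stream is skipped when the map is nil
}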
@@ -737,7 +738,17 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
 			}
 		}
 		newStreams := streamsForReasons[limits_frontend.ReasonExceedsMaxStreams]
-		if !d.cfg.IngestLimitsDryRunEnabled {
+		// When IngestLimitsDryRunEnabled is true, we need to stop stream hashes
+		// that exceed the stream limit from being written to the metadata topic.
+		// If we don't do this, the stream hashes that should have been rejected
+		// will instead being counted as a known stream, causing a disagreement
+		// in metrics between the limits service and ingesters.
+		if d.cfg.IngestLimitsDryRunEnabled {
+			skipMetadataHashes := make(map[uint64]struct{})
+			for _, streamHash := range newStreams {
+				skipMetadataHashes[streamHash] = struct{}{}
+			}
+		} else {
 			// TODO(grobinson): Return the reasons for each stream, instead of
 			// generic error messages.
 			if len(newStreams) == len(streams) {
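A condensed, self-contained sketch of what the dry-run branch is doing, assigning into the skipMetadataHashes declared earlier (the over-limit hashes are hard-coded here; the real code takes them from the per-stream limit reasons):

package main

import "fmt"

func main() {
	dryRunEnabled := true
	// Hashes of streams that exceeded the max stream limit.
	newStreams := []uint64{101, 202}

	var skipMetadataHashes map[uint64]struct{}
	if dryRunEnabled {
		// Remember the over-limit hashes so no metadata record is produced
		// for them further down the write path.
		skipMetadataHashes = make(map[uint64]struct{}, len(newStreams))
		for _, h := range newStreams {
			skipMetadataHashes[h] = struct{}{}
		}
	}
	fmt.Println(len(skipMetadataHashes)) // 2
}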
@@ -797,7 +808,7 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
 			return nil, err
 		}
 		// We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded.
-		d.sendStreamsToKafka(ctx, streams, tenantID, &tracker, subring)
+		d.sendStreamsToKafka(ctx, streams, skipMetadataHashes, tenantID, &tracker, subring)
 	}
 
 	if d.cfg.IngesterEnabled {
@@ -1232,10 +1243,10 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
 	return err
 }
 
-func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
+func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, skipMetadataHashes map[uint64]struct{}, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
 	for _, s := range streams {
 		go func(s KeyedStream) {
-			err := d.sendStreamToKafka(ctx, s, tenant, subring)
+			err := d.sendStreamToKafka(ctx, s, skipMetadataHashes, tenant, subring)
 			if err != nil {
 				err = fmt.Errorf("failed to write stream to kafka: %w", err)
 			}
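The skip set is built once on the request path and afterwards only read by the per-stream goroutines spawned in sendStreamsToKafka; concurrent reads of a Go map are safe as long as nothing writes to it concurrently. A simplified sketch of the same fan-out shape, without the tracker and partition-ring plumbing:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Built once on the request path, read-only afterwards.
	skip := map[uint64]struct{}{7: {}}
	hashes := []uint64{7, 8, 9}

	var wg sync.WaitGroup
	for _, h := range hashes {
		wg.Add(1)
		go func(h uint64) {
			defer wg.Done()
			// Concurrent reads of the shared map are safe; each goroutine
			// decides independently whether to emit a metadata record.
			if _, ok := skip[h]; ok {
				fmt.Println(h, "skipped metadata record")
				return
			}
			fmt.Println(h, "would produce a metadata record")
		}(h)
	}
	wg.Wait()
}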
@@ -1244,7 +1255,7 @@ func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStr
 	}
 }
 
-func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string, subring *ring.PartitionRing) error {
+func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, skipMetadataHashes map[uint64]struct{}, tenant string, subring *ring.PartitionRing) error {
 	if len(stream.Stream.Entries) == 0 {
 		return nil
 	}
@@ -1274,26 +1285,27 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
 
 	entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
 
-	// However, unlike stream records, the distributor writes stream metadata
-	// records to one of a fixed number of partitions, the size of which is
-	// determined ahead of time. It does not use a ring. The reason for this
-	// is that we want to be able to scale components that consume metadata
-	// records independent of ingesters.
-	metadataPartitionID := int32(stream.HashKeyNoShard % uint64(d.numMetadataPartitions))
-	metadata, err := kafka.EncodeStreamMetadata(
-		metadataPartitionID,
-		d.cfg.KafkaConfig.Topic,
-		tenant,
-		stream.HashKeyNoShard,
-		entriesSize,
-		structuredMetadataSize,
-	)
-	if err != nil {
-		return fmt.Errorf("failed to marshal metadata: %w", err)
+	if _, ok := skipMetadataHashes[stream.HashKeyNoShard]; !ok {
+		// However, unlike stream records, the distributor writes stream metadata
+		// records to one of a fixed number of partitions, the size of which is
+		// determined ahead of time. It does not use a ring. The reason for this
+		// is that we want to be able to scale components that consume metadata
+		// records independent of ingesters.
+		metadataPartitionID := int32(stream.HashKeyNoShard % uint64(d.numMetadataPartitions))
+		metadata, err := kafka.EncodeStreamMetadata(
+			metadataPartitionID,
+			d.cfg.KafkaConfig.Topic,
+			tenant,
+			stream.HashKeyNoShard,
+			entriesSize,
+			structuredMetadataSize,
+		)
+		if err != nil {
+			return fmt.Errorf("failed to marshal metadata: %w", err)
+		}
+		records = append(records, metadata)
 	}
 
-	records = append(records, metadata)
-
 	d.kafkaRecordsPerRequest.Observe(float64(len(records)))
 
 	produceResults := d.kafkaWriter.ProduceSync(ctx, records)