Skip to content

Commit 8b0e41e

Browse files
committed
Update RlqsFilter to the new Filter API.
1 parent b24bef9 commit 8b0e41e

File tree

2 files changed

+100
-82
lines changed

2 files changed

+100
-82
lines changed

xds/src/main/java/io/grpc/xds/FilterRegistry.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() {
3737
instance = newRegistry().register(
3838
new FaultFilter.Provider(),
3939
new RouterFilter.Provider(),
40-
new RbacFilter.Provider());
40+
new RbacFilter.Provider(),
41+
new RlqsFilter.Provider());
4142
}
4243
return instance;
4344
}

xds/src/main/java/io/grpc/xds/RlqsFilter.java

+98-81
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import io.grpc.ServerCallHandler;
3535
import io.grpc.ServerInterceptor;
3636
import io.grpc.internal.GrpcUtil;
37-
import io.grpc.xds.Filter.ServerInterceptorBuilder;
37+
import io.grpc.internal.SharedResourceHolder;
3838
import io.grpc.xds.client.XdsLogger;
3939
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
4040
import io.grpc.xds.internal.datatype.GrpcService;
@@ -46,14 +46,17 @@
4646
import io.grpc.xds.internal.rlqs.RlqsCache;
4747
import io.grpc.xds.internal.rlqs.RlqsFilterState;
4848
import io.grpc.xds.internal.rlqs.RlqsRateLimitResult;
49+
import java.util.ConcurrentModificationException;
4950
import java.util.concurrent.ScheduledExecutorService;
51+
import java.util.concurrent.atomic.AtomicBoolean;
5052
import java.util.concurrent.atomic.AtomicReference;
53+
import java.util.logging.Logger;
5154
import javax.annotation.Nullable;
5255

5356
/** RBAC Http filter implementation. */
5457
// TODO(sergiitk): introduce a layer between the filter and interceptor.
5558
// lds has filter names and the names are unique - even for server instances.
56-
final class RlqsFilter implements Filter, ServerInterceptorBuilder {
59+
final class RlqsFilter implements Filter {
5760
private final XdsLogger logger;
5861

5962
static final boolean enabled = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ENABLE_RLQS", false);
@@ -62,70 +65,124 @@ final class RlqsFilter implements Filter, ServerInterceptorBuilder {
6265
// Do do not fail on parsing errors, only log requests.
6366
static final boolean dryRun = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RLQS_DRY_RUN", false);
6467

65-
static final RlqsFilter INSTANCE = new RlqsFilter();
66-
6768
static final String TYPE_URL = "type.googleapis.com/"
6869
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig";
6970
static final String TYPE_URL_OVERRIDE_CONFIG = "type.googleapis.com/"
7071
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride";
7172

73+
private final AtomicBoolean shutdown = new AtomicBoolean();
7274
private final AtomicReference<RlqsCache> rlqsCache = new AtomicReference<>();
7375

74-
public RlqsFilter() {
75-
// TODO(sergiitk): one per new instance when filters are refactored.
76+
// TODO(sergiitk): [IMPL] figure out what to use here.
77+
private final ScheduledExecutorService scheduler =
78+
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
79+
80+
public RlqsFilter(String name) {
7681
logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), null));
7782
logger.log(XdsLogLevel.DEBUG,
78-
"Created RLQS Filter with enabled=" + enabled + ", dryRun=" + dryRun);
83+
"Created RLQS Filter name='%s' with enabled=%s, dryRun=%s", name, enabled, dryRun);
7984
}
8085

81-
@Override
82-
public String[] typeUrls() {
83-
return new String[]{TYPE_URL, TYPE_URL_OVERRIDE_CONFIG};
84-
}
86+
static final class Provider implements Filter.Provider {
87+
private static final Logger logger = Logger.getLogger(Provider.class.getName());
8588

86-
@Override
87-
public boolean isEnabled() {
88-
return enabled;
89-
}
89+
@Override
90+
public String[] typeUrls() {
91+
return new String[]{TYPE_URL, TYPE_URL_OVERRIDE_CONFIG};
92+
}
9093

91-
@Override
92-
public ConfigOrError<RlqsFilterConfig> parseFilterConfig(Message rawProtoMessage) {
93-
try {
94-
RlqsFilterConfig rlqsFilterConfig =
95-
parseRlqsFilter(unpackAny(rawProtoMessage, RateLimitQuotaFilterConfig.class));
96-
return ConfigOrError.fromConfig(rlqsFilterConfig);
97-
} catch (InvalidProtocolBufferException e) {
98-
return ConfigOrError.fromError("Can't unpack RateLimitQuotaFilterConfig proto: " + e);
99-
} catch (ResourceInvalidException e) {
100-
return ConfigOrError.fromError(e.getMessage());
94+
@Override
95+
public boolean isServerFilter() {
96+
return true;
97+
}
98+
99+
@Override
100+
public RlqsFilter newInstance(String name) {
101+
return new RlqsFilter(name);
102+
}
103+
104+
@Override
105+
public ConfigOrError<RlqsFilterConfig> parseFilterConfig(Message rawProtoMessage) {
106+
try {
107+
RlqsFilterConfig rlqsFilterConfig =
108+
parseRlqsFilter(unpackAny(rawProtoMessage, RateLimitQuotaFilterConfig.class));
109+
return ConfigOrError.fromConfig(rlqsFilterConfig);
110+
} catch (InvalidProtocolBufferException e) {
111+
return ConfigOrError.fromError("Can't unpack RateLimitQuotaFilterConfig proto: " + e);
112+
} catch (ResourceInvalidException e) {
113+
return ConfigOrError.fromError(e.getMessage());
114+
}
115+
}
116+
117+
@Override
118+
public ConfigOrError<RlqsFilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
119+
try {
120+
RlqsFilterConfig rlqsFilterConfig =
121+
parseRlqsFilterOverride(unpackAny(rawProtoMessage, RateLimitQuotaOverride.class));
122+
return ConfigOrError.fromConfig(rlqsFilterConfig);
123+
} catch (InvalidProtocolBufferException e) {
124+
return ConfigOrError.fromError("Can't unpack RateLimitQuotaOverride proto: " + e);
125+
} catch (ResourceInvalidException e) {
126+
return ConfigOrError.fromError(e.getMessage());
127+
}
128+
}
129+
130+
@VisibleForTesting
131+
RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto)
132+
throws ResourceInvalidException, InvalidProtocolBufferException {
133+
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
134+
if (rlqsFilterProto.getDomain().isEmpty()) {
135+
throw new ResourceInvalidException("RateLimitQuotaFilterConfig domain is required");
136+
}
137+
builder.domain(rlqsFilterProto.getDomain())
138+
.rlqsService(GrpcService.fromEnvoyProto(rlqsFilterProto.getRlqsServer()));
139+
140+
// TODO(sergiitk): [IMPL] Remove
141+
if (dryRun) {
142+
logger.finest("RLQS DRY RUN: not parsing matchers");
143+
return builder.build();
144+
}
145+
146+
// TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto()
147+
RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny(
148+
rlqsFilterProto.getBucketMatchers().getOnNoMatch().getAction().getTypedConfig(),
149+
RateLimitQuotaBucketSettings.class);
150+
RlqsBucketSettings fallbackBucket = RlqsBucketSettings.create(
151+
ImmutableMap.of("bucket_id", headers -> "hello"),
152+
fallbackBucketSettingsProto.getReportingInterval());
153+
154+
// TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto()
155+
Matcher<HttpMatchInput, RlqsBucketSettings> bucketMatchers = new RlqsMatcher(fallbackBucket);
156+
157+
return builder.bucketMatchers(bucketMatchers).build();
101158
}
102159
}
103160

104161
@Override
105-
public ConfigOrError<RlqsFilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
106-
try {
107-
RlqsFilterConfig rlqsFilterConfig =
108-
parseRlqsFilterOverride(unpackAny(rawProtoMessage, RateLimitQuotaOverride.class));
109-
return ConfigOrError.fromConfig(rlqsFilterConfig);
110-
} catch (InvalidProtocolBufferException e) {
111-
return ConfigOrError.fromError("Can't unpack RateLimitQuotaOverride proto: " + e);
112-
} catch (ResourceInvalidException e) {
113-
return ConfigOrError.fromError(e.getMessage());
162+
public void close() {
163+
// TODO(sergiitk): [DESIGN] besides shutting down everything, should there
164+
// be per-route interceptor destructors?
165+
if (!shutdown.compareAndSet(false, true)) {
166+
throw new ConcurrentModificationException(
167+
"Unexpected: RlqsFilter#close called multiple times");
168+
}
169+
RlqsCache oldCache = rlqsCache.getAndUpdate(unused -> null);
170+
if (oldCache != null) {
171+
oldCache.shutdown();
114172
}
115173
}
116174

175+
// @Override
176+
public boolean isEnabled() {
177+
return enabled;
178+
}
179+
117180
@Nullable
118181
@Override
119182
public ServerInterceptor buildServerInterceptor(
120183
FilterConfig config, @Nullable FilterConfig overrideConfig) {
121-
throw new UnsupportedOperationException("ScheduledExecutorService scheduler required");
122-
}
184+
// ScheduledExecutorService scheduler
123185

124-
@Override
125-
public ServerInterceptor buildServerInterceptor(
126-
FilterConfig config,
127-
@Nullable FilterConfig overrideConfig,
128-
ScheduledExecutorService scheduler) {
129186
// Called when we get an xds update - when the LRS or RLS changes.
130187
RlqsFilterConfig rlqsFilterConfig = (RlqsFilterConfig) checkNotNull(config, "config");
131188

@@ -148,16 +205,6 @@ public ServerInterceptor buildServerInterceptor(
148205
return generateRlqsInterceptor(rlqsFilterConfig);
149206
}
150207

151-
@Override
152-
public void shutdown() {
153-
// TODO(sergiitk): [DESIGN] besides shutting down everything, should there
154-
// be per-route interceptor destructors?
155-
RlqsCache oldCache = rlqsCache.getAndUpdate(unused -> null);
156-
if (oldCache != null) {
157-
oldCache.shutdown();
158-
}
159-
}
160-
161208
@Nullable
162209
private ServerInterceptor generateRlqsInterceptor(RlqsFilterConfig config) {
163210
checkNotNull(config, "config");
@@ -193,36 +240,6 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
193240
};
194241
}
195242

196-
@VisibleForTesting
197-
RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto)
198-
throws ResourceInvalidException, InvalidProtocolBufferException {
199-
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
200-
if (rlqsFilterProto.getDomain().isEmpty()) {
201-
throw new ResourceInvalidException("RateLimitQuotaFilterConfig domain is required");
202-
}
203-
builder.domain(rlqsFilterProto.getDomain())
204-
.rlqsService(GrpcService.fromEnvoyProto(rlqsFilterProto.getRlqsServer()));
205-
206-
// TODO(sergiitk): [IMPL] Remove
207-
if (dryRun) {
208-
logger.log(XdsLogLevel.DEBUG, "Dry run: not parsing matchers in the filter filter");
209-
return builder.build();
210-
}
211-
212-
// TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto()
213-
RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny(
214-
rlqsFilterProto.getBucketMatchers().getOnNoMatch().getAction().getTypedConfig(),
215-
RateLimitQuotaBucketSettings.class);
216-
RlqsBucketSettings fallbackBucket = RlqsBucketSettings.create(
217-
ImmutableMap.of("bucket_id", headers -> "hello"),
218-
fallbackBucketSettingsProto.getReportingInterval());
219-
220-
// TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto()
221-
Matcher<HttpMatchInput, RlqsBucketSettings> bucketMatchers = new RlqsMatcher(fallbackBucket);
222-
223-
return builder.bucketMatchers(bucketMatchers).build();
224-
}
225-
226243
static class RlqsMatcher extends Matcher<HttpMatchInput, RlqsBucketSettings> {
227244
private final RlqsBucketSettings fallbackBucket;
228245

0 commit comments

Comments
 (0)