Skip to content

Commit c97cd51

Browse files
committed
PSM e2e: works!
1 parent 0a1cf7a commit c97cd51

File tree

7 files changed

+77
-30
lines changed

7 files changed

+77
-30
lines changed

xds/src/main/java/io/grpc/xds/MessagePrinter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
2929
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
3030
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
31+
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
32+
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
3133
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBAC;
3234
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBACPerRoute;
3335
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
@@ -58,6 +60,9 @@ private static JsonFormat.Printer newPrinter() {
5860
.add(RBAC.getDescriptor())
5961
.add(RBACPerRoute.getDescriptor())
6062
.add(Router.getDescriptor())
63+
// RLQS
64+
.add(RateLimitQuotaFilterConfig.getDescriptor())
65+
.add(RateLimitQuotaOverride.getDescriptor())
6166
// UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported
6267
// by top-level resource types.
6368
.add(UpstreamTlsContext.getDescriptor())

xds/src/main/java/io/grpc/xds/RlqsFilter.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
// TODO(sergiitk): introduce a layer between the filter and interceptor.
5555
// lds has filter names and the names are unique - even for server instances.
5656
final class RlqsFilter implements Filter, ServerInterceptorBuilder {
57+
private final XdsLogger logger;
58+
5759
static final boolean enabled = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ENABLE_RLQS", false);
5860

5961
// TODO(sergiitk): [IMPL] remove
@@ -69,12 +71,12 @@ final class RlqsFilter implements Filter, ServerInterceptorBuilder {
6971

7072
private final AtomicReference<RlqsCache> rlqsCache = new AtomicReference<>();
7173

72-
private final XdsLogger logger;
73-
7474
public RlqsFilter() {
75-
InternalLogId logId = InternalLogId.allocate("rlqs-filter", null);
75+
// TODO(sergiitk): one per new instance when filters are refactored.
76+
InternalLogId logId = InternalLogId.allocate("RlqsFilter", null);
7677
logger = XdsLogger.withLogId(logId);
77-
logger.log(XdsLogLevel.INFO, "Created RLQS Filter with logId=" + logId);
78+
logger.log(XdsLogLevel.DEBUG,
79+
"Created RLQS Filter with enabled=" + enabled + ", dryRun=" + dryRun);
7880
}
7981

8082
@Override
@@ -177,7 +179,7 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
177179

178180
// TODO(sergiitk): [IMPL] Remove
179181
if (dryRun) {
180-
logger.log(XdsLogLevel.INFO, "RLQS DRY RUN: request <<" + httpMatchInput + ">>");
182+
// logger.log(XdsLogLevel.INFO, "RLQS DRY RUN: request <<" + httpMatchInput + ">>");
181183
return next.startCall(call, headers);
182184
}
183185

@@ -204,7 +206,7 @@ RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto)
204206

205207
// TODO(sergiitk): [IMPL] Remove
206208
if (dryRun) {
207-
logger.log(XdsLogLevel.INFO, "RLQS DRY RUN: skipping matchers");
209+
logger.log(XdsLogLevel.DEBUG, "Dry run: not parsing matchers in the filter filter");
208210
return builder.build();
209211
}
210212

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.io.IOException;
6060
import java.net.SocketAddress;
6161
import java.util.ArrayList;
62+
import java.util.Arrays;
6263
import java.util.Collections;
6364
import java.util.HashMap;
6465
import java.util.HashSet;
@@ -82,7 +83,9 @@ final class XdsServerWrapper extends Server {
8283
new Thread.UncaughtExceptionHandler() {
8384
@Override
8485
public void uncaughtException(Thread t, Throwable e) {
85-
logger.log(Level.SEVERE, "Exception!" + e);
86+
logger.log(Level.SEVERE, "Exception! "
87+
+ e + "\nTrace:\n"
88+
+ Arrays.toString(e.getStackTrace()).replace(',', '\n'));
8689
// TODO(chengyuanzhang): implement cleanup.
8790
}
8891
});

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
4545
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
4646
import java.net.URI;
47+
import java.util.Arrays;
4748
import java.util.Collection;
4849
import java.util.Collections;
4950
import java.util.HashMap;
@@ -72,8 +73,9 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
7273
public void uncaughtException(Thread t, Throwable e) {
7374
logger.log(
7475
XdsLogLevel.ERROR,
75-
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
76-
e);
76+
"Uncaught exception in XdsClient SynchronizationContext. Panic! "
77+
+ e + "\nTrace:\n"
78+
+ Arrays.toString(e.getStackTrace()).replace(',', '\n'));
7779
// TODO(chengyuanzhang): better error handling.
7880
throw new AssertionError(e);
7981
}

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsCache.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ private long hashRlqsFilterConfig(RlqsFilterConfig config) {
9999
// TODO(sergiitk): [DESIGN] the key should be hashed (domain + buckets) merged config?
100100
// TODO(sergiitk): [IMPL] Hash buckets
101101
int k1 = Objects.hash(config.rlqsService().targetUri(), config.domain());
102-
int k2 = config.bucketMatchers().hashCode();
102+
int k2;
103+
if (config.bucketMatchers() == null) {
104+
k2 = 0x42c0ffee;
105+
} else {
106+
k2 = config.bucketMatchers().hashCode();
107+
}
103108
return Long.rotateLeft(Integer.toUnsignedLong(k1), 32) + Integer.toUnsignedLong(k2);
104109
}
105110

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsClient.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,41 @@
2525
import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaUsageReports;
2626
import io.envoyproxy.envoy.service.rate_limit_quota.v3.RateLimitQuotaUsageReports.BucketQuotaUsage;
2727
import io.grpc.Grpc;
28+
import io.grpc.InternalLogId;
2829
import io.grpc.ManagedChannel;
30+
import io.grpc.internal.GrpcUtil;
2931
import io.grpc.stub.ClientCallStreamObserver;
3032
import io.grpc.stub.StreamObserver;
3133
import io.grpc.xds.client.Bootstrapper.RemoteServerInfo;
34+
import io.grpc.xds.client.XdsLogger;
35+
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
3236
import io.grpc.xds.internal.rlqs.RlqsBucket.RlqsBucketUsage;
3337
import java.util.List;
34-
import java.util.concurrent.TimeUnit;
3538
import java.util.concurrent.atomic.AtomicBoolean;
3639
import java.util.function.Consumer;
37-
import java.util.logging.Level;
38-
import java.util.logging.Logger;
40+
import javax.annotation.Nullable;
3941

4042
public final class RlqsClient {
41-
private static final Logger logger = Logger.getLogger(RlqsClient.class.getName());
43+
// TODO(sergiitk): [IMPL] remove
44+
// Do do not fail on parsing errors, only log requests.
45+
static final boolean dryRun = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RLQS_DRY_RUN", false);
46+
47+
private final XdsLogger logger;
4248

4349
private final RemoteServerInfo serverInfo;
4450
private final Consumer<List<RlqsUpdateBucketAction>> bucketsUpdateCallback;
4551
private final RlqsStream rlqsStream;
4652

4753
RlqsClient(
4854
RemoteServerInfo serverInfo, String domain,
49-
Consumer<List<RlqsUpdateBucketAction>> bucketsUpdateCallback) {
55+
Consumer<List<RlqsUpdateBucketAction>> bucketsUpdateCallback, String prettyHash) {
5056
// TODO(sergiitk): [post] check not null.
5157
this.serverInfo = serverInfo;
5258
this.bucketsUpdateCallback = bucketsUpdateCallback;
59+
60+
logger = XdsLogger.withLogId(
61+
InternalLogId.allocate("RlqsClient", "<" + prettyHash + "> " + serverInfo.target()));
62+
5363
this.rlqsStream = new RlqsStream(serverInfo, domain);
5464
}
5565

@@ -62,7 +72,7 @@ public void sendUsageReports(List<RlqsBucketUsage> bucketUsages) {
6272
}
6373

6474
public void shutdown() {
65-
logger.log(Level.FINER, "Shutting down RlqsClient to {0}", serverInfo.target());
75+
logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsClient to {0}", serverInfo.target());
6676
// TODO(sergiitk): [IMPL] RlqsClient shutdown
6777
}
6878

@@ -72,18 +82,26 @@ public void handleStreamClosed() {
7282

7383
private class RlqsStream {
7484
private final AtomicBoolean isFirstReport = new AtomicBoolean(true);
75-
private final ManagedChannel channel;
7685
private final String domain;
86+
@Nullable
7787
private final ClientCallStreamObserver<RateLimitQuotaUsageReports> clientCallStream;
7888

7989
RlqsStream(RemoteServerInfo serverInfo, String domain) {
8090
this.domain = domain;
81-
channel = Grpc.newChannelBuilder(serverInfo.target(), serverInfo.channelCredentials())
82-
.keepAliveTime(10, TimeUnit.SECONDS)
83-
.keepAliveWithoutCalls(true)
84-
.build();
85-
// keepalive?
91+
92+
if (dryRun) {
93+
clientCallStream = null;
94+
logger.log(XdsLogLevel.DEBUG, "Dry run, not connecting to " + serverInfo.target());
95+
return;
96+
}
97+
8698
// TODO(sergiitk): [IMPL] Manage State changes?
99+
ManagedChannel channel =
100+
Grpc.newChannelBuilder(serverInfo.target(), serverInfo.channelCredentials()).build();
101+
// keepalive?
102+
// .keepAliveTime(10, TimeUnit.SECONDS)
103+
// .keepAliveWithoutCalls(true)
104+
87105
RateLimitQuotaServiceStub stub = RateLimitQuotaServiceGrpc.newStub(channel);
88106
clientCallStream = (ClientCallStreamObserver<RateLimitQuotaUsageReports>)
89107
stub.streamRateLimitQuotas(new RlqsStreamObserver());
@@ -107,6 +125,10 @@ void reportUsage(List<RlqsBucket.RlqsBucketUsage> usageReports) {
107125
for (RlqsBucket.RlqsBucketUsage bucketUsage : usageReports) {
108126
report.addBucketQuotaUsages(toUsageReport(bucketUsage));
109127
}
128+
if (clientCallStream == null) {
129+
logger.log(XdsLogLevel.DEBUG, "Dry run, skipping bucket usage report: " + report.build());
130+
return;
131+
}
110132
clientCallStream.onNext(report.build());
111133
}
112134

@@ -128,12 +150,12 @@ public void onNext(RateLimitQuotaResponse response) {
128150

129151
@Override
130152
public void onError(Throwable t) {
131-
153+
logger.log(XdsLogLevel.DEBUG, "Got error in RlqsStreamObserver: " + t.toString());
132154
}
133155

134156
@Override
135157
public void onCompleted() {
136-
158+
logger.log(XdsLogLevel.DEBUG, "RlqsStreamObserver completed");
137159
}
138160
}
139161
}

xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsEngine.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package io.grpc.xds.internal.rlqs;
1818

1919
import com.google.common.collect.ImmutableList;
20+
import io.grpc.InternalLogId;
2021
import io.grpc.xds.client.Bootstrapper.RemoteServerInfo;
22+
import io.grpc.xds.client.XdsLogger;
23+
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
2124
import io.grpc.xds.internal.datatype.RateLimitStrategy;
2225
import io.grpc.xds.internal.matchers.HttpMatchInput;
2326
import io.grpc.xds.internal.matchers.Matcher;
@@ -29,11 +32,9 @@
2932
import java.util.concurrent.ScheduledExecutorService;
3033
import java.util.concurrent.ScheduledFuture;
3134
import java.util.concurrent.TimeUnit;
32-
import java.util.logging.Level;
33-
import java.util.logging.Logger;
3435

3536
public class RlqsEngine {
36-
private static final Logger logger = Logger.getLogger(RlqsEngine.class.getName());
37+
private final XdsLogger logger;
3738

3839
private final RlqsClient rlqsClient;
3940
private final Matcher<HttpMatchInput, RlqsBucketSettings> bucketMatchers;
@@ -49,8 +50,14 @@ public RlqsEngine(
4950
this.bucketMatchers = bucketMatchers;
5051
this.configHash = configHash;
5152
this.scheduler = scheduler;
53+
54+
String prettyHash = "0x" + Long.toHexString(configHash);
55+
logger = XdsLogger.withLogId(InternalLogId.allocate("RlqsEngine", prettyHash));
56+
logger.log(XdsLogLevel.DEBUG,
57+
"Initialized RlqsEngine for hash={0}, domain={1}", prettyHash, domain);
58+
5259
bucketCache = new RlqsBucketCache();
53-
rlqsClient = new RlqsClient(rlqsServer, domain, this::onBucketsUpdate);
60+
rlqsClient = new RlqsClient(rlqsServer, domain, this::onBucketsUpdate, prettyHash);
5461
}
5562

5663
public RlqsRateLimitResult rateLimit(HttpMatchInput input) {
@@ -95,7 +102,8 @@ private void scheduleImmediateReport(RlqsBucket newBucket) {
95102
1, TimeUnit.MICROSECONDS);
96103
} catch (RejectedExecutionException e) {
97104
// Shouldn't happen.
98-
logger.finer("Couldn't schedule immediate report for bucket " + newBucket.getBucketId());
105+
logger.log(XdsLogLevel.WARNING,
106+
"Couldn't schedule immediate report for bucket " + newBucket.getBucketId());
99107
}
100108
}
101109

@@ -123,7 +131,7 @@ private void reportBucketsWithInterval(long intervalMillis) {
123131
public void shutdown() {
124132
// TODO(sergiitk): [IMPL] Timers shutdown
125133
// TODO(sergiitk): [IMPL] RlqsEngine shutdown
126-
logger.log(Level.FINER, "Shutting down RlqsEngine with hash {0}", configHash);
134+
logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsEngine with hash {0}", configHash);
127135
rlqsClient.shutdown();
128136
}
129137
}

0 commit comments

Comments
 (0)