Skip to content

Commit f40bbaa

Browse files
ctillercopybara-github
authored andcommitted
Event Engine write metrics
Sketched out an approach to customizable write metrics we discussed last week In a nutshell: - endpoints can advertise a set of metrics they provide (in our discussions we used a Span here, I realized during implementation I'd like wrapped endpoints to be able to extend this set, and so using a vector instead) - endpoints also supply name and id queries for said metrics - users of the api can request a set of events to callback on, and a set of metrics they're interested in, and a callback will be made when those events are ready with the set of metrics requested PiperOrigin-RevId: 748559193
1 parent ca35be4 commit f40bbaa

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+531
-148
lines changed

BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ GRPC_PUBLIC_EVENT_ENGINE_HDRS = [
300300
"include/grpc/event_engine/slice.h",
301301
"include/grpc/event_engine/slice_buffer.h",
302302
"include/grpc/event_engine/internal/slice_cast.h",
303+
"include/grpc/event_engine/internal/write_event.h",
303304
]
304305

305306
GRPCXX_SRCS = [

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

build_autogenerated.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gRPC-Core.podspec

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

grpc.gemspec

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

include/grpc/event_engine/event_engine.h

Lines changed: 124 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
#include <grpc/event_engine/endpoint_config.h>
1818
#include <grpc/event_engine/extensible.h>
19+
#include <grpc/event_engine/internal/write_event.h>
1920
#include <grpc/event_engine/memory_allocator.h>
2021
#include <grpc/event_engine/port.h>
2122
#include <grpc/event_engine/slice_buffer.h>
2223
#include <grpc/support/port_platform.h>
2324

25+
#include <bitset>
26+
#include <initializer_list>
2427
#include <vector>
2528

2629
#include "absl/functional/any_invocable.h"
@@ -180,13 +183,26 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
180183
/// EventEngine Endpoint Read API call.
181184
///
182185
/// Passed as argument to an Endpoint \a Read
183-
struct ReadArgs {
186+
class ReadArgs final {
187+
public:
188+
ReadArgs() = default;
189+
ReadArgs(const ReadArgs&) = delete;
190+
ReadArgs& operator=(const ReadArgs&) = delete;
191+
ReadArgs(ReadArgs&&) = default;
192+
ReadArgs& operator=(ReadArgs&&) = default;
193+
184194
// A suggestion to the endpoint implementation to read at-least the
185195
// specified number of bytes over the network connection before marking
186196
// the endpoint read operation as complete. gRPC may use this argument
187197
// to minimize the number of endpoint read API calls over the lifetime
188198
// of a connection.
189-
int64_t read_hint_bytes = 1;
199+
void set_read_hint_bytes(int64_t read_hint_bytes) {
200+
read_hint_bytes_ = read_hint_bytes;
201+
}
202+
int64_t read_hint_bytes() const { return read_hint_bytes_; }
203+
204+
private:
205+
int64_t read_hint_bytes_ = 1;
190206
};
191207
/// Reads data from the Endpoint.
192208
///
@@ -212,20 +228,110 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
212228
/// statuses to \a on_read. For example, callbacks might expect to receive
213229
/// CANCELLED on endpoint shutdown.
214230
virtual bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
215-
SliceBuffer* buffer, const ReadArgs* args) = 0;
231+
SliceBuffer* buffer, ReadArgs args) = 0;
232+
//// The set of write events that can be reported by an Endpoint.
233+
using WriteEvent = ::grpc_event_engine::experimental::internal::WriteEvent;
234+
/// An output WriteMetric consists of a key and a value.
235+
/// The space of keys can be queried from the endpoint via the
236+
/// \a AllWriteMetrics, \a GetMetricName and \a GetMetricKey APIs.
237+
/// The value is an int64_t that is implementation-defined. Check with the
238+
/// endpoint implementation documentation for the semantics of each metric.
239+
struct WriteMetric {
240+
size_t key;
241+
int64_t value;
242+
};
243+
using WriteEventCallback = absl::AnyInvocable<void(
244+
WriteEvent, absl::Time, std::vector<WriteMetric>) const>;
245+
// A bitmask of the events that the caller is interested in.
246+
// Each bit corresponds to an entry in WriteEvent.
247+
using WriteEventSet = std::bitset<static_cast<int>(WriteEvent::kCount)>;
248+
// A sink to receive write events.
249+
// The requested metrics are the keys of the metrics that the caller is
250+
// interested in. The on_event callback will be called on each event
251+
// requested.
252+
class WriteEventSink final {
253+
public:
254+
WriteEventSink(absl::Span<const size_t> requested_metrics,
255+
std::initializer_list<WriteEvent> requested_events,
256+
WriteEventCallback on_event)
257+
: requested_metrics_(requested_metrics),
258+
on_event_(std::move(on_event)) {
259+
for (auto event : requested_events) {
260+
requested_events_mask_.set(static_cast<int>(event));
261+
}
262+
}
263+
264+
absl::Span<const size_t> requested_metrics() const {
265+
return requested_metrics_;
266+
}
267+
268+
bool requested_event(WriteEvent event) const {
269+
return requested_events_mask_.test(static_cast<int>(event));
270+
}
271+
272+
WriteEventSet requested_events_mask() const {
273+
return requested_events_mask_;
274+
}
275+
276+
WriteEventCallback TakeEventCallback() { return std::move(on_event_); }
277+
278+
private:
279+
absl::Span<const size_t> requested_metrics_;
280+
WriteEventSet requested_events_mask_;
281+
// The callback to be called on each event.
282+
WriteEventCallback on_event_;
283+
};
216284
/// A struct representing optional arguments that may be provided to an
217285
/// EventEngine Endpoint Write API call.
218286
///
219287
/// Passed as argument to an Endpoint \a Write
220-
struct WriteArgs {
288+
class WriteArgs final {
289+
public:
290+
WriteArgs() = default;
291+
WriteArgs(const WriteArgs&) = delete;
292+
WriteArgs& operator=(const WriteArgs&) = delete;
293+
WriteArgs(WriteArgs&&) = default;
294+
WriteArgs& operator=(WriteArgs&&) = default;
295+
296+
// A sink to receive write events.
297+
std::optional<WriteEventSink> TakeMetricsSink() {
298+
auto sink = std::move(metrics_sink_);
299+
metrics_sink_.reset();
300+
return sink;
301+
}
302+
303+
bool has_metrics_sink() const { return metrics_sink_.has_value(); }
304+
305+
void set_metrics_sink(WriteEventSink sink) {
306+
metrics_sink_ = std::move(sink);
307+
}
308+
221309
// Represents private information that may be passed by gRPC for
222310
// select endpoints expected to be used only within google.
223-
void* google_specific = nullptr;
311+
// TODO(ctiller): Remove this method once all callers are migrated to
312+
// metrics sink.
313+
void* GetDeprecatedAndDiscouragedGoogleSpecificPointer() {
314+
return google_specific_;
315+
}
316+
317+
void SetDeprecatedAndDiscouragedGoogleSpecificPointer(void* pointer) {
318+
google_specific_ = pointer;
319+
}
320+
224321
// A suggestion to the endpoint implementation to group data to be written
225322
// into frames of the specified max_frame_size. gRPC may use this
226323
// argument to dynamically control the max sizes of frames sent to a
227324
// receiver in response to high receiver memory pressure.
228-
int64_t max_frame_size = 1024 * 1024;
325+
int64_t max_frame_size() const { return max_frame_size_; }
326+
327+
void set_max_frame_size(int64_t max_frame_size) {
328+
max_frame_size_ = max_frame_size;
329+
}
330+
331+
private:
332+
std::optional<WriteEventSink> metrics_sink_;
333+
void* google_specific_ = nullptr;
334+
int64_t max_frame_size_ = 1024 * 1024;
229335
};
230336
/// Writes data out on the connection.
231337
///
@@ -248,11 +354,22 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
248354
/// statuses to \a on_writable. For example, callbacks might expect to
249355
/// receive CANCELLED on endpoint shutdown.
250356
virtual bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
251-
SliceBuffer* data, const WriteArgs* args) = 0;
357+
SliceBuffer* data, WriteArgs args) = 0;
252358
/// Returns an address in the format described in DNSResolver. The returned
253359
/// values are expected to remain valid for the life of the Endpoint.
254360
virtual const ResolvedAddress& GetPeerAddress() const = 0;
255361
virtual const ResolvedAddress& GetLocalAddress() const = 0;
362+
/// Returns the list of write metrics that the endpoint supports.
363+
/// The keys are used to identify the metrics in the GetMetricName and
364+
/// GetMetricKey APIs. The current value of the metric can be queried by
365+
/// adding a WriteEventSink to the WriteArgs of a Write call.
366+
virtual std::vector<size_t> AllWriteMetrics() = 0;
367+
/// Returns the name of the write metric with the given key.
368+
/// If the key is not found, returns std::nullopt.
369+
virtual std::optional<absl::string_view> GetMetricName(size_t key) = 0;
370+
/// Returns the key of the write metric with the given name.
371+
/// If the name is not found, returns std::nullopt.
372+
virtual std::optional<size_t> GetMetricKey(absl::string_view name) = 0;
256373
};
257374

258375
/// Called when a new connection is established.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright 2022 gRPC authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GRPC_EVENT_ENGINE_INTERNAL_WRITE_EVENT_H
16+
#define GRPC_EVENT_ENGINE_INTERNAL_WRITE_EVENT_H
17+
18+
namespace grpc_event_engine::experimental::internal {
19+
20+
// Use of this enum via this name is internal to gRPC.
21+
// API users should get this enumeration via the
22+
// EventEngine::Endpoint::WriteEvent.
23+
enum class WriteEvent {
24+
kSendMsg,
25+
kScheduled,
26+
kSent,
27+
kAcked,
28+
kClosed,
29+
kCount // Must be last.
30+
};
31+
32+
} // namespace grpc_event_engine::experimental::internal
33+
34+
#endif // GRPC_EVENT_ENGINE_INTERNAL_WRITE_EVENT_H

package.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)