absolute deadline propagation #11966

Open
@macrergate

Description

Currently, gRPC deadlines are propagated from client to server via the grpc-timeout header as a relative value (a timeout).
We believe we have pretty good clock synchronization between our servers (within several milliseconds) and are interested in propagating the absolute deadline in a custom header. Is there a way to do this by customizing grpc-java server and client logic in interceptors or something similar, without forking the grpc-java implementation?
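
To make the difference between the two schemes concrete, here is a minimal sketch of the arithmetic only. `DeadlineMath` and its method names are hypothetical helpers for illustration and are not part of grpc-java. It assumes the server restarts the countdown when it receives a relative timeout, and that an absolute deadline is expressed as epoch milliseconds.

```java
/** Hypothetical helper for illustration; not part of grpc-java. */
final class DeadlineMath {
    private DeadlineMath() {}

    /**
     * Deadline the server ends up with when it is given a relative timeout:
     * the countdown starts at the moment the server receives the request,
     * so time already spent in transit is not subtracted.
     */
    static long deadlineFromRelativeTimeout(long serverReceiveMillis, long timeoutMillis) {
        return Math.addExact(serverReceiveMillis, timeoutMillis);
    }

    /**
     * Time left for an absolute deadline, measured on the server's clock.
     * The result is zero or negative if the deadline has already passed.
     * Any clock skew between client and server shifts this value directly.
     */
    static long remainingMillis(long deadlineEpochMillis, long serverNowMillis) {
        return Math.subtractExact(deadlineEpochMillis, serverNowMillis);
    }
}
```

For example, if the client sets a 100 ms timeout at t = 1000 and the request takes 30 ms to arrive, the relative scheme gives the server a deadline of 1130, while the absolute scheme leaves it 70 ms (or 68 ms if the server clock runs 2 ms ahead).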

Activity

macrergate commented on Mar 20, 2025

Author

Is this the right way to go?

package io.grpc.examples.deadline;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.Contexts.statusFromCancelled;
import static io.grpc.Status.DEADLINE_EXCEEDED;

class DeadlineServerInterceptor implements ServerInterceptor {
    public static final Metadata.Key<String> GRPC_DEADLINE = Metadata.Key.of("grpc-deadline",
            Metadata.ASCII_STRING_MARSHALLER);

    private final ScheduledExecutorService scheduler;

    public DeadlineServerInterceptor(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
                                                                 ServerCallHandler<ReqT, RespT> next) {

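        String deadlineString = headers.get(GRPC_DEADLINE);
        if (deadlineString == null) {
            // No absolute deadline was sent; leave the call to the regular grpc-timeout handling.
            return next.startCall(call, headers);
        }

        long deadlineMillis = Long.parseLong(deadlineString);
        // Remaining time on this server's clock; may already be zero or negative.
        long timeout = deadlineMillis - System.currentTimeMillis();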

        Context.CancellableContext context = Context.current()
                .fork()
                .withDeadlineAfter(timeout, TimeUnit.MILLISECONDS, scheduler);

        context.addListener(ctx -> {
            Status status = statusFromCancelled(ctx);
            if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
                call.close(status, new Metadata());
            }
        }, directExecutor());

        return Contexts.interceptCall(context, call, headers, next);
    }
}

macrergate commented on Mar 20, 2025

I have found another approach that looks better than the former: using ServerStreamTracer.Factory. In this case I get in before the original deadline context is created in ServerImpl.

package io.grpc.examples.deadline;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.GrpcUtil;

class AbsoluteDeadlineServerStreamTracerFactory extends ServerStreamTracer.Factory {
    public static final Metadata.Key<String> GRPC_DEADLINE = Metadata.Key.of("grpc-deadline",
            Metadata.ASCII_STRING_MARSHALLER);
    public static final ServerStreamTracer NOOP_STREAM_TRACER = new ServerStreamTracer() {
    };
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    @Override
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
        String deadlineString = headers.get(GRPC_DEADLINE);

        if (deadlineString == null) {
            return NOOP_STREAM_TRACER;
        }

        long deadlineMillis = Long.parseLong(deadlineString);
        headers.discardAll(GrpcUtil.TIMEOUT_KEY);

        return new ServerStreamTracer() {
            @Override
            public Context filterContext(Context context) {
                long timeout = deadlineMillis - System.currentTimeMillis();

                return context.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS, scheduler);
            }
        };
    }
}

Any thoughts and criticism are greatly appreciated.
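
Since the header value arrives from the peer, it may be worth validating it before turning it into a timeout. The following is a minimal sketch under that assumption; `DeadlineHeaderParser` and `parseEpochMillis` are hypothetical names, not grpc-java API, and the choice to treat missing, blank, malformed, or negative values as "no deadline" is only one possible policy.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of grpc-java. */
final class DeadlineHeaderParser {
    private DeadlineHeaderParser() {}

    /**
     * Parses an absolute deadline given as epoch milliseconds.
     * Returns an empty result for a missing, blank, malformed, or negative
     * value, so the caller can fall back to the regular relative timeout.
     */
    static OptionalLong parseEpochMillis(String raw) {
        if (raw == null) {
            return OptionalLong.empty();
        }
        String trimmed = raw.trim();
        if (trimmed.isEmpty()) {
            return OptionalLong.empty();
        }
        try {
            long value = Long.parseLong(trimmed);
            return value < 0 ? OptionalLong.empty() : OptionalLong.of(value);
        } catch (NumberFormatException e) {
            // Not a valid long: treat it the same as an absent header.
            return OptionalLong.empty();
        }
    }
}
```

A tracer factory or interceptor could call `parseEpochMillis(headers.get(GRPC_DEADLINE))` and only override the deadline when the result is present.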

kannanjgithub commented on Mar 21, 2025

Contributor

Yes, the 2nd solution is better, because in the 1st solution you are creating a forked cancellable context, so the cancellation will be called twice. It also handles closing the call itself when cancellation due to the timeout occurs.
The 2nd solution just replaces the header timeout by filtering the context, and the ServerStreamCancellationListener will do the job of closing the call when the timeout occurs. There are no multiple contexts for the same purpose. The only caveat is that the timeout key from GrpcUtil is internal.

          absolute deadline propagation · Issue #11966 · grpc/grpc-java