gRPC 重要概念源码

[toc]

# 通用概念

gRPC 中,有部分概念在服务端和客户端由类似的定义,比如:

  • 拦截器 Interceptor
  • 上下文 Context

# 拦截器 Interceptor

拦截器是在各种网络框架中经常使用的概念,主要用于对网络请求进行前置和后置操作.

gRPC 的客户端和服务端都有自己的拦截器.

# 服务端拦截器

请参考 服务端概念 -> 服务端拦截器

# 客户端拦截器

请参考 客户端概念 -> 客户端拦截器

# 上下文 Context

gRPC 提供了三种类型的 Context:

  • 共享 Context grpc.RpcContext
  • 服务端 Context grpc.ServicerContext
  • 客户端 Context grpc.Call

# 共享 Context

共享 Context 接口定义为 grpc.RpcContext , 该接口定义如下:

class RpcContext(six.with_metaclass(abc.ABCMeta)):
    def is_active(self):
        pass

    def time_remaining(self):
        pass

    def cancel(self):
        pass

    def add_callback(self, callback):
        pass

服务端上下文 grpc.ServicerContext 和客户端上下文 grpc.Call 均为该类的子类.

# 服务端 Context

请参考 服务端概念 -> 服务端上下文

# 客户端 Context

请参考 客户端概念 -> 客户端上下文

# 服务端概念

服务端的基本概念有:

  • 服务端 Server
  • 服务 Servicer
  • 接口 Handler
  • 服务拦截器 Interceptor
  • ServicerContext

# 服务端 Server

# 创建服务 grpc.server()

# grpc/__init__.py

def server(
	thread_pool: futures.ThreadPoolExecutor,
	handlers: List[GenericRpcHandler] = None,
	interceptors: List[ServerInterceptor] = None,
	options: list[tuple] = None,
	maximum_concurrent_rpcs: int = None,
	compression: grpc.Compression = None,
):

    """Creates a Server with which RPCs can be serviced.

    Args:
      thread_pool: A futures.ThreadPoolExecutor to be used by the Server
        to execute RPC handlers.
      handlers: An optional list of GenericRpcHandlers used for executing RPCs.
        More handlers may be added by calling add_generic_rpc_handlers any time
        before the server is started.
      interceptors: An optional list of ServerInterceptor objects that observe
        and optionally manipulate the incoming RPCs before handing them over to
        handlers. The interceptors are given control in the order they are
        specified. This is an EXPERIMENTAL API.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC runtime)
        to configure the channel.
      maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server
        will service before returning RESOURCE_EXHAUSTED status, or None to
        indicate no limit.
      compression: An element of grpc.compression, e.g.
        grpc.compression.Gzip. This compression algorithm will be used for the
        lifetime of the server unless overridden. This is an EXPERIMENTAL option.

    Returns:
      A Server object.
    """  
    pass

# Server 接口 grpc.Server

grpc.Server 抽象类定义如下

class Server(six.with_metaclass(abc.ABCMeta)):
    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        pass

    def add_insecure_port(self, address):
        pass

    def add_secure_port(self, address, server_credentials):
        pass

    def start(self):
        pass

    def stop(self, grace):
        pass

    def wait_for_termination(self, timeout=None):
        pass

# Cython 实现 cygrpc.Server

其具体实现在 src/python/grpcio/grpc/_server.py#_Server 中。而 _Server 类则调用了 src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi 中使用 Cython 实现的 cygrpc.Server 类.

cygrpc.Server 的初始化函数定义如下:

def __cinit__(self, object arguments):
  fork_handlers_and_grpc_init()
  self.references = []
  self.registered_completion_queues = []
  self.is_started = False
  self.is_shutting_down = False
  self.is_shutdown = False
  self.c_server = NULL
  cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
  self.c_server = grpc_server_create(channel_args.c_args(), NULL)
  self.references.append(arguments)

# CPP 实现 grpc_servergrpc_core::Server

cython.Server 调用 C 函数 grpc_server_create 创建了 grpc_server , grpc_server 是对 grpc_core::Server 的封装,后者是 gRPC 中 server 的最终实现.

grpc_server_create 函数在 include/grpc/grpc.h 中声明,在 src/core/lib/surface/server.cc 文件实现.

grpc_server 则定义在 src/core/lib/surface/server.h 文件中.

grpc_core::Serversrc/core/lib/surface/server.h 定义,大部分成员函数在 src/core/lib/surface/server.cc 中实现.

# 服务 Servicer

当我们编译 .proto 文件时, grpcio_tools.protoc 中的 gRPC 编译器扩展会基于 .proto 中的 service 定义自动生成 XXXServicer 抽象类 以及对应的辅助函数 add_XXXServicer_to_server(servicer: XXXServicer, server: grpc.Server) .

XXXServicer.proto 中的 service 定义一致,我们重点关注其中的接口函数.

以较为复杂的双向流接口为例:

.proto 文件中定义如下:

// 文件名 examples/protos/route_guide.proto
service examples/python/route_guide/route_guide_pb2_grpc.py {
  // 省略其它内容
  
  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

生成的 Servicer 定义如下:

# 文件名 examples/python/route_guide/route_guide_pb2_grpc.py
class RouteGuideServicer(object):

  def RouteChat(
      self, 
      request_iterator: grpc._server._RequestIterator, 
      context: grpc.ServicerContext,
  ):
    """A Bidirectional streaming RPC.

    Accepts a stream of RouteNotes sent while a route is being traversed,
    while receiving other RouteNotes (e.g. from other users).
    """
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')

辅助函数则是对 grpc.Server.add_generic_rpc_handlers 的封装,其定义如下:

# 文件名 examples/python/route_guide/route_guide_pb2_grpc.py
def add_RouteGuideServicer_to_server(servicer: RouteGuideServicer, server: grpc.Server):
  rpc_method_handlers = {
      # 省略其它内容
      'RouteChat': grpc.stream_stream_rpc_method_handler(
          servicer.RouteChat,
          request_deserializer=route__guide__pb2.RouteNote.FromString,
          response_serializer=route__guide__pb2.RouteNote.SerializeToString,
      ),
  }
  generic_handler = grpc.method_handlers_generic_handler('routeguide.RouteGuide', rpc_method_handlers)
  server.add_generic_rpc_handlers((generic_handler,))

上面的 generic_handler 是一个 grpc._utilities.DictionaryGenericHandler 对象,其属性 _method_handlers 是一个字典,key 为 /service/method (如上述 /routeguide.RouteGuide/RouteChat ), 其 value 是 grpc._utilities.RpcMethodHandler 对象.

# 接口 Handler

gRPC 提供了两种 Handler 接口:

  • grpc.RpcMethodHandler 方法 Handler
  • grpc.ServiceRpcHandler 服务 Handler

方法 Handler 和 服务 Handler 一般由 gRPC 编译器 grpc_tools.protoc 自动生成,无需我们自行创建.

# 方法 Handler

方法 Handler 的接口定义为 grpc.RpcMethodHandler , 具体实现则为 grpc._utilities.RpcMethodHandler .

其继承关系图如下

grpc._utilities.RpcMethodHandler 
	-> grpc.RpcMethodHandler 
		-> abc.ABCMeta (metaclass)                         
    -> collections.namedtuple

gRPC 提供了四种创建 grpc.RpcMethodHandler 的方法:

  • unary_unary_rpc_method_handler
  • unary_stream_rpc_method_handler
  • stream_unary_rpc_method_handler
  • stream_stream_rpc_method_handler

这些方法与 gRPC 提供的四种接口类型一一对应。这些方法拥有相似的函数签名,

def xxx_yyy_rpc_method_handler(
    behavior: Callable, 
    request_deserializer=None, 
    response_serializer=None,
):
    pass

其中 behavior 是生成的 Servicer 中对应 .proto 中的 RPC 方法, request_deserializer 是 RPC 方法的输入参数 request 的反序列化方法, response_serialzer RPC 方法的输出 response 的 序列化方法.

grpc._utilities.RpcMethodHandler 的定义如下:

class RpcMethodHandler(
        collections.namedtuple('_RpcMethodHandler', (
            'request_streaming',
            'response_streaming',
            'request_deserializer',
            'response_serializer',
            'unary_unary',
            'unary_stream',
            'stream_unary',
            'stream_stream',
        )), grpc.RpcMethodHandler):
    pass

其父类 grpc.RpcMethodHandler 没有定义自己的方法和属性,因此其所有该类有 8 个属性:

  • request_streaming , bool 变量,表明请求是否为流式
  • response_streaming , bool 变量,表明响应是否为流式
  • request_deserializer , 请求序列化方法 (在编译时自动生成)
  • response_serializer , 响应序列化方法 (在编译时自动生成)
  • unary_unary , 响应函数
  • unary_stream , 响应函数
  • stream_unary , 响应函数
  • stream_stream , 响应函数

不同类型的 handler 只需要赋值不同的属性即可,如 stream_stream_rpc_method_handler 类 handler, 其初始化方法为:

_utilities.RpcMethodHandler(
	request_streaming=True, 
	response_streaming=True, 
	request_deserializer=request_deserializer,
	response_serializer=response_serializer, 
	unary_unary=None, 
	unary_stream=None, 
	stream_unary=None,
	stream_stream=behavior,
)

# 服务 Handler

服务 Handler 是一种特殊的 Handler, 可以将其理解为同一服务的各个方法 Handler 组成的集合.

服务 Handler 的接口定义为 grpc.ServiceRpcHandler , 该接口是 grpc.GenericRpcHandler 的子接口。具体实现则为 grpc._utilities.DictionaryGenericHandler .

继承关系图如下:

grpc._utilities.DictionaryGenericHandler 
	-> grpc.ServiceRpcHandler 
		-> abc.ABCMeta (metaclass)
        -> grpc.GenericRpcHandler (metaclass)
        	-> abc.ABCMeta (metaclass)

可以使用 gRPC 提供的 grpc.method_handlers_generic_handler 创建服务 Handler.

grpc._utilities.DictionaryGenericHandler 的定义如下:

class DictionaryGenericHandler(grpc.ServiceRpcHandler):

    def __init__(
        self, 
        service: str, 
        method_handlers: Mapping[str, grpc.RpcMethodHandler],
    ):
        self._name = service
        self._method_handlers = {
            _common.fully_qualified_method(service, method): method_handler
            for method, method_handler in six.iteritems(method_handlers)
        }

    def service_name(self) -> str:
        return self._name

    def service(self, handler_call_details: grpc.HandlerCallDetails) -> grpc.RpcMethodHandler:
        return self._method_handlers.get(handler_call_details.method)

其中 service_name() 是从 grpc.ServiceRpcHandler 继承的方法, service(handler_call_details: grpc.HandlerCallDetails) 是从 grpc.GenericRpcHandler 类继承的方法.

# 服务端拦截器

服务端拦截器的接口定义为 grpc.ServerInterceptor , 主要提供了一个方法 intercept_service , 该方法签名如下:

def intercept_service(
    self, 
    # continuation 是一个函数, 该函数接收一个 grpc.HandlerCallDetails 参数
    continuation: Callable, 
    handler_call_details: grpc.HandlerCallDetails,
):
    pass

其中参数 handler_call_details 记录了一个方法的的基本信息,如:

  • method 方法名
  • invocation_metadata 客户端调用时传入的元数据

gRPC 没有提供内置的服务端拦截器实现,具体实现可以参考:

  • examples/python/interceptors/headers/request_header_validator_interceptor.py
  • examples/python/auth/customized_auth_server.py
  • src/python/grpcio_tests/tests/unit/_interceptor_test.py

一个服务端拦截器实现示例如下:

# 文件: src/python/grpcio_tests/tests/unit/_interceptor_test.py

class _GenericServerInterceptor(grpc.ServerInterceptor):

    def __init__(self, fn):
        self._fn = fn

    def intercept_service(self, continuation, handler_call_details):
        return self._fn(continuation, handler_call_details)


def _filter_server_interceptor(condition, interceptor):

    def intercept_service(continuation, handler_call_details):
        if condition(handler_call_details):
            return interceptor.intercept_service(continuation,
                                                 handler_call_details)
        return continuation(handler_call_details)

    return _GenericServerInterceptor(intercept_service)

# 服务端上下文

服务端上下文接口定义为 grpc.ServicerContext , 该类继承了共享上下文 grpc.RpcContext .

grpc.ServicerContextgrpc.RpcContext 的基础上新增了 13 个服务接口.

class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
    def invocation_metadata(self):
        pass

    def peer(self):
        pass

    def peer_identities(self):
        pass

    def peer_identity_key(self):
        pass

    def auth_context(self):
        pass

    def set_compression(self, compression):
        pass

    def send_initial_metadata(self, initial_metadata):
        pass

    def set_trailing_metadata(self, trailing_metadata):
        pass

    def abort(self, code, details):
        pass

    def abort_with_status(self, status):
        pass

    def set_code(self, code):
        pass

    def set_details(self, details):
        pass

    def disable_next_mssage_compression(self):
        pass

根据 gRPC 的设计理念,gRPC 服务中的异常都建议使用 grpc.ServicerContext 回传给调用方,而不是使用自定义响应包中的 error_codeerror_message .

grpc.ServicerContext 中有一个重要方法 set_code() , 该方法可以设置一个 grpc.StatusCode , 并最终由客户端上下文 grpc.Call 中的 code() 方法返回给调用者.

状态码 grpc.StatusCode 是一个枚举类型,定义了 gRPC 工作过程中常用的状态码,如 OK , CANCELLED , RESOURCE_EXHAUSTED 等。具体可以参考 gRPC Status Code 官方文档.

服务端 Context 的具体实现为 grpc._server._Context , 该类定义在 src/python/grpcio/grpc/_server.py 文件中,在此不详细展开.

# 客户端概念

客户端的主要概念有:

  • Stub
  • Channel
  • 接口 Callable
  • 客户端拦截器 Interceptor
  • 客户端上下文 Context
  • 异步调用结果 Future

# 客户端 Stub

客户端 Stub 是编译 .proto 文件时自动生成的. Stub 初始化时需要传入 Channel, 对 Stub 的所有请求最终都会由 Channel 来完成.

还是以前面的 RouteGuide 为例,生成的 RouteGuideStub 如下:

# 文件名 examples/python/route_guide/route_guide_pb2_grpc.py

class RouteGuideStub(object):
  def __init__(self, channel):
	# 省略其它内容
    self.RouteChat = channel.stream_stream(
        '/routeguide.RouteGuide/RouteChat',
        request_serializer=route__guide__pb2.RouteNote.SerializeToString,
        response_deserializer=route__guide__pb2.RouteNote.FromString,
        )

调用 RouteGuideStub.RouteChat() 方法实际上是在调用 channel.stream_stream() 返回的 grpc.StreamStreamMultiCallable 对象.

# Channel

# 创建 Channel

def insecure_channel(
    target: str, 
    options: List[tuple] = None, 
    compression: grpc.Compression = None,
):
    """Creates an insecure Channel to a server.

    The returned Channel is thread-safe.

    Args:
      target: The server address
      options: An optional list of key-value pairs (:term:`channel_arguments`
        in gRPC Core runtime) to configure the channel.
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel. This is an EXPERIMENTAL option.

    Returns:
      A Channel.
    """
    pass

# 接口定义 grpc.Channel

Channel 的接口 grpc.Channel 定义在 src/python/grpcio/grpc/__init__.py 中。主体定义如下:

class Channel(six.with_metaclass(abc.ABCMeta)):
    def subscribe(self, callback, try_to_connect=False):
        pass

    def unsubscribe(self, callback):
        pass

    def unary_unary(self, method, request_serializer=None, response_deserializer=None):
        pass

    def unary_stream(self, method, request_serializer=None, response_deserializer=None):
        pass

    def stream_unary(self, method, request_serializer=None, response_deserializer=None):
        pass

    def stream_stream(self, method, request_serializer=None, response_deserializer=None):
        pass

    def close(self):
        pass

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

gRPC 中,Channel 本质上是 HTTP2 长连接的抽象,gRPC 请求应该尽可能地复用 Channel.

参考资料:

  1. Stack overflow: gRPC call, channel, connection and HTTP/2 lifecycle
  2. Performance best practices with gRPC

# 接口实现 grpc._channel.Channel

模块 src/python/grpcio/grpc/_channel.py 给出了 grpc.Channel 接口的实现。其初始化函数:

class Channel(grpc.Channel):
    def __init__(
        self, 
        target: str, 
        options: List[tuple], 
        credentials: grpc.ChannelCredentials, 
        compression: grpc.Compression,
    ):
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        self._process_python_options(python_options)
        self._channel = cygrpc.Channel(
            _common.encode(target), _augment_options(core_options, compression),
            credentials)
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)

由上述代码可知, grpc._channel.Channel 使用了 Cython 实现 cygrpc.Channel .

cygrpc.Channel 的初始化函数如下:

cdef class Channel:

  def __cinit__(
      self, 
      bytes target, 
      object arguments,
      ChannelCredentials channel_credentials,
  ):
    arguments = () if arguments is None else tuple(arguments)
    fork_handlers_and_grpc_init()
    self._state = _ChannelState()
    self._state.c_call_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._state.c_connectivity_completion_queue = (
        grpc_completion_queue_create_for_next(NULL))
    self._arguments = arguments
    cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
    if channel_credentials is None:
      self._state.c_channel = grpc_insecure_channel_create(
          <char *>target, channel_args.c_args(), NULL)
    else:
      c_channel_credentials = channel_credentials.c()
      self._state.c_channel = grpc_secure_channel_create(
          c_channel_credentials, <char *>target, channel_args.c_args(), NULL)
      grpc_channel_credentials_release(c_channel_credentials)

cygrpc.Channel 会调用 C 函数 grpc_insecure_channel_create()grpc_secure_channel_create() , 这两者都会返回 grpc_channel 对象.

grpc_channel 定义在 src/core/lib/surface/channel.h 文件中,同时该文件还定义了一组操作 grpc_channel 的方法.

# Channel 的状态

grpc.Channel 有自己的状态,状态使用 grpc.ChannelConnectivity 变量存储.

grpc.ChannelConnectivity 是一个 enum.Enum 枚举类型,一共有五个不同的枚举变量:

@enum.unique
class ChannelConnectivity(enum.Enum):
    IDLE = (_cygrpc.ConnectivityState.idle, 'idle')
    CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting')
    READY = (_cygrpc.ConnectivityState.ready, 'ready')
    TRANSIENT_FAILURE = (_cygrpc.ConnectivityState.transient_failure,'transient failure')
    SHUTDOWN = (_cygrpc.ConnectivityState.shutdown, 'shutdown')

从上面的源码可以看到,每个枚举变量都是一个 元组,该元素表示了 grpc.Channel 的状态 与 cygrpc.Channel 的状态的对应关系。后者的状态使用了枚举类 cygrpc.ConnectivityState (位于文件 src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi 中) 来存储.

# Callable

前面讲到,调用 Stub 的方法实际上是对相应的 Callable 类的调用.

gRPC 的四种接口分别对应了四种 Callable

  • grpc.UnaryUnaryMultiCallable
  • grpc.UnaryStreamMultiCallable
  • grpc.StreamUnaryMultiCallable
  • grpc.StreamStreamMultiCallable

上述 Callable 类在 src/python/grpcio/grpc/__init__.py 中给出了抽象接口,在 src/python/grpcio/grpc/_channel.py 给出具体实现.

上述四种 Multi-Callable 中, grpc.xxxUnaryyyy 两个类表示返回的非流数据,这两个类除了定义 __call__() 方法外,还定义了

  • future() 实现异步调用
  • with_call() 实现同步调用

两个方法。具体可以参考对应的接口定义和实现.

我们重点关注较为复杂的 grpc.StreamStreamMultiCallable 及其实现 grpc._channel.StreamStreamMultiCallable .

# 接口: grpc.StreamStreamMultiCallable

其调用参数为

class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
    def __call__(
        self,
        request_iterator: Iterator,
        timeout: float = None,
        metadata: List[tuple] = None,
        credentials: grpc.CallCredentials = None,
        wait_for_ready: bool = None,
        compression: grpc.Compression = None,
    ):
        pass

# 实现: grpc._channel._StreamStreamMultiCallable

class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):

    # pylint: disable=too-many-arguments
    def __init__(
        self, 
        channel: grpc.Channel, 
        managed_call: cygrpc.IntegratedCall, 
        method: bytes, 
        request_serializer,
        response_deserializer,
    ):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: float = None,
        metadata: List[tuple] = None,
        credentials: grpc.CallCredentials = None,
        wait_for_ready: bool = None,
        compression: grpc.Compression = None,
    ):
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(wait_for_ready)
        augmented_metadata = _compression.augment_metadata(metadata, compression)
        operationses = (
            (
                cygrpc.SendInitialMetadataOperation(augmented_metadata,initial_metadata_flags),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 
            self._method,
            None, 
            _determine_deadline(deadline), 
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operationses, 
            event_handler, 
            self._context,
        )
        _consume_request_iterator(
            request_iterator, 
            state, 
            call,
            self._request_serializer, 
            event_handler,
        )
        return _MultiThreadedRendezvous(state, call, self._response_deserializer, deadline)

census, 其中文释义为 人口普查; (官方的)调查 等,来自 self._context = cygrpc.build_census_context()

rendezvous, 其中文释义为 约会, 聚会, 集合; 约会地点, 聚会场所 等含义,来自 _MultiThreadedRendezvous(state, call, self._response_deserializer, deadline)

从上述代码可知,输入流是通过 grpc._channel._consume_request_iterator() 方法实现的,该方法内部为一个 while True 循环.

grpc._channel.StreamStreamMultiCallable 最终会由 grpc._channel._MultiThreadedRendezvous 类来实现.

继承关系图

grpc._channel._MultiThreadedRendezvous
	-> grpc._channel._Rendezvous
		-> grpc.RpcError -> Exception
		-> grpc.RpcContext -> abc.ABCMeta (metaclass)
	-> grpc.Call
		-> abc.ABCMeta (metaclass)
		-> grpc.RpcContext (metaclass) -> abc.ABCMeta (metaclass)
	-> grpc.Future
		-> abc.ABCMeta (metaclass)

其父类 grpc._channel._Rendezvous 实现了 __iter__()__next__() 方法,表明 grpc._channel._MultiThreadedRendezvous 既是迭代器,也是可迭代对象.

# 客户端拦截器

gRPC 提供了四种客户端拦截器:

  • UnaryUnaryClientInterceptor
  • UnaryStreamClientInterceptor
  • StreamUnaryClientInterceptor
  • StreamStreamClientInterceptor

每个拦截器都定义了一个拦截器方法,分别如下:

class UnaryUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        pass
    
class UnaryStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    def intercept_unary_stream(self, continuation, client_call_details, request):
        pass
    
class StreamUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        pass
    
class StreamStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)):
    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        pass

这些接口拥有相似的函数签名:

  • continuation 是一个函数,该函数接收 grpc.ClientCallDetails 类型的参数,也就是接口的下一个参数 client_call_details
  • client_call_details 是一个 grpc.ClientCallDetails 类型参数
  • requestrequest_iterator 是对应函数的请求参数

参数 client_call_details 记录了一个客户端请求的详细信息,如:

  • method 方法名
  • timeout 超时时间
  • metadata 请求元数据
  • credentials 请求证书
  • wait_for_ready 等待就绪标识,具体参考 术语 wait_for_ready
  • compression 客户端请求压缩方法
  • 等等,具体参考 grpc.ClientCallDetails 类实现

同样,gRPC 也没有提供内置的客户端拦截器实现。客户端拦截器实现可以参考:

  • examples/python/interceptors/headers/generic_client_interceptor.py
  • src/python/grpcio_tests/tests/unit/_interceptor_test.py

src/python/grpcio_tests/tests/unit/_interceptor_test.py 文件中的 _LoggingInterceptor 给出了一个较好的拦截器实现示例.

# 客户端上下文

客户端上下文接口定义为 grpc.Call , 该接口继承了 grpc.RpcContext .

grpc.Callgrpc.RpcContext 的基础上新增了 4 个接口.

class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
    def initial_metadata(self):
        pass

    def trailing_metadata(self):
        pass

    def code(self):
        pass

    def details(self):
        pass

grpc.Call 及其子类 实际上就是一次 gRPC 请求的返回接口. grpc.Channel 的各个 Callable (如前面介绍的 grpc._channel._StreamStreamMultiCallable 等) 调用时的返回值都是 grpc.Call 的实现,如:

  • grpc._channel._SingleThreadedRendezvous

  • grpc._channel._MultiThreadedRendezvous

  • grpc._channel._InactiveRpcError

  • grpc._interceptor._FailureOutcome

  • grpc._interceptor._UnaryOutcome

  • 等等

从这些实现来看,它们都同时继承了 grpc.Callgrpc.Future .

客户端上下文中有一个重要的方法 code() , 客户端可以通过该方法获取服务端设置的状态码。该状态码一般由服务端上下文 grpc.ServicerContextset_code() 方法设置.

# 异步调用结果 Future

前面讲到, grpc.Call 代表 gRPC 请求的返回结果。实现了 grpc.Call 接口的方法都实现了 grpc.Future 用于实现异步调用 gRPC.

grpc.Future 提供了如下方法:

class Future(six.with_metaclass(abc.ABCMeta)):
    def cancel(self):
        pass

    def cancelled(self):
        pass

    def running(self):
        pass

    def done(self):
        pass

    def result(self, timeout=None):
        pass

    def exception(self, timeout=None):
        pass

    def traceback(self, timeout=None):
        pass

    def add_done_callback(self, fn):
        pass

除了前面提到的 grpc.Callgrpc.Future 的共同子类外, grpc.Future 还有一个额外的子类 grpc._utilities._ChannelReadyFuture , 这个类由 grpc.channel_ready_future() 方法暴露给调用方使用.