Spaces:
Sleeping
Sleeping
| # Copyright 2017 gRPC authors. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """Implementation of gRPC Python interceptors.""" | |
| import collections | |
| import sys | |
| import types | |
| from typing import Any, Callable, Optional, Sequence, Tuple, Union | |
| import grpc | |
| from ._typing import DeserializingFunction | |
| from ._typing import DoneCallbackType | |
| from ._typing import MetadataType | |
| from ._typing import RequestIterableType | |
| from ._typing import SerializingFunction | |
class _ServicePipeline(object):
    """Chain of server interceptors placed in front of a terminal callable.

    Interceptors run in the order they were supplied. Each one receives a
    continuation that advances to the next interceptor; once the list is
    exhausted the continuation calls the caller-supplied ``thunk``.
    """

    # Variable-length tuple: one entry per configured interceptor.
    interceptors: Tuple[grpc.ServerInterceptor, ...]

    def __init__(self, interceptors: Sequence[grpc.ServerInterceptor]):
        """Store a tuple snapshot of ``interceptors``."""
        self.interceptors = tuple(interceptors)

    def _continuation(self, thunk: Callable, index: int) -> Callable:
        """Return a one-argument callable that resumes the chain at ``index``."""
        return lambda context: self._intercept_at(thunk, index, context)

    def _intercept_at(
        self, thunk: Callable, index: int, context: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """Run the interceptor at ``index``, or ``thunk`` when none is left.

        Args:
            thunk: Callable invoked with ``context`` after the last interceptor.
            index: Position of the interceptor to run next.
            context: Call details forwarded unchanged through the chain.

        Returns:
            Whatever the interceptor (or ``thunk``) returns.
        """
        if index >= len(self.interceptors):
            # Past the last interceptor: hand over to the terminal callable.
            return thunk(context)
        interceptor = self.interceptors[index]
        next_step = self._continuation(thunk, index + 1)
        return interceptor.intercept_service(next_step, context)

    def execute(
        self, thunk: Callable, context: grpc.HandlerCallDetails
    ) -> grpc.RpcMethodHandler:
        """Run the whole chain, starting from the first interceptor."""
        return self._intercept_at(thunk, 0, context)
def service_pipeline(
    interceptors: Optional[Sequence[grpc.ServerInterceptor]],
) -> Optional[_ServicePipeline]:
    """Build a ``_ServicePipeline``, or return ``None`` when there is nothing to run.

    Args:
        interceptors: Server interceptors to chain; may be ``None`` or empty.

    Returns:
        A pipeline wrapping ``interceptors``, or ``None`` if ``interceptors``
        is falsy.
    """
    if not interceptors:
        return None
    return _ServicePipeline(interceptors)
class _ClientCallDetails(
    collections.namedtuple(
        typename="_ClientCallDetails",
        field_names=(
            "method",
            "timeout",
            "metadata",
            "credentials",
            "wait_for_ready",
            "compression",
        ),
    ),
    grpc.ClientCallDetails,
):
    """Named-tuple implementation of ``grpc.ClientCallDetails``.

    Fields, in order: ``method``, ``timeout``, ``metadata``, ``credentials``,
    ``wait_for_ready`` and ``compression``.
    """
def _unwrap_client_call_details(
    call_details: grpc.ClientCallDetails,
    default_details: grpc.ClientCallDetails,
) -> Tuple[
    str, float, MetadataType, grpc.CallCredentials, bool, grpc.Compression
]:
    """Extract the six call-detail fields, falling back per attribute.

    Each field is read from ``call_details``; if that lookup raises
    ``AttributeError`` the same field is read from ``default_details`` instead.

    Args:
        call_details: Details object that may lack some of the attributes.
        default_details: Details object used for any attribute that is missing.

    Returns:
        ``(method, timeout, metadata, credentials, wait_for_ready,
        compression)``.
    """

    def _pick(field):
        # Prefer the supplied details; fall back only when the attribute
        # itself is absent, not when its value is None.
        try:
            return getattr(call_details, field)
        except AttributeError:
            return getattr(default_details, field)

    return (
        _pick("method"),
        _pick("timeout"),
        _pick("metadata"),
        _pick("credentials"),
        _pick("wait_for_ready"),
        _pick("compression"),
    )
class _FailureOutcome(
    grpc.RpcError, grpc.Future, grpc.Call
):  # pylint: disable=too-many-ancestors
    """Already-finished outcome for an exception raised while intercepting.

    The instance reports itself as done and not cancelled, exposes
    ``StatusCode.INTERNAL`` as its code, and raises the stored exception from
    ``result()`` and from iteration.
    """

    _exception: Exception
    _traceback: types.TracebackType

    def __init__(self, exception: Exception, traceback: types.TracebackType):
        """Remember the exception and the traceback captured with it."""
        super().__init__()
        self._traceback = traceback
        self._exception = exception

    # --- grpc.Call / grpc.RpcContext surface -----------------------------

    def is_active(self) -> bool:
        """Always ``False``: the outcome is already final."""
        return False

    def time_remaining(self) -> Optional[float]:
        """Always ``None``."""
        return None

    def cancel(self) -> bool:
        """Always ``False``."""
        return False

    def add_callback(self, unused_callback) -> bool:
        """Ignore the callback and return ``False``."""
        return False

    def initial_metadata(self) -> Optional[MetadataType]:
        """Always ``None``."""
        return None

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Always ``None``."""
        return None

    def code(self) -> Optional[grpc.StatusCode]:
        """Return ``grpc.StatusCode.INTERNAL``."""
        return grpc.StatusCode.INTERNAL

    def details(self) -> Optional[str]:
        """Return a fixed description of the failure."""
        return "Exception raised while intercepting the RPC"

    # --- grpc.Future surface ---------------------------------------------

    def cancelled(self) -> bool:
        """Always ``False``."""
        return False

    def running(self) -> bool:
        """Always ``False``."""
        return False

    def done(self) -> bool:
        """Always ``True``."""
        return True

    def result(self, ignored_timeout: Optional[float] = None):
        """Raise the stored exception; the timeout is not used."""
        raise self._exception

    def exception(
        self, ignored_timeout: Optional[float] = None
    ) -> Optional[Exception]:
        """Return the stored exception; the timeout is not used."""
        return self._exception

    def traceback(
        self, ignored_timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Return the stored traceback; the timeout is not used."""
        return self._traceback

    def add_done_callback(self, fn: DoneCallbackType) -> None:
        """Invoke ``fn`` immediately with this outcome."""
        fn(self)

    # --- iterator surface --------------------------------------------------

    def __iter__(self):
        """Return ``self`` so the outcome can stand in for a response iterator."""
        return self

    def __next__(self):
        """Raise the stored exception on the first iteration step."""
        raise self._exception

    def next(self):
        """Delegate to ``__next__``."""
        return self.__next__()
class _UnaryOutcome(grpc.Call, grpc.Future):
    """Already-finished outcome pairing a response with its ``grpc.Call``.

    Call-level queries are forwarded to the wrapped call; future-level queries
    report a completed, successful result.
    """

    _response: Any
    _call: grpc.Call

    def __init__(self, response: Any, call: grpc.Call):
        """Store the response and the call it came from."""
        self._call = call
        self._response = response

    # --- delegated to the wrapped call -------------------------------------

    def is_active(self) -> bool:
        """Forward to the wrapped call."""
        return self._call.is_active()

    def time_remaining(self) -> Optional[float]:
        """Forward to the wrapped call."""
        return self._call.time_remaining()

    def cancel(self) -> bool:
        """Forward to the wrapped call."""
        return self._call.cancel()

    def add_callback(self, callback) -> bool:
        """Forward to the wrapped call."""
        return self._call.add_callback(callback)

    def initial_metadata(self) -> Optional[MetadataType]:
        """Forward to the wrapped call."""
        return self._call.initial_metadata()

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Forward to the wrapped call."""
        return self._call.trailing_metadata()

    def code(self) -> Optional[grpc.StatusCode]:
        """Forward to the wrapped call."""
        return self._call.code()

    def details(self) -> Optional[str]:
        """Forward to the wrapped call."""
        return self._call.details()

    # --- fixed answers for the future interface ----------------------------

    def cancelled(self) -> bool:
        """Always ``False``."""
        return False

    def running(self) -> bool:
        """Always ``False``."""
        return False

    def done(self) -> bool:
        """Always ``True``."""
        return True

    def result(self, ignored_timeout: Optional[float] = None):
        """Return the stored response; the timeout is not used."""
        return self._response

    def exception(self, ignored_timeout: Optional[float] = None):
        """Always ``None``; the timeout is not used."""
        return None

    def traceback(self, ignored_timeout: Optional[float] = None):
        """Always ``None``; the timeout is not used."""
        return None

    def add_done_callback(self, fn: DoneCallbackType) -> None:
        """Invoke ``fn`` immediately with this outcome."""
        fn(self)
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Unary-unary multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable. The
    interceptor receives a continuation that resolves the (possibly
    overridden) call details and invokes that underlying callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.UnaryUnaryClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.UnaryUnaryClientInterceptor,
    ):
        """Store the callable factory, method name and interceptor."""
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC through the interceptor and return only the response."""
        response, _ = self._with_call(
            request,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )
        return response

    def _with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC through the interceptor.

        Returns:
            ``(outcome.result(), outcome)`` where ``outcome`` is the object
            returned by the interceptor.
        """
        initial_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            # Attributes missing from new_details fall back to the details
            # this invocation started with.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, initial_details)
            )
            try:
                response, call = self._thunk(resolved.method).with_call(
                    request,
                    timeout=resolved.timeout,
                    metadata=resolved.metadata,
                    credentials=resolved.credentials,
                    wait_for_ready=resolved.wait_for_ready,
                    compression=resolved.compression,
                )
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                # RPC errors are handed back to the interceptor as the outcome.
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, sys.exc_info()[2])

        outcome = self._interceptor.intercept_unary_unary(
            continuation, initial_details, request
        )
        return outcome.result(), outcome

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Public entry point; delegates unchanged to ``_with_call``."""
        return self._with_call(
            request,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC through the interceptor using the ``future`` API.

        Returns:
            Whatever the interceptor returns, or a ``_FailureOutcome`` if the
            interceptor raises.
        """
        initial_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, initial_details)
            )
            return self._thunk(resolved.method).future(
                request,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_unary_unary(
                continuation, initial_details, request
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.UnaryStreamClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.UnaryStreamClientInterceptor,
    ):
        """Store the callable factory, method name and interceptor."""
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ):
        """Invoke the RPC through the interceptor.

        Returns:
            Whatever the interceptor returns, or a ``_FailureOutcome`` if the
            interceptor raises.
        """
        initial_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request):
            # Attributes missing from new_details fall back to the details
            # this invocation started with.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, initial_details)
            )
            return self._thunk(resolved.method)(
                request,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_unary_stream(
                continuation, initial_details, request
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-unary multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable. The
    interceptor receives a continuation that resolves the (possibly
    overridden) call details and invokes that underlying callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.StreamUnaryClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.StreamUnaryClientInterceptor,
    ):
        """Store the callable factory, method name and interceptor."""
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC through the interceptor and return only the response."""
        response, _ = self._with_call(
            request_iterator,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )
        return response

    def _with_call(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC through the interceptor.

        Returns:
            ``(outcome.result(), outcome)`` where ``outcome`` is the object
            returned by the interceptor.
        """
        initial_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            # Attributes missing from new_details fall back to the details
            # this invocation started with.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, initial_details)
            )
            try:
                response, call = self._thunk(resolved.method).with_call(
                    request_iterator,
                    timeout=resolved.timeout,
                    metadata=resolved.metadata,
                    credentials=resolved.credentials,
                    wait_for_ready=resolved.wait_for_ready,
                    compression=resolved.compression,
                )
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                # RPC errors are handed back to the interceptor as the outcome.
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, sys.exc_info()[2])

        outcome = self._interceptor.intercept_stream_unary(
            continuation, initial_details, request_iterator
        )
        return outcome.result(), outcome

    def with_call(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Public entry point; delegates unchanged to ``_with_call``."""
        return self._with_call(
            request_iterator,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

    def future(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC through the interceptor using the ``future`` API.

        Returns:
            Whatever the interceptor returns, or a ``_FailureOutcome`` if the
            interceptor raises.
        """
        initial_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, initial_details)
            )
            return self._thunk(resolved.method).future(
                request_iterator,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_stream_unary(
                continuation, initial_details, request_iterator
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-stream multi-callable that routes invocations through an interceptor.

    ``thunk`` maps a method name to the underlying multi-callable.
    """

    _thunk: Callable
    _method: str
    _interceptor: grpc.StreamStreamClientInterceptor

    def __init__(
        self,
        thunk: Callable,
        method: str,
        interceptor: grpc.StreamStreamClientInterceptor,
    ):
        """Store the callable factory, method name and interceptor."""
        self._interceptor = interceptor
        self._method = method
        self._thunk = thunk

    def __call__(
        self,
        request_iterator: RequestIterableType,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ):
        """Invoke the RPC through the interceptor.

        Returns:
            Whatever the interceptor returns, or a ``_FailureOutcome`` if the
            interceptor raises.
        """
        initial_details = _ClientCallDetails(
            method=self._method,
            timeout=timeout,
            metadata=metadata,
            credentials=credentials,
            wait_for_ready=wait_for_ready,
            compression=compression,
        )

        def continuation(new_details, request_iterator):
            # Attributes missing from new_details fall back to the details
            # this invocation started with.
            resolved = _ClientCallDetails(
                *_unwrap_client_call_details(new_details, initial_details)
            )
            return self._thunk(resolved.method)(
                request_iterator,
                timeout=resolved.timeout,
                metadata=resolved.metadata,
                credentials=resolved.credentials,
                wait_for_ready=resolved.wait_for_ready,
                compression=resolved.compression,
            )

        try:
            return self._interceptor.intercept_stream_stream(
                continuation, initial_details, request_iterator
            )
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])
class _Channel(grpc.Channel):
    """Channel wrapper that applies one client interceptor.

    Each multi-callable factory checks whether the interceptor implements the
    matching interceptor interface. If it does, the underlying multi-callable
    is wrapped; otherwise the underlying channel's multi-callable is returned
    as is.
    """

    _channel: grpc.Channel
    _interceptor: Union[
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
    ]

    def __init__(
        self,
        channel: grpc.Channel,
        interceptor: Union[
            grpc.UnaryUnaryClientInterceptor,
            grpc.UnaryStreamClientInterceptor,
            grpc.StreamStreamClientInterceptor,
            grpc.StreamUnaryClientInterceptor,
        ],
    ):
        """Store the wrapped channel and the interceptor to apply."""
        self._interceptor = interceptor
        self._channel = channel

    def subscribe(
        self, callback: Callable, try_to_connect: Optional[bool] = False
    ):
        """Forward the subscription to the wrapped channel."""
        self._channel.subscribe(callback, try_to_connect=try_to_connect)

    def unsubscribe(self, callback: Callable):
        """Forward the unsubscription to the wrapped channel."""
        self._channel.unsubscribe(callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Create a unary-unary multi-callable, intercepted when applicable."""

        def make_callable(name):
            # pytype: disable=wrong-arg-count
            return self._channel.unary_unary(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )
            # pytype: enable=wrong-arg-count

        interceptor = self._interceptor
        if not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor):
            return make_callable(method)
        return _UnaryUnaryMultiCallable(make_callable, method, interceptor)

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Create a unary-stream multi-callable, intercepted when applicable."""

        def make_callable(name):
            # pytype: disable=wrong-arg-count
            return self._channel.unary_stream(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )
            # pytype: enable=wrong-arg-count

        interceptor = self._interceptor
        if not isinstance(interceptor, grpc.UnaryStreamClientInterceptor):
            return make_callable(method)
        return _UnaryStreamMultiCallable(make_callable, method, interceptor)

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Create a stream-unary multi-callable, intercepted when applicable."""

        def make_callable(name):
            # pytype: disable=wrong-arg-count
            return self._channel.stream_unary(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )
            # pytype: enable=wrong-arg-count

        interceptor = self._interceptor
        if not isinstance(interceptor, grpc.StreamUnaryClientInterceptor):
            return make_callable(method)
        return _StreamUnaryMultiCallable(make_callable, method, interceptor)

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Create a stream-stream multi-callable, intercepted when applicable."""

        def make_callable(name):
            # pytype: disable=wrong-arg-count
            return self._channel.stream_stream(
                name,
                request_serializer,
                response_deserializer,
                _registered_method,
            )
            # pytype: enable=wrong-arg-count

        interceptor = self._interceptor
        if not isinstance(interceptor, grpc.StreamStreamClientInterceptor):
            return make_callable(method)
        return _StreamStreamMultiCallable(make_callable, method, interceptor)

    def _close(self):
        """Close the wrapped channel."""
        self._channel.close()

    def __enter__(self):
        """Return this channel for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the wrapped channel; never suppresses an exception."""
        self._close()
        return False

    def close(self):
        """Close the wrapped channel."""
        self._channel.close()
def intercept_channel(
    channel: grpc.Channel,
    *interceptors: Union[
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
    ],
) -> grpc.Channel:
    """Wrap ``channel`` with one ``_Channel`` layer per interceptor.

    Interceptors are applied in reverse order, so the first interceptor passed
    ends up as the outermost wrapper. With no interceptors the original
    channel is returned unchanged.

    Args:
        channel: The channel to wrap.
        *interceptors: Client interceptors; each must be an instance of at
            least one of the four gRPC client interceptor interfaces.

    Returns:
        The outermost wrapping channel, or ``channel`` itself if no
        interceptors were given.

    Raises:
        TypeError: If an interceptor implements none of the supported
            interfaces.
    """
    supported_interfaces = (
        grpc.UnaryUnaryClientInterceptor,
        grpc.UnaryStreamClientInterceptor,
        grpc.StreamUnaryClientInterceptor,
        grpc.StreamStreamClientInterceptor,
    )
    # ``interceptors`` is already a tuple, so it can be reversed without a copy.
    for interceptor in reversed(interceptors):
        if not isinstance(interceptor, supported_interfaces):
            raise TypeError(
                "interceptor must be "
                "grpc.UnaryUnaryClientInterceptor or "
                "grpc.UnaryStreamClientInterceptor or "
                "grpc.StreamUnaryClientInterceptor or "
                "grpc.StreamStreamClientInterceptor"
            )
        channel = _Channel(channel, interceptor)
    return channel