Source code for cloudvision.api.arista.connectivitymonitor.v1

# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: arista/connectivitymonitor.v1/connectivitymonitor.proto, arista/connectivitymonitor.v1/services.gen.proto
# plugin: python-aristaproto
# This file has been @generated

__all__ = (
    "ProbeKey",
    "ProbeStatsKey",
    "Probe",
    "ProbeStats",
    "MetaResponse",
    "ProbeRequest",
    "ProbeResponse",
    "ProbeSomeRequest",
    "ProbeSomeResponse",
    "ProbeStreamRequest",
    "ProbeStreamResponse",
    "ProbeBatchedStreamRequest",
    "ProbeBatchedStreamResponse",
    "ProbeStatsRequest",
    "ProbeStatsResponse",
    "ProbeStatsSomeRequest",
    "ProbeStatsSomeResponse",
    "ProbeStatsStreamRequest",
    "ProbeStatsStreamResponse",
    "ProbeStatsBatchedStreamRequest",
    "ProbeStatsBatchedStreamResponse",
    "ProbeServiceStub",
    "ProbeServiceBase",
    "ProbeStatsServiceStub",
    "ProbeStatsServiceBase",
)


from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Dict,
    List,
    Optional,
)

import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase

if TYPE_CHECKING:
    import grpclib.server
    from aristaproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


[docs] @dataclass(eq=False, repr=False) class ProbeKey(aristaproto.Message): """ProbeKey uniquely identifies a connectivity monitor probe.""" device_id: Optional[str] = aristaproto.message_field( 1, wraps=aristaproto.TYPE_STRING ) """device_id is the id of the device in the probe.""" host: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """host is the hostname used in the probe.""" vrf: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING) """vrf is the name of the VRF in the probe."""
[docs] @dataclass(eq=False, repr=False) class ProbeStatsKey(aristaproto.Message): """ ProbeStatsKey uniquely identifies a connectivity monitor probe's statistics, per source interface. """ device_id: Optional[str] = aristaproto.message_field( 1, wraps=aristaproto.TYPE_STRING ) """device_id is the id of the device in the probe.""" host: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """host is the hostname used in the probe.""" vrf: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING) """vrf is the name of the VRF in the probe.""" source_intf: Optional[str] = aristaproto.message_field( 4, wraps=aristaproto.TYPE_STRING ) """source_intf is the name of the interface in the probe."""
[docs] @dataclass(eq=False, repr=False) class Probe(aristaproto.Message): """ Probe is identifying information of a connectivity monitor probe. It is used to retrieve probe information without getting the corresponding stats so that probe information can be displayed without streaming all related data, such as in the UI. """ key: "ProbeKey" = aristaproto.message_field(1) """key uniquely identifies the connectivity monitor probe.""" ip_addr: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """ip_addr is the IP Address of the probe.""" host_name: Optional[str] = aristaproto.message_field( 3, wraps=aristaproto.TYPE_STRING ) """host_name is the name of the host of the probe.""" description: Optional[str] = aristaproto.message_field( 4, wraps=aristaproto.TYPE_STRING ) """description is the description of the probe."""
[docs] @dataclass(eq=False, repr=False) class ProbeStats(aristaproto.Message): """ ProbeStats is the connectivity monitor statistics related to the specified probe. """ key: "ProbeStatsKey" = aristaproto.message_field(1) """key uniquely identifies the connectivity monitor probe.""" latency_millis: Optional[float] = aristaproto.message_field( 2, wraps=aristaproto.TYPE_DOUBLE ) """ latency_millis is the latency between the device interface and the host. Value is in milliseconds. """ jitter_millis: Optional[float] = aristaproto.message_field( 3, wraps=aristaproto.TYPE_DOUBLE ) """ jitter_millis is the amount of jitter experienced by requests between the device interface and host. Value is in milliseconds. """ http_response_time_millis: Optional[float] = aristaproto.message_field( 4, wraps=aristaproto.TYPE_DOUBLE ) """ http_response_time_millis is the amount of time taken to respond to a http request between the device interface and the host. Value is in milliseconds. """ packet_loss_percent: Optional[int] = aristaproto.message_field( 5, wraps=aristaproto.TYPE_INT64 ) """ packet_loss_percent is the amount of packet loss experienced by requests between the device interface and host. Value is a percentage. """ error: Optional[str] = aristaproto.message_field(6, wraps=aristaproto.TYPE_STRING) """error is the error reported on the connection."""
[docs] @dataclass(eq=False, repr=False) class MetaResponse(aristaproto.Message): """ """ time: datetime = aristaproto.message_field(1) """ Time holds the timestamp of the last item included in the metadata calculation. """ type: "__subscriptions__.Operation" = aristaproto.enum_field(2) """ Operation indicates how the value in this response should be considered. Under non-subscribe requests, this value should always be INITIAL. In a subscription, once all initial data is streamed and the client begins to receive modification updates, you should not see INITIAL again. """ count: Optional[int] = aristaproto.message_field(3, wraps=aristaproto.TYPE_UINT32) """ Count is the number of items present under the conditions of the request. """
[docs] @dataclass(eq=False, repr=False) class ProbeRequest(aristaproto.Message): """ """ key: "ProbeKey" = aristaproto.message_field(1) """ Key uniquely identifies a Probe instance to retrieve. This value must be populated. """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ProbeResponse(aristaproto.Message): """ """ value: "Probe" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ time: datetime = aristaproto.message_field(2) """ Time carries the (UTC) timestamp of the last-modification of the Probe instance in this response. """
[docs] @dataclass(eq=False, repr=False) class ProbeSomeRequest(aristaproto.Message): """ """ keys: List["ProbeKey"] = aristaproto.message_field(1) """ """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ProbeSomeResponse(aristaproto.Message): """ """ value: "Probe" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """ Error is an optional field. It should be filled when there is an error in the GetSome process. """ time: datetime = aristaproto.message_field(3) """ """
[docs] @dataclass(eq=False, repr=False) class ProbeStreamRequest(aristaproto.Message): """ """ partial_eq_filter: List["Probe"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a GetAll/Subscribe. This requires all provided fields to be equal to the response. While transparent to users, this field also allows services to optimize internal subscriptions if filter(s) are sufficiently specific. """ time: "__time__.TimeBounds" = aristaproto.message_field(3) """ TimeRange allows limiting response data to within a specified time window. If this field is populated, at least one of the two time fields are required. For GetAll, the fields start and end can be used as follows: * end: Returns the state of each Probe at end. * Each Probe response is fully-specified (all fields set). * start: Returns the state of each Probe at start, followed by updates until now. * Each Probe response at start is fully-specified, but updates may be partial. * start and end: Returns the state of each Probe at start, followed by updates until end. * Each Probe response at start is fully-specified, but updates until end may be partial. This field is not allowed in the Subscribe RPC. """
[docs] @dataclass(eq=False, repr=False) class ProbeStreamResponse(aristaproto.Message): """ """ value: "Probe" = aristaproto.message_field(1) """ Value is a value deemed relevant to the initiating request. This structure will always have its key-field populated. Which other fields are populated, and why, depends on the value of Operation and what triggered this notification. """ time: datetime = aristaproto.message_field(2) """Time holds the timestamp of this Probe's last modification.""" type: "__subscriptions__.Operation" = aristaproto.enum_field(3) """ Operation indicates how the Probe value in this response should be considered. Under non-subscribe requests, this value should always be INITIAL. In a subscription, once all initial data is streamed and the client begins to receive modification updates, you should not see INITIAL again. """
[docs] @dataclass(eq=False, repr=False) class ProbeBatchedStreamRequest(aristaproto.Message): """ """ partial_eq_filter: List["Probe"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a GetAll/Subscribe. This requires all provided fields to be equal to the response. While transparent to users, this field also allows services to optimize internal subscriptions if filter(s) are sufficiently specific. """ time: "__time__.TimeBounds" = aristaproto.message_field(3) """ TimeRange allows limiting response data to within a specified time window. If this field is populated, at least one of the two time fields are required. For GetAll, the fields start and end can be used as follows: * end: Returns the state of each Probe at end. * Each Probe response is fully-specified (all fields set). * start: Returns the state of each Probe at start, followed by updates until now. * Each Probe response at start is fully-specified, but updates may be partial. * start and end: Returns the state of each Probe at start, followed by updates until end. * Each Probe response at start is fully-specified, but updates until end may be partial. This field is not allowed in the Subscribe RPC. """ max_messages: Optional[int] = aristaproto.message_field( 4, wraps=aristaproto.TYPE_UINT32 ) """ MaxMessages limits the maximum number of messages that can be contained in one batch. MaxMessages is required to be at least 1. The maximum number of messages in a batch is min(max_messages, INTERNAL_BATCH_LIMIT) INTERNAL_BATCH_LIMIT is set based on the maximum message size. """
[docs] @dataclass(eq=False, repr=False) class ProbeBatchedStreamResponse(aristaproto.Message): """ """ responses: List["ProbeStreamResponse"] = aristaproto.message_field(1) """ Values are the values deemed relevant to the initiating request. The length of this structure is guaranteed to be between (inclusive) 1 and min(req.max_messages, INTERNAL_BATCH_LIMIT). """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsRequest(aristaproto.Message): """ """ key: "ProbeStatsKey" = aristaproto.message_field(1) """ Key uniquely identifies a ProbeStats instance to retrieve. This value must be populated. """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsResponse(aristaproto.Message): """ """ value: "ProbeStats" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ time: datetime = aristaproto.message_field(2) """ Time carries the (UTC) timestamp of the last-modification of the ProbeStats instance in this response. """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsSomeRequest(aristaproto.Message): """ """ keys: List["ProbeStatsKey"] = aristaproto.message_field(1) """ """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsSomeResponse(aristaproto.Message): """ """ value: "ProbeStats" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """ Error is an optional field. It should be filled when there is an error in the GetSome process. """ time: datetime = aristaproto.message_field(3) """ """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsStreamRequest(aristaproto.Message): """ """ partial_eq_filter: List["ProbeStats"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a GetAll/Subscribe. This requires all provided fields to be equal to the response. While transparent to users, this field also allows services to optimize internal subscriptions if filter(s) are sufficiently specific. """ time: "__time__.TimeBounds" = aristaproto.message_field(3) """ TimeRange allows limiting response data to within a specified time window. If this field is populated, at least one of the two time fields are required. For GetAll, the fields start and end can be used as follows: * end: Returns the state of each ProbeStats at end. * Each ProbeStats response is fully-specified (all fields set). * start: Returns the state of each ProbeStats at start, followed by updates until now. * Each ProbeStats response at start is fully-specified, but updates may be partial. * start and end: Returns the state of each ProbeStats at start, followed by updates until end. * Each ProbeStats response at start is fully-specified, but updates until end may be partial. This field is not allowed in the Subscribe RPC. """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsStreamResponse(aristaproto.Message): """ """ value: "ProbeStats" = aristaproto.message_field(1) """ Value is a value deemed relevant to the initiating request. This structure will always have its key-field populated. Which other fields are populated, and why, depends on the value of Operation and what triggered this notification. """ time: datetime = aristaproto.message_field(2) """Time holds the timestamp of this ProbeStats's last modification.""" type: "__subscriptions__.Operation" = aristaproto.enum_field(3) """ Operation indicates how the ProbeStats value in this response should be considered. Under non-subscribe requests, this value should always be INITIAL. In a subscription, once all initial data is streamed and the client begins to receive modification updates, you should not see INITIAL again. """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsBatchedStreamRequest(aristaproto.Message): """ """ partial_eq_filter: List["ProbeStats"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a GetAll/Subscribe. This requires all provided fields to be equal to the response. While transparent to users, this field also allows services to optimize internal subscriptions if filter(s) are sufficiently specific. """ time: "__time__.TimeBounds" = aristaproto.message_field(3) """ TimeRange allows limiting response data to within a specified time window. If this field is populated, at least one of the two time fields are required. For GetAll, the fields start and end can be used as follows: * end: Returns the state of each ProbeStats at end. * Each ProbeStats response is fully-specified (all fields set). * start: Returns the state of each ProbeStats at start, followed by updates until now. * Each ProbeStats response at start is fully-specified, but updates may be partial. * start and end: Returns the state of each ProbeStats at start, followed by updates until end. * Each ProbeStats response at start is fully-specified, but updates until end may be partial. This field is not allowed in the Subscribe RPC. """ max_messages: Optional[int] = aristaproto.message_field( 4, wraps=aristaproto.TYPE_UINT32 ) """ MaxMessages limits the maximum number of messages that can be contained in one batch. MaxMessages is required to be at least 1. The maximum number of messages in a batch is min(max_messages, INTERNAL_BATCH_LIMIT) INTERNAL_BATCH_LIMIT is set based on the maximum message size. """
[docs] @dataclass(eq=False, repr=False) class ProbeStatsBatchedStreamResponse(aristaproto.Message): """ """ responses: List["ProbeStatsStreamResponse"] = aristaproto.message_field(1) """ Values are the values deemed relevant to the initiating request. The length of this structure is guaranteed to be between (inclusive) 1 and min(req.max_messages, INTERNAL_BATCH_LIMIT). """
[docs] class ProbeServiceStub(aristaproto.ServiceStub): """ """
[docs] async def get_one( self, probe_request: "ProbeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ProbeResponse": """ """ return await self._unary_unary( "/arista.connectivitymonitor.v1.ProbeService/GetOne", probe_request, ProbeResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def get_some( self, probe_some_request: "ProbeSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeSomeResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeService/GetSome", probe_some_request, ProbeSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all( self, probe_stream_request: "ProbeStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeService/GetAll", probe_stream_request, ProbeStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe( self, probe_stream_request: "ProbeStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeService/Subscribe", probe_stream_request, ProbeStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_meta( self, probe_stream_request: "ProbeStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MetaResponse": """ """ return await self._unary_unary( "/arista.connectivitymonitor.v1.ProbeService/GetMeta", probe_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def subscribe_meta( self, probe_stream_request: "ProbeStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[MetaResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeService/SubscribeMeta", probe_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all_batched( self, probe_batched_stream_request: "ProbeBatchedStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeBatchedStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeService/GetAllBatched", probe_batched_stream_request, ProbeBatchedStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe_batched( self, probe_batched_stream_request: "ProbeBatchedStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeBatchedStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeService/SubscribeBatched", probe_batched_stream_request, ProbeBatchedStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] class ProbeStatsServiceStub(aristaproto.ServiceStub): """ """
[docs] async def get_one( self, probe_stats_request: "ProbeStatsRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ProbeStatsResponse": """ """ return await self._unary_unary( "/arista.connectivitymonitor.v1.ProbeStatsService/GetOne", probe_stats_request, ProbeStatsResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def get_some( self, probe_stats_some_request: "ProbeStatsSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeStatsSomeResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeStatsService/GetSome", probe_stats_some_request, ProbeStatsSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all( self, probe_stats_stream_request: "ProbeStatsStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeStatsStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeStatsService/GetAll", probe_stats_stream_request, ProbeStatsStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe( self, probe_stats_stream_request: "ProbeStatsStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeStatsStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeStatsService/Subscribe", probe_stats_stream_request, ProbeStatsStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_meta( self, probe_stats_stream_request: "ProbeStatsStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MetaResponse": """ """ return await self._unary_unary( "/arista.connectivitymonitor.v1.ProbeStatsService/GetMeta", probe_stats_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def subscribe_meta( self, probe_stats_stream_request: "ProbeStatsStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[MetaResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeStatsService/SubscribeMeta", probe_stats_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all_batched( self, probe_stats_batched_stream_request: "ProbeStatsBatchedStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeStatsBatchedStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeStatsService/GetAllBatched", probe_stats_batched_stream_request, ProbeStatsBatchedStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe_batched( self, probe_stats_batched_stream_request: "ProbeStatsBatchedStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ProbeStatsBatchedStreamResponse]": """ """ async for response in self._unary_stream( "/arista.connectivitymonitor.v1.ProbeStatsService/SubscribeBatched", probe_stats_batched_stream_request, ProbeStatsBatchedStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
from ... import subscriptions as __subscriptions__ from ... import time as __time__
[docs] class ProbeServiceBase(ServiceBase): """ """
[docs] async def get_one(self, probe_request: "ProbeRequest") -> "ProbeResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_some( self, probe_some_request: "ProbeSomeRequest" ) -> AsyncIterator[ProbeSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all( self, probe_stream_request: "ProbeStreamRequest" ) -> AsyncIterator[ProbeStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe( self, probe_stream_request: "ProbeStreamRequest" ) -> AsyncIterator[ProbeStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_meta( self, probe_stream_request: "ProbeStreamRequest" ) -> "MetaResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_meta( self, probe_stream_request: "ProbeStreamRequest" ) -> AsyncIterator[MetaResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all_batched( self, probe_batched_stream_request: "ProbeBatchedStreamRequest" ) -> AsyncIterator[ProbeBatchedStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_batched( self, probe_batched_stream_request: "ProbeBatchedStreamRequest" ) -> AsyncIterator[ProbeBatchedStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one( self, stream: "grpclib.server.Stream[ProbeRequest, ProbeResponse]" ) -> None: request = await stream.recv_message() response = await self.get_one(request) await stream.send_message(response) async def __rpc_get_some( self, stream: "grpclib.server.Stream[ProbeSomeRequest, ProbeSomeResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_some, stream, request, ) async def __rpc_get_all( self, stream: "grpclib.server.Stream[ProbeStreamRequest, ProbeStreamResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_all, stream, request, ) async def __rpc_subscribe( self, stream: "grpclib.server.Stream[ProbeStreamRequest, ProbeStreamResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe, stream, request, ) async def __rpc_get_meta( self, stream: "grpclib.server.Stream[ProbeStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() response = await self.get_meta(request) await stream.send_message(response) async def __rpc_subscribe_meta( self, stream: "grpclib.server.Stream[ProbeStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe_meta, stream, request, ) async def __rpc_get_all_batched( self, stream: "grpclib.server.Stream[ProbeBatchedStreamRequest, ProbeBatchedStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_all_batched, stream, request, ) async def __rpc_subscribe_batched( self, stream: "grpclib.server.Stream[ProbeBatchedStreamRequest, ProbeBatchedStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe_batched, stream, request, ) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/arista.connectivitymonitor.v1.ProbeService/GetOne": grpclib.const.Handler( self.__rpc_get_one, grpclib.const.Cardinality.UNARY_UNARY, ProbeRequest, ProbeResponse, ), "/arista.connectivitymonitor.v1.ProbeService/GetSome": grpclib.const.Handler( self.__rpc_get_some, grpclib.const.Cardinality.UNARY_STREAM, ProbeSomeRequest, ProbeSomeResponse, ), "/arista.connectivitymonitor.v1.ProbeService/GetAll": grpclib.const.Handler( self.__rpc_get_all, grpclib.const.Cardinality.UNARY_STREAM, ProbeStreamRequest, ProbeStreamResponse, ), "/arista.connectivitymonitor.v1.ProbeService/Subscribe": grpclib.const.Handler( self.__rpc_subscribe, grpclib.const.Cardinality.UNARY_STREAM, ProbeStreamRequest, ProbeStreamResponse, ), "/arista.connectivitymonitor.v1.ProbeService/GetMeta": grpclib.const.Handler( self.__rpc_get_meta, grpclib.const.Cardinality.UNARY_UNARY, ProbeStreamRequest, MetaResponse, ), "/arista.connectivitymonitor.v1.ProbeService/SubscribeMeta": grpclib.const.Handler( self.__rpc_subscribe_meta, grpclib.const.Cardinality.UNARY_STREAM, ProbeStreamRequest, MetaResponse, ), "/arista.connectivitymonitor.v1.ProbeService/GetAllBatched": grpclib.const.Handler( self.__rpc_get_all_batched, grpclib.const.Cardinality.UNARY_STREAM, ProbeBatchedStreamRequest, ProbeBatchedStreamResponse, ), "/arista.connectivitymonitor.v1.ProbeService/SubscribeBatched": grpclib.const.Handler( self.__rpc_subscribe_batched, grpclib.const.Cardinality.UNARY_STREAM, ProbeBatchedStreamRequest, ProbeBatchedStreamResponse, ), }
[docs] class ProbeStatsServiceBase(ServiceBase): """ """
[docs] async def get_one( self, probe_stats_request: "ProbeStatsRequest" ) -> "ProbeStatsResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_some( self, probe_stats_some_request: "ProbeStatsSomeRequest" ) -> AsyncIterator[ProbeStatsSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all( self, probe_stats_stream_request: "ProbeStatsStreamRequest" ) -> AsyncIterator[ProbeStatsStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe( self, probe_stats_stream_request: "ProbeStatsStreamRequest" ) -> AsyncIterator[ProbeStatsStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_meta( self, probe_stats_stream_request: "ProbeStatsStreamRequest" ) -> "MetaResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_meta( self, probe_stats_stream_request: "ProbeStatsStreamRequest" ) -> AsyncIterator[MetaResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all_batched( self, probe_stats_batched_stream_request: "ProbeStatsBatchedStreamRequest" ) -> AsyncIterator[ProbeStatsBatchedStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_batched( self, probe_stats_batched_stream_request: "ProbeStatsBatchedStreamRequest" ) -> AsyncIterator[ProbeStatsBatchedStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one( self, stream: "grpclib.server.Stream[ProbeStatsRequest, ProbeStatsResponse]" ) -> None: request = await stream.recv_message() response = await self.get_one(request) await stream.send_message(response) async def __rpc_get_some( self, stream: "grpclib.server.Stream[ProbeStatsSomeRequest, ProbeStatsSomeResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_some, stream, request, ) async def __rpc_get_all( self, stream: "grpclib.server.Stream[ProbeStatsStreamRequest, ProbeStatsStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_all, stream, request, ) async def __rpc_subscribe( self, stream: "grpclib.server.Stream[ProbeStatsStreamRequest, ProbeStatsStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe, stream, request, ) async def __rpc_get_meta( self, stream: "grpclib.server.Stream[ProbeStatsStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() response = await self.get_meta(request) await stream.send_message(response) async def __rpc_subscribe_meta( self, stream: "grpclib.server.Stream[ProbeStatsStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe_meta, stream, request, ) async def __rpc_get_all_batched( self, stream: "grpclib.server.Stream[ProbeStatsBatchedStreamRequest, ProbeStatsBatchedStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_all_batched, stream, request, ) async def __rpc_subscribe_batched( self, stream: "grpclib.server.Stream[ProbeStatsBatchedStreamRequest, ProbeStatsBatchedStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe_batched, stream, request, ) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/arista.connectivitymonitor.v1.ProbeStatsService/GetOne": grpclib.const.Handler( self.__rpc_get_one, grpclib.const.Cardinality.UNARY_UNARY, ProbeStatsRequest, ProbeStatsResponse, ), "/arista.connectivitymonitor.v1.ProbeStatsService/GetSome": grpclib.const.Handler( self.__rpc_get_some, grpclib.const.Cardinality.UNARY_STREAM, ProbeStatsSomeRequest, ProbeStatsSomeResponse, ), "/arista.connectivitymonitor.v1.ProbeStatsService/GetAll": grpclib.const.Handler( self.__rpc_get_all, grpclib.const.Cardinality.UNARY_STREAM, ProbeStatsStreamRequest, ProbeStatsStreamResponse, ), "/arista.connectivitymonitor.v1.ProbeStatsService/Subscribe": grpclib.const.Handler( self.__rpc_subscribe, grpclib.const.Cardinality.UNARY_STREAM, ProbeStatsStreamRequest, ProbeStatsStreamResponse, ), "/arista.connectivitymonitor.v1.ProbeStatsService/GetMeta": grpclib.const.Handler( self.__rpc_get_meta, grpclib.const.Cardinality.UNARY_UNARY, ProbeStatsStreamRequest, MetaResponse, ), "/arista.connectivitymonitor.v1.ProbeStatsService/SubscribeMeta": grpclib.const.Handler( self.__rpc_subscribe_meta, grpclib.const.Cardinality.UNARY_STREAM, ProbeStatsStreamRequest, MetaResponse, ), "/arista.connectivitymonitor.v1.ProbeStatsService/GetAllBatched": grpclib.const.Handler( self.__rpc_get_all_batched, grpclib.const.Cardinality.UNARY_STREAM, ProbeStatsBatchedStreamRequest, ProbeStatsBatchedStreamResponse, ), "/arista.connectivitymonitor.v1.ProbeStatsService/SubscribeBatched": grpclib.const.Handler( self.__rpc_subscribe_batched, grpclib.const.Cardinality.UNARY_STREAM, ProbeStatsBatchedStreamRequest, ProbeStatsBatchedStreamResponse, ), }