Source code for cloudvision.api.arista.dashboard.v1

# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: arista/dashboard.v1/dashboard.proto, arista/dashboard.v1/services.gen.proto
# plugin: python-aristaproto
# This file has been @generated

__all__ = (
    "Position",
    "Dimensions",
    "WidgetStyles",
    "Widget",
    "Widgets",
    "DashboardKey",
    "DashboardConfig",
    "DashboardMetadata",
    "Filter",
    "Dashboard",
    "GlobalDashboardConfig",
    "MetaResponse",
    "DashboardRequest",
    "DashboardResponse",
    "DashboardSomeRequest",
    "DashboardSomeResponse",
    "DashboardStreamRequest",
    "DashboardStreamResponse",
    "DashboardBatchedStreamRequest",
    "DashboardBatchedStreamResponse",
    "DashboardConfigRequest",
    "DashboardConfigResponse",
    "DashboardConfigSomeRequest",
    "DashboardConfigSomeResponse",
    "DashboardConfigStreamRequest",
    "DashboardConfigStreamResponse",
    "DashboardConfigBatchedStreamRequest",
    "DashboardConfigBatchedStreamResponse",
    "DashboardConfigSetRequest",
    "DashboardConfigSetResponse",
    "DashboardConfigSetSomeRequest",
    "DashboardConfigSetSomeResponse",
    "DashboardConfigDeleteRequest",
    "DashboardConfigDeleteResponse",
    "DashboardConfigDeleteSomeRequest",
    "DashboardConfigDeleteSomeResponse",
    "DashboardConfigDeleteAllRequest",
    "DashboardConfigDeleteAllResponse",
    "GlobalDashboardConfigRequest",
    "GlobalDashboardConfigResponse",
    "GlobalDashboardConfigStreamRequest",
    "GlobalDashboardConfigStreamResponse",
    "GlobalDashboardConfigSetRequest",
    "GlobalDashboardConfigSetResponse",
    "DashboardServiceStub",
    "DashboardServiceBase",
    "DashboardConfigServiceStub",
    "DashboardConfigServiceBase",
    "GlobalDashboardConfigServiceStub",
    "GlobalDashboardConfigServiceBase",
)


from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Dict,
    List,
    Optional,
)

import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase

if TYPE_CHECKING:
    import grpclib.server
    from aristaproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


@dataclass(eq=False, repr=False)
class Position(aristaproto.Message):
    """
    Position is a cell position in the UI, expressed as a horizontal (x)
    and a vertical (y) coordinate.
    """

    x: Optional[int] = aristaproto.message_field(1, wraps=aristaproto.TYPE_UINT32)
    """x represents a position in the horizontal axis."""

    y: Optional[int] = aristaproto.message_field(2, wraps=aristaproto.TYPE_UINT32)
    """y represents a position in the vertical axis."""
@dataclass(eq=False, repr=False)
class Dimensions(aristaproto.Message):
    """
    Dimensions is the size of a widget in the UI, measured in cells
    (width by height).
    """

    width: Optional[int] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_UINT32
    )
    """width of the widget in the UI, represented in number of cells."""

    height: Optional[int] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_UINT32
    )
    """height of the widget in the UI, represented in number of cells."""
@dataclass(eq=False, repr=False)
class WidgetStyles(aristaproto.Message):
    """
    WidgetStyles describes the appearance of a widget's panel: title
    visibility and size, separator visibility and background color.
    """

    hide_title: Optional[bool] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_BOOL
    )
    """
    hide_title is used to hint the dashboard that the widget title must be
    hidden.
    """

    background_color: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """background_color is used to set the widget's background color."""

    hide_horizontal_bar: Optional[bool] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_BOOL
    )
    """
    hide_horizontal_bar is used to hint the dashboard that the title
    separator must be hidden.
    """

    title_size: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """title_size is used to set widget's title size."""
@dataclass(eq=False, repr=False)
class Widget(aristaproto.Message):
    """
    Widget is the building block of a dashboard. Each widget is responsible
    for displaying some type of data.
    """

    id: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """id holds the unique identifier for the widget inside a dashboard"""

    name: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """name of the widget is displayed at the top of the widget."""

    position: "Position" = aristaproto.message_field(3)
    """
    position of the widget, represented as a (x,y) coordinate in a grid.
    Top left is at (0,0).
    """

    dimensions: "Dimensions" = aristaproto.message_field(4)
    """
    dimensions of the widget represents how many cell in the grid it takes.
    """

    type: Optional[str] = aristaproto.message_field(
        5, wraps=aristaproto.TYPE_STRING
    )
    """
    type is the widget type. Each type is handled differently in the UI,
    and can use different `inputs`.
    """

    inputs: Optional[str] = aristaproto.message_field(
        6, wraps=aristaproto.TYPE_STRING
    )
    """
    inputs contains metadata about the data the widget will display,
    encoded in a JSON string.
    Internal data vary based on the widget type `type` and is managed by
    the client.
    """

    location: Optional[str] = aristaproto.message_field(
        7, wraps=aristaproto.TYPE_STRING
    )
    """
    location is used as a position display hint, used and managed by the UI.
    """

    styles: "WidgetStyles" = aristaproto.message_field(8)
    """styles represents the widget's panel appearance."""

    parent: Optional[str] = aristaproto.message_field(
        9, wraps=aristaproto.TYPE_STRING
    )
    """parent stores the id of its parent widget."""
@dataclass(eq=False, repr=False)
class Widgets(aristaproto.Message):
    """Widgets is a wrapper message around a list of `Widget` values."""

    values: List["Widget"] = aristaproto.message_field(1)
    """values holds a list of widgets"""
@dataclass(eq=False, repr=False)
class DashboardKey(aristaproto.Message):
    """DashboardKey is the unique identifier of a dashboard."""

    dashboard_id: Optional[str] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_STRING
    )
    """dashboard_id holds the id of the dashboard"""
@dataclass(eq=False, repr=False)
class DashboardConfig(aristaproto.Message):
    """
    DashboardConfig groups every user-editable dashboard field: its key,
    name, description and widgets.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    key is the unique identifier. It always must be defined.
    If set, will create or update a dashboard.
    """

    name: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """name is the dashboard name, displayed at the top of the dashboard."""

    description: Optional[str] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_STRING
    )
    """
    description may include details about what is displayed in the
    dashboard.
    """

    widgets: "Widgets" = aristaproto.message_field(4)
    """widgets list of widgets in the dashboard."""
@dataclass(eq=False, repr=False)
class DashboardMetadata(aristaproto.Message):
    """
    DashboardMetadata carries versioning metadata for a dashboard.
    Everything in it is managed internally and is read-only.
    """

    schema_version: Optional[str] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_STRING
    )
    """schema_version is managed internally."""

    legacy_key: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    legacy_key holds the key of a previous version of the dashboard,
    in case it was migrated.
    """

    legacy_version: Optional[str] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_STRING
    )
    """
    legacy_version tells from which version the dashboard was migrated from.
    """

    from_package: Optional[str] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_STRING
    )
    """
    from_package records the contributing package key and version, if
    applicable.
    """
@dataclass(eq=False, repr=False)
class Filter(aristaproto.Message):
    """
    Filter selects dashboards in cases where an exact match is not wanted;
    currently it matches on tags found in the dashboard description.
    """

    tags: "___fmp__.RepeatedString" = aristaproto.message_field(1)
    """
    tags includes the values to be matched in the dashboard description.
    Tags are matched by word. Generally, a tag is prefixed by a '#',
    which must be omitted when provided here.
    All provided tags must match inside a dashboard for it to be returned.
    E.g., to match \"#devices\", the tag should be set to \"devices\".
    """
@dataclass(eq=False, repr=False)
class Dashboard(aristaproto.Message):
    """
    Dashboard is the full dashboard state: the user-editable fields plus
    creation/modification bookkeeping and version metadata.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """key is the unique identifier. It will always be defined."""

    created_at: datetime = aristaproto.message_field(2)
    """
    created_at represents the date the dashboard was first created.
    Old dashboards may not have this field set.
    """

    created_by: Optional[str] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_STRING
    )
    """
    created_by keeps the name of the user who first created this dashboard.
    Old dashboards may not have this field set.
    """

    last_modified_at: datetime = aristaproto.message_field(4)
    """
    last_modified_at holds the timestamp this dashboard was last updated by
    a user.
    Old dashboards may not have this field set.
    """

    last_modified_by: Optional[str] = aristaproto.message_field(
        5, wraps=aristaproto.TYPE_STRING
    )
    """
    last_modified_by holds the username who last updated this dashboard.
    Old dashboards may not have this field set.
    """

    meta_data: "DashboardMetadata" = aristaproto.message_field(6)
    """meta_data includes version metadata about the dashboard."""

    name: Optional[str] = aristaproto.message_field(
        7, wraps=aristaproto.TYPE_STRING
    )
    """name is the dashboard name, displayed at the top of the dashboard."""

    description: Optional[str] = aristaproto.message_field(
        8, wraps=aristaproto.TYPE_STRING
    )
    """
    description may include details about what is displayed in the
    dashboard.
    """

    widgets: "Widgets" = aristaproto.message_field(9)
    """widgets list of widgets in the dashboard."""
@dataclass(eq=False, repr=False)
class GlobalDashboardConfig(aristaproto.Message):
    """
    GlobalDashboardConfig stores dashboard settings that are global rather
    than tied to one dashboard.
    """

    default_dashboard: "DashboardKey" = aristaproto.message_field(1)
    """
    default_dashboard is the default dashboard shown to a user.
    To unset, use an empty key (`{dashboard_id: nil}`) in a `Set()` call.
    """
@dataclass(eq=False, repr=False)
class MetaResponse(aristaproto.Message):
    """
    MetaResponse carries metadata about a result set: a timestamp, an
    operation type and an item count. It is the response type of the
    GetMeta and SubscribeMeta RPCs.
    """

    time: datetime = aristaproto.message_field(1)
    """
    Time holds the timestamp of the last item included in the metadata
    calculation.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(2)
    """
    Operation indicates how the value in this response should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """

    count: Optional[int] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_UINT32
    )
    """
    Count is the number of items present under the conditions of the request.
    """
@dataclass(eq=False, repr=False)
class DashboardRequest(aristaproto.Message):
    """
    DashboardRequest asks for a single Dashboard, identified by key and
    optionally pinned to a point in time.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a Dashboard instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes the
    request.
    """
@dataclass(eq=False, repr=False)
class DashboardResponse(aristaproto.Message):
    """
    DashboardResponse returns one Dashboard together with the timestamp of
    its last modification.
    """

    value: "Dashboard" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    Dashboard instance in this response.
    """
@dataclass(eq=False, repr=False)
class DashboardSomeRequest(aristaproto.Message):
    """
    DashboardSomeRequest asks for several Dashboards at once, identified by
    a list of keys and optionally pinned to a point in time.
    """

    keys: List["DashboardKey"] = aristaproto.message_field(1)
    """
    keys lists the DashboardKey values carried by this request
    (undocumented upstream; presumably the instances to retrieve).
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes the
    request.
    """
@dataclass(eq=False, repr=False)
class DashboardSomeResponse(aristaproto.Message):
    """
    DashboardSomeResponse is one element of a GetSome result stream: a
    Dashboard value, an optional error string and a timestamp.
    """

    value: "Dashboard" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    time is a timestamp attached to this response (its exact meaning is not
    documented upstream).
    """
@dataclass(eq=False, repr=False)
class DashboardStreamRequest(aristaproto.Message):
    """
    DashboardStreamRequest parameterizes the streaming and metadata RPCs of
    the Dashboard service (GetAll, Subscribe, GetMeta, SubscribeMeta) with
    optional filters and time bounds.
    """

    partial_eq_filter: List["Dashboard"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    # NOTE(review): the text below speaks of "each Dashboard in the list",
    # which reads like a description of partial_eq_filter rather than of this
    # List[Filter] field -- confirm against the .proto source.
    filter: List["Filter"] = aristaproto.message_field(2)
    """
    For each Dashboard in the list, all populated fields are considered ANDed
    together as a filtering operation. Similarly, the list itself is ORed
    such that any individual filter that matches a given Dashboard is
    streamed to the user.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields are
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each Dashboard at end.
        * Each Dashboard response is fully-specified (all fields set).
      * start: Returns the state of each Dashboard at start, followed by
        updates until now.
        * Each Dashboard response at start is fully-specified, but updates
          may be partial.
      * start and end: Returns the state of each Dashboard at start, followed
        by updates until end.
        * Each Dashboard response at start is fully-specified, but updates
          until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """
@dataclass(eq=False, repr=False)
class DashboardStreamResponse(aristaproto.Message):
    """
    DashboardStreamResponse is one element of a GetAll/Subscribe stream: a
    Dashboard value, its last-modification time and the operation type.
    """

    value: "Dashboard" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this Dashboard's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the Dashboard value in this response should be
    considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class DashboardBatchedStreamRequest(aristaproto.Message):
    """
    DashboardBatchedStreamRequest carries the same filters and time bounds
    as DashboardStreamRequest, plus a cap on the number of messages per
    batch.
    """

    partial_eq_filter: List["Dashboard"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    # NOTE(review): the text below speaks of "each Dashboard in the list",
    # which reads like a description of partial_eq_filter rather than of this
    # List[Filter] field -- confirm against the .proto source.
    filter: List["Filter"] = aristaproto.message_field(2)
    """
    For each Dashboard in the list, all populated fields are considered ANDed
    together as a filtering operation. Similarly, the list itself is ORed
    such that any individual filter that matches a given Dashboard is
    streamed to the user.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields are
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each Dashboard at end.
        * Each Dashboard response is fully-specified (all fields set).
      * start: Returns the state of each Dashboard at start, followed by
        updates until now.
        * Each Dashboard response at start is fully-specified, but updates
          may be partial.
      * start and end: Returns the state of each Dashboard at start, followed
        by updates until end.
        * Each Dashboard response at start is fully-specified, but updates
          until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """
    MaxMessages limits the maximum number of messages that can be contained
    in one batch.
    MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT)
    INTERNAL_BATCH_LIMIT is set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class DashboardBatchedStreamResponse(aristaproto.Message):
    """
    DashboardBatchedStreamResponse bundles several DashboardStreamResponse
    messages into a single batch.
    """

    responses: List["DashboardStreamResponse"] = aristaproto.message_field(1)
    """
    Values are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive) 1
    and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class DashboardConfigRequest(aristaproto.Message):
    """
    DashboardConfigRequest asks for a single DashboardConfig, identified by
    key and optionally pinned to a point in time.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a DashboardConfig instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes the
    request.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigResponse(aristaproto.Message):
    """
    DashboardConfigResponse returns one DashboardConfig together with the
    timestamp of its last modification.
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    DashboardConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigSomeRequest(aristaproto.Message):
    """
    DashboardConfigSomeRequest asks for several DashboardConfigs at once,
    identified by a list of keys and optionally pinned to a point in time.
    """

    keys: List["DashboardKey"] = aristaproto.message_field(1)
    """
    keys lists the DashboardKey values carried by this request
    (undocumented upstream; presumably the instances to retrieve).
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes the
    request.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigSomeResponse(aristaproto.Message):
    """
    DashboardConfigSomeResponse is one element of a GetSome result: a
    DashboardConfig value, an optional error string and a timestamp.
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    time is a timestamp attached to this response (its exact meaning is not
    documented upstream).
    """
@dataclass(eq=False, repr=False)
class DashboardConfigStreamRequest(aristaproto.Message):
    """
    DashboardConfigStreamRequest parameterizes a GetAll/Subscribe over
    DashboardConfig values with an optional partial-equality filter and
    time bounds.
    """

    partial_eq_filter: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields are
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each DashboardConfig at end.
        * Each DashboardConfig response is fully-specified (all fields set).
      * start: Returns the state of each DashboardConfig at start, followed
        by updates until now.
        * Each DashboardConfig response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each DashboardConfig at start,
        followed by updates until end.
        * Each DashboardConfig response at start is fully-specified, but
          updates until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigStreamResponse(aristaproto.Message):
    """
    DashboardConfigStreamResponse is one element of a DashboardConfig
    stream: a value, its last-modification time and the operation type.
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time holds the timestamp of this DashboardConfig's last modification.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the DashboardConfig value in this response
    should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigBatchedStreamRequest(aristaproto.Message):
    """
    DashboardConfigBatchedStreamRequest carries the same filter and time
    bounds as DashboardConfigStreamRequest, plus a cap on the number of
    messages per batch.
    """

    partial_eq_filter: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields are
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each DashboardConfig at end.
        * Each DashboardConfig response is fully-specified (all fields set).
      * start: Returns the state of each DashboardConfig at start, followed
        by updates until now.
        * Each DashboardConfig response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each DashboardConfig at start,
        followed by updates until end.
        * Each DashboardConfig response at start is fully-specified, but
          updates until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """
    MaxMessages limits the maximum number of messages that can be contained
    in one batch.
    MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT)
    INTERNAL_BATCH_LIMIT is set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigBatchedStreamResponse(aristaproto.Message):
    """
    DashboardConfigBatchedStreamResponse bundles several
    DashboardConfigStreamResponse messages into a single batch.
    """

    responses: List["DashboardConfigStreamResponse"] = aristaproto.message_field(1)
    """
    Values are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive) 1
    and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class DashboardConfigSetRequest(aristaproto.Message):
    """
    DashboardConfigSetRequest carries one DashboardConfig to be written to
    the datastore.
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    DashboardConfig carries the value to set into the datastore.
    See the documentation on the DashboardConfig struct for which fields are
    required.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigSetResponse(aristaproto.Message):
    """
    DashboardConfigSetResponse echoes the stored DashboardConfig along with
    the time at which the write was recognized.
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    Value carries all the values given in the DashboardConfigSetRequest as
    well as any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    creation. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==CreatedAt will include this
         instance.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigSetSomeRequest(aristaproto.Message):
    """
    DashboardConfigSetSomeRequest carries a list of DashboardConfig values
    to be written in one call.
    """

    values: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    value contains a list of DashboardConfig values to write.
    It is possible to provide more values than can fit within either:
      - the maximum send size of the client
      - the maximum receive size of the server
    If this error occurs you must reduce the number of values sent.
    See gRPC \"maximum message size\" documentation for more information.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigSetSomeResponse(aristaproto.Message):
    """
    DashboardConfigSetSomeResponse pairs a DashboardKey with an error
    string. Neither field is documented upstream.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    key is the DashboardKey this response refers to (undocumented
    upstream).
    """

    error: str = aristaproto.string_field(2)
    """
    error is a plain string (undocumented upstream); presumably it
    describes why the write for `key` failed.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteRequest(aristaproto.Message):
    """
    DashboardConfigDeleteRequest names the single DashboardConfig instance
    to remove.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    Key indicates which DashboardConfig instance to remove.
    This field must always be set.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteResponse(aristaproto.Message):
    """
    DashboardConfigDeleteResponse echoes the key of a deleted
    DashboardConfig along with the time the deletion was recognized.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """Key echoes back the key of the deleted DashboardConfig instance."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    deletion. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==DeletedAt will not include this
         instance.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteSomeRequest(aristaproto.Message):
    """
    DashboardConfigDeleteSomeRequest names several DashboardConfig
    instances to delete in one call.
    """

    keys: List["DashboardKey"] = aristaproto.message_field(1)
    """key contains a list of DashboardConfig keys to delete"""
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteSomeResponse(aristaproto.Message):
    """
    DashboardConfigDeleteSomeResponse is only sent when there is an error.
    It pairs a DashboardKey with an error string.
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    key is the DashboardKey this response refers to (undocumented
    upstream).
    """

    error: str = aristaproto.string_field(2)
    """
    error is a plain string (undocumented upstream); presumably it
    describes why the delete for `key` failed.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteAllRequest(aristaproto.Message):
    """
    DashboardConfigDeleteAllRequest parameterizes a DeleteAll with an
    optional partial-equality filter.
    """

    partial_eq_filter: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a DeleteAll.
    This requires all provided fields to be equal to the response.
    A filtered DeleteAll will use GetAll with filter to find things to
    delete.
    """
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteAllResponse(aristaproto.Message):
    """
    DashboardConfigDeleteAllResponse reports one failed deletion: the error
    class, the error message, the affected key and a timestamp.
    """

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    This describes the class of delete error.
    A DeleteAllResponse is only sent when there is an error.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """This indicates the error message from the delete failure."""

    key: "DashboardKey" = aristaproto.message_field(3)
    """
    This is the key of the DashboardConfig instance that failed to be
    deleted.
    """

    time: datetime = aristaproto.message_field(4)
    """Time indicates the (UTC) timestamp when the key was being deleted."""
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigRequest(aristaproto.Message):
    """
    GlobalDashboardConfigRequest asks for the GlobalDashboardConfig,
    optionally pinned to a point in time. It carries no key field.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes the
    request.
    """
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigResponse(aristaproto.Message):
    """
    GlobalDashboardConfigResponse returns the GlobalDashboardConfig together
    with the timestamp of its last modification.
    """

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    GlobalDashboardConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigStreamRequest(aristaproto.Message):
    """
    GlobalDashboardConfigStreamRequest parameterizes a GetAll/Subscribe over
    GlobalDashboardConfig with an optional partial-equality filter and time
    bounds.
    """

    partial_eq_filter: List["GlobalDashboardConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields are
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each GlobalDashboardConfig at end.
        * Each GlobalDashboardConfig response is fully-specified (all fields
          set).
      * start: Returns the state of each GlobalDashboardConfig at start,
        followed by updates until now.
        * Each GlobalDashboardConfig response at start is fully-specified,
          but updates may be partial.
      * start and end: Returns the state of each GlobalDashboardConfig at
        start, followed by updates until end.
        * Each GlobalDashboardConfig response at start is fully-specified,
          but updates until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigStreamResponse(aristaproto.Message):
    """
    GlobalDashboardConfigStreamResponse is one element of a
    GlobalDashboardConfig stream: a value, its last-modification time and
    the operation type.
    """

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time holds the timestamp of this GlobalDashboardConfig's last
    modification.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the GlobalDashboardConfig value in this response
    should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigSetRequest(aristaproto.Message):
    """
    GlobalDashboardConfigSetRequest carries the GlobalDashboardConfig to be
    written to the datastore.
    """

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    GlobalDashboardConfig carries the value to set into the datastore.
    See the documentation on the GlobalDashboardConfig struct for which
    fields are required.
    """
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigSetResponse(aristaproto.Message):
    """
    GlobalDashboardConfigSetResponse echoes the stored GlobalDashboardConfig
    along with the time at which the write was recognized.
    """

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    Value carries all the values given in the GlobalDashboardConfigSetRequest
    as well as any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    creation. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==CreatedAt will include this
         instance.
    """
[docs] class DashboardServiceStub(aristaproto.ServiceStub): """ """
async def get_one(
    self,
    dashboard_request: "DashboardRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "DashboardResponse":
    """
    Call the unary ``GetOne`` RPC of ``arista.dashboard.v1.DashboardService``.

    ``dashboard_request`` is sent as the request message; ``timeout``,
    ``deadline`` and ``metadata`` are forwarded unchanged to the underlying
    ``_unary_unary`` call. The awaited result of that call is returned.
    """
    call_options = {
        "timeout": timeout,
        "deadline": deadline,
        "metadata": metadata,
    }
    reply = await self._unary_unary(
        "/arista.dashboard.v1.DashboardService/GetOne",
        dashboard_request,
        DashboardResponse,
        **call_options,
    )
    return reply
[docs] async def get_some( self, dashboard_some_request: "DashboardSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[DashboardSomeResponse]": """ """ async for response in self._unary_stream( "/arista.dashboard.v1.DashboardService/GetSome", dashboard_some_request, DashboardSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all( self, dashboard_stream_request: "DashboardStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[DashboardStreamResponse]": """ """ async for response in self._unary_stream( "/arista.dashboard.v1.DashboardService/GetAll", dashboard_stream_request, DashboardStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe( self, dashboard_stream_request: "DashboardStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[DashboardStreamResponse]": """ """ async for response in self._unary_stream( "/arista.dashboard.v1.DashboardService/Subscribe", dashboard_stream_request, DashboardStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_meta( self, dashboard_stream_request: "DashboardStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MetaResponse": """ """ return await self._unary_unary( "/arista.dashboard.v1.DashboardService/GetMeta", dashboard_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def subscribe_meta( self, dashboard_stream_request: "DashboardStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[MetaResponse]": """ """ async for response in self._unary_stream( "/arista.dashboard.v1.DashboardService/SubscribeMeta", dashboard_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all_batched( self, dashboard_batched_stream_request: "DashboardBatchedStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[DashboardBatchedStreamResponse]": """ """ async for response in self._unary_stream( "/arista.dashboard.v1.DashboardService/GetAllBatched", dashboard_batched_stream_request, DashboardBatchedStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe_batched( self, dashboard_batched_stream_request: "DashboardBatchedStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[DashboardBatchedStreamResponse]": """ """ async for response in self._unary_stream( "/arista.dashboard.v1.DashboardService/SubscribeBatched", dashboard_batched_stream_request, DashboardBatchedStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
class DashboardConfigServiceStub(aristaproto.ServiceStub):
    """
    Async client stub for the
    ``arista.dashboard.v1.DashboardConfigService`` gRPC service.

    Every method passes its request message plus the keyword-only
    ``timeout``, ``deadline`` and ``metadata`` options straight through to
    the inherited ``_unary_unary`` / ``_unary_stream`` helpers.
    """

    async def get_one(
        self,
        dashboard_config_request: "DashboardConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "DashboardConfigResponse":
        """Call the unary ``GetOne`` RPC and return its response."""
        reply = await self._unary_unary(
            "/arista.dashboard.v1.DashboardConfigService/GetOne",
            dashboard_config_request,
            DashboardConfigResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        dashboard_config_some_request: "DashboardConfigSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigSomeResponse]":
        """Call the ``GetSome`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/GetSome",
            dashboard_config_some_request,
            DashboardConfigSomeResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def get_all(
        self,
        dashboard_config_stream_request: "DashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigStreamResponse]":
        """Call the ``GetAll`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/GetAll",
            dashboard_config_stream_request,
            DashboardConfigStreamResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def subscribe(
        self,
        dashboard_config_stream_request: "DashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigStreamResponse]":
        """Call the ``Subscribe`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/Subscribe",
            dashboard_config_stream_request,
            DashboardConfigStreamResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def get_meta(
        self,
        dashboard_config_stream_request: "DashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Call the unary ``GetMeta`` RPC and return its response."""
        reply = await self._unary_unary(
            "/arista.dashboard.v1.DashboardConfigService/GetMeta",
            dashboard_config_stream_request,
            MetaResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        return reply

    async def subscribe_meta(
        self,
        dashboard_config_stream_request: "DashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the ``SubscribeMeta`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/SubscribeMeta",
            dashboard_config_stream_request,
            MetaResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def set(
        self,
        dashboard_config_set_request: "DashboardConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "DashboardConfigSetResponse":
        """Call the unary ``Set`` RPC and return its response."""
        reply = await self._unary_unary(
            "/arista.dashboard.v1.DashboardConfigService/Set",
            dashboard_config_set_request,
            DashboardConfigSetResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        return reply

    async def set_some(
        self,
        dashboard_config_set_some_request: "DashboardConfigSetSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigSetSomeResponse]":
        """Call the ``SetSome`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/SetSome",
            dashboard_config_set_some_request,
            DashboardConfigSetSomeResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def delete(
        self,
        dashboard_config_delete_request: "DashboardConfigDeleteRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "DashboardConfigDeleteResponse":
        """Call the unary ``Delete`` RPC and return its response."""
        reply = await self._unary_unary(
            "/arista.dashboard.v1.DashboardConfigService/Delete",
            dashboard_config_delete_request,
            DashboardConfigDeleteResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        return reply

    async def delete_some(
        self,
        dashboard_config_delete_some_request: "DashboardConfigDeleteSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigDeleteSomeResponse]":
        """Call the ``DeleteSome`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/DeleteSome",
            dashboard_config_delete_some_request,
            DashboardConfigDeleteSomeResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def delete_all(
        self,
        dashboard_config_delete_all_request: "DashboardConfigDeleteAllRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigDeleteAllResponse]":
        """Call the ``DeleteAll`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/DeleteAll",
            dashboard_config_delete_all_request,
            DashboardConfigDeleteAllResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def get_all_batched(
        self,
        dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigBatchedStreamResponse]":
        """Call the ``GetAllBatched`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/GetAllBatched",
            dashboard_config_batched_stream_request,
            DashboardConfigBatchedStreamResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def subscribe_batched(
        self,
        dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardConfigBatchedStreamResponse]":
        """Call the ``SubscribeBatched`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.DashboardConfigService/SubscribeBatched",
            dashboard_config_batched_stream_request,
            DashboardConfigBatchedStreamResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply
class GlobalDashboardConfigServiceStub(aristaproto.ServiceStub):
    """
    Async client stub for the
    ``arista.dashboard.v1.GlobalDashboardConfigService`` gRPC service.

    Every method passes its request message plus the keyword-only
    ``timeout``, ``deadline`` and ``metadata`` options straight through to
    the inherited ``_unary_unary`` / ``_unary_stream`` helpers.
    """

    async def get_one(
        self,
        global_dashboard_config_request: "GlobalDashboardConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "GlobalDashboardConfigResponse":
        """Call the unary ``GetOne`` RPC and return its response."""
        reply = await self._unary_unary(
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetOne",
            global_dashboard_config_request,
            GlobalDashboardConfigResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        return reply

    async def get_all(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[GlobalDashboardConfigStreamResponse]":
        """Call the ``GetAll`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetAll",
            global_dashboard_config_stream_request,
            GlobalDashboardConfigStreamResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def subscribe(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[GlobalDashboardConfigStreamResponse]":
        """Call the ``Subscribe`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.GlobalDashboardConfigService/Subscribe",
            global_dashboard_config_stream_request,
            GlobalDashboardConfigStreamResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def subscribe_meta(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the ``SubscribeMeta`` RPC and yield each streamed response."""
        replies = self._unary_stream(
            "/arista.dashboard.v1.GlobalDashboardConfigService/SubscribeMeta",
            global_dashboard_config_stream_request,
            MetaResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def set(
        self,
        global_dashboard_config_set_request: "GlobalDashboardConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "GlobalDashboardConfigSetResponse":
        """Call the unary ``Set`` RPC and return its response."""
        reply = await self._unary_unary(
            "/arista.dashboard.v1.GlobalDashboardConfigService/Set",
            global_dashboard_config_set_request,
            GlobalDashboardConfigSetResponse,
            timeout=timeout, deadline=deadline, metadata=metadata,
        )
        return reply
from .... import fmp as ___fmp__ from ... import subscriptions as __subscriptions__ from ... import time as __time__
class DashboardServiceBase(ServiceBase):
    """
    Server-side base class for ``arista.dashboard.v1.DashboardService``.

    Subclass it and override the public handler methods. Each default
    handler raises ``grpclib.GRPCError`` with status ``UNIMPLEMENTED``.

    The server-streaming defaults carry an unreachable ``yield`` after the
    ``raise``. Without it an ``async def`` is a plain coroutine function
    rather than the annotated ``AsyncIterator``, so iterating the result
    with ``async for`` would never execute the ``raise``.
    """

    async def get_one(
        self, dashboard_request: "DashboardRequest"
    ) -> "DashboardResponse":
        """Serve ``GetOne``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, dashboard_some_request: "DashboardSomeRequest"
    ) -> AsyncIterator[DashboardSomeResponse]:
        """Serve ``GetSome``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardSomeResponse()  # unreachable; makes this an async generator

    async def get_all(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> AsyncIterator[DashboardStreamResponse]:
        """Serve ``GetAll``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardStreamResponse()  # unreachable; makes this an async generator

    async def subscribe(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> AsyncIterator[DashboardStreamResponse]:
        """Serve ``Subscribe``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardStreamResponse()  # unreachable; makes this an async generator

    async def get_meta(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> "MetaResponse":
        """Serve ``GetMeta``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Serve ``SubscribeMeta``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; makes this an async generator

    async def get_all_batched(
        self, dashboard_batched_stream_request: "DashboardBatchedStreamRequest"
    ) -> AsyncIterator[DashboardBatchedStreamResponse]:
        """Serve ``GetAllBatched``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardBatchedStreamResponse()  # unreachable; makes this an async generator

    async def subscribe_batched(
        self, dashboard_batched_stream_request: "DashboardBatchedStreamRequest"
    ) -> AsyncIterator[DashboardBatchedStreamResponse]:
        """Serve ``SubscribeBatched``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardBatchedStreamResponse()  # unreachable; makes this an async generator

    # Stream adapters registered in __mapping__. Each one receives the
    # request message from the stream, then either sends back the single
    # response of a unary handler or hands a streaming handler to the
    # inherited _call_rpc_handler_server_stream helper.

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[DashboardRequest, DashboardResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[DashboardSomeRequest, DashboardSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[DashboardStreamRequest, DashboardStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[DashboardStreamRequest, DashboardStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[DashboardStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[DashboardStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[DashboardBatchedStreamRequest, DashboardBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[DashboardBatchedStreamRequest, DashboardBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each RPC path to its grpclib handler."""
        return {
            "/arista.dashboard.v1.DashboardService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardRequest,
                DashboardResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardSomeRequest,
                DashboardSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardStreamRequest,
                DashboardStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardStreamRequest,
                DashboardStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardBatchedStreamRequest,
                DashboardBatchedStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardBatchedStreamRequest,
                DashboardBatchedStreamResponse,
            ),
        }
class DashboardConfigServiceBase(ServiceBase):
    """
    Server-side base class for
    ``arista.dashboard.v1.DashboardConfigService``.

    Subclass it and override the public handler methods. Each default
    handler raises ``grpclib.GRPCError`` with status ``UNIMPLEMENTED``.

    The server-streaming defaults carry an unreachable ``yield`` after the
    ``raise``. Without it an ``async def`` is a plain coroutine function
    rather than the annotated ``AsyncIterator``, so iterating the result
    with ``async for`` would never execute the ``raise``.
    """

    async def get_one(
        self, dashboard_config_request: "DashboardConfigRequest"
    ) -> "DashboardConfigResponse":
        """Serve ``GetOne``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, dashboard_config_some_request: "DashboardConfigSomeRequest"
    ) -> AsyncIterator[DashboardConfigSomeResponse]:
        """Serve ``GetSome``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigSomeResponse()  # unreachable; makes this an async generator

    async def get_all(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> AsyncIterator[DashboardConfigStreamResponse]:
        """Serve ``GetAll``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigStreamResponse()  # unreachable; makes this an async generator

    async def subscribe(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> AsyncIterator[DashboardConfigStreamResponse]:
        """Serve ``Subscribe``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigStreamResponse()  # unreachable; makes this an async generator

    async def get_meta(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> "MetaResponse":
        """Serve ``GetMeta``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Serve ``SubscribeMeta``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; makes this an async generator

    async def set(
        self, dashboard_config_set_request: "DashboardConfigSetRequest"
    ) -> "DashboardConfigSetResponse":
        """Serve ``Set``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set_some(
        self, dashboard_config_set_some_request: "DashboardConfigSetSomeRequest"
    ) -> AsyncIterator[DashboardConfigSetSomeResponse]:
        """Serve ``SetSome``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigSetSomeResponse()  # unreachable; makes this an async generator

    async def delete(
        self, dashboard_config_delete_request: "DashboardConfigDeleteRequest"
    ) -> "DashboardConfigDeleteResponse":
        """Serve ``Delete``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_some(
        self, dashboard_config_delete_some_request: "DashboardConfigDeleteSomeRequest"
    ) -> AsyncIterator[DashboardConfigDeleteSomeResponse]:
        """Serve ``DeleteSome``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigDeleteSomeResponse()  # unreachable; makes this an async generator

    async def delete_all(
        self, dashboard_config_delete_all_request: "DashboardConfigDeleteAllRequest"
    ) -> AsyncIterator[DashboardConfigDeleteAllResponse]:
        """Serve ``DeleteAll``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigDeleteAllResponse()  # unreachable; makes this an async generator

    async def get_all_batched(
        self,
        dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
    ) -> AsyncIterator[DashboardConfigBatchedStreamResponse]:
        """Serve ``GetAllBatched``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigBatchedStreamResponse()  # unreachable; makes this an async generator

    async def subscribe_batched(
        self,
        dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
    ) -> AsyncIterator[DashboardConfigBatchedStreamResponse]:
        """Serve ``SubscribeBatched``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield DashboardConfigBatchedStreamResponse()  # unreachable; makes this an async generator

    # Stream adapters registered in __mapping__. Each one receives the
    # request message from the stream, then either sends back the single
    # response of a unary handler or hands a streaming handler to the
    # inherited _call_rpc_handler_server_stream helper.

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[DashboardConfigRequest, DashboardConfigResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[DashboardConfigSomeRequest, DashboardConfigSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, DashboardConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, DashboardConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, MetaResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, MetaResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[DashboardConfigSetRequest, DashboardConfigSetResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    async def __rpc_set_some(
        self,
        stream: "grpclib.server.Stream[DashboardConfigSetSomeRequest, DashboardConfigSetSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.set_some,
            stream,
            request,
        )

    async def __rpc_delete(
        self,
        stream: "grpclib.server.Stream[DashboardConfigDeleteRequest, DashboardConfigDeleteResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.delete(request)
        await stream.send_message(response)

    async def __rpc_delete_some(
        self,
        stream: "grpclib.server.Stream[DashboardConfigDeleteSomeRequest, DashboardConfigDeleteSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_some,
            stream,
            request,
        )

    async def __rpc_delete_all(
        self,
        stream: "grpclib.server.Stream[DashboardConfigDeleteAllRequest, DashboardConfigDeleteAllResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_all,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[DashboardConfigBatchedStreamRequest, DashboardConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[DashboardConfigBatchedStreamRequest, DashboardConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each RPC path to its grpclib handler."""
        return {
            "/arista.dashboard.v1.DashboardConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigRequest,
                DashboardConfigResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigSomeRequest,
                DashboardConfigSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigStreamRequest,
                DashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigStreamRequest,
                DashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigSetRequest,
                DashboardConfigSetResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/SetSome": grpclib.const.Handler(
                self.__rpc_set_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigSetSomeRequest,
                DashboardConfigSetSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/Delete": grpclib.const.Handler(
                self.__rpc_delete,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigDeleteRequest,
                DashboardConfigDeleteResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/DeleteSome": grpclib.const.Handler(
                self.__rpc_delete_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigDeleteSomeRequest,
                DashboardConfigDeleteSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigDeleteAllRequest,
                DashboardConfigDeleteAllResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigBatchedStreamRequest,
                DashboardConfigBatchedStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigBatchedStreamRequest,
                DashboardConfigBatchedStreamResponse,
            ),
        }
class GlobalDashboardConfigServiceBase(ServiceBase):
    """
    Server-side base class for
    ``arista.dashboard.v1.GlobalDashboardConfigService``.

    Subclass it and override the public handler methods. Each default
    handler raises ``grpclib.GRPCError`` with status ``UNIMPLEMENTED``.

    The server-streaming defaults carry an unreachable ``yield`` after the
    ``raise``. Without it an ``async def`` is a plain coroutine function
    rather than the annotated ``AsyncIterator``, so iterating the result
    with ``async for`` would never execute the ``raise``.
    """

    async def get_one(
        self, global_dashboard_config_request: "GlobalDashboardConfigRequest"
    ) -> "GlobalDashboardConfigResponse":
        """Serve ``GetOne``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
    ) -> AsyncIterator[GlobalDashboardConfigStreamResponse]:
        """Serve ``GetAll``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield GlobalDashboardConfigStreamResponse()  # unreachable; makes this an async generator

    async def subscribe(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
    ) -> AsyncIterator[GlobalDashboardConfigStreamResponse]:
        """Serve ``Subscribe``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield GlobalDashboardConfigStreamResponse()  # unreachable; makes this an async generator

    async def subscribe_meta(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
    ) -> AsyncIterator[MetaResponse]:
        """Serve ``SubscribeMeta``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; makes this an async generator

    async def set(
        self, global_dashboard_config_set_request: "GlobalDashboardConfigSetRequest"
    ) -> "GlobalDashboardConfigSetResponse":
        """Serve ``Set``; raises UNIMPLEMENTED until overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    # Stream adapters registered in __mapping__. Each one receives the
    # request message from the stream, then either sends back the single
    # response of a unary handler or hands a streaming handler to the
    # inherited _call_rpc_handler_server_stream helper.

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigRequest, GlobalDashboardConfigResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigStreamRequest, GlobalDashboardConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigStreamRequest, GlobalDashboardConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_subscribe_meta(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigStreamRequest, MetaResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigSetRequest, GlobalDashboardConfigSetResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each RPC path to its grpclib handler."""
        return {
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                GlobalDashboardConfigRequest,
                GlobalDashboardConfigResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                GlobalDashboardConfigStreamRequest,
                GlobalDashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                GlobalDashboardConfigStreamRequest,
                GlobalDashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                GlobalDashboardConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                GlobalDashboardConfigSetRequest,
                GlobalDashboardConfigSetResponse,
            ),
        }