# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: arista/dashboard.v1/dashboard.proto, arista/dashboard.v1/services.gen.proto
# plugin: python-aristaproto
# This file has been @generated
# Names exported by this module when it is star-imported.
__all__ = (
# Data-model messages.
"Position",
"Dimensions",
"WidgetStyles",
"Widget",
"Widgets",
"DashboardKey",
"DashboardConfig",
"DashboardMetadata",
"Filter",
"Dashboard",
"GlobalDashboardConfig",
"MetaResponse",
# Request/response messages for Dashboard.
"DashboardRequest",
"DashboardResponse",
"DashboardSomeRequest",
"DashboardSomeResponse",
"DashboardStreamRequest",
"DashboardStreamResponse",
"DashboardBatchedStreamRequest",
"DashboardBatchedStreamResponse",
# Request/response messages for DashboardConfig.
"DashboardConfigRequest",
"DashboardConfigResponse",
"DashboardConfigSomeRequest",
"DashboardConfigSomeResponse",
"DashboardConfigStreamRequest",
"DashboardConfigStreamResponse",
"DashboardConfigBatchedStreamRequest",
"DashboardConfigBatchedStreamResponse",
"DashboardConfigSetRequest",
"DashboardConfigSetResponse",
"DashboardConfigSetSomeRequest",
"DashboardConfigSetSomeResponse",
"DashboardConfigDeleteRequest",
"DashboardConfigDeleteResponse",
"DashboardConfigDeleteSomeRequest",
"DashboardConfigDeleteSomeResponse",
"DashboardConfigDeleteAllRequest",
"DashboardConfigDeleteAllResponse",
# Request/response messages for GlobalDashboardConfig.
"GlobalDashboardConfigRequest",
"GlobalDashboardConfigResponse",
"GlobalDashboardConfigStreamRequest",
"GlobalDashboardConfigStreamResponse",
"GlobalDashboardConfigSetRequest",
"GlobalDashboardConfigSetResponse",
# Service stub and base classes, one pair per service.
"DashboardServiceStub",
"DashboardServiceBase",
"DashboardConfigServiceStub",
"DashboardConfigServiceBase",
"GlobalDashboardConfigServiceStub",
"GlobalDashboardConfigServiceBase",
)
from dataclasses import dataclass
from datetime import datetime
from typing import (
TYPE_CHECKING,
AsyncIterator,
Dict,
List,
Optional,
)
import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase
if TYPE_CHECKING:
import grpclib.server
from aristaproto.grpc.grpclib_client import MetadataLike
from grpclib.metadata import Deadline
[docs]
@dataclass(eq=False, repr=False)
class Position(aristaproto.Message):
    """A cell position in the UI."""

    x: Optional[int] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_UINT32
    )
    """Position along the horizontal axis."""

    y: Optional[int] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_UINT32
    )
    """Position along the vertical axis."""
[docs]
@dataclass(eq=False, repr=False)
class Dimensions(aristaproto.Message):
    """Size of a widget in the UI, measured in cells."""

    width: Optional[int] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_UINT32
    )
    """Widget width in the UI, expressed as a number of cells."""

    height: Optional[int] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_UINT32
    )
    """Widget height in the UI, expressed as a number of cells."""
[docs]
@dataclass(eq=False, repr=False)
class DashboardKey(aristaproto.Message):
    """Unique identifier of a dashboard."""

    dashboard_id: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """The id of the dashboard."""
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfig(aristaproto.Message):
    """All of the dashboard fields that a user can edit."""

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    Unique identifier of the dashboard; it must always be defined.
    When set, a dashboard is created or updated.
    """

    name: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """Dashboard name, shown at the top of the dashboard."""

    description: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """Optional details about what the dashboard displays."""

    widgets: "Widgets" = aristaproto.message_field(4)
    """The list of widgets in the dashboard."""
[docs]
@dataclass(eq=False, repr=False)
class Filter(aristaproto.Message):
    """Filters dashboards in cases where an exact match is not wanted."""

    tags: "___fmp__.RepeatedString" = aristaproto.message_field(1)
    """
    Values to look for in the dashboard description, matched word by word.
    A tag is generally written with a leading '#', which must be left out
    of the values given here.
    A dashboard is returned only if every provided tag matches inside it.
    E.g., to match "#devices", set the tag to "devices".
    """
[docs]
@dataclass(eq=False, repr=False)
class Dashboard(aristaproto.Message):
    """Dashboard state: holds all of the data of a dashboard."""

    key: "DashboardKey" = aristaproto.message_field(1)
    """Unique identifier of the dashboard; always defined."""

    created_at: datetime = aristaproto.message_field(2)
    """
    Date on which the dashboard was first created.
    May be unset on old dashboards.
    """

    created_by: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """
    Name of the user who first created the dashboard.
    May be unset on old dashboards.
    """

    last_modified_at: datetime = aristaproto.message_field(4)
    """
    Timestamp of the most recent update made to the dashboard by a user.
    May be unset on old dashboards.
    """

    last_modified_by: Optional[str] = aristaproto.message_field(5, wraps=aristaproto.TYPE_STRING)
    """
    Username of whoever last updated the dashboard.
    May be unset on old dashboards.
    """

    meta_data: "DashboardMetadata" = aristaproto.message_field(6)
    """Version metadata about the dashboard."""

    name: Optional[str] = aristaproto.message_field(
        7, wraps=aristaproto.TYPE_STRING
    )
    """Dashboard name, shown at the top of the dashboard."""

    description: Optional[str] = aristaproto.message_field(8, wraps=aristaproto.TYPE_STRING)
    """Optional details about what the dashboard displays."""

    widgets: "Widgets" = aristaproto.message_field(9)
    """The list of widgets in the dashboard."""
[docs]
@dataclass(eq=False, repr=False)
class GlobalDashboardConfig(aristaproto.Message):
    """Global configuration related to Dashboards."""

    default_dashboard: "DashboardKey" = aristaproto.message_field(1)
    """
    The dashboard shown to a user by default.
    To unset it, pass an empty key (`{dashboard_id: nil}`) in a `Set()` call.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardRequest(aristaproto.Message):
    """Request message accepted by DashboardServiceStub.get_one (GetOne)."""

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    Uniquely identifies the Dashboard instance to retrieve.
    Must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    The time for which the data is wanted.
    When omitted, the server uses the time at which it makes the request.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardResponse(aristaproto.Message):
    """Response message returned by DashboardServiceStub.get_one (GetOne)."""

    value: "Dashboard" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or set to
    default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp of the last modification of the Dashboard instance in
    this response.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardSomeRequest(aristaproto.Message):
    """Request message accepted by DashboardServiceStub.get_some (GetSome)."""

    keys: List["DashboardKey"] = aristaproto.message_field(1)
    """
    DashboardKey values sent with the request (undocumented upstream;
    presumably one per Dashboard instance to retrieve).
    """

    time: datetime = aristaproto.message_field(2)
    """
    The time for which the data is wanted.
    When omitted, the server uses the time at which it makes the request.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardSomeResponse(aristaproto.Message):
    """Response message streamed by DashboardServiceStub.get_some (GetSome)."""

    value: "Dashboard" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or set to
    default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Optional field, to be filled when an error occurs in the GetSome
    process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Timestamp attached to this response (its meaning is undocumented
    upstream).
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardStreamRequest(aristaproto.Message):
    """
    Request message accepted by DashboardServiceStub.get_all (GetAll) and
    DashboardServiceStub.subscribe (Subscribe).
    """

    partial_eq_filter: List["Dashboard"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.
    Although transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are specific enough.
    """

    filter: List["Filter"] = aristaproto.message_field(2)
    """
    Within each entry of the list, all populated fields are ANDed together
    as a filtering operation. The entries themselves are ORed, so a
    Dashboard matched by any individual filter is streamed to the user.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Limits response data to a specified time window. When populated, at
    least one of the two time fields is required.
    For GetAll, start and end are used as follows:
      * end: returns the state of each Dashboard at end.
        * Each Dashboard response is fully specified (all fields set).
      * start: returns the state of each Dashboard at start, followed by
        updates until now.
        * Each Dashboard response at start is fully specified, but updates
          may be partial.
      * start and end: returns the state of each Dashboard at start,
        followed by updates until end.
        * Each Dashboard response at start is fully specified, but updates
          until end may be partial.
    This field is not allowed in the Subscribe RPC.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardStreamResponse(aristaproto.Message):
    """
    Response message streamed by DashboardServiceStub.get_all (GetAll) and
    DashboardServiceStub.subscribe (Subscribe).
    """

    value: "Dashboard" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request.
    Its key field is always populated. Which other fields are populated,
    and why, depends on the value of Operation and on what triggered this
    notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this Dashboard's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation telling how the Dashboard value in this response should be
    considered. For non-subscribe requests it should always be INITIAL.
    In a subscription, INITIAL should not appear again once all initial
    data has been streamed and the client starts receiving modification
    updates.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardBatchedStreamRequest(aristaproto.Message):
    """
    Request message accepted by DashboardServiceStub.get_all_batched
    (GetAllBatched) and DashboardServiceStub.subscribe_batched
    (SubscribeBatched).
    """

    partial_eq_filter: List["Dashboard"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.
    Although transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are specific enough.
    """

    filter: List["Filter"] = aristaproto.message_field(2)
    """
    Within each entry of the list, all populated fields are ANDed together
    as a filtering operation. The entries themselves are ORed, so a
    Dashboard matched by any individual filter is streamed to the user.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Limits response data to a specified time window. When populated, at
    least one of the two time fields is required.
    For GetAll, start and end are used as follows:
      * end: returns the state of each Dashboard at end.
        * Each Dashboard response is fully specified (all fields set).
      * start: returns the state of each Dashboard at start, followed by
        updates until now.
        * Each Dashboard response at start is fully specified, but updates
          may be partial.
      * start and end: returns the state of each Dashboard at start,
        followed by updates until end.
        * Each Dashboard response at start is fully specified, but updates
          until end may be partial.
    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(4, wraps=aristaproto.TYPE_UINT32)
    """
    Caps the number of messages that one batch may contain; it is required
    to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardBatchedStreamResponse(aristaproto.Message):
    """
    Response message streamed by DashboardServiceStub.get_all_batched
    (GetAllBatched) and DashboardServiceStub.subscribe_batched
    (SubscribeBatched).
    """

    responses: List["DashboardStreamResponse"] = aristaproto.message_field(1)
    """
    The values deemed relevant to the initiating request.
    The length is guaranteed to lie between 1 and
    min(req.max_messages, INTERNAL_BATCH_LIMIT), inclusive.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigRequest(aristaproto.Message):
    """
    Request message accepted by DashboardConfigServiceStub.get_one (GetOne).
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    Uniquely identifies the DashboardConfig instance to retrieve.
    Must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    The time for which the data is wanted.
    When omitted, the server uses the time at which it makes the request.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigResponse(aristaproto.Message):
    """
    Response message returned by DashboardConfigServiceStub.get_one (GetOne).
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or set to
    default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp of the last modification of the DashboardConfig
    instance in this response.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigSomeRequest(aristaproto.Message):
    """
    Request message accepted by DashboardConfigServiceStub.get_some
    (GetSome).
    """

    keys: List["DashboardKey"] = aristaproto.message_field(1)
    """
    DashboardKey values sent with the request (undocumented upstream;
    presumably one per DashboardConfig instance to retrieve).
    """

    time: datetime = aristaproto.message_field(2)
    """
    The time for which the data is wanted.
    When omitted, the server uses the time at which it makes the request.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigSomeResponse(aristaproto.Message):
    """
    Response message streamed by DashboardConfigServiceStub.get_some
    (GetSome).
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or set to
    default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Optional field, to be filled when an error occurs in the GetSome
    process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Timestamp attached to this response (its meaning is undocumented
    upstream).
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigStreamRequest(aristaproto.Message):
    """
    Request message accepted by DashboardConfigServiceStub.get_all (GetAll)
    and DashboardConfigServiceStub.subscribe (Subscribe).
    """

    partial_eq_filter: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.
    Although transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are specific enough.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Limits response data to a specified time window. When populated, at
    least one of the two time fields is required.
    For GetAll, start and end are used as follows:
      * end: returns the state of each DashboardConfig at end.
        * Each DashboardConfig response is fully specified (all fields
          set).
      * start: returns the state of each DashboardConfig at start,
        followed by updates until now.
        * Each DashboardConfig response at start is fully specified, but
          updates may be partial.
      * start and end: returns the state of each DashboardConfig at start,
        followed by updates until end.
        * Each DashboardConfig response at start is fully specified, but
          updates until end may be partial.
    This field is not allowed in the Subscribe RPC.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigStreamResponse(aristaproto.Message):
    """
    Response message streamed by DashboardConfigServiceStub.get_all (GetAll)
    and DashboardConfigServiceStub.subscribe (Subscribe).
    """

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request.
    Its key field is always populated. Which other fields are populated,
    and why, depends on the value of Operation and on what triggered this
    notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this DashboardConfig's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation telling how the DashboardConfig value in this response should
    be considered. For non-subscribe requests it should always be INITIAL.
    In a subscription, INITIAL should not appear again once all initial
    data has been streamed and the client starts receiving modification
    updates.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigBatchedStreamRequest(aristaproto.Message):
    """Stream request for DashboardConfig values delivered in batches."""

    partial_eq_filter: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.
    Although transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are specific enough.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Limits response data to a specified time window. When populated, at
    least one of the two time fields is required.
    For GetAll, start and end are used as follows:
      * end: returns the state of each DashboardConfig at end.
        * Each DashboardConfig response is fully specified (all fields
          set).
      * start: returns the state of each DashboardConfig at start,
        followed by updates until now.
        * Each DashboardConfig response at start is fully specified, but
          updates may be partial.
      * start and end: returns the state of each DashboardConfig at start,
        followed by updates until end.
        * Each DashboardConfig response at start is fully specified, but
          updates until end may be partial.
    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(4, wraps=aristaproto.TYPE_UINT32)
    """
    Caps the number of messages that one batch may contain; it is required
    to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigBatchedStreamResponse(aristaproto.Message):
    """A batch of DashboardConfigStreamResponse messages."""

    responses: List["DashboardConfigStreamResponse"] = aristaproto.message_field(1)
    """
    The values deemed relevant to the initiating request.
    The length is guaranteed to lie between 1 and
    min(req.max_messages, INTERNAL_BATCH_LIMIT), inclusive.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigSetRequest(aristaproto.Message):
    """Request message accepted by DashboardConfigServiceStub.set (Set)."""

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    The DashboardConfig value to set into the datastore.
    The documentation of the DashboardConfig struct states which fields are
    required.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigSetResponse(aristaproto.Message):
    """Response message returned by DashboardConfigServiceStub.set (Set)."""

    value: "DashboardConfig" = aristaproto.message_field(1)
    """
    All the values given in the DashboardConfigSetRequest, together with
    any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp at which the system recognizes the creation. Only two
    guarantees are made about this timestamp:
    - it is after the time the request was received
    - a time-ranged query with StartTime==CreatedAt will include this
      instance.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigSetSomeRequest(aristaproto.Message):
    """
    Request message accepted by DashboardConfigServiceStub.set_some
    (SetSome).
    """

    values: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    The list of DashboardConfig values to write.
    It is possible to provide more values than can fit within either:
    - the maximum send size of the client
    - the maximum receive size of the server
    If this error occurs, the number of values sent must be reduced.
    See the gRPC "maximum message size" documentation for more information.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigSetSomeResponse(aristaproto.Message):
    """
    Response message streamed by DashboardConfigServiceStub.set_some
    (SetSome).
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    DashboardKey this response refers to (undocumented upstream).
    """

    error: str = aristaproto.string_field(2)
    """
    Error text for the key above (undocumented upstream; presumably the
    reason the write failed).
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteRequest(aristaproto.Message):
    """
    Request message accepted by DashboardConfigServiceStub.delete (Delete).
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    Indicates which DashboardConfig instance to remove.
    Must always be set.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteResponse(aristaproto.Message):
    """
    Response message returned by DashboardConfigServiceStub.delete (Delete).
    """

    key: "DashboardKey" = aristaproto.message_field(1)
    """Echo of the key of the deleted DashboardConfig instance."""

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp at which the system recognizes the deletion. Only two
    guarantees are made about this timestamp:
    - it is after the time the request was received
    - a time-ranged query with StartTime==DeletedAt will not include this
      instance.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteSomeRequest(aristaproto.Message):
    """
    Request message accepted by DashboardConfigServiceStub.delete_some.
    """

    keys: List["DashboardKey"] = aristaproto.message_field(1)
    """The list of DashboardConfig keys to delete."""
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteSomeResponse(aristaproto.Message):
    """Sent only when there is an error."""

    key: "DashboardKey" = aristaproto.message_field(1)
    """
    DashboardKey this response refers to (undocumented upstream).
    """

    error: str = aristaproto.string_field(2)
    """
    Error text for the key above (undocumented upstream; presumably the
    reason the delete failed).
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteAllRequest(aristaproto.Message):
    """Request for a DeleteAll of DashboardConfig, optionally filtered."""

    partial_eq_filter: List["DashboardConfig"] = aristaproto.message_field(1)
    """
    Server-side filter for a DeleteAll: every provided field must equal
    the corresponding field of the response.
    A filtered DeleteAll uses GetAll with the filter to find the things to
    delete.
    """
[docs]
@dataclass(eq=False, repr=False)
class DashboardConfigDeleteAllResponse(aristaproto.Message):
    """DeleteAll response; it is only sent when there is an error."""

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    The class of delete error.
    A DeleteAllResponse is only sent when there is an error.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """The error message from the delete failure."""

    key: "DashboardKey" = aristaproto.message_field(3)
    """Key of the DashboardConfig instance that failed to be deleted."""

    time: datetime = aristaproto.message_field(4)
    """(UTC) timestamp of when the key was being deleted."""
[docs]
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigRequest(aristaproto.Message):
    """Request for the GlobalDashboardConfig value."""

    time: datetime = aristaproto.message_field(2)
    """
    The time for which the data is wanted.
    When omitted, the server uses the time at which it makes the request.
    """
[docs]
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigResponse(aristaproto.Message):
    """Response carrying a GlobalDashboardConfig value."""

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or set to
    default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp of the last modification of the GlobalDashboardConfig
    instance in this response.
    """
[docs]
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigStreamRequest(aristaproto.Message):
    """Stream request for GlobalDashboardConfig values."""

    partial_eq_filter: List["GlobalDashboardConfig"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.
    Although transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are specific enough.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Limits response data to a specified time window. When populated, at
    least one of the two time fields is required.
    For GetAll, start and end are used as follows:
      * end: returns the state of each GlobalDashboardConfig at end.
        * Each GlobalDashboardConfig response is fully specified (all
          fields set).
      * start: returns the state of each GlobalDashboardConfig at start,
        followed by updates until now.
        * Each GlobalDashboardConfig response at start is fully specified,
          but updates may be partial.
      * start and end: returns the state of each GlobalDashboardConfig at
        start, followed by updates until end.
        * Each GlobalDashboardConfig response at start is fully specified,
          but updates until end may be partial.
    This field is not allowed in the Subscribe RPC.
    """
[docs]
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigStreamResponse(aristaproto.Message):
    """Stream response carrying a GlobalDashboardConfig value."""

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request.
    The generated documentation states that its key field is always
    populated (note that GlobalDashboardConfig, as defined in this module,
    has no field named key). Which other fields are populated, and why,
    depends on the value of Operation and on what triggered this
    notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this GlobalDashboardConfig's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation telling how the GlobalDashboardConfig value in this response
    should be considered. For non-subscribe requests it should always be
    INITIAL. In a subscription, INITIAL should not appear again once all
    initial data has been streamed and the client starts receiving
    modification updates.
    """
[docs]
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigSetRequest(aristaproto.Message):
    """Request to set the GlobalDashboardConfig value."""

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    The GlobalDashboardConfig value to set into the datastore.
    The documentation of the GlobalDashboardConfig struct states which
    fields are required.
    """
[docs]
@dataclass(eq=False, repr=False)
class GlobalDashboardConfigSetResponse(aristaproto.Message):
    """Response to a GlobalDashboardConfigSetRequest."""

    value: "GlobalDashboardConfig" = aristaproto.message_field(1)
    """
    All the values given in the GlobalDashboardConfigSetRequest, together
    with any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp at which the system recognizes the creation. Only two
    guarantees are made about this timestamp:
    - it is after the time the request was received
    - a time-ranged query with StartTime==CreatedAt will include this
      instance.
    """
[docs]
class DashboardServiceStub(aristaproto.ServiceStub):
    """Client stub for the arista.dashboard.v1.DashboardService RPCs."""

    async def get_one(
        self,
        dashboard_request: "DashboardRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "DashboardResponse":
        """
        Call the GetOne RPC (single request, single response).

        timeout, deadline and metadata are handed unchanged to the
        underlying unary-unary call.
        """
        route = "/arista.dashboard.v1.DashboardService/GetOne"
        reply = await self._unary_unary(
            route,
            dashboard_request,
            DashboardResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        dashboard_some_request: "DashboardSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardSomeResponse]":
        """
        Call the GetSome RPC and yield each streamed DashboardSomeResponse.

        timeout, deadline and metadata are handed unchanged to the
        underlying unary-stream call.
        """
        route = "/arista.dashboard.v1.DashboardService/GetSome"
        stream = self._unary_stream(
            route,
            dashboard_some_request,
            DashboardSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in stream:
            yield item

    async def get_all(
        self,
        dashboard_stream_request: "DashboardStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardStreamResponse]":
        """
        Call the GetAll RPC and yield each streamed DashboardStreamResponse.

        timeout, deadline and metadata are handed unchanged to the
        underlying unary-stream call.
        """
        route = "/arista.dashboard.v1.DashboardService/GetAll"
        stream = self._unary_stream(
            route,
            dashboard_stream_request,
            DashboardStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in stream:
            yield item

    async def subscribe(
        self,
        dashboard_stream_request: "DashboardStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardStreamResponse]":
        """
        Call the Subscribe RPC and yield each streamed
        DashboardStreamResponse.

        timeout, deadline and metadata are handed unchanged to the
        underlying unary-stream call.
        """
        route = "/arista.dashboard.v1.DashboardService/Subscribe"
        stream = self._unary_stream(
            route,
            dashboard_stream_request,
            DashboardStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in stream:
            yield item

    async def get_all_batched(
        self,
        dashboard_batched_stream_request: "DashboardBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardBatchedStreamResponse]":
        """
        Call the GetAllBatched RPC and yield each streamed
        DashboardBatchedStreamResponse.

        timeout, deadline and metadata are handed unchanged to the
        underlying unary-stream call.
        """
        route = "/arista.dashboard.v1.DashboardService/GetAllBatched"
        stream = self._unary_stream(
            route,
            dashboard_batched_stream_request,
            DashboardBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in stream:
            yield item

    async def subscribe_batched(
        self,
        dashboard_batched_stream_request: "DashboardBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[DashboardBatchedStreamResponse]":
        """
        Call the SubscribeBatched RPC and yield each streamed
        DashboardBatchedStreamResponse.

        timeout, deadline and metadata are handed unchanged to the
        underlying unary-stream call.
        """
        route = "/arista.dashboard.v1.DashboardService/SubscribeBatched"
        stream = self._unary_stream(
            route,
            dashboard_batched_stream_request,
            DashboardBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in stream:
            yield item
[docs]
class DashboardConfigServiceStub(aristaproto.ServiceStub):
""" """
[docs]
async def get_one(
    self,
    dashboard_config_request: "DashboardConfigRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "DashboardConfigResponse":
    """
    Call the GetOne RPC (single request, single response).

    timeout, deadline and metadata are handed unchanged to the
    underlying unary-unary call.
    """
    route = "/arista.dashboard.v1.DashboardConfigService/GetOne"
    reply = await self._unary_unary(
        route,
        dashboard_config_request,
        DashboardConfigResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    return reply
[docs]
async def get_some(
    self,
    dashboard_config_some_request: "DashboardConfigSomeRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigSomeResponse]":
    """
    Call the GetSome RPC and yield each streamed
    DashboardConfigSomeResponse.

    timeout, deadline and metadata are handed unchanged to the
    underlying unary-stream call.
    """
    route = "/arista.dashboard.v1.DashboardConfigService/GetSome"
    stream = self._unary_stream(
        route,
        dashboard_config_some_request,
        DashboardConfigSomeResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    async for item in stream:
        yield item
[docs]
async def get_all(
    self,
    dashboard_config_stream_request: "DashboardConfigStreamRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigStreamResponse]":
    """
    Call the GetAll RPC and yield each streamed
    DashboardConfigStreamResponse.

    timeout, deadline and metadata are handed unchanged to the
    underlying unary-stream call.
    """
    route = "/arista.dashboard.v1.DashboardConfigService/GetAll"
    stream = self._unary_stream(
        route,
        dashboard_config_stream_request,
        DashboardConfigStreamResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    async for item in stream:
        yield item
[docs]
async def subscribe(
    self,
    dashboard_config_stream_request: "DashboardConfigStreamRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigStreamResponse]":
    """
    Call the Subscribe RPC and yield each streamed
    DashboardConfigStreamResponse.

    timeout, deadline and metadata are handed unchanged to the
    underlying unary-stream call.
    """
    route = "/arista.dashboard.v1.DashboardConfigService/Subscribe"
    stream = self._unary_stream(
        route,
        dashboard_config_stream_request,
        DashboardConfigStreamResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    async for item in stream:
        yield item
[docs]
async def set(
self,
dashboard_config_set_request: "DashboardConfigSetRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "DashboardConfigSetResponse":
""" """
return await self._unary_unary(
"/arista.dashboard.v1.DashboardConfigService/Set",
dashboard_config_set_request,
DashboardConfigSetResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def set_some(
self,
dashboard_config_set_some_request: "DashboardConfigSetSomeRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigSetSomeResponse]":
""" """
async for response in self._unary_stream(
"/arista.dashboard.v1.DashboardConfigService/SetSome",
dashboard_config_set_some_request,
DashboardConfigSetSomeResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def delete(
self,
dashboard_config_delete_request: "DashboardConfigDeleteRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "DashboardConfigDeleteResponse":
""" """
return await self._unary_unary(
"/arista.dashboard.v1.DashboardConfigService/Delete",
dashboard_config_delete_request,
DashboardConfigDeleteResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def delete_some(
self,
dashboard_config_delete_some_request: "DashboardConfigDeleteSomeRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigDeleteSomeResponse]":
""" """
async for response in self._unary_stream(
"/arista.dashboard.v1.DashboardConfigService/DeleteSome",
dashboard_config_delete_some_request,
DashboardConfigDeleteSomeResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def delete_all(
self,
dashboard_config_delete_all_request: "DashboardConfigDeleteAllRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigDeleteAllResponse]":
""" """
async for response in self._unary_stream(
"/arista.dashboard.v1.DashboardConfigService/DeleteAll",
dashboard_config_delete_all_request,
DashboardConfigDeleteAllResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def get_all_batched(
self,
dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigBatchedStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.dashboard.v1.DashboardConfigService/GetAllBatched",
dashboard_config_batched_stream_request,
DashboardConfigBatchedStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def subscribe_batched(
self,
dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[DashboardConfigBatchedStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.dashboard.v1.DashboardConfigService/SubscribeBatched",
dashboard_config_batched_stream_request,
DashboardConfigBatchedStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
class GlobalDashboardConfigServiceStub(aristaproto.ServiceStub):
    """Client stub for the ``arista.dashboard.v1.GlobalDashboardConfigService`` gRPC service.

    Every method takes the request message as its only positional argument
    and forwards the keyword-only ``timeout``, ``deadline`` and ``metadata``
    options unchanged to the underlying call. Unary RPCs are awaited and
    return a single response; server-streaming RPCs are async generators that
    yield each response message produced by the call.
    """

    async def get_one(
        self,
        global_dashboard_config_request: "GlobalDashboardConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "GlobalDashboardConfigResponse":
        """Call the unary ``GetOne`` RPC and return its response."""
        return await self._unary_unary(
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetOne",
            global_dashboard_config_request,
            GlobalDashboardConfigResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )

    async def get_all(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[GlobalDashboardConfigStreamResponse]":
        """Call the server-streaming ``GetAll`` RPC and yield each response."""
        async for response in self._unary_stream(
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetAll",
            global_dashboard_config_stream_request,
            GlobalDashboardConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        ):
            yield response

    async def subscribe(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[GlobalDashboardConfigStreamResponse]":
        """Call the server-streaming ``Subscribe`` RPC and yield each response."""
        async for response in self._unary_stream(
            "/arista.dashboard.v1.GlobalDashboardConfigService/Subscribe",
            global_dashboard_config_stream_request,
            GlobalDashboardConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        ):
            yield response

    # SubscribeMeta is routed by the server-side mapping for this service
    # (stream request in, MetaResponse out), so the client stub exposes it
    # with the same shape as its sibling streaming methods.
    async def subscribe_meta(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the server-streaming ``SubscribeMeta`` RPC and yield each response."""
        async for response in self._unary_stream(
            "/arista.dashboard.v1.GlobalDashboardConfigService/SubscribeMeta",
            global_dashboard_config_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        ):
            yield response

    async def set(
        self,
        global_dashboard_config_set_request: "GlobalDashboardConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "GlobalDashboardConfigSetResponse":
        """Call the unary ``Set`` RPC and return its response."""
        return await self._unary_unary(
            "/arista.dashboard.v1.GlobalDashboardConfigService/Set",
            global_dashboard_config_set_request,
            GlobalDashboardConfigSetResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
from .... import fmp as ___fmp__
from ... import subscriptions as __subscriptions__
from ... import time as __time__
class DashboardServiceBase(ServiceBase):
    """Server-side base class for ``arista.dashboard.v1.DashboardService``.

    Subclasses override the public ``async`` methods below to implement the
    service. Each default implementation raises ``grpclib.GRPCError`` with
    status ``UNIMPLEMENTED``. The private ``__rpc_*`` adapters read the
    request from the grpclib stream and dispatch to the matching public
    method; ``__mapping__`` registers those adapters under their RPC paths.
    """

    async def get_one(
        self, dashboard_request: "DashboardRequest"
    ) -> "DashboardResponse":
        """Handle ``GetOne``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, dashboard_some_request: "DashboardSomeRequest"
    ) -> AsyncIterator[DashboardSomeResponse]:
        """Handle ``GetSome``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> AsyncIterator[DashboardStreamResponse]:
        """Handle ``GetAll``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> AsyncIterator[DashboardStreamResponse]:
        """Handle ``Subscribe``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    # GetMeta / SubscribeMeta are registered in __mapping__ and dispatched by
    # __rpc_get_meta / __rpc_subscribe_meta, so they need overridable defaults
    # like every other RPC; without them a request would fail with
    # AttributeError instead of the UNIMPLEMENTED error.
    async def get_meta(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> "MetaResponse":
        """Handle ``GetMeta``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, dashboard_stream_request: "DashboardStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Handle ``SubscribeMeta``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all_batched(
        self, dashboard_batched_stream_request: "DashboardBatchedStreamRequest"
    ) -> AsyncIterator[DashboardBatchedStreamResponse]:
        """Handle ``GetAllBatched``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_batched(
        self, dashboard_batched_stream_request: "DashboardBatchedStreamRequest"
    ) -> AsyncIterator[DashboardBatchedStreamResponse]:
        """Handle ``SubscribeBatched``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[DashboardRequest, DashboardResponse]"
    ) -> None:
        """Receive one request, pass it to ``get_one`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[DashboardSomeRequest, DashboardSomeResponse]",
    ) -> None:
        """Receive one request and run ``get_some`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[DashboardStreamRequest, DashboardStreamResponse]",
    ) -> None:
        """Receive one request and run ``get_all`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[DashboardStreamRequest, DashboardStreamResponse]",
    ) -> None:
        """Receive one request and run ``subscribe`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[DashboardStreamRequest, MetaResponse]"
    ) -> None:
        """Receive one request, pass it to ``get_meta`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[DashboardStreamRequest, MetaResponse]"
    ) -> None:
        """Receive one request and run ``subscribe_meta`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[DashboardBatchedStreamRequest, DashboardBatchedStreamResponse]",
    ) -> None:
        """Receive one request and run ``get_all_batched`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[DashboardBatchedStreamRequest, DashboardBatchedStreamResponse]",
    ) -> None:
        """Receive one request and run ``subscribe_batched`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the RPC path -> ``grpclib.const.Handler`` routing table.

        Each handler bundles the bound ``__rpc_*`` adapter, the call
        cardinality, and the request and reply message classes.
        """
        return {
            "/arista.dashboard.v1.DashboardService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardRequest,
                DashboardResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardSomeRequest,
                DashboardSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardStreamRequest,
                DashboardStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardStreamRequest,
                DashboardStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardBatchedStreamRequest,
                DashboardBatchedStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardBatchedStreamRequest,
                DashboardBatchedStreamResponse,
            ),
        }
class DashboardConfigServiceBase(ServiceBase):
    """Server-side base class for ``arista.dashboard.v1.DashboardConfigService``.

    Subclasses override the public ``async`` methods below to implement the
    service. Each default implementation raises ``grpclib.GRPCError`` with
    status ``UNIMPLEMENTED``. The private ``__rpc_*`` adapters read the
    request from the grpclib stream and dispatch to the matching public
    method; ``__mapping__`` registers those adapters under their RPC paths.
    """

    async def get_one(
        self, dashboard_config_request: "DashboardConfigRequest"
    ) -> "DashboardConfigResponse":
        """Handle ``GetOne``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, dashboard_config_some_request: "DashboardConfigSomeRequest"
    ) -> AsyncIterator[DashboardConfigSomeResponse]:
        """Handle ``GetSome``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> AsyncIterator[DashboardConfigStreamResponse]:
        """Handle ``GetAll``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> AsyncIterator[DashboardConfigStreamResponse]:
        """Handle ``Subscribe``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    # GetMeta / SubscribeMeta are registered in __mapping__ and dispatched by
    # __rpc_get_meta / __rpc_subscribe_meta, so they need overridable defaults
    # like every other RPC; without them a request would fail with
    # AttributeError instead of the UNIMPLEMENTED error.
    async def get_meta(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> "MetaResponse":
        """Handle ``GetMeta``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, dashboard_config_stream_request: "DashboardConfigStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Handle ``SubscribeMeta``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set(
        self, dashboard_config_set_request: "DashboardConfigSetRequest"
    ) -> "DashboardConfigSetResponse":
        """Handle ``Set``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set_some(
        self, dashboard_config_set_some_request: "DashboardConfigSetSomeRequest"
    ) -> AsyncIterator[DashboardConfigSetSomeResponse]:
        """Handle ``SetSome``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete(
        self, dashboard_config_delete_request: "DashboardConfigDeleteRequest"
    ) -> "DashboardConfigDeleteResponse":
        """Handle ``Delete``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_some(
        self, dashboard_config_delete_some_request: "DashboardConfigDeleteSomeRequest"
    ) -> AsyncIterator[DashboardConfigDeleteSomeResponse]:
        """Handle ``DeleteSome``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_all(
        self, dashboard_config_delete_all_request: "DashboardConfigDeleteAllRequest"
    ) -> AsyncIterator[DashboardConfigDeleteAllResponse]:
        """Handle ``DeleteAll``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all_batched(
        self,
        dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
    ) -> AsyncIterator[DashboardConfigBatchedStreamResponse]:
        """Handle ``GetAllBatched``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_batched(
        self,
        dashboard_config_batched_stream_request: "DashboardConfigBatchedStreamRequest",
    ) -> AsyncIterator[DashboardConfigBatchedStreamResponse]:
        """Handle ``SubscribeBatched``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[DashboardConfigRequest, DashboardConfigResponse]",
    ) -> None:
        """Receive one request, pass it to ``get_one`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[DashboardConfigSomeRequest, DashboardConfigSomeResponse]",
    ) -> None:
        """Receive one request and run ``get_some`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, DashboardConfigStreamResponse]",
    ) -> None:
        """Receive one request and run ``get_all`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, DashboardConfigStreamResponse]",
    ) -> None:
        """Receive one request and run ``subscribe`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, MetaResponse]",
    ) -> None:
        """Receive one request, pass it to ``get_meta`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self,
        stream: "grpclib.server.Stream[DashboardConfigStreamRequest, MetaResponse]",
    ) -> None:
        """Receive one request and run ``subscribe_meta`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[DashboardConfigSetRequest, DashboardConfigSetResponse]",
    ) -> None:
        """Receive one request, pass it to ``set`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    async def __rpc_set_some(
        self,
        stream: "grpclib.server.Stream[DashboardConfigSetSomeRequest, DashboardConfigSetSomeResponse]",
    ) -> None:
        """Receive one request and run ``set_some`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.set_some,
            stream,
            request,
        )

    async def __rpc_delete(
        self,
        stream: "grpclib.server.Stream[DashboardConfigDeleteRequest, DashboardConfigDeleteResponse]",
    ) -> None:
        """Receive one request, pass it to ``delete`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.delete(request)
        await stream.send_message(response)

    async def __rpc_delete_some(
        self,
        stream: "grpclib.server.Stream[DashboardConfigDeleteSomeRequest, DashboardConfigDeleteSomeResponse]",
    ) -> None:
        """Receive one request and run ``delete_some`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_some,
            stream,
            request,
        )

    async def __rpc_delete_all(
        self,
        stream: "grpclib.server.Stream[DashboardConfigDeleteAllRequest, DashboardConfigDeleteAllResponse]",
    ) -> None:
        """Receive one request and run ``delete_all`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_all,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[DashboardConfigBatchedStreamRequest, DashboardConfigBatchedStreamResponse]",
    ) -> None:
        """Receive one request and run ``get_all_batched`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[DashboardConfigBatchedStreamRequest, DashboardConfigBatchedStreamResponse]",
    ) -> None:
        """Receive one request and run ``subscribe_batched`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the RPC path -> ``grpclib.const.Handler`` routing table.

        Each handler bundles the bound ``__rpc_*`` adapter, the call
        cardinality, and the request and reply message classes.
        """
        return {
            "/arista.dashboard.v1.DashboardConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigRequest,
                DashboardConfigResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigSomeRequest,
                DashboardConfigSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigStreamRequest,
                DashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigStreamRequest,
                DashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigSetRequest,
                DashboardConfigSetResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/SetSome": grpclib.const.Handler(
                self.__rpc_set_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigSetSomeRequest,
                DashboardConfigSetSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/Delete": grpclib.const.Handler(
                self.__rpc_delete,
                grpclib.const.Cardinality.UNARY_UNARY,
                DashboardConfigDeleteRequest,
                DashboardConfigDeleteResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/DeleteSome": grpclib.const.Handler(
                self.__rpc_delete_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigDeleteSomeRequest,
                DashboardConfigDeleteSomeResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigDeleteAllRequest,
                DashboardConfigDeleteAllResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigBatchedStreamRequest,
                DashboardConfigBatchedStreamResponse,
            ),
            "/arista.dashboard.v1.DashboardConfigService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                DashboardConfigBatchedStreamRequest,
                DashboardConfigBatchedStreamResponse,
            ),
        }
class GlobalDashboardConfigServiceBase(ServiceBase):
    """Server-side base class for ``arista.dashboard.v1.GlobalDashboardConfigService``.

    Subclasses override the public ``async`` methods below to implement the
    service. Each default implementation raises ``grpclib.GRPCError`` with
    status ``UNIMPLEMENTED``. The private ``__rpc_*`` adapters read the
    request from the grpclib stream and dispatch to the matching public
    method; ``__mapping__`` registers those adapters under their RPC paths.
    """

    async def get_one(
        self, global_dashboard_config_request: "GlobalDashboardConfigRequest"
    ) -> "GlobalDashboardConfigResponse":
        """Handle ``GetOne``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
    ) -> AsyncIterator[GlobalDashboardConfigStreamResponse]:
        """Handle ``GetAll``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
    ) -> AsyncIterator[GlobalDashboardConfigStreamResponse]:
        """Handle ``Subscribe``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    # SubscribeMeta is registered in __mapping__ and dispatched by
    # __rpc_subscribe_meta, so it needs an overridable default like every
    # other RPC; without it a request would fail with AttributeError instead
    # of the UNIMPLEMENTED error.
    async def subscribe_meta(
        self,
        global_dashboard_config_stream_request: "GlobalDashboardConfigStreamRequest",
    ) -> AsyncIterator[MetaResponse]:
        """Handle ``SubscribeMeta``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set(
        self, global_dashboard_config_set_request: "GlobalDashboardConfigSetRequest"
    ) -> "GlobalDashboardConfigSetResponse":
        """Handle ``Set``; not implemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigRequest, GlobalDashboardConfigResponse]",
    ) -> None:
        """Receive one request, pass it to ``get_one`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigStreamRequest, GlobalDashboardConfigStreamResponse]",
    ) -> None:
        """Receive one request and run ``get_all`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigStreamRequest, GlobalDashboardConfigStreamResponse]",
    ) -> None:
        """Receive one request and run ``subscribe`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_subscribe_meta(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigStreamRequest, MetaResponse]",
    ) -> None:
        """Receive one request and run ``subscribe_meta`` through the server-stream helper."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[GlobalDashboardConfigSetRequest, GlobalDashboardConfigSetResponse]",
    ) -> None:
        """Receive one request, pass it to ``set`` and send back the reply."""
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the RPC path -> ``grpclib.const.Handler`` routing table.

        Each handler bundles the bound ``__rpc_*`` adapter, the call
        cardinality, and the request and reply message classes.
        """
        return {
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                GlobalDashboardConfigRequest,
                GlobalDashboardConfigResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                GlobalDashboardConfigStreamRequest,
                GlobalDashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                GlobalDashboardConfigStreamRequest,
                GlobalDashboardConfigStreamResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                GlobalDashboardConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.dashboard.v1.GlobalDashboardConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                GlobalDashboardConfigSetRequest,
                GlobalDashboardConfigSetResponse,
            ),
        }