# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: arista/redirector.v1/redirector.proto, arista/redirector.v1/services.gen.proto
# plugin: python-aristaproto
# This file has been @generated
# Names exported by `from <this module> import *`.
# NOTE(review): "MetaResponse" is listed below, but no class or import of that
# name is visible in this module. If it really is missing, a star-import will
# fail on it -- confirm against the generator output / source proto.
__all__ = (
"AssignmentKey",
"Assignment",
"Clusters",
"Cluster",
"MetaResponse",
"AssignmentRequest",
"AssignmentResponse",
"AssignmentSomeRequest",
"AssignmentSomeResponse",
"AssignmentStreamRequest",
"AssignmentStreamResponse",
"AssignmentBatchedStreamRequest",
"AssignmentBatchedStreamResponse",
"AssignmentServiceStub",
"AssignmentServiceBase",
)
from dataclasses import dataclass
from datetime import datetime
from typing import (
TYPE_CHECKING,
AsyncIterator,
Dict,
List,
Optional,
)
import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase
if TYPE_CHECKING:
import grpclib.server
from aristaproto.grpc.grpclib_client import MetadataLike
from grpclib.metadata import Deadline
[docs]
@dataclass(eq=False, repr=False)
class AssignmentKey(aristaproto.Message):
    """Key message that uniquely identifies one assignment."""

    system_id: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """Unique identifier of a device (wrapped string, proto field 1)."""
[docs]
@dataclass(eq=False, repr=False)
class Assignment(aristaproto.Message):
    """
    Describes the regional clusters a system is assigned to.

    Every cluster is made up of a series of hosts, and the client can use
    any of them to connect.
    """

    key: "AssignmentKey" = aristaproto.message_field(1)
    """Unique identifier for the assignment of system_id to the cluster."""

    clusters: "Clusters" = aristaproto.message_field(2)
    """The clusters the system is assigned to."""
[docs]
@dataclass(eq=False, repr=False)
class Clusters(aristaproto.Message):
    """Wrapper around a list of clusters, which hold the host information."""

    values: List["Cluster"] = aristaproto.message_field(2)
    """List of the clusters associated with the region (proto field 2)."""
[docs]
@dataclass(eq=False, repr=False)
class Cluster(aristaproto.Message):
    """A named cluster together with the hosts that belong to it."""

    name: Optional[str] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_STRING
    )
    """
    Cluster name. It is not stable: it can change over time as new
    clusters are added or removed.
    """

    hosts: "___fmp__.RepeatedString" = aristaproto.message_field(2)
    """Hosts of this cluster that devices can connect to."""
[docs]
@dataclass(eq=False, repr=False)
class AssignmentRequest(aristaproto.Message):
    """Request for a single Assignment instance, selected by its key."""

    key: "AssignmentKey" = aristaproto.message_field(1)
    """
    Key that uniquely identifies the Assignment instance to retrieve.
    It must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Point in time for which the data is wanted.
    When no time is given, the server uses the time at which it makes the
    request.
    """
[docs]
@dataclass(eq=False, repr=False)
class AssignmentResponse(aristaproto.Message):
    """Response carrying one Assignment and its last-modification time."""

    value: "Assignment" = aristaproto.message_field(1)
    """
    The requested value.
    The structure is fully populated as it exists in the datastore. Optional
    fields that were not given at creation are empty or set to default
    values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp of the last modification of the Assignment instance
    contained in this response.
    """
[docs]
@dataclass(eq=False, repr=False)
class AssignmentSomeRequest(aristaproto.Message):
    """Request that carries a list of assignment keys plus an optional time."""

    keys: List["AssignmentKey"] = aristaproto.message_field(1)
    """List of AssignmentKey messages (proto field 1)."""

    time: datetime = aristaproto.message_field(2)
    """
    Point in time for which the data is wanted.
    When no time is given, the server uses the time at which it makes the
    request.
    """
[docs]
@dataclass(eq=False, repr=False)
class AssignmentSomeResponse(aristaproto.Message):
    """One streamed result of a GetSome call: a value, or an error text."""

    value: "Assignment" = aristaproto.message_field(1)
    """
    The requested value.
    The structure is fully populated as it exists in the datastore. Optional
    fields that were not given at creation are empty or set to default
    values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Optional error text.
    It should be filled in when the GetSome process hits an error.
    """

    time: datetime = aristaproto.message_field(3)
    """datetime field (proto field 3); the generator emitted no description."""
[docs]
@dataclass(eq=False, repr=False)
class AssignmentStreamRequest(aristaproto.Message):
    """Request message for the streaming GetAll / Subscribe RPCs."""

    partial_eq_filter: List["Assignment"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe.
    All provided fields are required to be equal to the response.
    Although transparent to users, the field also lets services optimize
    internal subscriptions when the filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Restricts response data to a specified time window.
    When this field is populated, at least one of its two time fields is
    required.

    For GetAll, start and end can be used as follows:

      * end: returns the state of each Assignment at end.
        Each Assignment response is fully specified (all fields set).
      * start: returns the state of each Assignment at start, followed by
        updates until now.
        Each Assignment response at start is fully specified, but updates
        may be partial.
      * start and end: returns the state of each Assignment at start,
        followed by updates until end.
        Each Assignment response at start is fully specified, but updates
        until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """
[docs]
@dataclass(eq=False, repr=False)
class AssignmentStreamResponse(aristaproto.Message):
    """One streamed Assignment value with its timestamp and operation type."""

    value: "Assignment" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request.
    Its key field is always populated. Which other fields are populated, and
    why, depends on the value of Operation and on what triggered this
    notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this Assignment's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Tells how the Assignment value in this response should be considered.
    Under non-subscribe requests this value should always be INITIAL. In a
    subscription, once all initial data has been streamed and the client
    starts receiving modification updates, INITIAL should not be seen again.
    """
[docs]
@dataclass(eq=False, repr=False)
class AssignmentBatchedStreamRequest(aristaproto.Message):
    """Request message for the batched streaming RPCs."""

    partial_eq_filter: List["Assignment"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe.
    All provided fields are required to be equal to the response.
    Although transparent to users, the field also lets services optimize
    internal subscriptions when the filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Restricts response data to a specified time window.
    When this field is populated, at least one of its two time fields is
    required.

    For GetAll, start and end can be used as follows:

      * end: returns the state of each Assignment at end.
        Each Assignment response is fully specified (all fields set).
      * start: returns the state of each Assignment at start, followed by
        updates until now.
        Each Assignment response at start is fully specified, but updates
        may be partial.
      * start and end: returns the state of each Assignment at start,
        followed by updates until end.
        Each Assignment response at start is fully specified, but updates
        until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(4, wraps=aristaproto.TYPE_UINT32)
    """
    Upper bound on the number of messages contained in one batch.
    MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
[docs]
@dataclass(eq=False, repr=False)
class AssignmentBatchedStreamResponse(aristaproto.Message):
    """A batch of AssignmentStreamResponse messages."""

    responses: List["AssignmentStreamResponse"] = aristaproto.message_field(1)
    """
    The values deemed relevant to the initiating request.
    The length of this list is guaranteed to lie between 1 and
    min(req.max_messages, INTERNAL_BATCH_LIMIT), both inclusive.
    """
[docs]
class AssignmentServiceStub(aristaproto.ServiceStub):
    """
    Client-side stub with one async method per AssignmentService RPC.

    Every method forwards its request message, together with the optional
    keyword-only ``timeout``, ``deadline`` and ``metadata`` arguments,
    unchanged to the inherited ``_unary_unary`` / ``_unary_stream`` helper.
    """

    async def get_one(
        self,
        assignment_request: "AssignmentRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AssignmentResponse":
        """Call the GetOne RPC and return its AssignmentResponse."""
        reply = await self._unary_unary(
            "/arista.redirector.v1.AssignmentService/GetOne",
            assignment_request,
            AssignmentResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        assignment_some_request: "AssignmentSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AssignmentSomeResponse]":
        """Call the GetSome RPC and yield each AssignmentSomeResponse."""
        replies = self._unary_stream(
            "/arista.redirector.v1.AssignmentService/GetSome",
            assignment_some_request,
            AssignmentSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def get_all(
        self,
        assignment_stream_request: "AssignmentStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AssignmentStreamResponse]":
        """Call the GetAll RPC and yield each AssignmentStreamResponse."""
        replies = self._unary_stream(
            "/arista.redirector.v1.AssignmentService/GetAll",
            assignment_stream_request,
            AssignmentStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def subscribe(
        self,
        assignment_stream_request: "AssignmentStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AssignmentStreamResponse]":
        """Call the Subscribe RPC and yield each AssignmentStreamResponse."""
        replies = self._unary_stream(
            "/arista.redirector.v1.AssignmentService/Subscribe",
            assignment_stream_request,
            AssignmentStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for reply in replies:
            yield reply

    async def get_all_batched(
        self,
        assignment_batched_stream_request: "AssignmentBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AssignmentBatchedStreamResponse]":
        """Call the GetAllBatched RPC and yield each batched response."""
        batches = self._unary_stream(
            "/arista.redirector.v1.AssignmentService/GetAllBatched",
            assignment_batched_stream_request,
            AssignmentBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for batch in batches:
            yield batch

    async def subscribe_batched(
        self,
        assignment_batched_stream_request: "AssignmentBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AssignmentBatchedStreamResponse]":
        """Call the SubscribeBatched RPC and yield each batched response."""
        batches = self._unary_stream(
            "/arista.redirector.v1.AssignmentService/SubscribeBatched",
            assignment_batched_stream_request,
            AssignmentBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for batch in batches:
            yield batch
from .... import fmp as ___fmp__
from ... import subscriptions as __subscriptions__
from ... import time as __time__
[docs]
class AssignmentServiceBase(ServiceBase):
    """
    Server-side base class for AssignmentService.

    The public ``get_*`` / ``subscribe*`` methods are the overridable RPC
    implementations; each default body raises ``grpclib.GRPCError`` with
    ``Status.UNIMPLEMENTED``. The private ``__rpc_*`` methods receive the
    request from the grpclib stream and dispatch to the matching public
    method, and ``__mapping__`` ties each RPC route to its ``__rpc_*``
    handler.
    """

    async def get_one(
        self, assignment_request: "AssignmentRequest"
    ) -> "AssignmentResponse":
        """Handle GetOne; the default raises GRPCError(UNIMPLEMENTED)."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, assignment_some_request: "AssignmentSomeRequest"
    ) -> AsyncIterator[AssignmentSomeResponse]:
        """Handle GetSome; the default raises GRPCError(UNIMPLEMENTED)."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all(
        self, assignment_stream_request: "AssignmentStreamRequest"
    ) -> AsyncIterator[AssignmentStreamResponse]:
        """Handle GetAll; the default raises GRPCError(UNIMPLEMENTED)."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe(
        self, assignment_stream_request: "AssignmentStreamRequest"
    ) -> AsyncIterator[AssignmentStreamResponse]:
        """Handle Subscribe; the default raises GRPCError(UNIMPLEMENTED)."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_meta(
        self, assignment_stream_request: "AssignmentStreamRequest"
    ) -> "MetaResponse":
        """
        Handle GetMeta; the default raises GRPCError(UNIMPLEMENTED).

        Defined so that ``__rpc_get_meta`` has a method to dispatch to, like
        every other RPC of this service. The return annotation is a string
        because ``MetaResponse`` is not defined in the visible part of this
        module.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, assignment_stream_request: "AssignmentStreamRequest"
    ) -> "AsyncIterator[MetaResponse]":
        """
        Handle SubscribeMeta; the default raises GRPCError(UNIMPLEMENTED).

        Defined so that ``__rpc_subscribe_meta`` has a method to dispatch to,
        like every other RPC of this service. The return annotation is a
        string because ``MetaResponse`` is not defined in the visible part of
        this module.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all_batched(
        self, assignment_batched_stream_request: "AssignmentBatchedStreamRequest"
    ) -> AsyncIterator[AssignmentBatchedStreamResponse]:
        """Handle GetAllBatched; the default raises GRPCError(UNIMPLEMENTED)."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_batched(
        self, assignment_batched_stream_request: "AssignmentBatchedStreamRequest"
    ) -> AsyncIterator[AssignmentBatchedStreamResponse]:
        """Handle SubscribeBatched; the default raises GRPCError(UNIMPLEMENTED)."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[AssignmentRequest, AssignmentResponse]"
    ) -> None:
        # Unary-unary: receive one request, send back the single response.
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[AssignmentSomeRequest, AssignmentSomeResponse]",
    ) -> None:
        # Unary-stream: hand the request and handler to the inherited helper.
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[AssignmentStreamRequest, AssignmentStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[AssignmentStreamRequest, AssignmentStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[AssignmentStreamRequest, MetaResponse]"
    ) -> None:
        # Unary-unary, same shape as __rpc_get_one.
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[AssignmentStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[AssignmentBatchedStreamRequest, AssignmentBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[AssignmentBatchedStreamRequest, AssignmentBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """
        Return the route table of this service.

        Maps each fully-qualified RPC path to a ``grpclib.const.Handler``
        holding the ``__rpc_*`` coroutine, the call cardinality and the
        request/response message classes.
        """
        # NOTE(review): MetaResponse is evaluated at call time below, but no
        # definition or import of it is visible in this module -- confirm it
        # is in scope, otherwise building this mapping raises NameError.
        return {
            "/arista.redirector.v1.AssignmentService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                AssignmentRequest,
                AssignmentResponse,
            ),
            "/arista.redirector.v1.AssignmentService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                AssignmentSomeRequest,
                AssignmentSomeResponse,
            ),
            "/arista.redirector.v1.AssignmentService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                AssignmentStreamRequest,
                AssignmentStreamResponse,
            ),
            "/arista.redirector.v1.AssignmentService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                AssignmentStreamRequest,
                AssignmentStreamResponse,
            ),
            "/arista.redirector.v1.AssignmentService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                AssignmentStreamRequest,
                MetaResponse,
            ),
            "/arista.redirector.v1.AssignmentService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                AssignmentStreamRequest,
                MetaResponse,
            ),
            "/arista.redirector.v1.AssignmentService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                AssignmentBatchedStreamRequest,
                AssignmentBatchedStreamResponse,
            ),
            "/arista.redirector.v1.AssignmentService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                AssignmentBatchedStreamRequest,
                AssignmentBatchedStreamResponse,
            ),
        }