# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: arista/softwaremanagement.v1/services.gen.proto, arista/softwaremanagement.v1/softwaremanagement.proto
# plugin: python-aristaproto
# This file has been @generated
__all__ = (
"SoftwareSource",
"Progress",
"SoftwareType",
"ImageFormatVersion",
"Arch",
"Variant",
"Flavor",
"DigestType",
"RecommendedAction",
"Level",
"RepositoryKey",
"Repository",
"RepositoryConfig",
"SoftwareStatus",
"SoftwareMetadata",
"SwiMetadata",
"ExtensionMetadata",
"ReleasesStatus",
"Releases",
"ConcreteFile",
"ConcreteTypeToConcreteSwiMap",
"MetaResponse",
"ReleasesRequest",
"ReleasesResponse",
"ReleasesStreamRequest",
"ReleasesStreamResponse",
"RepositoryRequest",
"RepositoryResponse",
"RepositorySomeRequest",
"RepositorySomeResponse",
"RepositoryStreamRequest",
"RepositoryStreamResponse",
"RepositoryBatchedStreamRequest",
"RepositoryBatchedStreamResponse",
"RepositoryConfigRequest",
"RepositoryConfigResponse",
"RepositoryConfigSomeRequest",
"RepositoryConfigSomeResponse",
"RepositoryConfigStreamRequest",
"RepositoryConfigStreamResponse",
"RepositoryConfigBatchedStreamRequest",
"RepositoryConfigBatchedStreamResponse",
"RepositoryConfigSetRequest",
"RepositoryConfigSetResponse",
"RepositoryConfigSetSomeRequest",
"RepositoryConfigSetSomeResponse",
"RepositoryConfigDeleteRequest",
"RepositoryConfigDeleteResponse",
"RepositoryConfigDeleteSomeRequest",
"RepositoryConfigDeleteSomeResponse",
"RepositoryConfigDeleteAllRequest",
"RepositoryConfigDeleteAllResponse",
"ReleasesServiceStub",
"ReleasesServiceBase",
"RepositoryServiceStub",
"RepositoryServiceBase",
"RepositoryConfigServiceStub",
"RepositoryConfigServiceBase",
)
from dataclasses import dataclass
from datetime import datetime
from typing import (
TYPE_CHECKING,
AsyncIterator,
Dict,
List,
Optional,
)
import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase
if TYPE_CHECKING:
import grpclib.server
from aristaproto.grpc.grpclib_client import MetadataLike
from grpclib.metadata import Deadline
[docs]
class SoftwareSource(aristaproto.Enum):
"""SoftwareSource describes the software upload mode."""
UNSPECIFIED = 0
"""SOFTWARE_SOURCE_UNSPECIFIED indicates that the source is not known"""
CLOUD = 1
"""
SOFTWARE_SOURCE_CLOUD indicates that the image or extension has been downloaded from
Arista's Software Download site
"""
LOCAL = 2
"""
SOFTWARE_SOURCE_LOCAL indicates that the image or extension file has been uploaded directly
to CloudVision by the user
"""
PRELOADED = 3
"""
SOFTWARE_SOURCE_PRELOADED indicates that the image or extension file has been added as part
of an upgrade of CloudVision.
"""
NP_MIGRATION = 4
"""
SOFTWARE_SOURCE_NP_MIGRATION indicates that the image or extension file has been internally
transferred from Network Provisioning to Software Management Studio as part of the
migration process.
"""
[docs]
class Progress(aristaproto.Enum):
"""
Progress is used by the Repository service to describe the stages of the software upload
and verification process.
"""
UNSPECIFIED = 0
"""
PROGRESS_UNSPECIFIED indicates that upload progress is not currently reportable
"""
INITIAL = 1
"""PROGRESS_INITIAL indicates that a software upload has begun"""
QUEUED = 2
"""
PROGRESS_QUEUED indicates that an image or extension upload is waiting in a queue
"""
LOADING = 3
"""
PROGRESS_LOADING indicates that an image or extension is currently being retrieved from
Arista's Software Download site or CloudVision file server for processing
"""
VALIDATING = 4
"""
PROGRESS_VALIDATING indicates that an image or extension is being validated
"""
SAVING = 5
"""
PROGRESS_SAVING indicates that an image or extension is being uploaded to
CloudVision file server
"""
SUCCESS = 6
"""
PROGRESS_SUCCESS indicates that an image or extension has been successfully uploaded and
validated. This is a terminal state of an upload.
"""
ERROR = 7
"""
PROGRESS_ERROR indicates that an image or extension failed at either the upload or
validation phase. This is a terminal state of an upload.
"""
[docs]
class SoftwareType(aristaproto.Enum):
"""
SoftwareType is an enum containing the possible Software types. `SOFTWARE_TYPE_TERMINATTR`,
which corresponds to an EOS Streaming Agent, is first-classed due to it's importance to
CloudVision.
"""
UNSPECIFIED = 0
"""
SOFTWARE_TYPE_UNSPECIFIED indicates that the type cannot or has not been determined
"""
SWI = 1
"""
SOFTWARE_TYPE_SWI identifies an EOS swi image type. The corresponding image files will
have a .swi extension.
"""
TERMINATTR = 2
"""
SOFTWARE_TYPE_TERMINATTR identifies a Streaming Agent extension. The corresponding
files will contain the term: \"TerminAttr\" (case insensitive) and a .swix extension.
"""
SWIX = 3
"""
SOFTWARE_TYPE_SWIX identifies an extension type. The corresponding files will have a .swix
extension.
"""
RPM = 4
"""
SOFTWARE_TYPE_RPM identifies an RPM file. The corresponding files will have a .rpm
extension.
"""
[docs]
class Arch(aristaproto.Enum):
"""
Arch is an enum used to specify the target architecture of an image or extension.
"""
UNSPECIFIED = 0
"""ARCH_UNSPECIFIED indicates an unknown or unspecified architecture"""
NO_ARCH = 1
"""ARCH_NO_ARCH indicates that the software is architecture agnostic"""
I386 = 2
"""
ARCH_I386 indicates that the software is intended for the i386 architecture
"""
I686 = 3
"""
ARCH_I686 indicates that the software is intended for the i686 architecture
"""
X86_64 = 4
"""
ARCH_X86_64 indicates that the software is intended to be run on 64-bit platforms
"""
AARCH64 = 5
"""
ARCH_AARCH64 indicates that the software is intended to be run on ARM 64-bit platforms
"""
MULTIARCH = 6
"""
ARCH_MULTIARCH indicates that the software supports all architectures
"""
[docs]
class Variant(aristaproto.Enum):
"""Variant is an enum containing the possible .swi variant types."""
UNSPECIFIED = 0
"""
VARIANT_UNSPECIFIED is used to indicate an unknown or unspecified EOS image variant
"""
US = 1
"""VARIANT_US specifies a US or default version of an EOS image variant"""
INTERNATIONAL = 2
"""VARIANT_INTERNATIONAL specifies an international EOS image variant"""
[docs]
class Flavor(aristaproto.Enum):
"""Flavor is an enum containing the possible .swi flavor types."""
UNSPECIFIED = 0
"""FLAVOR_UNSPECIFIED indicates that the swi image flavor is unknown"""
DEFAULT = 1
"""FLAVOR_DEFAULT is the default swi image"""
_2GB = 2
"""
FLAVOR_2GB is an obsolete image supporting a reduced-size 2GB image version that runs
only on older models of devices with two gigabytes of flash memory
"""
CLOUD = 3
"""
FLAVOR_CLOUD is a swi image that runs in a virtualized cloud environment
"""
DPE = 4
"""FLAVOR_DPE is a swi image that provides MacSec without a license"""
PDP = 5
"""
FLAVOR_PDP is an image that defaults to support PDP but can also support CoPP
"""
_2GB_PDP = 6
"""
FLAVOR_2GB_PDP is a 2GB image that defaults to support PDP if available, but can support
CoPP
"""
DPE_CTNR = 7
"""
FLAVOR_DPE_CTNR is a swi image that provides MacSec without a license and natively
supports running containers on EOS
"""
[docs]
class DigestType(aristaproto.Enum):
"""
DigestType is the digest algorithm used as a checksum on the software file.
"""
UNSPECIFIED = 0
"""DIGEST_TYPE_UNSPECIFIED indicates that the digest type is unknown"""
SHA512 = 1
"""
DIGEST_TYPE_SHA512 indicates that the SHA512 algorithm is used to calculate the digest
"""
[docs]
class RecommendedAction(aristaproto.Enum):
"""
RecommendedAction is an enum containing all of the possible steps a user can take to address
problems encountered while performing software management-related actions.
"""
UNSPECIFIED = 0
"""
RECOMMENDED_ACTION_UNSPECIFIED indicates that no remedial action is needed.
"""
CONTACT_TAC = 1
"""
RECOMMENDED_ACTION_CONTACT_TAC indicates that in order to resolve the current issue,
Arista TAC assistance is recommended.
"""
ADD_TOKEN = 2
"""
RECOMMENDED_ACTION_ADD_TOKEN indicates that a particular issue can be resolved by adding
the access token stored in CloudVision.
"""
UPDATE_TOKEN = 3
"""
RECOMMENDED_ACTION_UPDATE_TOKEN indicates that a particular issue can be resolved by updating
the access token stored in CloudVision.
"""
ACCEPT_EULA = 4
"""
RECOMMENDED_ACTION_ACCEPT_EULA indicates that a particular issue can be resolved by accepting
the EULA agreement available at the Arista Software Download site.
"""
RETRY = 5
"""
RECOMMENDED_ACTION_RETRY indicates that a paticular issue may be resolved by
re-attempting the previous action.
"""
[docs]
class Level(aristaproto.Enum):
"""
Level specifies relative importance of the information returned in the message field of the
software releases status.
"""
UNSPECIFIED = 0
"""LEVEL_UNSPECIFIED is set when the message level is unknown"""
INFO = 1
"""
LEVEL_INFO is set when the message contains useful, informational content. No user action is
required.
"""
WARNING = 2
"""
LEVEL_WARNING is set when the message contains important information for the user that
should be addressed for continued proper behavior
"""
ERROR = 3
"""
LEVEL_ERROR is set when the message contains information alerting the user to an error
condition that occurred while retrieving the set of software releases. Remedial user action
may be required to clear this condition.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryKey(aristaproto.Message):
"""
RepositoryKey is the key used by the `Repository` state and config models to uniquely
identify an image or extension.
The `name` field must contain the file suffix. Valid file types include .swi, .swix and .rpm
files. Streaming Agent files must also contain the term: `TerminAttr` (case insensitive).
"""
name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
"""
name is a unique identifier that will be used to identify the image or extension in
CloudVision
"""
[docs]
@dataclass(eq=False, repr=False)
class Repository(aristaproto.Message):
"""
Repository is the state model that displays metadata for software images and extensions.
`Repository` objects are created when new software is added via a `Set`
request issued to the `RepositoryConfig` resource or when using the Software Upload REST API.
The `key` uniquely identifies the software state and is identical to the key used to create it
via the call to `Set`.
"""
key: "RepositoryKey" = aristaproto.message_field(1)
"""key is the unique identifier"""
uploaded_by: Optional[str] = aristaproto.message_field(
2, wraps=aristaproto.TYPE_STRING
)
"""uploaded_by specifies the author of the upload"""
uploaded_at: datetime = aristaproto.message_field(3)
"""uploaded_at specifies the date and time of the upload"""
last_modified_by: Optional[str] = aristaproto.message_field(
4, wraps=aristaproto.TYPE_STRING
)
"""
last_modified_by is the author of the most recent metadata modification
"""
last_modified_at: datetime = aristaproto.message_field(5)
"""
last_modified_at is the date and time of the most recent metadata modification
"""
software_status: "SoftwareStatus" = aristaproto.message_field(6)
"""
software_status displays status of the software that is being added to CloudVision
"""
software_metadata: "SoftwareMetadata" = aristaproto.message_field(7)
"""software_metadata displays details about available software"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfig(aristaproto.Message):
"""
RepositoryConfig objects are used to add, delete, and update images or extensions in
CloudVision.
The `Set` method is used to add and modify software stored in CloudVision.
The `Delete` method is used to remove software.
Adding Software to CloudVision:
When a `Set` is made, the software identified by the URI will be downloaded from Arista's
Software Download site and stored in CloudVision. The status of the software will be reflected
by a corresponding `Repository` entry identified by the same `RepositoryKey`.
Here is an example where a .swi EOS image is added to CloudVision:
```
{
\"value\": {
\"key\": {
\"name\": \"EOS64-4.30.0.1F.swi\"
},
\"uri\": \"/support/download/EOS-USA/Active Releases/4.30/EOS-4.30.0.1F/EOS64-4.30.0.1F.swi\",
\"rebootRequired\": true
},
}
```
The following is an example of a configuration that will add the
_AristaAppForSplunk-2.0.1-4.27.swix_ extension to CloudVision:
```
{
\"value\": {
\"key\": {
\"name\": \"AristaAppForSplunk-2.0.1-4.27.swix\"
},
\"uri\": \"/support/download/Extensions/Splunk/AristaAppForSplunk-2.0.1-4.27.swix\",
\"rebootRequired\": false
},
}
```
Updating Existing Software:
`Repository` metadata objects can be _updated_ by using the config `Set` method and
specifying the field or fields to be modified. Currently, `reboot_required` is the only field
that can be modified after software has been added to CloudVision. All other fields must match
the existing configuration, or can be omitted.
NOTE: The `reboot_required` field must be `true` for .swi images.
In this example, we update the `reboot_required` field changing it from `false` to `true`
while leaving out the uri field:
```
{
\"value\": {
\"key\": {
\"name\": \"AristaAppForSplunk-2.0.1-4.27.swix\"
},
\"rebootRequired\": true
},
}
```
Deleting Software:
`Repository` objects can be deleted using the `RepositoryConfig` `Delete` method.
A `Delete` request will specify the `key` which uniquely identifies the image or extension in
CloudVision. For example, a delete request will look like the following:
```
{
\"value\": {
\"key\": {
\"name\": \"AristaAppForSplunk-2.0.1-4.27.swix\"
},
},
}
```
"""
key: "RepositoryKey" = aristaproto.message_field(1)
"""
key is a unique identifier that must be supplied by the user to start a valid upload of
software
"""
uri: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
"""
uri value depends on the origin of the software. If you are downloading an image or
extension file from Arista's Software Download site, use the site identifier. If you are
uploading software from your computer using the Software Upload REST API, this field will
be populated with the CloudVision file server identifier.
"""
reboot_required: Optional[bool] = aristaproto.message_field(
3, wraps=aristaproto.TYPE_BOOL
)
"""
reboot_required is a Boolean indicating if a device requires a reboot after software
installation. .swi images always require a reboot.
"""
[docs]
@dataclass(eq=False, repr=False)
class SoftwareStatus(aristaproto.Message):
"""
SoftwareStatus contains a progress field and a description explaining the progress during
software upload.
"""
progress: "Progress" = aristaproto.enum_field(1)
"""progress displays the current status of the software upload"""
message: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
"""message describes the progress of the software upload"""
recommended_action: "RecommendedAction" = aristaproto.enum_field(3)
"""
recommended_action expresses a particular user-actionable step that can be carried out to
ensure the proper operation of CloudVision while managing software.
"""
[docs]
@dataclass(eq=False, repr=False)
class ReleasesStatus(aristaproto.Message):
"""
ReleasesStatus contains information about the state of the available software releases
which are retrieved from Arista's Software Download site by CloudVision.
"""
level: "Level" = aristaproto.enum_field(1)
"""
level is the relative importance of the message field contained in the status
"""
message: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
"""
message is populated with a string which will provide more information regarding
the latest interaction with the Arista's Software Download site while pulling the set of
available software releases
"""
recommended_action: "RecommendedAction" = aristaproto.enum_field(3)
"""
recommended_action expresses a particular user-actionable step that can be carried out to
ensure the proper operation of CloudVision while interacting with Arista's Software
Download site to obtain the set of available software releases.
"""
[docs]
@dataclass(eq=False, repr=False)
class Releases(aristaproto.Message):
"""
Releases is the entire set of software releases available from the Arista's Software
Download site. Once downloaded, a software release can be deployed to devices managed by
CloudVision.
"""
status: "ReleasesStatus" = aristaproto.message_field(1)
"""
status is providing additional information about the retrieval of the software releases
returned in the message
"""
uris: "___fmp__.RepeatedString" = aristaproto.message_field(2)
"""
uris are the URI paths specifying the location of the available software releases that
can be downloaded from the Arista's Software Download site. These URIs are used
by the `RepositoryConfig` API to add a software release to CloudVision.
"""
[docs]
@dataclass(eq=False, repr=False)
class ConcreteFile(aristaproto.Message):
"""
ConcreteFile contains the name and corresponding metadata of the
file (like fileserver, file size)
"""
name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
"""name is the name of the concrete file"""
file_server_path: Optional[str] = aristaproto.message_field(
2, wraps=aristaproto.TYPE_STRING
)
"""
file_server_path is the location of the image or extension as stored in CloudVision
"""
size: Optional[int] = aristaproto.message_field(3, wraps=aristaproto.TYPE_UINT64)
"""size is the size of the image or extension in bytes"""
release: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
"""release is the release version of the concrete file"""
[docs]
@dataclass(eq=False, repr=False)
class ConcreteTypeToConcreteSwiMap(aristaproto.Message):
"""
ConcreteTypeToConcreteSwiMap indicates a map keyed by stringified concrete type (limited
to i386, i686, x86_64 and aarch64, but can be extended to platform types in the future) to the concrete swi map.
"""
values: Dict[str, "ConcreteFile"] = aristaproto.map_field(
1, aristaproto.TYPE_STRING, aristaproto.TYPE_MESSAGE
)
"""values contains a collection of ConcreteTypeToConcreteSwiMap items."""
[docs]
@dataclass(eq=False, repr=False)
class ReleasesRequest(aristaproto.Message):
""" """
time: datetime = aristaproto.message_field(2)
"""
Time indicates the time for which you are interested in the data.
If no time is given, the server will use the time at which it makes the request.
"""
[docs]
@dataclass(eq=False, repr=False)
class ReleasesResponse(aristaproto.Message):
""" """
value: "Releases" = aristaproto.message_field(1)
"""
Value is the value requested.
This structure will be fully-populated as it exists in the datastore. If
optional fields were not given at creation, these fields will be empty or
set to default values.
"""
time: datetime = aristaproto.message_field(2)
"""
Time carries the (UTC) timestamp of the last-modification of the
Releases instance in this response.
"""
[docs]
@dataclass(eq=False, repr=False)
class ReleasesStreamRequest(aristaproto.Message):
""" """
partial_eq_filter: List["Releases"] = aristaproto.message_field(1)
"""
PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
This requires all provided fields to be equal to the response.
While transparent to users, this field also allows services to optimize internal
subscriptions if filter(s) are sufficiently specific.
"""
time: "__time__.TimeBounds" = aristaproto.message_field(3)
"""
TimeRange allows limiting response data to within a specified time window.
If this field is populated, at least one of the two time fields are required.
For GetAll, the fields start and end can be used as follows:
* end: Returns the state of each Releases at end.
* Each Releases response is fully-specified (all fields set).
* start: Returns the state of each Releases at start, followed by updates until now.
* Each Releases response at start is fully-specified, but updates may be partial.
* start and end: Returns the state of each Releases at start, followed by updates
until end.
* Each Releases response at start is fully-specified, but updates until end may
be partial.
This field is not allowed in the Subscribe RPC.
"""
[docs]
@dataclass(eq=False, repr=False)
class ReleasesStreamResponse(aristaproto.Message):
""" """
value: "Releases" = aristaproto.message_field(1)
"""
Value is a value deemed relevant to the initiating request.
This structure will always have its key-field populated. Which other fields are
populated, and why, depends on the value of Operation and what triggered this notification.
"""
time: datetime = aristaproto.message_field(2)
"""Time holds the timestamp of this Releases's last modification."""
type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
"""
Operation indicates how the Releases value in this response should be considered.
Under non-subscribe requests, this value should always be INITIAL. In a subscription,
once all initial data is streamed and the client begins to receive modification updates,
you should not see INITIAL again.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryRequest(aristaproto.Message):
""" """
key: "RepositoryKey" = aristaproto.message_field(1)
"""
Key uniquely identifies a Repository instance to retrieve.
This value must be populated.
"""
time: datetime = aristaproto.message_field(2)
"""
Time indicates the time for which you are interested in the data.
If no time is given, the server will use the time at which it makes the request.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryResponse(aristaproto.Message):
""" """
value: "Repository" = aristaproto.message_field(1)
"""
Value is the value requested.
This structure will be fully-populated as it exists in the datastore. If
optional fields were not given at creation, these fields will be empty or
set to default values.
"""
time: datetime = aristaproto.message_field(2)
"""
Time carries the (UTC) timestamp of the last-modification of the
Repository instance in this response.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositorySomeRequest(aristaproto.Message):
""" """
keys: List["RepositoryKey"] = aristaproto.message_field(1)
"""
"""
time: datetime = aristaproto.message_field(2)
"""
Time indicates the time for which you are interested in the data.
If no time is given, the server will use the time at which it makes the request.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositorySomeResponse(aristaproto.Message):
""" """
value: "Repository" = aristaproto.message_field(1)
"""
Value is the value requested.
This structure will be fully-populated as it exists in the datastore. If
optional fields were not given at creation, these fields will be empty or
set to default values.
"""
error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
"""
Error is an optional field.
It should be filled when there is an error in the GetSome process.
"""
time: datetime = aristaproto.message_field(3)
"""
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryStreamRequest(aristaproto.Message):
""" """
partial_eq_filter: List["Repository"] = aristaproto.message_field(1)
"""
PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
This requires all provided fields to be equal to the response.
While transparent to users, this field also allows services to optimize internal
subscriptions if filter(s) are sufficiently specific.
"""
time: "__time__.TimeBounds" = aristaproto.message_field(3)
"""
TimeRange allows limiting response data to within a specified time window.
If this field is populated, at least one of the two time fields are required.
For GetAll, the fields start and end can be used as follows:
* end: Returns the state of each Repository at end.
* Each Repository response is fully-specified (all fields set).
* start: Returns the state of each Repository at start, followed by updates until now.
* Each Repository response at start is fully-specified, but updates may be partial.
* start and end: Returns the state of each Repository at start, followed by updates
until end.
* Each Repository response at start is fully-specified, but updates until end may
be partial.
This field is not allowed in the Subscribe RPC.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryStreamResponse(aristaproto.Message):
""" """
value: "Repository" = aristaproto.message_field(1)
"""
Value is a value deemed relevant to the initiating request.
This structure will always have its key-field populated. Which other fields are
populated, and why, depends on the value of Operation and what triggered this notification.
"""
time: datetime = aristaproto.message_field(2)
"""Time holds the timestamp of this Repository's last modification."""
type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
"""
Operation indicates how the Repository value in this response should be considered.
Under non-subscribe requests, this value should always be INITIAL. In a subscription,
once all initial data is streamed and the client begins to receive modification updates,
you should not see INITIAL again.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryBatchedStreamRequest(aristaproto.Message):
""" """
partial_eq_filter: List["Repository"] = aristaproto.message_field(1)
"""
PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
This requires all provided fields to be equal to the response.
While transparent to users, this field also allows services to optimize internal
subscriptions if filter(s) are sufficiently specific.
"""
time: "__time__.TimeBounds" = aristaproto.message_field(3)
"""
TimeRange allows limiting response data to within a specified time window.
If this field is populated, at least one of the two time fields are required.
For GetAll, the fields start and end can be used as follows:
* end: Returns the state of each Repository at end.
* Each Repository response is fully-specified (all fields set).
* start: Returns the state of each Repository at start, followed by updates until now.
* Each Repository response at start is fully-specified, but updates may be partial.
* start and end: Returns the state of each Repository at start, followed by updates
until end.
* Each Repository response at start is fully-specified, but updates until end may
be partial.
This field is not allowed in the Subscribe RPC.
"""
max_messages: Optional[int] = aristaproto.message_field(
4, wraps=aristaproto.TYPE_UINT32
)
"""
MaxMessages limits the maximum number of messages that can be contained in one batch.
MaxMessages is required to be at least 1.
The maximum number of messages in a batch is min(max_messages, INTERNAL_BATCH_LIMIT)
INTERNAL_BATCH_LIMIT is set based on the maximum message size.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryBatchedStreamResponse(aristaproto.Message):
""" """
responses: List["RepositoryStreamResponse"] = aristaproto.message_field(1)
"""
Values are the values deemed relevant to the initiating request.
The length of this structure is guaranteed to be between (inclusive) 1 and
min(req.max_messages, INTERNAL_BATCH_LIMIT).
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigRequest(aristaproto.Message):
""" """
key: "RepositoryKey" = aristaproto.message_field(1)
"""
Key uniquely identifies a RepositoryConfig instance to retrieve.
This value must be populated.
"""
time: datetime = aristaproto.message_field(2)
"""
Time indicates the time for which you are interested in the data.
If no time is given, the server will use the time at which it makes the request.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigResponse(aristaproto.Message):
""" """
value: "RepositoryConfig" = aristaproto.message_field(1)
"""
Value is the value requested.
This structure will be fully-populated as it exists in the datastore. If
optional fields were not given at creation, these fields will be empty or
set to default values.
"""
time: datetime = aristaproto.message_field(2)
"""
Time carries the (UTC) timestamp of the last-modification of the
RepositoryConfig instance in this response.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigSomeRequest(aristaproto.Message):
""" """
keys: List["RepositoryKey"] = aristaproto.message_field(1)
"""
"""
time: datetime = aristaproto.message_field(2)
"""
Time indicates the time for which you are interested in the data.
If no time is given, the server will use the time at which it makes the request.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigSomeResponse(aristaproto.Message):
""" """
value: "RepositoryConfig" = aristaproto.message_field(1)
"""
Value is the value requested.
This structure will be fully-populated as it exists in the datastore. If
optional fields were not given at creation, these fields will be empty or
set to default values.
"""
error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
"""
Error is an optional field.
It should be filled when there is an error in the GetSome process.
"""
time: datetime = aristaproto.message_field(3)
"""
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigStreamRequest(aristaproto.Message):
""" """
partial_eq_filter: List["RepositoryConfig"] = aristaproto.message_field(1)
"""
PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
This requires all provided fields to be equal to the response.
While transparent to users, this field also allows services to optimize internal
subscriptions if filter(s) are sufficiently specific.
"""
time: "__time__.TimeBounds" = aristaproto.message_field(3)
"""
TimeRange allows limiting response data to within a specified time window.
If this field is populated, at least one of the two time fields are required.
For GetAll, the fields start and end can be used as follows:
* end: Returns the state of each RepositoryConfig at end.
* Each RepositoryConfig response is fully-specified (all fields set).
* start: Returns the state of each RepositoryConfig at start, followed by updates until now.
* Each RepositoryConfig response at start is fully-specified, but updates may be partial.
* start and end: Returns the state of each RepositoryConfig at start, followed by updates
until end.
* Each RepositoryConfig response at start is fully-specified, but updates until end may
be partial.
This field is not allowed in the Subscribe RPC.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigStreamResponse(aristaproto.Message):
""" """
value: "RepositoryConfig" = aristaproto.message_field(1)
"""
Value is a value deemed relevant to the initiating request.
This structure will always have its key-field populated. Which other fields are
populated, and why, depends on the value of Operation and what triggered this notification.
"""
time: datetime = aristaproto.message_field(2)
"""
Time holds the timestamp of this RepositoryConfig's last modification.
"""
type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
"""
Operation indicates how the RepositoryConfig value in this response should be considered.
Under non-subscribe requests, this value should always be INITIAL. In a subscription,
once all initial data is streamed and the client begins to receive modification updates,
you should not see INITIAL again.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigBatchedStreamRequest(aristaproto.Message):
""" """
partial_eq_filter: List["RepositoryConfig"] = aristaproto.message_field(1)
"""
PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
This requires all provided fields to be equal to the response.
While transparent to users, this field also allows services to optimize internal
subscriptions if filter(s) are sufficiently specific.
"""
time: "__time__.TimeBounds" = aristaproto.message_field(3)
"""
TimeRange allows limiting response data to within a specified time window.
If this field is populated, at least one of the two time fields are required.
For GetAll, the fields start and end can be used as follows:
* end: Returns the state of each RepositoryConfig at end.
* Each RepositoryConfig response is fully-specified (all fields set).
* start: Returns the state of each RepositoryConfig at start, followed by updates until now.
* Each RepositoryConfig response at start is fully-specified, but updates may be partial.
* start and end: Returns the state of each RepositoryConfig at start, followed by updates
until end.
* Each RepositoryConfig response at start is fully-specified, but updates until end may
be partial.
This field is not allowed in the Subscribe RPC.
"""
max_messages: Optional[int] = aristaproto.message_field(
4, wraps=aristaproto.TYPE_UINT32
)
"""
MaxMessages limits the maximum number of messages that can be contained in one batch.
MaxMessages is required to be at least 1.
The maximum number of messages in a batch is min(max_messages, INTERNAL_BATCH_LIMIT)
INTERNAL_BATCH_LIMIT is set based on the maximum message size.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigBatchedStreamResponse(aristaproto.Message):
""" """
responses: List["RepositoryConfigStreamResponse"] = aristaproto.message_field(1)
"""
Values are the values deemed relevant to the initiating request.
The length of this structure is guaranteed to be between (inclusive) 1 and
min(req.max_messages, INTERNAL_BATCH_LIMIT).
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigSetRequest(aristaproto.Message):
""" """
value: "RepositoryConfig" = aristaproto.message_field(1)
"""
RepositoryConfig carries the value to set into the datastore.
See the documentation on the RepositoryConfig struct for which fields are required.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigSetResponse(aristaproto.Message):
""" """
value: "RepositoryConfig" = aristaproto.message_field(1)
"""
Value carries all the values given in the RepositoryConfigSetRequest as well
as any server-generated values.
"""
time: datetime = aristaproto.message_field(2)
"""
Time indicates the (UTC) timestamp at which the system recognizes the
creation. The only guarantees made about this timestamp are:
- it is after the time the request was received
- a time-ranged query with StartTime==CreatedAt will include this instance.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigSetSomeRequest(aristaproto.Message):
""" """
values: List["RepositoryConfig"] = aristaproto.message_field(1)
"""
value contains a list of RepositoryConfig values to write.
It is possible to provide more values than can fit within either:
- the maxiumum send size of the client
- the maximum receive size of the server
If this error occurs you must reduce the number of values sent.
See gRPC \"maximum message size\" documentation for more information.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigSetSomeResponse(aristaproto.Message):
""" """
key: "RepositoryKey" = aristaproto.message_field(1)
"""
"""
error: str = aristaproto.string_field(2)
"""
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigDeleteRequest(aristaproto.Message):
""" """
key: "RepositoryKey" = aristaproto.message_field(1)
"""
Key indicates which RepositoryConfig instance to remove.
This field must always be set.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigDeleteResponse(aristaproto.Message):
""" """
key: "RepositoryKey" = aristaproto.message_field(1)
"""Key echoes back the key of the deleted RepositoryConfig instance."""
time: datetime = aristaproto.message_field(2)
"""
Time indicates the (UTC) timestamp at which the system recognizes the
deletion. The only guarantees made about this timestamp are:
- it is after the time the request was received
- a time-ranged query with StartTime==DeletedAt will not include this instance.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigDeleteSomeRequest(aristaproto.Message):
""" """
keys: List["RepositoryKey"] = aristaproto.message_field(1)
"""key contains a list of RepositoryConfig keys to delete"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigDeleteSomeResponse(aristaproto.Message):
"""
RepositoryConfigDeleteSomeResponse is only sent when there is an error.
"""
key: "RepositoryKey" = aristaproto.message_field(1)
"""
"""
error: str = aristaproto.string_field(2)
"""
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigDeleteAllRequest(aristaproto.Message):
""" """
partial_eq_filter: List["RepositoryConfig"] = aristaproto.message_field(1)
"""
PartialEqFilter provides a way to server-side filter a DeleteAll.
This requires all provided fields to be equal to the response.
A filtered DeleteAll will use GetAll with filter to find things to delete.
"""
[docs]
@dataclass(eq=False, repr=False)
class RepositoryConfigDeleteAllResponse(aristaproto.Message):
""" """
type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
"""
This describes the class of delete error.
A DeleteAllResponse is only sent when there is an error.
"""
error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
"""This indicates the error message from the delete failure."""
key: "RepositoryKey" = aristaproto.message_field(3)
"""
This is the key of the RepositoryConfig instance that failed to be deleted.
"""
time: datetime = aristaproto.message_field(4)
"""Time indicates the (UTC) timestamp when the key was being deleted."""
[docs]
class ReleasesServiceStub(aristaproto.ServiceStub):
""" """
[docs]
async def get_one(
self,
releases_request: "ReleasesRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "ReleasesResponse":
""" """
return await self._unary_unary(
"/arista.softwaremanagement.v1.ReleasesService/GetOne",
releases_request,
ReleasesResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def get_all(
self,
releases_stream_request: "ReleasesStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[ReleasesStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.ReleasesService/GetAll",
releases_stream_request,
ReleasesStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def subscribe(
self,
releases_stream_request: "ReleasesStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[ReleasesStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.ReleasesService/Subscribe",
releases_stream_request,
ReleasesStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
class RepositoryServiceStub(aristaproto.ServiceStub):
""" """
[docs]
async def get_one(
self,
repository_request: "RepositoryRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "RepositoryResponse":
""" """
return await self._unary_unary(
"/arista.softwaremanagement.v1.RepositoryService/GetOne",
repository_request,
RepositoryResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def get_some(
self,
repository_some_request: "RepositorySomeRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositorySomeResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryService/GetSome",
repository_some_request,
RepositorySomeResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def get_all(
self,
repository_stream_request: "RepositoryStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryService/GetAll",
repository_stream_request,
RepositoryStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def subscribe(
self,
repository_stream_request: "RepositoryStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryService/Subscribe",
repository_stream_request,
RepositoryStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def get_all_batched(
self,
repository_batched_stream_request: "RepositoryBatchedStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryBatchedStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryService/GetAllBatched",
repository_batched_stream_request,
RepositoryBatchedStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def subscribe_batched(
self,
repository_batched_stream_request: "RepositoryBatchedStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryBatchedStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryService/SubscribeBatched",
repository_batched_stream_request,
RepositoryBatchedStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
class RepositoryConfigServiceStub(aristaproto.ServiceStub):
""" """
[docs]
async def get_one(
self,
repository_config_request: "RepositoryConfigRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "RepositoryConfigResponse":
""" """
return await self._unary_unary(
"/arista.softwaremanagement.v1.RepositoryConfigService/GetOne",
repository_config_request,
RepositoryConfigResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def get_some(
self,
repository_config_some_request: "RepositoryConfigSomeRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigSomeResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/GetSome",
repository_config_some_request,
RepositoryConfigSomeResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def get_all(
self,
repository_config_stream_request: "RepositoryConfigStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/GetAll",
repository_config_stream_request,
RepositoryConfigStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def subscribe(
self,
repository_config_stream_request: "RepositoryConfigStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/Subscribe",
repository_config_stream_request,
RepositoryConfigStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def set(
self,
repository_config_set_request: "RepositoryConfigSetRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "RepositoryConfigSetResponse":
""" """
return await self._unary_unary(
"/arista.softwaremanagement.v1.RepositoryConfigService/Set",
repository_config_set_request,
RepositoryConfigSetResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def set_some(
self,
repository_config_set_some_request: "RepositoryConfigSetSomeRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigSetSomeResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/SetSome",
repository_config_set_some_request,
RepositoryConfigSetSomeResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def delete(
self,
repository_config_delete_request: "RepositoryConfigDeleteRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "RepositoryConfigDeleteResponse":
""" """
return await self._unary_unary(
"/arista.softwaremanagement.v1.RepositoryConfigService/Delete",
repository_config_delete_request,
RepositoryConfigDeleteResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def delete_some(
self,
repository_config_delete_some_request: "RepositoryConfigDeleteSomeRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigDeleteSomeResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/DeleteSome",
repository_config_delete_some_request,
RepositoryConfigDeleteSomeResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def delete_all(
self,
repository_config_delete_all_request: "RepositoryConfigDeleteAllRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigDeleteAllResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/DeleteAll",
repository_config_delete_all_request,
RepositoryConfigDeleteAllResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def get_all_batched(
self,
repository_config_batched_stream_request: "RepositoryConfigBatchedStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigBatchedStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/GetAllBatched",
repository_config_batched_stream_request,
RepositoryConfigBatchedStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
[docs]
async def subscribe_batched(
self,
repository_config_batched_stream_request: "RepositoryConfigBatchedStreamRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[RepositoryConfigBatchedStreamResponse]":
""" """
async for response in self._unary_stream(
"/arista.softwaremanagement.v1.RepositoryConfigService/SubscribeBatched",
repository_config_batched_stream_request,
RepositoryConfigBatchedStreamResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
):
yield response
from .... import fmp as ___fmp__
from ... import subscriptions as __subscriptions__
from ... import time as __time__
[docs]
class ReleasesServiceBase(ServiceBase):
""" """
[docs]
async def get_one(self, releases_request: "ReleasesRequest") -> "ReleasesResponse":
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def get_all(
self, releases_stream_request: "ReleasesStreamRequest"
) -> AsyncIterator[ReleasesStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def subscribe(
self, releases_stream_request: "ReleasesStreamRequest"
) -> AsyncIterator[ReleasesStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one(
self, stream: "grpclib.server.Stream[ReleasesRequest, ReleasesResponse]"
) -> None:
request = await stream.recv_message()
response = await self.get_one(request)
await stream.send_message(response)
async def __rpc_get_all(
self,
stream: "grpclib.server.Stream[ReleasesStreamRequest, ReleasesStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.get_all,
stream,
request,
)
async def __rpc_subscribe(
self,
stream: "grpclib.server.Stream[ReleasesStreamRequest, ReleasesStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe,
stream,
request,
)
async def __rpc_subscribe_meta(
self, stream: "grpclib.server.Stream[ReleasesStreamRequest, MetaResponse]"
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe_meta,
stream,
request,
)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
"/arista.softwaremanagement.v1.ReleasesService/GetOne": grpclib.const.Handler(
self.__rpc_get_one,
grpclib.const.Cardinality.UNARY_UNARY,
ReleasesRequest,
ReleasesResponse,
),
"/arista.softwaremanagement.v1.ReleasesService/GetAll": grpclib.const.Handler(
self.__rpc_get_all,
grpclib.const.Cardinality.UNARY_STREAM,
ReleasesStreamRequest,
ReleasesStreamResponse,
),
"/arista.softwaremanagement.v1.ReleasesService/Subscribe": grpclib.const.Handler(
self.__rpc_subscribe,
grpclib.const.Cardinality.UNARY_STREAM,
ReleasesStreamRequest,
ReleasesStreamResponse,
),
"/arista.softwaremanagement.v1.ReleasesService/SubscribeMeta": grpclib.const.Handler(
self.__rpc_subscribe_meta,
grpclib.const.Cardinality.UNARY_STREAM,
ReleasesStreamRequest,
MetaResponse,
),
}
[docs]
class RepositoryServiceBase(ServiceBase):
""" """
[docs]
async def get_one(
self, repository_request: "RepositoryRequest"
) -> "RepositoryResponse":
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def get_some(
self, repository_some_request: "RepositorySomeRequest"
) -> AsyncIterator[RepositorySomeResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def get_all(
self, repository_stream_request: "RepositoryStreamRequest"
) -> AsyncIterator[RepositoryStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def subscribe(
self, repository_stream_request: "RepositoryStreamRequest"
) -> AsyncIterator[RepositoryStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def get_all_batched(
self, repository_batched_stream_request: "RepositoryBatchedStreamRequest"
) -> AsyncIterator[RepositoryBatchedStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def subscribe_batched(
self, repository_batched_stream_request: "RepositoryBatchedStreamRequest"
) -> AsyncIterator[RepositoryBatchedStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one(
self, stream: "grpclib.server.Stream[RepositoryRequest, RepositoryResponse]"
) -> None:
request = await stream.recv_message()
response = await self.get_one(request)
await stream.send_message(response)
async def __rpc_get_some(
self,
stream: "grpclib.server.Stream[RepositorySomeRequest, RepositorySomeResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.get_some,
stream,
request,
)
async def __rpc_get_all(
self,
stream: "grpclib.server.Stream[RepositoryStreamRequest, RepositoryStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.get_all,
stream,
request,
)
async def __rpc_subscribe(
self,
stream: "grpclib.server.Stream[RepositoryStreamRequest, RepositoryStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe,
stream,
request,
)
async def __rpc_get_meta(
self, stream: "grpclib.server.Stream[RepositoryStreamRequest, MetaResponse]"
) -> None:
request = await stream.recv_message()
response = await self.get_meta(request)
await stream.send_message(response)
async def __rpc_subscribe_meta(
self, stream: "grpclib.server.Stream[RepositoryStreamRequest, MetaResponse]"
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe_meta,
stream,
request,
)
async def __rpc_get_all_batched(
self,
stream: "grpclib.server.Stream[RepositoryBatchedStreamRequest, RepositoryBatchedStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.get_all_batched,
stream,
request,
)
async def __rpc_subscribe_batched(
self,
stream: "grpclib.server.Stream[RepositoryBatchedStreamRequest, RepositoryBatchedStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe_batched,
stream,
request,
)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
"/arista.softwaremanagement.v1.RepositoryService/GetOne": grpclib.const.Handler(
self.__rpc_get_one,
grpclib.const.Cardinality.UNARY_UNARY,
RepositoryRequest,
RepositoryResponse,
),
"/arista.softwaremanagement.v1.RepositoryService/GetSome": grpclib.const.Handler(
self.__rpc_get_some,
grpclib.const.Cardinality.UNARY_STREAM,
RepositorySomeRequest,
RepositorySomeResponse,
),
"/arista.softwaremanagement.v1.RepositoryService/GetAll": grpclib.const.Handler(
self.__rpc_get_all,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryStreamRequest,
RepositoryStreamResponse,
),
"/arista.softwaremanagement.v1.RepositoryService/Subscribe": grpclib.const.Handler(
self.__rpc_subscribe,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryStreamRequest,
RepositoryStreamResponse,
),
"/arista.softwaremanagement.v1.RepositoryService/GetMeta": grpclib.const.Handler(
self.__rpc_get_meta,
grpclib.const.Cardinality.UNARY_UNARY,
RepositoryStreamRequest,
MetaResponse,
),
"/arista.softwaremanagement.v1.RepositoryService/SubscribeMeta": grpclib.const.Handler(
self.__rpc_subscribe_meta,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryStreamRequest,
MetaResponse,
),
"/arista.softwaremanagement.v1.RepositoryService/GetAllBatched": grpclib.const.Handler(
self.__rpc_get_all_batched,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryBatchedStreamRequest,
RepositoryBatchedStreamResponse,
),
"/arista.softwaremanagement.v1.RepositoryService/SubscribeBatched": grpclib.const.Handler(
self.__rpc_subscribe_batched,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryBatchedStreamRequest,
RepositoryBatchedStreamResponse,
),
}
[docs]
class RepositoryConfigServiceBase(ServiceBase):
""" """
[docs]
async def get_one(
self, repository_config_request: "RepositoryConfigRequest"
) -> "RepositoryConfigResponse":
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def get_some(
self, repository_config_some_request: "RepositoryConfigSomeRequest"
) -> AsyncIterator[RepositoryConfigSomeResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def get_all(
self, repository_config_stream_request: "RepositoryConfigStreamRequest"
) -> AsyncIterator[RepositoryConfigStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def subscribe(
self, repository_config_stream_request: "RepositoryConfigStreamRequest"
) -> AsyncIterator[RepositoryConfigStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def set(
self, repository_config_set_request: "RepositoryConfigSetRequest"
) -> "RepositoryConfigSetResponse":
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def set_some(
self, repository_config_set_some_request: "RepositoryConfigSetSomeRequest"
) -> AsyncIterator[RepositoryConfigSetSomeResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def delete(
self, repository_config_delete_request: "RepositoryConfigDeleteRequest"
) -> "RepositoryConfigDeleteResponse":
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def delete_some(
self, repository_config_delete_some_request: "RepositoryConfigDeleteSomeRequest"
) -> AsyncIterator[RepositoryConfigDeleteSomeResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def delete_all(
self, repository_config_delete_all_request: "RepositoryConfigDeleteAllRequest"
) -> AsyncIterator[RepositoryConfigDeleteAllResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def get_all_batched(
self,
repository_config_batched_stream_request: "RepositoryConfigBatchedStreamRequest",
) -> AsyncIterator[RepositoryConfigBatchedStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def subscribe_batched(
self,
repository_config_batched_stream_request: "RepositoryConfigBatchedStreamRequest",
) -> AsyncIterator[RepositoryConfigBatchedStreamResponse]:
""" """
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one(
self,
stream: "grpclib.server.Stream[RepositoryConfigRequest, RepositoryConfigResponse]",
) -> None:
request = await stream.recv_message()
response = await self.get_one(request)
await stream.send_message(response)
async def __rpc_get_some(
self,
stream: "grpclib.server.Stream[RepositoryConfigSomeRequest, RepositoryConfigSomeResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.get_some,
stream,
request,
)
async def __rpc_get_all(
self,
stream: "grpclib.server.Stream[RepositoryConfigStreamRequest, RepositoryConfigStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.get_all,
stream,
request,
)
async def __rpc_subscribe(
self,
stream: "grpclib.server.Stream[RepositoryConfigStreamRequest, RepositoryConfigStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe,
stream,
request,
)
async def __rpc_get_meta(
self,
stream: "grpclib.server.Stream[RepositoryConfigStreamRequest, MetaResponse]",
) -> None:
request = await stream.recv_message()
response = await self.get_meta(request)
await stream.send_message(response)
async def __rpc_subscribe_meta(
self,
stream: "grpclib.server.Stream[RepositoryConfigStreamRequest, MetaResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe_meta,
stream,
request,
)
async def __rpc_set(
self,
stream: "grpclib.server.Stream[RepositoryConfigSetRequest, RepositoryConfigSetResponse]",
) -> None:
request = await stream.recv_message()
response = await self.set(request)
await stream.send_message(response)
async def __rpc_set_some(
self,
stream: "grpclib.server.Stream[RepositoryConfigSetSomeRequest, RepositoryConfigSetSomeResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.set_some,
stream,
request,
)
async def __rpc_delete(
self,
stream: "grpclib.server.Stream[RepositoryConfigDeleteRequest, RepositoryConfigDeleteResponse]",
) -> None:
request = await stream.recv_message()
response = await self.delete(request)
await stream.send_message(response)
async def __rpc_delete_some(
self,
stream: "grpclib.server.Stream[RepositoryConfigDeleteSomeRequest, RepositoryConfigDeleteSomeResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.delete_some,
stream,
request,
)
async def __rpc_delete_all(
self,
stream: "grpclib.server.Stream[RepositoryConfigDeleteAllRequest, RepositoryConfigDeleteAllResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.delete_all,
stream,
request,
)
async def __rpc_get_all_batched(
self,
stream: "grpclib.server.Stream[RepositoryConfigBatchedStreamRequest, RepositoryConfigBatchedStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.get_all_batched,
stream,
request,
)
async def __rpc_subscribe_batched(
self,
stream: "grpclib.server.Stream[RepositoryConfigBatchedStreamRequest, RepositoryConfigBatchedStreamResponse]",
) -> None:
request = await stream.recv_message()
await self._call_rpc_handler_server_stream(
self.subscribe_batched,
stream,
request,
)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
"/arista.softwaremanagement.v1.RepositoryConfigService/GetOne": grpclib.const.Handler(
self.__rpc_get_one,
grpclib.const.Cardinality.UNARY_UNARY,
RepositoryConfigRequest,
RepositoryConfigResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/GetSome": grpclib.const.Handler(
self.__rpc_get_some,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigSomeRequest,
RepositoryConfigSomeResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/GetAll": grpclib.const.Handler(
self.__rpc_get_all,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigStreamRequest,
RepositoryConfigStreamResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/Subscribe": grpclib.const.Handler(
self.__rpc_subscribe,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigStreamRequest,
RepositoryConfigStreamResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/GetMeta": grpclib.const.Handler(
self.__rpc_get_meta,
grpclib.const.Cardinality.UNARY_UNARY,
RepositoryConfigStreamRequest,
MetaResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/SubscribeMeta": grpclib.const.Handler(
self.__rpc_subscribe_meta,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigStreamRequest,
MetaResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/Set": grpclib.const.Handler(
self.__rpc_set,
grpclib.const.Cardinality.UNARY_UNARY,
RepositoryConfigSetRequest,
RepositoryConfigSetResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/SetSome": grpclib.const.Handler(
self.__rpc_set_some,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigSetSomeRequest,
RepositoryConfigSetSomeResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/Delete": grpclib.const.Handler(
self.__rpc_delete,
grpclib.const.Cardinality.UNARY_UNARY,
RepositoryConfigDeleteRequest,
RepositoryConfigDeleteResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/DeleteSome": grpclib.const.Handler(
self.__rpc_delete_some,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigDeleteSomeRequest,
RepositoryConfigDeleteSomeResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/DeleteAll": grpclib.const.Handler(
self.__rpc_delete_all,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigDeleteAllRequest,
RepositoryConfigDeleteAllResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/GetAllBatched": grpclib.const.Handler(
self.__rpc_get_all_batched,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigBatchedStreamRequest,
RepositoryConfigBatchedStreamResponse,
),
"/arista.softwaremanagement.v1.RepositoryConfigService/SubscribeBatched": grpclib.const.Handler(
self.__rpc_subscribe_batched,
grpclib.const.Cardinality.UNARY_STREAM,
RepositoryConfigBatchedStreamRequest,
RepositoryConfigBatchedStreamResponse,
),
}