# Source code for cloudvision.api.arista.changecontrol.v1

# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: arista/changecontrol.v1/changecontrol.proto, arista/changecontrol.v1/services.gen.proto
# plugin: python-aristaproto
# This file has been @generated

# Public API of this module: the names exported by a star-import.
__all__ = (
    "StageStatus",
    "StepStatus",
    "ChangeControlStatus",
    "RepeatedRepeatedString",
    "ChangeControlKey",
    "Action",
    "StageConfig",
    "StageConfigMap",
    "ChangeConfig",
    "FlagConfig",
    "TimestampFlagConfig",
    "ChangeControlConfig",
    "StepInfo",
    "ActionSteps",
    "Stage",
    "StageMap",
    "Change",
    "Flag",
    "TimestampFlag",
    "Filter",
    "DeviceToStageMap",
    "ChangeControl",
    "ApproveConfig",
    "MetaResponse",
    "ApproveConfigRequest",
    "ApproveConfigResponse",
    "ApproveConfigSomeRequest",
    "ApproveConfigSomeResponse",
    "ApproveConfigStreamRequest",
    "ApproveConfigStreamResponse",
    "ApproveConfigBatchedStreamRequest",
    "ApproveConfigBatchedStreamResponse",
    "ApproveConfigSetRequest",
    "ApproveConfigSetResponse",
    "ApproveConfigSetSomeRequest",
    "ApproveConfigSetSomeResponse",
    "ApproveConfigDeleteRequest",
    "ApproveConfigDeleteResponse",
    "ApproveConfigDeleteSomeRequest",
    "ApproveConfigDeleteSomeResponse",
    "ApproveConfigDeleteAllRequest",
    "ApproveConfigDeleteAllResponse",
    "ChangeControlRequest",
    "ChangeControlResponse",
    "ChangeControlSomeRequest",
    "ChangeControlSomeResponse",
    "ChangeControlStreamRequest",
    "ChangeControlStreamResponse",
    "ChangeControlBatchedStreamRequest",
    "ChangeControlBatchedStreamResponse",
    "ChangeControlConfigRequest",
    "ChangeControlConfigResponse",
    "ChangeControlConfigSomeRequest",
    "ChangeControlConfigSomeResponse",
    "ChangeControlConfigStreamRequest",
    "ChangeControlConfigStreamResponse",
    "ChangeControlConfigBatchedStreamRequest",
    "ChangeControlConfigBatchedStreamResponse",
    "ChangeControlConfigSetRequest",
    "ChangeControlConfigSetResponse",
    "ChangeControlConfigSetSomeRequest",
    "ChangeControlConfigSetSomeResponse",
    "ChangeControlConfigDeleteRequest",
    "ChangeControlConfigDeleteResponse",
    "ChangeControlConfigDeleteSomeRequest",
    "ChangeControlConfigDeleteSomeResponse",
    "ChangeControlConfigDeleteAllRequest",
    "ChangeControlConfigDeleteAllResponse",
    "ApproveConfigServiceStub",
    "ApproveConfigServiceBase",
    "ChangeControlServiceStub",
    "ChangeControlServiceBase",
    "ChangeControlConfigServiceStub",
    "ChangeControlConfigServiceBase",
)


from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Dict,
    List,
    Optional,
)

import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase

if TYPE_CHECKING:
    import grpclib.server
    from aristaproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


class StageStatus(aristaproto.Enum):
    """StageStatus defines the possible execution statuses of a stage."""

    UNSPECIFIED = 0
    """STAGE_STATUS_UNSPECIFIED means unknown stage status."""

    RUNNING = 1
    """STAGE_STATUS_RUNNING means the stage has begun execution."""

    COMPLETED = 2
    """
    STAGE_STATUS_COMPLETED means the stage has ceased execution.
    Success/failure of a stage cannot be inferred from this status
    alone but rather this status plus the stage error. That is, no
    error implies success and some error implies failure.
    """

    NOT_STARTED = 3
    """STAGE_STATUS_NOT_STARTED means the stage has not been started."""
class StepStatus(aristaproto.Enum):
    """
    StepStatus defines the possible execution statuses of a step within
    an action.
    """

    UNSPECIFIED = 0
    """
    STEP_STATUS_UNSPECIFIED represents an unspecified state for a step
    within an action.
    """

    NOT_STARTED = 1
    """
    STEP_STATUS_NOT_STARTED represents that the step has not started
    execution.
    """

    RUNNING = 2
    """STEP_STATUS_RUNNING represents that the step is under execution."""

    FINISHED = 3
    """
    STEP_STATUS_FINISHED represents that the step has finished execution.
    """

    SKIPPED = 4
    """
    STEP_STATUS_SKIPPED represents that the step has been skipped from
    execution.
    """

    FAILED = 5
    """
    STEP_STATUS_FAILED represents that the step has failed its execution.
    """
class ChangeControlStatus(aristaproto.Enum):
    """
    ChangeControlStatus defines the possible execution statuses of a
    change control.
    """

    UNSPECIFIED = 0
    """
    CHANGE_CONTROL_STATUS_UNSPECIFIED means the change control status is
    unknown.
    """

    RUNNING = 1
    """
    CHANGE_CONTROL_STATUS_RUNNING means the change control has begun
    execution.
    """

    COMPLETED = 2
    """
    CHANGE_CONTROL_STATUS_COMPLETED means the change control has ceased
    execution. Success/failure of a change control cannot be inferred
    from this status alone but rather this status plus the change
    control error. That is, no error implies success and some error
    implies failure.
    """

    SCHEDULED = 3
    """
    CHANGE_CONTROL_STATUS_SCHEDULED means the change control has been
    scheduled for execution at some time. Any failure that occurs during
    this process will cause a transition back to the unspecified status,
    a reset of the schedule flag by the system, and an error on the
    change control reporting the details of the failure.
    """

    NOT_STARTED = 4
    """
    CHANGE_CONTROL_STATUS_NOT_STARTED means the change control has not
    been started. This would include approved and not approved change
    controls.
    """
@dataclass(eq=False, repr=False)
class RepeatedRepeatedString(aristaproto.Message):
    """
    RepeatedRepeatedString wraps a repeated `fmp.RepeatedString` to
    define a string matrix which is used to represent stage rows
    (see `StageConfig`).
    """

    values: List["___fmp__.RepeatedString"] = aristaproto.message_field(1)
    """values is a list of `fmp.RepeatedString`."""
@dataclass(eq=False, repr=False)
class ChangeControlKey(aristaproto.Message):
    """ChangeControlKey uniquely identifies a change control."""

    id: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """id is the ID of the change control."""
@dataclass(eq=False, repr=False)
class Action(aristaproto.Message):
    """
    Action is an action to perform during the execution of a stage of a
    change control. Available actions can be fetched using the
    \"action\" services.
    """

    name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """name is the name of the action."""

    timeout: Optional[int] = aristaproto.message_field(2, wraps=aristaproto.TYPE_UINT32)
    """
    timeout is the maximum duration in seconds that the action can
    execute before timing out. If this is not set, then this is
    interpreted to mean there is no timeout.
    """

    args: "___fmp__.MapStringString" = aristaproto.message_field(3)
    """args are the arguments of the action."""
@dataclass(eq=False, repr=False)
class StageConfig(aristaproto.Message):
    """
    StageConfig holds a configuration for a stage in a change control.
    Each stage generally defines either an action or a series of
    sub-stages.
    """

    name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """name is the name of the stage."""

    action: "Action" = aristaproto.message_field(2)
    """action is the action to perform on stage execution."""

    rows: "RepeatedRepeatedString" = aristaproto.message_field(3)
    """
    rows is a series of rows of parallel stages referenced by ID.
    Each row is run one after the other and the stages within each
    row are run in parallel.

    For example:
    ```
    [[\"1a\", \"1b\"], [\"2\"]]
    ```
    This configures stage 1a and stage 1b to run at the same time,
    and then stage 2 once both of them have completed.
    """
@dataclass(eq=False, repr=False)
class StageConfigMap(aristaproto.Message):
    """
    StageConfigMap wraps a map from stage ID to `StageConfig`. This
    defines the configuration and order of execution for the stages in a
    change control.

    For example:
    ```
    {
        \"root\": {
            name: \"root\",
            rows: [[\"1-2\"], [\"3\"]]
        },
        \"1-2\": {
            name: \"stages 1-2\",
            rows: [[\"1\"], [\"2\"]]
        },
        \"1\": {
            name: \"stage 1\",
            rows: [[\"1a\", \"1b\"]]
        },
        \"1a\": {
            name: \"stage 1a\",
            action: {
                name: \"task\",
                args: { \"TaskID\": \"101\" }
            }
        },
        \"1b\": {
            name: \"stage 1b\",
            action: {
                name: \"task\",
                args: { \"TaskID\": \"102\" }
            }
        },
        \"2\": {
            name: \"stage 2\",
            action: {
                name: \"task\",
                args: { \"TaskID\": \"103\" }
            }
        },
        \"3\": {
            name: \"stage 3\",
            action: {
                name: \"task\",
                args: { \"TaskID\": \"104\" }
            }
        }
    }
    ```

    Assuming the root stage ID of the enclosing change control is
    \"root\", this would mean to do the following in sequence:
    ```
    root
    |- stages 1-2
    |  |- stage 1
    |  |  |- stage 1a, stage 1b (parallel)
    |  |- stage 2
    |- stage 3
    ```
    That is, execute tasks `101` and `102` in parallel, then task `103`,
    then task `104`.
    """

    values: Dict[str, "StageConfig"] = aristaproto.map_field(
        1, aristaproto.TYPE_STRING, aristaproto.TYPE_MESSAGE
    )
    """values is a map from stage ID to `StageConfig`."""
@dataclass(eq=False, repr=False)
class ChangeConfig(aristaproto.Message):
    """
    ChangeConfig holds a configuration for the change of a change
    control which is essentially a named configuration of stages.
    """

    name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """name is the name of the change."""

    root_stage_id: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    root_stage_id is the ID of the root stage or the stage that should
    execute first.
    """

    stages: "StageConfigMap" = aristaproto.message_field(3)
    """
    stages holds a configuration of stages. See `StageConfigMap`
    description for more information.
    """

    notes: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
    """notes are any notes associated with the change."""
@dataclass(eq=False, repr=False)
class FlagConfig(aristaproto.Message):
    """
    FlagConfig is used to set a flag on a change control that takes a
    boolean value (e.g. start/stop, approve/unapprove).
    """

    value: Optional[bool] = aristaproto.message_field(1, wraps=aristaproto.TYPE_BOOL)
    """value is the value of the flag (`true` or `false`)."""

    notes: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """notes are any notes associated with the flag value."""
@dataclass(eq=False, repr=False)
class TimestampFlagConfig(aristaproto.Message):
    """
    TimestampFlagConfig is used to set a flag on a change control that
    takes a timestamp value (e.g. schedule/unschedule).
    """

    value: datetime = aristaproto.message_field(1)
    """value is the value of the flag (some timestamp)."""

    notes: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """notes are the notes associated with the flag value."""
@dataclass(eq=False, repr=False)
class ChangeControlConfig(aristaproto.Message):
    """ChangeControlConfig holds the configuration of a change control."""

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """key uniquely identifies the change control."""

    change: "ChangeConfig" = aristaproto.message_field(2)
    """change is the change subject to execution."""

    start: "FlagConfig" = aristaproto.message_field(3)
    """
    start is the flag to start (`start.value` set to `true`) or stop
    (`start.value` set to `false`) execution of the change control.
    """

    schedule: "TimestampFlagConfig" = aristaproto.message_field(4)
    """
    schedule is the flag to schedule (`schedule.value` set to some
    timestamp) or unschedule (`schedule.value` set to `nil`) the change
    control for execution.
    """
@dataclass(eq=False, repr=False)
class StepInfo(aristaproto.Message):
    """StepInfo contains the status details of an action step."""

    name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """name represents the name of the step."""

    status: "StepStatus" = aristaproto.enum_field(2)
    """status represents the present execution status of the step."""

    error: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """error captures any error that occurred."""

    message: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
    """
    message represents any relevant information associated with the step.
    """
@dataclass(eq=False, repr=False)
class ActionSteps(aristaproto.Message):
    """ActionSteps tracks the status of each step within an action."""

    values: Dict[str, "StepInfo"] = aristaproto.map_field(
        1, aristaproto.TYPE_STRING, aristaproto.TYPE_MESSAGE
    )
    """
    values maps each action step's execution sequence to its status
    information, represented by StepInfo.
    """
@dataclass(eq=False, repr=False)
class Stage(aristaproto.Message):
    """Stage holds the configuration and status of a stage."""

    name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """name is the name of the stage."""

    action: "Action" = aristaproto.message_field(2)
    """action is the action to perform during the stage."""

    rows: "RepeatedRepeatedString" = aristaproto.message_field(3)
    """
    rows is a series of rows of parallel stages referenced by ID.
    See `StageConfig.rows` for more details.
    """

    status: "StageStatus" = aristaproto.enum_field(4)
    """status is the execution status of the stage."""

    error: Optional[str] = aristaproto.message_field(5, wraps=aristaproto.TYPE_STRING)
    """
    error is any error that occurred during the execution of the stage.
    """

    start_time: datetime = aristaproto.message_field(6)
    """start_time is the time when status changes to Running."""

    end_time: datetime = aristaproto.message_field(7)
    """end_time is the time when status changes to Completed."""

    steps: "ActionSteps" = aristaproto.message_field(8)
    """
    steps represent the granular steps and their statuses associated
    with the action performed in the change control stage. Each stage
    generally defines either an action or a series of sub-stages.
    """
@dataclass(eq=False, repr=False)
class StageMap(aristaproto.Message):
    """
    StageMap is a map from stage ID to `Stage`. This has essentially the
    same structure as `StageConfigMap`, but with each ID mapping to a
    `Stage` instead of `StageConfig`.
    """

    values: Dict[str, "Stage"] = aristaproto.map_field(
        1, aristaproto.TYPE_STRING, aristaproto.TYPE_MESSAGE
    )
    """values is a map from stage ID to `Stage`."""
@dataclass(eq=False, repr=False)
class Change(aristaproto.Message):
    """
    Change holds the configuration and status of the change of a change
    control.
    """

    name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """name is the name of the change."""

    root_stage_id: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    root_stage_id is the ID of the root stage or the stage that should
    execute first.
    """

    stages: "StageMap" = aristaproto.message_field(3)
    """stages holds a configuration of stages and their statuses."""

    notes: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
    """notes are any notes associated with the change."""

    time: datetime = aristaproto.message_field(5)
    """time is the time at which the change was last updated."""

    user: Optional[str] = aristaproto.message_field(6, wraps=aristaproto.TYPE_STRING)
    """user is the user by which the change was last updated."""
@dataclass(eq=False, repr=False)
class Flag(aristaproto.Message):
    """
    Flag holds the configuration of a boolean flag plus some information
    about when and by whom it was set.
    """

    value: Optional[bool] = aristaproto.message_field(1, wraps=aristaproto.TYPE_BOOL)
    """value is the value of the flag (`true` or `false`)."""

    notes: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """notes are any notes associated with the setting of the flag."""

    time: datetime = aristaproto.message_field(3)
    """time is the time at which the flag was last updated."""

    user: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
    """user is the user by which the flag was last updated."""
@dataclass(eq=False, repr=False)
class TimestampFlag(aristaproto.Message):
    """
    TimestampFlag holds the configuration of a timestamp flag plus some
    information about when and by whom it was set.
    """

    value: datetime = aristaproto.message_field(1)
    """value is the value of the flag (some timestamp)."""

    notes: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """notes are any notes associated with the setting of the flag."""

    time: datetime = aristaproto.message_field(3)
    """time is the time at which the flag was last updated."""

    user: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
    """user is the user by which the flag was last updated."""
@dataclass(eq=False, repr=False)
class Filter(aristaproto.Message):
    """Filter is used to filter changecontrols for requested device ids."""

    device_ids: "___fmp__.RepeatedString" = aristaproto.message_field(1)
    """
    device_ids includes the list of device ids to be matched with
    devices in the changecontrol state model. At least one of the
    provided device ids must be present in CC devices field.
    """
@dataclass(eq=False, repr=False)
class DeviceToStageMap(aristaproto.Message):
    """
    DeviceToStageMap is a map of a device ID and its list of
    corresponding stages.
    """

    values: Dict[str, "___fmp__.RepeatedString"] = aristaproto.map_field(
        1, aristaproto.TYPE_STRING, aristaproto.TYPE_MESSAGE
    )
    """
    values maps the device ID with the stages which are associated with
    a device in a changecontrol state model.
    """
@dataclass(eq=False, repr=False)
class ChangeControl(aristaproto.Message):
    """
    ChangeControl holds the configuration and status of a change control.
    """

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """key uniquely identifies the change control."""

    change: "Change" = aristaproto.message_field(2)
    """
    change holds the configuration and status of the change of the
    change control.
    """

    approve: "Flag" = aristaproto.message_field(3)
    """
    approve indicates whether the change control was flagged as approved
    (`approve.value` set to `true`) or unapproved (`approve.value` set
    to `false`).
    """

    start: "Flag" = aristaproto.message_field(4)
    """
    start indicates whether the change control was flagged to start
    (`start.value` set to `true`) or stop (`start.value` set to
    `false`) execution.
    """

    status: "ChangeControlStatus" = aristaproto.enum_field(5)
    """status is the execution status of the change control."""

    error: Optional[str] = aristaproto.message_field(6, wraps=aristaproto.TYPE_STRING)
    """
    error is any error that occurred during the execution of the change
    control.
    """

    schedule: "TimestampFlag" = aristaproto.message_field(7)
    """
    schedule indicates whether the change control was flagged to be
    scheduled (`schedule.value` set to some timestamp) or unscheduled
    (`schedule.value` set to `nil`) for execution.
    """

    device_ids: "___fmp__.RepeatedString" = aristaproto.message_field(8)
    """
    device_ids is a list of device IDs on which the change control will
    operate.
    """

    device_id_to_stage_ids: "DeviceToStageMap" = aristaproto.message_field(9)
    """
    device_id_to_stage_ids is a map of device IDs to the stages present
    in the specified Change state. This is not affected by the device ID
    based custom filtering and will contain info about all the devices
    associated with a CC ID.
    """
@dataclass(eq=False, repr=False)
class ApproveConfig(aristaproto.Message):
    """ApproveConfig is used to configure the approval of a change control."""

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """key uniquely identifies the change control."""

    approve: "FlagConfig" = aristaproto.message_field(2)
    """
    approve is the flag to approve (`approve.value` set to `true`) or
    unapprove (`approve.value` set to `false`) the change control.
    """

    version: datetime = aristaproto.message_field(3)
    """
    version is the timestamp of the change control to approve. This
    field must be set when `approve.value` is set to `true` and is
    intended to safeguard against approving a change control that has
    been updated since last read.
    """
@dataclass(eq=False, repr=False)
class MetaResponse(aristaproto.Message):
    """
    MetaResponse carries metadata about a request's results: a
    timestamp, an operation type and an item count.
    """

    time: datetime = aristaproto.message_field(1)
    """
    Time holds the timestamp of the last item included in the metadata
    calculation.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(2)
    """
    Operation indicates how the value in this response should be
    considered. Under non-subscribe requests, this value should always
    be INITIAL. In a subscription, once all initial data is streamed and
    the client begins to receive modification updates, you should not
    see INITIAL again.
    """

    count: Optional[int] = aristaproto.message_field(3, wraps=aristaproto.TYPE_UINT32)
    """
    Count is the number of items present under the conditions of the
    request.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigRequest(aristaproto.Message):
    """
    ApproveConfigRequest selects a single ApproveConfig instance by key,
    optionally as of a given time.
    """

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a ApproveConfig instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigResponse(aristaproto.Message):
    """
    ApproveConfigResponse carries the requested ApproveConfig value and
    the timestamp of its last modification.
    """

    value: "ApproveConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the
    datastore. If optional fields were not given at creation, these
    fields will be empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    ApproveConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigSomeRequest(aristaproto.Message):
    """
    ApproveConfigSomeRequest carries a list of change control keys and
    an optional time of interest.
    """

    keys: List["ChangeControlKey"] = aristaproto.message_field(1)
    """ """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigSomeResponse(aristaproto.Message):
    """
    ApproveConfigSomeResponse carries one requested ApproveConfig value,
    or an error from the GetSome process, plus a timestamp.
    """

    value: "ApproveConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the
    datastore. If optional fields were not given at creation, these
    fields will be empty or set to default values.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """ """
@dataclass(eq=False, repr=False)
class ApproveConfigStreamRequest(aristaproto.Message):
    """
    ApproveConfigStreamRequest describes a GetAll/Subscribe query over
    ApproveConfig values, with optional filters and time bounds.
    """

    partial_eq_filter: List["ApproveConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a
    GetAll/Subscribe. This requires all provided fields to be equal to
    the response.

    While transparent to users, this field also allows services to
    optimize internal subscriptions if filter(s) are sufficiently
    specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time
    window. If this field is populated, at least one of the two time
    fields are required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each ApproveConfig at end.
        * Each ApproveConfig response is fully-specified (all fields set).
      * start: Returns the state of each ApproveConfig at start, followed
        by updates until now.
        * Each ApproveConfig response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each ApproveConfig at start,
        followed by updates until end.
        * Each ApproveConfig response at start is fully-specified, but
          updates until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigStreamResponse(aristaproto.Message):
    """
    ApproveConfigStreamResponse carries one ApproveConfig value together
    with its last-modification time and operation type.
    """

    value: "ApproveConfig" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this ApproveConfig's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the ApproveConfig value in this response
    should be considered. Under non-subscribe requests, this value
    should always be INITIAL. In a subscription, once all initial data
    is streamed and the client begins to receive modification updates,
    you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigBatchedStreamRequest(aristaproto.Message):
    """
    ApproveConfigBatchedStreamRequest describes a GetAll/Subscribe query
    over ApproveConfig values whose responses are delivered in batches
    of at most `max_messages` messages.
    """

    partial_eq_filter: List["ApproveConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a
    GetAll/Subscribe. This requires all provided fields to be equal to
    the response.

    While transparent to users, this field also allows services to
    optimize internal subscriptions if filter(s) are sufficiently
    specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time
    window. If this field is populated, at least one of the two time
    fields are required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each ApproveConfig at end.
        * Each ApproveConfig response is fully-specified (all fields set).
      * start: Returns the state of each ApproveConfig at start, followed
        by updates until now.
        * Each ApproveConfig response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each ApproveConfig at start,
        followed by updates until end.
        * Each ApproveConfig response at start is fully-specified, but
          updates until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """
    MaxMessages limits the maximum number of messages that can be
    contained in one batch. MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT).
    INTERNAL_BATCH_LIMIT is set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigBatchedStreamResponse(aristaproto.Message):
    """
    ApproveConfigBatchedStreamResponse wraps one batch of
    ApproveConfigStreamResponse messages.
    """

    responses: List["ApproveConfigStreamResponse"] = aristaproto.message_field(1)
    """
    Values are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive)
    1 and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class ApproveConfigSetRequest(aristaproto.Message):
    """
    ApproveConfigSetRequest carries the ApproveConfig value to set into
    the datastore.
    """

    value: "ApproveConfig" = aristaproto.message_field(1)
    """
    ApproveConfig carries the value to set into the datastore.
    See the documentation on the ApproveConfig struct for which fields
    are required.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigSetResponse(aristaproto.Message):
    """
    ApproveConfigSetResponse carries the value from the set request
    (plus any server-generated values) and the time at which the system
    recognized the creation.
    """

    value: "ApproveConfig" = aristaproto.message_field(1)
    """
    Value carries all the values given in the ApproveConfigSetRequest as
    well as any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes
    the creation. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==CreatedAt will include this
         instance.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigSetSomeRequest(aristaproto.Message):
    """
    ApproveConfigSetSomeRequest carries a list of ApproveConfig values
    to write.
    """

    values: List["ApproveConfig"] = aristaproto.message_field(1)
    """
    value contains a list of ApproveConfig values to write.
    It is possible to provide more values than can fit within either:
      - the maximum send size of the client
      - the maximum receive size of the server
    If this error occurs you must reduce the number of values sent.
    See gRPC \"maximum message size\" documentation for more information.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigSetSomeResponse(aristaproto.Message):
    """
    ApproveConfigSetSomeResponse pairs a change control key with an
    error string.
    """

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """ """

    error: str = aristaproto.string_field(2)
    """ """
@dataclass(eq=False, repr=False)
class ApproveConfigDeleteRequest(aristaproto.Message):
    """
    ApproveConfigDeleteRequest identifies the ApproveConfig instance to
    remove.
    """

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """
    Key indicates which ApproveConfig instance to remove.
    This field must always be set.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigDeleteResponse(aristaproto.Message):
    """
    ApproveConfigDeleteResponse echoes the key of the deleted
    ApproveConfig instance and the time at which the system recognized
    the deletion.
    """

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """Key echoes back the key of the deleted ApproveConfig instance."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes
    the deletion. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==DeletedAt will not include
         this instance.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigDeleteSomeRequest(aristaproto.Message):
    """
    ApproveConfigDeleteSomeRequest carries a list of ApproveConfig keys
    to delete.
    """

    keys: List["ChangeControlKey"] = aristaproto.message_field(1)
    """key contains a list of ApproveConfig keys to delete"""
@dataclass(eq=False, repr=False)
class ApproveConfigDeleteSomeResponse(aristaproto.Message):
    """ApproveConfigDeleteSomeResponse is only sent when there is an error."""

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """ """

    error: str = aristaproto.string_field(2)
    """ """
@dataclass(eq=False, repr=False)
class ApproveConfigDeleteAllRequest(aristaproto.Message):
    """
    ApproveConfigDeleteAllRequest describes a DeleteAll over
    ApproveConfig instances, optionally restricted by a filter.
    """

    partial_eq_filter: List["ApproveConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a DeleteAll.
    This requires all provided fields to be equal to the response.
    A filtered DeleteAll will use GetAll with filter to find things to
    delete.
    """
@dataclass(eq=False, repr=False)
class ApproveConfigDeleteAllResponse(aristaproto.Message):
    """
    ApproveConfigDeleteAllResponse reports a failure to delete one
    ApproveConfig instance: the error class, message, key and time.
    """

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    This describes the class of delete error.
    A DeleteAllResponse is only sent when there is an error.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """This indicates the error message from the delete failure."""

    key: "ChangeControlKey" = aristaproto.message_field(3)
    """
    This is the key of the ApproveConfig instance that failed to be
    deleted.
    """

    time: datetime = aristaproto.message_field(4)
    """Time indicates the (UTC) timestamp when the key was being deleted."""
@dataclass(eq=False, repr=False)
class ChangeControlRequest(aristaproto.Message):
    """
    ChangeControlRequest selects a single ChangeControl instance by key,
    optionally as of a given time.
    """

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a ChangeControl instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class ChangeControlResponse(aristaproto.Message):
    """
    ChangeControlResponse carries the requested ChangeControl value and
    the timestamp of its last modification.
    """

    value: "ChangeControl" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the
    datastore. If optional fields were not given at creation, these
    fields will be empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    ChangeControl instance in this response.
    """
@dataclass(eq=False, repr=False)
class ChangeControlSomeRequest(aristaproto.Message):
    """
    ChangeControlSomeRequest carries a list of change control keys and
    an optional time of interest.
    """

    keys: List["ChangeControlKey"] = aristaproto.message_field(1)
    """ """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class ChangeControlSomeResponse(aristaproto.Message):
    """Message streamed back by ChangeControlService.GetSome."""

    value: "ChangeControl" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or hold
    their default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Optional; should be filled when an error occurs in the GetSome
    process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Timestamp attached to this response (undocumented upstream;
    presumably the value's last-modification time -- confirm against the
    .proto).
    """
@dataclass(eq=False, repr=False)
class ChangeControlStreamRequest(aristaproto.Message):
    """
    Request message shared by the GetAll, Subscribe, GetMeta and
    SubscribeMeta RPCs of ChangeControlService.
    """

    partial_eq_filter: List["ChangeControl"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.

    Although transparent to users, sufficiently specific filters also let
    services optimize their internal subscriptions.
    """

    filter: "Filter" = aristaproto.message_field(2)
    """
    For each ChangeControl in the list, all populated fields are ANDed
    together as a filtering operation; the list itself is ORed, so a
    ChangeControl matching any individual filter is streamed to the user.

    NOTE(review): this wording speaks of "the list" although the field is
    a single Filter message; it may describe partial_eq_filter instead.
    Confirm against the .proto source.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Optional time window limiting the response data; when populated, at
    least one of its two time fields is required. For GetAll:

    * end only: the state of each ChangeControl at end; every response is
      fully specified (all fields set).
    * start only: the state of each ChangeControl at start, then updates
      until now; responses at start are fully specified, later updates
      may be partial.
    * start and end: the state of each ChangeControl at start, then
      updates until end; responses at start are fully specified, updates
      until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """
@dataclass(eq=False, repr=False)
class ChangeControlStreamResponse(aristaproto.Message):
    """
    Message streamed back by the GetAll and Subscribe RPCs of
    ChangeControlService.
    """

    value: "ChangeControl" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request. Its key field is
    always populated; which other fields are populated, and why, depends
    on the Operation and on what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this ChangeControl's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    How the ChangeControl value in this response should be considered.
    Under non-subscribe requests this is always INITIAL. In a
    subscription, INITIAL should not appear again once all initial data
    has been streamed and modification updates have begun.
    """
@dataclass(eq=False, repr=False)
class ChangeControlBatchedStreamRequest(aristaproto.Message):
    """
    Request message shared by the GetAllBatched and SubscribeBatched RPCs
    of ChangeControlService.
    """

    partial_eq_filter: List["ChangeControl"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.

    Although transparent to users, sufficiently specific filters also let
    services optimize their internal subscriptions.
    """

    filter: "Filter" = aristaproto.message_field(2)
    """
    For each ChangeControl in the list, all populated fields are ANDed
    together as a filtering operation; the list itself is ORed, so a
    ChangeControl matching any individual filter is streamed to the user.

    NOTE(review): this wording speaks of "the list" although the field is
    a single Filter message; it may describe partial_eq_filter instead.
    Confirm against the .proto source.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Optional time window limiting the response data; when populated, at
    least one of its two time fields is required. For GetAll:

    * end only: the state of each ChangeControl at end; every response is
      fully specified (all fields set).
    * start only: the state of each ChangeControl at start, then updates
      until now; responses at start are fully specified, later updates
      may be partial.
    * start and end: the state of each ChangeControl at start, then
      updates until end; responses at start are fully specified, updates
      until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4,
        wraps=aristaproto.TYPE_UINT32,
    )
    """
    Upper bound on the number of messages in one batch; must be at least
    1. The effective maximum is min(max_messages, INTERNAL_BATCH_LIMIT),
    where INTERNAL_BATCH_LIMIT is set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class ChangeControlBatchedStreamResponse(aristaproto.Message):
    """
    Message streamed back by the GetAllBatched and SubscribeBatched RPCs
    of ChangeControlService.
    """

    responses: List["ChangeControlStreamResponse"] = aristaproto.message_field(1)
    """
    The values deemed relevant to the initiating request. The length is
    guaranteed to lie between 1 and
    min(req.max_messages, INTERNAL_BATCH_LIMIT), inclusive.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigRequest(aristaproto.Message):
    """Request message of ChangeControlConfigService.GetOne."""

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """
    Unique identifier of the ChangeControlConfig instance to retrieve.
    Must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Point in time the caller is interested in. When omitted, the server
    uses the time at which it makes the request.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigResponse(aristaproto.Message):
    """Response message of ChangeControlConfigService.GetOne."""

    value: "ChangeControlConfig" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or hold
    their default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp of the last modification of the ChangeControlConfig
    instance carried in this response.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigSomeRequest(aristaproto.Message):
    """Request message of ChangeControlConfigService.GetSome."""

    keys: List["ChangeControlKey"] = aristaproto.message_field(1)
    """
    Keys of the ChangeControlConfig instances being requested
    (undocumented upstream; meaning taken from the field name and type).
    """

    time: datetime = aristaproto.message_field(2)
    """
    Point in time the caller is interested in. When omitted, the server
    uses the time at which it makes the request.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigSomeResponse(aristaproto.Message):
    """Message streamed back by ChangeControlConfigService.GetSome."""

    value: "ChangeControlConfig" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or hold
    their default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Optional; should be filled when an error occurs in the GetSome
    process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Timestamp attached to this response (undocumented upstream;
    presumably the value's last-modification time -- confirm against the
    .proto).
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigStreamRequest(aristaproto.Message):
    """
    Request message shared by the GetAll, Subscribe, GetMeta and
    SubscribeMeta RPCs of ChangeControlConfigService.
    """

    partial_eq_filter: List["ChangeControlConfig"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.

    Although transparent to users, sufficiently specific filters also let
    services optimize their internal subscriptions.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Optional time window limiting the response data; when populated, at
    least one of its two time fields is required. For GetAll:

    * end only: the state of each ChangeControlConfig at end; every
      response is fully specified (all fields set).
    * start only: the state of each ChangeControlConfig at start, then
      updates until now; responses at start are fully specified, later
      updates may be partial.
    * start and end: the state of each ChangeControlConfig at start, then
      updates until end; responses at start are fully specified, updates
      until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigStreamResponse(aristaproto.Message):
    """
    Message streamed back by the GetAll and Subscribe RPCs of
    ChangeControlConfigService.
    """

    value: "ChangeControlConfig" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request. Its key field is
    always populated; which other fields are populated, and why, depends
    on the Operation and on what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this ChangeControlConfig's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    How the ChangeControlConfig value in this response should be
    considered. Under non-subscribe requests this is always INITIAL. In a
    subscription, INITIAL should not appear again once all initial data
    has been streamed and modification updates have begun.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigBatchedStreamRequest(aristaproto.Message):
    """
    Request message shared by the GetAllBatched and SubscribeBatched RPCs
    of ChangeControlConfigService.
    """

    partial_eq_filter: List["ChangeControlConfig"] = aristaproto.message_field(1)
    """
    Server-side filter for a GetAll/Subscribe: every provided field must
    equal the corresponding field of the response.

    Although transparent to users, sufficiently specific filters also let
    services optimize their internal subscriptions.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Optional time window limiting the response data; when populated, at
    least one of its two time fields is required. For GetAll:

    * end only: the state of each ChangeControlConfig at end; every
      response is fully specified (all fields set).
    * start only: the state of each ChangeControlConfig at start, then
      updates until now; responses at start are fully specified, later
      updates may be partial.
    * start and end: the state of each ChangeControlConfig at start, then
      updates until end; responses at start are fully specified, updates
      until end may be partial.

    This field is not allowed in the Subscribe RPC.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4,
        wraps=aristaproto.TYPE_UINT32,
    )
    """
    Upper bound on the number of messages in one batch; must be at least
    1. The effective maximum is min(max_messages, INTERNAL_BATCH_LIMIT),
    where INTERNAL_BATCH_LIMIT is set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigBatchedStreamResponse(aristaproto.Message):
    """
    Message streamed back by the GetAllBatched and SubscribeBatched RPCs
    of ChangeControlConfigService.
    """

    responses: List["ChangeControlConfigStreamResponse"] = aristaproto.message_field(1)
    """
    The values deemed relevant to the initiating request. The length is
    guaranteed to lie between 1 and
    min(req.max_messages, INTERNAL_BATCH_LIMIT), inclusive.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigSetRequest(aristaproto.Message):
    """Request message of ChangeControlConfigService.Set."""

    value: "ChangeControlConfig" = aristaproto.message_field(1)
    """
    The ChangeControlConfig to set into the datastore. The documentation
    of the ChangeControlConfig struct lists which fields are required.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigSetResponse(aristaproto.Message):
    """Response message of ChangeControlConfigService.Set."""

    value: "ChangeControlConfig" = aristaproto.message_field(1)
    """
    All values given in the ChangeControlConfigSetRequest, together with
    any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp at which the system recognizes the creation. The only
    guarantees made about it are:

    - it is after the time the request was received
    - a time-ranged query with StartTime==CreatedAt will include this
      instance.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigSetSomeRequest(aristaproto.Message):
    """Request message of ChangeControlConfigService.SetSome."""

    values: List["ChangeControlConfig"] = aristaproto.message_field(1)
    """
    The ChangeControlConfig values to write.

    It is possible to provide more values than fit within either:

    - the maximum send size of the client
    - the maximum receive size of the server

    If that error occurs, reduce the number of values sent. See the gRPC
    "maximum message size" documentation for more information.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigSetSomeResponse(aristaproto.Message):
    """Message streamed back by ChangeControlConfigService.SetSome."""

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """
    Key this response refers to (undocumented upstream; presumably the
    key of the value that was written -- confirm against the .proto).
    """

    error: str = aristaproto.string_field(2)
    """
    Error text for this key (undocumented upstream; presumably empty on
    success -- confirm against the .proto).
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigDeleteRequest(aristaproto.Message):
    """Request message of ChangeControlConfigService.Delete."""

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """
    Identifies the ChangeControlConfig instance to remove.
    Must always be set.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigDeleteResponse(aristaproto.Message):
    """Response message of ChangeControlConfigService.Delete."""

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """Echo of the key of the deleted ChangeControlConfig instance."""

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp at which the system recognizes the deletion. The only
    guarantees made about it are:

    - it is after the time the request was received
    - a time-ranged query with StartTime==DeletedAt will not include this
      instance.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigDeleteSomeRequest(aristaproto.Message):
    """Request message of ChangeControlConfigService.DeleteSome."""

    keys: List["ChangeControlKey"] = aristaproto.message_field(1)
    """List of ChangeControlConfig keys to delete."""
@dataclass(eq=False, repr=False)
class ChangeControlConfigDeleteSomeResponse(aristaproto.Message):
    """
    Message streamed back by ChangeControlConfigService.DeleteSome.

    It is only sent when there is an error.
    """

    key: "ChangeControlKey" = aristaproto.message_field(1)
    """
    Key this response refers to (undocumented upstream; presumably the
    key whose deletion failed -- confirm against the .proto).
    """

    error: str = aristaproto.string_field(2)
    """
    Error text for this key (undocumented upstream; presumably the
    failure reason -- confirm against the .proto).
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigDeleteAllRequest(aristaproto.Message):
    """Request message of ChangeControlConfigService.DeleteAll."""

    partial_eq_filter: List["ChangeControlConfig"] = aristaproto.message_field(1)
    """
    Server-side filter for a DeleteAll: every provided field must equal
    the corresponding field of the response. A filtered DeleteAll uses
    GetAll with the filter to find the things to delete.
    """
@dataclass(eq=False, repr=False)
class ChangeControlConfigDeleteAllResponse(aristaproto.Message):
    """
    Message streamed back by ChangeControlConfigService.DeleteAll.

    Per the field notes below, a message is only sent for a delete that
    failed.
    """

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    Class of the delete error. A DeleteAllResponse is sent only when an
    error occurred.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """Error message reported by the failed delete."""

    key: "ChangeControlKey" = aristaproto.message_field(3)
    """Key of the ChangeControlConfig instance that could not be deleted."""

    time: datetime = aristaproto.message_field(4)
    """(UTC) timestamp at which the key was being deleted."""
class ApproveConfigServiceStub(aristaproto.ServiceStub):
    """
    Client stub for ``arista.changecontrol.v1.ApproveConfigService``.

    Every method forwards its request message to the inherited
    ``_unary_unary`` or ``_unary_stream`` helper together with the RPC
    route and the expected response class. The keyword-only ``timeout``,
    ``deadline`` and ``metadata`` arguments are passed through unchanged.
    """

    async def get_one(
        self,
        approve_config_request: "ApproveConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ApproveConfigResponse":
        """Issue the unary GetOne RPC and return its response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/GetOne"
        reply = await self._unary_unary(
            route,
            approve_config_request,
            ApproveConfigResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        approve_config_some_request: "ApproveConfigSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigSomeResponse]":
        """Issue the GetSome RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/GetSome"
        stream = self._unary_stream(
            route,
            approve_config_some_request,
            ApproveConfigSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_all(
        self,
        approve_config_stream_request: "ApproveConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigStreamResponse]":
        """Issue the GetAll RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/GetAll"
        stream = self._unary_stream(
            route,
            approve_config_stream_request,
            ApproveConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def subscribe(
        self,
        approve_config_stream_request: "ApproveConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigStreamResponse]":
        """Issue the Subscribe RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/Subscribe"
        stream = self._unary_stream(
            route,
            approve_config_stream_request,
            ApproveConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_meta(
        self,
        approve_config_stream_request: "ApproveConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Issue the unary GetMeta RPC and return its response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/GetMeta"
        reply = await self._unary_unary(
            route,
            approve_config_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def subscribe_meta(
        self,
        approve_config_stream_request: "ApproveConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Issue the SubscribeMeta RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/SubscribeMeta"
        stream = self._unary_stream(
            route,
            approve_config_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def set(
        self,
        approve_config_set_request: "ApproveConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ApproveConfigSetResponse":
        """Issue the unary Set RPC and return its response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/Set"
        reply = await self._unary_unary(
            route,
            approve_config_set_request,
            ApproveConfigSetResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def set_some(
        self,
        approve_config_set_some_request: "ApproveConfigSetSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigSetSomeResponse]":
        """Issue the SetSome RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/SetSome"
        stream = self._unary_stream(
            route,
            approve_config_set_some_request,
            ApproveConfigSetSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def delete(
        self,
        approve_config_delete_request: "ApproveConfigDeleteRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ApproveConfigDeleteResponse":
        """Issue the unary Delete RPC and return its response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/Delete"
        reply = await self._unary_unary(
            route,
            approve_config_delete_request,
            ApproveConfigDeleteResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def delete_some(
        self,
        approve_config_delete_some_request: "ApproveConfigDeleteSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigDeleteSomeResponse]":
        """Issue the DeleteSome RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/DeleteSome"
        stream = self._unary_stream(
            route,
            approve_config_delete_some_request,
            ApproveConfigDeleteSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def delete_all(
        self,
        approve_config_delete_all_request: "ApproveConfigDeleteAllRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigDeleteAllResponse]":
        """Issue the DeleteAll RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ApproveConfigService/DeleteAll"
        stream = self._unary_stream(
            route,
            approve_config_delete_all_request,
            ApproveConfigDeleteAllResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_all_batched(
        self,
        approve_config_batched_stream_request: "ApproveConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigBatchedStreamResponse]":
        """Issue the GetAllBatched RPC and yield each streamed batch."""
        route = "/arista.changecontrol.v1.ApproveConfigService/GetAllBatched"
        stream = self._unary_stream(
            route,
            approve_config_batched_stream_request,
            ApproveConfigBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def subscribe_batched(
        self,
        approve_config_batched_stream_request: "ApproveConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApproveConfigBatchedStreamResponse]":
        """Issue the SubscribeBatched RPC and yield each streamed batch."""
        route = "/arista.changecontrol.v1.ApproveConfigService/SubscribeBatched"
        stream = self._unary_stream(
            route,
            approve_config_batched_stream_request,
            ApproveConfigBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message
class ChangeControlServiceStub(aristaproto.ServiceStub):
    """
    Client stub for ``arista.changecontrol.v1.ChangeControlService``.

    Every method forwards its request message to the inherited
    ``_unary_unary`` or ``_unary_stream`` helper together with the RPC
    route and the expected response class. The keyword-only ``timeout``,
    ``deadline`` and ``metadata`` arguments are passed through unchanged.
    """

    async def get_one(
        self,
        change_control_request: "ChangeControlRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ChangeControlResponse":
        """Issue the unary GetOne RPC and return its response."""
        route = "/arista.changecontrol.v1.ChangeControlService/GetOne"
        reply = await self._unary_unary(
            route,
            change_control_request,
            ChangeControlResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        change_control_some_request: "ChangeControlSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlSomeResponse]":
        """Issue the GetSome RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlService/GetSome"
        stream = self._unary_stream(
            route,
            change_control_some_request,
            ChangeControlSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_all(
        self,
        change_control_stream_request: "ChangeControlStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlStreamResponse]":
        """Issue the GetAll RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlService/GetAll"
        stream = self._unary_stream(
            route,
            change_control_stream_request,
            ChangeControlStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def subscribe(
        self,
        change_control_stream_request: "ChangeControlStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlStreamResponse]":
        """Issue the Subscribe RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlService/Subscribe"
        stream = self._unary_stream(
            route,
            change_control_stream_request,
            ChangeControlStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_meta(
        self,
        change_control_stream_request: "ChangeControlStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Issue the unary GetMeta RPC and return its response."""
        route = "/arista.changecontrol.v1.ChangeControlService/GetMeta"
        reply = await self._unary_unary(
            route,
            change_control_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def subscribe_meta(
        self,
        change_control_stream_request: "ChangeControlStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Issue the SubscribeMeta RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlService/SubscribeMeta"
        stream = self._unary_stream(
            route,
            change_control_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_all_batched(
        self,
        change_control_batched_stream_request: "ChangeControlBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlBatchedStreamResponse]":
        """Issue the GetAllBatched RPC and yield each streamed batch."""
        route = "/arista.changecontrol.v1.ChangeControlService/GetAllBatched"
        stream = self._unary_stream(
            route,
            change_control_batched_stream_request,
            ChangeControlBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def subscribe_batched(
        self,
        change_control_batched_stream_request: "ChangeControlBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlBatchedStreamResponse]":
        """Issue the SubscribeBatched RPC and yield each streamed batch."""
        route = "/arista.changecontrol.v1.ChangeControlService/SubscribeBatched"
        stream = self._unary_stream(
            route,
            change_control_batched_stream_request,
            ChangeControlBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message
class ChangeControlConfigServiceStub(aristaproto.ServiceStub):
    """
    Client stub for ``arista.changecontrol.v1.ChangeControlConfigService``.

    Every method forwards its request message to the inherited
    ``_unary_unary`` or ``_unary_stream`` helper together with the RPC
    route and the expected response class. The keyword-only ``timeout``,
    ``deadline`` and ``metadata`` arguments are passed through unchanged.
    """

    async def get_one(
        self,
        change_control_config_request: "ChangeControlConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ChangeControlConfigResponse":
        """Issue the unary GetOne RPC and return its response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/GetOne"
        reply = await self._unary_unary(
            route,
            change_control_config_request,
            ChangeControlConfigResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        change_control_config_some_request: "ChangeControlConfigSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigSomeResponse]":
        """Issue the GetSome RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/GetSome"
        stream = self._unary_stream(
            route,
            change_control_config_some_request,
            ChangeControlConfigSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_all(
        self,
        change_control_config_stream_request: "ChangeControlConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigStreamResponse]":
        """Issue the GetAll RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/GetAll"
        stream = self._unary_stream(
            route,
            change_control_config_stream_request,
            ChangeControlConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def subscribe(
        self,
        change_control_config_stream_request: "ChangeControlConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigStreamResponse]":
        """Issue the Subscribe RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/Subscribe"
        stream = self._unary_stream(
            route,
            change_control_config_stream_request,
            ChangeControlConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_meta(
        self,
        change_control_config_stream_request: "ChangeControlConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Issue the unary GetMeta RPC and return its response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/GetMeta"
        reply = await self._unary_unary(
            route,
            change_control_config_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def subscribe_meta(
        self,
        change_control_config_stream_request: "ChangeControlConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Issue the SubscribeMeta RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/SubscribeMeta"
        stream = self._unary_stream(
            route,
            change_control_config_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def set(
        self,
        change_control_config_set_request: "ChangeControlConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ChangeControlConfigSetResponse":
        """Issue the unary Set RPC and return its response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/Set"
        reply = await self._unary_unary(
            route,
            change_control_config_set_request,
            ChangeControlConfigSetResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def set_some(
        self,
        change_control_config_set_some_request: "ChangeControlConfigSetSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigSetSomeResponse]":
        """Issue the SetSome RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/SetSome"
        stream = self._unary_stream(
            route,
            change_control_config_set_some_request,
            ChangeControlConfigSetSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def delete(
        self,
        change_control_config_delete_request: "ChangeControlConfigDeleteRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ChangeControlConfigDeleteResponse":
        """Issue the unary Delete RPC and return its response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/Delete"
        reply = await self._unary_unary(
            route,
            change_control_config_delete_request,
            ChangeControlConfigDeleteResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def delete_some(
        self,
        change_control_config_delete_some_request: "ChangeControlConfigDeleteSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigDeleteSomeResponse]":
        """Issue the DeleteSome RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/DeleteSome"
        stream = self._unary_stream(
            route,
            change_control_config_delete_some_request,
            ChangeControlConfigDeleteSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def delete_all(
        self,
        change_control_config_delete_all_request: "ChangeControlConfigDeleteAllRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigDeleteAllResponse]":
        """Issue the DeleteAll RPC and yield each streamed response."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/DeleteAll"
        stream = self._unary_stream(
            route,
            change_control_config_delete_all_request,
            ChangeControlConfigDeleteAllResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def get_all_batched(
        self,
        change_control_config_batched_stream_request: "ChangeControlConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigBatchedStreamResponse]":
        """Issue the GetAllBatched RPC and yield each streamed batch."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/GetAllBatched"
        stream = self._unary_stream(
            route,
            change_control_config_batched_stream_request,
            ChangeControlConfigBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message

    async def subscribe_batched(
        self,
        change_control_config_batched_stream_request: "ChangeControlConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ChangeControlConfigBatchedStreamResponse]":
        """Issue the SubscribeBatched RPC and yield each streamed batch."""
        route = "/arista.changecontrol.v1.ChangeControlConfigService/SubscribeBatched"
        stream = self._unary_stream(
            route,
            change_control_config_batched_stream_request,
            ChangeControlConfigBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for message in stream:
            yield message
from .... import fmp as ___fmp__ from ... import subscriptions as __subscriptions__ from ... import time as __time__
class ApproveConfigServiceBase(ServiceBase):
    """
    Server-side base class for ``arista.changecontrol.v1.ApproveConfigService``.

    Each public method corresponds to one RPC and raises
    ``grpclib.GRPCError(UNIMPLEMENTED)`` by default; a concrete service
    overrides the methods it supports.

    Server-streaming defaults contain an unreachable ``yield`` after the
    ``raise``.  That makes them async generator functions, matching their
    ``AsyncIterator`` return annotation, so iterating the result raises the
    UNIMPLEMENTED error instead of failing on a non-iterable coroutine.
    """

    async def get_one(
        self, approve_config_request: "ApproveConfigRequest"
    ) -> "ApproveConfigResponse":
        """Serve the unary ``GetOne`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, approve_config_some_request: "ApproveConfigSomeRequest"
    ) -> AsyncIterator[ApproveConfigSomeResponse]:
        """Serve the server-streaming ``GetSome`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigSomeResponse()

    async def get_all(
        self, approve_config_stream_request: "ApproveConfigStreamRequest"
    ) -> AsyncIterator[ApproveConfigStreamResponse]:
        """Serve the server-streaming ``GetAll`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigStreamResponse()

    async def subscribe(
        self, approve_config_stream_request: "ApproveConfigStreamRequest"
    ) -> AsyncIterator[ApproveConfigStreamResponse]:
        """Serve the server-streaming ``Subscribe`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigStreamResponse()

    async def get_meta(
        self, approve_config_stream_request: "ApproveConfigStreamRequest"
    ) -> "MetaResponse":
        """Serve the unary ``GetMeta`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, approve_config_stream_request: "ApproveConfigStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Serve the server-streaming ``SubscribeMeta`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield MetaResponse()

    async def set(
        self, approve_config_set_request: "ApproveConfigSetRequest"
    ) -> "ApproveConfigSetResponse":
        """Serve the unary ``Set`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set_some(
        self, approve_config_set_some_request: "ApproveConfigSetSomeRequest"
    ) -> AsyncIterator[ApproveConfigSetSomeResponse]:
        """Serve the server-streaming ``SetSome`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigSetSomeResponse()

    async def delete(
        self, approve_config_delete_request: "ApproveConfigDeleteRequest"
    ) -> "ApproveConfigDeleteResponse":
        """Serve the unary ``Delete`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_some(
        self, approve_config_delete_some_request: "ApproveConfigDeleteSomeRequest"
    ) -> AsyncIterator[ApproveConfigDeleteSomeResponse]:
        """Serve the server-streaming ``DeleteSome`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigDeleteSomeResponse()

    async def delete_all(
        self, approve_config_delete_all_request: "ApproveConfigDeleteAllRequest"
    ) -> AsyncIterator[ApproveConfigDeleteAllResponse]:
        """Serve the server-streaming ``DeleteAll`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigDeleteAllResponse()

    async def get_all_batched(
        self, approve_config_batched_stream_request: "ApproveConfigBatchedStreamRequest"
    ) -> AsyncIterator[ApproveConfigBatchedStreamResponse]:
        """Serve the server-streaming ``GetAllBatched`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigBatchedStreamResponse()

    async def subscribe_batched(
        self, approve_config_batched_stream_request: "ApproveConfigBatchedStreamRequest"
    ) -> AsyncIterator[ApproveConfigBatchedStreamResponse]:
        """Serve the server-streaming ``SubscribeBatched`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ApproveConfigBatchedStreamResponse()

    # ------------------------------------------------------------------
    # grpclib stream adapters.
    #
    # Unary adapters read one request from the stream, await the matching
    # public method and send its single result back.  Server-streaming
    # adapters read the request and hand the public method to the inherited
    # ``_call_rpc_handler_server_stream`` helper (not defined in this class;
    # presumably it forwards each yielded message to the stream).
    # ------------------------------------------------------------------

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[ApproveConfigRequest, ApproveConfigResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[ApproveConfigSomeRequest, ApproveConfigSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[ApproveConfigStreamRequest, ApproveConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[ApproveConfigStreamRequest, ApproveConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[ApproveConfigStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[ApproveConfigStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[ApproveConfigSetRequest, ApproveConfigSetResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    async def __rpc_set_some(
        self,
        stream: "grpclib.server.Stream[ApproveConfigSetSomeRequest, ApproveConfigSetSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.set_some,
            stream,
            request,
        )

    async def __rpc_delete(
        self,
        stream: "grpclib.server.Stream[ApproveConfigDeleteRequest, ApproveConfigDeleteResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.delete(request)
        await stream.send_message(response)

    async def __rpc_delete_some(
        self,
        stream: "grpclib.server.Stream[ApproveConfigDeleteSomeRequest, ApproveConfigDeleteSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_some,
            stream,
            request,
        )

    async def __rpc_delete_all(
        self,
        stream: "grpclib.server.Stream[ApproveConfigDeleteAllRequest, ApproveConfigDeleteAllResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_all,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[ApproveConfigBatchedStreamRequest, ApproveConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[ApproveConfigBatchedStreamRequest, ApproveConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """
        Return the route table for this service.

        Keys are fully-qualified gRPC method paths; each value is a
        ``grpclib.const.Handler`` built from the stream adapter, the call
        cardinality, the request type and the response type.
        """
        return {
            "/arista.changecontrol.v1.ApproveConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                ApproveConfigRequest,
                ApproveConfigResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigSomeRequest,
                ApproveConfigSomeResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigStreamRequest,
                ApproveConfigStreamResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigStreamRequest,
                ApproveConfigStreamResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                ApproveConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                ApproveConfigSetRequest,
                ApproveConfigSetResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/SetSome": grpclib.const.Handler(
                self.__rpc_set_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigSetSomeRequest,
                ApproveConfigSetSomeResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/Delete": grpclib.const.Handler(
                self.__rpc_delete,
                grpclib.const.Cardinality.UNARY_UNARY,
                ApproveConfigDeleteRequest,
                ApproveConfigDeleteResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/DeleteSome": grpclib.const.Handler(
                self.__rpc_delete_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigDeleteSomeRequest,
                ApproveConfigDeleteSomeResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigDeleteAllRequest,
                ApproveConfigDeleteAllResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigBatchedStreamRequest,
                ApproveConfigBatchedStreamResponse,
            ),
            "/arista.changecontrol.v1.ApproveConfigService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                ApproveConfigBatchedStreamRequest,
                ApproveConfigBatchedStreamResponse,
            ),
        }
class ChangeControlServiceBase(ServiceBase):
    """
    Server-side base class for ``arista.changecontrol.v1.ChangeControlService``.

    Each public method corresponds to one RPC and raises
    ``grpclib.GRPCError(UNIMPLEMENTED)`` by default; a concrete service
    overrides the methods it supports.

    Server-streaming defaults contain an unreachable ``yield`` after the
    ``raise``.  That makes them async generator functions, matching their
    ``AsyncIterator`` return annotation, so iterating the result raises the
    UNIMPLEMENTED error instead of failing on a non-iterable coroutine.
    """

    async def get_one(
        self, change_control_request: "ChangeControlRequest"
    ) -> "ChangeControlResponse":
        """Serve the unary ``GetOne`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, change_control_some_request: "ChangeControlSomeRequest"
    ) -> AsyncIterator[ChangeControlSomeResponse]:
        """Serve the server-streaming ``GetSome`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlSomeResponse()

    async def get_all(
        self, change_control_stream_request: "ChangeControlStreamRequest"
    ) -> AsyncIterator[ChangeControlStreamResponse]:
        """Serve the server-streaming ``GetAll`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlStreamResponse()

    async def subscribe(
        self, change_control_stream_request: "ChangeControlStreamRequest"
    ) -> AsyncIterator[ChangeControlStreamResponse]:
        """Serve the server-streaming ``Subscribe`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlStreamResponse()

    async def get_meta(
        self, change_control_stream_request: "ChangeControlStreamRequest"
    ) -> "MetaResponse":
        """Serve the unary ``GetMeta`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, change_control_stream_request: "ChangeControlStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Serve the server-streaming ``SubscribeMeta`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield MetaResponse()

    async def get_all_batched(
        self, change_control_batched_stream_request: "ChangeControlBatchedStreamRequest"
    ) -> AsyncIterator[ChangeControlBatchedStreamResponse]:
        """Serve the server-streaming ``GetAllBatched`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlBatchedStreamResponse()

    async def subscribe_batched(
        self, change_control_batched_stream_request: "ChangeControlBatchedStreamRequest"
    ) -> AsyncIterator[ChangeControlBatchedStreamResponse]:
        """Serve the server-streaming ``SubscribeBatched`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlBatchedStreamResponse()

    # ------------------------------------------------------------------
    # grpclib stream adapters.
    #
    # Unary adapters read one request from the stream, await the matching
    # public method and send its single result back.  Server-streaming
    # adapters read the request and hand the public method to the inherited
    # ``_call_rpc_handler_server_stream`` helper (not defined in this class;
    # presumably it forwards each yielded message to the stream).
    # ------------------------------------------------------------------

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[ChangeControlRequest, ChangeControlResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[ChangeControlSomeRequest, ChangeControlSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[ChangeControlStreamRequest, ChangeControlStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[ChangeControlStreamRequest, ChangeControlStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[ChangeControlStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[ChangeControlStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[ChangeControlBatchedStreamRequest, ChangeControlBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[ChangeControlBatchedStreamRequest, ChangeControlBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """
        Return the route table for this service.

        Keys are fully-qualified gRPC method paths; each value is a
        ``grpclib.const.Handler`` built from the stream adapter, the call
        cardinality, the request type and the response type.
        """
        return {
            "/arista.changecontrol.v1.ChangeControlService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                ChangeControlRequest,
                ChangeControlResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlSomeRequest,
                ChangeControlSomeResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlStreamRequest,
                ChangeControlStreamResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlStreamRequest,
                ChangeControlStreamResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                ChangeControlStreamRequest,
                MetaResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlStreamRequest,
                MetaResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlBatchedStreamRequest,
                ChangeControlBatchedStreamResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlBatchedStreamRequest,
                ChangeControlBatchedStreamResponse,
            ),
        }
class ChangeControlConfigServiceBase(ServiceBase):
    """
    Server-side base class for
    ``arista.changecontrol.v1.ChangeControlConfigService``.

    Each public method corresponds to one RPC and raises
    ``grpclib.GRPCError(UNIMPLEMENTED)`` by default; a concrete service
    overrides the methods it supports.

    Server-streaming defaults contain an unreachable ``yield`` after the
    ``raise``.  That makes them async generator functions, matching their
    ``AsyncIterator`` return annotation, so iterating the result raises the
    UNIMPLEMENTED error instead of failing on a non-iterable coroutine.
    """

    async def get_one(
        self, change_control_config_request: "ChangeControlConfigRequest"
    ) -> "ChangeControlConfigResponse":
        """Serve the unary ``GetOne`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, change_control_config_some_request: "ChangeControlConfigSomeRequest"
    ) -> AsyncIterator[ChangeControlConfigSomeResponse]:
        """Serve the server-streaming ``GetSome`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigSomeResponse()

    async def get_all(
        self, change_control_config_stream_request: "ChangeControlConfigStreamRequest"
    ) -> AsyncIterator[ChangeControlConfigStreamResponse]:
        """Serve the server-streaming ``GetAll`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigStreamResponse()

    async def subscribe(
        self, change_control_config_stream_request: "ChangeControlConfigStreamRequest"
    ) -> AsyncIterator[ChangeControlConfigStreamResponse]:
        """Serve the server-streaming ``Subscribe`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigStreamResponse()

    async def get_meta(
        self, change_control_config_stream_request: "ChangeControlConfigStreamRequest"
    ) -> "MetaResponse":
        """Serve the unary ``GetMeta`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, change_control_config_stream_request: "ChangeControlConfigStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Serve the server-streaming ``SubscribeMeta`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield MetaResponse()

    async def set(
        self, change_control_config_set_request: "ChangeControlConfigSetRequest"
    ) -> "ChangeControlConfigSetResponse":
        """Serve the unary ``Set`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set_some(
        self,
        change_control_config_set_some_request: "ChangeControlConfigSetSomeRequest",
    ) -> AsyncIterator[ChangeControlConfigSetSomeResponse]:
        """Serve the server-streaming ``SetSome`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigSetSomeResponse()

    async def delete(
        self, change_control_config_delete_request: "ChangeControlConfigDeleteRequest"
    ) -> "ChangeControlConfigDeleteResponse":
        """Serve the unary ``Delete`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_some(
        self,
        change_control_config_delete_some_request: "ChangeControlConfigDeleteSomeRequest",
    ) -> AsyncIterator[ChangeControlConfigDeleteSomeResponse]:
        """Serve the server-streaming ``DeleteSome`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigDeleteSomeResponse()

    async def delete_all(
        self,
        change_control_config_delete_all_request: "ChangeControlConfigDeleteAllRequest",
    ) -> AsyncIterator[ChangeControlConfigDeleteAllResponse]:
        """Serve the server-streaming ``DeleteAll`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigDeleteAllResponse()

    async def get_all_batched(
        self,
        change_control_config_batched_stream_request: "ChangeControlConfigBatchedStreamRequest",
    ) -> AsyncIterator[ChangeControlConfigBatchedStreamResponse]:
        """Serve the server-streaming ``GetAllBatched`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigBatchedStreamResponse()

    async def subscribe_batched(
        self,
        change_control_config_batched_stream_request: "ChangeControlConfigBatchedStreamRequest",
    ) -> AsyncIterator[ChangeControlConfigBatchedStreamResponse]:
        """Serve the server-streaming ``SubscribeBatched`` RPC; unimplemented by default."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: only here to make this an async generator function.
        yield ChangeControlConfigBatchedStreamResponse()

    # ------------------------------------------------------------------
    # grpclib stream adapters.
    #
    # Unary adapters read one request from the stream, await the matching
    # public method and send its single result back.  Server-streaming
    # adapters read the request and hand the public method to the inherited
    # ``_call_rpc_handler_server_stream`` helper (not defined in this class;
    # presumably it forwards each yielded message to the stream).
    # ------------------------------------------------------------------

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigRequest, ChangeControlConfigResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigSomeRequest, ChangeControlConfigSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigStreamRequest, ChangeControlConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigStreamRequest, ChangeControlConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigStreamRequest, MetaResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigStreamRequest, MetaResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigSetRequest, ChangeControlConfigSetResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    async def __rpc_set_some(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigSetSomeRequest, ChangeControlConfigSetSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.set_some,
            stream,
            request,
        )

    async def __rpc_delete(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigDeleteRequest, ChangeControlConfigDeleteResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.delete(request)
        await stream.send_message(response)

    async def __rpc_delete_some(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigDeleteSomeRequest, ChangeControlConfigDeleteSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_some,
            stream,
            request,
        )

    async def __rpc_delete_all(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigDeleteAllRequest, ChangeControlConfigDeleteAllResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_all,
            stream,
            request,
        )

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigBatchedStreamRequest, ChangeControlConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all_batched,
            stream,
            request,
        )

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[ChangeControlConfigBatchedStreamRequest, ChangeControlConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_batched,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """
        Return the route table for this service.

        Keys are fully-qualified gRPC method paths; each value is a
        ``grpclib.const.Handler`` built from the stream adapter, the call
        cardinality, the request type and the response type.
        """
        return {
            "/arista.changecontrol.v1.ChangeControlConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                ChangeControlConfigRequest,
                ChangeControlConfigResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigSomeRequest,
                ChangeControlConfigSomeResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigStreamRequest,
                ChangeControlConfigStreamResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigStreamRequest,
                ChangeControlConfigStreamResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                ChangeControlConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                ChangeControlConfigSetRequest,
                ChangeControlConfigSetResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/SetSome": grpclib.const.Handler(
                self.__rpc_set_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigSetSomeRequest,
                ChangeControlConfigSetSomeResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/Delete": grpclib.const.Handler(
                self.__rpc_delete,
                grpclib.const.Cardinality.UNARY_UNARY,
                ChangeControlConfigDeleteRequest,
                ChangeControlConfigDeleteResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/DeleteSome": grpclib.const.Handler(
                self.__rpc_delete_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigDeleteSomeRequest,
                ChangeControlConfigDeleteSomeResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigDeleteAllRequest,
                ChangeControlConfigDeleteAllResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigBatchedStreamRequest,
                ChangeControlConfigBatchedStreamResponse,
            ),
            "/arista.changecontrol.v1.ChangeControlConfigService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                ChangeControlConfigBatchedStreamRequest,
                ChangeControlConfigBatchedStreamResponse,
            ),
        }