# Source code for cloudvision.api.arista.serviceaccount.v1

# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: arista/serviceaccount.v1/serviceaccount.proto, arista/serviceaccount.v1/services.gen.proto
# plugin: python-aristaproto
# This file has been @generated

# Public names of this module; this tuple is what `from <module> import *`
# exports.
__all__ = (
    "AccountStatus",
    "AccountKey",
    "AccountConfig",
    "Account",
    "TokenKey",
    "TokenConfig",
    "Token",
    "TokenSelfRefreshConfig",
    "MetaResponse",
    "AccountRequest",
    "AccountResponse",
    "AccountSomeRequest",
    "AccountSomeResponse",
    "AccountStreamRequest",
    "AccountStreamResponse",
    "AccountConfigRequest",
    "AccountConfigResponse",
    "AccountConfigSomeRequest",
    "AccountConfigSomeResponse",
    "AccountConfigStreamRequest",
    "AccountConfigStreamResponse",
    "AccountConfigSetRequest",
    "AccountConfigSetResponse",
    "AccountConfigSetSomeRequest",
    "AccountConfigSetSomeResponse",
    "AccountConfigDeleteRequest",
    "AccountConfigDeleteResponse",
    "AccountConfigDeleteSomeRequest",
    "AccountConfigDeleteSomeResponse",
    "AccountConfigDeleteAllRequest",
    "AccountConfigDeleteAllResponse",
    "TokenRequest",
    "TokenResponse",
    "TokenSomeRequest",
    "TokenSomeResponse",
    "TokenStreamRequest",
    "TokenStreamResponse",
    "TokenConfigRequest",
    "TokenConfigResponse",
    "TokenConfigSomeRequest",
    "TokenConfigSomeResponse",
    "TokenConfigStreamRequest",
    "TokenConfigStreamResponse",
    "TokenConfigSetRequest",
    "TokenConfigSetResponse",
    "TokenConfigSetSomeRequest",
    "TokenConfigSetSomeResponse",
    "TokenConfigDeleteRequest",
    "TokenConfigDeleteResponse",
    "TokenConfigDeleteSomeRequest",
    "TokenConfigDeleteSomeResponse",
    "TokenConfigDeleteAllRequest",
    "TokenConfigDeleteAllResponse",
    "TokenSelfRefreshConfigRequest",
    "TokenSelfRefreshConfigResponse",
    "TokenSelfRefreshConfigStreamRequest",
    "TokenSelfRefreshConfigStreamResponse",
    "TokenSelfRefreshConfigSetRequest",
    "TokenSelfRefreshConfigSetResponse",
    "AccountServiceStub",
    "AccountServiceBase",
    "AccountConfigServiceStub",
    "AccountConfigServiceBase",
    "TokenServiceStub",
    "TokenServiceBase",
    "TokenConfigServiceStub",
    "TokenConfigServiceBase",
    "TokenSelfRefreshConfigServiceStub",
    "TokenSelfRefreshConfigServiceBase",
)


from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Dict,
    List,
    Optional,
)

import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase

if TYPE_CHECKING:
    import grpclib.server
    from aristaproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


class AccountStatus(aristaproto.Enum):
    """
    AccountStatus determines whether a service account is enabled or disabled.
    """

    UNSPECIFIED = 0
    """
    ACCOUNT_STATUS_UNSPECIFIED indicates the service account status is
    unspecified.
    """

    ENABLED = 1
    """ACCOUNT_STATUS_ENABLED indicates the service account is enabled."""

    DISABLED = 2
    """ACCOUNT_STATUS_DISABLED indicates the service account is disabled."""
@dataclass(eq=False, repr=False)
class AccountKey(aristaproto.Message):
    """AccountKey contains the name of the service account."""

    name: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """name is the unique identifier of the service account."""
@dataclass(eq=False, repr=False)
class AccountConfig(aristaproto.Message):
    """AccountConfig holds the configuration for a service account."""

    key: "AccountKey" = aristaproto.message_field(1)
    """key contains the name of the service account."""

    status: "AccountStatus" = aristaproto.enum_field(2)
    """
    status determines if the service account is enabled or disabled.
    New service accounts are enabled by default.
    """

    description: Optional[str] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_STRING
    )
    """description is a comment describing the service account."""

    groups: "___fmp__.RepeatedString" = aristaproto.message_field(4)
    """
    groups is a list of roles that the service account inherits
    permissions from.
    """

    allow_token_refresh: Optional[bool] = aristaproto.message_field(
        7, wraps=aristaproto.TYPE_BOOL
    )
    """
    allow_token_refresh determines whether or not the service account
    can refresh any of its own active tokens, including preexisting ones.
    """
@dataclass(eq=False, repr=False)
class Account(aristaproto.Message):
    """Account describes a service account."""

    key: "AccountKey" = aristaproto.message_field(1)
    """key uniquely identifies the service account."""

    status: "AccountStatus" = aristaproto.enum_field(2)
    """
    status determines whether the service account is enabled or disabled.
    """

    description: Optional[str] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_STRING
    )
    """description is a comment describing the service account."""

    groups: "___fmp__.RepeatedString" = aristaproto.message_field(4)
    """
    groups is a list of roles that the service account inherits
    permissions from.
    """

    created_by: Optional[str] = aristaproto.message_field(
        5, wraps=aristaproto.TYPE_STRING
    )
    """
    created_by is the name of the entity that created the service account.
    """

    last_access: datetime = aristaproto.message_field(6)
    """last_access is the time when the service account was last fetched."""

    allow_token_refresh: Optional[bool] = aristaproto.message_field(
        7, wraps=aristaproto.TYPE_BOOL
    )
    """
    allow_token_refresh determines whether or not the service account
    can refresh any of its own active tokens, including preexisting ones.
    """
@dataclass(eq=False, repr=False)
class TokenKey(aristaproto.Message):
    """TokenKey contains the service account token ID."""

    id: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """id is the unique identifier of the service account token."""
@dataclass(eq=False, repr=False)
class TokenConfig(aristaproto.Message):
    """
    TokenConfig holds the configuration for a service account token.
    The token is a signed JWT which can be used as a credential for
    REST and WRPC endpoints.
    """

    key: "TokenKey" = aristaproto.message_field(1)
    """key uniquely identifies the service account token."""

    user: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    user is the name of the service account that the token is generated for.
    """

    description: Optional[str] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_STRING
    )
    """
    description is a short name or comment used to identify the
    service account token.
    """

    valid_for: timedelta = aristaproto.message_field(4)
    """
    valid_for determines the duration that the service account token
    will be valid for.
    """

    token: Optional[str] = aristaproto.message_field(5, wraps=aristaproto.TYPE_STRING)
    """
    token is the JWT generated for a service account token.
    It is only populated in Set response.
    """
@dataclass(eq=False, repr=False)
class Token(aristaproto.Message):
    """Token describes a service account token."""

    key: "TokenKey" = aristaproto.message_field(1)
    """key uniquely identifies the service account token."""

    user: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    user is the name of the service account that the token is generated for.
    """

    description: Optional[str] = aristaproto.message_field(
        3, wraps=aristaproto.TYPE_STRING
    )
    """
    description is a short name or comment used to identify the
    service account token.
    """

    valid_until: datetime = aristaproto.message_field(4)
    """
    valid_until is the time that the service account token will be
    valid until.
    """

    created_by: Optional[str] = aristaproto.message_field(
        5, wraps=aristaproto.TYPE_STRING
    )
    """
    created_by is the name of the entity that created the service
    account token.
    """

    last_used: datetime = aristaproto.message_field(6)
    """
    last_used is the time when the service account token was last used
    to authenticate.
    """

    replaced_at: datetime = aristaproto.message_field(7)
    """
    replaced_at is the time which the token has been replaced.
    It is only populated when the token is refreshed.
    """
@dataclass(eq=False, repr=False)
class TokenSelfRefreshConfig(aristaproto.Message):
    """
    TokenSelfRefreshConfig describes the response a service account gets
    when they refresh their token.
    """

    valid_for: timedelta = aristaproto.message_field(1)
    """
    valid_for is the duration that the service account token will be
    valid for. Default value will be the original token duration.
    """

    new_token: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    new_token is the JWT token generated for a service account token.
    This is populated in the response of a Set() request.
    """
@dataclass(eq=False, repr=False)
class MetaResponse(aristaproto.Message):
    """
    MetaResponse carries metadata about the items matching a request: a
    timestamp, an operation type and an item count.
    """

    time: datetime = aristaproto.message_field(1)
    """
    Time holds the timestamp of the last item included in the metadata
    calculation.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(2)
    """
    Operation indicates how the value in this response should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """

    count: Optional[int] = aristaproto.message_field(3, wraps=aristaproto.TYPE_UINT32)
    """
    Count is the number of items present under the conditions of the request.
    """
@dataclass(eq=False, repr=False)
class AccountRequest(aristaproto.Message):
    """
    AccountRequest identifies a single Account instance to retrieve,
    optionally at a given time.
    """

    key: "AccountKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies an Account instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class AccountResponse(aristaproto.Message):
    """
    AccountResponse carries the requested Account value together with its
    last-modification timestamp.
    """

    value: "Account" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    Account instance in this response.
    """
@dataclass(eq=False, repr=False)
class AccountSomeRequest(aristaproto.Message):
    """
    AccountSomeRequest identifies several Account instances to retrieve by
    key, optionally at a given time.
    """

    keys: List["AccountKey"] = aristaproto.message_field(1)
    """keys lists the keys of the Account instances to retrieve."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class AccountSomeResponse(aristaproto.Message):
    """
    AccountSomeResponse carries one requested Account value, or an error,
    from a GetSome call.
    """

    value: "Account" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    Account instance in this response.
    """
@dataclass(eq=False, repr=False)
class AccountStreamRequest(aristaproto.Message):
    """
    AccountStreamRequest filters and time-bounds a GetAll/Subscribe stream
    of Account values.
    """

    partial_eq_filter: List["Account"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields is
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each Account at end.
        * Each Account response is fully-specified (all fields set).
      * start: Returns the state of each Account at start, followed by
        updates until now.
        * Each Account response at start is fully-specified, but updates
          may be partial.
      * start and end: Returns the state of each Account at start, followed
        by updates until end.
        * Each Account response at start is fully-specified, but updates
          until end may be partial.
    """
@dataclass(eq=False, repr=False)
class AccountStreamResponse(aristaproto.Message):
    """
    AccountStreamResponse carries one Account value of a stream, with its
    last-modification timestamp and operation type.
    """

    value: "Account" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and what
    triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this Account's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the Account value in this response should be
    considered. Under non-subscribe requests, this value should always be
    INITIAL. In a subscription, once all initial data is streamed and the
    client begins to receive modification updates, you should not see
    INITIAL again.
    """
@dataclass(eq=False, repr=False)
class AccountConfigRequest(aristaproto.Message):
    """
    AccountConfigRequest identifies a single AccountConfig instance to
    retrieve, optionally at a given time.
    """

    key: "AccountKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies an AccountConfig instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class AccountConfigResponse(aristaproto.Message):
    """
    AccountConfigResponse carries the requested AccountConfig value together
    with its last-modification timestamp.
    """

    value: "AccountConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    AccountConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class AccountConfigSomeRequest(aristaproto.Message):
    """
    AccountConfigSomeRequest identifies several AccountConfig instances to
    retrieve by key, optionally at a given time.
    """

    keys: List["AccountKey"] = aristaproto.message_field(1)
    """keys lists the keys of the AccountConfig instances to retrieve."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class AccountConfigSomeResponse(aristaproto.Message):
    """
    AccountConfigSomeResponse carries one requested AccountConfig value, or
    an error, from a GetSome call.
    """

    value: "AccountConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    AccountConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class AccountConfigStreamRequest(aristaproto.Message):
    """
    AccountConfigStreamRequest filters and time-bounds a GetAll/Subscribe
    stream of AccountConfig values.
    """

    partial_eq_filter: List["AccountConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields is
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each AccountConfig at end.
        * Each AccountConfig response is fully-specified (all fields set).
      * start: Returns the state of each AccountConfig at start, followed by
        updates until now.
        * Each AccountConfig response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each AccountConfig at start,
        followed by updates until end.
        * Each AccountConfig response at start is fully-specified, but
          updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class AccountConfigStreamResponse(aristaproto.Message):
    """
    AccountConfigStreamResponse carries one AccountConfig value of a stream,
    with its last-modification timestamp and operation type.
    """

    value: "AccountConfig" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and what
    triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this AccountConfig's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the AccountConfig value in this response should
    be considered. Under non-subscribe requests, this value should always be
    INITIAL. In a subscription, once all initial data is streamed and the
    client begins to receive modification updates, you should not see
    INITIAL again.
    """
@dataclass(eq=False, repr=False)
class AccountConfigSetRequest(aristaproto.Message):
    """
    AccountConfigSetRequest carries the AccountConfig value to set into the
    datastore.
    """

    value: "AccountConfig" = aristaproto.message_field(1)
    """
    AccountConfig carries the value to set into the datastore.
    See the documentation on the AccountConfig struct for which fields are
    required.
    """
@dataclass(eq=False, repr=False)
class AccountConfigSetResponse(aristaproto.Message):
    """
    AccountConfigSetResponse returns the AccountConfig that was set, together
    with the time at which the creation was recognized.
    """

    value: "AccountConfig" = aristaproto.message_field(1)
    """
    Value carries all the values given in the AccountConfigSetRequest as well
    as any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    creation. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==CreatedAt will include this
         instance.
    """
@dataclass(eq=False, repr=False)
class AccountConfigSetSomeRequest(aristaproto.Message):
    """
    AccountConfigSetSomeRequest carries a list of AccountConfig values to
    write.
    """

    values: List["AccountConfig"] = aristaproto.message_field(1)
    """
    values contains a list of AccountConfig values to write.
    It is possible to provide more values than can fit within either:
      - the maximum send size of the client
      - the maximum receive size of the server
    If this error occurs you must reduce the number of values sent.
    See gRPC \"maximum message size\" documentation for more information.
    """
@dataclass(eq=False, repr=False)
class AccountConfigSetSomeResponse(aristaproto.Message):
    """
    AccountConfigSetSomeResponse pairs an AccountKey with an error string.

    NOTE(review): presumably sent only for values of a SetSome request that
    failed, mirroring AccountConfigDeleteSomeResponse -- confirm against the
    proto definition.
    """

    key: "AccountKey" = aristaproto.message_field(1)
    """key is the AccountKey this response refers to."""

    error: str = aristaproto.string_field(2)
    """error is the error string reported for that key."""
@dataclass(eq=False, repr=False)
class AccountConfigDeleteRequest(aristaproto.Message):
    """
    AccountConfigDeleteRequest identifies the AccountConfig instance to
    remove.
    """

    key: "AccountKey" = aristaproto.message_field(1)
    """
    Key indicates which AccountConfig instance to remove.
    This field must always be set.
    """
@dataclass(eq=False, repr=False)
class AccountConfigDeleteResponse(aristaproto.Message):
    """
    AccountConfigDeleteResponse echoes the key of the deleted AccountConfig
    instance and the time at which the deletion was recognized.
    """

    key: "AccountKey" = aristaproto.message_field(1)
    """Key echoes back the key of the deleted AccountConfig instance."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    deletion. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==DeletedAt will not include this
         instance.
    """
@dataclass(eq=False, repr=False)
class AccountConfigDeleteSomeRequest(aristaproto.Message):
    """
    AccountConfigDeleteSomeRequest lists the keys of the AccountConfig
    instances to delete.
    """

    keys: List["AccountKey"] = aristaproto.message_field(1)
    """keys contains a list of AccountConfig keys to delete."""
@dataclass(eq=False, repr=False)
class AccountConfigDeleteSomeResponse(aristaproto.Message):
    """AccountConfigDeleteSomeResponse is only sent when there is an error."""

    key: "AccountKey" = aristaproto.message_field(1)
    """key is the AccountKey the reported error refers to."""

    error: str = aristaproto.string_field(2)
    """error is the error string reported for that key."""
@dataclass(eq=False, repr=False)
class AccountConfigDeleteAllRequest(aristaproto.Message):
    """
    AccountConfigDeleteAllRequest optionally filters which AccountConfig
    instances a DeleteAll removes.
    """

    partial_eq_filter: List["AccountConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a DeleteAll.
    This requires all provided fields to be equal to the response.
    A filtered DeleteAll will use GetAll with filter to find things to delete.
    """
@dataclass(eq=False, repr=False)
class AccountConfigDeleteAllResponse(aristaproto.Message):
    """
    AccountConfigDeleteAllResponse reports an AccountConfig instance that
    failed to be deleted; it is only sent when there is an error.
    """

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    This describes the class of delete error.
    A DeleteAllResponse is only sent when there is an error.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """This indicates the error message from the delete failure."""

    key: "AccountKey" = aristaproto.message_field(3)
    """
    This is the key of the AccountConfig instance that failed to be deleted.
    """

    time: datetime = aristaproto.message_field(4)
    """Time indicates the (UTC) timestamp when the key was being deleted."""
@dataclass(eq=False, repr=False)
class TokenRequest(aristaproto.Message):
    """
    TokenRequest identifies a single Token instance to retrieve, optionally
    at a given time.
    """

    key: "TokenKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a Token instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class TokenResponse(aristaproto.Message):
    """
    TokenResponse carries the requested Token value together with its
    last-modification timestamp.
    """

    value: "Token" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    Token instance in this response.
    """
@dataclass(eq=False, repr=False)
class TokenSomeRequest(aristaproto.Message):
    """
    TokenSomeRequest identifies several Token instances to retrieve by key,
    optionally at a given time.
    """

    keys: List["TokenKey"] = aristaproto.message_field(1)
    """keys lists the keys of the Token instances to retrieve."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class TokenSomeResponse(aristaproto.Message):
    """
    TokenSomeResponse carries one requested Token value, or an error, from a
    GetSome call.
    """

    value: "Token" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    Token instance in this response.
    """
@dataclass(eq=False, repr=False)
class TokenStreamRequest(aristaproto.Message):
    """
    TokenStreamRequest filters and time-bounds a GetAll/Subscribe stream of
    Token values.
    """

    partial_eq_filter: List["Token"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields is
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each Token at end.
        * Each Token response is fully-specified (all fields set).
      * start: Returns the state of each Token at start, followed by updates
        until now.
        * Each Token response at start is fully-specified, but updates may
          be partial.
      * start and end: Returns the state of each Token at start, followed by
        updates until end.
        * Each Token response at start is fully-specified, but updates until
          end may be partial.
    """
@dataclass(eq=False, repr=False)
class TokenStreamResponse(aristaproto.Message):
    """
    TokenStreamResponse carries one Token value of a stream, with its
    last-modification timestamp and operation type.
    """

    value: "Token" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and what
    triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this Token's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the Token value in this response should be
    considered. Under non-subscribe requests, this value should always be
    INITIAL. In a subscription, once all initial data is streamed and the
    client begins to receive modification updates, you should not see
    INITIAL again.
    """
@dataclass(eq=False, repr=False)
class TokenConfigRequest(aristaproto.Message):
    """
    TokenConfigRequest identifies a single TokenConfig instance to retrieve,
    optionally at a given time.
    """

    key: "TokenKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a TokenConfig instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class TokenConfigResponse(aristaproto.Message):
    """
    TokenConfigResponse carries the requested TokenConfig value together
    with its last-modification timestamp.
    """

    value: "TokenConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    TokenConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class TokenConfigSomeRequest(aristaproto.Message):
    """
    TokenConfigSomeRequest identifies several TokenConfig instances to
    retrieve by key, optionally at a given time.
    """

    keys: List["TokenKey"] = aristaproto.message_field(1)
    """keys lists the keys of the TokenConfig instances to retrieve."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class TokenConfigSomeResponse(aristaproto.Message):
    """
    TokenConfigSomeResponse carries one requested TokenConfig value, or an
    error, from a GetSome call.
    """

    value: "TokenConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    TokenConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class TokenConfigStreamRequest(aristaproto.Message):
    """
    TokenConfigStreamRequest filters and time-bounds a GetAll/Subscribe
    stream of TokenConfig values.
    """

    partial_eq_filter: List["TokenConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields is
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each TokenConfig at end.
        * Each TokenConfig response is fully-specified (all fields set).
      * start: Returns the state of each TokenConfig at start, followed by
        updates until now.
        * Each TokenConfig response at start is fully-specified, but updates
          may be partial.
      * start and end: Returns the state of each TokenConfig at start,
        followed by updates until end.
        * Each TokenConfig response at start is fully-specified, but updates
          until end may be partial.
    """
@dataclass(eq=False, repr=False)
class TokenConfigStreamResponse(aristaproto.Message):
    """
    TokenConfigStreamResponse carries one TokenConfig value of a stream,
    with its last-modification timestamp and operation type.
    """

    value: "TokenConfig" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and what
    triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this TokenConfig's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the TokenConfig value in this response should be
    considered. Under non-subscribe requests, this value should always be
    INITIAL. In a subscription, once all initial data is streamed and the
    client begins to receive modification updates, you should not see
    INITIAL again.
    """
@dataclass(eq=False, repr=False)
class TokenConfigSetRequest(aristaproto.Message):
    """
    TokenConfigSetRequest carries the TokenConfig value to set into the
    datastore.
    """

    value: "TokenConfig" = aristaproto.message_field(1)
    """
    TokenConfig carries the value to set into the datastore.
    See the documentation on the TokenConfig struct for which fields are
    required.
    """
@dataclass(eq=False, repr=False)
class TokenConfigSetResponse(aristaproto.Message):
    """
    TokenConfigSetResponse returns the TokenConfig that was set, together
    with the time at which the creation was recognized.
    """

    value: "TokenConfig" = aristaproto.message_field(1)
    """
    Value carries all the values given in the TokenConfigSetRequest as well
    as any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    creation. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==CreatedAt will include this
         instance.
    """
@dataclass(eq=False, repr=False)
class TokenConfigSetSomeRequest(aristaproto.Message):
    """
    TokenConfigSetSomeRequest carries a list of TokenConfig values to write.
    """

    values: List["TokenConfig"] = aristaproto.message_field(1)
    """
    values contains a list of TokenConfig values to write.
    It is possible to provide more values than can fit within either:
      - the maximum send size of the client
      - the maximum receive size of the server
    If this error occurs you must reduce the number of values sent.
    See gRPC \"maximum message size\" documentation for more information.
    """
@dataclass(eq=False, repr=False)
class TokenConfigSetSomeResponse(aristaproto.Message):
    """
    TokenConfigSetSomeResponse pairs a TokenKey with an error string.

    NOTE(review): presumably sent only for values of a SetSome request that
    failed, mirroring TokenConfigDeleteSomeResponse -- confirm against the
    proto definition.
    """

    key: "TokenKey" = aristaproto.message_field(1)
    """key is the TokenKey this response refers to."""

    error: str = aristaproto.string_field(2)
    """error is the error string reported for that key."""
@dataclass(eq=False, repr=False)
class TokenConfigDeleteRequest(aristaproto.Message):
    """
    TokenConfigDeleteRequest identifies the TokenConfig instance to remove.
    """

    key: "TokenKey" = aristaproto.message_field(1)
    """
    Key indicates which TokenConfig instance to remove.
    This field must always be set.
    """
@dataclass(eq=False, repr=False)
class TokenConfigDeleteResponse(aristaproto.Message):
    """
    TokenConfigDeleteResponse echoes the key of the deleted TokenConfig
    instance and the time at which the deletion was recognized.
    """

    key: "TokenKey" = aristaproto.message_field(1)
    """Key echoes back the key of the deleted TokenConfig instance."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    deletion. The only guarantees made about this timestamp are:

       - it is after the time the request was received
       - a time-ranged query with StartTime==DeletedAt will not include this
         instance.
    """
@dataclass(eq=False, repr=False)
class TokenConfigDeleteSomeRequest(aristaproto.Message):
    """Request message carrying a batch of TokenConfig keys to delete."""

    keys: List["TokenKey"] = aristaproto.message_field(1)
    """Keys is the list of TokenConfig keys to delete."""
@dataclass(eq=False, repr=False)
class TokenConfigDeleteSomeResponse(aristaproto.Message):
    """TokenConfigDeleteSomeResponse is only sent when there is an error."""

    key: "TokenKey" = aristaproto.message_field(1)
    """Key of the TokenConfig instance this reply refers to."""

    error: str = aristaproto.string_field(2)
    """Error text associated with ``key``."""
@dataclass(eq=False, repr=False)
class TokenConfigDeleteAllRequest(aristaproto.Message):
    """Request message for a (optionally filtered) TokenConfig ``DeleteAll``."""

    partial_eq_filter: List["TokenConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a DeleteAll.
    Every field provided here must be equal to the corresponding field of
    the response. A filtered DeleteAll will use GetAll with filter to find
    things to delete.
    """
@dataclass(eq=False, repr=False)
class TokenConfigDeleteAllResponse(aristaproto.Message):
    """Error report for one key that a TokenConfig ``DeleteAll`` failed on."""

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    Type describes the class of delete error.
    A DeleteAllResponse is only sent when there is an error.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """Error is the error message from the delete failure."""

    key: "TokenKey" = aristaproto.message_field(3)
    """
    Key is the key of the TokenConfig instance that failed to be deleted.
    """

    time: datetime = aristaproto.message_field(4)
    """Time is the (UTC) timestamp when the key was being deleted."""
@dataclass(eq=False, repr=False)
class TokenSelfRefreshConfigRequest(aristaproto.Message):
    """Request message for fetching the TokenSelfRefreshConfig value."""

    time: datetime = aristaproto.message_field(2)
    """
    Time is the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class TokenSelfRefreshConfigResponse(aristaproto.Message):
    """Reply message carrying one TokenSelfRefreshConfig value."""

    value: "TokenSelfRefreshConfig" = aristaproto.message_field(1)
    """
    Value is the value requested. This structure will be fully-populated
    as it exists in the datastore. If optional fields were not given at
    creation, these fields will be empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    TokenSelfRefreshConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class TokenSelfRefreshConfigStreamRequest(aristaproto.Message):
    """Request message for TokenSelfRefreshConfig GetAll/Subscribe calls."""

    partial_eq_filter: List["TokenSelfRefreshConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    Every field provided here must be equal to the corresponding field of
    the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time
    window. If this field is populated, at least one of the two time fields
    is required.

    For GetAll, the fields start and end can be used as follows:

    * end: Returns the state of each TokenSelfRefreshConfig at end.
      * Each TokenSelfRefreshConfig response is fully-specified (all fields
        set).
    * start: Returns the state of each TokenSelfRefreshConfig at start,
      followed by updates until now.
      * Each TokenSelfRefreshConfig response at start is fully-specified,
        but updates may be partial.
    * start and end: Returns the state of each TokenSelfRefreshConfig at
      start, followed by updates until end.
      * Each TokenSelfRefreshConfig response at start is fully-specified,
        but updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class TokenSelfRefreshConfigStreamResponse(aristaproto.Message):
    """Streamed reply for TokenSelfRefreshConfig GetAll/Subscribe calls."""

    value: "TokenSelfRefreshConfig" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time holds the timestamp of this TokenSelfRefreshConfig's last
    modification.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the TokenSelfRefreshConfig value in this
    response should be considered.

    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins
    to receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class TokenSelfRefreshConfigSetRequest(aristaproto.Message):
    """Request message for writing a TokenSelfRefreshConfig value."""

    value: "TokenSelfRefreshConfig" = aristaproto.message_field(1)
    """
    TokenSelfRefreshConfig carries the value to set into the datastore.
    See the documentation on the TokenSelfRefreshConfig struct for which
    fields are required.
    """
@dataclass(eq=False, repr=False)
class TokenSelfRefreshConfigSetResponse(aristaproto.Message):
    """Reply message for a TokenSelfRefreshConfig ``Set`` request."""

    value: "TokenSelfRefreshConfig" = aristaproto.message_field(1)
    """
    Value carries every value supplied in the
    TokenSelfRefreshConfigSetRequest, plus any values generated by the
    server.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time is the (UTC) timestamp at which the system recognizes the creation.
    Only two guarantees are made about this timestamp:

    - it is after the time the request was received
    - a time-ranged query with StartTime==CreatedAt will include this
      instance.
    """
class AccountServiceStub(aristaproto.ServiceStub):
    """Client stub for ``arista.serviceaccount.v1.AccountService``.

    Every method forwards its ``timeout``, ``deadline`` and ``metadata``
    keyword arguments unchanged to the underlying ``_unary_unary`` or
    ``_unary_stream`` call.
    """

    async def get_one(
        self,
        account_request: "AccountRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AccountResponse":
        """Call the unary ``GetOne`` RPC and return its ``AccountResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.AccountService/GetOne",
            account_request,
            AccountResponse,
            **call_opts,
        )

    async def get_some(
        self,
        account_some_request: "AccountSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountSomeResponse]":
        """Call the ``GetSome`` RPC and yield each ``AccountSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountService/GetSome",
            account_some_request,
            AccountSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_all(
        self,
        account_stream_request: "AccountStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountStreamResponse]":
        """Call the ``GetAll`` RPC and yield each ``AccountStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountService/GetAll",
            account_stream_request,
            AccountStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def subscribe(
        self,
        account_stream_request: "AccountStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountStreamResponse]":
        """Call the ``Subscribe`` RPC and yield each ``AccountStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountService/Subscribe",
            account_stream_request,
            AccountStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_meta(
        self,
        account_stream_request: "AccountStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Call the unary ``GetMeta`` RPC and return its ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.AccountService/GetMeta",
            account_stream_request,
            MetaResponse,
            **call_opts,
        )

    async def subscribe_meta(
        self,
        account_stream_request: "AccountStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the ``SubscribeMeta`` RPC and yield each ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountService/SubscribeMeta",
            account_stream_request,
            MetaResponse,
            **call_opts,
        )
        async for item in replies:
            yield item
class AccountConfigServiceStub(aristaproto.ServiceStub):
    """Client stub for ``arista.serviceaccount.v1.AccountConfigService``.

    Every method forwards its ``timeout``, ``deadline`` and ``metadata``
    keyword arguments unchanged to the underlying ``_unary_unary`` or
    ``_unary_stream`` call.
    """

    async def get_one(
        self,
        account_config_request: "AccountConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AccountConfigResponse":
        """Call the unary ``GetOne`` RPC and return its ``AccountConfigResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.AccountConfigService/GetOne",
            account_config_request,
            AccountConfigResponse,
            **call_opts,
        )

    async def get_some(
        self,
        account_config_some_request: "AccountConfigSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountConfigSomeResponse]":
        """Call the ``GetSome`` RPC and yield each ``AccountConfigSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountConfigService/GetSome",
            account_config_some_request,
            AccountConfigSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_all(
        self,
        account_config_stream_request: "AccountConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountConfigStreamResponse]":
        """Call the ``GetAll`` RPC and yield each ``AccountConfigStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountConfigService/GetAll",
            account_config_stream_request,
            AccountConfigStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def subscribe(
        self,
        account_config_stream_request: "AccountConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountConfigStreamResponse]":
        """Call the ``Subscribe`` RPC and yield each ``AccountConfigStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountConfigService/Subscribe",
            account_config_stream_request,
            AccountConfigStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_meta(
        self,
        account_config_stream_request: "AccountConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Call the unary ``GetMeta`` RPC and return its ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.AccountConfigService/GetMeta",
            account_config_stream_request,
            MetaResponse,
            **call_opts,
        )

    async def subscribe_meta(
        self,
        account_config_stream_request: "AccountConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the ``SubscribeMeta`` RPC and yield each ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountConfigService/SubscribeMeta",
            account_config_stream_request,
            MetaResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def set(
        self,
        account_config_set_request: "AccountConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AccountConfigSetResponse":
        """Call the unary ``Set`` RPC and return its ``AccountConfigSetResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.AccountConfigService/Set",
            account_config_set_request,
            AccountConfigSetResponse,
            **call_opts,
        )

    async def set_some(
        self,
        account_config_set_some_request: "AccountConfigSetSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountConfigSetSomeResponse]":
        """Call the ``SetSome`` RPC and yield each ``AccountConfigSetSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountConfigService/SetSome",
            account_config_set_some_request,
            AccountConfigSetSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def delete(
        self,
        account_config_delete_request: "AccountConfigDeleteRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AccountConfigDeleteResponse":
        """Call the unary ``Delete`` RPC and return its ``AccountConfigDeleteResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.AccountConfigService/Delete",
            account_config_delete_request,
            AccountConfigDeleteResponse,
            **call_opts,
        )

    async def delete_some(
        self,
        account_config_delete_some_request: "AccountConfigDeleteSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountConfigDeleteSomeResponse]":
        """Call the ``DeleteSome`` RPC and yield each ``AccountConfigDeleteSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountConfigService/DeleteSome",
            account_config_delete_some_request,
            AccountConfigDeleteSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def delete_all(
        self,
        account_config_delete_all_request: "AccountConfigDeleteAllRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[AccountConfigDeleteAllResponse]":
        """Call the ``DeleteAll`` RPC and yield each ``AccountConfigDeleteAllResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.AccountConfigService/DeleteAll",
            account_config_delete_all_request,
            AccountConfigDeleteAllResponse,
            **call_opts,
        )
        async for item in replies:
            yield item
class TokenServiceStub(aristaproto.ServiceStub):
    """Client stub for ``arista.serviceaccount.v1.TokenService``.

    Every method forwards its ``timeout``, ``deadline`` and ``metadata``
    keyword arguments unchanged to the underlying ``_unary_unary`` or
    ``_unary_stream`` call.
    """

    async def get_one(
        self,
        token_request: "TokenRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "TokenResponse":
        """Call the unary ``GetOne`` RPC and return its ``TokenResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenService/GetOne",
            token_request,
            TokenResponse,
            **call_opts,
        )

    async def get_some(
        self,
        token_some_request: "TokenSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenSomeResponse]":
        """Call the ``GetSome`` RPC and yield each ``TokenSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenService/GetSome",
            token_some_request,
            TokenSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_all(
        self,
        token_stream_request: "TokenStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenStreamResponse]":
        """Call the ``GetAll`` RPC and yield each ``TokenStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenService/GetAll",
            token_stream_request,
            TokenStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def subscribe(
        self,
        token_stream_request: "TokenStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenStreamResponse]":
        """Call the ``Subscribe`` RPC and yield each ``TokenStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenService/Subscribe",
            token_stream_request,
            TokenStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_meta(
        self,
        token_stream_request: "TokenStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Call the unary ``GetMeta`` RPC and return its ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenService/GetMeta",
            token_stream_request,
            MetaResponse,
            **call_opts,
        )

    async def subscribe_meta(
        self,
        token_stream_request: "TokenStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the ``SubscribeMeta`` RPC and yield each ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenService/SubscribeMeta",
            token_stream_request,
            MetaResponse,
            **call_opts,
        )
        async for item in replies:
            yield item
class TokenConfigServiceStub(aristaproto.ServiceStub):
    """Client stub for ``arista.serviceaccount.v1.TokenConfigService``.

    Every method forwards its ``timeout``, ``deadline`` and ``metadata``
    keyword arguments unchanged to the underlying ``_unary_unary`` or
    ``_unary_stream`` call.
    """

    async def get_one(
        self,
        token_config_request: "TokenConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "TokenConfigResponse":
        """Call the unary ``GetOne`` RPC and return its ``TokenConfigResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenConfigService/GetOne",
            token_config_request,
            TokenConfigResponse,
            **call_opts,
        )

    async def get_some(
        self,
        token_config_some_request: "TokenConfigSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenConfigSomeResponse]":
        """Call the ``GetSome`` RPC and yield each ``TokenConfigSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenConfigService/GetSome",
            token_config_some_request,
            TokenConfigSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_all(
        self,
        token_config_stream_request: "TokenConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenConfigStreamResponse]":
        """Call the ``GetAll`` RPC and yield each ``TokenConfigStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenConfigService/GetAll",
            token_config_stream_request,
            TokenConfigStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def subscribe(
        self,
        token_config_stream_request: "TokenConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenConfigStreamResponse]":
        """Call the ``Subscribe`` RPC and yield each ``TokenConfigStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenConfigService/Subscribe",
            token_config_stream_request,
            TokenConfigStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def get_meta(
        self,
        token_config_stream_request: "TokenConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Call the unary ``GetMeta`` RPC and return its ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenConfigService/GetMeta",
            token_config_stream_request,
            MetaResponse,
            **call_opts,
        )

    async def subscribe_meta(
        self,
        token_config_stream_request: "TokenConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the ``SubscribeMeta`` RPC and yield each ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenConfigService/SubscribeMeta",
            token_config_stream_request,
            MetaResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def set(
        self,
        token_config_set_request: "TokenConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "TokenConfigSetResponse":
        """Call the unary ``Set`` RPC and return its ``TokenConfigSetResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenConfigService/Set",
            token_config_set_request,
            TokenConfigSetResponse,
            **call_opts,
        )

    async def set_some(
        self,
        token_config_set_some_request: "TokenConfigSetSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenConfigSetSomeResponse]":
        """Call the ``SetSome`` RPC and yield each ``TokenConfigSetSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenConfigService/SetSome",
            token_config_set_some_request,
            TokenConfigSetSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def delete(
        self,
        token_config_delete_request: "TokenConfigDeleteRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "TokenConfigDeleteResponse":
        """Call the unary ``Delete`` RPC and return its ``TokenConfigDeleteResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenConfigService/Delete",
            token_config_delete_request,
            TokenConfigDeleteResponse,
            **call_opts,
        )

    async def delete_some(
        self,
        token_config_delete_some_request: "TokenConfigDeleteSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenConfigDeleteSomeResponse]":
        """Call the ``DeleteSome`` RPC and yield each ``TokenConfigDeleteSomeResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenConfigService/DeleteSome",
            token_config_delete_some_request,
            TokenConfigDeleteSomeResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def delete_all(
        self,
        token_config_delete_all_request: "TokenConfigDeleteAllRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenConfigDeleteAllResponse]":
        """Call the ``DeleteAll`` RPC and yield each ``TokenConfigDeleteAllResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenConfigService/DeleteAll",
            token_config_delete_all_request,
            TokenConfigDeleteAllResponse,
            **call_opts,
        )
        async for item in replies:
            yield item
class TokenSelfRefreshConfigServiceStub(aristaproto.ServiceStub):
    """Client stub for ``arista.serviceaccount.v1.TokenSelfRefreshConfigService``.

    Every method forwards its ``timeout``, ``deadline`` and ``metadata``
    keyword arguments unchanged to the underlying ``_unary_unary`` or
    ``_unary_stream`` call.
    """

    async def get_one(
        self,
        token_self_refresh_config_request: "TokenSelfRefreshConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "TokenSelfRefreshConfigResponse":
        """Call the unary ``GetOne`` RPC and return its ``TokenSelfRefreshConfigResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/GetOne",
            token_self_refresh_config_request,
            TokenSelfRefreshConfigResponse,
            **call_opts,
        )

    async def get_all(
        self,
        token_self_refresh_config_stream_request: "TokenSelfRefreshConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenSelfRefreshConfigStreamResponse]":
        """Call the ``GetAll`` RPC and yield each ``TokenSelfRefreshConfigStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/GetAll",
            token_self_refresh_config_stream_request,
            TokenSelfRefreshConfigStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def subscribe(
        self,
        token_self_refresh_config_stream_request: "TokenSelfRefreshConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[TokenSelfRefreshConfigStreamResponse]":
        """Call the ``Subscribe`` RPC and yield each ``TokenSelfRefreshConfigStreamResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/Subscribe",
            token_self_refresh_config_stream_request,
            TokenSelfRefreshConfigStreamResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def subscribe_meta(
        self,
        token_self_refresh_config_stream_request: "TokenSelfRefreshConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Call the ``SubscribeMeta`` RPC and yield each ``MetaResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        replies = self._unary_stream(
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/SubscribeMeta",
            token_self_refresh_config_stream_request,
            MetaResponse,
            **call_opts,
        )
        async for item in replies:
            yield item

    async def set(
        self,
        token_self_refresh_config_set_request: "TokenSelfRefreshConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "TokenSelfRefreshConfigSetResponse":
        """Call the unary ``Set`` RPC and return its ``TokenSelfRefreshConfigSetResponse``."""
        call_opts = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        return await self._unary_unary(
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/Set",
            token_self_refresh_config_set_request,
            TokenSelfRefreshConfigSetResponse,
            **call_opts,
        )
from .... import fmp as ___fmp__ from ... import subscriptions as __subscriptions__ from ... import time as __time__
class AccountServiceBase(ServiceBase):
    """Server-side base class for ``arista.serviceaccount.v1.AccountService``.

    Subclasses override the public RPC methods below. Every default
    implementation raises ``grpclib.GRPCError`` with status
    ``UNIMPLEMENTED``. ``__mapping__`` exposes the private ``__rpc_*``
    adapters as grpclib handlers keyed by RPC path.
    """

    async def get_one(self, account_request: "AccountRequest") -> "AccountResponse":
        """Handle ``GetOne``; the default raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, account_some_request: "AccountSomeRequest"
    ) -> AsyncIterator[AccountSomeResponse]:
        """Handle server-streaming ``GetSome``; the default raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable on purpose: the ``yield`` turns this function into an
        # async generator, matching the AsyncIterator annotation, so the
        # error is raised when the stream is iterated.
        # NOTE(review): assumes ServiceBase._call_rpc_handler_server_stream
        # iterates async iterables and only closes plain coroutines, as in
        # upstream betterproto — confirm for aristaproto.
        yield AccountSomeResponse()

    async def get_all(
        self, account_stream_request: "AccountStreamRequest"
    ) -> AsyncIterator[AccountStreamResponse]:
        """Handle server-streaming ``GetAll``; the default raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable; makes this an async generator (see get_some).
        yield AccountStreamResponse()

    async def subscribe(
        self, account_stream_request: "AccountStreamRequest"
    ) -> AsyncIterator[AccountStreamResponse]:
        """Handle server-streaming ``Subscribe``; the default raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable; makes this an async generator (see get_some).
        yield AccountStreamResponse()

    async def get_meta(
        self, account_stream_request: "AccountStreamRequest"
    ) -> "MetaResponse":
        """Handle ``GetMeta``; the default raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, account_stream_request: "AccountStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Handle server-streaming ``SubscribeMeta``; the default raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable; makes this an async generator (see get_some).
        yield MetaResponse()

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[AccountRequest, AccountResponse]"
    ) -> None:
        # Unary adapter: receive one request, send one response.
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self, stream: "grpclib.server.Stream[AccountSomeRequest, AccountSomeResponse]"
    ) -> None:
        # Streaming adapter: delegate response delivery to the base helper.
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[AccountStreamRequest, AccountStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[AccountStreamRequest, AccountStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[AccountStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[AccountStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the grpclib handler table for this service, keyed by RPC path."""
        return {
            "/arista.serviceaccount.v1.AccountService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                AccountRequest,
                AccountResponse,
            ),
            "/arista.serviceaccount.v1.AccountService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                AccountSomeRequest,
                AccountSomeResponse,
            ),
            "/arista.serviceaccount.v1.AccountService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                AccountStreamRequest,
                AccountStreamResponse,
            ),
            "/arista.serviceaccount.v1.AccountService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                AccountStreamRequest,
                AccountStreamResponse,
            ),
            "/arista.serviceaccount.v1.AccountService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                AccountStreamRequest,
                MetaResponse,
            ),
            "/arista.serviceaccount.v1.AccountService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                AccountStreamRequest,
                MetaResponse,
            ),
        }
[docs] class AccountConfigServiceBase(ServiceBase): """ """
[docs] async def get_one( self, account_config_request: "AccountConfigRequest" ) -> "AccountConfigResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_some( self, account_config_some_request: "AccountConfigSomeRequest" ) -> AsyncIterator[AccountConfigSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all( self, account_config_stream_request: "AccountConfigStreamRequest" ) -> AsyncIterator[AccountConfigStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe( self, account_config_stream_request: "AccountConfigStreamRequest" ) -> AsyncIterator[AccountConfigStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_meta( self, account_config_stream_request: "AccountConfigStreamRequest" ) -> "MetaResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_meta( self, account_config_stream_request: "AccountConfigStreamRequest" ) -> AsyncIterator[MetaResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def set( self, account_config_set_request: "AccountConfigSetRequest" ) -> "AccountConfigSetResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def set_some( self, account_config_set_some_request: "AccountConfigSetSomeRequest" ) -> AsyncIterator[AccountConfigSetSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete( self, account_config_delete_request: "AccountConfigDeleteRequest" ) -> "AccountConfigDeleteResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_some( self, account_config_delete_some_request: "AccountConfigDeleteSomeRequest" ) -> AsyncIterator[AccountConfigDeleteSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_all( self, account_config_delete_all_request: "AccountConfigDeleteAllRequest" ) -> AsyncIterator[AccountConfigDeleteAllResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one(
    self,
    stream: "grpclib.server.Stream[AccountConfigRequest, AccountConfigResponse]",
) -> None:
    """Serve ``GetOne``: read one request and send back ``get_one``'s reply."""
    incoming = await stream.recv_message()
    await stream.send_message(await self.get_one(incoming))

async def __rpc_get_some(
    self,
    stream: "grpclib.server.Stream[AccountConfigSomeRequest, AccountConfigSomeResponse]",
) -> None:
    """Serve ``GetSome``: read one request and stream ``get_some``'s output."""
    incoming = await stream.recv_message()
    await self._call_rpc_handler_server_stream(self.get_some, stream, incoming)

async def __rpc_get_all(
    self,
    stream: "grpclib.server.Stream[AccountConfigStreamRequest, AccountConfigStreamResponse]",
) -> None:
    """Serve ``GetAll``: read one request and stream ``get_all``'s output."""
    incoming = await stream.recv_message()
    await self._call_rpc_handler_server_stream(self.get_all, stream, incoming)

async def __rpc_subscribe(
    self,
    stream: "grpclib.server.Stream[AccountConfigStreamRequest, AccountConfigStreamResponse]",
) -> None:
    """Serve ``Subscribe``: read one request and stream ``subscribe``'s output."""
    incoming = await stream.recv_message()
    await self._call_rpc_handler_server_stream(self.subscribe, stream, incoming)

async def __rpc_get_meta(
    self, stream: "grpclib.server.Stream[AccountConfigStreamRequest, MetaResponse]"
) -> None:
    """Serve ``GetMeta``: read one request and send back ``get_meta``'s reply."""
    incoming = await stream.recv_message()
    await stream.send_message(await self.get_meta(incoming))

async def __rpc_subscribe_meta(
    self, stream: "grpclib.server.Stream[AccountConfigStreamRequest, MetaResponse]"
) -> None:
    """Serve ``SubscribeMeta``: read one request and stream ``subscribe_meta``'s output."""
    incoming = await stream.recv_message()
    await self._call_rpc_handler_server_stream(
        self.subscribe_meta, stream, incoming
    )

async def __rpc_set(
    self,
    stream: "grpclib.server.Stream[AccountConfigSetRequest, AccountConfigSetResponse]",
) -> None:
    """Serve ``Set``: read one request and send back ``set``'s reply."""
    incoming = await stream.recv_message()
    await stream.send_message(await self.set(incoming))

async def __rpc_set_some(
    self,
    stream: "grpclib.server.Stream[AccountConfigSetSomeRequest, AccountConfigSetSomeResponse]",
) -> None:
    """Serve ``SetSome``: read one request and stream ``set_some``'s output."""
    incoming = await stream.recv_message()
    await self._call_rpc_handler_server_stream(self.set_some, stream, incoming)

async def __rpc_delete(
    self,
    stream: "grpclib.server.Stream[AccountConfigDeleteRequest, AccountConfigDeleteResponse]",
) -> None:
    """Serve ``Delete``: read one request and send back ``delete``'s reply."""
    incoming = await stream.recv_message()
    await stream.send_message(await self.delete(incoming))

async def __rpc_delete_some(
    self,
    stream: "grpclib.server.Stream[AccountConfigDeleteSomeRequest, AccountConfigDeleteSomeResponse]",
) -> None:
    """Serve ``DeleteSome``: read one request and stream ``delete_some``'s output."""
    incoming = await stream.recv_message()
    await self._call_rpc_handler_server_stream(self.delete_some, stream, incoming)

async def __rpc_delete_all(
    self,
    stream: "grpclib.server.Stream[AccountConfigDeleteAllRequest, AccountConfigDeleteAllResponse]",
) -> None:
    """Serve ``DeleteAll``: read one request and stream ``delete_all``'s output."""
    incoming = await stream.recv_message()
    await self._call_rpc_handler_server_stream(self.delete_all, stream, incoming)

def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
    """Return the route table for ``AccountConfigService``.

    Each key is a fully-qualified RPC path; each value is a
    ``grpclib.const.Handler`` bundling the private stream handler, the call
    cardinality, and the request/response message classes.
    """
    # Short local aliases keep the table readable; values are unchanged.
    route = grpclib.const.Handler
    one_to_one = grpclib.const.Cardinality.UNARY_UNARY
    one_to_many = grpclib.const.Cardinality.UNARY_STREAM
    return {
        "/arista.serviceaccount.v1.AccountConfigService/GetOne": route(
            self.__rpc_get_one,
            one_to_one,
            AccountConfigRequest,
            AccountConfigResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/GetSome": route(
            self.__rpc_get_some,
            one_to_many,
            AccountConfigSomeRequest,
            AccountConfigSomeResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/GetAll": route(
            self.__rpc_get_all,
            one_to_many,
            AccountConfigStreamRequest,
            AccountConfigStreamResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/Subscribe": route(
            self.__rpc_subscribe,
            one_to_many,
            AccountConfigStreamRequest,
            AccountConfigStreamResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/GetMeta": route(
            self.__rpc_get_meta,
            one_to_one,
            AccountConfigStreamRequest,
            MetaResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/SubscribeMeta": route(
            self.__rpc_subscribe_meta,
            one_to_many,
            AccountConfigStreamRequest,
            MetaResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/Set": route(
            self.__rpc_set,
            one_to_one,
            AccountConfigSetRequest,
            AccountConfigSetResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/SetSome": route(
            self.__rpc_set_some,
            one_to_many,
            AccountConfigSetSomeRequest,
            AccountConfigSetSomeResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/Delete": route(
            self.__rpc_delete,
            one_to_one,
            AccountConfigDeleteRequest,
            AccountConfigDeleteResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/DeleteSome": route(
            self.__rpc_delete_some,
            one_to_many,
            AccountConfigDeleteSomeRequest,
            AccountConfigDeleteSomeResponse,
        ),
        "/arista.serviceaccount.v1.AccountConfigService/DeleteAll": route(
            self.__rpc_delete_all,
            one_to_many,
            AccountConfigDeleteAllRequest,
            AccountConfigDeleteAllResponse,
        ),
    }
class TokenServiceBase(ServiceBase):
    """Server-side base class for ``arista.serviceaccount.v1.TokenService``.

    Every public RPC method is a stub that raises ``GRPCError`` with
    ``Status.UNIMPLEMENTED``; a concrete service overrides the ones it
    supports. ``__mapping__`` exposes the RPC-path-to-handler table.
    """

    async def get_one(self, token_request: "TokenRequest") -> "TokenResponse":
        """Default ``GetOne`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, token_some_request: "TokenSomeRequest"
    ) -> AsyncIterator[TokenSomeResponse]:
        """Default ``GetSome`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenSomeResponse()

    async def get_all(
        self, token_stream_request: "TokenStreamRequest"
    ) -> AsyncIterator[TokenStreamResponse]:
        """Default ``GetAll`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenStreamResponse()

    async def subscribe(
        self, token_stream_request: "TokenStreamRequest"
    ) -> AsyncIterator[TokenStreamResponse]:
        """Default ``Subscribe`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenStreamResponse()

    async def get_meta(
        self, token_stream_request: "TokenStreamRequest"
    ) -> "MetaResponse":
        """Default ``GetMeta`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, token_stream_request: "TokenStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Default ``SubscribeMeta`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield MetaResponse()

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[TokenRequest, TokenResponse]"
    ) -> None:
        """Receive one request, call ``get_one`` and send its reply."""
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self, stream: "grpclib.server.Stream[TokenSomeRequest, TokenSomeResponse]"
    ) -> None:
        """Receive one request and stream the output of ``get_some``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self, stream: "grpclib.server.Stream[TokenStreamRequest, TokenStreamResponse]"
    ) -> None:
        """Receive one request and stream the output of ``get_all``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self, stream: "grpclib.server.Stream[TokenStreamRequest, TokenStreamResponse]"
    ) -> None:
        """Receive one request and stream the output of ``subscribe``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[TokenStreamRequest, MetaResponse]"
    ) -> None:
        """Receive one request, call ``get_meta`` and send its reply."""
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[TokenStreamRequest, MetaResponse]"
    ) -> None:
        """Receive one request and stream the output of ``subscribe_meta``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each RPC path to its ``grpclib`` handler."""
        return {
            "/arista.serviceaccount.v1.TokenService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenRequest,
                TokenResponse,
            ),
            "/arista.serviceaccount.v1.TokenService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenSomeRequest,
                TokenSomeResponse,
            ),
            "/arista.serviceaccount.v1.TokenService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenStreamRequest,
                TokenStreamResponse,
            ),
            "/arista.serviceaccount.v1.TokenService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenStreamRequest,
                TokenStreamResponse,
            ),
            "/arista.serviceaccount.v1.TokenService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenStreamRequest,
                MetaResponse,
            ),
            "/arista.serviceaccount.v1.TokenService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenStreamRequest,
                MetaResponse,
            ),
        }
class TokenConfigServiceBase(ServiceBase):
    """Server-side base class for ``arista.serviceaccount.v1.TokenConfigService``.

    Every public RPC method is a stub that raises ``GRPCError`` with
    ``Status.UNIMPLEMENTED``; a concrete service overrides the ones it
    supports. ``__mapping__`` exposes the RPC-path-to-handler table.
    """

    async def get_one(
        self, token_config_request: "TokenConfigRequest"
    ) -> "TokenConfigResponse":
        """Default ``GetOne`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, token_config_some_request: "TokenConfigSomeRequest"
    ) -> AsyncIterator[TokenConfigSomeResponse]:
        """Default ``GetSome`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenConfigSomeResponse()

    async def get_all(
        self, token_config_stream_request: "TokenConfigStreamRequest"
    ) -> AsyncIterator[TokenConfigStreamResponse]:
        """Default ``GetAll`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenConfigStreamResponse()

    async def subscribe(
        self, token_config_stream_request: "TokenConfigStreamRequest"
    ) -> AsyncIterator[TokenConfigStreamResponse]:
        """Default ``Subscribe`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenConfigStreamResponse()

    async def get_meta(
        self, token_config_stream_request: "TokenConfigStreamRequest"
    ) -> "MetaResponse":
        """Default ``GetMeta`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, token_config_stream_request: "TokenConfigStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Default ``SubscribeMeta`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield MetaResponse()

    async def set(
        self, token_config_set_request: "TokenConfigSetRequest"
    ) -> "TokenConfigSetResponse":
        """Default ``Set`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set_some(
        self, token_config_set_some_request: "TokenConfigSetSomeRequest"
    ) -> AsyncIterator[TokenConfigSetSomeResponse]:
        """Default ``SetSome`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenConfigSetSomeResponse()

    async def delete(
        self, token_config_delete_request: "TokenConfigDeleteRequest"
    ) -> "TokenConfigDeleteResponse":
        """Default ``Delete`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_some(
        self, token_config_delete_some_request: "TokenConfigDeleteSomeRequest"
    ) -> AsyncIterator[TokenConfigDeleteSomeResponse]:
        """Default ``DeleteSome`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenConfigDeleteSomeResponse()

    async def delete_all(
        self, token_config_delete_all_request: "TokenConfigDeleteAllRequest"
    ) -> AsyncIterator[TokenConfigDeleteAllResponse]:
        """Default ``DeleteAll`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenConfigDeleteAllResponse()

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[TokenConfigRequest, TokenConfigResponse]"
    ) -> None:
        """Receive one request, call ``get_one`` and send its reply."""
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[TokenConfigSomeRequest, TokenConfigSomeResponse]",
    ) -> None:
        """Receive one request and stream the output of ``get_some``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_some,
            stream,
            request,
        )

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[TokenConfigStreamRequest, TokenConfigStreamResponse]",
    ) -> None:
        """Receive one request and stream the output of ``get_all``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[TokenConfigStreamRequest, TokenConfigStreamResponse]",
    ) -> None:
        """Receive one request and stream the output of ``subscribe``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[TokenConfigStreamRequest, MetaResponse]"
    ) -> None:
        """Receive one request, call ``get_meta`` and send its reply."""
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[TokenConfigStreamRequest, MetaResponse]"
    ) -> None:
        """Receive one request and stream the output of ``subscribe_meta``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[TokenConfigSetRequest, TokenConfigSetResponse]",
    ) -> None:
        """Receive one request, call ``set`` and send its reply."""
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    async def __rpc_set_some(
        self,
        stream: "grpclib.server.Stream[TokenConfigSetSomeRequest, TokenConfigSetSomeResponse]",
    ) -> None:
        """Receive one request and stream the output of ``set_some``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.set_some,
            stream,
            request,
        )

    async def __rpc_delete(
        self,
        stream: "grpclib.server.Stream[TokenConfigDeleteRequest, TokenConfigDeleteResponse]",
    ) -> None:
        """Receive one request, call ``delete`` and send its reply."""
        request = await stream.recv_message()
        response = await self.delete(request)
        await stream.send_message(response)

    async def __rpc_delete_some(
        self,
        stream: "grpclib.server.Stream[TokenConfigDeleteSomeRequest, TokenConfigDeleteSomeResponse]",
    ) -> None:
        """Receive one request and stream the output of ``delete_some``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_some,
            stream,
            request,
        )

    async def __rpc_delete_all(
        self,
        stream: "grpclib.server.Stream[TokenConfigDeleteAllRequest, TokenConfigDeleteAllResponse]",
    ) -> None:
        """Receive one request and stream the output of ``delete_all``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.delete_all,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each RPC path to its ``grpclib`` handler."""
        return {
            "/arista.serviceaccount.v1.TokenConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenConfigRequest,
                TokenConfigResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenConfigSomeRequest,
                TokenConfigSomeResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenConfigStreamRequest,
                TokenConfigStreamResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenConfigStreamRequest,
                TokenConfigStreamResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenConfigSetRequest,
                TokenConfigSetResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/SetSome": grpclib.const.Handler(
                self.__rpc_set_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenConfigSetSomeRequest,
                TokenConfigSetSomeResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/Delete": grpclib.const.Handler(
                self.__rpc_delete,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenConfigDeleteRequest,
                TokenConfigDeleteResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/DeleteSome": grpclib.const.Handler(
                self.__rpc_delete_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenConfigDeleteSomeRequest,
                TokenConfigDeleteSomeResponse,
            ),
            "/arista.serviceaccount.v1.TokenConfigService/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenConfigDeleteAllRequest,
                TokenConfigDeleteAllResponse,
            ),
        }
class TokenSelfRefreshConfigServiceBase(ServiceBase):
    """Server-side base class for
    ``arista.serviceaccount.v1.TokenSelfRefreshConfigService``.

    Every public RPC method is a stub that raises ``GRPCError`` with
    ``Status.UNIMPLEMENTED``; a concrete service overrides the ones it
    supports. ``__mapping__`` exposes the RPC-path-to-handler table.
    """

    async def get_one(
        self, token_self_refresh_config_request: "TokenSelfRefreshConfigRequest"
    ) -> "TokenSelfRefreshConfigResponse":
        """Default ``GetOne`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_all(
        self,
        token_self_refresh_config_stream_request: "TokenSelfRefreshConfigStreamRequest",
    ) -> AsyncIterator[TokenSelfRefreshConfigStreamResponse]:
        """Default ``GetAll`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenSelfRefreshConfigStreamResponse()

    async def subscribe(
        self,
        token_self_refresh_config_stream_request: "TokenSelfRefreshConfigStreamRequest",
    ) -> AsyncIterator[TokenSelfRefreshConfigStreamResponse]:
        """Default ``Subscribe`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield TokenSelfRefreshConfigStreamResponse()

    async def subscribe_meta(
        self,
        token_self_refresh_config_stream_request: "TokenSelfRefreshConfigStreamRequest",
    ) -> AsyncIterator[MetaResponse]:
        """Default ``SubscribeMeta`` (server-streaming) handler.

        Raises ``GRPCError(UNIMPLEMENTED)`` when the returned iterator is
        first advanced.
        """
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        # Unreachable: ``yield`` makes this an async generator so the call
        # returns a real AsyncIterator rather than a coroutine.
        yield MetaResponse()

    async def set(
        self, token_self_refresh_config_set_request: "TokenSelfRefreshConfigSetRequest"
    ) -> "TokenSelfRefreshConfigSetResponse":
        """Default ``Set`` (unary) handler; always raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[TokenSelfRefreshConfigRequest, TokenSelfRefreshConfigResponse]",
    ) -> None:
        """Receive one request, call ``get_one`` and send its reply."""
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[TokenSelfRefreshConfigStreamRequest, TokenSelfRefreshConfigStreamResponse]",
    ) -> None:
        """Receive one request and stream the output of ``get_all``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_all,
            stream,
            request,
        )

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[TokenSelfRefreshConfigStreamRequest, TokenSelfRefreshConfigStreamResponse]",
    ) -> None:
        """Receive one request and stream the output of ``subscribe``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe,
            stream,
            request,
        )

    async def __rpc_subscribe_meta(
        self,
        stream: "grpclib.server.Stream[TokenSelfRefreshConfigStreamRequest, MetaResponse]",
    ) -> None:
        """Receive one request and stream the output of ``subscribe_meta``."""
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.subscribe_meta,
            stream,
            request,
        )

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[TokenSelfRefreshConfigSetRequest, TokenSelfRefreshConfigSetResponse]",
    ) -> None:
        """Receive one request, call ``set`` and send its reply."""
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each RPC path to its ``grpclib`` handler."""
        return {
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenSelfRefreshConfigRequest,
                TokenSelfRefreshConfigResponse,
            ),
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenSelfRefreshConfigStreamRequest,
                TokenSelfRefreshConfigStreamResponse,
            ),
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenSelfRefreshConfigStreamRequest,
                TokenSelfRefreshConfigStreamResponse,
            ),
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                TokenSelfRefreshConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.serviceaccount.v1.TokenSelfRefreshConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                TokenSelfRefreshConfigSetRequest,
                TokenSelfRefreshConfigSetResponse,
            ),
        }