Source code for cloudvision.api.arista.syslog.v1

# Copyright (c) 2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: arista/syslog.v1/services.gen.proto, arista/syslog.v1/syslog.proto
# plugin: python-aristaproto
# This file has been @generated

__all__ = (
    "TransportProtocol",
    "Format",
    "ExportResult",
    "ExportKey",
    "ExportSuccess",
    "ExportError",
    "ExportStats",
    "ExportConfig",
    "ExportFormatConfig",
    "Export",
    "MetaResponse",
    "ExportRequest",
    "ExportResponse",
    "ExportSomeRequest",
    "ExportSomeResponse",
    "ExportStreamRequest",
    "ExportStreamResponse",
    "ExportConfigRequest",
    "ExportConfigResponse",
    "ExportConfigSomeRequest",
    "ExportConfigSomeResponse",
    "ExportConfigStreamRequest",
    "ExportConfigStreamResponse",
    "ExportConfigSetRequest",
    "ExportConfigSetResponse",
    "ExportConfigSetSomeRequest",
    "ExportConfigSetSomeResponse",
    "ExportConfigDeleteRequest",
    "ExportConfigDeleteResponse",
    "ExportConfigDeleteSomeRequest",
    "ExportConfigDeleteSomeResponse",
    "ExportConfigDeleteAllRequest",
    "ExportConfigDeleteAllResponse",
    "ExportFormatConfigRequest",
    "ExportFormatConfigResponse",
    "ExportFormatConfigStreamRequest",
    "ExportFormatConfigStreamResponse",
    "ExportFormatConfigSetRequest",
    "ExportFormatConfigSetResponse",
    "ExportServiceStub",
    "ExportServiceBase",
    "ExportConfigServiceStub",
    "ExportConfigServiceBase",
    "ExportFormatConfigServiceStub",
    "ExportFormatConfigServiceBase",
)


from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Dict,
    List,
    Optional,
)

import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase

if TYPE_CHECKING:
    import grpclib.server
    from aristaproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


[docs] class TransportProtocol(aristaproto.Enum): """ TransportProtocol defines the set of transport protocols to be used for export. """ UNSPECIFIED = 0 """TRANSPORT_PROTOCOL_UNSPECIFIED indicates unspecified protocol.""" UDP = 1 """TRANSPORT_PROTOCOL_UDP indicates the User Datagram Protocol (UDP).""" TCP = 2 """TRANSPORT_PROTOCOL_TCP indicates the TCP over TLS protocol."""
[docs] class Format(aristaproto.Enum): """ Format defines the set of formats to be used for export to syslog servers. """ UNSPECIFIED = 0 """FORMAT_UNSPECIFIED indicates unspecified format.""" RFC3164 = 1 """ FORMAT_RFC3164 indicates the BSD syslog protocol as defined by RFC 3164. """ RFC5424 = 2 """ FORMAT_RFC5424 indicates the IETF syslog protocol as defined by RFC 5424. """ CUSTOM = 3 """FORMAT_CUSTOM indicates a native custom format with a UTC timestamp."""
[docs] class ExportResult(aristaproto.Enum): """ ExportResult defines the set of possible results of an export operation. """ UNSPECIFIED = 0 """EXPORT_RESULT_UNSPECIFIED indicates unspecified export operation.""" SUCCESS = 1 """EXPORT_RESULT_SUCCESS indicates a successful export operation.""" FAILURE = 2 """EXPORT_RESULT_FAILURE indicates an unsuccessful export operation."""
[docs] @dataclass(eq=False, repr=False) class ExportKey(aristaproto.Message): """ ExportKey uniquely identifies a syslog server to which to export logs. """ hostname_or_ip: Optional[str] = aristaproto.message_field( 1, wraps=aristaproto.TYPE_STRING ) """hostname_or_ip is the IP address or hostname of the syslog server.""" port_num: "___fmp__.Port" = aristaproto.message_field(2) """port_num is the destination port number of the syslog server.""" protocol: "TransportProtocol" = aristaproto.enum_field(3) """protocol defines the transport protocol for the syslog export."""
[docs] @dataclass(eq=False, repr=False) class ExportSuccess(aristaproto.Message): """ ExportSuccess provides the details of the last successful export operation. """ timestamp: datetime = aristaproto.message_field(1) """ timestamp provides the timestamp of last successful export opeartion. """ log_timestamp: datetime = aristaproto.message_field(2) """ log_timestamp provides the timestamp of the last successful audit log. """
[docs] @dataclass(eq=False, repr=False) class ExportError(aristaproto.Message): """ExportError provides the details of the most recent export error.""" timestamp: datetime = aristaproto.message_field(1) """timestamp provides the timestamp of the export error.""" error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """error indicates the export error.""" detail: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING) """detail indicates the audit log that was not exported.""" log_timestamp: datetime = aristaproto.message_field(4) """ log_timestamp provides the timestamp of the last audit log that was not exported due to error. """
[docs] @dataclass(eq=False, repr=False) class ExportStats(aristaproto.Message): """ ExportStats provide statistics of export operations to a syslog sever. """ num_success: Optional[int] = aristaproto.message_field( 1, wraps=aristaproto.TYPE_UINT64 ) """num_success indicates the count of successful exports.""" num_errors: Optional[int] = aristaproto.message_field( 2, wraps=aristaproto.TYPE_UINT64 ) """num_errors indicates the count of export errors.""" last_result: "ExportResult" = aristaproto.enum_field(3) """last_result provides the status of the last export operation."""
[docs] @dataclass(eq=False, repr=False) class ExportConfig(aristaproto.Message): """ExportConfig is used to configure a syslog server.""" key: "ExportKey" = aristaproto.message_field(1) """key uniquely identifies a syslog server."""
[docs] @dataclass(eq=False, repr=False) class ExportFormatConfig(aristaproto.Message): """ ExportFormatConfig is used to confgure a global format for exporting logs to all the configured syslog servers. """ format: "Format" = aristaproto.enum_field(1) """format defines the syslog format to be used for export."""
[docs] @dataclass(eq=False, repr=False) class Export(aristaproto.Message): """Export provides the export status for a syslog server.""" key: "ExportKey" = aristaproto.message_field(1) """key uniquely identifies a syslog server.""" stats: "ExportStats" = aristaproto.message_field(2) """stats contains the statistics for all export operations.""" last_error: "ExportError" = aristaproto.message_field(3) """last_error provides the details of the last export error.""" last_success: "ExportSuccess" = aristaproto.message_field(4) """last_success provides details of last successful export."""
[docs] @dataclass(eq=False, repr=False) class MetaResponse(aristaproto.Message): """ """ time: datetime = aristaproto.message_field(1) """ Time holds the timestamp of the last item included in the metadata calculation. """ type: "__subscriptions__.Operation" = aristaproto.enum_field(2) """ Operation indicates how the value in this response should be considered. Under non-subscribe requests, this value should always be INITIAL. In a subscription, once all initial data is streamed and the client begins to receive modification updates, you should not see INITIAL again. """ count: Optional[int] = aristaproto.message_field(3, wraps=aristaproto.TYPE_UINT32) """ Count is the number of items present under the conditions of the request. """
[docs] @dataclass(eq=False, repr=False) class ExportRequest(aristaproto.Message): """ """ key: "ExportKey" = aristaproto.message_field(1) """ Key uniquely identifies a Export instance to retrieve. This value must be populated. """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ExportResponse(aristaproto.Message): """ """ value: "Export" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ time: datetime = aristaproto.message_field(2) """ Time carries the (UTC) timestamp of the last-modification of the Export instance in this response. """
[docs] @dataclass(eq=False, repr=False) class ExportSomeRequest(aristaproto.Message): """ """ keys: List["ExportKey"] = aristaproto.message_field(1) """ """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ExportSomeResponse(aristaproto.Message): """ """ value: "Export" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """ Error is an optional field. It should be filled when there is an error in the GetSome process. """ time: datetime = aristaproto.message_field(3) """ Time carries the (UTC) timestamp of the last-modification of the Export instance in this response. """
[docs] @dataclass(eq=False, repr=False) class ExportStreamRequest(aristaproto.Message): """ """ partial_eq_filter: List["Export"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a GetAll/Subscribe. This requires all provided fields to be equal to the response. While transparent to users, this field also allows services to optimize internal subscriptions if filter(s) are sufficiently specific. """ time: "__time__.TimeBounds" = aristaproto.message_field(3) """ TimeRange allows limiting response data to within a specified time window. If this field is populated, at least one of the two time fields are required. For GetAll, the fields start and end can be used as follows: * end: Returns the state of each Export at end. * Each Export response is fully-specified (all fields set). * start: Returns the state of each Export at start, followed by updates until now. * Each Export response at start is fully-specified, but updates may be partial. * start and end: Returns the state of each Export at start, followed by updates until end. * Each Export response at start is fully-specified, but updates until end may be partial. """
[docs] @dataclass(eq=False, repr=False) class ExportStreamResponse(aristaproto.Message): """ """ value: "Export" = aristaproto.message_field(1) """ Value is a value deemed relevant to the initiating request. This structure will always have its key-field populated. Which other fields are populated, and why, depends on the value of Operation and what triggered this notification. """ time: datetime = aristaproto.message_field(2) """Time holds the timestamp of this Export's last modification.""" type: "__subscriptions__.Operation" = aristaproto.enum_field(3) """ Operation indicates how the Export value in this response should be considered. Under non-subscribe requests, this value should always be INITIAL. In a subscription, once all initial data is streamed and the client begins to receive modification updates, you should not see INITIAL again. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigRequest(aristaproto.Message): """ """ key: "ExportKey" = aristaproto.message_field(1) """ Key uniquely identifies a ExportConfig instance to retrieve. This value must be populated. """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigResponse(aristaproto.Message): """ """ value: "ExportConfig" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ time: datetime = aristaproto.message_field(2) """ Time carries the (UTC) timestamp of the last-modification of the ExportConfig instance in this response. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigSomeRequest(aristaproto.Message): """ """ keys: List["ExportKey"] = aristaproto.message_field(1) """ """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigSomeResponse(aristaproto.Message): """ """ value: "ExportConfig" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """ Error is an optional field. It should be filled when there is an error in the GetSome process. """ time: datetime = aristaproto.message_field(3) """ Time carries the (UTC) timestamp of the last-modification of the ExportConfig instance in this response. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigStreamRequest(aristaproto.Message): """ """ partial_eq_filter: List["ExportConfig"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a GetAll/Subscribe. This requires all provided fields to be equal to the response. While transparent to users, this field also allows services to optimize internal subscriptions if filter(s) are sufficiently specific. """ time: "__time__.TimeBounds" = aristaproto.message_field(3) """ TimeRange allows limiting response data to within a specified time window. If this field is populated, at least one of the two time fields are required. For GetAll, the fields start and end can be used as follows: * end: Returns the state of each ExportConfig at end. * Each ExportConfig response is fully-specified (all fields set). * start: Returns the state of each ExportConfig at start, followed by updates until now. * Each ExportConfig response at start is fully-specified, but updates may be partial. * start and end: Returns the state of each ExportConfig at start, followed by updates until end. * Each ExportConfig response at start is fully-specified, but updates until end may be partial. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigStreamResponse(aristaproto.Message): """ """ value: "ExportConfig" = aristaproto.message_field(1) """ Value is a value deemed relevant to the initiating request. This structure will always have its key-field populated. Which other fields are populated, and why, depends on the value of Operation and what triggered this notification. """ time: datetime = aristaproto.message_field(2) """Time holds the timestamp of this ExportConfig's last modification.""" type: "__subscriptions__.Operation" = aristaproto.enum_field(3) """ Operation indicates how the ExportConfig value in this response should be considered. Under non-subscribe requests, this value should always be INITIAL. In a subscription, once all initial data is streamed and the client begins to receive modification updates, you should not see INITIAL again. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigSetRequest(aristaproto.Message): """ """ value: "ExportConfig" = aristaproto.message_field(1) """ ExportConfig carries the value to set into the datastore. See the documentation on the ExportConfig struct for which fields are required. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigSetResponse(aristaproto.Message): """ """ value: "ExportConfig" = aristaproto.message_field(1) """ Value carries all the values given in the ExportConfigSetRequest as well as any server-generated values. """ time: datetime = aristaproto.message_field(2) """ Time indicates the (UTC) timestamp at which the system recognizes the creation. The only guarantees made about this timestamp are: - it is after the time the request was received - a time-ranged query with StartTime==CreatedAt will include this instance. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigSetSomeRequest(aristaproto.Message): """ """ values: List["ExportConfig"] = aristaproto.message_field(1) """ value contains a list of ExportConfig values to write. It is possible to provide more values than can fit within either: - the maxiumum send size of the client - the maximum receive size of the server If this error occurs you must reduce the number of values sent. See gRPC \"maximum message size\" documentation for more information. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigSetSomeResponse(aristaproto.Message): """ """ key: "ExportKey" = aristaproto.message_field(1) """ """ error: str = aristaproto.string_field(2) """ """
[docs] @dataclass(eq=False, repr=False) class ExportConfigDeleteRequest(aristaproto.Message): """ """ key: "ExportKey" = aristaproto.message_field(1) """ Key indicates which ExportConfig instance to remove. This field must always be set. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigDeleteResponse(aristaproto.Message): """ """ key: "ExportKey" = aristaproto.message_field(1) """Key echoes back the key of the deleted ExportConfig instance.""" time: datetime = aristaproto.message_field(2) """ Time indicates the (UTC) timestamp at which the system recognizes the deletion. The only guarantees made about this timestamp are: - it is after the time the request was received - a time-ranged query with StartTime==DeletedAt will not include this instance. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigDeleteSomeRequest(aristaproto.Message): """ """ keys: List["ExportKey"] = aristaproto.message_field(1) """key contains a list of ExportConfig keys to delete"""
[docs] @dataclass(eq=False, repr=False) class ExportConfigDeleteSomeResponse(aristaproto.Message): """ExportConfigDeleteSomeResponse is only sent when there is an error.""" key: "ExportKey" = aristaproto.message_field(1) """ """ error: str = aristaproto.string_field(2) """ """
[docs] @dataclass(eq=False, repr=False) class ExportConfigDeleteAllRequest(aristaproto.Message): """ """ partial_eq_filter: List["ExportConfig"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a DeleteAll. This requires all provided fields to be equal to the response. A filtered DeleteAll will use GetAll with filter to find things to delete. """
[docs] @dataclass(eq=False, repr=False) class ExportConfigDeleteAllResponse(aristaproto.Message): """ """ type: "___fmp__.DeleteError" = aristaproto.enum_field(1) """ This describes the class of delete error. A DeleteAllResponse is only sent when there is an error. """ error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING) """This indicates the error message from the delete failure.""" key: "ExportKey" = aristaproto.message_field(3) """ This is the key of the ExportConfig instance that failed to be deleted. """ time: datetime = aristaproto.message_field(4) """Time indicates the (UTC) timestamp when the key was being deleted."""
[docs] @dataclass(eq=False, repr=False) class ExportFormatConfigRequest(aristaproto.Message): """ """ time: datetime = aristaproto.message_field(2) """ Time indicates the time for which you are interested in the data. If no time is given, the server will use the time at which it makes the request. """
[docs] @dataclass(eq=False, repr=False) class ExportFormatConfigResponse(aristaproto.Message): """ """ value: "ExportFormatConfig" = aristaproto.message_field(1) """ Value is the value requested. This structure will be fully-populated as it exists in the datastore. If optional fields were not given at creation, these fields will be empty or set to default values. """ time: datetime = aristaproto.message_field(2) """ Time carries the (UTC) timestamp of the last-modification of the ExportFormatConfig instance in this response. """
[docs] @dataclass(eq=False, repr=False) class ExportFormatConfigStreamRequest(aristaproto.Message): """ """ partial_eq_filter: List["ExportFormatConfig"] = aristaproto.message_field(1) """ PartialEqFilter provides a way to server-side filter a GetAll/Subscribe. This requires all provided fields to be equal to the response. While transparent to users, this field also allows services to optimize internal subscriptions if filter(s) are sufficiently specific. """ time: "__time__.TimeBounds" = aristaproto.message_field(3) """ TimeRange allows limiting response data to within a specified time window. If this field is populated, at least one of the two time fields are required. For GetAll, the fields start and end can be used as follows: * end: Returns the state of each ExportFormatConfig at end. * Each ExportFormatConfig response is fully-specified (all fields set). * start: Returns the state of each ExportFormatConfig at start, followed by updates until now. * Each ExportFormatConfig response at start is fully-specified, but updates may be partial. * start and end: Returns the state of each ExportFormatConfig at start, followed by updates until end. * Each ExportFormatConfig response at start is fully-specified, but updates until end may be partial. """
[docs] @dataclass(eq=False, repr=False) class ExportFormatConfigStreamResponse(aristaproto.Message): """ """ value: "ExportFormatConfig" = aristaproto.message_field(1) """ Value is a value deemed relevant to the initiating request. This structure will always have its key-field populated. Which other fields are populated, and why, depends on the value of Operation and what triggered this notification. """ time: datetime = aristaproto.message_field(2) """ Time holds the timestamp of this ExportFormatConfig's last modification. """ type: "__subscriptions__.Operation" = aristaproto.enum_field(3) """ Operation indicates how the ExportFormatConfig value in this response should be considered. Under non-subscribe requests, this value should always be INITIAL. In a subscription, once all initial data is streamed and the client begins to receive modification updates, you should not see INITIAL again. """
[docs] @dataclass(eq=False, repr=False) class ExportFormatConfigSetRequest(aristaproto.Message): """ """ value: "ExportFormatConfig" = aristaproto.message_field(1) """ ExportFormatConfig carries the value to set into the datastore. See the documentation on the ExportFormatConfig struct for which fields are required. """
[docs] @dataclass(eq=False, repr=False) class ExportFormatConfigSetResponse(aristaproto.Message): """ """ value: "ExportFormatConfig" = aristaproto.message_field(1) """ Value carries all the values given in the ExportFormatConfigSetRequest as well as any server-generated values. """ time: datetime = aristaproto.message_field(2) """ Time indicates the (UTC) timestamp at which the system recognizes the creation. The only guarantees made about this timestamp are: - it is after the time the request was received - a time-ranged query with StartTime==CreatedAt will include this instance. """
[docs] class ExportServiceStub(aristaproto.ServiceStub): """ """
[docs] async def get_one( self, export_request: "ExportRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ExportResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportService/GetOne", export_request, ExportResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def get_some( self, export_some_request: "ExportSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportSomeResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportService/GetSome", export_some_request, ExportSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all( self, export_stream_request: "ExportStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportStreamResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportService/GetAll", export_stream_request, ExportStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe( self, export_stream_request: "ExportStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportStreamResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportService/Subscribe", export_stream_request, ExportStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_meta( self, export_stream_request: "ExportStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MetaResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportService/GetMeta", export_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def subscribe_meta( self, export_stream_request: "ExportStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[MetaResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportService/SubscribeMeta", export_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] class ExportConfigServiceStub(aristaproto.ServiceStub): """ """
[docs] async def get_one( self, export_config_request: "ExportConfigRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ExportConfigResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportConfigService/GetOne", export_config_request, ExportConfigResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def get_some( self, export_config_some_request: "ExportConfigSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportConfigSomeResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportConfigService/GetSome", export_config_some_request, ExportConfigSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all( self, export_config_stream_request: "ExportConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportConfigStreamResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportConfigService/GetAll", export_config_stream_request, ExportConfigStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe( self, export_config_stream_request: "ExportConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportConfigStreamResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportConfigService/Subscribe", export_config_stream_request, ExportConfigStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_meta( self, export_config_stream_request: "ExportConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MetaResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportConfigService/GetMeta", export_config_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def subscribe_meta( self, export_config_stream_request: "ExportConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[MetaResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportConfigService/SubscribeMeta", export_config_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def set( self, export_config_set_request: "ExportConfigSetRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ExportConfigSetResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportConfigService/Set", export_config_set_request, ExportConfigSetResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def set_some( self, export_config_set_some_request: "ExportConfigSetSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportConfigSetSomeResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportConfigService/SetSome", export_config_set_some_request, ExportConfigSetSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def delete( self, export_config_delete_request: "ExportConfigDeleteRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ExportConfigDeleteResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportConfigService/Delete", export_config_delete_request, ExportConfigDeleteResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def delete_some( self, export_config_delete_some_request: "ExportConfigDeleteSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportConfigDeleteSomeResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportConfigService/DeleteSome", export_config_delete_some_request, ExportConfigDeleteSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def delete_all( self, export_config_delete_all_request: "ExportConfigDeleteAllRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportConfigDeleteAllResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportConfigService/DeleteAll", export_config_delete_all_request, ExportConfigDeleteAllResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] class ExportFormatConfigServiceStub(aristaproto.ServiceStub): """ """
[docs] async def get_one( self, export_format_config_request: "ExportFormatConfigRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ExportFormatConfigResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportFormatConfigService/GetOne", export_format_config_request, ExportFormatConfigResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def get_all( self, export_format_config_stream_request: "ExportFormatConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportFormatConfigStreamResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportFormatConfigService/GetAll", export_format_config_stream_request, ExportFormatConfigStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe( self, export_format_config_stream_request: "ExportFormatConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[ExportFormatConfigStreamResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportFormatConfigService/Subscribe", export_format_config_stream_request, ExportFormatConfigStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe_meta( self, export_format_config_stream_request: "ExportFormatConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[MetaResponse]": """ """ async for response in self._unary_stream( "/arista.syslog.v1.ExportFormatConfigService/SubscribeMeta", export_format_config_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def set( self, export_format_config_set_request: "ExportFormatConfigSetRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "ExportFormatConfigSetResponse": """ """ return await self._unary_unary( "/arista.syslog.v1.ExportFormatConfigService/Set", export_format_config_set_request, ExportFormatConfigSetResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
from .... import fmp as ___fmp__ from ... import subscriptions as __subscriptions__ from ... import time as __time__
[docs] class ExportServiceBase(ServiceBase): """ """
[docs] async def get_one(self, export_request: "ExportRequest") -> "ExportResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_some( self, export_some_request: "ExportSomeRequest" ) -> AsyncIterator[ExportSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all( self, export_stream_request: "ExportStreamRequest" ) -> AsyncIterator[ExportStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe( self, export_stream_request: "ExportStreamRequest" ) -> AsyncIterator[ExportStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_meta( self, export_stream_request: "ExportStreamRequest" ) -> "MetaResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_meta( self, export_stream_request: "ExportStreamRequest" ) -> AsyncIterator[MetaResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one( self, stream: "grpclib.server.Stream[ExportRequest, ExportResponse]" ) -> None: request = await stream.recv_message() response = await self.get_one(request) await stream.send_message(response) async def __rpc_get_some( self, stream: "grpclib.server.Stream[ExportSomeRequest, ExportSomeResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_some, stream, request, ) async def __rpc_get_all( self, stream: "grpclib.server.Stream[ExportStreamRequest, ExportStreamResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_all, stream, request, ) async def __rpc_subscribe( self, stream: "grpclib.server.Stream[ExportStreamRequest, ExportStreamResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe, stream, request, ) async def __rpc_get_meta( self, stream: "grpclib.server.Stream[ExportStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() response = await self.get_meta(request) await stream.send_message(response) async def __rpc_subscribe_meta( self, stream: "grpclib.server.Stream[ExportStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe_meta, stream, request, ) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/arista.syslog.v1.ExportService/GetOne": grpclib.const.Handler( self.__rpc_get_one, grpclib.const.Cardinality.UNARY_UNARY, ExportRequest, ExportResponse, ), "/arista.syslog.v1.ExportService/GetSome": grpclib.const.Handler( self.__rpc_get_some, grpclib.const.Cardinality.UNARY_STREAM, ExportSomeRequest, ExportSomeResponse, ), "/arista.syslog.v1.ExportService/GetAll": grpclib.const.Handler( self.__rpc_get_all, grpclib.const.Cardinality.UNARY_STREAM, ExportStreamRequest, ExportStreamResponse, ), "/arista.syslog.v1.ExportService/Subscribe": grpclib.const.Handler( self.__rpc_subscribe, grpclib.const.Cardinality.UNARY_STREAM, ExportStreamRequest, ExportStreamResponse, ), "/arista.syslog.v1.ExportService/GetMeta": grpclib.const.Handler( self.__rpc_get_meta, grpclib.const.Cardinality.UNARY_UNARY, ExportStreamRequest, MetaResponse, ), "/arista.syslog.v1.ExportService/SubscribeMeta": grpclib.const.Handler( self.__rpc_subscribe_meta, grpclib.const.Cardinality.UNARY_STREAM, ExportStreamRequest, MetaResponse, ), }
[docs] class ExportConfigServiceBase(ServiceBase): """ """
[docs] async def get_one( self, export_config_request: "ExportConfigRequest" ) -> "ExportConfigResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_some( self, export_config_some_request: "ExportConfigSomeRequest" ) -> AsyncIterator[ExportConfigSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all( self, export_config_stream_request: "ExportConfigStreamRequest" ) -> AsyncIterator[ExportConfigStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe( self, export_config_stream_request: "ExportConfigStreamRequest" ) -> AsyncIterator[ExportConfigStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_meta( self, export_config_stream_request: "ExportConfigStreamRequest" ) -> "MetaResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_meta( self, export_config_stream_request: "ExportConfigStreamRequest" ) -> AsyncIterator[MetaResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def set( self, export_config_set_request: "ExportConfigSetRequest" ) -> "ExportConfigSetResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def set_some( self, export_config_set_some_request: "ExportConfigSetSomeRequest" ) -> AsyncIterator[ExportConfigSetSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete( self, export_config_delete_request: "ExportConfigDeleteRequest" ) -> "ExportConfigDeleteResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_some( self, export_config_delete_some_request: "ExportConfigDeleteSomeRequest" ) -> AsyncIterator[ExportConfigDeleteSomeResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_all( self, export_config_delete_all_request: "ExportConfigDeleteAllRequest" ) -> AsyncIterator[ExportConfigDeleteAllResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one( self, stream: "grpclib.server.Stream[ExportConfigRequest, ExportConfigResponse]" ) -> None: request = await stream.recv_message() response = await self.get_one(request) await stream.send_message(response) async def __rpc_get_some( self, stream: "grpclib.server.Stream[ExportConfigSomeRequest, ExportConfigSomeResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_some, stream, request, ) async def __rpc_get_all( self, stream: "grpclib.server.Stream[ExportConfigStreamRequest, ExportConfigStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_all, stream, request, ) async def __rpc_subscribe( self, stream: "grpclib.server.Stream[ExportConfigStreamRequest, ExportConfigStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe, stream, request, ) async def __rpc_get_meta( self, stream: "grpclib.server.Stream[ExportConfigStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() response = await self.get_meta(request) await stream.send_message(response) async def __rpc_subscribe_meta( self, stream: "grpclib.server.Stream[ExportConfigStreamRequest, MetaResponse]" ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe_meta, stream, request, ) async def __rpc_set( self, stream: "grpclib.server.Stream[ExportConfigSetRequest, ExportConfigSetResponse]", ) -> None: request = await stream.recv_message() response = await self.set(request) await stream.send_message(response) async def __rpc_set_some( self, stream: "grpclib.server.Stream[ExportConfigSetSomeRequest, ExportConfigSetSomeResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.set_some, stream, request, ) async def __rpc_delete( self, stream: "grpclib.server.Stream[ExportConfigDeleteRequest, ExportConfigDeleteResponse]", ) -> None: request = await stream.recv_message() response = await self.delete(request) await stream.send_message(response) async def __rpc_delete_some( self, stream: "grpclib.server.Stream[ExportConfigDeleteSomeRequest, ExportConfigDeleteSomeResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.delete_some, stream, request, ) async def __rpc_delete_all( self, stream: "grpclib.server.Stream[ExportConfigDeleteAllRequest, ExportConfigDeleteAllResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.delete_all, stream, request, ) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/arista.syslog.v1.ExportConfigService/GetOne": grpclib.const.Handler( self.__rpc_get_one, grpclib.const.Cardinality.UNARY_UNARY, ExportConfigRequest, ExportConfigResponse, ), "/arista.syslog.v1.ExportConfigService/GetSome": grpclib.const.Handler( self.__rpc_get_some, grpclib.const.Cardinality.UNARY_STREAM, ExportConfigSomeRequest, ExportConfigSomeResponse, ), "/arista.syslog.v1.ExportConfigService/GetAll": grpclib.const.Handler( self.__rpc_get_all, grpclib.const.Cardinality.UNARY_STREAM, ExportConfigStreamRequest, ExportConfigStreamResponse, ), "/arista.syslog.v1.ExportConfigService/Subscribe": grpclib.const.Handler( self.__rpc_subscribe, grpclib.const.Cardinality.UNARY_STREAM, ExportConfigStreamRequest, ExportConfigStreamResponse, ), "/arista.syslog.v1.ExportConfigService/GetMeta": grpclib.const.Handler( self.__rpc_get_meta, grpclib.const.Cardinality.UNARY_UNARY, ExportConfigStreamRequest, MetaResponse, ), "/arista.syslog.v1.ExportConfigService/SubscribeMeta": grpclib.const.Handler( self.__rpc_subscribe_meta, grpclib.const.Cardinality.UNARY_STREAM, ExportConfigStreamRequest, MetaResponse, ), "/arista.syslog.v1.ExportConfigService/Set": grpclib.const.Handler( self.__rpc_set, grpclib.const.Cardinality.UNARY_UNARY, ExportConfigSetRequest, ExportConfigSetResponse, ), "/arista.syslog.v1.ExportConfigService/SetSome": grpclib.const.Handler( self.__rpc_set_some, grpclib.const.Cardinality.UNARY_STREAM, ExportConfigSetSomeRequest, ExportConfigSetSomeResponse, ), "/arista.syslog.v1.ExportConfigService/Delete": grpclib.const.Handler( self.__rpc_delete, grpclib.const.Cardinality.UNARY_UNARY, ExportConfigDeleteRequest, ExportConfigDeleteResponse, ), "/arista.syslog.v1.ExportConfigService/DeleteSome": grpclib.const.Handler( self.__rpc_delete_some, grpclib.const.Cardinality.UNARY_STREAM, ExportConfigDeleteSomeRequest, ExportConfigDeleteSomeResponse, ), "/arista.syslog.v1.ExportConfigService/DeleteAll": grpclib.const.Handler( self.__rpc_delete_all, grpclib.const.Cardinality.UNARY_STREAM, ExportConfigDeleteAllRequest, ExportConfigDeleteAllResponse, ), }
[docs] class ExportFormatConfigServiceBase(ServiceBase): """ """
[docs] async def get_one( self, export_format_config_request: "ExportFormatConfigRequest" ) -> "ExportFormatConfigResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_all( self, export_format_config_stream_request: "ExportFormatConfigStreamRequest" ) -> AsyncIterator[ExportFormatConfigStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe( self, export_format_config_stream_request: "ExportFormatConfigStreamRequest" ) -> AsyncIterator[ExportFormatConfigStreamResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_meta( self, export_format_config_stream_request: "ExportFormatConfigStreamRequest" ) -> AsyncIterator[MetaResponse]: """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def set( self, export_format_config_set_request: "ExportFormatConfigSetRequest" ) -> "ExportFormatConfigSetResponse": """ """ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_get_one( self, stream: "grpclib.server.Stream[ExportFormatConfigRequest, ExportFormatConfigResponse]", ) -> None: request = await stream.recv_message() response = await self.get_one(request) await stream.send_message(response) async def __rpc_get_all( self, stream: "grpclib.server.Stream[ExportFormatConfigStreamRequest, ExportFormatConfigStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.get_all, stream, request, ) async def __rpc_subscribe( self, stream: "grpclib.server.Stream[ExportFormatConfigStreamRequest, ExportFormatConfigStreamResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe, stream, request, ) async def __rpc_subscribe_meta( self, stream: "grpclib.server.Stream[ExportFormatConfigStreamRequest, MetaResponse]", ) -> None: request = await stream.recv_message() await self._call_rpc_handler_server_stream( self.subscribe_meta, stream, request, ) async def __rpc_set( self, stream: "grpclib.server.Stream[ExportFormatConfigSetRequest, ExportFormatConfigSetResponse]", ) -> None: request = await stream.recv_message() response = await self.set(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/arista.syslog.v1.ExportFormatConfigService/GetOne": grpclib.const.Handler( self.__rpc_get_one, grpclib.const.Cardinality.UNARY_UNARY, ExportFormatConfigRequest, ExportFormatConfigResponse, ), "/arista.syslog.v1.ExportFormatConfigService/GetAll": grpclib.const.Handler( self.__rpc_get_all, grpclib.const.Cardinality.UNARY_STREAM, ExportFormatConfigStreamRequest, ExportFormatConfigStreamResponse, ), "/arista.syslog.v1.ExportFormatConfigService/Subscribe": grpclib.const.Handler( self.__rpc_subscribe, grpclib.const.Cardinality.UNARY_STREAM, ExportFormatConfigStreamRequest, ExportFormatConfigStreamResponse, ), "/arista.syslog.v1.ExportFormatConfigService/SubscribeMeta": grpclib.const.Handler( self.__rpc_subscribe_meta, grpclib.const.Cardinality.UNARY_STREAM, ExportFormatConfigStreamRequest, MetaResponse, ), "/arista.syslog.v1.ExportFormatConfigService/Set": grpclib.const.Handler( self.__rpc_set, grpclib.const.Cardinality.UNARY_UNARY, ExportFormatConfigSetRequest, ExportFormatConfigSetResponse, ), }