Source code for cloudvision.api.arista.license.v1

# Copyright (c) 2026 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: arista/license.v1/license.proto, arista/license.v1/services.gen.proto
# plugin: python-aristaproto
# This file has been @generated

# Public names exported by this generated module. The tuple is emitted by the
# protobuf plugin; the grouping comments below are for readers only.
__all__ = (
    # Enumerations.
    "LicenseScope",
    "LicenseState",
    "LicenseSource",
    "Feature",
    "PlanName",
    "AssignmentStatus",
    "PreferredBundleReason",
    # Resource, key and helper messages.
    "PurchasedLicenseKey",
    "PurchasedLicense",
    "LicenseAssignmentKey",
    "LicenseAssignmentConfig",
    "LicenseAssignment",
    "LicenseFileKey",
    "LicenseFileConfig",
    "LicenseStatus",
    "DeviceLicenseStatus",
    "LicenseFile",
    "LicenseInfo",
    "RepeatedLicenseInfo",
    "DeviceData",
    "ApplicableDevicesKey",
    "Filter",
    "ApplicableDevices",
    "ApplicableLicenseBundlesKey",
    "LicenseBundle",
    "RepeatedLicenseBundle",
    "ApplicableLicenseBundles",
    "MetaResponse",
    # Per-resource request/response messages.
    "ApplicableDevicesRequest",
    "ApplicableDevicesResponse",
    "ApplicableDevicesSomeRequest",
    "ApplicableDevicesSomeResponse",
    "ApplicableDevicesStreamRequest",
    "ApplicableDevicesStreamResponse",
    "ApplicableDevicesBatchedStreamRequest",
    "ApplicableDevicesBatchedStreamResponse",
    "ApplicableLicenseBundlesRequest",
    "ApplicableLicenseBundlesResponse",
    "ApplicableLicenseBundlesSomeRequest",
    "ApplicableLicenseBundlesSomeResponse",
    "ApplicableLicenseBundlesStreamRequest",
    "ApplicableLicenseBundlesStreamResponse",
    "ApplicableLicenseBundlesBatchedStreamRequest",
    "ApplicableLicenseBundlesBatchedStreamResponse",
    "LicenseAssignmentRequest",
    "LicenseAssignmentResponse",
    "LicenseAssignmentSomeRequest",
    "LicenseAssignmentSomeResponse",
    "LicenseAssignmentStreamRequest",
    "LicenseAssignmentStreamResponse",
    "LicenseAssignmentBatchedStreamRequest",
    "LicenseAssignmentBatchedStreamResponse",
    "LicenseAssignmentConfigRequest",
    "LicenseAssignmentConfigResponse",
    "LicenseAssignmentConfigSomeRequest",
    "LicenseAssignmentConfigSomeResponse",
    "LicenseAssignmentConfigStreamRequest",
    "LicenseAssignmentConfigStreamResponse",
    "LicenseAssignmentConfigBatchedStreamRequest",
    "LicenseAssignmentConfigBatchedStreamResponse",
    "LicenseAssignmentConfigSetRequest",
    "LicenseAssignmentConfigSetResponse",
    "LicenseAssignmentConfigSetSomeRequest",
    "LicenseAssignmentConfigSetSomeResponse",
    "LicenseAssignmentConfigDeleteRequest",
    "LicenseAssignmentConfigDeleteResponse",
    "LicenseAssignmentConfigDeleteSomeRequest",
    "LicenseAssignmentConfigDeleteSomeResponse",
    "LicenseAssignmentConfigDeleteAllRequest",
    "LicenseAssignmentConfigDeleteAllResponse",
    "LicenseFileRequest",
    "LicenseFileResponse",
    "LicenseFileSomeRequest",
    "LicenseFileSomeResponse",
    "LicenseFileStreamRequest",
    "LicenseFileStreamResponse",
    "LicenseFileBatchedStreamRequest",
    "LicenseFileBatchedStreamResponse",
    "LicenseFileConfigRequest",
    "LicenseFileConfigResponse",
    "LicenseFileConfigSomeRequest",
    "LicenseFileConfigSomeResponse",
    "LicenseFileConfigStreamRequest",
    "LicenseFileConfigStreamResponse",
    "LicenseFileConfigBatchedStreamRequest",
    "LicenseFileConfigBatchedStreamResponse",
    "LicenseFileConfigSetRequest",
    "LicenseFileConfigSetResponse",
    "LicenseFileConfigSetSomeRequest",
    "LicenseFileConfigSetSomeResponse",
    "LicenseFileConfigDeleteRequest",
    "LicenseFileConfigDeleteResponse",
    "LicenseFileConfigDeleteSomeRequest",
    "LicenseFileConfigDeleteSomeResponse",
    "LicenseFileConfigDeleteAllRequest",
    "LicenseFileConfigDeleteAllResponse",
    "PurchasedLicenseRequest",
    "PurchasedLicenseResponse",
    "PurchasedLicenseSomeRequest",
    "PurchasedLicenseSomeResponse",
    "PurchasedLicenseStreamRequest",
    "PurchasedLicenseStreamResponse",
    "PurchasedLicenseBatchedStreamRequest",
    "PurchasedLicenseBatchedStreamResponse",
    # gRPC client stubs and server base classes (presumably defined further
    # down in this module -- TODO confirm).
    "ApplicableDevicesServiceStub",
    "ApplicableDevicesServiceBase",
    "ApplicableLicenseBundlesServiceStub",
    "ApplicableLicenseBundlesServiceBase",
    "LicenseAssignmentServiceStub",
    "LicenseAssignmentServiceBase",
    "LicenseAssignmentConfigServiceStub",
    "LicenseAssignmentConfigServiceBase",
    "LicenseFileServiceStub",
    "LicenseFileServiceBase",
    "LicenseFileConfigServiceStub",
    "LicenseFileConfigServiceBase",
    "PurchasedLicenseServiceStub",
    "PurchasedLicenseServiceBase",
)


from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Dict,
    List,
    Optional,
)

import aristaproto
import grpclib
from aristaproto.grpc.grpclib_server import ServiceBase

if TYPE_CHECKING:
    import grpclib.server
    from aristaproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


class LicenseScope(aristaproto.Enum):
    """Number of devices that a license supports."""

    UNSPECIFIED = 0
    """LICENSE_SCOPE_UNSPECIFIED: the license scope has not been specified."""

    SINGLE_DEVICE = 1
    """LICENSE_SCOPE_SINGLE_DEVICE: the license file supports a single device."""

    MULTIPLE_DEVICE = 2
    """LICENSE_SCOPE_MULTIPLE_DEVICE: the license file supports multiple devices."""
class LicenseState(aristaproto.Enum):
    """Installation status of a device license file."""

    UNSPECIFIED = 0
    """LICENSE_STATE_UNSPECIFIED: the license state has not been specified."""

    INSTALLED = 1
    """LICENSE_STATE_INSTALLED: the license is installed."""

    UNINSTALLED = 2
    """LICENSE_STATE_UNINSTALLED: the license is uninstalled."""

    INSTALLATION_IN_PROGRESS = 3
    """LICENSE_STATE_INSTALLATION_IN_PROGRESS: installation of the license is in progress."""

    UNINSTALLATION_IN_PROGRESS = 4
    """
    LICENSE_STATE_UNINSTALLATION_IN_PROGRESS: uninstallation of the license
    is in progress.
    """

    INSTALLATION_FAILED = 5
    """LICENSE_STATE_INSTALLATION_FAILED: installation of the license failed."""

    UNINSTALLATION_FAILED = 6
    """LICENSE_STATE_UNINSTALLATION_FAILED: uninstallation of the license failed."""
class LicenseSource(aristaproto.Enum):
    """Source from which a license file was obtained."""

    UNSPECIFIED = 0
    """LICENSE_SOURCE_UNSPECIFIED: the license source has not been specified."""

    USER_UPLOAD = 1
    """LICENSE_SOURCE_USER_UPLOAD: the license was uploaded by a user through the UI."""

    DISCOVERED = 2
    """
    LICENSE_SOURCE_DISCOVERED: the license was discovered on a device where
    it had been manually installed.
    """

    PROCURED = 3
    """LICENSE_SOURCE_PROCURED: the license was procured from the License Request Service."""
class Feature(aristaproto.Enum):
    """Name of the feature for which a license was purchased."""

    UNSPECIFIED = 0
    """FEATURE_UNSPECIFIED: the license feature has not been specified."""

    IPSEC = 1
    """FEATURE_IPSEC: the IPSEC feature."""

    MACSEC = 2
    """FEATURE_MACSEC: the MACSEC feature."""

    ENCR = 3
    """FEATURE_ENCR: encryption."""

    CLOUDEOS = 4
    """FEATURE_CLOUDEOS: the CLOUDEOS feature."""

    CVPATHFINDER = 5
    """FEATURE_CVPATHFINDER: the CVPATHFINDER feature."""
class PlanName(aristaproto.Enum):
    """Name of the plan under which a license was purchased."""

    UNSPECIFIED = 0
    """PLAN_NAME_UNSPECIFIED: the plan name has not been specified."""

    SUBSCRIPTION = 1
    """PLAN_NAME_SUBSCRIPTION: subscription licenses."""

    PERPETUAL = 2
    """PLAN_NAME_PERPETUAL: licenses valid for the lifetime of the device."""
class AssignmentStatus(aristaproto.Enum):
    """Execution statuses of a device assignment."""

    UNSPECIFIED = 0
    """ASSIGNMENT_STATUS_UNSPECIFIED: the status has not been specified."""

    ASSIGNMENT_IN_PROGRESS = 1
    """ASSIGNMENT_STATUS_ASSIGNMENT_IN_PROGRESS: device assignment is in progress."""

    ASSIGNMENT_SUCCESS = 2
    """ASSIGNMENT_STATUS_ASSIGNMENT_SUCCESS: device assignment succeeded."""

    ASSIGNMENT_FAILURE = 3
    """ASSIGNMENT_STATUS_ASSIGNMENT_FAILURE: device assignment failed."""

    UNASSIGNMENT_IN_PROGRESS = 4
    """ASSIGNMENT_STATUS_UNASSIGNMENT_IN_PROGRESS: device unassignment is in progress."""

    UNASSIGNMENT_SUCCESS = 5
    """ASSIGNMENT_STATUS_UNASSIGNMENT_SUCCESS: device unassignment succeeded."""

    UNASSIGNMENT_FAILURE = 6
    """ASSIGNMENT_STATUS_UNASSIGNMENT_FAILURE: device unassignment failed."""
class PreferredBundleReason(aristaproto.Enum):
    """Reason why a license bundle is the preferred one."""

    UNSPECIFIED = 0
    """PREFERRED_BUNDLE_REASON_UNSPECIFIED: no reason has been specified."""

    SALES_ORDER_MATCH = 1
    """
    PREFERRED_BUNDLE_REASON_SALES_ORDER_MATCH: the bundle matches the
    device's sales order.
    """

    PERPETUAL_PLAN = 2
    """
    PREFERRED_BUNDLE_REASON_PERPETUAL_PLAN: the license is preferred over
    other licenses because it is a perpetual license.
    """

    PERPETUAL_AND_SALES_ORDER_AGE = 3
    """
    PREFERRED_BUNDLE_REASON_PERPETUAL_AND_SALES_ORDER_AGE: a perpetual
    license with the oldest sales order.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseKey(aristaproto.Message):
    """Key that uniquely identifies a purchased license."""

    license_uuid: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """Unique id associated with a license."""
@dataclass(eq=False, repr=False)
class PurchasedLicense(aristaproto.Message):
    """
    License record derived from sales order information.

    Each sales order is expanded into individual licenses, and every such
    license is uniquely identified by its license_uuid.

    Note: This resource is only available in CVaaS.
    """

    key: "PurchasedLicenseKey" = aristaproto.message_field(1)
    """Key that uniquely identifies the license."""

    assigned_device: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Serial number of the device this license is associated with.
    Initially nil; it is updated after the user assigns a device.
    """

    so_number: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """Sales order number of the purchased license."""

    license_bundle_uuid: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
    """Unique id of the license bundle."""

    sku: Optional[str] = aristaproto.message_field(5, wraps=aristaproto.TYPE_STRING)
    """
    Name of the SKU, a combination of platform and feature name.
    Example: LIC-FIX-4-MACSEC
    """

    feature: "Feature" = aristaproto.enum_field(6)
    """Name of the feature for which the license was purchased."""

    start_time: datetime = aristaproto.message_field(7)
    """Time at which the license comes into force."""

    end_time: datetime = aristaproto.message_field(8)
    """Time at which the license expires."""

    plan_name: "PlanName" = aristaproto.enum_field(9)
    """Type of license (perpetual/subscription)."""

    original_subscription_record_number: Optional[int] = aristaproto.message_field(
        10, wraps=aristaproto.TYPE_INT64
    )
    """Parent subscription record number."""

    original_sales_order_number: Optional[int] = aristaproto.message_field(
        11, wraps=aristaproto.TYPE_INT64
    )
    """Parent sales order number."""

    renewal_status: Optional[str] = aristaproto.message_field(12, wraps=aristaproto.TYPE_STRING)
    """Renewal status of the license."""

    renewal_status_date: datetime = aristaproto.message_field(13)
    """Renewal date of the license."""

    end_customer_parent_account_id: Optional[int] = aristaproto.message_field(
        14, wraps=aristaproto.TYPE_INT64
    )
    """Parent company ID."""

    end_customer_parent_account_name: Optional[str] = aristaproto.message_field(
        15, wraps=aristaproto.TYPE_STRING
    )
    """Parent company name."""

    platform_class: Optional[str] = aristaproto.message_field(16, wraps=aristaproto.TYPE_STRING)
    """Platform class of the license SKU."""
@dataclass(eq=False, repr=False)
class LicenseAssignmentKey(aristaproto.Message):
    """Key made of a license bundle id and a device serial number."""

    license_bundle_uuid: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """Unique id of the license bundle."""

    device: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """Serial number of the device to which the license should be assigned."""
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfig(aristaproto.Message):
    """
    Describes a device assignment request.

    "Assignment" refers to associating the purchased license to a device.

    Note: This resource is only available in CVaaS.
    """

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """Key that uniquely identifies a license assignment for the device."""

    assigned: Optional[bool] = aristaproto.message_field(2, wraps=aristaproto.TYPE_BOOL)
    """Whether the license should be assigned or unassigned."""
@dataclass(eq=False, repr=False)
class LicenseAssignment(aristaproto.Message):
    """
    Describes the status of a device assignment.

    Note: This resource is only available in CVaaS.
    """

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """Key that uniquely identifies a license assignment for the device."""

    license_uuid: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """Unique id associated with a license."""

    license_file_serial: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """Unique serial number embedded in the license file."""

    status: "AssignmentStatus" = aristaproto.enum_field(4)
    """Device assignment status."""

    assign_time: datetime = aristaproto.message_field(5)
    """Time at which the license is assigned."""

    error: Optional[str] = aristaproto.message_field(6, wraps=aristaproto.TYPE_STRING)
    """Error message for a device assignment failure."""
@dataclass(eq=False, repr=False)
class LicenseFileKey(aristaproto.Message):
    """Key that uniquely identifies a license file."""

    license_file_serial: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """Unique serial number embedded in the license file."""
@dataclass(eq=False, repr=False)
class LicenseFileConfig(aristaproto.Message):
    """
    Describes a request to upload a license file and install/uninstall it
    on the assigned device.

    The user requests installation or uninstallation by setting the
    installed attribute to true/false in the request.
    """

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """Key that uniquely identifies a license file."""

    license_file_json: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """JSON string of the license file."""

    installed: Optional[bool] = aristaproto.message_field(3, wraps=aristaproto.TYPE_BOOL)
    """Whether the license file should be installed or uninstalled."""
@dataclass(eq=False, repr=False)
class LicenseStatus(aristaproto.Message):
    """License installation state together with an error message."""

    state: "LicenseState" = aristaproto.enum_field(1)
    """Installation status of the license file on the device."""

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """Error message for a license installation/uninstallation failure."""
@dataclass(eq=False, repr=False)
class DeviceLicenseStatus(aristaproto.Message):
    """Mapping of assigned devices to their license installation state."""

    values: Dict[str, "LicenseStatus"] = aristaproto.map_field(
        1, aristaproto.TYPE_STRING, aristaproto.TYPE_MESSAGE
    )
    """Map of deviceID to installation status."""
@dataclass(eq=False, repr=False)
class LicenseFile(aristaproto.Message):
    """
    Describes a license file available for installation or uninstallation
    on the device.
    """

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """Key that uniquely identifies a license file."""

    version: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """Version number of the license file."""

    customer_name: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """Name of the customer who owns this license file."""

    device_license_status: "DeviceLicenseStatus" = aristaproto.message_field(4)
    """Mapping of assigned devices to license installation status."""

    feature: "Feature" = aristaproto.enum_field(5)
    """Name of the license feature that this license file refers to."""

    start_time: datetime = aristaproto.message_field(6)
    """Time at which the license comes into force."""

    end_time: datetime = aristaproto.message_field(7)
    """Time at which the license expires."""

    source: "LicenseSource" = aristaproto.enum_field(8)
    """Means by which the license file was obtained."""

    file_name: Optional[str] = aristaproto.message_field(9, wraps=aristaproto.TYPE_STRING)
    """Name of the license file."""
@dataclass(eq=False, repr=False)
class LicenseInfo(aristaproto.Message):
    """Start time, end time, grace period and plan name of a license."""

    start_time: datetime = aristaproto.message_field(1)
    """Time at which the license starts to take effect."""

    end_time: datetime = aristaproto.message_field(2)
    """Time at which the license will be terminated."""

    grace_period: Optional[int] = aristaproto.message_field(3, wraps=aristaproto.TYPE_UINT32)
    """Number of days the license remains valid after expiry."""

    plan_name: "PlanName" = aristaproto.enum_field(4)
    """Type of license (perpetual/subscription)."""
@dataclass(eq=False, repr=False)
class RepeatedLicenseInfo(aristaproto.Message):
    """Wrapper message holding a list of LicenseInfo."""

    values: List["LicenseInfo"] = aristaproto.message_field(1)
    """Details of the licenses installed on the devices."""
@dataclass(eq=False, repr=False)
class DeviceData(aristaproto.Message):
    """Basic data of a device along with the licenses installed on it."""

    serial: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """Serial number of the device."""

    hostname: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """Hostname of the device."""

    sales_order_number: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """Sales order of the device."""

    model: Optional[str] = aristaproto.message_field(4, wraps=aristaproto.TYPE_STRING)
    """Model name of the device."""

    platform_classes: "___fmp__.RepeatedString" = aristaproto.message_field(5)
    """Supported license platform classes of the device."""

    licenses: "RepeatedLicenseInfo" = aristaproto.message_field(6)
    """Licenses which are already installed on the device."""

    applicable_license_skus: "___fmp__.RepeatedString" = aristaproto.message_field(7)
    """Applicable license SKUs for the device of the requested feature."""
@dataclass(eq=False, repr=False)
class ApplicableDevicesKey(aristaproto.Message):
    """Key that uniquely identifies an ApplicableDevice for a feature."""

    feature: "Feature" = aristaproto.enum_field(1)
    """Name of the license feature."""

    device_serial: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """Serial number of the device."""
@dataclass(eq=False, repr=False)
class Filter(aristaproto.Message):
    """Filters devices based on the licenses installed on them."""

    has_perpetual_license: Optional[bool] = aristaproto.message_field(
        1, wraps=aristaproto.TYPE_BOOL
    )
    """
    When set, filters devices which have at least one perpetual license
    for the given feature.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevices(aristaproto.Message):
    """
    A feature together with the data of a device applicable to that feature.

    To build this, devices are taken from the purchaseDevices and Inventory
    paths, the model of each device is extracted, and the applicable feature
    is checked using the modelSKUMapping path; a device is sent if its model
    has a SKU of the requested feature. License information of the device is
    aggregated from the license/deviceLicenseMapping path.

    Note: This resource is only available in CVaaS.
    """

    key: "ApplicableDevicesKey" = aristaproto.message_field(1)
    """Key that uniquely identifies applicable devices."""

    device_licenses_data: "DeviceData" = aristaproto.message_field(2)
    """Basic device data together with the licenses the device has."""
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesKey(aristaproto.Message):
    """
    Key used to request the license bundles applicable to a specific device
    serial for a given feature.

    The device's model number, derived from the serial, is used to determine
    compatibility and to prioritize bundles.
    """

    feature: "Feature" = aristaproto.enum_field(1)
    """Name of the license feature."""

    device_serial: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """Serial number of the device."""
@dataclass(eq=False, repr=False)
class LicenseBundle(aristaproto.Message):
    """A single license bundle with its UUID and preference status."""

    license_bundle_uuid: Optional[str] = aristaproto.message_field(1, wraps=aristaproto.TYPE_STRING)
    """Unique id of the license bundle."""

    sales_order_number: Optional[int] = aristaproto.message_field(2, wraps=aristaproto.TYPE_INT64)
    """Sales order number of the purchased license bundle."""

    sku: Optional[str] = aristaproto.message_field(3, wraps=aristaproto.TYPE_STRING)
    """
    Name of the SKU, a combination of platform and feature name.
    Example: LIC-FIX-4-MACSEC
    """

    plan_name: "PlanName" = aristaproto.enum_field(4)
    """Type of license (perpetual/subscription)."""

    start_time: datetime = aristaproto.message_field(5)
    """Time at which the license comes into force."""

    end_time: datetime = aristaproto.message_field(6)
    """Time at which the license expires."""

    total_license_count: Optional[int] = aristaproto.message_field(7, wraps=aristaproto.TYPE_UINT64)
    """Total number of licenses purchased in this bundle."""

    unassigned_license_count: Optional[int] = aristaproto.message_field(
        8, wraps=aristaproto.TYPE_UINT64
    )
    """Number of licenses in this bundle that are currently unassigned."""

    is_preferred: Optional[bool] = aristaproto.message_field(9, wraps=aristaproto.TYPE_BOOL)
    """Whether this bundle is the preferred bundle."""

    preferred_bundle_reason: "PreferredBundleReason" = aristaproto.enum_field(10)
    """
    Why this bundle is preferred.
    This field is only populated when is_preferred is true.
    """
@dataclass(eq=False, repr=False)
class RepeatedLicenseBundle(aristaproto.Message):
    """Wrapper message holding a list of LicenseBundle."""

    values: List["LicenseBundle"] = aristaproto.message_field(1)
    """Details of the license bundles."""
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundles(aristaproto.Message):
    """
    License bundles that are compatible with a given device for a specific
    feature.

    The list may contain a preferred bundle based on a defined priority
    order.

    Note: This resource is only available in CVaaS.
    """

    key: "ApplicableLicenseBundlesKey" = aristaproto.message_field(1)
    """Key identifying the bundle request."""

    bundles: "RepeatedLicenseBundle" = aristaproto.message_field(2)
    """
    List of applicable license bundles.
    There should be at most one `preferred` bundle in the list.
    """
@dataclass(eq=False, repr=False)
class MetaResponse(aristaproto.Message):
    """Metadata response carrying a timestamp, an operation type and an item count."""

    time: datetime = aristaproto.message_field(1)
    """
    Time holds the timestamp of the last item included in the metadata
    calculation.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(2)
    """
    Operation indicates how the value in this response should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """

    count: Optional[int] = aristaproto.message_field(3, wraps=aristaproto.TYPE_UINT32)
    """
    Count is the number of items present under the conditions of the request.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesRequest(aristaproto.Message):
    """Request for one ApplicableDevices instance, selected by key and time."""

    key: "ApplicableDevicesKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies an ApplicableDevices instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes the
    request.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesResponse(aristaproto.Message):
    """Response carrying one ApplicableDevices value and its modification time."""

    value: "ApplicableDevices" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    ApplicableDevices instance in this response.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesSomeRequest(aristaproto.Message):
    """Request for several ApplicableDevices instances, selected by a list of keys."""

    keys: List["ApplicableDevicesKey"] = aristaproto.message_field(1)
    """Keys of the ApplicableDevices instances being requested."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the time for which you are interested in the data.
    If no time is given, the server will use the time at which it makes the
    request.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesSomeResponse(aristaproto.Message):
    """Response carrying one ApplicableDevices value, an optional error and a time."""

    value: "ApplicableDevices" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore. If
    optional fields were not given at creation, these fields will be empty or
    set to default values.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last-modification of the
    ApplicableDevices instance in this response.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesStreamRequest(aristaproto.Message):
    """Request message with filters and time bounds for streaming ApplicableDevices."""

    partial_eq_filter: List["ApplicableDevices"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    # NOTE(review): the text below speaks of "the list", which reads as if it
    # describes the list-valued partial_eq_filter rather than this single
    # Filter message -- confirm against the .proto comments.
    filter: "Filter" = aristaproto.message_field(2)
    """
    For each ApplicableDevices in the list, all populated fields are
    considered ANDed together as a filtering operation. Similarly, the list
    itself is ORed such that any individual filter that matches a given
    ApplicableDevices is streamed to the user.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields is
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each ApplicableDevices at end.
        * Each ApplicableDevices response is fully-specified (all fields set).
      * start: Returns the state of each ApplicableDevices at start, followed
        by updates until now.
        * Each ApplicableDevices response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each ApplicableDevices at start,
        followed by updates until end.
        * Each ApplicableDevices response at start is fully-specified, but
          updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesStreamResponse(aristaproto.Message):
    """Streamed response carrying an ApplicableDevices value, its time and operation type."""

    value: "ApplicableDevices" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and what
    triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this ApplicableDevices's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation indicates how the ApplicableDevices value in this response
    should be considered. Under non-subscribe requests, this value should
    always be INITIAL. In a subscription, once all initial data is streamed
    and the client begins to receive modification updates, you should not see
    INITIAL again.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesBatchedStreamRequest(aristaproto.Message):
    """
    Request message with filters, time bounds and a batch-size limit for
    streaming ApplicableDevices in batches.
    """

    partial_eq_filter: List["ApplicableDevices"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    # NOTE(review): the text below speaks of "the list", which reads as if it
    # describes the list-valued partial_eq_filter rather than this single
    # Filter message -- confirm against the .proto comments.
    filter: "Filter" = aristaproto.message_field(2)
    """
    For each ApplicableDevices in the list, all populated fields are
    considered ANDed together as a filtering operation. Similarly, the list
    itself is ORed such that any individual filter that matches a given
    ApplicableDevices is streamed to the user.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange allows limiting response data to within a specified time window.
    If this field is populated, at least one of the two time fields is
    required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each ApplicableDevices at end.
        * Each ApplicableDevices response is fully-specified (all fields set).
      * start: Returns the state of each ApplicableDevices at start, followed
        by updates until now.
        * Each ApplicableDevices response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each ApplicableDevices at start,
        followed by updates until end.
        * Each ApplicableDevices response at start is fully-specified, but
          updates until end may be partial.
    """

    max_messages: Optional[int] = aristaproto.message_field(4, wraps=aristaproto.TYPE_UINT32)
    """
    MaxMessages limits the maximum number of messages that can be contained
    in one batch. MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT).
    INTERNAL_BATCH_LIMIT is set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class ApplicableDevicesBatchedStreamResponse(aristaproto.Message):
    """One batch of ApplicableDevicesStreamResponse messages."""

    responses: List["ApplicableDevicesStreamResponse"] = aristaproto.message_field(1)
    """
    Responses are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive)
    1 and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesRequest(aristaproto.Message):
    """Request for one ApplicableLicenseBundles instance, identified by key."""

    key: "ApplicableLicenseBundlesKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies an ApplicableLicenseBundles instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesResponse(aristaproto.Message):
    """Response carrying one ApplicableLicenseBundles value and its timestamp."""

    value: "ApplicableLicenseBundles" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last modification of the
    ApplicableLicenseBundles instance in this response.
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesSomeRequest(aristaproto.Message):
    """Request carrying a list of ApplicableLicenseBundlesKey keys and a time."""

    keys: List["ApplicableLicenseBundlesKey"] = aristaproto.message_field(1)
    """Keys lists the ApplicableLicenseBundlesKey values this request refers to."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesSomeResponse(aristaproto.Message):
    """
    Response of the GetSome process for ApplicableLicenseBundles: a value,
    an optional error and a timestamp.
    """

    value: "ApplicableLicenseBundles" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last modification of the
    ApplicableLicenseBundles instance in this response.
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesStreamRequest(aristaproto.Message):
    """Request for a GetAll/Subscribe stream of ApplicableLicenseBundles."""

    partial_eq_filter: List["ApplicableLicenseBundles"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each ApplicableLicenseBundles at end.
        * Each ApplicableLicenseBundles response is fully-specified (all
          fields set).
      * start: Returns the state of each ApplicableLicenseBundles at start,
        followed by updates until now.
        * Each ApplicableLicenseBundles response at start is
          fully-specified, but updates may be partial.
      * start and end: Returns the state of each ApplicableLicenseBundles at
        start, followed by updates until end.
        * Each ApplicableLicenseBundles response at start is
          fully-specified, but updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesStreamResponse(aristaproto.Message):
    """One streamed ApplicableLicenseBundles notification with time and type."""

    value: "ApplicableLicenseBundles" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time holds the timestamp of this ApplicableLicenseBundles's last
    modification.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Type is the Operation indicating how the ApplicableLicenseBundles value
    in this response should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesBatchedStreamRequest(aristaproto.Message):
    """
    Request for a batched GetAll/Subscribe stream of
    ApplicableLicenseBundles.
    """

    partial_eq_filter: List["ApplicableLicenseBundles"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each ApplicableLicenseBundles at end.
        * Each ApplicableLicenseBundles response is fully-specified (all
          fields set).
      * start: Returns the state of each ApplicableLicenseBundles at start,
        followed by updates until now.
        * Each ApplicableLicenseBundles response at start is
          fully-specified, but updates may be partial.
      * start and end: Returns the state of each ApplicableLicenseBundles at
        start, followed by updates until end.
        * Each ApplicableLicenseBundles response at start is
          fully-specified, but updates until end may be partial.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """
    MaxMessages limits the maximum number of messages that can be contained
    in one batch. MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class ApplicableLicenseBundlesBatchedStreamResponse(aristaproto.Message):
    """One batch of ApplicableLicenseBundlesStreamResponse messages."""

    responses: List["ApplicableLicenseBundlesStreamResponse"] = (
        aristaproto.message_field(1)
    )
    """
    Responses are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive)
    1 and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentRequest(aristaproto.Message):
    """Request for one LicenseAssignment instance, identified by its key."""

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a LicenseAssignment instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentResponse(aristaproto.Message):
    """Response carrying one LicenseAssignment value and its timestamp."""

    value: "LicenseAssignment" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseAssignment instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentSomeRequest(aristaproto.Message):
    """Request carrying a list of LicenseAssignmentKey keys and a time."""

    keys: List["LicenseAssignmentKey"] = aristaproto.message_field(1)
    """Keys lists the LicenseAssignmentKey values this request refers to."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentSomeResponse(aristaproto.Message):
    """
    Response of the GetSome process for LicenseAssignment: a value, an
    optional error and a timestamp.
    """

    value: "LicenseAssignment" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseAssignment instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentStreamRequest(aristaproto.Message):
    """Request for a GetAll/Subscribe stream of LicenseAssignment values."""

    partial_eq_filter: List["LicenseAssignment"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each LicenseAssignment at end.
        * Each LicenseAssignment response is fully-specified (all fields
          set).
      * start: Returns the state of each LicenseAssignment at start,
        followed by updates until now.
        * Each LicenseAssignment response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each LicenseAssignment at start,
        followed by updates until end.
        * Each LicenseAssignment response at start is fully-specified, but
          updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentStreamResponse(aristaproto.Message):
    """One streamed LicenseAssignment notification with its time and type."""

    value: "LicenseAssignment" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this LicenseAssignment's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Type is the Operation indicating how the LicenseAssignment value in this
    response should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentBatchedStreamRequest(aristaproto.Message):
    """Request for a batched GetAll/Subscribe stream of LicenseAssignment."""

    partial_eq_filter: List["LicenseAssignment"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each LicenseAssignment at end.
        * Each LicenseAssignment response is fully-specified (all fields
          set).
      * start: Returns the state of each LicenseAssignment at start,
        followed by updates until now.
        * Each LicenseAssignment response at start is fully-specified, but
          updates may be partial.
      * start and end: Returns the state of each LicenseAssignment at start,
        followed by updates until end.
        * Each LicenseAssignment response at start is fully-specified, but
          updates until end may be partial.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """
    MaxMessages limits the maximum number of messages that can be contained
    in one batch. MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentBatchedStreamResponse(aristaproto.Message):
    """One batch of LicenseAssignmentStreamResponse messages."""

    responses: List["LicenseAssignmentStreamResponse"] = aristaproto.message_field(1)
    """
    Responses are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive)
    1 and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigRequest(aristaproto.Message):
    """Request for one LicenseAssignmentConfig instance, identified by key."""

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a LicenseAssignmentConfig instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigResponse(aristaproto.Message):
    """Response carrying one LicenseAssignmentConfig value and its timestamp."""

    value: "LicenseAssignmentConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseAssignmentConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigSomeRequest(aristaproto.Message):
    """Request carrying a list of LicenseAssignmentKey keys and a time."""

    keys: List["LicenseAssignmentKey"] = aristaproto.message_field(1)
    """Keys lists the LicenseAssignmentKey values this request refers to."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigSomeResponse(aristaproto.Message):
    """
    Response of the GetSome process for LicenseAssignmentConfig: a value, an
    optional error and a timestamp.
    """

    value: "LicenseAssignmentConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseAssignmentConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigStreamRequest(aristaproto.Message):
    """Request for a GetAll/Subscribe stream of LicenseAssignmentConfig."""

    partial_eq_filter: List["LicenseAssignmentConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each LicenseAssignmentConfig at end.
        * Each LicenseAssignmentConfig response is fully-specified (all
          fields set).
      * start: Returns the state of each LicenseAssignmentConfig at start,
        followed by updates until now.
        * Each LicenseAssignmentConfig response at start is fully-specified,
          but updates may be partial.
      * start and end: Returns the state of each LicenseAssignmentConfig at
        start, followed by updates until end.
        * Each LicenseAssignmentConfig response at start is fully-specified,
          but updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigStreamResponse(aristaproto.Message):
    """One streamed LicenseAssignmentConfig notification with time and type."""

    value: "LicenseAssignmentConfig" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time holds the timestamp of this LicenseAssignmentConfig's last
    modification.
    """

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Type is the Operation indicating how the LicenseAssignmentConfig value
    in this response should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigBatchedStreamRequest(aristaproto.Message):
    """
    Request for a batched GetAll/Subscribe stream of
    LicenseAssignmentConfig.
    """

    partial_eq_filter: List["LicenseAssignmentConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each LicenseAssignmentConfig at end.
        * Each LicenseAssignmentConfig response is fully-specified (all
          fields set).
      * start: Returns the state of each LicenseAssignmentConfig at start,
        followed by updates until now.
        * Each LicenseAssignmentConfig response at start is fully-specified,
          but updates may be partial.
      * start and end: Returns the state of each LicenseAssignmentConfig at
        start, followed by updates until end.
        * Each LicenseAssignmentConfig response at start is fully-specified,
          but updates until end may be partial.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """
    MaxMessages limits the maximum number of messages that can be contained
    in one batch. MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigBatchedStreamResponse(aristaproto.Message):
    """One batch of LicenseAssignmentConfigStreamResponse messages."""

    responses: List["LicenseAssignmentConfigStreamResponse"] = (
        aristaproto.message_field(1)
    )
    """
    Responses are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive)
    1 and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigSetRequest(aristaproto.Message):
    """Request to set one LicenseAssignmentConfig value into the datastore."""

    value: "LicenseAssignmentConfig" = aristaproto.message_field(1)
    """
    Value carries the LicenseAssignmentConfig to set into the datastore.
    See the documentation on the LicenseAssignmentConfig struct for which
    fields are required.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigSetResponse(aristaproto.Message):
    """Response echoing a set LicenseAssignmentConfig and its creation time."""

    value: "LicenseAssignmentConfig" = aristaproto.message_field(1)
    """
    Value carries all the values given in the
    LicenseAssignmentConfigSetRequest as well as any server-generated
    values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    creation. The only guarantees made about this timestamp are:

      - it is after the time the request was received
      - a time-ranged query with StartTime==CreatedAt will include this
        instance.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigSetSomeRequest(aristaproto.Message):
    """Request to write a list of LicenseAssignmentConfig values."""

    values: List["LicenseAssignmentConfig"] = aristaproto.message_field(1)
    """
    Values contains a list of LicenseAssignmentConfig values to write.
    It is possible to provide more values than can fit within either:

      - the maximum send size of the client
      - the maximum receive size of the server

    If this error occurs you must reduce the number of values sent.
    See gRPC "maximum message size" documentation for more information.
    """
# NOTE(review): the generated source carries no description for this message.
# Presumably it is sent only for values that failed to be written, as
# LicenseAssignmentConfigDeleteSomeResponse documents for deletes -- confirm
# against the service definition.
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigSetSomeResponse(aristaproto.Message):
    """Pairs a LicenseAssignmentKey with an error string."""

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """Key is the LicenseAssignmentKey this response refers to."""

    error: str = aristaproto.string_field(2)
    """Error is the error string carried alongside the key."""
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigDeleteRequest(aristaproto.Message):
    """Request to remove one LicenseAssignmentConfig instance by key."""

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """
    Key indicates which LicenseAssignmentConfig instance to remove.
    This field must always be set.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigDeleteResponse(aristaproto.Message):
    """Response echoing the deleted key and the time of the deletion."""

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """
    Key echoes back the key of the deleted LicenseAssignmentConfig instance.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the (UTC) timestamp at which the system recognizes the
    deletion. The only guarantees made about this timestamp are:

      - it is after the time the request was received
      - a time-ranged query with StartTime==DeletedAt will not include this
        instance.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigDeleteSomeRequest(aristaproto.Message):
    """Request to delete a list of LicenseAssignmentConfig instances by key."""

    keys: List["LicenseAssignmentKey"] = aristaproto.message_field(1)
    """Keys contains a list of LicenseAssignmentConfig keys to delete."""
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigDeleteSomeResponse(aristaproto.Message):
    """
    LicenseAssignmentConfigDeleteSomeResponse is only sent when there is an
    error.
    """

    key: "LicenseAssignmentKey" = aristaproto.message_field(1)
    """Key is the LicenseAssignmentKey this response refers to."""

    error: str = aristaproto.string_field(2)
    """Error is the error string carried alongside the key."""
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigDeleteAllRequest(aristaproto.Message):
    """Request for a DeleteAll of LicenseAssignmentConfig, optionally filtered."""

    partial_eq_filter: List["LicenseAssignmentConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a DeleteAll.
    This requires all provided fields to be equal to the response.
    A filtered DeleteAll will use GetAll with filter to find things to
    delete.
    """
@dataclass(eq=False, repr=False)
class LicenseAssignmentConfigDeleteAllResponse(aristaproto.Message):
    """Error report for one key that a DeleteAll failed to delete."""

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    Type describes the class of delete error.
    A DeleteAllResponse is only sent when there is an error.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """Error indicates the error message from the delete failure."""

    key: "LicenseAssignmentKey" = aristaproto.message_field(3)
    """
    Key is the key of the LicenseAssignmentConfig instance that failed to be
    deleted.
    """

    time: datetime = aristaproto.message_field(4)
    """Time indicates the (UTC) timestamp when the key was being deleted."""
@dataclass(eq=False, repr=False)
class LicenseFileRequest(aristaproto.Message):
    """Request for one LicenseFile instance, identified by its key."""

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a LicenseFile instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseFileResponse(aristaproto.Message):
    """Response carrying one LicenseFile value and its timestamp."""

    value: "LicenseFile" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseFile instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseFileSomeRequest(aristaproto.Message):
    """Request carrying a list of LicenseFileKey keys and a time."""

    keys: List["LicenseFileKey"] = aristaproto.message_field(1)
    """Keys lists the LicenseFileKey values this request refers to."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseFileSomeResponse(aristaproto.Message):
    """
    Response of the GetSome process for LicenseFile: a value, an optional
    error and a timestamp.
    """

    value: "LicenseFile" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseFile instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseFileStreamRequest(aristaproto.Message):
    """Request for a GetAll/Subscribe stream of LicenseFile values."""

    partial_eq_filter: List["LicenseFile"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each LicenseFile at end.
        * Each LicenseFile response is fully-specified (all fields set).
      * start: Returns the state of each LicenseFile at start, followed by
        updates until now.
        * Each LicenseFile response at start is fully-specified, but updates
          may be partial.
      * start and end: Returns the state of each LicenseFile at start,
        followed by updates until end.
        * Each LicenseFile response at start is fully-specified, but updates
          until end may be partial.
    """
@dataclass(eq=False, repr=False)
class LicenseFileStreamResponse(aristaproto.Message):
    """One streamed LicenseFile notification with its time and type."""

    value: "LicenseFile" = aristaproto.message_field(1)
    """
    Value is a value deemed relevant to the initiating request.
    This structure will always have its key-field populated. Which other
    fields are populated, and why, depends on the value of Operation and
    what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Time holds the timestamp of this LicenseFile's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Type is the Operation indicating how the LicenseFile value in this
    response should be considered.
    Under non-subscribe requests, this value should always be INITIAL. In a
    subscription, once all initial data is streamed and the client begins to
    receive modification updates, you should not see INITIAL again.
    """
@dataclass(eq=False, repr=False)
class LicenseFileBatchedStreamRequest(aristaproto.Message):
    """Request for a batched GetAll/Subscribe stream of LicenseFile values."""

    partial_eq_filter: List["LicenseFile"] = aristaproto.message_field(1)
    """
    PartialEqFilter provides a way to server-side filter a GetAll/Subscribe.
    This requires all provided fields to be equal to the response.

    While transparent to users, this field also allows services to optimize
    internal subscriptions if filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    Time (the TimeRange) allows limiting response data to within a specified
    time window. If this field is populated, at least one of the two time
    fields is required.

    For GetAll, the fields start and end can be used as follows:

      * end: Returns the state of each LicenseFile at end.
        * Each LicenseFile response is fully-specified (all fields set).
      * start: Returns the state of each LicenseFile at start, followed by
        updates until now.
        * Each LicenseFile response at start is fully-specified, but updates
          may be partial.
      * start and end: Returns the state of each LicenseFile at start,
        followed by updates until end.
        * Each LicenseFile response at start is fully-specified, but updates
          until end may be partial.
    """

    max_messages: Optional[int] = aristaproto.message_field(
        4, wraps=aristaproto.TYPE_UINT32
    )
    """
    MaxMessages limits the maximum number of messages that can be contained
    in one batch. MaxMessages is required to be at least 1.
    The maximum number of messages in a batch is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class LicenseFileBatchedStreamResponse(aristaproto.Message):
    """One batch of LicenseFileStreamResponse messages."""

    responses: List["LicenseFileStreamResponse"] = aristaproto.message_field(1)
    """
    Responses are the values deemed relevant to the initiating request.
    The length of this structure is guaranteed to be between (inclusive)
    1 and min(req.max_messages, INTERNAL_BATCH_LIMIT).
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigRequest(aristaproto.Message):
    """Request for one LicenseFileConfig instance, identified by its key."""

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """
    Key uniquely identifies a LicenseFileConfig instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigResponse(aristaproto.Message):
    """Response carrying one LicenseFileConfig value and its timestamp."""

    value: "LicenseFileConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseFileConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigSomeRequest(aristaproto.Message):
    """Request carrying a list of LicenseFileKey keys and a time."""

    keys: List["LicenseFileKey"] = aristaproto.message_field(1)
    """Keys lists the LicenseFileKey values this request refers to."""

    time: datetime = aristaproto.message_field(2)
    """
    Time indicates the point in time for which the data is wanted.
    If no time is given, the server will use the time at which it makes
    the request.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigSomeResponse(aristaproto.Message):
    """
    Response of the GetSome process for LicenseFileConfig: a value, an
    optional error and a timestamp.
    """

    value: "LicenseFileConfig" = aristaproto.message_field(1)
    """
    Value is the value requested.
    This structure will be fully-populated as it exists in the datastore.
    If optional fields were not given at creation, these fields will be
    empty or set to default values.
    """

    error: Optional[str] = aristaproto.message_field(
        2, wraps=aristaproto.TYPE_STRING
    )
    """
    Error is an optional field.
    It should be filled when there is an error in the GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    Time carries the (UTC) timestamp of the last modification of the
    LicenseFileConfig instance in this response.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigStreamRequest(aristaproto.Message):
    """Request for a stream of LicenseFileConfig values, with filter and time bounds."""

    partial_eq_filter: List["LicenseFileConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter is a server-side filter for GetAll/Subscribe: every
    field provided here must be equal in the response. Although this is
    transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange limits response data to a specified time window. If this
    field is populated, at least one of its two time fields is required.

    For GetAll, start and end can be used as follows:

    * end: returns the state of each LicenseFileConfig at end.
      Each LicenseFileConfig response is fully-specified (all fields set).
    * start: returns the state of each LicenseFileConfig at start,
      followed by updates until now. Each response at start is
      fully-specified, but updates may be partial.
    * start and end: returns the state of each LicenseFileConfig at
      start, followed by updates until end. Each response at start is
      fully-specified, but updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigStreamResponse(aristaproto.Message):
    """One streamed LicenseFileConfig with its modification time and operation type."""

    value: "LicenseFileConfig" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request. Its key field is
    always populated; which other fields are populated, and why, depends
    on the value of Operation and on what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this LicenseFileConfig's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation says how the LicenseFileConfig value in this response should
    be considered. Under non-subscribe requests it should always be
    INITIAL. In a subscription, once all initial data has been streamed
    and modification updates begin, INITIAL should not be seen again.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigBatchedStreamRequest(aristaproto.Message):
    """Stream request for LicenseFileConfig values that also caps the batch size."""

    partial_eq_filter: List["LicenseFileConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter is a server-side filter for GetAll/Subscribe: every
    field provided here must be equal in the response. Although this is
    transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange limits response data to a specified time window. If this
    field is populated, at least one of its two time fields is required.

    For GetAll, start and end can be used as follows:

    * end: returns the state of each LicenseFileConfig at end.
      Each LicenseFileConfig response is fully-specified (all fields set).
    * start: returns the state of each LicenseFileConfig at start,
      followed by updates until now. Each response at start is
      fully-specified, but updates may be partial.
    * start and end: returns the state of each LicenseFileConfig at
      start, followed by updates until end. Each response at start is
      fully-specified, but updates until end may be partial.
    """

    max_messages: Optional[int] = aristaproto.message_field(4, wraps=aristaproto.TYPE_UINT32)
    """
    MaxMessages caps the number of messages contained in one batch and is
    required to be at least 1. The effective maximum is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigBatchedStreamResponse(aristaproto.Message):
    """A batch of LicenseFileConfigStreamResponse messages."""

    responses: List["LicenseFileConfigStreamResponse"] = aristaproto.message_field(1)
    """
    The values deemed relevant to the initiating request. The length of
    this list is guaranteed to be between 1 and
    min(req.max_messages, INTERNAL_BATCH_LIMIT), inclusive.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigSetRequest(aristaproto.Message):
    """Request carrying one LicenseFileConfig to write to the datastore."""

    value: "LicenseFileConfig" = aristaproto.message_field(1)
    """
    The LicenseFileConfig value to set into the datastore. See the
    documentation on the LicenseFileConfig struct for which fields are
    required.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigSetResponse(aristaproto.Message):
    """Response echoing the LicenseFileConfig that was set, plus its creation time."""

    value: "LicenseFileConfig" = aristaproto.message_field(1)
    """
    All the values given in the LicenseFileConfigSetRequest, together with
    any server-generated values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp at which the system recognizes the creation. The only
    guarantees made about this timestamp are:

    - it is after the time the request was received
    - a time-ranged query with StartTime==CreatedAt will include this
      instance.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigSetSomeRequest(aristaproto.Message):
    """Request carrying a list of LicenseFileConfig values to write."""

    values: List["LicenseFileConfig"] = aristaproto.message_field(1)
    """
    The list of LicenseFileConfig values to write. It is possible to
    provide more values than can fit within either:

    - the maximum send size of the client
    - the maximum receive size of the server

    If this error occurs you must reduce the number of values sent. See
    the gRPC "maximum message size" documentation for more information.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigSetSomeResponse(aristaproto.Message):
    """Response pairing a LicenseFileKey with an error string."""

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """
    A LicenseFileKey message. Undocumented upstream; presumably the key of
    the value this response refers to.
    """

    error: str = aristaproto.string_field(2)
    """
    Plain string field. Undocumented upstream; presumably the error
    message for that key.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigDeleteRequest(aristaproto.Message):
    """Request naming one LicenseFileConfig instance to remove."""

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """
    Key of the LicenseFileConfig instance to remove.
    This field must always be set.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigDeleteResponse(aristaproto.Message):
    """Response echoing the deleted key together with the deletion time."""

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """Echo of the key of the deleted LicenseFileConfig instance."""

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp at which the system recognizes the deletion. The only
    guarantees made about this timestamp are:

    - it is after the time the request was received
    - a time-ranged query with StartTime==DeletedAt will not include this
      instance.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigDeleteSomeRequest(aristaproto.Message):
    """Request listing several LicenseFileConfig keys to delete."""

    keys: List["LicenseFileKey"] = aristaproto.message_field(1)
    """The list of LicenseFileConfig keys to delete."""
@dataclass(eq=False, repr=False)
class LicenseFileConfigDeleteSomeResponse(aristaproto.Message):
    """
    Error report for a DeleteSome call.

    LicenseFileConfigDeleteSomeResponse is only sent when there is an error.
    """

    key: "LicenseFileKey" = aristaproto.message_field(1)
    """
    A LicenseFileKey message. Undocumented upstream; presumably the key
    whose deletion failed.
    """

    error: str = aristaproto.string_field(2)
    """
    Plain string field. Undocumented upstream; presumably the error
    message for that key.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigDeleteAllRequest(aristaproto.Message):
    """Request to delete LicenseFileConfig instances, optionally narrowed by a filter."""

    partial_eq_filter: List["LicenseFileConfig"] = aristaproto.message_field(1)
    """
    PartialEqFilter is a server-side filter for a DeleteAll: every field
    provided here must be equal in the response. A filtered DeleteAll uses
    GetAll with the filter to find the things to delete.
    """
@dataclass(eq=False, repr=False)
class LicenseFileConfigDeleteAllResponse(aristaproto.Message):
    """Error report for one key that a DeleteAll failed to delete."""

    type: "___fmp__.DeleteError" = aristaproto.enum_field(1)
    """
    The class of delete error. A DeleteAllResponse is only sent when there
    is an error.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """The error message from the delete failure."""

    key: "LicenseFileKey" = aristaproto.message_field(3)
    """Key of the LicenseFileConfig instance that failed to be deleted."""

    time: datetime = aristaproto.message_field(4)
    """(UTC) timestamp of when the key was being deleted."""
@dataclass(eq=False, repr=False)
class PurchasedLicenseRequest(aristaproto.Message):
    """Request naming one PurchasedLicense by key, optionally at a given time."""

    key: "PurchasedLicenseKey" = aristaproto.message_field(1)
    """
    Key that uniquely identifies the PurchasedLicense instance to retrieve.
    This value must be populated.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time for which the data is wanted. If no time is given, the server
    uses the time at which it makes the request.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseResponse(aristaproto.Message):
    """Response carrying one PurchasedLicense and its last-modification time."""

    value: "PurchasedLicense" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or set to
    their default values.
    """

    time: datetime = aristaproto.message_field(2)
    """
    (UTC) timestamp of the last modification of the PurchasedLicense
    instance in this response.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseSomeRequest(aristaproto.Message):
    """Request listing several PurchasedLicenseKey values, optionally at a given time."""

    keys: List["PurchasedLicenseKey"] = aristaproto.message_field(1)
    """
    List of PurchasedLicenseKey messages. Undocumented upstream; presumably
    the keys of the instances to retrieve.
    """

    time: datetime = aristaproto.message_field(2)
    """
    Time for which the data is wanted. If no time is given, the server
    uses the time at which it makes the request.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseSomeResponse(aristaproto.Message):
    """One GetSome result: a PurchasedLicense, an optional error, and a time."""

    value: "PurchasedLicense" = aristaproto.message_field(1)
    """
    The requested value, fully populated as it exists in the datastore.
    Optional fields that were not given at creation are empty or set to
    their default values.
    """

    error: Optional[str] = aristaproto.message_field(2, wraps=aristaproto.TYPE_STRING)
    """
    Optional error text. It should be filled when there is an error in the
    GetSome process.
    """

    time: datetime = aristaproto.message_field(3)
    """
    (UTC) timestamp of the last modification of the PurchasedLicense
    instance in this response.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseStreamRequest(aristaproto.Message):
    """Request for a stream of PurchasedLicense values, with filter and time bounds."""

    partial_eq_filter: List["PurchasedLicense"] = aristaproto.message_field(1)
    """
    PartialEqFilter is a server-side filter for GetAll/Subscribe: every
    field provided here must be equal in the response. Although this is
    transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange limits response data to a specified time window. If this
    field is populated, at least one of its two time fields is required.

    For GetAll, start and end can be used as follows:

    * end: returns the state of each PurchasedLicense at end.
      Each PurchasedLicense response is fully-specified (all fields set).
    * start: returns the state of each PurchasedLicense at start,
      followed by updates until now. Each response at start is
      fully-specified, but updates may be partial.
    * start and end: returns the state of each PurchasedLicense at
      start, followed by updates until end. Each response at start is
      fully-specified, but updates until end may be partial.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseStreamResponse(aristaproto.Message):
    """One streamed PurchasedLicense with its modification time and operation type."""

    value: "PurchasedLicense" = aristaproto.message_field(1)
    """
    A value deemed relevant to the initiating request. Its key field is
    always populated; which other fields are populated, and why, depends
    on the value of Operation and on what triggered this notification.
    """

    time: datetime = aristaproto.message_field(2)
    """Timestamp of this PurchasedLicense's last modification."""

    type: "__subscriptions__.Operation" = aristaproto.enum_field(3)
    """
    Operation says how the PurchasedLicense value in this response should
    be considered. Under non-subscribe requests it should always be
    INITIAL. In a subscription, once all initial data has been streamed
    and modification updates begin, INITIAL should not be seen again.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseBatchedStreamRequest(aristaproto.Message):
    """Stream request for PurchasedLicense values that also caps the batch size."""

    partial_eq_filter: List["PurchasedLicense"] = aristaproto.message_field(1)
    """
    PartialEqFilter is a server-side filter for GetAll/Subscribe: every
    field provided here must be equal in the response. Although this is
    transparent to users, it also lets services optimize internal
    subscriptions when the filter(s) are sufficiently specific.
    """

    time: "__time__.TimeBounds" = aristaproto.message_field(3)
    """
    TimeRange limits response data to a specified time window. If this
    field is populated, at least one of its two time fields is required.

    For GetAll, start and end can be used as follows:

    * end: returns the state of each PurchasedLicense at end.
      Each PurchasedLicense response is fully-specified (all fields set).
    * start: returns the state of each PurchasedLicense at start,
      followed by updates until now. Each response at start is
      fully-specified, but updates may be partial.
    * start and end: returns the state of each PurchasedLicense at
      start, followed by updates until end. Each response at start is
      fully-specified, but updates until end may be partial.
    """

    max_messages: Optional[int] = aristaproto.message_field(4, wraps=aristaproto.TYPE_UINT32)
    """
    MaxMessages caps the number of messages contained in one batch and is
    required to be at least 1. The effective maximum is
    min(max_messages, INTERNAL_BATCH_LIMIT), where INTERNAL_BATCH_LIMIT is
    set based on the maximum message size.
    """
@dataclass(eq=False, repr=False)
class PurchasedLicenseBatchedStreamResponse(aristaproto.Message):
    """A batch of PurchasedLicenseStreamResponse messages."""

    responses: List["PurchasedLicenseStreamResponse"] = aristaproto.message_field(1)
    """
    The values deemed relevant to the initiating request. The length of
    this list is guaranteed to be between 1 and
    min(req.max_messages, INTERNAL_BATCH_LIMIT), inclusive.
    """
class ApplicableDevicesServiceStub(aristaproto.ServiceStub):
    """
    Async client stub for ``arista.license.v1.ApplicableDevicesService``.

    Every method takes one request message plus the keyword-only options
    ``timeout``, ``deadline`` and ``metadata``, which are forwarded
    unchanged to the underlying ``_unary_unary`` / ``_unary_stream`` call.
    """

    async def get_one(
        self,
        applicable_devices_request: "ApplicableDevicesRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ApplicableDevicesResponse":
        """Await the ``GetOne`` call and return its ApplicableDevicesResponse."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        reply = await self._unary_unary(
            "/arista.license.v1.ApplicableDevicesService/GetOne",
            applicable_devices_request,
            ApplicableDevicesResponse,
            **call_options,
        )
        return reply

    async def get_some(
        self,
        applicable_devices_some_request: "ApplicableDevicesSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableDevicesSomeResponse]":
        """Yield each ApplicableDevicesSomeResponse from the ``GetSome`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableDevicesService/GetSome",
            applicable_devices_some_request,
            ApplicableDevicesSomeResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_all(
        self,
        applicable_devices_stream_request: "ApplicableDevicesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableDevicesStreamResponse]":
        """Yield each ApplicableDevicesStreamResponse from the ``GetAll`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableDevicesService/GetAll",
            applicable_devices_stream_request,
            ApplicableDevicesStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def subscribe(
        self,
        applicable_devices_stream_request: "ApplicableDevicesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableDevicesStreamResponse]":
        """Yield each ApplicableDevicesStreamResponse from the ``Subscribe`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableDevicesService/Subscribe",
            applicable_devices_stream_request,
            ApplicableDevicesStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_meta(
        self,
        applicable_devices_stream_request: "ApplicableDevicesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Await the ``GetMeta`` call and return its MetaResponse."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        reply = await self._unary_unary(
            "/arista.license.v1.ApplicableDevicesService/GetMeta",
            applicable_devices_stream_request,
            MetaResponse,
            **call_options,
        )
        return reply

    async def subscribe_meta(
        self,
        applicable_devices_stream_request: "ApplicableDevicesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Yield each MetaResponse from the ``SubscribeMeta`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableDevicesService/SubscribeMeta",
            applicable_devices_stream_request,
            MetaResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_all_batched(
        self,
        applicable_devices_batched_stream_request: "ApplicableDevicesBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableDevicesBatchedStreamResponse]":
        """Yield each batched response from the ``GetAllBatched`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableDevicesService/GetAllBatched",
            applicable_devices_batched_stream_request,
            ApplicableDevicesBatchedStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def subscribe_batched(
        self,
        applicable_devices_batched_stream_request: "ApplicableDevicesBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableDevicesBatchedStreamResponse]":
        """Yield each batched response from the ``SubscribeBatched`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableDevicesService/SubscribeBatched",
            applicable_devices_batched_stream_request,
            ApplicableDevicesBatchedStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item
class ApplicableLicenseBundlesServiceStub(aristaproto.ServiceStub):
    """
    Async client stub for ``arista.license.v1.ApplicableLicenseBundlesService``.

    Every method takes one request message plus the keyword-only options
    ``timeout``, ``deadline`` and ``metadata``, which are forwarded
    unchanged to the underlying ``_unary_unary`` / ``_unary_stream`` call.
    """

    async def get_one(
        self,
        applicable_license_bundles_request: "ApplicableLicenseBundlesRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "ApplicableLicenseBundlesResponse":
        """Await the ``GetOne`` call and return its ApplicableLicenseBundlesResponse."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        reply = await self._unary_unary(
            "/arista.license.v1.ApplicableLicenseBundlesService/GetOne",
            applicable_license_bundles_request,
            ApplicableLicenseBundlesResponse,
            **call_options,
        )
        return reply

    async def get_some(
        self,
        applicable_license_bundles_some_request: "ApplicableLicenseBundlesSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableLicenseBundlesSomeResponse]":
        """Yield each ApplicableLicenseBundlesSomeResponse from the ``GetSome`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableLicenseBundlesService/GetSome",
            applicable_license_bundles_some_request,
            ApplicableLicenseBundlesSomeResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_all(
        self,
        applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableLicenseBundlesStreamResponse]":
        """Yield each ApplicableLicenseBundlesStreamResponse from the ``GetAll`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableLicenseBundlesService/GetAll",
            applicable_license_bundles_stream_request,
            ApplicableLicenseBundlesStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def subscribe(
        self,
        applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableLicenseBundlesStreamResponse]":
        """Yield each ApplicableLicenseBundlesStreamResponse from the ``Subscribe`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableLicenseBundlesService/Subscribe",
            applicable_license_bundles_stream_request,
            ApplicableLicenseBundlesStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_meta(
        self,
        applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Await the ``GetMeta`` call and return its MetaResponse."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        reply = await self._unary_unary(
            "/arista.license.v1.ApplicableLicenseBundlesService/GetMeta",
            applicable_license_bundles_stream_request,
            MetaResponse,
            **call_options,
        )
        return reply

    async def subscribe_meta(
        self,
        applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Yield each MetaResponse from the ``SubscribeMeta`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableLicenseBundlesService/SubscribeMeta",
            applicable_license_bundles_stream_request,
            MetaResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_all_batched(
        self,
        applicable_license_bundles_batched_stream_request: "ApplicableLicenseBundlesBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableLicenseBundlesBatchedStreamResponse]":
        """Yield each batched response from the ``GetAllBatched`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableLicenseBundlesService/GetAllBatched",
            applicable_license_bundles_batched_stream_request,
            ApplicableLicenseBundlesBatchedStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def subscribe_batched(
        self,
        applicable_license_bundles_batched_stream_request: "ApplicableLicenseBundlesBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[ApplicableLicenseBundlesBatchedStreamResponse]":
        """Yield each batched response from the ``SubscribeBatched`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.ApplicableLicenseBundlesService/SubscribeBatched",
            applicable_license_bundles_batched_stream_request,
            ApplicableLicenseBundlesBatchedStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item
class LicenseAssignmentServiceStub(aristaproto.ServiceStub):
    """
    Async client stub for ``arista.license.v1.LicenseAssignmentService``.

    Every method takes one request message plus the keyword-only options
    ``timeout``, ``deadline`` and ``metadata``, which are forwarded
    unchanged to the underlying ``_unary_unary`` / ``_unary_stream`` call.
    """

    async def get_one(
        self,
        license_assignment_request: "LicenseAssignmentRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "LicenseAssignmentResponse":
        """Await the ``GetOne`` call and return its LicenseAssignmentResponse."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseAssignmentService/GetOne",
            license_assignment_request,
            LicenseAssignmentResponse,
            **call_options,
        )
        return reply

    async def get_some(
        self,
        license_assignment_some_request: "LicenseAssignmentSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseAssignmentSomeResponse]":
        """Yield each LicenseAssignmentSomeResponse from the ``GetSome`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.LicenseAssignmentService/GetSome",
            license_assignment_some_request,
            LicenseAssignmentSomeResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_all(
        self,
        license_assignment_stream_request: "LicenseAssignmentStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseAssignmentStreamResponse]":
        """Yield each LicenseAssignmentStreamResponse from the ``GetAll`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.LicenseAssignmentService/GetAll",
            license_assignment_stream_request,
            LicenseAssignmentStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def subscribe(
        self,
        license_assignment_stream_request: "LicenseAssignmentStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseAssignmentStreamResponse]":
        """Yield each LicenseAssignmentStreamResponse from the ``Subscribe`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.LicenseAssignmentService/Subscribe",
            license_assignment_stream_request,
            LicenseAssignmentStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_meta(
        self,
        license_assignment_stream_request: "LicenseAssignmentStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Await the ``GetMeta`` call and return its MetaResponse."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseAssignmentService/GetMeta",
            license_assignment_stream_request,
            MetaResponse,
            **call_options,
        )
        return reply

    async def subscribe_meta(
        self,
        license_assignment_stream_request: "LicenseAssignmentStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Yield each MetaResponse from the ``SubscribeMeta`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.LicenseAssignmentService/SubscribeMeta",
            license_assignment_stream_request,
            MetaResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def get_all_batched(
        self,
        license_assignment_batched_stream_request: "LicenseAssignmentBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseAssignmentBatchedStreamResponse]":
        """Yield each batched response from the ``GetAllBatched`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.LicenseAssignmentService/GetAllBatched",
            license_assignment_batched_stream_request,
            LicenseAssignmentBatchedStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item

    async def subscribe_batched(
        self,
        license_assignment_batched_stream_request: "LicenseAssignmentBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseAssignmentBatchedStreamResponse]":
        """Yield each batched response from the ``SubscribeBatched`` stream."""
        call_options = {"timeout": timeout, "deadline": deadline, "metadata": metadata}
        stream = self._unary_stream(
            "/arista.license.v1.LicenseAssignmentService/SubscribeBatched",
            license_assignment_batched_stream_request,
            LicenseAssignmentBatchedStreamResponse,
            **call_options,
        )
        async for item in stream:
            yield item
[docs] class LicenseAssignmentConfigServiceStub(aristaproto.ServiceStub): """ """
[docs] async def get_one( self, license_assignment_config_request: "LicenseAssignmentConfigRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "LicenseAssignmentConfigResponse": """ """ return await self._unary_unary( "/arista.license.v1.LicenseAssignmentConfigService/GetOne", license_assignment_config_request, LicenseAssignmentConfigResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def get_some( self, license_assignment_config_some_request: "LicenseAssignmentConfigSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[LicenseAssignmentConfigSomeResponse]": """ """ async for response in self._unary_stream( "/arista.license.v1.LicenseAssignmentConfigService/GetSome", license_assignment_config_some_request, LicenseAssignmentConfigSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_all( self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[LicenseAssignmentConfigStreamResponse]": """ """ async for response in self._unary_stream( "/arista.license.v1.LicenseAssignmentConfigService/GetAll", license_assignment_config_stream_request, LicenseAssignmentConfigStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def subscribe( self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[LicenseAssignmentConfigStreamResponse]": """ """ async for response in self._unary_stream( "/arista.license.v1.LicenseAssignmentConfigService/Subscribe", license_assignment_config_stream_request, LicenseAssignmentConfigStreamResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def get_meta( self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MetaResponse": """ """ return await self._unary_unary( "/arista.license.v1.LicenseAssignmentConfigService/GetMeta", license_assignment_config_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def subscribe_meta( self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[MetaResponse]": """ """ async for response in self._unary_stream( "/arista.license.v1.LicenseAssignmentConfigService/SubscribeMeta", license_assignment_config_stream_request, MetaResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def set( self, license_assignment_config_set_request: "LicenseAssignmentConfigSetRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "LicenseAssignmentConfigSetResponse": """ """ return await self._unary_unary( "/arista.license.v1.LicenseAssignmentConfigService/Set", license_assignment_config_set_request, LicenseAssignmentConfigSetResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def set_some( self, license_assignment_config_set_some_request: "LicenseAssignmentConfigSetSomeRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "AsyncIterator[LicenseAssignmentConfigSetSomeResponse]": """ """ async for response in self._unary_stream( "/arista.license.v1.LicenseAssignmentConfigService/SetSome", license_assignment_config_set_some_request, LicenseAssignmentConfigSetSomeResponse, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response
[docs] async def delete( self, license_assignment_config_delete_request: "LicenseAssignmentConfigDeleteRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "LicenseAssignmentConfigDeleteResponse": """ """ return await self._unary_unary( "/arista.license.v1.LicenseAssignmentConfigService/Delete", license_assignment_config_delete_request, LicenseAssignmentConfigDeleteResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
async def delete_some(
    self,
    license_assignment_config_delete_some_request: "LicenseAssignmentConfigDeleteSomeRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[LicenseAssignmentConfigDeleteSomeResponse]":
    """Stream the results of the ``DeleteSome`` RPC.

    Delegates to ``self._unary_stream`` and yields every item it produces;
    ``timeout``, ``deadline`` and ``metadata`` are forwarded unchanged.
    """
    results = self._unary_stream(
        "/arista.license.v1.LicenseAssignmentConfigService/DeleteSome",
        license_assignment_config_delete_some_request,
        LicenseAssignmentConfigDeleteSomeResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    async for item in results:
        yield item
async def delete_all(
    self,
    license_assignment_config_delete_all_request: "LicenseAssignmentConfigDeleteAllRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[LicenseAssignmentConfigDeleteAllResponse]":
    """Stream the results of the ``DeleteAll`` RPC.

    Delegates to ``self._unary_stream`` and yields every item it produces;
    ``timeout``, ``deadline`` and ``metadata`` are forwarded unchanged.
    """
    results = self._unary_stream(
        "/arista.license.v1.LicenseAssignmentConfigService/DeleteAll",
        license_assignment_config_delete_all_request,
        LicenseAssignmentConfigDeleteAllResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    async for item in results:
        yield item
async def get_all_batched(
    self,
    license_assignment_config_batched_stream_request: "LicenseAssignmentConfigBatchedStreamRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[LicenseAssignmentConfigBatchedStreamResponse]":
    """Stream the results of the ``GetAllBatched`` RPC.

    Delegates to ``self._unary_stream`` and yields every item it produces;
    ``timeout``, ``deadline`` and ``metadata`` are forwarded unchanged.
    """
    results = self._unary_stream(
        "/arista.license.v1.LicenseAssignmentConfigService/GetAllBatched",
        license_assignment_config_batched_stream_request,
        LicenseAssignmentConfigBatchedStreamResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    async for item in results:
        yield item
async def subscribe_batched(
    self,
    license_assignment_config_batched_stream_request: "LicenseAssignmentConfigBatchedStreamRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None,
) -> "AsyncIterator[LicenseAssignmentConfigBatchedStreamResponse]":
    """Stream the results of the ``SubscribeBatched`` RPC.

    Delegates to ``self._unary_stream`` and yields every item it produces;
    ``timeout``, ``deadline`` and ``metadata`` are forwarded unchanged.
    """
    results = self._unary_stream(
        "/arista.license.v1.LicenseAssignmentConfigService/SubscribeBatched",
        license_assignment_config_batched_stream_request,
        LicenseAssignmentConfigBatchedStreamResponse,
        timeout=timeout,
        deadline=deadline,
        metadata=metadata,
    )
    async for item in results:
        yield item
class LicenseFileServiceStub(aristaproto.ServiceStub):
    """Client stub for the ``arista.license.v1.LicenseFileService`` routes.

    Each method forwards its request, plus the optional ``timeout``,
    ``deadline`` and ``metadata`` keyword arguments, to ``_unary_unary`` or
    ``_unary_stream`` on the matching route.
    """

    async def get_one(
        self,
        license_file_request: "LicenseFileRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "LicenseFileResponse":
        """Invoke ``GetOne`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseFileService/GetOne",
            license_file_request,
            LicenseFileResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        license_file_some_request: "LicenseFileSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileSomeResponse]":
        """Yield each item produced by ``GetSome`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileService/GetSome",
            license_file_some_request,
            LicenseFileSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_all(
        self,
        license_file_stream_request: "LicenseFileStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileStreamResponse]":
        """Yield each item produced by ``GetAll`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileService/GetAll",
            license_file_stream_request,
            LicenseFileStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def subscribe(
        self,
        license_file_stream_request: "LicenseFileStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileStreamResponse]":
        """Yield each item produced by ``Subscribe`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileService/Subscribe",
            license_file_stream_request,
            LicenseFileStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_meta(
        self,
        license_file_stream_request: "LicenseFileStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Invoke ``GetMeta`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseFileService/GetMeta",
            license_file_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def subscribe_meta(
        self,
        license_file_stream_request: "LicenseFileStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Yield each item produced by ``SubscribeMeta`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileService/SubscribeMeta",
            license_file_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_all_batched(
        self,
        license_file_batched_stream_request: "LicenseFileBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileBatchedStreamResponse]":
        """Yield each item produced by ``GetAllBatched`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileService/GetAllBatched",
            license_file_batched_stream_request,
            LicenseFileBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def subscribe_batched(
        self,
        license_file_batched_stream_request: "LicenseFileBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileBatchedStreamResponse]":
        """Yield each item produced by ``SubscribeBatched`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileService/SubscribeBatched",
            license_file_batched_stream_request,
            LicenseFileBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item
class LicenseFileConfigServiceStub(aristaproto.ServiceStub):
    """Client stub for the ``arista.license.v1.LicenseFileConfigService`` routes.

    Each method forwards its request, plus the optional ``timeout``,
    ``deadline`` and ``metadata`` keyword arguments, to ``_unary_unary`` or
    ``_unary_stream`` on the matching route.
    """

    async def get_one(
        self,
        license_file_config_request: "LicenseFileConfigRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "LicenseFileConfigResponse":
        """Invoke ``GetOne`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseFileConfigService/GetOne",
            license_file_config_request,
            LicenseFileConfigResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        license_file_config_some_request: "LicenseFileConfigSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigSomeResponse]":
        """Yield each item produced by ``GetSome`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/GetSome",
            license_file_config_some_request,
            LicenseFileConfigSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_all(
        self,
        license_file_config_stream_request: "LicenseFileConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigStreamResponse]":
        """Yield each item produced by ``GetAll`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/GetAll",
            license_file_config_stream_request,
            LicenseFileConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def subscribe(
        self,
        license_file_config_stream_request: "LicenseFileConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigStreamResponse]":
        """Yield each item produced by ``Subscribe`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/Subscribe",
            license_file_config_stream_request,
            LicenseFileConfigStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_meta(
        self,
        license_file_config_stream_request: "LicenseFileConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Invoke ``GetMeta`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseFileConfigService/GetMeta",
            license_file_config_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def subscribe_meta(
        self,
        license_file_config_stream_request: "LicenseFileConfigStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Yield each item produced by ``SubscribeMeta`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/SubscribeMeta",
            license_file_config_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def set(
        self,
        license_file_config_set_request: "LicenseFileConfigSetRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "LicenseFileConfigSetResponse":
        """Invoke ``Set`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseFileConfigService/Set",
            license_file_config_set_request,
            LicenseFileConfigSetResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def set_some(
        self,
        license_file_config_set_some_request: "LicenseFileConfigSetSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigSetSomeResponse]":
        """Yield each item produced by ``SetSome`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/SetSome",
            license_file_config_set_some_request,
            LicenseFileConfigSetSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def delete(
        self,
        license_file_config_delete_request: "LicenseFileConfigDeleteRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "LicenseFileConfigDeleteResponse":
        """Invoke ``Delete`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.LicenseFileConfigService/Delete",
            license_file_config_delete_request,
            LicenseFileConfigDeleteResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def delete_some(
        self,
        license_file_config_delete_some_request: "LicenseFileConfigDeleteSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigDeleteSomeResponse]":
        """Yield each item produced by ``DeleteSome`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/DeleteSome",
            license_file_config_delete_some_request,
            LicenseFileConfigDeleteSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def delete_all(
        self,
        license_file_config_delete_all_request: "LicenseFileConfigDeleteAllRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigDeleteAllResponse]":
        """Yield each item produced by ``DeleteAll`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/DeleteAll",
            license_file_config_delete_all_request,
            LicenseFileConfigDeleteAllResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_all_batched(
        self,
        license_file_config_batched_stream_request: "LicenseFileConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigBatchedStreamResponse]":
        """Yield each item produced by ``GetAllBatched`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/GetAllBatched",
            license_file_config_batched_stream_request,
            LicenseFileConfigBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def subscribe_batched(
        self,
        license_file_config_batched_stream_request: "LicenseFileConfigBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[LicenseFileConfigBatchedStreamResponse]":
        """Yield each item produced by ``SubscribeBatched`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.LicenseFileConfigService/SubscribeBatched",
            license_file_config_batched_stream_request,
            LicenseFileConfigBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item
class PurchasedLicenseServiceStub(aristaproto.ServiceStub):
    """Client stub for the ``arista.license.v1.PurchasedLicenseService`` routes.

    Each method forwards its request, plus the optional ``timeout``,
    ``deadline`` and ``metadata`` keyword arguments, to ``_unary_unary`` or
    ``_unary_stream`` on the matching route.
    """

    async def get_one(
        self,
        purchased_license_request: "PurchasedLicenseRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "PurchasedLicenseResponse":
        """Invoke ``GetOne`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.PurchasedLicenseService/GetOne",
            purchased_license_request,
            PurchasedLicenseResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def get_some(
        self,
        purchased_license_some_request: "PurchasedLicenseSomeRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[PurchasedLicenseSomeResponse]":
        """Yield each item produced by ``GetSome`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.PurchasedLicenseService/GetSome",
            purchased_license_some_request,
            PurchasedLicenseSomeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_all(
        self,
        purchased_license_stream_request: "PurchasedLicenseStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[PurchasedLicenseStreamResponse]":
        """Yield each item produced by ``GetAll`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.PurchasedLicenseService/GetAll",
            purchased_license_stream_request,
            PurchasedLicenseStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def subscribe(
        self,
        purchased_license_stream_request: "PurchasedLicenseStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[PurchasedLicenseStreamResponse]":
        """Yield each item produced by ``Subscribe`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.PurchasedLicenseService/Subscribe",
            purchased_license_stream_request,
            PurchasedLicenseStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_meta(
        self,
        purchased_license_stream_request: "PurchasedLicenseStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MetaResponse":
        """Invoke ``GetMeta`` via ``_unary_unary`` and return its result."""
        reply = await self._unary_unary(
            "/arista.license.v1.PurchasedLicenseService/GetMeta",
            purchased_license_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def subscribe_meta(
        self,
        purchased_license_stream_request: "PurchasedLicenseStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[MetaResponse]":
        """Yield each item produced by ``SubscribeMeta`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.PurchasedLicenseService/SubscribeMeta",
            purchased_license_stream_request,
            MetaResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def get_all_batched(
        self,
        purchased_license_batched_stream_request: "PurchasedLicenseBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[PurchasedLicenseBatchedStreamResponse]":
        """Yield each item produced by ``GetAllBatched`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.PurchasedLicenseService/GetAllBatched",
            purchased_license_batched_stream_request,
            PurchasedLicenseBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item

    async def subscribe_batched(
        self,
        purchased_license_batched_stream_request: "PurchasedLicenseBatchedStreamRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "AsyncIterator[PurchasedLicenseBatchedStreamResponse]":
        """Yield each item produced by ``SubscribeBatched`` via ``_unary_stream``."""
        results = self._unary_stream(
            "/arista.license.v1.PurchasedLicenseService/SubscribeBatched",
            purchased_license_batched_stream_request,
            PurchasedLicenseBatchedStreamResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        async for item in results:
            yield item
from .... import fmp as ___fmp__ from ... import subscriptions as __subscriptions__ from ... import time as __time__
class ApplicableDevicesServiceBase(ServiceBase):
    """Server-side base for the ``arista.license.v1.ApplicableDevicesService`` routes.

    The public handlers defined here all raise ``grpclib.GRPCError`` with
    status ``UNIMPLEMENTED``. The private ``__rpc_*`` wrappers read the
    request from the stream and dispatch to the public handler of the same
    name; ``__mapping__`` ties each route to its wrapper.
    """

    async def get_one(
        self, applicable_devices_request: "ApplicableDevicesRequest"
    ) -> "ApplicableDevicesResponse":
        """Handle ``GetOne``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_some(
        self, applicable_devices_some_request: "ApplicableDevicesSomeRequest"
    ) -> AsyncIterator[ApplicableDevicesSomeResponse]:
        """Handle ``GetSome``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_all(
        self, applicable_devices_stream_request: "ApplicableDevicesStreamRequest"
    ) -> AsyncIterator[ApplicableDevicesStreamResponse]:
        """Handle ``GetAll``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def subscribe(
        self, applicable_devices_stream_request: "ApplicableDevicesStreamRequest"
    ) -> AsyncIterator[ApplicableDevicesStreamResponse]:
        """Handle ``Subscribe``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_meta(
        self, applicable_devices_stream_request: "ApplicableDevicesStreamRequest"
    ) -> "MetaResponse":
        """Handle ``GetMeta``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def subscribe_meta(
        self, applicable_devices_stream_request: "ApplicableDevicesStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Handle ``SubscribeMeta``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_all_batched(
        self, applicable_devices_batched_stream_request: "ApplicableDevicesBatchedStreamRequest"
    ) -> AsyncIterator[ApplicableDevicesBatchedStreamResponse]:
        """Handle ``GetAllBatched``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def subscribe_batched(
        self, applicable_devices_batched_stream_request: "ApplicableDevicesBatchedStreamRequest"
    ) -> AsyncIterator[ApplicableDevicesBatchedStreamResponse]:
        """Handle ``SubscribeBatched``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[ApplicableDevicesRequest, ApplicableDevicesResponse]"
    ) -> None:
        # Unary call: read one message, answer with one message.
        incoming = await stream.recv_message()
        reply = await self.get_one(incoming)
        await stream.send_message(reply)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[ApplicableDevicesSomeRequest, ApplicableDevicesSomeResponse]",
    ) -> None:
        # Server-streaming call: the shared helper drives the handler.
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_some, stream, incoming)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[ApplicableDevicesStreamRequest, ApplicableDevicesStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all, stream, incoming)

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[ApplicableDevicesStreamRequest, ApplicableDevicesStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe, stream, incoming)

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[ApplicableDevicesStreamRequest, MetaResponse]"
    ) -> None:
        incoming = await stream.recv_message()
        reply = await self.get_meta(incoming)
        await stream.send_message(reply)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[ApplicableDevicesStreamRequest, MetaResponse]"
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_meta, stream, incoming)

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[ApplicableDevicesBatchedStreamRequest, ApplicableDevicesBatchedStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all_batched, stream, incoming)

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[ApplicableDevicesBatchedStreamRequest, ApplicableDevicesBatchedStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_batched, stream, incoming)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route-to-``Handler`` table for this service."""
        make_handler = grpclib.const.Handler
        one_to_one = grpclib.const.Cardinality.UNARY_UNARY
        one_to_many = grpclib.const.Cardinality.UNARY_STREAM
        return {
            "/arista.license.v1.ApplicableDevicesService/GetOne": make_handler(
                self.__rpc_get_one,
                one_to_one,
                ApplicableDevicesRequest,
                ApplicableDevicesResponse,
            ),
            "/arista.license.v1.ApplicableDevicesService/GetSome": make_handler(
                self.__rpc_get_some,
                one_to_many,
                ApplicableDevicesSomeRequest,
                ApplicableDevicesSomeResponse,
            ),
            "/arista.license.v1.ApplicableDevicesService/GetAll": make_handler(
                self.__rpc_get_all,
                one_to_many,
                ApplicableDevicesStreamRequest,
                ApplicableDevicesStreamResponse,
            ),
            "/arista.license.v1.ApplicableDevicesService/Subscribe": make_handler(
                self.__rpc_subscribe,
                one_to_many,
                ApplicableDevicesStreamRequest,
                ApplicableDevicesStreamResponse,
            ),
            "/arista.license.v1.ApplicableDevicesService/GetMeta": make_handler(
                self.__rpc_get_meta,
                one_to_one,
                ApplicableDevicesStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.ApplicableDevicesService/SubscribeMeta": make_handler(
                self.__rpc_subscribe_meta,
                one_to_many,
                ApplicableDevicesStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.ApplicableDevicesService/GetAllBatched": make_handler(
                self.__rpc_get_all_batched,
                one_to_many,
                ApplicableDevicesBatchedStreamRequest,
                ApplicableDevicesBatchedStreamResponse,
            ),
            "/arista.license.v1.ApplicableDevicesService/SubscribeBatched": make_handler(
                self.__rpc_subscribe_batched,
                one_to_many,
                ApplicableDevicesBatchedStreamRequest,
                ApplicableDevicesBatchedStreamResponse,
            ),
        }
class ApplicableLicenseBundlesServiceBase(ServiceBase):
    """Server-side base for the ``arista.license.v1.ApplicableLicenseBundlesService`` routes.

    The public handlers defined here all raise ``grpclib.GRPCError`` with
    status ``UNIMPLEMENTED``. The private ``__rpc_*`` wrappers read the
    request from the stream and dispatch to the public handler of the same
    name; ``__mapping__`` ties each route to its wrapper.
    """

    async def get_one(
        self, applicable_license_bundles_request: "ApplicableLicenseBundlesRequest"
    ) -> "ApplicableLicenseBundlesResponse":
        """Handle ``GetOne``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_some(
        self, applicable_license_bundles_some_request: "ApplicableLicenseBundlesSomeRequest"
    ) -> AsyncIterator[ApplicableLicenseBundlesSomeResponse]:
        """Handle ``GetSome``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_all(
        self, applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest"
    ) -> AsyncIterator[ApplicableLicenseBundlesStreamResponse]:
        """Handle ``GetAll``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def subscribe(
        self, applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest"
    ) -> AsyncIterator[ApplicableLicenseBundlesStreamResponse]:
        """Handle ``Subscribe``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_meta(
        self, applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest"
    ) -> "MetaResponse":
        """Handle ``GetMeta``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def subscribe_meta(
        self, applicable_license_bundles_stream_request: "ApplicableLicenseBundlesStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Handle ``SubscribeMeta``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def get_all_batched(
        self,
        applicable_license_bundles_batched_stream_request: "ApplicableLicenseBundlesBatchedStreamRequest",
    ) -> AsyncIterator[ApplicableLicenseBundlesBatchedStreamResponse]:
        """Handle ``GetAllBatched``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def subscribe_batched(
        self,
        applicable_license_bundles_batched_stream_request: "ApplicableLicenseBundlesBatchedStreamRequest",
    ) -> AsyncIterator[ApplicableLicenseBundlesBatchedStreamResponse]:
        """Handle ``SubscribeBatched``; this base version raises UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[ApplicableLicenseBundlesRequest, ApplicableLicenseBundlesResponse]",
    ) -> None:
        # Unary call: read one message, answer with one message.
        incoming = await stream.recv_message()
        reply = await self.get_one(incoming)
        await stream.send_message(reply)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[ApplicableLicenseBundlesSomeRequest, ApplicableLicenseBundlesSomeResponse]",
    ) -> None:
        # Server-streaming call: the shared helper drives the handler.
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_some, stream, incoming)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[ApplicableLicenseBundlesStreamRequest, ApplicableLicenseBundlesStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all, stream, incoming)

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[ApplicableLicenseBundlesStreamRequest, ApplicableLicenseBundlesStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe, stream, incoming)

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[ApplicableLicenseBundlesStreamRequest, MetaResponse]"
    ) -> None:
        incoming = await stream.recv_message()
        reply = await self.get_meta(incoming)
        await stream.send_message(reply)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[ApplicableLicenseBundlesStreamRequest, MetaResponse]"
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_meta, stream, incoming)

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[ApplicableLicenseBundlesBatchedStreamRequest, ApplicableLicenseBundlesBatchedStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all_batched, stream, incoming)

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[ApplicableLicenseBundlesBatchedStreamRequest, ApplicableLicenseBundlesBatchedStreamResponse]",
    ) -> None:
        incoming = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_batched, stream, incoming)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route-to-``Handler`` table for this service."""
        make_handler = grpclib.const.Handler
        one_to_one = grpclib.const.Cardinality.UNARY_UNARY
        one_to_many = grpclib.const.Cardinality.UNARY_STREAM
        return {
            "/arista.license.v1.ApplicableLicenseBundlesService/GetOne": make_handler(
                self.__rpc_get_one,
                one_to_one,
                ApplicableLicenseBundlesRequest,
                ApplicableLicenseBundlesResponse,
            ),
            "/arista.license.v1.ApplicableLicenseBundlesService/GetSome": make_handler(
                self.__rpc_get_some,
                one_to_many,
                ApplicableLicenseBundlesSomeRequest,
                ApplicableLicenseBundlesSomeResponse,
            ),
            "/arista.license.v1.ApplicableLicenseBundlesService/GetAll": make_handler(
                self.__rpc_get_all,
                one_to_many,
                ApplicableLicenseBundlesStreamRequest,
                ApplicableLicenseBundlesStreamResponse,
            ),
            "/arista.license.v1.ApplicableLicenseBundlesService/Subscribe": make_handler(
                self.__rpc_subscribe,
                one_to_many,
                ApplicableLicenseBundlesStreamRequest,
                ApplicableLicenseBundlesStreamResponse,
            ),
            "/arista.license.v1.ApplicableLicenseBundlesService/GetMeta": make_handler(
                self.__rpc_get_meta,
                one_to_one,
                ApplicableLicenseBundlesStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.ApplicableLicenseBundlesService/SubscribeMeta": make_handler(
                self.__rpc_subscribe_meta,
                one_to_many,
                ApplicableLicenseBundlesStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.ApplicableLicenseBundlesService/GetAllBatched": make_handler(
                self.__rpc_get_all_batched,
                one_to_many,
                ApplicableLicenseBundlesBatchedStreamRequest,
                ApplicableLicenseBundlesBatchedStreamResponse,
            ),
            "/arista.license.v1.ApplicableLicenseBundlesService/SubscribeBatched": make_handler(
                self.__rpc_subscribe_batched,
                one_to_many,
                ApplicableLicenseBundlesBatchedStreamRequest,
                ApplicableLicenseBundlesBatchedStreamResponse,
            ),
        }
class LicenseAssignmentServiceBase(ServiceBase):
    """Server-side base for ``arista.license.v1.LicenseAssignmentService``.

    Each public method is a default stub that raises
    ``grpclib.GRPCError(Status.UNIMPLEMENTED)``; subclasses override the RPCs
    they actually serve.  Server-streaming stubs carry an unreachable ``yield``
    so that they are async generators, as their ``AsyncIterator`` return
    annotation promises, and the error surfaces when the stream is iterated.
    """

    async def get_one(
        self, license_assignment_request: "LicenseAssignmentRequest"
    ) -> "LicenseAssignmentResponse":
        """Unary ``GetOne`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, license_assignment_some_request: "LicenseAssignmentSomeRequest"
    ) -> AsyncIterator[LicenseAssignmentSomeResponse]:
        """Server-streaming ``GetSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentSomeResponse()  # unreachable; see class docstring

    async def get_all(
        self, license_assignment_stream_request: "LicenseAssignmentStreamRequest"
    ) -> AsyncIterator[LicenseAssignmentStreamResponse]:
        """Server-streaming ``GetAll`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentStreamResponse()  # unreachable; see class docstring

    async def subscribe(
        self, license_assignment_stream_request: "LicenseAssignmentStreamRequest"
    ) -> AsyncIterator[LicenseAssignmentStreamResponse]:
        """Server-streaming ``Subscribe`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentStreamResponse()  # unreachable; see class docstring

    async def get_meta(
        self, license_assignment_stream_request: "LicenseAssignmentStreamRequest"
    ) -> "MetaResponse":
        """Unary ``GetMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, license_assignment_stream_request: "LicenseAssignmentStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Server-streaming ``SubscribeMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; see class docstring

    async def get_all_batched(
        self, license_assignment_batched_stream_request: "LicenseAssignmentBatchedStreamRequest"
    ) -> AsyncIterator[LicenseAssignmentBatchedStreamResponse]:
        """Server-streaming ``GetAllBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentBatchedStreamResponse()  # unreachable; see class docstring

    async def subscribe_batched(
        self, license_assignment_batched_stream_request: "LicenseAssignmentBatchedStreamRequest"
    ) -> AsyncIterator[LicenseAssignmentBatchedStreamResponse]:
        """Server-streaming ``SubscribeBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentBatchedStreamResponse()  # unreachable; see class docstring

    # grpclib adapters.  Each reads the single request message from the stream,
    # then either awaits the unary method and sends its response, or hands the
    # streaming method to the inherited ``_call_rpc_handler_server_stream``
    # helper (not defined in this class).

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[LicenseAssignmentRequest, LicenseAssignmentResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentSomeRequest, LicenseAssignmentSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_some, stream, request)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentStreamRequest, LicenseAssignmentStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all, stream, request)

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentStreamRequest, LicenseAssignmentStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe, stream, request)

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[LicenseAssignmentStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[LicenseAssignmentStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_meta, stream, request)

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentBatchedStreamRequest, LicenseAssignmentBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all_batched, stream, request)

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentBatchedStreamRequest, LicenseAssignmentBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_batched, stream, request)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route table: full gRPC method path -> ``grpclib.const.Handler``."""
        return {
            "/arista.license.v1.LicenseAssignmentService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseAssignmentRequest,
                LicenseAssignmentResponse,
            ),
            "/arista.license.v1.LicenseAssignmentService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentSomeRequest,
                LicenseAssignmentSomeResponse,
            ),
            "/arista.license.v1.LicenseAssignmentService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentStreamRequest,
                LicenseAssignmentStreamResponse,
            ),
            "/arista.license.v1.LicenseAssignmentService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentStreamRequest,
                LicenseAssignmentStreamResponse,
            ),
            "/arista.license.v1.LicenseAssignmentService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseAssignmentStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseAssignmentService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseAssignmentService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentBatchedStreamRequest,
                LicenseAssignmentBatchedStreamResponse,
            ),
            "/arista.license.v1.LicenseAssignmentService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentBatchedStreamRequest,
                LicenseAssignmentBatchedStreamResponse,
            ),
        }
class LicenseAssignmentConfigServiceBase(ServiceBase):
    """Server-side base for ``arista.license.v1.LicenseAssignmentConfigService``.

    Each public method is a default stub that raises
    ``grpclib.GRPCError(Status.UNIMPLEMENTED)``; subclasses override the RPCs
    they actually serve.  Server-streaming stubs carry an unreachable ``yield``
    so that they are async generators, as their ``AsyncIterator`` return
    annotation promises, and the error surfaces when the stream is iterated.
    """

    async def get_one(
        self, license_assignment_config_request: "LicenseAssignmentConfigRequest"
    ) -> "LicenseAssignmentConfigResponse":
        """Unary ``GetOne`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, license_assignment_config_some_request: "LicenseAssignmentConfigSomeRequest"
    ) -> AsyncIterator[LicenseAssignmentConfigSomeResponse]:
        """Server-streaming ``GetSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigSomeResponse()  # unreachable; see class docstring

    async def get_all(
        self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest"
    ) -> AsyncIterator[LicenseAssignmentConfigStreamResponse]:
        """Server-streaming ``GetAll`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigStreamResponse()  # unreachable; see class docstring

    async def subscribe(
        self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest"
    ) -> AsyncIterator[LicenseAssignmentConfigStreamResponse]:
        """Server-streaming ``Subscribe`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigStreamResponse()  # unreachable; see class docstring

    async def get_meta(
        self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest"
    ) -> "MetaResponse":
        """Unary ``GetMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, license_assignment_config_stream_request: "LicenseAssignmentConfigStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Server-streaming ``SubscribeMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; see class docstring

    async def set(
        self, license_assignment_config_set_request: "LicenseAssignmentConfigSetRequest"
    ) -> "LicenseAssignmentConfigSetResponse":
        """Unary ``Set`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set_some(
        self, license_assignment_config_set_some_request: "LicenseAssignmentConfigSetSomeRequest"
    ) -> AsyncIterator[LicenseAssignmentConfigSetSomeResponse]:
        """Server-streaming ``SetSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigSetSomeResponse()  # unreachable; see class docstring

    async def delete(
        self, license_assignment_config_delete_request: "LicenseAssignmentConfigDeleteRequest"
    ) -> "LicenseAssignmentConfigDeleteResponse":
        """Unary ``Delete`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_some(
        self,
        license_assignment_config_delete_some_request: "LicenseAssignmentConfigDeleteSomeRequest",
    ) -> AsyncIterator[LicenseAssignmentConfigDeleteSomeResponse]:
        """Server-streaming ``DeleteSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigDeleteSomeResponse()  # unreachable; see class docstring

    async def delete_all(
        self,
        license_assignment_config_delete_all_request: "LicenseAssignmentConfigDeleteAllRequest",
    ) -> AsyncIterator[LicenseAssignmentConfigDeleteAllResponse]:
        """Server-streaming ``DeleteAll`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigDeleteAllResponse()  # unreachable; see class docstring

    async def get_all_batched(
        self,
        license_assignment_config_batched_stream_request: "LicenseAssignmentConfigBatchedStreamRequest",
    ) -> AsyncIterator[LicenseAssignmentConfigBatchedStreamResponse]:
        """Server-streaming ``GetAllBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigBatchedStreamResponse()  # unreachable; see class docstring

    async def subscribe_batched(
        self,
        license_assignment_config_batched_stream_request: "LicenseAssignmentConfigBatchedStreamRequest",
    ) -> AsyncIterator[LicenseAssignmentConfigBatchedStreamResponse]:
        """Server-streaming ``SubscribeBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseAssignmentConfigBatchedStreamResponse()  # unreachable; see class docstring

    # grpclib adapters.  Each reads the single request message from the stream,
    # then either awaits the unary method and sends its response, or hands the
    # streaming method to the inherited ``_call_rpc_handler_server_stream``
    # helper (not defined in this class).

    async def __rpc_get_one(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigRequest, LicenseAssignmentConfigResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigSomeRequest, LicenseAssignmentConfigSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_some, stream, request)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigStreamRequest, LicenseAssignmentConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all, stream, request)

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigStreamRequest, LicenseAssignmentConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe, stream, request)

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[LicenseAssignmentConfigStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[LicenseAssignmentConfigStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_meta, stream, request)

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigSetRequest, LicenseAssignmentConfigSetResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    async def __rpc_set_some(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigSetSomeRequest, LicenseAssignmentConfigSetSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.set_some, stream, request)

    async def __rpc_delete(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigDeleteRequest, LicenseAssignmentConfigDeleteResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.delete(request)
        await stream.send_message(response)

    async def __rpc_delete_some(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigDeleteSomeRequest, LicenseAssignmentConfigDeleteSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.delete_some, stream, request)

    async def __rpc_delete_all(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigDeleteAllRequest, LicenseAssignmentConfigDeleteAllResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.delete_all, stream, request)

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigBatchedStreamRequest, LicenseAssignmentConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all_batched, stream, request)

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[LicenseAssignmentConfigBatchedStreamRequest, LicenseAssignmentConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_batched, stream, request)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route table: full gRPC method path -> ``grpclib.const.Handler``."""
        return {
            "/arista.license.v1.LicenseAssignmentConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseAssignmentConfigRequest,
                LicenseAssignmentConfigResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigSomeRequest,
                LicenseAssignmentConfigSomeResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigStreamRequest,
                LicenseAssignmentConfigStreamResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigStreamRequest,
                LicenseAssignmentConfigStreamResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseAssignmentConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseAssignmentConfigSetRequest,
                LicenseAssignmentConfigSetResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/SetSome": grpclib.const.Handler(
                self.__rpc_set_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigSetSomeRequest,
                LicenseAssignmentConfigSetSomeResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/Delete": grpclib.const.Handler(
                self.__rpc_delete,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseAssignmentConfigDeleteRequest,
                LicenseAssignmentConfigDeleteResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/DeleteSome": grpclib.const.Handler(
                self.__rpc_delete_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigDeleteSomeRequest,
                LicenseAssignmentConfigDeleteSomeResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigDeleteAllRequest,
                LicenseAssignmentConfigDeleteAllResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigBatchedStreamRequest,
                LicenseAssignmentConfigBatchedStreamResponse,
            ),
            "/arista.license.v1.LicenseAssignmentConfigService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseAssignmentConfigBatchedStreamRequest,
                LicenseAssignmentConfigBatchedStreamResponse,
            ),
        }
class LicenseFileServiceBase(ServiceBase):
    """Server-side base for ``arista.license.v1.LicenseFileService``.

    Each public method is a default stub that raises
    ``grpclib.GRPCError(Status.UNIMPLEMENTED)``; subclasses override the RPCs
    they actually serve.  Server-streaming stubs carry an unreachable ``yield``
    so that they are async generators, as their ``AsyncIterator`` return
    annotation promises, and the error surfaces when the stream is iterated.
    """

    async def get_one(self, license_file_request: "LicenseFileRequest") -> "LicenseFileResponse":
        """Unary ``GetOne`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, license_file_some_request: "LicenseFileSomeRequest"
    ) -> AsyncIterator[LicenseFileSomeResponse]:
        """Server-streaming ``GetSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileSomeResponse()  # unreachable; see class docstring

    async def get_all(
        self, license_file_stream_request: "LicenseFileStreamRequest"
    ) -> AsyncIterator[LicenseFileStreamResponse]:
        """Server-streaming ``GetAll`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileStreamResponse()  # unreachable; see class docstring

    async def subscribe(
        self, license_file_stream_request: "LicenseFileStreamRequest"
    ) -> AsyncIterator[LicenseFileStreamResponse]:
        """Server-streaming ``Subscribe`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileStreamResponse()  # unreachable; see class docstring

    async def get_meta(
        self, license_file_stream_request: "LicenseFileStreamRequest"
    ) -> "MetaResponse":
        """Unary ``GetMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, license_file_stream_request: "LicenseFileStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Server-streaming ``SubscribeMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; see class docstring

    async def get_all_batched(
        self, license_file_batched_stream_request: "LicenseFileBatchedStreamRequest"
    ) -> AsyncIterator[LicenseFileBatchedStreamResponse]:
        """Server-streaming ``GetAllBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileBatchedStreamResponse()  # unreachable; see class docstring

    async def subscribe_batched(
        self, license_file_batched_stream_request: "LicenseFileBatchedStreamRequest"
    ) -> AsyncIterator[LicenseFileBatchedStreamResponse]:
        """Server-streaming ``SubscribeBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileBatchedStreamResponse()  # unreachable; see class docstring

    # grpclib adapters.  Each reads the single request message from the stream,
    # then either awaits the unary method and sends its response, or hands the
    # streaming method to the inherited ``_call_rpc_handler_server_stream``
    # helper (not defined in this class).

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[LicenseFileRequest, LicenseFileResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self, stream: "grpclib.server.Stream[LicenseFileSomeRequest, LicenseFileSomeResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_some, stream, request)

    async def __rpc_get_all(
        self, stream: "grpclib.server.Stream[LicenseFileStreamRequest, LicenseFileStreamResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all, stream, request)

    async def __rpc_subscribe(
        self, stream: "grpclib.server.Stream[LicenseFileStreamRequest, LicenseFileStreamResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe, stream, request)

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[LicenseFileStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[LicenseFileStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_meta, stream, request)

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[LicenseFileBatchedStreamRequest, LicenseFileBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all_batched, stream, request)

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[LicenseFileBatchedStreamRequest, LicenseFileBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_batched, stream, request)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route table: full gRPC method path -> ``grpclib.const.Handler``."""
        return {
            "/arista.license.v1.LicenseFileService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseFileRequest,
                LicenseFileResponse,
            ),
            "/arista.license.v1.LicenseFileService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileSomeRequest,
                LicenseFileSomeResponse,
            ),
            "/arista.license.v1.LicenseFileService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileStreamRequest,
                LicenseFileStreamResponse,
            ),
            "/arista.license.v1.LicenseFileService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileStreamRequest,
                LicenseFileStreamResponse,
            ),
            "/arista.license.v1.LicenseFileService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseFileStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseFileService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseFileService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileBatchedStreamRequest,
                LicenseFileBatchedStreamResponse,
            ),
            "/arista.license.v1.LicenseFileService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileBatchedStreamRequest,
                LicenseFileBatchedStreamResponse,
            ),
        }
class LicenseFileConfigServiceBase(ServiceBase):
    """Server-side base for ``arista.license.v1.LicenseFileConfigService``.

    Each public method is a default stub that raises
    ``grpclib.GRPCError(Status.UNIMPLEMENTED)``; subclasses override the RPCs
    they actually serve.  Server-streaming stubs carry an unreachable ``yield``
    so that they are async generators, as their ``AsyncIterator`` return
    annotation promises, and the error surfaces when the stream is iterated.
    """

    async def get_one(
        self, license_file_config_request: "LicenseFileConfigRequest"
    ) -> "LicenseFileConfigResponse":
        """Unary ``GetOne`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, license_file_config_some_request: "LicenseFileConfigSomeRequest"
    ) -> AsyncIterator[LicenseFileConfigSomeResponse]:
        """Server-streaming ``GetSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigSomeResponse()  # unreachable; see class docstring

    async def get_all(
        self, license_file_config_stream_request: "LicenseFileConfigStreamRequest"
    ) -> AsyncIterator[LicenseFileConfigStreamResponse]:
        """Server-streaming ``GetAll`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigStreamResponse()  # unreachable; see class docstring

    async def subscribe(
        self, license_file_config_stream_request: "LicenseFileConfigStreamRequest"
    ) -> AsyncIterator[LicenseFileConfigStreamResponse]:
        """Server-streaming ``Subscribe`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigStreamResponse()  # unreachable; see class docstring

    async def get_meta(
        self, license_file_config_stream_request: "LicenseFileConfigStreamRequest"
    ) -> "MetaResponse":
        """Unary ``GetMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, license_file_config_stream_request: "LicenseFileConfigStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Server-streaming ``SubscribeMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; see class docstring

    async def set(
        self, license_file_config_set_request: "LicenseFileConfigSetRequest"
    ) -> "LicenseFileConfigSetResponse":
        """Unary ``Set`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def set_some(
        self, license_file_config_set_some_request: "LicenseFileConfigSetSomeRequest"
    ) -> AsyncIterator[LicenseFileConfigSetSomeResponse]:
        """Server-streaming ``SetSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigSetSomeResponse()  # unreachable; see class docstring

    async def delete(
        self, license_file_config_delete_request: "LicenseFileConfigDeleteRequest"
    ) -> "LicenseFileConfigDeleteResponse":
        """Unary ``Delete`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_some(
        self, license_file_config_delete_some_request: "LicenseFileConfigDeleteSomeRequest"
    ) -> AsyncIterator[LicenseFileConfigDeleteSomeResponse]:
        """Server-streaming ``DeleteSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigDeleteSomeResponse()  # unreachable; see class docstring

    async def delete_all(
        self, license_file_config_delete_all_request: "LicenseFileConfigDeleteAllRequest"
    ) -> AsyncIterator[LicenseFileConfigDeleteAllResponse]:
        """Server-streaming ``DeleteAll`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigDeleteAllResponse()  # unreachable; see class docstring

    async def get_all_batched(
        self, license_file_config_batched_stream_request: "LicenseFileConfigBatchedStreamRequest"
    ) -> AsyncIterator[LicenseFileConfigBatchedStreamResponse]:
        """Server-streaming ``GetAllBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigBatchedStreamResponse()  # unreachable; see class docstring

    async def subscribe_batched(
        self, license_file_config_batched_stream_request: "LicenseFileConfigBatchedStreamRequest"
    ) -> AsyncIterator[LicenseFileConfigBatchedStreamResponse]:
        """Server-streaming ``SubscribeBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield LicenseFileConfigBatchedStreamResponse()  # unreachable; see class docstring

    # grpclib adapters.  Each reads the single request message from the stream,
    # then either awaits the unary method and sends its response, or hands the
    # streaming method to the inherited ``_call_rpc_handler_server_stream``
    # helper (not defined in this class).

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[LicenseFileConfigRequest, LicenseFileConfigResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigSomeRequest, LicenseFileConfigSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_some, stream, request)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigStreamRequest, LicenseFileConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all, stream, request)

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigStreamRequest, LicenseFileConfigStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe, stream, request)

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[LicenseFileConfigStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[LicenseFileConfigStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_meta, stream, request)

    async def __rpc_set(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigSetRequest, LicenseFileConfigSetResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.set(request)
        await stream.send_message(response)

    async def __rpc_set_some(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigSetSomeRequest, LicenseFileConfigSetSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.set_some, stream, request)

    async def __rpc_delete(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigDeleteRequest, LicenseFileConfigDeleteResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.delete(request)
        await stream.send_message(response)

    async def __rpc_delete_some(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigDeleteSomeRequest, LicenseFileConfigDeleteSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.delete_some, stream, request)

    async def __rpc_delete_all(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigDeleteAllRequest, LicenseFileConfigDeleteAllResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.delete_all, stream, request)

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigBatchedStreamRequest, LicenseFileConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all_batched, stream, request)

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[LicenseFileConfigBatchedStreamRequest, LicenseFileConfigBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_batched, stream, request)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route table: full gRPC method path -> ``grpclib.const.Handler``."""
        return {
            "/arista.license.v1.LicenseFileConfigService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseFileConfigRequest,
                LicenseFileConfigResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigSomeRequest,
                LicenseFileConfigSomeResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigStreamRequest,
                LicenseFileConfigStreamResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigStreamRequest,
                LicenseFileConfigStreamResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseFileConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/Set": grpclib.const.Handler(
                self.__rpc_set,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseFileConfigSetRequest,
                LicenseFileConfigSetResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/SetSome": grpclib.const.Handler(
                self.__rpc_set_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigSetSomeRequest,
                LicenseFileConfigSetSomeResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/Delete": grpclib.const.Handler(
                self.__rpc_delete,
                grpclib.const.Cardinality.UNARY_UNARY,
                LicenseFileConfigDeleteRequest,
                LicenseFileConfigDeleteResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/DeleteSome": grpclib.const.Handler(
                self.__rpc_delete_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigDeleteSomeRequest,
                LicenseFileConfigDeleteSomeResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigDeleteAllRequest,
                LicenseFileConfigDeleteAllResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigBatchedStreamRequest,
                LicenseFileConfigBatchedStreamResponse,
            ),
            "/arista.license.v1.LicenseFileConfigService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                LicenseFileConfigBatchedStreamRequest,
                LicenseFileConfigBatchedStreamResponse,
            ),
        }
class PurchasedLicenseServiceBase(ServiceBase):
    """Server-side base for ``arista.license.v1.PurchasedLicenseService``.

    Each public method is a default stub that raises
    ``grpclib.GRPCError(Status.UNIMPLEMENTED)``; subclasses override the RPCs
    they actually serve.  Server-streaming stubs carry an unreachable ``yield``
    so that they are async generators, as their ``AsyncIterator`` return
    annotation promises, and the error surfaces when the stream is iterated.
    """

    async def get_one(
        self, purchased_license_request: "PurchasedLicenseRequest"
    ) -> "PurchasedLicenseResponse":
        """Unary ``GetOne`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_some(
        self, purchased_license_some_request: "PurchasedLicenseSomeRequest"
    ) -> AsyncIterator[PurchasedLicenseSomeResponse]:
        """Server-streaming ``GetSome`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield PurchasedLicenseSomeResponse()  # unreachable; see class docstring

    async def get_all(
        self, purchased_license_stream_request: "PurchasedLicenseStreamRequest"
    ) -> AsyncIterator[PurchasedLicenseStreamResponse]:
        """Server-streaming ``GetAll`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield PurchasedLicenseStreamResponse()  # unreachable; see class docstring

    async def subscribe(
        self, purchased_license_stream_request: "PurchasedLicenseStreamRequest"
    ) -> AsyncIterator[PurchasedLicenseStreamResponse]:
        """Server-streaming ``Subscribe`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield PurchasedLicenseStreamResponse()  # unreachable; see class docstring

    async def get_meta(
        self, purchased_license_stream_request: "PurchasedLicenseStreamRequest"
    ) -> "MetaResponse":
        """Unary ``GetMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_meta(
        self, purchased_license_stream_request: "PurchasedLicenseStreamRequest"
    ) -> AsyncIterator[MetaResponse]:
        """Server-streaming ``SubscribeMeta`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield MetaResponse()  # unreachable; see class docstring

    async def get_all_batched(
        self, purchased_license_batched_stream_request: "PurchasedLicenseBatchedStreamRequest"
    ) -> AsyncIterator[PurchasedLicenseBatchedStreamResponse]:
        """Server-streaming ``GetAllBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield PurchasedLicenseBatchedStreamResponse()  # unreachable; see class docstring

    async def subscribe_batched(
        self, purchased_license_batched_stream_request: "PurchasedLicenseBatchedStreamRequest"
    ) -> AsyncIterator[PurchasedLicenseBatchedStreamResponse]:
        """Server-streaming ``SubscribeBatched`` RPC; default stub raises ``UNIMPLEMENTED``."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield PurchasedLicenseBatchedStreamResponse()  # unreachable; see class docstring

    # grpclib adapters.  Each reads the single request message from the stream,
    # then either awaits the unary method and sends its response, or hands the
    # streaming method to the inherited ``_call_rpc_handler_server_stream``
    # helper (not defined in this class).

    async def __rpc_get_one(
        self, stream: "grpclib.server.Stream[PurchasedLicenseRequest, PurchasedLicenseResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_one(request)
        await stream.send_message(response)

    async def __rpc_get_some(
        self,
        stream: "grpclib.server.Stream[PurchasedLicenseSomeRequest, PurchasedLicenseSomeResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_some, stream, request)

    async def __rpc_get_all(
        self,
        stream: "grpclib.server.Stream[PurchasedLicenseStreamRequest, PurchasedLicenseStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all, stream, request)

    async def __rpc_subscribe(
        self,
        stream: "grpclib.server.Stream[PurchasedLicenseStreamRequest, PurchasedLicenseStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe, stream, request)

    async def __rpc_get_meta(
        self, stream: "grpclib.server.Stream[PurchasedLicenseStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_meta(request)
        await stream.send_message(response)

    async def __rpc_subscribe_meta(
        self, stream: "grpclib.server.Stream[PurchasedLicenseStreamRequest, MetaResponse]"
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_meta, stream, request)

    async def __rpc_get_all_batched(
        self,
        stream: "grpclib.server.Stream[PurchasedLicenseBatchedStreamRequest, PurchasedLicenseBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.get_all_batched, stream, request)

    async def __rpc_subscribe_batched(
        self,
        stream: "grpclib.server.Stream[PurchasedLicenseBatchedStreamRequest, PurchasedLicenseBatchedStreamResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(self.subscribe_batched, stream, request)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route table: full gRPC method path -> ``grpclib.const.Handler``."""
        return {
            "/arista.license.v1.PurchasedLicenseService/GetOne": grpclib.const.Handler(
                self.__rpc_get_one,
                grpclib.const.Cardinality.UNARY_UNARY,
                PurchasedLicenseRequest,
                PurchasedLicenseResponse,
            ),
            "/arista.license.v1.PurchasedLicenseService/GetSome": grpclib.const.Handler(
                self.__rpc_get_some,
                grpclib.const.Cardinality.UNARY_STREAM,
                PurchasedLicenseSomeRequest,
                PurchasedLicenseSomeResponse,
            ),
            "/arista.license.v1.PurchasedLicenseService/GetAll": grpclib.const.Handler(
                self.__rpc_get_all,
                grpclib.const.Cardinality.UNARY_STREAM,
                PurchasedLicenseStreamRequest,
                PurchasedLicenseStreamResponse,
            ),
            "/arista.license.v1.PurchasedLicenseService/Subscribe": grpclib.const.Handler(
                self.__rpc_subscribe,
                grpclib.const.Cardinality.UNARY_STREAM,
                PurchasedLicenseStreamRequest,
                PurchasedLicenseStreamResponse,
            ),
            "/arista.license.v1.PurchasedLicenseService/GetMeta": grpclib.const.Handler(
                self.__rpc_get_meta,
                grpclib.const.Cardinality.UNARY_UNARY,
                PurchasedLicenseStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.PurchasedLicenseService/SubscribeMeta": grpclib.const.Handler(
                self.__rpc_subscribe_meta,
                grpclib.const.Cardinality.UNARY_STREAM,
                PurchasedLicenseStreamRequest,
                MetaResponse,
            ),
            "/arista.license.v1.PurchasedLicenseService/GetAllBatched": grpclib.const.Handler(
                self.__rpc_get_all_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                PurchasedLicenseBatchedStreamRequest,
                PurchasedLicenseBatchedStreamResponse,
            ),
            "/arista.license.v1.PurchasedLicenseService/SubscribeBatched": grpclib.const.Handler(
                self.__rpc_subscribe_batched,
                grpclib.const.Cardinality.UNARY_STREAM,
                PurchasedLicenseBatchedStreamRequest,
                PurchasedLicenseBatchedStreamResponse,
            ),
        }