# Copyright (c) 2022 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.
import json
import signal
from collections.abc import Callable
from enum import IntEnum
from logging import getLogger
from time import process_time_ns
from typing import Any, Dict, List, Optional
import grpc
import requests
from google.protobuf.timestamp_pb2 import Timestamp
from grpc import RpcError, StatusCode
from cloudvision.Connector.codec import Path
from cloudvision.Connector.grpc_client import (
GRPCClient,
create_notification,
create_query,
)
from .action import Action
from .changecontrol import ChangeControl
from .connections import AuthAndEndpoints, addHeaderInterceptor
from .constants import (
BUILD_ID_ARG,
STUDIO_ID_ARG,
STUDIO_IDS_ARG,
TIMEOUT_REQUEST,
TIMEOUT_REQUEST_DI,
WORKSPACE_ID_ARG,
)
from .device import Device, Interface
from .exceptions import (
ConnectionFailed,
DeviceCommandsFailed,
InvalidContextException,
InvalidCredentials,
LoggingFailed,
TimeoutExpiry,
)
from .execution import Execution
from .logger import Logger
from .studio import Studio, StudioCustomData
from .tags import Tag, Tags
from .topology import Topology
from .user import User
from .utils import extractJSONEncodedListArg
from .workspace import Workspace
ACCESS_TOKEN = "access_token"
CMDS = "cmds"
DEVICE_ID = "deviceID"
EXEC_ACTION = "execaction"
HEADER_ACCEPT = "Accept"
HEADER_AUTH = "Authorization"
HEADER_CONTENT_TYPE = "Content-Type"
HEADER_JSON_APPLICATION = "application/json"
HEADERS = {HEADER_ACCEPT: HEADER_JSON_APPLICATION}
HOST = "host"
JSON = "json"
REQ_FORMAT = "format"
STOP_ON_ERROR = "stopOnError"
TIMEOUT_CLI = "readTimeout"
TIMEOUT_CONN = "connTimeout"
TMP_STORAGE_PATH = ["action", "tmp"]
USERNAME = "username"
DATASET_TYPE = "dataset_type"
ORGANIZATION = "organization"
GRPC_RETRY_POLICY_JSON = json.dumps(
{
"methodConfig": [
{
"name": [{"service": ""}],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "16s",
"backoffMultiplier": 2.0,
"retryableStatusCodes": ["UNAVAILABLE"],
},
}
]
}
)
systemLogger = getLogger(__name__)
[docs]
class LoggingLevel(IntEnum):
Trace = 0,
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
Critical = 5
[docs]
def monitorTimerHandler(signum, frame):
'''
A handler function for the timer that will raise our custom exception
Needs to be declared out here so that we can compare it to ensure that
no ongoing timer is running
'''
raise TimeoutExpiry
[docs]
class Context:
'''
Context object that stores a number of system and user-defined parameters:
- user: Info on the user executing this script
- device: Info on the device the script is operating on, if applicable
- action: Info on the action associated with the current context, if applicable
- changeControl: Common change-control script parameters (Deprecated, use action)
- studio: Common studio parameters
- execution: Info on the standalone execution context
- connections: Object storing connection info used by the context, e.g. apiserver address
- logger: Logging functions to be used by the context
'''
def __init__(self, user: User,
device: Optional[Device] = None,
action: Optional[Action] = None,
changeControl: Optional[ChangeControl] = None,
studio: Optional[Studio] = None,
execution: Optional[Execution] = None,
connections: Optional[AuthAndEndpoints] = None,
logger: Optional[Logger] = None,
loggingLevel: Optional[LoggingLevel] = None):
self.user = user
self.device = device
self.action = action
self.changeControl = changeControl
self.studio = studio
self.execution = execution
# When connections is None, replace with an empty AuthAndEndpoints obj
# so that further lookups succeed without throwing exceptions
self.connections = connections if connections else AuthAndEndpoints()
# In the case where the context is not passed a logger, create a backup one and use that
self.logger = logger if logger else self.__getBackupLogger()
self.__connector = None
self.__serviceChann = None
self.topology: Optional[Topology] = None
self.preserveWhitespace = False
self.loggingLevel = loggingLevel if loggingLevel else LoggingLevel.Info
self.stats: Dict = {}
self.benchmarking = False
self.tags = Tags(self)
self.studioCustomData = StudioCustomData(self)
self.workspace: Optional[Workspace] = None
[docs]
def cleanup(self):
'''
Cleans up open channels and other connections
Called by the script execution infra once a script or template has been executed
'''
if self.__connector:
self.__connector.close()
if self.__serviceChann:
self.__serviceChann.close()
[docs]
def getDevice(self):
'''
Returns the device associated to the context.
'''
return self.device
[docs]
def getDeviceHostname(self, device: Device = None):
'''
Returns the hostname of the device associated to the context.
Retrieves that information if the context device doesn't have it
'''
if not device:
device = self.device
if not device:
raise InvalidContextException(("getDeviceHostname requires either a device or the"
" calling context to have a device associated with it"))
if not device.hostName:
cmdResponse = self.runDeviceCmds(["enable", "show hostname"], device)
if len(cmdResponse) != 2:
raise DeviceCommandsFailed((f"'show hostname' failed on device {device.id}"
f" with response: {cmdResponse}"))
hostnameErr = cmdResponse[1].get('error')
if hostnameErr:
raise DeviceCommandsFailed((f"'show hostname' failed on device {device.id}"
f" with error: {hostnameErr}"))
device.hostName = cmdResponse[1]['response']['hostname']
return device.hostName
[docs]
def setTopology(self, topology: Topology):
'''
Sets the topology of the context.
Called during context initialisation during script execution,
to set the Topology to that of the Inventory and Topology Studio.
Does not need to be called by the script writers.
'''
self.topology = topology
self.topology.setLogger(systemLogger)
# Studios can have a device associated with it through it's inputs, which is only extracted
# after the topology is set, not passed into the context at it's creation.
# In the case where a device was not passed to the ctx, and a topology is set where there is
# only a single device in the topology, set that device as the one used in the context
if self.topology and self.studio and self.device is None:
topologyDevs = self.topology.getDevices(self.studio.deviceIds)
if len(topologyDevs) == 1:
self.device = topologyDevs[0]
[docs]
def getCvClient(self):
'''
Instantiates a cloudvision connector client to the cv database
based on the current user auth token
:return: cloudvision Connector client
'''
if self.__connector:
return self.__connector
if self.connections.apiserverAddr is None or self.user is None:
return None
connector = GRPCClient(self.connections.apiserverAddr,
ca=self.connections.aerisCACert,
tokenValue=self.user.token)
self.__connector = connector
return connector
[docs]
def getApiClient(self, stub):
'''
Instantiates a resource api client to the service server
based on the current user auth token and passed stub
:param: stub of the resource api to connect to
:return: resource api client to the passed stub
'''
def add_user_context(chann):
# If provided, add the user context to the grpc metadata
# This allows the context to access services correctly
if self.user is not None:
username_interceptor = addHeaderInterceptor(USERNAME, self.user.username)
chann = grpc.intercept_channel(chann, username_interceptor)
return chann
if self.__serviceChann:
return stub(self.__serviceChann)
if self.connections.serviceAddr is None:
systemLogger.error(
"service address is None, trying to establish connection to apiserver")
# use api server's secure grpc channel if service address is not provided
if self.getCvClient():
self.__serviceChann = self.getCvClient().channel
self.__serviceChann = add_user_context(self.__serviceChann)
return stub(self.__serviceChann)
systemLogger.error(
"cannot establish connection to apiserver: %s", self.__connector)
raise ConnectionFailed("cannot establish connection to api server")
# Verify that we have a correct token
token = self.user.token
if token is None:
systemLogger.error("no valid token for authenticating the API Client")
raise InvalidCredentials("no valid token for authenticating the API Client")
# Try and read the ca cert
caData = None
if self.connections.serviceCACert:
with open(self.connections.serviceCACert, 'rb') as cf:
caData = cf.read()
# Build the grpc connection with the token and the potential ca cert
creds = grpc.ssl_channel_credentials(root_certificates=caData)
tokCreds = grpc.access_token_call_credentials(token)
creds = grpc.composite_channel_credentials(creds, tokCreds)
channel_options = []
channel_options.append(("grpc.enable_retries", 1))
channel_options.append(("grpc.service_config", GRPC_RETRY_POLICY_JSON))
self.__serviceChann = grpc.secure_channel(
self.connections.serviceAddr, creds, channel_options
)
self.__serviceChann = add_user_context(self.__serviceChann)
return stub(self.__serviceChann)
[docs]
def runDeviceCmds(self, commandsList: List[str], device: Optional[Device] = None, fmt=JSON,
validateResponse=True, timeout=TIMEOUT_REQUEST_DI):
'''
Sends a post request to DI, encodes commandsList in message body.
Receives output of cli commands from DI as json object.
:param commandsList:
:param device: device that the commands are run on.
Defaults to the context change control device if unspecified
:param validateResponse: Validates that the commands ran successfully. Defaults to true and
will raise an exception in the case an error message is present in
the command response.
Can be set to false to allow commands to run without raising any
resultant error as an exception, e.g. device reboot commands can
cause heartbeat error messages in the response, but we can discard
them as the device will reboot.
:return: json object containing output of commandsList (if validateResponse is True)
OR
raw request response (if validateResponse is False)
:raises: InvalidContextException when context is invalid for execution of device commands
requests.ConnectionError if connection cannot be established to the command
endpoint address
HTTPError if the status code from the command request is not a 200
DeviceCommandsFailed if validating command responses and the contents
contain an error code/message (can occur if request is a 200)
'''
if not self or not self.action:
raise InvalidContextException(
"runDeviceCmds is only available in action contexts")
if device is None:
if self.device is None:
raise InvalidContextException(
"runDeviceCmds is only available when a device is set")
device = self.device
if not device.id:
raise InvalidContextException(
"runDeviceCmds requires a device with an id")
if not self.connections.serviceAddr or not self.connections.commandEndpoint:
raise InvalidContextException("runDeviceCmds must have a valid service "
"address and command endpoint specified")
# From the DI service documentation about the HOST field:
# Host can be either IP address or hostname
deviceInteractionHost = device.ip if device.ip else device.hostName
request = {
HOST: deviceInteractionHost,
DEVICE_ID: device.id,
CMDS: commandsList,
TIMEOUT_CLI: self.connections.cliTimeout,
TIMEOUT_CONN: self.connections.connectionTimeout,
REQ_FORMAT: fmt,
STOP_ON_ERROR: False
}
data = json.dumps(request)
accessToken = ""
if self:
accessToken = self.user.token
cookies = {ACCESS_TOKEN: accessToken}
try:
runCmdURL = f"https://{self.connections.serviceAddr}/{self.connections.commandEndpoint}"
self.debug(f"Executing the following command(s) on device {device.id}: {commandsList}")
response = requests.post(runCmdURL, data=data, headers=HEADERS,
cookies=cookies, timeout=timeout, verify=False)
except requests.ConnectionError as e:
self.error(f"Got exception while establishing connection to DI : {e}")
raise
self.debug(f"Status code received from DI : {response.status_code}")
if response.status_code != 200:
response.raise_for_status()
# In the case where a response is not validated do not check for errors or convert
# the response to JSON. Simply return it as-is
if not validateResponse:
return response
resp = response.json()
# Check that some other issue did not occur. It has been seen that a statuscode 200 was
# received for the response, but when the response contents are jsonified and returned,
# they can be a simple dictionary with two entries, 'errorCode' and 'errorMessage',
# instead of the usual list of dictionaries for each command response.
# This is not caused by the commands that the user provided, but is an issue with
# the request that was sent to the command endpoint
# If that occurs, raise a DeviceCommandsFailedException
# An example of this is {'errorCode': '341604', 'errorMessage': 'Invalid request'}
if all(key in resp for key in ["errorCode", "errorMessage"]):
errCode = resp["errorCode"]
errMsg = resp["errorMessage"]
raise DeviceCommandsFailed((f"Commands failed to run on device \"{device.id}\","
f" returned {errCode}:\"{errMsg}\""), errCode, errMsg)
# Check that none of the commands have outright failed
for i, cmdResp in enumerate(resp):
err = cmdResp.get("error")
if err:
raise DeviceCommandsFailed((f"Command \"{commandsList[i]}\" failed to run on "
f"device \"{device.id}\", returned {err}"))
return resp
[docs]
@staticmethod
def doWithTimeout(f, timeout: int):
'''
Takes a function and a timeout in seconds.
Will call and return the result of f, but raises a cvlib.TimeoutExpiry
exception if it runs longer than <timeout>
NOTE: If there is an attempt to recursively call this function, an InvalidContextException
will be raised.
'''
# Store the default alarm signal so we can set it back again when we're done
originalSigHandler = signal.getsignal(signal.SIGALRM)
# Ensure that the current signal is not the monitorTimerHandler which would indicate a
# recursive call. This is not supported as due to any ongoing alarm timeouts of previous
# calls will be be overwritten and we can only have 1 alarm at a time.
if originalSigHandler is monitorTimerHandler:
raise InvalidContextException("Cannot recursively call doWithTimeout")
# Set up a signal handler that will cause a signal.SIGALRM signal to trigger our timer
# handler
signal.signal(signal.SIGALRM, monitorTimerHandler)
# Set an alarm to fire in <timeout> seconds. This will call the handler we bound earlier
signal.alarm(timeout)
try:
return f()
finally:
# Always turn off the alarm, whether returning a value or propagating an exception
signal.alarm(0)
# Reset the alarm signal handler
signal.signal(signal.SIGALRM, originalSigHandler)
[docs]
def initializeStudioCtxFromArgs(self):
'''
initializeStudioCtxFromArgs associates studio(s) and a workspace with the current
context from argument information available in the current context's action class.
This allows for actions such as Studio Autofill Actions and Studio Build Hook Actions
to associate a studio with their active contexts, allowing them to access various helper
methods that require the presence of a studio or workspace with the active context,
such as those offered by the tags class.
NOTE: Will raise InvalidContextException if called and either a studio is already
bound to the context or no action is available in the context
'''
if self.studio or self.workspace:
raise InvalidContextException(
"initializeStudioCtxFromArgs already has studio ctx initialised")
if not self.action:
raise InvalidContextException("initializeStudioCtxFromArgs must be"
" run in an action context")
buildId = self.action.args.get(BUILD_ID_ARG)
# Will be set if action is in a studio scope
studioId = self.action.args.get(STUDIO_ID_ARG)
# Will be present for some execution contexts, e.g. Studio Build Hook actions
studioIdsStr = self.action.args.get(STUDIO_IDS_ARG)
workspaceId = self.action.args.get(WORKSPACE_ID_ARG)
studioIds = None
if studioIdsStr:
try:
studioIds = extractJSONEncodedListArg(studioIdsStr)
except ValueError as e:
self.warning((f"Unable to extract json encoded list '{STUDIO_IDS_ARG}': {e}\n"
f"Ignoring {STUDIO_IDS_ARG} contents"))
if not workspaceId:
raise InvalidContextException(
("initializeStudioCtxFromArgs: Missing minimum required argument"
f" {WORKSPACE_ID_ARG} for studio ctx initialisation"))
self.workspace = Workspace(
workspaceId=workspaceId,
studioIds=studioIds,
buildId=buildId
)
if studioId:
self.studio = Studio(
workspaceId=workspaceId,
studioId=studioId,
buildId=buildId
)
[docs]
def getWorkspaceId(self):
if not (self.workspace or self.studio):
raise InvalidContextException(("Context does not have a workspace or studio "
"associated with it"))
return self.workspace.id if self.workspace else self.studio.workspaceId
[docs]
def httpGet(self, path: str):
'''
Issues a https GET to a given endpoint in CloudVision and returns the json
content if there are no errors
'''
if not (self.user and self.user.token):
raise InvalidContextException("httpGet requires an authenticated"
+ " user associated with the context")
if not self.connections.serviceAddr:
raise InvalidContextException("httpGet must have a valid service address specified")
# Perform a split to ensure to drop any ports that are provided as part of the serviceAddr
url = "https://" + self.connections.serviceAddr.split(':', maxsplit=1)[0]
endpoint = url + path
headers = {
HEADER_ACCEPT: HEADER_JSON_APPLICATION,
HEADER_CONTENT_TYPE: HEADER_JSON_APPLICATION,
HEADER_AUTH: f"Bearer {self.user.token}"
}
try:
response = requests.get(endpoint, headers=headers, verify=False)
response.raise_for_status()
respJson = json.loads(response.text)
except requests.ConnectionError as e:
self.error(f"Got exception while establishing connection to url '{endpoint}': {e}")
raise
except requests.HTTPError as e:
self.error(f"Got error response from get on '{endpoint}': {e}")
raise
return respJson
[docs]
def httpGetConfig(self, path: str):
'''
Issues a http get to retrieve the device config content at a cvp url and formats it.
'''
rawConfig = self.httpGet(path).get('config')
if not rawConfig:
return ""
formattedConfig = rawConfig.replace('\\n', '\n').replace('\\t', '\t')
return formattedConfig
[docs]
def httpPost(self, path, request={}):
'''
Issues a https POST to a given endpoint in CloudVision
'''
data = json.dumps(request)
if not (self.user and self.user.token):
raise InvalidContextException("httpPost requires an authenticated"
+ " user associated with the context")
if not self.connections.serviceAddr:
raise InvalidContextException("httpPost must have a valid service address specified")
# Perform a split to ensure to drop any ports that are provided as part of the serviceAddr
url = "https://" + self.connections.serviceAddr.split(':', maxsplit=1)[0]
endpoint = url + path
headers = {
HEADER_ACCEPT: HEADER_JSON_APPLICATION,
HEADER_CONTENT_TYPE: HEADER_JSON_APPLICATION,
HEADER_AUTH: f"Bearer {self.user.token}"
}
try:
response = requests.post(endpoint, data=data, headers=headers, verify=False)
response.raise_for_status()
except requests.ConnectionError as e:
self.error(f"Got exception while establishing connection to url '{endpoint}': {e}")
raise
except requests.HTTPError as e:
self.error(f"Got error response from get on '{endpoint}': {e}")
raise
[docs]
def Get(self, path: List[str], keys: List[str] = [], dataset: str = "analytics"):
'''
Get issues a get request to the provided path/key(s) combo, and returns the contents
of that path as a dictionary. Wildcarding is not advised as the returned dictionary
is only a single level deep, so adding wildcards will cause overwrites in the results.
Params:
- path: The path to issue the get to, in the form of a list of strings
- keys: The key(s) to get at the path. Defaults to all keys
- dataset: The dataset to issue the get to. Defaults to the `analytics` dataset
'''
client: GRPCClient = self.getCvClient()
query = create_query(pathKeys=[(path, keys)], dId=dataset)
results = {}
for batch in client.get([query]):
for notif in batch["notifications"]:
results.update(notif.get("updates", {}))
return results
def _getGenericKey(self) -> str:
'''
Creates a generic key for use in store/retrieve based off of the available
context information such that overwrites will be done on successive runs of
the same calling studio/WS or action.
When building the key based on the context;
- If it is a studio context, the key generated will be in the form of
"<studioId>:<buildId>" if a build ID is present, else "<studioId>"
- If it is an action context, the key generated will be in the form of
"<executionID>"
Raises InvalidContextException if not enough context information is present
to create a key
'''
if self.studio is not None:
if self.studio.buildId:
return ":".join([self.studio.studioId, self.studio.buildId])
return self.studio.studioId
if self.action and self.execution:
return self.execution.executionId
raise InvalidContextException(
"store/retrieve without key requires a studio or action in the context")
def _getStoragePath(self, additionalElems: List[str] = []) -> List[str]:
'''
Builds a generic path for use in store/retrieve based off of either the passed
additional elements provided by the user or the available context information.
All paths will contain "action/tmp" as the root.
When building a path based on the context;
- If it is a studio context, the path generated will be in the form of
/action/tmp/workspace/<workspaceId>/studio
- If it is an action context, the path generated will be in the form of
/action/tmp/action/<actionId>
Raises InvalidContextException if no additional elems were passed by the user
and not enough context information is present to create a path
'''
storage_path = TMP_STORAGE_PATH.copy()
if additionalElems:
storage_path.extend(additionalElems)
return storage_path
if self.studio and self.studio.workspaceId:
storage_path.extend(["workspace", self.studio.workspaceId, "studio"])
return storage_path
if self.action and self.action.id:
storage_path.extend(["action", self.action.id])
return storage_path
raise InvalidContextException(
"store without specified path requires a studio or action in the context")
[docs]
def store(self, data, path: List[str] = [], customKey=""):
'''
store puts the passed data into a path in the Database
NOTE: This function is only available to those with write permissions to the
'action' path in the cvp dataset (granted by the action module), as that is
where the store is.
This should be used in conjunction with the retrieve method to ensure that
the entry is cleaned up after use.
Params:
- data: The data to store
- path: The path to store the data at, in the form of a list of strings.
If this argument is omitted, a generic path will be created for
use. All paths have "action/tmp" as the root.
- customKey: The key to store the data at in the path. If this argument is
omitted, a generic string key will be created for use.
Raises InvalidContextException if not enough context information is
present to create a generic key/path (if required)
'''
key = customKey if customKey else self._getGenericKey()
storagePath = self._getStoragePath(additionalElems=path)
update = [(key, data)]
ts = Timestamp()
ts.GetCurrentTime()
notifs = [create_notification(ts, storagePath, updates=update)]
# Generate the list of path pointer notifs that lead to the new entry
for i, pathElem in enumerate(storagePath):
if i == 0:
# We don't want to keep writing the path pointer to actions
# in the top level of the cvp dataset
continue
pathPointerPath = storagePath[:i]
pathPointerUpdate = [(pathElem, Path(keys=storagePath[:i + 1]))]
notifs.append(create_notification(ts, pathPointerPath, updates=pathPointerUpdate))
try:
self.getCvClient().publish(dId="cvp", notifs=notifs)
except RpcError as exc:
# If the exception is not a permissions error, reraise the original
# exception as something went wrong
if exc.code() != StatusCode.PERMISSION_DENIED:
raise
raise InvalidCredentials(
f"Context user does not have permission to write to path '{storagePath}'")
[docs]
def retrieve(self, path: List[str] = [], customKey="", delete=True):
'''
retrieve gets the passed key's data from the provided path from the
Database store.
NOTE: This function is only available to those with read permissions to the
'action' path in the cvp dataset (granted by the action module), as that is
where the store is.
Params:
- path: The path where the data is stored at, in the form of a list
of strings. If this argument is omitted, a generic path will be
created for use. All paths have "action/tmp" as the root.
- customKey: The key where the data is stored at in the path. If this argument
is omitted, a generic string key will be created for use.
- delete: Boolean flag marking whether a delete should be issued to the
store for the key/path combo to clean up after use.
Deleting once the contents have been retrieved is the default.
Raises InvalidContextException if not enough context information is
present to create a generic key/path (if required)
'''
key = customKey if customKey else self._getGenericKey()
storagePath = self._getStoragePath(additionalElems=path)
try:
results = self.Get(path=storagePath, keys=[key], dataset="cvp")
except RpcError as exc:
# If the exception is not a permissions error, reraise the original
# exception as something went wrong
if exc.code() != StatusCode.PERMISSION_DENIED:
raise
raise InvalidCredentials(
f"Context user does not have permission to read from path '{storagePath}'")
if delete:
ts = Timestamp()
ts.GetCurrentTime()
try:
self.getCvClient().publish(
dId="cvp", notifs=[create_notification(ts, storagePath, deletes=[key])])
except RpcError as exc:
# If the exception is not a permissions error, reraise the original
# exception as something went wrong
if exc.code() != StatusCode.PERMISSION_DENIED:
raise
raise InvalidCredentials(
f"Context user does not have permission to write to path '{storagePath}'")
return results.get(key)
[docs]
def clear(self, path: List, keys: List = [], fullPathOnly: bool = False):
"""
clear issues deletes to the backend data store for cleanup, where retrieve with
delete is not required or not suitable. By default, it walks upwards through
each element in the path provided and issues a delete-all at each sub-path
to purge the information store. The fullPathOnly flag can be set to true to only
issue a deletes to the full path, instead of at all sub-paths in the path. A list
of keys to specifically delete can be provided to only delete those fields.
NOTE: While this function accepts wildcards, note that using them may
impact other storage paths used in other actions.
Params:
- path: The path where the data should be purged, in the form of a list
of strings or Connector Wildcards.
- keys: The list of keys to delete. Defaults to a delete-all.
- fullPathOnly: Boolean flag marking whether a delete-all should only be issued
to the store for full path.
By default, deletes are issued at all sub-paths.
"""
storagePath = self._getStoragePath(additionalElems=path)
client = self.getCvClient()
ts = Timestamp()
ts.GetCurrentTime()
# Repeatedly issue delete-alls until we reach the root storage path
while len(storagePath) > len(TMP_STORAGE_PATH):
client.publish(dId="cvp",
notifs=[create_notification(ts, storagePath, deletes=keys)])
if fullPathOnly:
break
storagePath.pop()
[docs]
@staticmethod
def showIf(linefmt, args):
if args:
return linefmt.format(args)
return ''
[docs]
def alog(self, message, userName=None, customKey=None, tags: Dict[str, str] = None,
ignoreFailures=False):
"""
Creates an audit log entry in CloudVision, with the provided info.
The context's associated device name and id will be added to the audit log metadata
if it is available in the context.
Note: This method is a no-op when run in a Studio template rendering context. Use the
preferred ctx.info, ctx.error, ctx.debug etc. logging methods there instead.
Args:
message: The string message for the audit log entry
userName: The user to make the audit log entry under. If unspecified, will
use the context's user's username
customKey: A custom key that will be used to alias the audit log entry if provided
tags: A string dictionary of additional custom tags to add to the audit log
entry. The action ID is always added as a tag to the audit log
ignoreFailures: Prevents logging exceptions from being raised
"""
try:
self.logger.alog(self, message, userName, customKey, tags)
except LoggingFailed:
if not ignoreFailures:
raise
[docs]
def trace(self, msg, ignoreFailures=False, tags: Dict[str, str] = None):
"""
Creates a trace level log if the context's logging level is set to allow for it
If the logging level is higher, is a no-op
Args:
msg: The string message for the log entry
ignoreFailures: Prevents logging exceptions from being raised
tags: A string dictionary of additional custom tags to add to the log
entry. Some system tags are always inserted, e.g. buildID
when logging is done in a studio context.
"""
if self.getLoggingLevel() > LoggingLevel.Trace:
return
try:
self.logger.trace(self, msg, tags)
except LoggingFailed:
if not ignoreFailures:
raise
[docs]
def debug(self, msg, ignoreFailures=False, tags: Dict[str, str] = None):
"""
Creates a debug level log if the context's logging level is set to allow for it
If the logging level is higher, is a no-op
Args:
msg: The string message for the log entry
ignoreFailures: Prevents logging exceptions from being raised
tags: A string dictionary of additional custom tags to add to the log
entry. Some system tags are always inserted, e.g. buildID
when logging is done in a studio context.
"""
if self.getLoggingLevel() > LoggingLevel.Debug:
return
try:
self.logger.debug(self, msg, tags)
except LoggingFailed:
if not ignoreFailures:
raise
[docs]
def info(self, msg, ignoreFailures=False, tags: Dict[str, str] = None):
"""
Creates an info level log if the context's logging level is set to allow for it
If the logging level is higher, is a no-op
Args:
msg: The string message for the log entry
ignoreFailures: Prevents logging exceptions from being raised
tags: A string dictionary of additional custom tags to add to the log
entry. Some system tags are always inserted, e.g. buildID
when logging is done in a studio context.
"""
if self.getLoggingLevel() > LoggingLevel.Info:
return
try:
self.logger.info(self, msg, tags)
except LoggingFailed:
if not ignoreFailures:
raise
[docs]
def warning(self, msg, ignoreFailures=False, tags: Dict[str, str] = None):
"""
Creates a warning level log if the context's logging level is set to allow for it
If the logging level is higher, is a no-op
Args:
msg: The string message for the log entry
ignoreFailures: Prevents logging exceptions from being raised
tags: A string dictionary of additional custom tags to add to the log
entry. Some system tags are always inserted, e.g. buildID
when logging is done in a studio context.
"""
if self.getLoggingLevel() > LoggingLevel.Warn:
return
try:
self.logger.warning(self, msg, tags)
except LoggingFailed:
if not ignoreFailures:
raise
[docs]
def error(self, msg, ignoreFailures=False, tags: Dict[str, str] = None):
"""
Creates an error level log if the context's logging level is set to allow for it
If the logging level is higher, is a no-op
Args:
msg: The string message for the log entry
ignoreFailures: Prevents logging exceptions from being raised
tags: A string dictionary of additional custom tags to add to the log
entry. Some system tags are always inserted, e.g. buildID
when logging is done in a studio context.
"""
if self.getLoggingLevel() > LoggingLevel.Error:
return
try:
self.logger.error(self, msg, tags)
except LoggingFailed:
if not ignoreFailures:
raise
[docs]
def critical(self, msg, ignoreFailures=False, tags: Dict[str, str] = None):
"""
Creates a critical level log
Args:
msg: The string message for the log entry
ignoreFailures: Prevents logging exceptions from being raised
tags: A string dictionary of additional custom tags to add to the log
entry. Some system tags are always inserted, e.g. buildID
when logging is done in a studio context.
"""
try:
self.logger.critical(self, msg, tags)
except LoggingFailed:
if not ignoreFailures:
raise
[docs]
def keepBlankLines(self, preserve=True):
# This function is only relevant for Studio Templates.
# Script executor code introspects this value to decide whether to
# clean rendered templates post-rendering
self.preserveWhitespace = preserve
[docs]
def setLoggingLevel(self, loggingLevel: LoggingLevel):
"""
Takes a logging level value and applies it for use in logging call checks
"""
self.loggingLevel = loggingLevel
[docs]
def getLoggingLevel(self):
"""
Gets the current logging level of the context
"""
return self.loggingLevel
[docs]
def activateDebugMode(self):
"""
Activates debug logging by setting the logging level to debug
"""
self.loggingLevel = LoggingLevel.Debug
[docs]
def deactivateDebugMode(self):
"""
Deactivates debug logging by setting the logging level to info
"""
self.loggingLevel = LoggingLevel.Info
# In the case where the context has no logger defined,
# we can create a compatible backup logger using the system logger
# This is called in init if no logger is provided
def __getBackupLogger(self) -> Logger:
def backupAlog(_, message, _userName=None, _customKey=None, tags=None):
systemLogger.info(message)
def backupDebugOrTrace(_, message, tags=None):
systemLogger.debug(message)
def backupInfo(_, message, tags=None):
systemLogger.info(message)
def backupWarning(_, message, tags=None):
systemLogger.warning(message)
def backupError(_, message, tags=None):
systemLogger.error(message)
def backupCritical(_, message, tags=None):
systemLogger.critical(message)
return Logger(
alog=backupAlog,
trace=backupDebugOrTrace,
debug=backupDebugOrTrace,
info=backupInfo,
warning=backupWarning,
error=backupError,
critical=backupCritical
)
[docs]
def benchmarkingOn(self):
'''
Turns on benchmarking to collect stats such as time consumed in a routine
of the template
To use add the following lines into the template:
ctx.benchmarkingOn() - place this as the first line after imports in the template
@ctx.benchmark - decorate the functions of the template to be benchmarked
ctx.benchmarkDump() - place this as the last line in the template
'''
self.benchmarking = True
[docs]
def benchmarkingOff(self):
self.benchmarking = False
[docs]
def benchmark(self, func: Callable[..., Any]) -> Callable[..., Any]:
def wrapper(*args: Any, **kwargs: Any) -> Any:
startTime = process_time_ns()
result = func(*args, **kwargs)
timer = process_time_ns() - startTime
if not self.stats.get(func.__name__):
self.stats[func.__name__] = {'sum': 0, 'count': 0, 'instances': []}
self.stats[func.__name__]['count'] += 1
self.stats[func.__name__]['sum'] += timer
self.stats[func.__name__]['instances'].append(timer)
return result
if not self.benchmarking:
return func
return wrapper
[docs]
def benchmarkDump(self):
if not self.stats:
return
self.logger.info(self, f'benchmarks for device {self.device.id}:')
for fun, timings in self.stats.items():
timings['sum'] = timings['sum'] / 1e9
timings['average'] = timings['sum'] / timings['count']
self.logger.info(self,
f"{'functions':<40}:{'total(s)':>8}{'avg(s)':>8}"
+ f"{'count':>10}")
self.logger.info(self, "-" * 40 + "-" * 8 + "-" * 8 + "-" * 11)
for fun, timings in dict(sorted(self.stats.items(),
key=lambda item: item[1]['sum'],
reverse=True)).items():
self.logger.info(self,
f"{fun:<40}:{timings['sum']:>8.4f}{timings['average']:>8.4f}"
+ f"{timings['count']:>10}")
[docs]
def getDevicesByTag(self, tag: Tag, inTopology: bool = True):
'''
Returns list of devices that have the user tag assigned to them.
If tag.value is unspecified then returns devices having that label assigned.
By default only devices in the topology are returned.
'''
devices = []
# Note use list instead of .items()
# parallel thread might add/delete tags
for devId in list(allTags := self.tags._getAllDeviceTags()):
tags = allTags.get(devId, {})
if tags.get(tag.label) and (
not tag.value or tag.value in tags.get(tag.label, [])):
if dev := self.topology._deviceMap.get(devId) if self.topology else None:
devices.append(dev)
elif not inTopology:
devices.append(Device(deviceId=devId))
return devices
[docs]
def getInterfacesByTag(self, tag: Tag, inTopology: bool = True):
'''
Returns list of interfaces that have the user tag assigned to them.
If tag.value is unspecified then returns interfaces having that label assigned.
By default only interfaces in the topology are returned.
'''
interfaces = []
# Note use list instead of .items()
# parallel thread might add/delete tags
for devId in list(allTags := self.tags._getAllInterfaceTags()):
for intfId in list(devIntfTags := allTags.get(devId, {})):
tags = devIntfTags.get(intfId, {})
if tags.get(tag.label) and (
not tag.value or tag.value in tags.get(tag.label, [])):
if dev := self.topology._deviceMap.get(devId) if self.topology else None:
if intf := dev.getInterface(intfId):
interfaces.append(intf)
elif not inTopology:
interfaces.append(Interface(name=intfId, device=dev))
elif not inTopology:
interfaces.append(
Interface(name=intfId,
device=Device(deviceId=devId)))
return interfaces