Source code for cloudvision.cvlib.studio
# Copyright (c) 2022 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.
import json
from typing import Any, Dict, List, Tuple, Optional
import google.protobuf.wrappers_pb2 as pb
from google.protobuf.timestamp_pb2 import Timestamp
from grpc import RpcError, StatusCode
from arista.studio.v1 import models, services
from arista.time.time_pb2 import TimeBounds
from cloudvision.Connector.codec import Path
from cloudvision.Connector.grpc_client import create_notification
from fmp import wrappers_pb2 as fmp_wrappers
from .constants import (
INPUT_PATH_ARG,
MAINLINE_WS_ID,
STUDIO_ID_ARG,
WORKSPACE_ID_ARG,
TIMEOUT_REQUEST,
)
from .exceptions import (
InputException,
InputNotFoundException,
InputUpdateException,
InvalidContextException,
InvalidCredentials,
)
from .utils import extractJSONEncodedListArg
from .workspace import getWorkspaceLastSynced
[docs]
class Studio:
'''
Object to store studio context:
- workspaceId: Id of the workspace
- studioId: Id of the studio
- inputs: inputs provided to the studio
- deviceIds: Ids of the devices associated with this studio
- logger: The logger to be used with this studio
- execId: Id of the execution
- buildId: Id of the studio build
'''
def __init__(self, workspaceId: str, studioId: str, inputs=None,
deviceIds=None, logger=None, execId=None, buildId=None):
self.workspaceId = workspaceId
self.studioId = studioId
self.inputs = inputs
self.deviceIds = deviceIds
self.logger = logger
self.execId = execId
self.buildId = buildId
[docs]
class StudioCustomData:
'''
Object to store studio custom data context:
- context: stores system and user-defined parameters.
- chunk_size: chunk size of stored data.
'''
def __init__(self, context):
self.context = context
self.chunk_size = 1000 * 1024
def __getBuildPath(self, studioId, path, key) -> List[str]:
'''
Builds a path for use in store/retrieve of studio custom data during a build
using the studioId, path and key provided by the user. All paths contain
"workspace/<wsId>/build/<buildId>/studio/<studioId>/customData" as the root.
Raises InvalidContextException if not enough context information is present
to create a key
'''
if (studioId and self.context and self.context.studio
and self.context.studio.buildId):
workpaceId = self.context.getWorkspaceId()
return ["workspace", workpaceId, "status", "build",
self.context.studio.buildId, "studio", studioId,
"customData"] + path + [key]
raise InvalidContextException(
"store/retrieve requires context with studio and"
+ "build associated with it.")
def __getMainlinePath(self, studioId, path, key) -> List[str]:
'''
Builds a path for use in retrieve of studio custom data from mainline
using studioID, path and key. All paths contain
"/studio/<studioId>/customData" as the root.
'''
return ["studio", studioId, "customData"] + path + [key]
[docs]
def store(self, data: str = "", path: List[str] = [], key: str = ""):
'''
store puts the passed studio custom data into a path in the Database.
The data is stored in 1MB chunks.
Params:
- data: The string data to be stored.
- path: The path to store the data at, in the form of a list of strings.
paths have "workspace/<wsId>/build/<buildId>/studio/<studioId>/customData"
as the root.
- key: The key to store the data at in the path.
'''
if not isinstance(data, str):
raise TypeError("only string data is allowed.")
if not self.context or not self.context.studio:
raise InvalidContextException("store requires a studio in the context.")
if not data:
raise ValueError("no data added.")
if not key:
raise ValueError("invalid key.")
ts = Timestamp()
ts.GetCurrentTime()
storagePath = self.__getBuildPath(self.context.studio.studioId, path, key)
# Generate the list of path pointer notifs that lead to the new entry
notifs = []
for i, pathElem in enumerate(storagePath):
# skip creation of workspace and build pointers
if i <= 4:
continue
pathPointerPath = storagePath[:i]
pathPointerUpdate = [(pathElem, Path(keys=storagePath[:i + 1]))]
notifs.append(create_notification(ts, pathPointerPath,
updates=pathPointerUpdate))
try:
self.context.getCvClient().publish(dId="cvp", notifs=notifs)
# publish data
for i in range(0, len(data), self.chunk_size):
update = [(f"{key}_{i // self.chunk_size}", data[i:i + self.chunk_size])]
notifs = [create_notification(ts, storagePath, updates=update)]
self.context.getCvClient().publish(dId="cvp", notifs=notifs)
except RpcError as exc:
# If the exception is not a permissions error, reraise the original
# exception as something went wrong
if exc.code() != StatusCode.PERMISSION_DENIED:
raise
raise InvalidCredentials(
f"Context user does not have permission to write to path '{storagePath}'")
[docs]
def retrieve(self, studioId: str = "", path: List[str] = [], searchKey: str = ""):
'''
retrieve gets the custom data from a path and key written by a studio
in the Database.
Params:
- studioId: The studioId of studio that generates the data to be retrieved.
- path: The path to get the data from, path is a list of strings.
- key: The key to get the data from in the path.
'''
if not studioId:
raise ValueError("studioId must be provided")
if not searchKey:
raise ValueError("invalid key.")
if not self.context:
raise InvalidContextException("retrieve requires context to be set")
try:
data = dict()
try:
storagePath = self.__getBuildPath(studioId, path, searchKey)
data = self.context.Get(storagePath, [], "cvp")
except InvalidContextException as e:
self.context.logger.info(
self.context, "custom data not found in build: {}".format(e.message))
# get data from mainline if data is not generated during build.
if not data:
self.context.logger.info(self.context, "reading custom data from mainline.")
storagePath = self.__getMainlinePath(studioId, path, searchKey)
data = self.context.Get(storagePath, [], "cvp")
return ''.join(data[k] for k in
sorted(data.keys(), key=lambda x: int(x.split(f'{searchKey}_')[1]))
if isinstance(data[k], str))
except RpcError as exc:
if exc.code() != StatusCode.PERMISSION_DENIED:
raise
raise InvalidCredentials(
f"Context user does not have permission to read from path '{storagePath}'")
except IndexError:
raise ValueError("Invalid Key format: {}".format(data.keys()))
[docs]
def getStudioInputs(clientGetter, studioId: str, workspaceId: str, path: List[str] = []):
'''
Uses the passed ctx.getApiClient function reference to issue get the current input state for
given combination of studioId, workspaceId and path.
Path MUST be a non-None list, omitting this argument retrieves the full studio input tree.
This function falls back to mainline state at workspace creation time (or last rebase time)
to build up the state should the workspace studio state not be created yet and checks to see
if any deletes would affect the requested input.
Raises an InputNotFoundException if the input requested does not exist.
'''
if path is None:
raise TypeError("Path must be a non-None value")
inputs = __getStudioInputs(clientGetter, studioId, workspaceId)
if not inputs:
# If we're searching for inputs on mainline and we receive none from the getAll,
# raise an exception
if workspaceId == MAINLINE_WS_ID:
raise InputNotFoundException(
path, f"Mainline inputs for studio {studioId} do not exist")
# Check the config endpoint for the workspace to ensure that
# the mainline value has not been deleted
# We need to check each individual subpath for deletes as they will take precedence
# We don't need to check the timestamps as if they have been overwritten, then
# state will exist and we will not reach here
for i in range(len(path) + 1):
subpath = path[:i]
try:
conf = __getStudioInputConfig(clientGetter, studioId, workspaceId, subpath)
except InputNotFoundException:
continue
if conf.remove.value:
raise InputNotFoundException(
path, (f"Inputs for studio {studioId} at path {path} in "
f"workspace {workspaceId} have been deleted"))
# Get the lastRebasedAt timestamp, or if that's null, then the createdAt timestamp
# of the workspace such that the correct mainline state is retrieved
wsTs = getWorkspaceLastSynced(clientGetter, workspaceId)
mainlineInputs = __getStudioInputs(clientGetter, studioId, MAINLINE_WS_ID,
start=wsTs, end=wsTs)
if not mainlineInputs:
raise InputNotFoundException(path, f"Inputs for studio {studioId} do not exist")
inputs = mainlineInputs
# In the case where a path has been specified, the inputs reflect that subtree from the root
# input, rather than from the specified path. We need to iterate through the inputs down to
# the path desired to return only the requested portion
finalInput = inputs
for pthElem in path:
try:
finalInput = finalInput[pthElem]
except TypeError:
# Stringified input path will stringify all elements, even integer values,
# so there are cases where list elements are attempted to be accessed with
# stringified indices. Attempt conversion to int and retry
try:
idx = int(pthElem)
finalInput = finalInput[idx]
except IndexError as idxE:
raise InputNotFoundException(path, f"'{pthElem}' {idxE}") from None
except (TypeError, ValueError):
raise InputNotFoundException(
path, f"{pthElem} not present in inputs {finalInput}") from None
except (KeyError, IndexError) as e:
raise InputNotFoundException(
path, f"{pthElem} not present in inputs {finalInput}: {e}") from None
return finalInput
def __getStudioInputs(clientGetter, studioId: str, workspaceId: str, start=None, end=None):
client = clientGetter(services.InputsServiceStub)
wid = pb.StringValue(value=workspaceId)
sid = pb.StringValue(value=studioId)
key = models.InputsKey(studio_id=sid, workspace_id=wid)
p_filter = models.Inputs(key=key)
startTs = None
endTs = None
if start:
startTs = start
if end:
endTs = end
timeBound = TimeBounds(start=startTs, end=endTs)
req = services.InputsStreamRequest(time=timeBound)
req.partial_eq_filter.append(p_filter)
inputs = None
# We need to issue the get requests as part of a GetAll to allow for truncated inputs
for res in client.GetAll(req, timeout=TIMEOUT_REQUEST):
inpResp = res.value
if not inpResp.inputs:
continue
path = inpResp.key.path.values
split = json.loads(inpResp.inputs.value)
inputs = mergeStudioInputs(inputs, path, split)
return inputs
def __getStudioInputConfig(clientGetter, studioId: str, workspaceId: str, path: List[str] = []):
client = clientGetter(services.InputsConfigServiceStub)
wid = pb.StringValue(value=workspaceId)
sid = pb.StringValue(value=studioId)
key = models.InputsKey(studio_id=sid, workspace_id=wid,
path=fmp_wrappers.RepeatedString(values=path))
req = services.InputsConfigRequest(key=key)
try:
configResp = client.GetOne(req, timeout=TIMEOUT_REQUEST)
except RpcError as confExc:
# If the config does not exist for the workspace, return the mainline state
if confExc.code() == StatusCode.NOT_FOUND:
raise InputNotFoundException(
path, (f"Config not found for input key with studio {studioId}"
f"workspace {workspaceId} and path {path}"))
raise
return configResp.value
[docs]
def setStudioInput(clientGetter, studioId: str, workspaceId: str, inputPath: List[str],
value: str, remove: bool = False):
'''
Uses the passed ctx.getApiClient function reference to
issue a set to the Studio inputs rAPI with the associated input path and value
'''
try:
serialized = json.dumps(value)
except TypeError as e:
raise InputException(
message=f"Cannot set value as input: {e}", inputPath=inputPath) from None
client = clientGetter(services.InputsConfigServiceStub)
wid = pb.StringValue(value=workspaceId)
sid = pb.StringValue(value=studioId)
key = models.InputsKey(studio_id=sid,
workspace_id=wid,
path=fmp_wrappers.RepeatedString(values=inputPath))
if remove:
req = services.InputsConfigSetRequest(
value=models.InputsConfig(key=key, remove=pb.BoolValue(value=remove))
)
else:
req = services.InputsConfigSetRequest(
value=models.InputsConfig(key=key, inputs=pb.StringValue(value=serialized))
)
try:
client.Set(request=req, timeout=TIMEOUT_REQUEST)
except RpcError as exc:
raise InputUpdateException(inputPath, f"Value {value} was not set: {exc}") from None
[docs]
def setStudioInputs(clientGetter, studioId: str, workspaceId: str,
inputs: List[Tuple]):
'''
Uses the passed ctx.getApiClient function reference to
issue a setSome to the Studio inputs rAPI with the associated InputsConfig
The inputs list should contain tuples of a fixed size, either with a
length of 2 or a length of 3. Tuple: (Path, Inputs) or (Path, Inputs, Remove)
a mixed list [(path, value, remove), (path, value),..] is supported
The value doesn't matter if the remove flag is True
'''
client = clientGetter(services.InputsConfigServiceStub)
wid = pb.StringValue(value=workspaceId)
sid = pb.StringValue(value=studioId)
inputsConfigs = []
for entry in inputs:
if len(entry) == 2:
path, value = entry
key = models.InputsKey(studio_id=sid,
workspace_id=wid,
path=fmp_wrappers.RepeatedString(values=path))
try:
serialized = json.dumps(value)
except TypeError as e:
raise InputException(
message=f"Cannot set value as input: {e}", inputPath=path) from None
item = models.InputsConfig(key=key, inputs=pb.StringValue(value=serialized))
elif len(entry) == 3:
path, value, remove = entry
key = models.InputsKey(studio_id=sid,
workspace_id=wid,
path=fmp_wrappers.RepeatedString(values=path))
try:
serialized = json.dumps(value)
except TypeError as e:
raise InputException(
message=f"Cannot set value as input: {e}", inputPath=path) from None
if remove:
item = models.InputsConfig(key=key, remove=pb.BoolValue(value=remove))
else:
item = models.InputsConfig(key=key, inputs=pb.StringValue(value=serialized))
else:
raise InputException(
message=f"Invalid entry length: {len(entry)}", inputPath=entry[0]) from None
inputsConfigs.append(item)
req = services.InputsConfigSetSomeRequest(
values=inputsConfigs
)
try:
for res in client.SetSome(request=req, timeout=TIMEOUT_REQUEST):
pass
except RpcError as exc:
raise InputUpdateException(err=f"Inputs {inputs} was not set: {exc}") from None
[docs]
def extractInputElems(inputs, inputPath: List[str], elems: List[str] = [],
tagElems: List[str] = []):
'''
Takes lists of elements and tag elements, and traverses through the input tree towards the
Input path, extracting the most recent matching values for these elements from the inputs.
Returns these results in a single dict, so overwriting of results will occur if specified
elements/tag elements have the same name in the inputs tree
'''
results = {}
currInput = inputs
# Go through the input path to find the associated elements
# tags are stored in the form "tags":{ "query": "<tag>:<value>" }
for pthElem in inputPath:
# Check the current input element for existence of wanted elements
for elem in elems:
if elem in currInput:
results[elem] = currInput[elem]
# Check the current input element for existence of wanted tag elements
if "tags" in currInput:
query = currInput["tags"]["query"]
for elem in tagElems:
# Add the colon used to separate value and tag in the query
tagElem = elem + ":"
if tagElem in query:
results[elem] = query[len(tagElem):]
try:
currInput = currInput[pthElem]
except TypeError:
# Stringified input path will stringify all elements, even integer values,
# so there are cases where list elements are attempted to be accessed with
# stringified indices. Attempt conversion to int and retry
try:
idx = int(pthElem)
currInput = currInput[idx]
except IndexError as idxE:
raise InputNotFoundException(inputPath, f"'{pthElem}' {idxE}") from None
except (TypeError, ValueError):
raise InputNotFoundException(inputPath) from None
except (KeyError, IndexError) as e:
raise InputNotFoundException(inputPath, f"{e} not present in inputs") from None
# Ensure sane value and allow for current input to be None
if currInput is None and pthElem != inputPath[-1]:
raise InputNotFoundException(inputPath)
return results
[docs]
def getSimpleResolverQueryValue(query: str):
'''
Autofill action arguments may be resolver queries. In these cases the string
argument is in the form of "<tag>:<Value>" or more complex queries such as
"<tag>:<ValueA> OR <tag>:<ValueB>". This function is designed to extract the
query values from a simple query.
Params:
- query: The simple query string, e.g. "<tag>:<Value>"
Returns:
- The query value, e.g. "<Value>" from the above example.
Raises an InputException in the case where the passed query is not parsable as a simple query
'''
queryElems = query.split(":")
if len(queryElems) == 1:
raise InputException(f"Passed 'query' \"{query}\" does not appear to be a query")
if len(queryElems) > 2:
raise InputException(f"Passed query \"{query}\" is a complex query")
queryValue = queryElems[1]
if len(queryValue) == 0:
raise InputException(f"Passed query \"{query}\" is missing a value")
return queryValue
[docs]
def extractStudioInfoFromArgs(args: Dict):
'''
Studio Autofill actions contains studio related information in their arguments, but a studio
is not instantiated and associated with the context. As these actions require interfacing with
studio APIs, this function extracts the studio info (verifies this info is valid if needed)
and returns it to the user in the order below.
These are (All return values may be None in the case the field is not present);
- StudioID: The ID of the studio associated with the action
- WorkspaceID: The ID of the workspace associated with the
- InputPath: The string path elements leading to the input element in the action
NOTE: Input paths containing array/list indices will be stringified, so use caution when
iterating through the input tree using this. These are not converted to integer values
as they could clash with elements containing only numbers.
The `extractInputElems` method accounts for this and is suggested over manually traversing
the tree looking for elements
'''
studioId = args.get(STUDIO_ID_ARG)
workspaceId = args.get(WORKSPACE_ID_ARG)
inputPath = None
inputPathArg = args.get(INPUT_PATH_ARG) # This is a stringified list
if inputPathArg:
try:
inputPath = extractJSONEncodedListArg(inputPathArg)
except ValueError as e:
raise ValueError("Studio input path must be a list of strings") from e
return studioId, workspaceId, inputPath
[docs]
def GetOneWithWS(apiClientGetter, stateStub, stateGetReq, configStub, confGetReq):
'''
For Studio APIs, the state for a particular workspace can be difficult to determine.
A state for a particular workspace only exists if an update has occurred for that workspace.
State may exist in mainline, or the configuration change in the workspace may have explicitly
deleted the state.
GetOneWithWS does the following to try and provide state for the get request:
- Do a get on the X state endpoint in the particular workspace for the desired state
- If the state does NOT exist, issue another get on the X state endpoint for the
mainline state.
- If the state DOES exist there, check the X configuration endpoint of the workspace to
see if the state has been explicitly deleted there.
Params:
- apiClientGetter: The API client getter, i.e. ctx.getApiClient
- stateStub: The stub for the state endpoint
- stateGetReq: A workspace-aware get request to be made to the state client for the
desired workspace. It is assumed that the get request has a key field
"workspace_id", such that mainline can be queried in the case that the
workspace query does not return anything.
- configStub: The stub for the config endpoint
- confGetReq: A workspace-aware get request to be made to the config client for the
desired workspace.
Returns:
- The request's value, or None if the resource has been deleted
'''
if not hasattr(stateGetReq.key, 'workspace_id'):
raise ValueError("Passed request to GetOneWithWS has no key attribute 'workspace_id'")
stateClient = apiClientGetter(stateStub)
# Issue a get to the state endpoint for the workspace
try:
result = stateClient.GetOne(stateGetReq, timeout=TIMEOUT_REQUEST)
except RpcError as exc:
# If the state does not exist for the workspace, reraise the original
# exception as something went wrong
if exc.code() != StatusCode.NOT_FOUND:
raise
# In the case where the original get req is for the mainline,
# nothing further we can do
if stateGetReq.key.workspace_id.value == MAINLINE_WS_ID:
raise
# Get the lastRebasedAt timestamp, or if that's null, then the createdAt timestamp
# of the workspace such that the correct mainline state is retrieved
wsTs = getWorkspaceLastSynced(apiClientGetter, stateGetReq.key.workspace_id.value)
# Try again for the mainline state
stateGetReq.key.workspace_id.value = MAINLINE_WS_ID
stateGetReq.time = wsTs
try:
result = stateClient.GetOne(stateGetReq, timeout=TIMEOUT_REQUEST)
except RpcError as mainlineExc:
# Handle the mainline error as its own exception, such that stack
# traces don't contain nested exceptions such as "when handling the
# above exception, another exception occurred"
raise mainlineExc from None
# Check the config endpoint for the workspace to ensure that
# the mainline value has not been deleted
configClient = apiClientGetter(configStub)
try:
configResp = configClient.GetOne(confGetReq, timeout=TIMEOUT_REQUEST)
except RpcError as confExc:
# If the config does not exist for the workspace, return the mainline state
if confExc.code() == StatusCode.NOT_FOUND:
return result.value
# Handle the config error as its own exception, such that stack
# traces don't contain nested exceptions such as "when handling the
# above exception, another exception occurred"
raise confExc from None
# Remove is a config field for workspace-aware configuration apis
# If it is set it means that configuration has been explicitly
# deleted for this ws
if configResp.value.remove:
# Config has been explicitly removed, return nothing
return None
return result.value
[docs]
def mergeStudioInputs(rootInputs: Any, path: List[Any], inputsToInsert: Any):
'''
Due to grpc messaging limits, large inputs may be sent out to get requests
in chunks, and should be retrieved with a GetAll to ensure all inputs
for a given studio are received.
In the case where a studio resource returns inputs in multiple responses, they need to
be spliced together to form a cohesive input object.
Params:
- rootInputs: The root object to insert the new inputs into
- path: The path in the rootInputs to insert the inputs into
- inputsToInsert: The inputs to insert into the root inputs
Returns:
- The updated root inputs
'''
prevElem: Any | None = None
prev = rootInputs
currElem = None
curr = rootInputs
# Walk down the path from the root to the value at the final element, creating any sub-objects
# or sub-lists along the way if they don't exist.
for currElem in path:
# This element is a list index...
if currElem.isnumeric():
# If the current value is not a list, set it to one.
if not isinstance(curr, list):
if prevElem is None:
rootInputs = []
curr = rootInputs
elif prevElem.isnumeric():
prevElemInt = int(prevElem)
prev[prevElemInt] = []
curr = prev[prevElemInt]
else:
prev[prevElem] = []
curr = prev[prevElem]
# If this index is past the last index of the current list, extend the list until
# it is big enough for it.
currElemInt = int(currElem)
if currElemInt >= len(curr):
while len(curr) < currElemInt + 1:
curr.append(None)
# Move to the value at the index.
prevElem = currElem
prev = curr
curr = curr[currElemInt]
# Otherwise this element is an object key...
else:
# If the current value is not an object, set it to one.
if not isinstance(curr, dict):
if prevElem is None:
rootInputs = {}
curr = rootInputs
elif prevElem.isnumeric():
prevElemInt = int(prevElem)
prev[prevElemInt] = {}
curr = prev[prevElemInt]
else:
prev[prevElem] = {}
curr = prev[prevElem]
# If the current value does not contain this
# key, add it.
if currElem not in curr:
curr[currElem] = None
# Move to the value at the key.
prevElem = currElem
prev = curr
curr = curr[currElem]
# If the path leads to an object, then merge it with the previous object.
if isinstance(curr, dict):
curr.update(inputsToInsert)
# If it leads to any other type, then simply set it to the inputsToInsert.
else:
if currElem is None:
rootInputs = inputsToInsert
elif currElem.isnumeric():
prev[int(currElem)] = inputsToInsert
else:
prev[currElem] = inputsToInsert
return rootInputs
# The following functions are input validation helpers for autofill scripts.
# Note that while the studio template scripts operate on post build validated inputs,
# the autofill scripts operate on the raw studio inputs.
[docs]
def get_tag_value_from_resolver(resolver: Dict) -> str:
"""
Return the value in single-tag resolver
"""
tags = resolver.get('tags', {}) if resolver and isinstance(resolver, dict) else {}
return tags.get('query').split(':')[1] if tags.get('query') else ""
[docs]
def validate_resolver(resolver: Dict) -> Tuple[Optional[Dict], Optional[str]]:
"""
Validates a resolver entry containing a group.
Consider no inputs as an invalid (ie. irrelevant) case.
Return the resolver inputs, tag value if valid, else None, None.
"""
if (not resolver
or not isinstance(resolver, dict)
or not (res_inputs := resolver.get('inputs'))
or not isinstance(res_inputs, dict)
or not (res_tag_value := get_tag_value_from_resolver(resolver))):
return None, None
return res_inputs, res_tag_value