celonis.py
Celonis
¶
Base object to interact with Celonis Execution Management System (EMS) API.
This class is used to configure the connection to Celonis.
The basic idea of this class is the integration with Celonis
resource classes, e.g. 'Datamodel'. Each resource class requires
an instance of Celonis
to be passed to the initializer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
Celonis base URL. |
required |
api_token |
str |
Celonis API token. |
required |
key_type |
Union[str, KeyType] |
KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or `celonis_api.utils.KeyType`. |
required |
read_only |
bool |
If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods. |
required |
verify_ssl |
bool |
Requiring requests to verify the TLS certificate at the remote end. |
required |
user_agent |
str |
Session header value for `User-Agent`. |
required |
timeout |
int |
How long to wait for the server to send data before giving up
(in seconds, 300 for data push/pull, max 600). |
120 |
total_retry |
int |
Total number of retries (by default disabled, max. 10). |
0 |
backoff_factor |
float |
Factor to apply between retry attempts after the second try (by default disabled). |
1 |
cookie |
Optional[str] |
Session header value for `x-celonis-api-scope`. |
None |
connect |
bool |
If True connects to Celonis on initialization
(initial request to check if the `token` & `key_type` combination is correct). |
True |
permissions |
bool |
If True provides permission information. |
True |
Source code in celonis_api/celonis.py
class Celonis:
"""Base object to interact with Celonis Execution Management System (EMS) API.
This class is used to configure the connection to Celonis.
The basic idea of this class is the integration with Celonis
resource classes, e.g. 'Datamodel'. Each resource class requires
an instance of `Celonis` to be passed to the initializer.
Args:
url: Celonis base URL.
api_token: Celonis API token.
key_type: KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or [celonis_api.utils.KeyType][].
read_only: If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.
verify_ssl: Requiring requests to verify the TLS certificate at the remote end.<br />
For more details see [`requests.Session`](https://docs.python-requests.org/en/latest/api/#requests.Session).
user_agent: Session header value for `User-Agent`.
timeout: How long to wait for the server to send data before giving up
(in seconds, 300 for data push/pull, max 600).<br />
For more details see
[`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
total_retry: Total number of retries (by default disabled, max. 10).<br />
For more details see
[`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
backoff_factor: Factor to apply between retry attempts after the second try (by default disabled).<br />
`[0.5 < backoff_factor < 5]` : (`{backoff_factor} * (2 ** ({total_retry} - 1))`).<br />
For more details see
[`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
cookie: Session header value for `x-celonis-api-scope`.
connect: If True connects to Celonis on initialization
(initial request to check if the `token` & `key_type` combination is correct).
permissions: If True provides permission information.
"""
def __init__(
    self,
    url: str,
    api_token: str,
    key_type: typing.Optional[typing.Union[str, 'KeyType']],
    read_only: bool,
    verify_ssl: bool,
    user_agent: str,
    timeout: int = 120,
    total_retry: int = 0,
    backoff_factor: float = 1,
    cookie: typing.Optional[str] = None,
    connect: bool = True,
    permissions: bool = True,
):
    """Configure the HTTP session and optionally verify the connection.

    See the class docstring for the meaning of each argument.
    """
    self._tracker = PyCelonisTracker()
    # Every helper below logs through `self._logger`, but nothing in this class
    # assigns it. Create it first so `__determine_url` does not fail with an
    # AttributeError; an already present logger is left untouched.
    if getattr(self, "_logger", None) is None:
        self._logger = logging.getLogger(__name__)
    self._read_only = read_only
    self._timeout = timeout
    self._url = self.__determine_url(url)
    self._session = self.__setup_session(
        api_token, key_type, user_agent, total_retry, backoff_factor, cookie, verify_ssl
    )
    self._permissions: typing.List[typing.Dict] = []
    if connect:
        self.__connect()
    if permissions:
        self.__permissions()
def __determine_url(self, url) -> str:
    """Reduce `url` to `<scheme>://<host>`, defaulting the scheme to https.

    Raises:
        ValueError: If no host part can be extracted from `url`.
    """
    match = re.search(r"^(https?://)?([^/]+)", url)
    if match is None:
        raise ValueError(f"Invalid URL format: {url}")
    scheme = match.group(1)
    if not scheme:
        # No explicit scheme given: assume https.
        scheme = "https://"
    url = scheme + match.group(2)
    self._logger.debug(f"Connecting to {url}")
    return url
def __setup_session(
    self,
    api_token: str,
    key_type: typing.Optional[typing.Union[str, KeyType]],
    user_agent: str,
    total_retry: int,
    backoff_factor: float,
    cookie: str = None,
    verify_ssl: bool = True,
) -> Session:
    """Create the `Session` used for all requests and attach the auth headers.

    Raises:
        PyCelonisTypeError: If `key_type` is neither `USER_KEY` nor `APP_KEY`.
    """
    session = Session()
    session.proxies = getproxies()
    session.verify = verify_ssl

    if total_retry > 0:
        self._setup_retry(session, total_retry, backoff_factor)

    if user_agent:
        session.headers.update({"User-Agent": user_agent})

    if not key_type:
        self._logger.warning(
            f"Argument 'key_type' not set, defaults to 'APP_KEY'. Must be one of [{KeyType.values()}]."
        )
        key_type = KeyType.APP_KEY

    # Pick the authentication headers matching the key type.
    if key_type == KeyType.USER_KEY:
        if cookie:
            auth_headers = {"x-celonis-api-scope": api_token, "cookie": cookie}
        else:
            auth_headers = {"authorization": f"Bearer {api_token}"}
    elif key_type == KeyType.APP_KEY:
        auth_headers = {"authorization": f"AppKey {api_token}"}
    else:
        raise PyCelonisTypeError(f"Must be one of [{KeyType.values()}], but got {key_type}.")

    session.headers.update(auth_headers)
    return session
def _setup_retry(self, session: Session, total_retry: int, backoff_factor: float):
    """Mount an HTTPS adapter on `session` that retries failed requests.

    Raises:
        PyCelonisValueError: If `total_retry` exceeds 10 or `backoff_factor`
            lies outside the inclusive range 0.5 to 5.
    """
    if total_retry > 10:
        raise PyCelonisValueError("Invalid 'total_retry' value [max 10].")
    if not (0.5 <= backoff_factor <= 5):
        raise PyCelonisValueError("Invalid 'backoff_factor' value [0.5 < backoff_factor < 5].")

    retry_policy = Retry(
        total=total_retry,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "PUT", "POST", "DELETE", "OPTIONS"],
    )
    # Only the https:// prefix gets the retrying adapter.
    session.mount("https://", HTTPAdapter(max_retries=retry_policy))
def __connect(self):
    """Send an initial request to `/api/cloud` and log the outcome.

    A failure is logged, not raised.
    """
    try:
        response = self.api_request(f"{self._url}/api/cloud", timeout="connect")
        if isinstance(response, dict):
            name = "Hello " + response.get("nameOrEmail", "")
        else:
            name = "You are using an Application Key"
        self._logger.info(f"Initial connect successful! {name}. PyCelonis Version: {__version__}")
        self._tracker.track("Started new session", extra={"tracking_type": "PYCELONIS_SESSION"})
    except PyCelonisError:
        self._logger.exception(
            f"Couldn't connect to Celonis EMS {self._url}.\n"
            "Please check if you set the correct 'key_type' and the token is valid. "
            "To learn more about setting up your Application Key correctly, "
            f"check out {self.url}/help/display/CIBC/Application+Keys."
        )
def __permissions(self):
    """Fetch the permissions of the current key, store them and log them.

    A failure is logged, not raised.
    """
    try:
        self._permissions = self.api_request(f"{self.url}/api/cloud/permissions")
        formatted = json.dumps(self._permissions, sort_keys=True, indent=4)
        self._logger.info(f"Your key has following permissions:\n{formatted}")
    except PyCelonisError:
        self._logger.exception(
            "Failed to lookup permissions. This usually happens when you set the wrong 'key_type'."
        )
@property
def url(self) -> str:
    """Celonis base URL (scheme and host) as determined on initialization."""
    base_url = self._url
    return base_url
@property
def read_only(self) -> bool:
    """Whether only GET methods are allowed; False enables PUT, POST and DELETE methods."""
    flag = self._read_only
    return flag
@property
def timeout(self) -> int:
    """Default number of seconds to wait for the server to send data before giving up."""
    seconds = self._timeout
    return seconds
@property
def permissions(self) -> typing.List[typing.Dict]:
    """List of dictionaries describing the permissions per Celonis Resource."""
    granted = self._permissions
    return granted
@property
def pools(self) -> 'CelonisCollection[Pool]':
    """Get all Pools, including hybrid Pools when the hybrid app is listed.

    !!! api "API"
        - `GET: /integration/api/pools`
        - `GET: /integration-hybrid/api/pools` (hybrid)

    Returns:
        Collection of Pools. Request failures are logged and yield no entries.
    """
    try:
        pool_data = self.api_request(f"{self.url}/integration/api/pools")
        pools = CelonisCollection([Pool(self, entry) for entry in pool_data])
    except PyCelonisError:
        self._logger.exception("Failed to get pools!")
        pools = CelonisCollection([])

    try:
        apps = self.api_request(f"{self.url}/api/apps")
        hybrid_available = any([app["key"] == "hybrid" for app in apps])
        if hybrid_available:
            hybrid_data = self.api_request(f"{self.url}/integration-hybrid/api/pools")
            pools += CelonisCollection([HybridPool(self, entry) for entry in hybrid_data])
    except PyCelonisError:
        self._logger.exception("Failed to get hybrid pools!")

    return pools
def create_pool(self, name: str, safety_check: typing.Optional[bool] = True) -> Pool:
    """Creates a new Pool.

    Args:
        name: Name of the Pool.
        safety_check: Whether to check if a Pool with this name already exists.

    Raises:
        PyCelonisValueError: If `safety_check` is set and the name is already taken.
        PyCelonisHTTPError: If the creation request fails.

    Returns:
        Instance of the newly created Pool object.
    """
    if safety_check:
        existing_names = self.pools.names
        if name in existing_names:
            raise PyCelonisValueError(f"Pool with name {name} already exists!")

    endpoint = f"{self.url}/integration/api/pools"
    try:
        created = self.api_request(endpoint, {"name": name}, method=HttpMethod.POST)
        return Pool(self, created)
    except PyCelonisHTTPError as e:
        raise PyCelonisHTTPError(f"Failed to create pool!\n{e.message}")
@property
def datamodels(self) -> 'CelonisCollection[Datamodel]':
    """Get all Datamodels by collecting the `datamodels` of every Pool.

    Returns:
        Collection of Datamodels.
    """
    per_pool = utils.threaded(self.pools, getattr, "datamodels")
    flattened = itertools.chain.from_iterable(per_pool)
    return CelonisCollection(flattened)
def get_datamodel(self, datamodel_id: str) -> Datamodel:
    """Get a Datamodel by ID. Much faster than datamodels.find().

    Args:
        datamodel_id: Id of the Datamodel.

    Raises:
        PyCelonisNotFoundError: If no listed node carries `datamodel_id`.

    Returns:
        Datamodel by ID.
    """
    try:
        nodes: typing.Dict = self.api_request(f"{self.url}/process-mining/api/compute-pools/nodes")
    except PyCelonisError:
        # Fall back to the package-manager endpoint when the first lookup fails.
        nodes = self.api_request(f"{self.url}/package-manager/api/compute-pools/nodes/status")

    found = None
    for node in nodes:
        if node["dataModelId"] != datamodel_id:
            continue
        parent_pool = data_pool.Pool(self, {"id": node["poolId"]})
        found = data_model.Datamodel(parent_pool, {"id": node["dataModelId"]})

    if found is not None:
        return found
    raise PyCelonisNotFoundError(
        f"Either there exists no datamodel with id: {datamodel_id} "
        "or you do not have permissions to access it."
    )
def create_datamodel(self, name: str, pool: typing.Union[str, Pool]) -> Datamodel:
    """Creates a new Datamodel.

    Args:
        name: Name of the Datamodel.
        pool: Pool object, or name or ID of the Pool, the new Datamodel should be linked to.

    Returns:
        The newly created Datamodel object.
    """
    target_pool = pool if isinstance(pool, Pool) else self.pools.find(pool)
    return target_pool.create_datamodel(name)
@property
def analyses(self) -> 'CelonisCollection[Analysis]':
    """Get all Analyses.

    !!! api "API"
        - `GET: /process-mining/api/analysis`

    Returns:
        Collection of Analyses. A request failure is logged and yields an empty collection.
    """
    try:
        analysis_data = self.api_request(f"{self.url}/process-mining/api/analysis")
        return CelonisCollection([Analysis(self, entry) for entry in analysis_data])
    except PyCelonisError:
        self._logger.exception("Failed to get analyses!")
        return CelonisCollection([])
def create_analysis(self, name: str, workspace: Workspace, **kwargs) -> Analysis:
    """Creates a new Analysis.

    Args:
        name: Name of the analysis.
        workspace: Workspace object the new Analysis should be linked to.
            A name or ID is also accepted and looked up via `workspaces.find`.
        **kwargs : Additional arguments which are passed to
            [celonis_api.process_analytics.workspace.Workspace.create_analysis][]

    Returns:
        The newly created Analysis object.
    """
    if not isinstance(workspace, Workspace):
        # The previous code always read `workspace.id`, which raised an
        # AttributeError for a plain name/ID string. Objects exposing `id`
        # are still resolved by that ID; anything else is passed to `find` as is.
        lookup_key = getattr(workspace, "id", workspace)
        workspace = self.workspaces.find(lookup_key)
    return workspace.create_analysis(name, **kwargs)
@property
def workspaces(self) -> 'CelonisCollection[Workspace]':
    """Get all Workspaces.

    !!! api "API"
        - `GET: /process-mining/api/processes`

    Returns:
        Collection of Workspaces. A request failure is logged and yields an empty collection.
    """
    try:
        workspace_data = self.api_request(f"{self.url}/process-mining/api/processes")
        return CelonisCollection([Workspace(self, entry) for entry in workspace_data])
    except PyCelonisError:
        self._logger.exception("Failed to get workspaces!")
        return CelonisCollection([])
def create_workspace(self, name: str, datamodel: typing.Union[Datamodel, str]) -> Workspace:
    """Creates a new Workspace.

    Args:
        name : Name of the Workspace.
        datamodel : Datamodel object, or name or ID of the Datamodel, the Workspace should be linked to.

    Returns:
        The newly created Workspace object.
    """
    target = datamodel if isinstance(datamodel, Datamodel) else self.datamodels.find(datamodel)
    return target.create_workspace(name)
@property
def default_space(self) -> Space:
    """Get the Studio Space that is found under the name `Default`.

    Returns:
        Default Space.
    """
    all_spaces = self.spaces
    return all_spaces.find("Default")
@property
def spaces(self) -> 'CelonisCollection[Space]':
    """Get all Studio Spaces.

    !!! api "API"
        - `GET: /package-manager/api/spaces`

    Returns:
        Collection of Spaces. A request failure is logged and yields an empty collection.
    """
    try:
        space_data = self.api_request(f"{self.url}/package-manager/api/spaces")
        return CelonisCollection([Space(self, entry) for entry in space_data])
    except PyCelonisError:
        self._logger.exception("Failed to get spaces!")
        return CelonisCollection([])
def create_space(self, name: str, icon_reference: str = "earth") -> Space:
    """Creates a new Studio Space.

    Args:
        name: Name of the Space.
        icon_reference: Name of the icon to use for the Space. One of:
            `earth`, `binocular`, `box-archive`, `building`, `building-2`, `catalog`, `earth-2`,
            `factory`, `finance`, `horn`, `inventory`, `logistics`, `store`, `suitcase`, `wallet`.

    Raises:
        PyCelonisHTTPError: If the creation request fails.

    Returns:
        The newly created Space object.
    """
    endpoint = f"{self.url}/package-manager/api/spaces"
    try:
        created = self.api_request(
            endpoint, {"name": name, "iconReference": icon_reference}, method=HttpMethod.POST
        )
        return Space(self, created["id"])
    except PyCelonisHTTPError as e:
        raise PyCelonisHTTPError(f"Failed to create space!\n{e.message}")
@property  # type: ignore
@deprecated("Use: space.packages.")
def packages(self) -> 'CelonisCollection[Package]':
    """Get all Studio Packages from the node tree, each linked to its Space.

    !!! warning
        Use `space.find("space_name").packages.find("package_name")` for better performance.

    !!! api "API"
        - `GET: /package-manager/api/nodes/tree`

    Returns:
        Collection of Studio Packages. Packages that cannot be built are logged and skipped.
    """
    response = self.api_request(f"{self.url}/package-manager/api/nodes/tree")
    packages = []
    # `self.spaces` performs an API request on every access. Fetch it once,
    # and only when the tree actually contains a package, instead of once per package.
    spaces = None
    for package_data in response:
        if package_data["nodeType"] != 'PACKAGE':
            continue
        try:
            if spaces is None:
                spaces = self.spaces
            packages.append(Package(spaces.find(package_data["spaceId"]), self, package_data))
        except PyCelonisError:
            self._logger.exception("Failed to get package.")
    return CelonisCollection(packages)
@deprecated("Use: celonis_api.studio.space.Space.create_package.")
def create_package(self, name: str, key: str = None, package_type: str = "APP", space_id: str = None) -> Package:
    """Deprecated way of creating a Studio Package; always raises.

    !!! warning
        This method is deprecated. Use [celonis_api.studio.space.Space.create_package][]:
        ```py
        space.create_package("package_name", "package_key", "APP")
        ```

    Args:
        name: Name of the Package.
        key: Key of the Package.
        package_type: Package type. Can be one of "APP", "LIBRARY", "INSTRUMENT".
        space_id: ID of Space where package would be created.

    Raises:
        PyCelonisError: Always, none of the arguments are used.
    """
    raise PyCelonisError("Deprecated method.")
def get_knowledge_model_by_full_key(self, knowledge_model_key: str) -> KnowledgeModel:
    """Get a Knowledge Model by its full key (PACKAGE_KEY.KNOWLEDGE_MODEL_KEY).

    Args:
        knowledge_model_key: Full Knowledge Model key consisting of "PACKAGE_KEY.KNOWLEDGE_MODEL_KEY"

    Raises:
        PyCelonisValueError: If the key does not contain exactly one dot.

    Returns:
        The KnowledgeModel.
    """
    key_parts = knowledge_model_key.split(".")
    if len(key_parts) != 2:
        raise PyCelonisValueError(
            f"'{knowledge_model_key}' is not a valid knowledge_model_key. "
            "Must be of format 'PACKAGE_KEY.KNOWLEDGE_MODEL_KEY'."
        )
    package_key, km_key = key_parts
    node = self.packages.find(package_key).get_node_by_key(km_key)
    return typing.cast(KnowledgeModel, node)
@property
def storage_buckets(self) -> 'CelonisCollection[Bucket]':
    """Get all Storage Buckets from the file storage manager.

    !!! api "API"
        - `GET: /storage-manager/api/buckets`

    Returns:
        Collection of Buckets. A request failure is logged and yields an empty collection.
    """
    try:
        bucket_data = self.api_request(f"{self.url}/storage-manager/api/buckets")
        return CelonisCollection([Bucket(self, entry) for entry in bucket_data])
    except PyCelonisError:
        self._logger.exception("Failed to get storage_buckets!")
        return CelonisCollection([])
def create_storage_bucket(self, name: str, features: typing.List[str] = None) -> Bucket:
    """Creates a new Bucket in the file storage manager.

    Args:
        name: Name of the Bucket.
        features: Features used for the Bucket. Currently only supports SFTP.
            An empty list is sent when omitted.

    Raises:
        PyCelonisHTTPError: If the creation request fails.

    Returns:
        The newly created Bucket object.
    """
    endpoint = f"{self.url}/storage-manager/api/buckets"
    try:
        body = {"name": name, "features": features or []}
        created = self.api_request(endpoint, body, method=HttpMethod.POST)
        return Bucket(self, created["id"])
    except PyCelonisHTTPError as e:
        raise PyCelonisHTTPError(f"Failed to create storage_bucket!\n{e.message}")
@property
def _tracking_server(self):
    """Get all Tracking Servers.

    Returns:
        Collection of TrackingServers. A request failure is logged and yields an empty collection.
    """
    from pycelonis.celonis_api.experiment_tracking.tracking_server import TrackingServer

    try:
        server_data = self.api_request(f"{self.url}/machine-learning/api/mlflow")
        return CelonisCollection([TrackingServer(self, entry) for entry in server_data])
    except PyCelonisError:
        self._logger.exception("Failed to get tracking_servers!")
        return CelonisCollection([])
def _create_tracking_server(self, name: str):
    """Creates a new ML Tracking Server.

    Args:
        name: Name of the Tracking Server.

    Raises:
        PyCelonisHTTPError: If the creation request fails.

    Returns:
        The newly created Tracking Server object.
    """
    from pycelonis.celonis_api.experiment_tracking.tracking_server import TrackingServer

    endpoint = f"{self.url}/machine-learning/api/mlflow"
    try:
        created = self.api_request(endpoint, {"name": name}, method=HttpMethod.POST)
        return TrackingServer(self, created["id"])
    except PyCelonisHTTPError as e:
        raise PyCelonisHTTPError(f"Failed to create tracking_servers!\n{e.message}")
@tracer.wrap("api_request")
def api_request(
    self,
    url: str,
    message: typing.Union[str, typing.Dict, pathlib.Path, typing.List] = None,
    method: str = "auto",
    timeout: typing.Union[str, int] = "default",
    get_json: bool = True,
    **extra,
):
    """Wrapper method for
    [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
    The session object is created and configured during `Celonis` class initialization.

    Args:
        url: Absolute URL pointing to a Celonis resource.
        message:
            * If no message is given, performs `GET` request.
            * If message is `"DELETE"` performs `DELETE` request.
            * If message is `pathlib.Path`:
                * If the path refers to an existing file performs `POST` request with file attached.
                * Otherwise performs `GET` request and streams payload into the path.
            * If message is a `list` or `dict`, performs `POST` request with message as JSON payload.
            * If message is any other `str`, performs `POST` request with the UTF-8 encoded string as body.
        method: Can be used to override `auto` detection of request method.
            One of [`auto`, `get`, `put`, `post`, `delete`]
        timeout:
            * How long to wait for the server to send data before giving up (in seconds, max. 600):
                * `default` = the timeout configured on this `Celonis` object
                * `connect` = 6
            * In the case of a file transfer, the timeout is raised to at least 300.
        get_json: If True and the response content type is JSON, parses the response body.
        **extra: Additional arguments which are passed to
            [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).

    Raises:
        PyCelonisPermissionError: When the session is read-only or the API key has not sufficient permissions.
        PyCelonisHTTPError: When timeout occurs or unexpected HTTP error.
        PyCelonisValueError: When timeout is invalid.

    Returns:
        dict | list | str | pathlib.Path: Celonis resource response.
    """
    timeout = self.__get_timeout(timeout)
    kwargs = self.__get_kwargs(url, method, timeout, message)
    try:
        self._tracker.track(
            f'{kwargs["method"]} request to {url}',
            extra={"tracking_type": "API_REQUEST", "target_url": url, "request_type": kwargs["method"]},
        )
        if self._read_only and kwargs["method"].lower() != "get" and "export" not in kwargs["url"]:
            raise PyCelonisPermissionError(
                "This is a read-only session. Use the argument read_only=False to get write access."
            )
        try:
            response = self._session.request(**kwargs, **extra)
        except ReadTimeout as e:
            raise PyCelonisHTTPError(
                f"A (read) time out error was raised. Please try to increase 'timeout' value.\nError: {e}"
            ) from e
        except ConnectionError as e:
            raise PyCelonisHTTPError(
                f"A connection error was raised. Please check your internet connection first ('ping {self.url}')."
                f"\nError: {e}"
            ) from e
    finally:
        # The request preparation opens the upload file and requests does not
        # close it, so release the handle here whether or not the request succeeded.
        for handle in (kwargs.get("files") or {}).values():
            close = getattr(handle, "close", None)
            if callable(close):
                close()

    if response.status_code in [401, 403]:
        # Use a dedicated name so the `message` argument stays untouched.
        error_message = f"You don't have permission to perform {kwargs['method']} -> {url}."
        if self.permissions:
            pretty_permissions = json.dumps(self._permissions, sort_keys=True, indent=4)
            error_message = f"{error_message}\navailable permissions:\n{pretty_permissions}"
        raise PyCelonisPermissionError(error_message)

    try:
        response.raise_for_status()
    except HTTPError as e:
        self.__process_error(e, kwargs, response)

    if kwargs.get("stream") and isinstance(message, pathlib.Path):
        # Download: write the payload into the given path and hand the path back.
        message.write_bytes(response.content)
        return message
    elif get_json and "application/json" in response.headers.get("Content-Type", ""):
        return json.loads(response.text)
    elif response.headers.get("Content-Type", "").startswith("application/octet-stream"):
        return response
    else:
        if isinstance(response.text, str) and response.text.startswith("<!DOCTYPE html>"):
            raise PyCelonisHTTPError(f"Got unexpected HTML document response:\n{response.text}", response=response)
        return response.text
def __get_timeout(self, timeout: typing.Union[int, str]) -> int:
    """Resolve the `timeout` argument of `api_request` to a number of seconds.

    Args:
        timeout: Seconds as int (max. 600), `"connect"` for 6 seconds or
            `"default"` for the timeout configured on this object.

    Raises:
        PyCelonisValueError: If an int exceeds 600 or a str is not a known keyword.
        PyCelonisTypeError: If `timeout` is neither int nor str.

    Returns:
        Timeout in seconds.
    """
    if isinstance(timeout, int):
        # Previously the int check and the upper bound shared one condition, so
        # every valid int (<= 600) fell through to the type error below.
        if timeout > 600:
            raise PyCelonisValueError("Invalid 'timeout' value [max 600].")
        return timeout
    if isinstance(timeout, str):
        if timeout == "connect":
            return 6
        if timeout == "default":
            return self._timeout
        raise PyCelonisValueError("Invalid 'timeout' str value, must be one of ['connect', 'default'].")
    raise PyCelonisTypeError("Argument 'timeout' must be type int or one of ['connect', 'default'].")
def __get_kwargs(self, url: str, method: str, timeout: int, message) -> typing.Dict:
    """Build the keyword arguments for `Session.request` from `message` and `method`.

    The method defaults to GET and is derived from the kind of `message`;
    an explicit `method` other than `"auto"` overrides the derived one.
    The returned dict may hold an open file object under `files`.
    """
    kwargs = {"url": url, "method": HttpMethod.GET, "timeout": timeout}

    if message:
        if message == "DELETE":
            kwargs["method"] = HttpMethod.DELETE
        elif isinstance(message, str):
            kwargs.update(method=HttpMethod.POST, data=message.encode("utf-8"))
        elif isinstance(message, pathlib.Path):
            # File transfers get a timeout of at least 300 seconds.
            kwargs["timeout"] = max(300, timeout)
            if message.is_file():
                kwargs.update(method=HttpMethod.POST, files={"file": open(message, "rb")})
            else:
                kwargs["stream"] = True
        elif isinstance(message, (dict, list)):
            kwargs.update(method=HttpMethod.POST, json=message)

    if method != "auto":
        kwargs["method"] = method

    if self._logger.isEnabledFor(logging.DEBUG):
        request_data = {key: value for key, value in kwargs.items() if key not in ("method", "url")}
        self._logger.debug(f'{kwargs["method"]} -> {url}, {request_data}')
    return kwargs
def __process_error(self, error: HTTPError, req_kwargs: typing.Dict, response: Response):
    """Raise a `PyCelonisHTTPError` describing the failed request and its response.

    Args:
        error: The HTTP error raised for `response`.
        req_kwargs: Keyword arguments the request was sent with.
        response: The failed response.

    Raises:
        PyCelonisHTTPError: Always.
    """
    request_data = {k: v for k, v in req_kwargs.items() if k not in ["method", "url"]}
    response_data: typing.Any = {}
    if response.text:
        try:
            response_data = json.loads(response.text)
        except ValueError:
            response_data = {"text": response.text}
    # A JSON body may be a list, number, string or null. Only a dict can carry
    # a "reference" entry; the unguarded check raised TypeError for scalars and
    # hid the actual HTTP error.
    if isinstance(response_data, dict) and "reference" in response_data and self._tracker.enabled:
        response_data["reference"] = self._tracker.support_id
    error_message = textwrap.dedent(
        f"""
        Request : {req_kwargs["method"]} -> {req_kwargs["url"]}
        Data : {request_data}
        Response: {str(error.args[0])}
        Data : {response_data}
        """
    )
    raise PyCelonisHTTPError(
        f'Failed to process {req_kwargs["url"]}.\n{error_message}', response=response
    ) from error
@classmethod
def from_url(
    cls,
    url: str,
    api_token: str,
    key_type: typing.Optional[typing.Union[str, 'KeyType']],
    read_only: bool,
    verify_ssl: bool,
    user_agent: str,
    timeout: int = 120,
    total_retry: int = 0,
    backoff_factor: float = 0,
    cookie: typing.Optional[str] = None,
    connect: bool = True,
    permissions: bool = True,
):
    """Convenient Celonis class method to interact directly with the resource a URL points to.

    Args:
        url: URL of the Celonis resource; its scheme and host are used as base URL.
        api_token: Celonis API token.
        key_type: KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or [celonis_api.utils.KeyType][].
        read_only: If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.
        verify_ssl: Requiring requests to verify the TLS certificate at the remote end.<br />
            For more details see
            [`requests.Session`](https://docs.python-requests.org/en/latest/api/#requests.Session).
        user_agent: Session header value for `User-Agent`.
        timeout: How long to wait for the server to send data before giving up
            (in seconds, 300 for data push/pull, max 600).<br />
            For more details see
            [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
        total_retry: Total number of retries (by default disabled, max. 10).<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        backoff_factor: Factor to apply between retry attempts after the second try.<br />
            Only validated when `total_retry > 0`; it must then lie between 0.5 and 5,
            so the default of 0 has to be overridden when retries are enabled.<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        cookie: Session header value for `x-celonis-api-scope`.
        connect: If True connects to Celonis on initialization
            (initial request to check if the `token` & `key_type` combination is correct).
        permissions: If True provides permission information.

    Returns:
        The resource the URL refers to, or the `Celonis` object itself when no
        known resource pattern matches the URL.
    """
    import re
    from urllib import parse

    celonis = cls(
        url=url,
        api_token=api_token,
        key_type=key_type,
        read_only=read_only,
        verify_ssl=verify_ssl,
        user_agent=user_agent,
        timeout=timeout,
        total_retry=total_retry,
        backoff_factor=backoff_factor,
        cookie=cookie,
        connect=connect,
        permissions=permissions,
    )
    celonis._tracker.track("from_url", extra={"tracking_type": "FROM_URL"})
    parsed_url = parse.urlparse(url)

    # Hosts without "cloud" in the name: the resource is encoded in the URL fragment.
    if "cloud" not in parsed_url.netloc:
        documents_match = re.search(r"documents/([^/]+)", parsed_url.fragment)
        if documents_match:
            return celonis.analyses.ids.get(documents_match.group(1))
        data_model_match = re.search(r"data_model/([^/]+)", parsed_url.fragment)
        if data_model_match:
            return celonis.datamodels.ids.get(data_model_match.group(1))

    if "process-mining" in parsed_url.path:
        if "workspaces" in parsed_url.query:
            return celonis.workspaces.find(parsed_url.query.split("=")[1])
        # The pattern used to end in a stray "]" and therefore only matched
        # paths containing a literal "]" after the ID, i.e. practically never.
        analysis_match = re.search(r"analysis/([^/]+)", parsed_url.path)
        if analysis_match:
            return celonis.analyses.ids.get(analysis_match.group(1))

    if "integration" in parsed_url.path:
        pool_regex_match = re.search(r"pools/([^/]+)", parsed_url.path)
        data_job_match = re.search(r"data-jobs/([^/]+)", parsed_url.path)
        extraction_match = re.search(r"extractions/([^/]+)", parsed_url.path)
        transformations_match = re.search(r"transformations/([^/]+)", parsed_url.path)
        # Most specific resource first: extraction, transformation, data job, pool.
        if extraction_match and pool_regex_match and data_job_match:
            return (
                celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                .data_jobs.ids.get(data_job_match.group(1))
                .extractions.ids.get(extraction_match.group(1))  # type: ignore
            )
        elif pool_regex_match and data_job_match and transformations_match:
            return (
                celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                .data_jobs.ids.get(data_job_match.group(1))
                .transformations.ids.get(transformations_match.group(1))
            )
        elif data_job_match and pool_regex_match:
            # Here the data job ID is taken from the first query parameter value.
            return celonis.pools.ids.get(pool_regex_match.group(1)).data_jobs.ids.get(  # type: ignore
                parsed_url.query.split("=")[1].split("&")[0]
            )
        elif pool_regex_match:
            return celonis.pools.ids.get(pool_regex_match.group(1))

    celonis._logger.warning("Could not find the resource you were looking for. Returning the Celonis object.")
    return celonis
analyses: CelonisCollection[Analysis]
property
readonly
¶
Get all Analysis.
API
GET: /process-mining/api/analysis
Returns:
Type | Description |
---|---|
CelonisCollection[Analysis] |
Collection of Analyses. |
datamodels: CelonisCollection[Datamodel]
property
readonly
¶
Get all data models by iterating over all Pool and collecting all Datamodels.
Returns:
Type | Description |
---|---|
CelonisCollection[Datamodel] |
Collection of Datamodels. |
default_space: Space
property
readonly
¶
Get the Studio Default
Space.
Returns:
Type | Description |
---|---|
Space |
Default Space. |
packages: CelonisCollection[Package]
property
readonly
¶
Get all Studio Packages by iterating over all Spaces and collecting all Packages.
Warning
Use space.find("space_name").packages.find("package_name")
for better performance.
API
GET: /package-manager/api/nodes/tree
Returns:
Type | Description |
---|---|
CelonisCollection[Package] |
Collection of Studio Packages. |
permissions: List[Dict]
property
readonly
¶
List of dictionaries describing the permissions per Celonis Resource.
pools: CelonisCollection[Pool]
property
readonly
¶
Get all Pool.
API
GET: /integration/api/pools
GET: /integration-hybrid/api/pools
(hybrid)
Returns:
Type | Description |
---|---|
CelonisCollection[Pool] |
Collection of Pools. |
read_only: bool
property
readonly
¶
If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.
spaces: CelonisCollection[Space]
property
readonly
¶
Get all Studio Spaces.
API
GET: /package-manager/api/spaces
Returns:
Type | Description |
---|---|
CelonisCollection[Space] |
Collection of Spaces. |
storage_buckets: CelonisCollection[Bucket]
property
readonly
¶
Get all Storage Buckets from file storage manager.
API
GET: /storage-manager/api/buckets
Returns:
Type | Description |
---|---|
CelonisCollection[Bucket] |
Collection of Bucket. |
timeout: int
property
readonly
¶
How long to wait for the server to send data before giving up (in seconds).
url: str
property
readonly
¶
Celonis base URL.
workspaces: CelonisCollection[Workspace]
property
readonly
¶
Get all Workspaces.
API
GET: /process-mining/api/processes
Returns:
Type | Description |
---|---|
CelonisCollection[Workspace] |
Collection of Workspaces. |
api_request(self, url, message=None, method='auto', timeout='default', get_json=True, **extra)
¶
Wrapper method for
requests.Session.request
.
The session object is created and configured during Celonis
class initialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
Absolute URL pointing to a Celonis resource. |
required |
message |
Union[str, Dict, pathlib.Path, List] |
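If no message is given, performs a `GET` request. If message is `"DELETE"`, performs a `DELETE` request. If message is a `pathlib.Path` referring to an existing file, performs a `POST` request with the file attached; otherwise performs a `GET` request and streams the payload into the path. If message is a `list`, `dict` or `str`, performs a `POST` request with the message as payload. |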
None |
method |
str |
Can be used to override `auto` detection of request method. One of [`auto`, `get`, `put`, `post`, `delete`]. |
'auto' |
timeout |
Union[str, int] |
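How long to wait for the server to send data before giving up (in seconds, max. 600): `default` = 120, `connect` = 6. In the case of a file transfer, the timeout must be 300 or higher; if not set it defaults to 300. |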
'default' |
get_json |
bool |
If True and the response content type is JSON, parses the response body and returns the result. |
True |
**extra |
Additional arguments which are passed to
`requests.Session.request`. |
{} |
Exceptions:
Type | Description |
---|---|
PyCelonisPermissionError |
When the session is read-only or the API key has not sufficient permissions. |
PyCelonisHTTPError |
When timeout occurs or unexpected HTTP error. |
PyCelonisValueError |
When timeout is invalid. |
Returns:
Type | Description |
---|---|
dict | list | str | pathlib.Path |
Celonis resource response. |
Source code in celonis_api/celonis.py
@tracer.wrap("api_request")
def api_request(
    self,
    url: str,
    message: typing.Union[str, typing.Dict, pathlib.Path, typing.List] = None,
    method: str = "auto",
    timeout: typing.Union[str, int] = "default",
    get_json: bool = True,
    **extra,
):
    """Wrapper method for
    [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).

    The session object is created and configured during `Celonis` class initialization.

    Args:
        url: Absolute URL pointing to a Celonis resource.
        message:
            * If no message is given, performs `GET` request.
            * If message is `"DELETE"` performs `DELETE` request.
            * If message is `pathlib.Path`:
                * If the path refers to an existing file performs `POST` request with file attached.
                * Otherwise performs `GET` request and streams payload into the path.
            * If message is a `list`, `dict` or `str`, performs `POST` request with message as JSON payload.
        method: Can be used to override `auto` detection of request method.
            One of [`auto`, `get`, `put`, `post`, `delete`]
        timeout:
            * How long to wait for the server to send data before giving up (in seconds, max. 600):
                * `default` = 120
                * `connect` = 6
            * In the case of a file transfer, the timeout must be 300 or higher. If not set defaults to 300.
        get_json: If True and the response declares an `application/json` content type,
            the response body is parsed as JSON and the parsed object is returned.
        **extra: Additional arguments which are passed to
            [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).

    Raises:
        PyCelonisPermissionError: When the session is read-only or the API key does not have
            sufficient permissions (HTTP 401 / 403).
        PyCelonisHTTPError: When a timeout or connection error occurs, on an unexpected HTTP error,
            or when an HTML document is returned instead of the expected payload.
        PyCelonisValueError: When timeout is invalid.

    Returns:
        dict | list | str | pathlib.Path: Celonis resource response. For an
            `application/octet-stream` response the raw response object is returned.
    """
    timeout = self.__get_timeout(timeout)
    kwargs = self.__get_kwargs(url, method, timeout, message)
    self._tracker.track(
        f'{kwargs["method"]} request to {url}',
        extra={"tracking_type": "API_REQUEST", "target_url": url, "request_type": kwargs["method"]},
    )

    # Read-only guard: anything but GET is rejected, except for URLs containing "export".
    if self._read_only and kwargs["method"].lower() != "get" and "export" not in kwargs["url"]:
        raise PyCelonisPermissionError(
            "This is a read-only session. Use the argument read_only=False to get write access."
        )

    try:
        response = self._session.request(**kwargs, **extra)
    except ReadTimeout as e:
        # Chain the original error so the underlying cause stays visible in the traceback.
        raise PyCelonisHTTPError(
            f"A (read) time out error was raised. Please try to increase 'timeout' value.\nError: {e}"
        ) from e
    except ConnectionError as e:
        raise PyCelonisHTTPError(
            f"A connection error was raised. Please check your internet connection first ('ping {self.url}')."
            f"\nError: {e}"
        ) from e

    if response.status_code in [401, 403]:
        # Use a dedicated local instead of rebinding the `message` parameter.
        error_message = f"You don't have permission to perform {kwargs['method']} -> {url}."
        if self.permissions:
            pretty_permissions = json.dumps(self._permissions, sort_keys=True, indent=4)
            error_message = f"{error_message}\navailable permissions:\n{pretty_permissions}"
        raise PyCelonisPermissionError(error_message)

    try:
        response.raise_for_status()
    except HTTPError as e:
        self.__process_error(e, kwargs, response)

    if kwargs.get("stream") and isinstance(message, pathlib.Path):
        # Download case: the payload is written to the given path, which is handed back.
        message.write_bytes(response.content)
        return message
    elif get_json and "application/json" in response.headers.get("Content-Type", ""):
        return json.loads(response.text)
    elif response.headers.get("Content-Type", "").startswith("application/octet-stream"):
        return response
    else:
        if isinstance(response.text, str) and response.text.startswith("<!DOCTYPE html>"):
            raise PyCelonisHTTPError(f"Got unexpected HTML document response:\n{response.text}", response=response)
        return response.text
create_analysis(self, name, workspace, **kwargs)
¶
Creates a new Analysis.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the analysis. |
required |
workspace |
Workspace |
Workspace object the new Analysis should be linked to. |
required |
**kwargs |
Additional arguments which are passed to celonis_api.process_analytics.workspace.Workspace.create_analysis |
{} |
Returns:
Type | Description |
---|---|
Analysis |
The newly created Analysis object. |
Source code in celonis_api/celonis.py
def create_analysis(self, name: str, workspace: Workspace, **kwargs) -> Analysis:
    """Creates a new Analysis.

    Args:
        name: Name of the analysis.
        workspace: Workspace object the new Analysis should be linked to.
            Any other value is resolved through `self.workspaces.find`: objects exposing an
            `id` attribute are looked up by that id, anything else (e.g. a name or ID string)
            is passed to `find` as-is.
        **kwargs : Additional arguments which are passed to
            [celonis_api.process_analytics.workspace.Workspace.create_analysis][]

    Returns:
        The newly created Analysis object.
    """
    if not isinstance(workspace, Workspace):
        # Previously `workspace.id` was read unconditionally, so a plain string failed with
        # AttributeError. Fall back to the value itself when there is no `id` attribute,
        # mirroring how create_datamodel / create_workspace hand their argument to `find`.
        workspace = self.workspaces.find(getattr(workspace, "id", workspace))
    return workspace.create_analysis(name, **kwargs)
create_datamodel(self, name, pool)
¶
Creates a new Datamodel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the Datamodel. |
required |
pool |
Union[str, pycelonis.celonis_api.event_collection.data_pool.Pool] |
Name or ID of the Pool the new Datamodel should be linked to. |
required |
Returns:
Type | Description |
---|---|
Datamodel |
The newly created Datamodel object. |
Source code in celonis_api/celonis.py
def create_datamodel(self, name: str, pool: typing.Union[str, Pool]) -> Datamodel:
    """Create a Datamodel inside the given Pool.

    Args:
        name: Name to give the new Datamodel.
        pool: Either a Pool object, or the name or ID of the Pool the Datamodel should be linked to.

    Returns:
        The Datamodel object that was created.
    """
    # Anything that is not already a Pool is looked up in the pools collection first.
    target_pool = pool if isinstance(pool, Pool) else self.pools.find(pool)
    return target_pool.create_datamodel(name)
create_package(self, name, key=None, package_type='APP', space_id=None)
¶
Creates a new Studio Package.
Warning
This method is deprecated. Use celonis_api.studio.space.Space.create_package instead, e.g. `space.create_package("package_name", "package_key", "APP")`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the Package. |
required |
key |
str |
Key of the Package, if left blank defaults to the name of Package. |
None |
package_type |
str |
Package type. Can be one of "APP", "LIBRARY", "INSTRUMENT". |
'APP' |
space_id |
str |
ID of Space where package will be created. If None will use the 'Default' Space. |
None |
Returns:
Type | Description |
---|---|
Package |
The newly created Studio Package object. |
Source code in celonis_api/celonis.py
@deprecated("Use: celonis_api.studio.space.Space.create_package.")
def create_package(self, name: str, key: str = None, package_type: str = "APP", space_id: str = None) -> Package:
    """Deprecated entry point for creating a Studio Package; always raises.

    !!! warning
        This method is deprecated. Use [celonis_api.studio.space.Space.create_package][] instead:
        ```py
        space.create_package("package_name", "package_key", "APP")
        ```

    Args:
        name: Name of the Package.
        key: Key of the Package, if left blank defaults to the name of Package.
        package_type: Package type. Can be one of "APP", "LIBRARY", "INSTRUMENT".
        space_id: ID of Space where package will be created. If None will use the 'Default' Space.

    Raises:
        PyCelonisError: Always, because this method is no longer supported.
    """
    # No package is created here any more; callers must go through a Space.
    raise PyCelonisError("Deprecated method.")
create_pool(self, name, safety_check=True)
¶
Creates a new Pool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the Pool. |
required |
safety_check |
Optional[bool] |
Whether to check if the pool already exists or not. |
True |
Returns:
Type | Description |
---|---|
Pool |
Instance of the newly created Pool object. |
Source code in celonis_api/celonis.py
def create_pool(self, name: str, safety_check: typing.Optional[bool] = True) -> Pool:
    """Creates a new Pool.

    Args:
        name: Name of the Pool.
        safety_check: Whether to check if a pool with the same name already exists before creating it.

    Raises:
        PyCelonisValueError: If `safety_check` is enabled and a pool with this name already exists.
        PyCelonisHTTPError: If the pool could not be created.

    Returns:
        Instance of the newly created Pool object.
    """
    if safety_check and name in self.pools.names:
        raise PyCelonisValueError(f"Pool with name {name} already exists!")
    try:
        response = self.api_request(f"{self.url}/integration/api/pools", {"name": name}, method=HttpMethod.POST)
        return Pool(self, response)
    except PyCelonisHTTPError as e:
        # Chain the original error so its traceback is kept as the explicit cause.
        raise PyCelonisHTTPError(f"Failed to create pool!\n{e.message}") from e
create_space(self, name, icon_reference='earth')
¶
Creates a new Studio Space.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the Space. |
required |
icon_reference |
str |
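Name of the icon to use for the Space. One of: `earth`, `binocular`, `box-archive`, `building`, `building-2`, `catalog`, `earth-2`, `factory`, `finance`, `horn`, `inventory`, `logistics`, `store`, `suitcase`, `wallet`.
|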
'earth' |
Returns:
Type | Description |
---|---|
Space |
The newly created Space object. |
Source code in celonis_api/celonis.py
def create_space(self, name: str, icon_reference: str = "earth") -> Space:
    """Creates a new Studio Space.

    Args:
        name: Name of the Space.
        icon_reference: Name of the icon to use for the Space. One of:
            `earth`, `binocular`, `box-archive`, `building`, `building-2`, `catalog`, `earth-2`,
            `factory`, `finance`, `horn`, `inventory`, `logistics`, `store`, `suitcase`, `wallet`.

    Raises:
        PyCelonisHTTPError: If the space could not be created.

    Returns:
        The newly created Space object.
    """
    try:
        payload = {"name": name, "iconReference": icon_reference}
        response = self.api_request(f"{self.url}/package-manager/api/spaces", payload, method=HttpMethod.POST)
        return Space(self, response["id"])
    except PyCelonisHTTPError as e:
        # Chain the original error so its traceback is kept as the explicit cause.
        raise PyCelonisHTTPError(f"Failed to create space!\n{e.message}") from e
create_storage_bucket(self, name, features=None)
¶
Creates a new Bucket in the file storage manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the Bucket. |
required |
features |
List[str] |
Features used for the Bucket. Currently only supports SFTP. |
None |
Returns:
Type | Description |
---|---|
Bucket |
The newly created Bucket object. |
Source code in celonis_api/celonis.py
def create_storage_bucket(self, name: str, features: typing.List[str] = None) -> Bucket:
    """Creates a new Bucket in the file storage manager.

    Args:
        name: Name of the Bucket.
        features: Features used for the Bucket. Currently only supports SFTP.
            When omitted (or empty), an empty feature list is sent.

    Raises:
        PyCelonisHTTPError: If the bucket could not be created.

    Returns:
        The newly created Bucket object.
    """
    try:
        payload = {
            "name": name,
            "features": features or [],
        }
        response = self.api_request(f"{self.url}/storage-manager/api/buckets", payload, method=HttpMethod.POST)
        return Bucket(self, response["id"])
    except PyCelonisHTTPError as e:
        # Chain the original error so its traceback is kept as the explicit cause.
        raise PyCelonisHTTPError(f"Failed to create storage_bucket!\n{e.message}") from e
create_workspace(self, name, datamodel)
¶
Creates a new Workspace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the Workspace. |
required |
datamodel |
Union[Datamodel, str] |
Name or ID of the Datamodel the Workspace should be linked to. |
required |
Returns:
Type | Description |
---|---|
Workspace |
The newly created Workspace object. |
Source code in celonis_api/celonis.py
def create_workspace(self, name: str, datamodel: typing.Union[Datamodel, str]) -> Workspace:
    """Create a Workspace linked to the given Datamodel.

    Args:
        name: Name to give the new Workspace.
        datamodel: Either a Datamodel object, or the name or ID of the Datamodel
            the Workspace should be linked to.

    Returns:
        The Workspace object that was created.
    """
    # Anything that is not already a Datamodel is looked up in the datamodels collection first.
    target_datamodel = datamodel if isinstance(datamodel, Datamodel) else self.datamodels.find(datamodel)
    return target_datamodel.create_workspace(name)
from_url(url, api_token, key_type, read_only, verify_ssl, user_agent, timeout=120, total_retry=0, backoff_factor=0, cookie=None, connect=True, permissions=True)
classmethod
¶
Convenient Celonis class method to interact directly with the specified resource by URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
Celonis base URL. |
required |
api_token |
str |
Celonis API token. |
required |
key_type |
Union[str, KeyType] |
KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or celonis_api.utils.KeyType. |
required |
read_only |
bool |
If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods. |
required |
verify_ssl |
bool |
Requiring requests to verify the TLS certificate at the remote end. |
required |
user_agent |
str |
Session header value for `User-Agent`. |
required |
timeout |
int |
How long to wait for the server to send data before giving up
(in seconds, 300 for data push/pull, max 600). |
120 |
total_retry |
int |
Total number of retries (by default disabled, max. 10). |
0 |
backoff_factor |
float |
Factor to apply between retry attempts after the second try (by default disabled). |
0 |
cookie |
Optional[str] |
Session header value for `x-celonis-api-scope`. |
None |
connect |
bool |
If True connects to Celonis on initialization
(initial request to check if the `token` & `key_type` combination is correct). |
True |
permissions |
bool |
If True provides permission information. |
True |
Source code in celonis_api/celonis.py
@classmethod
def from_url(
    cls,
    url: str,
    api_token: str,
    key_type: typing.Optional[typing.Union[str, 'KeyType']],
    read_only: bool,
    verify_ssl: bool,
    user_agent: str,
    timeout: int = 120,
    total_retry: int = 0,
    backoff_factor: float = 0,
    cookie: typing.Optional[str] = None,
    connect: bool = True,
    permissions: bool = True,
):
    """Convenient Celonis class method to interact directly with the specified resource by URL.

    A `Celonis` object is created from the given arguments, then the URL is inspected for
    known resource patterns (analysis, datamodel, workspace, pool, data job, extraction,
    transformation) and the matching resource is looked up.

    Args:
        url: Celonis base URL.
        api_token: Celonis API token.
        key_type: KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or [celonis_api.utils.KeyType][].
        read_only: If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.
        verify_ssl: Requiring requests to verify the TLS certificate at the remote end.<br />
            For more details see
            [`requests.Session`](https://docs.python-requests.org/en/latest/api/#requests.Session).
        user_agent: Session header value for `User-Agent`.
        timeout: How long to wait for the server to send data before giving up
            (in seconds, 300 for data push/pull, max 600).<br />
            For more details see
            [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
        total_retry: Total number of retries (by default disabled, max. 10).<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        backoff_factor: Factor to apply between retry attempts after the second try (by default disabled).<br />
            `[0 < backoff_factor < 5]` : (`{backoff_factor} * (2 ** ({total_retry} - 1))`).<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        cookie: Session header value for `x-celonis-api-scope`.
        connect: If True connects to Celonis on initialization
            (initial request to check if the `token` & `key_type` combination is correct).
        permissions: If True provides permission information.

    Returns:
        The resource the URL points to, or the `Celonis` object itself (after logging a
        warning) when no known resource pattern is recognised in the URL.
    """
    import re
    from urllib import parse

    celonis = cls(
        url=url,
        api_token=api_token,
        key_type=key_type,
        read_only=read_only,
        verify_ssl=verify_ssl,
        user_agent=user_agent,
        timeout=timeout,
        total_retry=total_retry,
        backoff_factor=backoff_factor,
        cookie=cookie,
        connect=connect,
        permissions=permissions,
    )
    celonis._tracker.track("from_url", extra={"tracking_type": "FROM_URL"})
    parsed_url = parse.urlparse(url)

    # Hosts without "cloud" in their name: the resource id is looked up in the URL fragment.
    if "cloud" not in parsed_url.netloc:
        documents_match = re.search(r"documents/([^/]+)", parsed_url.fragment)
        if documents_match:
            return celonis.analyses.ids.get(documents_match.group(1))

        data_model_match = re.search(r"data_model/([^/]+)", parsed_url.fragment)
        if data_model_match:
            return celonis.datamodels.ids.get(data_model_match.group(1))

    if "process-mining" in parsed_url.path:
        if "workspaces" in parsed_url.query:
            return celonis.workspaces.find(parsed_url.query.split("=")[1])

        # The pattern previously ended in a stray "]" and therefore only matched when a
        # literal "]" followed the id, i.e. never for ordinary analysis URLs.
        analysis_match = re.search(r"analysis/([^/]+)", parsed_url.path)
        if analysis_match:
            return celonis.analyses.ids.get(analysis_match.group(1))

    if "integration" in parsed_url.path:
        pool_regex_match = re.search(r"pools/([^/]+)", parsed_url.path)
        data_job_match = re.search(r"data-jobs/([^/]+)", parsed_url.path)
        extraction_match = re.search(r"extractions/([^/]+)", parsed_url.path)
        transformations_match = re.search(r"transformations/([^/]+)", parsed_url.path)

        # Most specific resource first: extraction / transformation, then data job, then pool.
        if extraction_match and pool_regex_match and data_job_match:
            return (
                celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                .data_jobs.ids.get(data_job_match.group(1))
                .extractions.ids.get(extraction_match.group(1))  # type: ignore
            )
        elif pool_regex_match and data_job_match and transformations_match:
            return (
                celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                .data_jobs.ids.get(data_job_match.group(1))
                .transformations.ids.get(transformations_match.group(1))
            )
        elif data_job_match and pool_regex_match:
            # The data job id is taken from the first query parameter's value here.
            return celonis.pools.ids.get(pool_regex_match.group(1)).data_jobs.ids.get(  # type: ignore
                parsed_url.query.split("=")[1].split("&")[0]
            )
        elif pool_regex_match:
            return celonis.pools.ids.get(pool_regex_match.group(1))

    celonis._logger.warning("Could not find the resource you were looking for. Returning the Celonis object.")
    return celonis
get_datamodel(self, datamodel_id)
¶
Get a datamodel by ID. Much faster than datamodels.find().
Parameters:
Name | Type | Description | Default |
---|---|---|---|
datamodel_id |
str |
Id of the Datamodel. |
required |
Returns:
Type | Description |
---|---|
Datamodel |
Datamodel by ID. |
Source code in celonis_api/celonis.py
def get_datamodel(self, datamodel_id: str) -> Datamodel:
    """Look up a single Datamodel by its ID. Much faster than datamodels.find().

    The compute-pool node listing is requested from the process-mining endpoint; if that
    request raises a `PyCelonisError`, the package-manager status endpoint is used instead.

    Args:
        datamodel_id: Id of the Datamodel.

    Raises:
        PyCelonisNotFoundError: If no listed node refers to the given Datamodel ID.

    Returns:
        Datamodel by ID.
    """
    try:
        nodes: typing.Dict = self.api_request(f"{self.url}/process-mining/api/compute-pools/nodes")
    except PyCelonisError:
        nodes = self.api_request(f"{self.url}/package-manager/api/compute-pools/nodes/status")

    found = None
    # The scan deliberately runs to the end: if several nodes match, the last one is kept.
    for entry in nodes:
        if entry["dataModelId"] != datamodel_id:
            continue
        owning_pool = data_pool.Pool(self, {"id": entry["poolId"]})
        found = data_model.Datamodel(owning_pool, {"id": entry["dataModelId"]})

    if found is not None:
        return found

    raise PyCelonisNotFoundError(
        f"Either there exists no datamodel with id: {datamodel_id} "
        "or you do not have permissions to access it."
    )
get_knowledge_model_by_full_key(self, knowledge_model_key)
¶
Get a Knowledge Model by key (PACKAGE_KEY.KNOWLEDGE_MODEL_KEY)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
knowledge_model_key |
str |
Full Knowledge Model key consisting of "PACKAGE_KEY.KNOWLEDGE_MODEL_KEY" |
required |
Returns:
Type | Description |
---|---|
KnowledgeModel |
The KnowledgeModel. |
Source code in celonis_api/celonis.py
def get_knowledge_model_by_full_key(self, knowledge_model_key: str) -> KnowledgeModel:
    """Resolve a Knowledge Model from its full key (PACKAGE_KEY.KNOWLEDGE_MODEL_KEY).

    Args:
        knowledge_model_key: Full Knowledge Model key consisting of "PACKAGE_KEY.KNOWLEDGE_MODEL_KEY"

    Raises:
        PyCelonisValueError: If the key does not contain exactly one ".".

    Returns:
        The KnowledgeModel.
    """
    # Exactly one "." separator yields exactly two parts: package key and knowledge model key.
    key_parts = knowledge_model_key.split(".")
    if len(key_parts) != 2:
        raise PyCelonisValueError(
            f"'{knowledge_model_key}' is not a valid knowledge_model_key. "
            "Must be of format 'PACKAGE_KEY.KNOWLEDGE_MODEL_KEY'."
        )

    package_key, km_key = key_parts
    owning_package = self.packages.find(package_key)
    node = owning_package.get_node_by_key(km_key)
    return typing.cast(KnowledgeModel, node)