Skip to content

celonis.py

Celonis

Base object to interact with Celonis Execution Management System (EMS) API.

This class is used to configure the connection to Celonis. The basic idea of this class is the integration with Celonis resource classes, e.g. 'Datamodel'. Each resource class requires an instance of Celonis to be passed to the initializer.

Parameters:

Name Type Description Default
url str

Celonis base URL.

required
api_token str

Celonis API token.

required
key_type Union[str, KeyType]

KeyType of API token. One of [APP_KEY, USER_KEY] or celonis_api.utils.KeyType.

required
read_only bool

If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.

required
verify_ssl bool

Requiring requests to verify the TLS certificate at the remote end.
For more details see requests.Session.

required
user_agent str

Session header value for User-Agent.

required
timeout int

How long to wait for the server to send data before giving up (in seconds, 300 for data push/pull, max 600).
For more details see requests.Session.request.

120
total_retry int

Total number of retries (by default disabled, max. 10).
For more details see urllib3.Retry.

0
backoff_factor float

Factor to apply between retry attempts after the second try (by default disabled).
[0.5 < backoff_factor < 5] : ({backoff_factor} * (2 ** ({total_retry} - 1))).
For more details see urllib3.Retry.

1
cookie Optional[str]

Session header value for x-celonis-api-scope.

None
connect bool

If True connects to Celonis on initialization (initial request to check if the token & key_type combination is correct).

True
permissions bool

If True provides permission information.

True
Source code in celonis_api/celonis.py
class Celonis:
    """Base object to interact with Celonis Execution Management System (EMS) API.

    This class is used to configure the connection to Celonis.
    The basic idea of this class is the integration with Celonis
    resource classes, e.g. 'Datamodel'. Each resource class requires
    an instance of `Celonis` to be passed to the initializer.

    Args:
        url: Celonis base URL.
        api_token: Celonis API token.
        key_type: KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or [celonis_api.utils.KeyType][].
        read_only: If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.
        verify_ssl: Requiring requests to verify the TLS certificate at the remote end.<br />
            For more details see [`requests.Session`](https://docs.python-requests.org/en/latest/api/#requests.Session).
        user_agent: Session header value for `User-Agent`.
        timeout: How long to wait for the server to send data before giving up
            (in seconds, 300 for data push/pull, max 600).<br />
            For more details see
            [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
        total_retry: Total number of retries (by default disabled, max. 10).<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        backoff_factor: Factor to apply between retry attempts after the second try (by default disabled).<br />
            `[0.5 < backoff_factor < 5]` : (`{backoff_factor} * (2 ** ({total_retry} - 1))`).<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        cookie: Session header value for `x-celonis-api-scope`.
        connect: If True connects to Celonis on initialization
            (initial request to check if the `token` & `key_type` combination is correct).
        permissions: If True provides permission information.
    """

    def __init__(
        self,
        url: str,
        api_token: str,
        key_type: typing.Optional[typing.Union[str, 'KeyType']],
        read_only: bool,
        verify_ssl: bool,
        user_agent: str,
        timeout: int = 120,
        total_retry: int = 0,
        backoff_factor: float = 1,
        cookie: typing.Optional[str] = None,
        connect: bool = True,
        permissions: bool = True,
    ):
        self._tracker = PyCelonisTracker()
        self._read_only = read_only
        self._timeout = timeout
        self._url = self.__determine_url(url)
        self._session = self.__setup_session(
            api_token, key_type, user_agent, total_retry, backoff_factor, cookie, verify_ssl
        )
        self._permissions: typing.List[typing.Dict] = []
        if connect:
            self.__connect()
        if permissions:
            self.__permissions()

    def __determine_url(self, url) -> str:
        regex = r"^(https?://)?([^/]+)"
        result = re.search(regex, url)

        if not result:
            raise ValueError(f"Invalid URL format: {url}")

        http = result[1] or "https://"
        url = http + result[2]
        self._logger.debug(f"Connecting to {url}")
        return url

    def __setup_session(
        self,
        api_token: str,
        key_type: typing.Optional[typing.Union[str, KeyType]],
        user_agent: str,
        total_retry: int,
        backoff_factor: float,
        cookie: str = None,
        verify_ssl: bool = True,
    ) -> Session:

        session = Session()
        session.proxies = getproxies()
        session.verify = verify_ssl

        if total_retry > 0:
            self._setup_retry(session, total_retry, backoff_factor)

        if user_agent:
            session.headers.update({"User-Agent": user_agent})

        if not key_type:
            self._logger.warning(
                f"Argument 'key_type' not set, defaults to 'APP_KEY'. Must be one of [{KeyType.values()}]."
            )
            key_type = KeyType.APP_KEY

        if key_type == KeyType.USER_KEY:
            if cookie:
                session.headers.update({"x-celonis-api-scope": api_token, "cookie": cookie})
            else:
                session.headers.update({"authorization": f"Bearer {api_token}"})
        elif key_type == KeyType.APP_KEY:
            session.headers.update({"authorization": f"AppKey {api_token}"})
        else:
            raise PyCelonisTypeError(f"Must be one of [{KeyType.values()}], but got {key_type}.")

        return session

    def _setup_retry(self, session: Session, total_retry: int, backoff_factor: float):
        if total_retry > 10:
            raise PyCelonisValueError("Invalid 'total_retry' value [max 10].")

        if not (0.5 <= backoff_factor <= 5):
            raise PyCelonisValueError("Invalid 'backoff_factor' value [0.5 < backoff_factor < 5].")

        retry = Retry(
            total=total_retry,
            backoff_factor=backoff_factor,
            status_forcelist=[429, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "POST", "DELETE", "OPTIONS"],
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("https://", adapter)

    def __connect(self):
        try:
            response = self.api_request(f"{self._url}/api/cloud", timeout="connect")
            name = (
                "Hello " + response.get("nameOrEmail", "")
                if isinstance(response, dict)
                else "You are using an Application Key"
            )

            self._logger.info(f"Initial connect successful! {name}. PyCelonis Version: {__version__}")
            self._tracker.track("Started new session", extra={"tracking_type": "PYCELONIS_SESSION"})
        except PyCelonisError:
            self._logger.exception(
                f"Couldn't connect to Celonis EMS {self._url}.\n"
                "Please check if you set the correct 'key_type' and the token is valid. "
                "To learn more about setting up your Application Key correctly, "
                f"check out {self.url}/help/display/CIBC/Application+Keys."
            )

    def __permissions(self):
        try:
            self._permissions = self.api_request(f"{self.url}/api/cloud/permissions")
            pretty_permissions = json.dumps(self._permissions, sort_keys=True, indent=4)
            self._logger.info(f"Your key has following permissions:\n{pretty_permissions}")
        except PyCelonisError:
            self._logger.exception(
                "Failed to lookup permissions. This usually happens when you set the wrong 'key_type'."
            )

    @property
    def url(self) -> str:
        """Celonis base URL."""
        return self._url

    @property
    def read_only(self) -> bool:
        """If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods."""
        return self._read_only

    @property
    def timeout(self) -> int:
        """How long to wait for the server to send data before giving up (in seconds)."""
        return self._timeout

    @property
    def permissions(self) -> typing.List[typing.Dict]:
        """Dictionary listing permissions per Celonis Resource."""
        return self._permissions

    @property
    def pools(self) -> 'CelonisCollection[Pool]':
        """Get all Pool.

        !!! api "API"
            - `GET: /integration/api/pools`
            - `GET: /integration-hybrid/api/pools` (hybrid)

        Returns:
            Collection of Pools.
        """
        try:
            response = self.api_request(f"{self.url}/integration/api/pools")
            pools = CelonisCollection([Pool(self, data) for data in response])
        except PyCelonisError:
            self._logger.exception("Failed to get pools!")
            pools = CelonisCollection([])

        try:
            response = self.api_request(f"{self.url}/api/apps")
            if any([x["key"] == "hybrid" for x in response]):
                response = self.api_request(f"{self.url}/integration-hybrid/api/pools")
                hybrid_pools = CelonisCollection([HybridPool(self, data) for data in response])
                pools += hybrid_pools
        except PyCelonisError:
            self._logger.exception("Failed to get hybrid pools!")

        return pools

    def create_pool(self, name: str, safety_check: typing.Optional[bool] = True) -> Pool:
        """Creates a new Pool.

        Args:
            name: Name of the Pool.
            safety_check: Weather to check if pool already exits or not.

        Returns:
            Instance of the newly created Pool object.
        """
        if safety_check and name in self.pools.names:
            raise PyCelonisValueError(f"Pool with name {name} already exists!")
        try:
            response = self.api_request(f"{self.url}/integration/api/pools", {"name": name}, method=HttpMethod.POST)
            return Pool(self, response)
        except PyCelonisHTTPError as e:
            raise PyCelonisHTTPError(f"Failed to create pool!\n{e.message}")

    @property
    def datamodels(self) -> 'CelonisCollection[Datamodel]':
        """Get all data models by iterating over all Pool and collecting all Datamodels.

        Returns:
            Collection of Datamodels.
        """
        return CelonisCollection(itertools.chain.from_iterable(utils.threaded(self.pools, getattr, "datamodels")))

    def get_datamodel(self, datamodel_id: str) -> Datamodel:
        """Get a datamodel by ID. Much faster than datamodels.find().

        Args:
            datamodel_id: Id of the Datamodel.

        Returns:
            Datamodel by ID.
        """
        try:
            data_models: typing.Dict = self.api_request(f"{self.url}/process-mining/api/compute-pools/nodes")
        except PyCelonisError:
            data_models = self.api_request(f"{self.url}/package-manager/api/compute-pools/nodes/status")

        dm = None
        for node in data_models:
            if node["dataModelId"] == datamodel_id:
                pool = data_pool.Pool(self, {"id": node["poolId"]})
                dm = data_model.Datamodel(pool, {"id": node["dataModelId"]})
        if dm is None:
            raise PyCelonisNotFoundError(
                f"Either there exists no datamodel with id: {datamodel_id} "
                "or you do not have permissions to access it."
            )
        return dm

    def create_datamodel(self, name: str, pool: typing.Union[str, Pool]) -> Datamodel:
        """Creates a new Datamodel.

        Args:
            name: Name of the Datamodel.
            pool: Name or ID of the Pool the new Datamodel should be linked to.

        Returns:
            The newly created Datamodel object.
        """
        if not isinstance(pool, Pool):
            pool = self.pools.find(pool)
        return pool.create_datamodel(name)

    @property
    def analyses(self) -> 'CelonisCollection[Analysis]':
        """Get all Analysis.

        !!! api "API"
            - `GET: /process-mining/api/analysis`

        Returns:
            Collection of Analyses.
        """
        try:
            response = self.api_request(f"{self.url}/process-mining/api/analysis")
            analyses = CelonisCollection([Analysis(self, data) for data in response])
        except PyCelonisError:
            self._logger.exception("Failed to get analyses!")
            analyses = CelonisCollection([])

        return analyses

    def create_analysis(self, name: str, workspace: Workspace, **kwargs) -> Analysis:
        """Creates a new Analysis.

        Args:
            name: Name of the analysis.
            workspace: Workspace object the new Analysis should be linked to.
            **kwargs : Additional arguments which are passed to
                [celonis_api.process_analytics.workspace.Workspace.create_analysis][]

        Returns:
            The newly created Analysis object.
        """
        if not isinstance(workspace, Workspace):
            workspace = self.workspaces.find(workspace.id)
        return workspace.create_analysis(name, **kwargs)

    @property
    def workspaces(self) -> 'CelonisCollection[Workspace]':
        """Get all Workspaces.

        !!! api "API"
            - `GET: /process-mining/api/processes`

        Returns:
            Collection of Workspaces.
        """
        try:
            response = self.api_request(f"{self.url}/process-mining/api/processes")
            workspaces = CelonisCollection([Workspace(self, data) for data in response])
        except PyCelonisError:
            self._logger.exception("Failed to get workspaces!")
            workspaces = CelonisCollection([])

        return workspaces

    def create_workspace(self, name: str, datamodel: typing.Union[Datamodel, str]) -> Workspace:
        """Creates a new Workspace.

        Args:
            name : Name of the Workspace.
            datamodel : Name or ID of the Datamodel the Workspace should be linked to.

        Returns:
            The newly created Workspace object.
        """
        if not isinstance(datamodel, Datamodel):
            datamodel = self.datamodels.find(datamodel)
        return datamodel.create_workspace(name)

    @property
    def default_space(self) -> Space:
        """Get the Studio `Default` Space.

        Returns:
            Default Space.
        """
        return self.spaces.find("Default")

    @property
    def spaces(self) -> 'CelonisCollection[Space]':
        """Get all Studio Spaces.

        !!! api "API"
            - `GET: /package-manager/api/spaces`

        Returns:
            Collection of Spaces.
        """
        try:
            response = self.api_request(f"{self.url}/package-manager/api/spaces")
            spaces = CelonisCollection([Space(self, data) for data in response])
        except PyCelonisError:
            self._logger.exception("Failed to get spaces!")
            spaces = CelonisCollection([])

        return spaces

    def create_space(self, name: str, icon_reference: str = "earth") -> Space:
        """Creates a new Studio Space.

        Args:
            name: Name of the Space.
            icon_reference: Name of the icon to use for the Space. One of:
                    `earth`, `binocular`, `box-archive`, `building`, `building-2`, `catalog`, `earth-2`,
                    `factory`, `finance`, `horn`, `inventory`, `logistics`, `store`, `suitcase`, `wallet`.
        Returns:
            The newly created Space object.
        """
        try:
            payload = {"name": name, "iconReference": icon_reference}
            response = self.api_request(f"{self.url}/package-manager/api/spaces", payload, method=HttpMethod.POST)
            return Space(self, response["id"])
        except PyCelonisHTTPError as e:
            raise PyCelonisHTTPError(f"Failed to create space!\n{e.message}")

    @property  # type: ignore
    @deprecated("Use: space.packages.")
    def packages(self) -> 'CelonisCollection[Package]':
        """Get all Studio Packages by iterating over all Spaces and collecting all Packages.

        !!! warning
            Use `space.find("space_name").packages.find("package_name")` for better performance.

        !!! api "API"
            - `GET: /package-manager/api/nodes/tree`

        Returns:
            Collection of Studio Packages.
        """
        response = self.api_request(f"{self.url}/package-manager/api/nodes/tree")

        packages = []
        for package_data in response:
            if package_data["nodeType"] != 'PACKAGE':
                continue

            try:
                packages.append(Package(self.spaces.find(package_data["spaceId"]), self, package_data))
            except PyCelonisError:
                self._logger.exception("Failed to get package.")

        return CelonisCollection(packages)

    @deprecated("Use: celonis_api.studio.space.Space.create_package.")
    def create_package(self, name: str, key: str = None, package_type: str = "APP", space_id: str = None) -> Package:
        """Creates a new Studio Package.

        !!! warning
            This method is deprecated. Use [celonis_api.studio.space.Space.create_package][]:

            ```py
            space.create_package("package_name", "package_key", "APP")
            ```

        Args:
            name: Name of the Package.
            key: Key of the Package, if left blank defaults to the name of Package.
            package_type: Package type. Can be one of "APP", "LIBRARY", "INSTRUMENT".
            space_id: ID of Space where package will be created. If None will use the 'Default' Space.

        Returns:
            The newly created Studio Package object.
        """
        raise PyCelonisError("Deprecated method.")

    def get_knowledge_model_by_full_key(self, knowledge_model_key: str) -> KnowledgeModel:
        """Get a Knowledge Model by key (PACKAGE_KEY.KNOWLEDGE_MODEL_KEY)

        Args:
            knowledge_model_key: Full Knowledge Model key consisting of "PACKAGE_KEY.KNOWLEDGE_MODEL_KEY"

        Returns:
            The KnowledgeModel.
        """
        if knowledge_model_key.count(".") != 1:
            raise PyCelonisValueError(
                f"'{knowledge_model_key}' is not a valid knowledge_model_key. "
                "Must be of format 'PACKAGE_KEY.KNOWLEDGE_MODEL_KEY'."
            )

        package_key, km_key = knowledge_model_key.split(".")
        package = self.packages.find(package_key)
        return typing.cast(KnowledgeModel, package.get_node_by_key(km_key))

    @property
    def storage_buckets(self) -> 'CelonisCollection[Bucket]':
        """Get all Storage Buckets from file storage manager.

        !!! api "API"
            - `GET: /storage-manager/api/buckets`

        Returns:
            Collection of Bucket.
        """
        try:
            response = self.api_request(f"{self.url}/storage-manager/api/buckets")
            storage_buckets = CelonisCollection([Bucket(self, data) for data in response])
        except PyCelonisError:
            self._logger.exception("Failed to get storage_buckets!")
            storage_buckets = CelonisCollection([])

        return storage_buckets

    def create_storage_bucket(self, name: str, features: typing.List[str] = None) -> Bucket:
        """Creates a new Bucket in the file storage manager.

        Args:
            name: Name of the Bucket.
            features: Features used for the Bucket. Currently only supports SFTP.

        Returns:
            The newly created Bucket object.
        """
        try:
            payload = {
                "name": name,
                "features": features or [],
            }
            response = self.api_request(f"{self.url}/storage-manager/api/buckets", payload, method=HttpMethod.POST)
            return Bucket(self, response["id"])
        except PyCelonisHTTPError as e:
            raise PyCelonisHTTPError(f"Failed to create storage_bucket!\n{e.message}")

    @property
    def _tracking_server(self):
        """Get all Tracking Servers.

        Returns:
            Collection of TrackingServers.
        """
        from pycelonis.celonis_api.experiment_tracking.tracking_server import TrackingServer

        try:
            response = self.api_request(f"{self.url}/machine-learning/api/mlflow")
            tracking_servers = CelonisCollection([TrackingServer(self, data) for data in response])
        except PyCelonisError:
            self._logger.exception("Failed to get tracking_servers!")
            tracking_servers = CelonisCollection([])

        return tracking_servers

    def _create_tracking_server(self, name: str):
        """Creates a new ML Tracking Server.

        Args:
            name: Name of the Tracking Server.

        Returns:
            The newly created Tracking Server object.
        """
        from pycelonis.celonis_api.experiment_tracking.tracking_server import TrackingServer

        try:
            payload = {"name": name}
            response = self.api_request(f"{self.url}/machine-learning/api/mlflow", payload, method=HttpMethod.POST)
            return TrackingServer(self, response["id"])
        except PyCelonisHTTPError as e:
            raise PyCelonisHTTPError(f"Failed to create tracking_servers!\n{e.message}")

    @tracer.wrap("api_request")
    def api_request(
        self,
        url: str,
        message: typing.Union[str, typing.Dict, pathlib.Path, typing.List] = None,
        method: str = "auto",
        timeout: typing.Union[str, int] = "default",
        get_json: bool = True,
        **extra,
    ):
        """Wrapper method for
        [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
        The session object is created and configured during `Celonis` class initialization.

        Args:
            url: Absolut URL pointing to a Celonis resource.
            message:
                * If no message is given, performs `GET` request.
                * If message is `"DELETE"` performs `DELETE` request.
                * If message is `pathlib.Path`:
                    * If the path refers to an existing file performs `POST` request with file attached.
                    * Otherwise performs `GET` request and streams payload into the path.
                * If message is a `list`, `dict` or `str`, performs `POST` request with message as JSON payload.
            method: Can be used to override `auto` detection of request method.
                    One of [`auto`, `get`, `put`, `post`, `delete`]
            timeout:
                * How long to wait for the server to send data before giving up (in seconds, max. 600):
                    * `default` = 120
                    * `connect` = 6
                    * In the case of a file transfer, the timeout must be 300 or higher. If not set defaults to 300.
            get_json: If True parses requires the response to be in JSON format and converts to dictionary.
            **extra: Additional arguments which are passed to
                [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).

        Raises:
            PyCelonisPermissionError: When the session is read-only or the API key has not sufficient permissions.
            PyCelonisHTTPError: When timeout occurs or unexpected HTTP error.
            PyCelonisValueError: When timeout is invalid.

        Returns:
            dict | list | str | pathlib.Path: Celonis resource response.
        """

        timeout = self.__get_timeout(timeout)
        kwargs = self.__get_kwargs(url, method, timeout, message)

        self._tracker.track(
            f'{kwargs["method"]} request to {url}',
            extra={"tracking_type": "API_REQUEST", "target_url": url, "request_type": kwargs["method"]},
        )

        if self._read_only and kwargs["method"].lower() != "get" and "export" not in kwargs["url"]:
            raise PyCelonisPermissionError(
                "This is a read-only session. Use the argument read_only=False to get write access."
            )

        try:
            response = self._session.request(**kwargs, **extra)
        except ReadTimeout as e:
            raise PyCelonisHTTPError(
                f"A (read) time out error was raised. Please try to increase 'timeout' value.\nError: {e}"
            )
        except ConnectionError as e:
            raise PyCelonisHTTPError(
                f"A connection error was raised. Please check your internet connection first ('ping {self.url}')."
                f"\nError: {e}"
            )

        if response.status_code in [401, 403]:
            message = f"You don't have permission to perform {kwargs['method']} -> {url}."
            if self.permissions:
                pretty_permissions = json.dumps(self._permissions, sort_keys=True, indent=4)
                message = f"{message}\navailable permissions:\n{pretty_permissions}"
            raise PyCelonisPermissionError(message)

        try:
            response.raise_for_status()
        except HTTPError as e:
            self.__process_error(e, kwargs, response)

        if kwargs.get("stream") and isinstance(message, pathlib.Path):
            message.write_bytes(response.content)
            return message
        elif get_json and "application/json" in response.headers.get("Content-Type", ""):
            return json.loads(response.text)
        elif response.headers.get("Content-Type", "").startswith("application/octet-stream"):
            return response
        else:
            if isinstance(response.text, str) and response.text.startswith("<!DOCTYPE html>"):
                raise PyCelonisHTTPError(f"Got unexpected HTML document response:\n{response.text}", response=response)

            return response.text

    def __get_timeout(self, timeout: typing.Union[int, str]) -> int:
        if isinstance(timeout, int) and timeout > 600:
            raise PyCelonisValueError("Invalid 'timeout' value [max 600].")
        elif isinstance(timeout, str):
            if timeout == "connect":
                timeout = 6
            elif timeout == "default":
                timeout = self._timeout
            else:
                raise PyCelonisValueError("Invalid 'timeout' str value, must be one of ['connect', 'default'].")
        else:
            raise PyCelonisTypeError("Argument 'timeout' must be type int or one of ['connect', 'default'].")
        return timeout

    def __get_kwargs(self, url: str, method: str, timeout: int, message) -> typing.Dict:
        kwargs = dict(url=url, method=HttpMethod.GET, timeout=timeout)

        if message:
            if message == "DELETE":
                kwargs["method"] = HttpMethod.DELETE
            elif isinstance(message, str):
                kwargs["method"] = HttpMethod.POST
                kwargs["data"] = message.encode("utf-8")
            elif isinstance(message, pathlib.Path):
                kwargs["timeout"] = timeout if timeout > 300 else 300
                if message.is_file():
                    kwargs["method"] = HttpMethod.POST
                    kwargs["files"] = {"file": open(message, "rb")}
                else:
                    kwargs["stream"] = True
            elif isinstance(message, (dict, list)):
                kwargs["method"] = HttpMethod.POST
                kwargs["json"] = message

        if method != "auto":
            kwargs["method"] = method

        if self._logger.isEnabledFor(logging.DEBUG):
            request_data = {k: v for k, v in kwargs.items() if k not in ["method", "url"]}
            self._logger.debug(f'{kwargs["method"]} -> {url}, {request_data}')

        return kwargs

    def __process_error(self, error: HTTPError, req_kwargs: typing.Dict, response: Response):
        request_data = {k: v for k, v in req_kwargs.items() if k not in ["method", "url"]}
        response_data = {}
        if response.text:
            try:
                response_data = json.loads(response.text)
            except ValueError:
                response_data = {"text": response.text}
            if "reference" in response_data and self._tracker.enabled:
                response_data["reference"] = self._tracker.support_id

        error_message = textwrap.dedent(
            f"""
            Request : {req_kwargs["method"]} -> {req_kwargs["url"]}
            Data    : {request_data}

            Response: {str(error.args[0])}
            Data    : {response_data}
        """
        )

        raise PyCelonisHTTPError(f'Failed to process {req_kwargs["url"]}.\n{error_message}', response=response)

    @classmethod
    def from_url(
        cls,
        url: str,
        api_token: str,
        key_type: typing.Optional[typing.Union[str, 'KeyType']],
        read_only: bool,
        verify_ssl: bool,
        user_agent: str,
        timeout: int = 120,
        total_retry: int = 0,
        backoff_factor: float = 0,
        cookie: typing.Optional[str] = None,
        connect: bool = True,
        permissions: bool = True,
    ):
        """Convenient Celonis class method to interact directly with a the specified resource by URL.

        Args:
            url: Celonis base URL.
            api_token: Celonis API token.
            key_type: KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or [celonis_api.utils.KeyType][].
            read_only: If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.
            verify_ssl: Requiring requests to verify the TLS certificate at the remote end.<br />
                For more details see
                [`requests.Session`](https://docs.python-requests.org/en/latest/api/#requests.Session).
            user_agent: Session header value for `User-Agent`.
            timeout: How long to wait for the server to send data before giving up
                (in seconds, 300 for data push/pull, max 600).<br />
                For more details see
                [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
            total_retry: Total number of retries (by default disabled, max. 10).<br />
                For more details see
                [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
            backoff_factor: Factor to apply between retry attempts after the second try (by default disabled).<br />
                `[0 < backoff_factor < 5]` : (`{backoff_factor} * (2 ** ({total_retry} - 1))`).<br />
                For more details see
                [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
            cookie: Session header value for `x-celonis-api-scope`.
            connect: If True connects to Celonis on initialization
                (initial request to check if the `token` & `key_type` combination is correct).
            permissions: If True provides permission information.
        """
        import re
        from urllib import parse

        celonis = cls(
            url=url,
            api_token=api_token,
            key_type=key_type,
            read_only=read_only,
            verify_ssl=verify_ssl,
            user_agent=user_agent,
            timeout=timeout,
            total_retry=total_retry,
            backoff_factor=backoff_factor,
            cookie=cookie,
            connect=connect,
            permissions=permissions,
        )

        celonis._tracker.track("from_url", extra={"tracking_type": "FROM_URL"})

        parsed_url = parse.urlparse(url)
        if "cloud" not in parsed_url.netloc:
            documents_match = re.search("documents/([^/]+)", parsed_url.fragment)
            if documents_match:
                return celonis.analyses.ids.get(documents_match.group(1))
            data_model_match = re.search("data_model/([^/]+)", parsed_url.fragment)
            if data_model_match:
                return celonis.datamodels.ids.get(data_model_match.group(1))

        if "process-mining" in parsed_url.path:
            if "workspaces" in parsed_url.query:
                return celonis.workspaces.find(parsed_url.query.split("=")[1])
            analysis_match = re.search("analysis/([^/]+)]", parsed_url.path)
            if analysis_match:
                return celonis.analyses.ids.get(analysis_match.group(1))

        if "integration" in parsed_url.path:
            pool_regex_match = re.search("pools/([^/]+)", parsed_url.path)
            data_job_match = re.search("data-jobs/([^/]+)", parsed_url.path)
            extraction_match = re.search("extractions/([^/]+)", parsed_url.path)
            transformations_match = re.search("transformations/([^/]+)", parsed_url.path)
            if extraction_match and pool_regex_match and data_job_match:
                return (
                    celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                    .data_jobs.ids.get(data_job_match.group(1))
                    .extractions.ids.get(extraction_match.group(1))  # type: ignore
                )
            elif pool_regex_match and data_job_match and transformations_match:
                return (
                    celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                    .data_jobs.ids.get(data_job_match.group(1))
                    .transformations.ids.get(transformations_match.group(1))
                )
            elif data_job_match and pool_regex_match:
                return celonis.pools.ids.get(pool_regex_match.group(1)).data_jobs.ids.get(  # type: ignore
                    parsed_url.query.split("=")[1].split("&")[0]
                )
            elif pool_regex_match:
                return celonis.pools.ids.get(pool_regex_match.group(1))

        celonis._logger.warning("Could not find the resource you were looking for. Returning the Celonis object.")
        return celonis

analyses: CelonisCollection[Analysis] property readonly

Get all Analysis.

API

  • GET: /process-mining/api/analysis

Returns:

Type Description
CelonisCollection[Analysis]

Collection of Analyses.

datamodels: CelonisCollection[Datamodel] property readonly

Get all data models by iterating over all Pool and collecting all Datamodels.

Returns:

Type Description
CelonisCollection[Datamodel]

Collection of Datamodels.

default_space: Space property readonly

Get the Studio Default Space.

Returns:

Type Description
Space

Default Space.

packages: CelonisCollection[Package] property readonly

Get all Studio Packages by iterating over all Spaces and collecting all Packages.

Warning

Use space.find("space_name").packages.find("package_name") for better performance.

API

  • GET: /package-manager/api/nodes/tree

Returns:

Type Description
CelonisCollection[Package]

Collection of Studio Packages.

permissions: List[Dict] property readonly

Dictionary listing permissions per Celonis Resource.

pools: CelonisCollection[Pool] property readonly

Get all Pool.

API

  • GET: /integration/api/pools
  • GET: /integration-hybrid/api/pools (hybrid)

Returns:

Type Description
CelonisCollection[Pool]

Collection of Pools.

read_only: bool property readonly

If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.

spaces: CelonisCollection[Space] property readonly

Get all Studio Spaces.

API

  • GET: /package-manager/api/spaces

Returns:

Type Description
CelonisCollection[Space]

Collection of Spaces.

storage_buckets: CelonisCollection[Bucket] property readonly

Get all Storage Buckets from file storage manager.

API

  • GET: /storage-manager/api/buckets

Returns:

Type Description
CelonisCollection[Bucket]

Collection of Bucket.

timeout: int property readonly

How long to wait for the server to send data before giving up (in seconds).

url: str property readonly

Celonis base URL.

workspaces: CelonisCollection[Workspace] property readonly

Get all Workspaces.

API

  • GET: /process-mining/api/processes

Returns:

Type Description
CelonisCollection[Workspace]

Collection of Workspaces.

api_request(self, url, message=None, method='auto', timeout='default', get_json=True, **extra)

Wrapper method for requests.Session.request. The session object is created and configured during Celonis class initialization.

Parameters:

Name Type Description Default
url str

Absolut URL pointing to a Celonis resource.

required
message Union[str, Dict, pathlib.Path, List]
  • If no message is given, performs GET request.
  • If message is "DELETE" performs DELETE request.
  • If message is pathlib.Path:
    • If the path refers to an existing file performs POST request with file attached.
    • Otherwise performs GET request and streams payload into the path.
  • If message is a list, dict or str, performs POST request with message as JSON payload.
None
method str

Can be used to override auto detection of request method. One of [auto, get, put, post, delete]

'auto'
timeout Union[str, int]
  • How long to wait for the server to send data before giving up (in seconds, max. 600):
    • default = 120
    • connect = 6
    • In the case of a file transfer, the timeout must be 300 or higher. If not set defaults to 300.
'default'
get_json bool

If True parses requires the response to be in JSON format and converts to dictionary.

True
**extra

Additional arguments which are passed to requests.Session.request.

{}

Exceptions:

Type Description
PyCelonisPermissionError

When the session is read-only or the API key has not sufficient permissions.

PyCelonisHTTPError

When timeout occurs or unexpected HTTP error.

PyCelonisValueError

When timeout is invalid.

Returns:

Type Description
dict | list | str | pathlib.Path

Celonis resource response.

Source code in celonis_api/celonis.py
@tracer.wrap("api_request")
def api_request(
    self,
    url: str,
    message: typing.Union[str, typing.Dict, pathlib.Path, typing.List] = None,
    method: str = "auto",
    timeout: typing.Union[str, int] = "default",
    get_json: bool = True,
    **extra,
):
    """Wrapper method for
    [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
    The session object is created and configured during `Celonis` class initialization.

    Args:
        url: Absolut URL pointing to a Celonis resource.
        message:
            * If no message is given, performs `GET` request.
            * If message is `"DELETE"` performs `DELETE` request.
            * If message is `pathlib.Path`:
                * If the path refers to an existing file performs `POST` request with file attached.
                * Otherwise performs `GET` request and streams payload into the path.
            * If message is a `list`, `dict` or `str`, performs `POST` request with message as JSON payload.
        method: Can be used to override `auto` detection of request method.
                One of [`auto`, `get`, `put`, `post`, `delete`]
        timeout:
            * How long to wait for the server to send data before giving up (in seconds, max. 600):
                * `default` = 120
                * `connect` = 6
                * In the case of a file transfer, the timeout must be 300 or higher. If not set defaults to 300.
        get_json: If True parses requires the response to be in JSON format and converts to dictionary.
        **extra: Additional arguments which are passed to
            [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).

    Raises:
        PyCelonisPermissionError: When the session is read-only or the API key has not sufficient permissions.
        PyCelonisHTTPError: When timeout occurs or unexpected HTTP error.
        PyCelonisValueError: When timeout is invalid.

    Returns:
        dict | list | str | pathlib.Path: Celonis resource response.
    """

    timeout = self.__get_timeout(timeout)
    kwargs = self.__get_kwargs(url, method, timeout, message)

    self._tracker.track(
        f'{kwargs["method"]} request to {url}',
        extra={"tracking_type": "API_REQUEST", "target_url": url, "request_type": kwargs["method"]},
    )

    if self._read_only and kwargs["method"].lower() != "get" and "export" not in kwargs["url"]:
        raise PyCelonisPermissionError(
            "This is a read-only session. Use the argument read_only=False to get write access."
        )

    try:
        response = self._session.request(**kwargs, **extra)
    except ReadTimeout as e:
        raise PyCelonisHTTPError(
            f"A (read) time out error was raised. Please try to increase 'timeout' value.\nError: {e}"
        )
    except ConnectionError as e:
        raise PyCelonisHTTPError(
            f"A connection error was raised. Please check your internet connection first ('ping {self.url}')."
            f"\nError: {e}"
        )

    if response.status_code in [401, 403]:
        message = f"You don't have permission to perform {kwargs['method']} -> {url}."
        if self.permissions:
            pretty_permissions = json.dumps(self._permissions, sort_keys=True, indent=4)
            message = f"{message}\navailable permissions:\n{pretty_permissions}"
        raise PyCelonisPermissionError(message)

    try:
        response.raise_for_status()
    except HTTPError as e:
        self.__process_error(e, kwargs, response)

    if kwargs.get("stream") and isinstance(message, pathlib.Path):
        message.write_bytes(response.content)
        return message
    elif get_json and "application/json" in response.headers.get("Content-Type", ""):
        return json.loads(response.text)
    elif response.headers.get("Content-Type", "").startswith("application/octet-stream"):
        return response
    else:
        if isinstance(response.text, str) and response.text.startswith("<!DOCTYPE html>"):
            raise PyCelonisHTTPError(f"Got unexpected HTML document response:\n{response.text}", response=response)

        return response.text

create_analysis(self, name, workspace, **kwargs)

Creates a new Analysis.

Parameters:

Name Type Description Default
name str

Name of the analysis.

required
workspace Workspace

Workspace object the new Analysis should be linked to.

required
**kwargs

Additional arguments which are passed to celonis_api.process_analytics.workspace.Workspace.create_analysis

{}

Returns:

Type Description
Analysis

The newly created Analysis object.

Source code in celonis_api/celonis.py
def create_analysis(self, name: str, workspace: Workspace, **kwargs) -> Analysis:
    """Creates a new Analysis.

    Args:
        name: Name of the analysis.
        workspace: Workspace object the new Analysis should be linked to.
        **kwargs : Additional arguments which are passed to
            [celonis_api.process_analytics.workspace.Workspace.create_analysis][]

    Returns:
        The newly created Analysis object.
    """
    if not isinstance(workspace, Workspace):
        workspace = self.workspaces.find(workspace.id)
    return workspace.create_analysis(name, **kwargs)

create_datamodel(self, name, pool)

Creates a new Datamodel.

Parameters:

Name Type Description Default
name str

Name of the Datamodel.

required
pool Union[str, pycelonis.celonis_api.event_collection.data_pool.Pool]

Name or ID of the Pool the new Datamodel should be linked to.

required

Returns:

Type Description
Datamodel

The newly created Datamodel object.

Source code in celonis_api/celonis.py
def create_datamodel(self, name: str, pool: typing.Union[str, Pool]) -> Datamodel:
    """Creates a new Datamodel.

    Args:
        name: Name of the Datamodel.
        pool: Name or ID of the Pool the new Datamodel should be linked to.

    Returns:
        The newly created Datamodel object.
    """
    if not isinstance(pool, Pool):
        pool = self.pools.find(pool)
    return pool.create_datamodel(name)

create_package(self, name, key=None, package_type='APP', space_id=None)

Creates a new Studio Package.

Warning

This method is deprecated. Use celonis_api.studio.space.Space.create_package:

space.create_package("package_name", "package_key", "APP")

Parameters:

Name Type Description Default
name str

Name of the Package.

required
key str

Key of the Package, if left blank defaults to the name of Package.

None
package_type str

Package type. Can be one of "APP", "LIBRARY", "INSTRUMENT".

'APP'
space_id str

ID of Space where package will be created. If None will use the 'Default' Space.

None

Returns:

Type Description
Package

The newly created Studio Package object.

Source code in celonis_api/celonis.py
@deprecated("Use: celonis_api.studio.space.Space.create_package.")
def create_package(self, name: str, key: str = None, package_type: str = "APP", space_id: str = None) -> Package:
    """Creates a new Studio Package.

    !!! warning
        This method is deprecated. Use [celonis_api.studio.space.Space.create_package][]:

        ```py
        space.create_package("package_name", "package_key", "APP")
        ```

    Args:
        name: Name of the Package.
        key: Key of the Package, if left blank defaults to the name of Package.
        package_type: Package type. Can be one of "APP", "LIBRARY", "INSTRUMENT".
        space_id: ID of Space where package will be created. If None will use the 'Default' Space.

    Returns:
        The newly created Studio Package object.
    """
    raise PyCelonisError("Deprecated method.")

create_pool(self, name, safety_check=True)

Creates a new Pool.

Parameters:

Name Type Description Default
name str

Name of the Pool.

required
safety_check Optional[bool]

Weather to check if pool already exits or not.

True

Returns:

Type Description
Pool

Instance of the newly created Pool object.

Source code in celonis_api/celonis.py
def create_pool(self, name: str, safety_check: typing.Optional[bool] = True) -> Pool:
    """Creates a new Pool.

    Args:
        name: Name of the Pool.
        safety_check: Weather to check if pool already exits or not.

    Returns:
        Instance of the newly created Pool object.
    """
    if safety_check and name in self.pools.names:
        raise PyCelonisValueError(f"Pool with name {name} already exists!")
    try:
        response = self.api_request(f"{self.url}/integration/api/pools", {"name": name}, method=HttpMethod.POST)
        return Pool(self, response)
    except PyCelonisHTTPError as e:
        raise PyCelonisHTTPError(f"Failed to create pool!\n{e.message}")

create_space(self, name, icon_reference='earth')

Creates a new Studio Space.

Parameters:

Name Type Description Default
name str

Name of the Space.

required
icon_reference str

Name of the icon to use for the Space. One of: earth, binocular, box-archive, building, building-2, catalog, earth-2, factory, finance, horn, inventory, logistics, store, suitcase, wallet.

'earth'

Returns:

Type Description
Space

The newly created Space object.

Source code in celonis_api/celonis.py
def create_space(self, name: str, icon_reference: str = "earth") -> Space:
    """Creates a new Studio Space.

    Args:
        name: Name of the Space.
        icon_reference: Name of the icon to use for the Space. One of:
                `earth`, `binocular`, `box-archive`, `building`, `building-2`, `catalog`, `earth-2`,
                `factory`, `finance`, `horn`, `inventory`, `logistics`, `store`, `suitcase`, `wallet`.
    Returns:
        The newly created Space object.
    """
    try:
        payload = {"name": name, "iconReference": icon_reference}
        response = self.api_request(f"{self.url}/package-manager/api/spaces", payload, method=HttpMethod.POST)
        return Space(self, response["id"])
    except PyCelonisHTTPError as e:
        raise PyCelonisHTTPError(f"Failed to create space!\n{e.message}")

create_storage_bucket(self, name, features=None)

Creates a new Bucket in the file storage manager.

Parameters:

Name Type Description Default
name str

Name of the Bucket.

required
features List[str]

Features used for the Bucket. Currently only supports SFTP.

None

Returns:

Type Description
Bucket

The newly created Bucket object.

Source code in celonis_api/celonis.py
def create_storage_bucket(self, name: str, features: typing.List[str] = None) -> Bucket:
    """Creates a new Bucket in the file storage manager.

    Args:
        name: Name of the Bucket.
        features: Features used for the Bucket. Currently only supports SFTP.

    Returns:
        The newly created Bucket object.
    """
    try:
        payload = {
            "name": name,
            "features": features or [],
        }
        response = self.api_request(f"{self.url}/storage-manager/api/buckets", payload, method=HttpMethod.POST)
        return Bucket(self, response["id"])
    except PyCelonisHTTPError as e:
        raise PyCelonisHTTPError(f"Failed to create storage_bucket!\n{e.message}")

create_workspace(self, name, datamodel)

Creates a new Workspace.

Parameters:

Name Type Description Default
name

Name of the Workspace.

required
datamodel

Name or ID of the Datamodel the Workspace should be linked to.

required

Returns:

Type Description
Workspace

The newly created Workspace object.

Source code in celonis_api/celonis.py
def create_workspace(self, name: str, datamodel: typing.Union[Datamodel, str]) -> Workspace:
    """Creates a new Workspace.

    Args:
        name : Name of the Workspace.
        datamodel : Name or ID of the Datamodel the Workspace should be linked to.

    Returns:
        The newly created Workspace object.
    """
    if not isinstance(datamodel, Datamodel):
        datamodel = self.datamodels.find(datamodel)
    return datamodel.create_workspace(name)

from_url(url, api_token, key_type, read_only, verify_ssl, user_agent, timeout=120, total_retry=0, backoff_factor=0, cookie=None, connect=True, permissions=True) classmethod

Convenient Celonis class method to interact directly with a the specified resource by URL.

Parameters:

Name Type Description Default
url str

Celonis base URL.

required
api_token str

Celonis API token.

required
key_type Union[str, KeyType]

KeyType of API token. One of [APP_KEY, USER_KEY] or celonis_api.utils.KeyType.

required
read_only bool

If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.

required
verify_ssl bool

Requiring requests to verify the TLS certificate at the remote end.
For more details see requests.Session.

required
user_agent str

Session header value for User-Agent.

required
timeout int

How long to wait for the server to send data before giving up (in seconds, 300 for data push/pull, max 600).
For more details see requests.Session.request.

120
total_retry int

Total number of retries (by default disabled, max. 10).
For more details see urllib3.Retry.

0
backoff_factor float

Factor to apply between retry attempts after the second try (by default disabled).
[0 < backoff_factor < 5] : ({backoff_factor} * (2 ** ({total_retry} - 1))).
For more details see urllib3.Retry.

0
cookie Optional[str]

Session header value for x-celonis-api-scope.

None
connect bool

If True connects to Celonis on initialization (initial request to check if the token & key_type combination is correct).

True
permissions bool

If True provides permission information.

True
Source code in celonis_api/celonis.py
@classmethod
def from_url(
    cls,
    url: str,
    api_token: str,
    key_type: typing.Optional[typing.Union[str, 'KeyType']],
    read_only: bool,
    verify_ssl: bool,
    user_agent: str,
    timeout: int = 120,
    total_retry: int = 0,
    backoff_factor: float = 0,
    cookie: typing.Optional[str] = None,
    connect: bool = True,
    permissions: bool = True,
):
    """Convenient Celonis class method to interact directly with a the specified resource by URL.

    Args:
        url: Celonis base URL.
        api_token: Celonis API token.
        key_type: KeyType of API token. One of [`APP_KEY`, `USER_KEY`] or [celonis_api.utils.KeyType][].
        read_only: If True only GET methods are allowed, set to False to enable PUT, POST and DELETE methods.
        verify_ssl: Requiring requests to verify the TLS certificate at the remote end.<br />
            For more details see
            [`requests.Session`](https://docs.python-requests.org/en/latest/api/#requests.Session).
        user_agent: Session header value for `User-Agent`.
        timeout: How long to wait for the server to send data before giving up
            (in seconds, 300 for data push/pull, max 600).<br />
            For more details see
            [`requests.Session.request`](https://docs.python-requests.org/en/latest/api/#requests.Session.request).
        total_retry: Total number of retries (by default disabled, max. 10).<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        backoff_factor: Factor to apply between retry attempts after the second try (by default disabled).<br />
            `[0 < backoff_factor < 5]` : (`{backoff_factor} * (2 ** ({total_retry} - 1))`).<br />
            For more details see
            [`urllib3.Retry`](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry).
        cookie: Session header value for `x-celonis-api-scope`.
        connect: If True connects to Celonis on initialization
            (initial request to check if the `token` & `key_type` combination is correct).
        permissions: If True provides permission information.
    """
    import re
    from urllib import parse

    celonis = cls(
        url=url,
        api_token=api_token,
        key_type=key_type,
        read_only=read_only,
        verify_ssl=verify_ssl,
        user_agent=user_agent,
        timeout=timeout,
        total_retry=total_retry,
        backoff_factor=backoff_factor,
        cookie=cookie,
        connect=connect,
        permissions=permissions,
    )

    celonis._tracker.track("from_url", extra={"tracking_type": "FROM_URL"})

    parsed_url = parse.urlparse(url)
    if "cloud" not in parsed_url.netloc:
        documents_match = re.search("documents/([^/]+)", parsed_url.fragment)
        if documents_match:
            return celonis.analyses.ids.get(documents_match.group(1))
        data_model_match = re.search("data_model/([^/]+)", parsed_url.fragment)
        if data_model_match:
            return celonis.datamodels.ids.get(data_model_match.group(1))

    if "process-mining" in parsed_url.path:
        if "workspaces" in parsed_url.query:
            return celonis.workspaces.find(parsed_url.query.split("=")[1])
        analysis_match = re.search("analysis/([^/]+)]", parsed_url.path)
        if analysis_match:
            return celonis.analyses.ids.get(analysis_match.group(1))

    if "integration" in parsed_url.path:
        pool_regex_match = re.search("pools/([^/]+)", parsed_url.path)
        data_job_match = re.search("data-jobs/([^/]+)", parsed_url.path)
        extraction_match = re.search("extractions/([^/]+)", parsed_url.path)
        transformations_match = re.search("transformations/([^/]+)", parsed_url.path)
        if extraction_match and pool_regex_match and data_job_match:
            return (
                celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                .data_jobs.ids.get(data_job_match.group(1))
                .extractions.ids.get(extraction_match.group(1))  # type: ignore
            )
        elif pool_regex_match and data_job_match and transformations_match:
            return (
                celonis.pools.ids.get(pool_regex_match.group(1))  # type: ignore
                .data_jobs.ids.get(data_job_match.group(1))
                .transformations.ids.get(transformations_match.group(1))
            )
        elif data_job_match and pool_regex_match:
            return celonis.pools.ids.get(pool_regex_match.group(1)).data_jobs.ids.get(  # type: ignore
                parsed_url.query.split("=")[1].split("&")[0]
            )
        elif pool_regex_match:
            return celonis.pools.ids.get(pool_regex_match.group(1))

    celonis._logger.warning("Could not find the resource you were looking for. Returning the Celonis object.")
    return celonis

get_datamodel(self, datamodel_id)

Get a datamodel by ID. Much faster than datamodels.find().

Parameters:

Name Type Description Default
datamodel_id str

Id of the Datamodel.

required

Returns:

Type Description
Datamodel

Datamodel by ID.

Source code in celonis_api/celonis.py
def get_datamodel(self, datamodel_id: str) -> Datamodel:
    """Get a datamodel by ID. Much faster than datamodels.find().

    Args:
        datamodel_id: Id of the Datamodel.

    Returns:
        Datamodel by ID.
    """
    try:
        data_models: typing.Dict = self.api_request(f"{self.url}/process-mining/api/compute-pools/nodes")
    except PyCelonisError:
        data_models = self.api_request(f"{self.url}/package-manager/api/compute-pools/nodes/status")

    dm = None
    for node in data_models:
        if node["dataModelId"] == datamodel_id:
            pool = data_pool.Pool(self, {"id": node["poolId"]})
            dm = data_model.Datamodel(pool, {"id": node["dataModelId"]})
    if dm is None:
        raise PyCelonisNotFoundError(
            f"Either there exists no datamodel with id: {datamodel_id} "
            "or you do not have permissions to access it."
        )
    return dm

get_knowledge_model_by_full_key(self, knowledge_model_key)

Get a Knowledge Model by key (PACKAGE_KEY.KNOWLEDGE_MODEL_KEY)

Parameters:

Name Type Description Default
knowledge_model_key str

Full Knowledge Model key consisting of "PACKAGE_KEY.KNOWLEDGE_MODEL_KEY"

required

Returns:

Type Description
KnowledgeModel

The KnowledgeModel.

Source code in celonis_api/celonis.py
def get_knowledge_model_by_full_key(self, knowledge_model_key: str) -> KnowledgeModel:
    """Get a Knowledge Model by key (PACKAGE_KEY.KNOWLEDGE_MODEL_KEY)

    Args:
        knowledge_model_key: Full Knowledge Model key consisting of "PACKAGE_KEY.KNOWLEDGE_MODEL_KEY"

    Returns:
        The KnowledgeModel.
    """
    if knowledge_model_key.count(".") != 1:
        raise PyCelonisValueError(
            f"'{knowledge_model_key}' is not a valid knowledge_model_key. "
            "Must be of format 'PACKAGE_KEY.KNOWLEDGE_MODEL_KEY'."
        )

    package_key, km_key = knowledge_model_key.split(".")
    package = self.packages.find(package_key)
    return typing.cast(KnowledgeModel, package.get_node_by_key(km_key))