data_model.py

Datamodel (CelonisApiObjectChild)

Datamodel object to interact with Celonis Event Collection API.

Source code in celonis_api/event_collection/data_model.py
class Datamodel(CelonisApiObjectChild):
    """Datamodel object to interact with Celonis Event Collection API."""

    def __init__(self, parent: typing.Union[CelonisApiObject, str], id_or_data, **kwargs):
        super().__init__(parent=parent, id_or_data=id_or_data, **kwargs)
        self._compute_node = ComputeNodeFactory.create_compute_node(
            id=self.id, datamodel=self, celonis=self.parent.celonis
        )

    @property
    def _parent_class(self):
        from pycelonis.celonis_api.event_collection.data_pool import Pool

        return Pool

    @property
    def pool(self) -> 'Pool':
        from pycelonis.celonis_api.event_collection.data_pool import Pool

        return typing.cast(Pool, self.parent)

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/data-models/{datamodel_id}`
        """
        return f"{self.parent.url}/data-models/{self.id}"

    @property
    def tables(self) -> 'CelonisCollection[DatamodelTable]':
        """Get all Datamodel Tables.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{table_id}`

        Returns:
            Collection of Datamodel Tables.
        """
        return CelonisCollection([DatamodelTable(self, t) for t in self.data["tables"]])

    @deprecated("Use 'celonis_api.event_collection.data_model.Datamodel.process_configurations'.")  # type: ignore
    @property
    def process_configuration(self) -> 'DatamodelProcessConfiguration':
        """Get the Datamodels Process Configuration.
        !!! warning
            This method is deprecated. Use [celonis_api.event_collection.data_model.Datamodel.process_configurations][]:

            ```py
            dm.process_configurations[0]
            ```

        Returns:
            The first Process Configuration of all the Datamodel's Process Configurations.
        """
        configs = self.process_configurations
        if len(configs) > 0:
            return configs[0]
        else:
            raise PyCelonisNotFoundError("Couldn't find any process configuration.")

    @property
    def process_configurations(self) -> 'CelonisCollection[DatamodelProcessConfiguration]':
        """Get all Datamodels Process Configurations.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`

        Returns:
            A Collection of the Datamodel's Process Configurations.
        """
        return CelonisCollection(DatamodelProcessConfiguration(self, pc) for pc in self._process_configuration_data)

    @property
    def _process_configuration_data(self) -> typing.List:
        try:
            return self.celonis.api_request(f"{self.url}/process-configurations")
        except PyCelonisHTTPError:
            return [self.celonis.api_request(f"{self.url}/process-configuration")]

    @property
    def default_activity_table(self) -> 'typing.Optional[DatamodelTable]':
        """Get/Set the default Activity Table via Process Configuration.

        ```py
        # Get
        default_at = dm.default_activity_table
        # Set
        dm.default_activity_table = default_at
        ```

        !!! api "API"
            - `PUT: /integration/api/pools/{pool_id}/data-models/process-configurations/default-activity-table`
                ```json
                {
                    "id": datamodel_table.id,
                    "dataSourceId": datamodel_table.data["dataSourceId"],
                    "aliasOrName": datamodel_table.alias,
                    "alias": datamodel_table.alias,
                    "dataModelId": self.id,
                    "name": datamodel_table.source_name,
                    "useDirectStorage": False,
                    "columns": [],
                }
                ```

        Returns:
            The default Activity Table.
        """
        activity_tables = self._process_configuration_data
        default_act = [a for a in activity_tables if a.get("defaultConfiguration", False)]
        if len(default_act) > 0:
            return self.tables.find(default_act[0]["activityTableId"])
        elif len(activity_tables) == 1:
            return self.tables.find(activity_tables[0]["activityTableId"])
        else:
            self._logger.info("No default activity table set.")
            return None

    @default_activity_table.setter
    def default_activity_table(self, table: typing.Union[str, 'DatamodelTable']):
        if isinstance(table, str):
            datamodel_table = self.tables.find(table)
        elif isinstance(table, DatamodelTable):
            datamodel_table = table
        else:
            raise PyCelonisTypeError("default_activity_table must be of type string or DatamodelTable")
        if datamodel_table.id not in [act["activityTableId"] for act in self._process_configuration_data]:
            raise PyCelonisValueError("default_activity_table must be an activity table")

        payload = {
            "id": datamodel_table.id,
            "dataSourceId": datamodel_table.data["dataSourceId"],
            "aliasOrName": datamodel_table.alias,
            "alias": datamodel_table.alias,
            "dataModelId": self.id,
            "name": datamodel_table.source_name,
            "useDirectStorage": False,
            "columns": [],
        }
        url = f"{self.url}/process-configurations/default-activity-table"
        self.celonis.api_request(url, message=payload, method=HttpMethod.PUT)

    def create_process_configuration(
        self,
        activity_table: typing.Union[str, 'DatamodelTable'] = None,
        case_table: typing.Union[str, 'DatamodelTable'] = None,
        case_column: str = None,
        activity_column: str = None,
        timestamp_column: str = None,
        sorting_column: str = None,
    ) -> 'CelonisCollection[DatamodelProcessConfiguration]':
        """Creates a new Process Configuration.

        !!! api "API"
            - `PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
                ```json
                {
                    "activityTableId": activity_table_id,
                    "caseIdColumn": case_column,
                    "activityColumn": activity_column,
                    "timestampColumn": timestamp_column,
                    "sortingColumn": sorting_column,
                    "caseTableId": case_table_id
                }
                ```

        Args:
            activity_table: Name of Activity Table or DatamodelTable object.
            case_table: Name of Case Table or DatamodelTable object.
            case_column: Case Column Name referring to column in activity table.
            activity_column: Activity Column Name referring to column in activity table.
            timestamp_column: Timestamp Column Name referring to column in activity table.
            sorting_column: Sorting Column Name.

        Returns:
            A Collection of the Datamodel's Process Configurations.
        """
        payload = {}
        if activity_table:
            if not isinstance(activity_table, DatamodelTable):
                activity_table = self.tables.find(activity_table)

            _columns = {case_column, activity_column, timestamp_column}
            _activity_table_columns = set(col["name"] for col in activity_table.columns)
            check_columns = None in _columns or not _columns.issubset(_activity_table_columns)

            if check_columns:
                raise PyCelonisNotFoundError("Problem finding case, activity or timestamp column.")

            payload.update(
                {
                    "activityTableId": activity_table.id,
                    "caseIdColumn": case_column,
                    "activityColumn": activity_column,
                    "timestampColumn": timestamp_column,
                    "sortingColumn": sorting_column,
                }
            )

        if case_table:
            if not isinstance(case_table, DatamodelTable):
                case_table = self.tables.find(case_table)
            payload.update({"caseTableId": case_table.id})

        try:
            self.celonis.api_request(f"{self.url}/process-configurations", payload, method=HttpMethod.PUT)
        except PyCelonisHTTPError:
            self.celonis.api_request(f"{self.url}/process-configuration", payload, method=HttpMethod.PUT)

        return self.process_configurations

    @property
    def case_table_key(self) -> typing.List:
        """Get all Case Table Keys.

        Returns:
            Sorted List of Case Table Keys.
        """
        keys = []
        if len(self.process_configurations) == 0:
            # without a process configuration there is no case/activity table pair to match
            return keys
        config = self.process_configurations[0]
        tables = [config.case_table.id, config.activity_table.id]  # pylint: disable=no-member
        for fk in self.data["foreignKeys"]:
            if [fk["sourceTableId"], fk["targetTableId"]] == tables:
                for key in fk["columns"]:
                    keys.append(key["sourceColumnName"])
                break
            elif [fk["sourceTableId"], fk["targetTableId"]] == tables[::-1]:
                for key in fk["columns"]:
                    keys.append(key["targetColumnName"])
                break

        return sorted(keys)

    @property
    def foreign_keys(self) -> "ForeignKeyList":
        """Get all Foreign Keys.

        Returns:
            A List of Foreign Keys.
        """
        return ForeignKeyList(self.data["foreignKeys"], self.tables)

    def _find_table_object(self, table_name) -> 'DatamodelTable':
        if isinstance(table_name, DatamodelTable):
            return table_name
        else:
            table = self.tables.names.get(table_name)
            if table is None:
                table_list = list(self.tables.filter(lambda x: x.source_name == table_name))
                if table_list:
                    table = table_list[0]
                else:
                    raise PyCelonisNotFoundError(f"Table with name {table_name} can't be found in Data Model.")
            return table

    def _get_foreign_key_connection(self, source_table, target_table) -> typing.List:
        old_columns = []
        source_name = source_table.alias if source_table.alias is not None else source_table.name
        target_name = target_table.alias if target_table.alias is not None else target_table.name

        for fk in self.foreign_keys:
            if sorted((fk["source_table"], fk["target_table"])) == sorted((source_name, target_name)):
                for colset in fk["columns"]:
                    if (colset[0], colset[1]) not in old_columns:
                        old_columns.append(colset)

        return old_columns

    def create_foreign_key(
        self,
        source_table: typing.Union[str, "DatamodelTable"],
        target_table: typing.Union[str, "DatamodelTable"],
        columns: typing.List[typing.Tuple[str, str]],
        **kwargs,
    ) -> typing.Dict:
        """Creates a new Foreign Key between the source_table (child(1)) and the target_table (parent(N)).

        Args:
            source_table: Name of Source Table or DatamodelTable object.
            target_table: Name of Target Table or DatamodelTable object.
            columns: List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName' which represent
                the foreign key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..]

        Returns:
            The newly created Foreign Key configuration as Dictionary.

        Raises:
            PyCelonisValueError: If tables not found or columns not found or foreign key connection
                already exists between the specified tables.
            PyCelonisTypeError: If table columns have different data types.
        """

        # convert source_table and target_table to DatamodelTable
        source_table, target_table = map(self._find_table_object, [source_table, target_table])
        # assure that foreign keys are made between pairs of columns
        columns = [columns] if isinstance(columns, str) else columns  # type: ignore
        columns = [(c, c) if isinstance(c, str) else c for c in columns]
        if not all(map(lambda pair: len(pair) == 2, columns)):
            raise PyCelonisValueError("A pair must have exactly two elements.")

        # check if the foreign key already exists
        old_fk = self._get_foreign_key_connection(source_table, target_table)
        if old_fk:
            raise PyCelonisValueError(
                "A foreign key already exists between the two "
                f"given tables {source_table}, {target_table}: \n{old_fk}"
            )
        source_cols = {
            col["name"]: col["type"]
            for col in source_table.columns
            if col["name"] in list(map(operator.itemgetter(0), columns))
        }
        target_cols = {
            col["name"]: col["type"]
            for col in target_table.columns
            if col["name"] in list(map(operator.itemgetter(1), columns))
        }
        # check if given columns present
        for i, table_cols in enumerate([source_cols, target_cols]):
            key_cols = set(map(operator.itemgetter(i), columns))
            if not key_cols.issubset(set(table_cols)):
                raise PyCelonisValueError(
                    f"Please check if specified columns"
                    f" {key_cols.difference(table_cols)} "
                    f"are in either source or target table."
                )
        # check that data type coincide
        for tuple_item in columns:
            if source_cols.get(tuple_item[0]) != target_cols.get(tuple_item[1]):
                raise PyCelonisTypeError(
                    f"The specified columns {tuple_item[0]} and "
                    f"{tuple_item[1]} have different data types: "
                    f"{source_cols.get(tuple_item[0])} and "
                    f"{target_cols.get(tuple_item[1])}. "
                    "Join on columns with different data types not possible."
                )

        url, payload = self._payload_new_foreign_key(target_table, source_table, columns)
        return self.celonis.api_request(url, payload)

    @deprecated(
        "Use 'celonis_api.event_collection.data_pool.Pool.create_table'. "
        "After the table is created, you can add the new table from the "
        "pool to your datamodel via 'add_table_from_pool'."
    )
    def push_table(self, df_or_path, table_name, reload_datamodel=True, **kwargs) -> 'DatamodelTable':
        """Pushes a table to a Pool, adds it the Datamodel and by default reloads the Datamodel.

        !!! warning
            This method is deprecated. Use [create_table][celonis_api.event_collection.data_pool.Pool.create_table].

        Returns:
            The newly added Datamodel Table.
        """
        self.parent.push_table(df_or_path, table_name, **kwargs)
        table = self.tables.names.get(table_name) or self.add_table_from_pool(table_name)
        if reload_datamodel:
            self.reload(tables=table_name)
        return table

    def reload(self, from_cache: bool = False, tables: typing.List = None, wait_for_reload: bool = True) -> typing.Dict:
        """Reloads the Datamodel only if a reload is not already running.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/reload`
                ```json
                    { "forceComplete": from_cache }
                ```

        Args:
            from_cache: Deprecated, as it is no longer supported for data models. Will always be set to False.
            tables: List of Datamodel Tables or table ids. If given a partial reload will be performed on these tables,
                from_cache will be ignored.
            wait_for_reload: If True waits until the Datamodel is reloaded.

        Returns:
            Report of the reload.
        """
        if from_cache:
            self._logger.warning(
                "Reloading data models from cache is no longer supported. Defaulting to complete reload."
            )

        response = self.load_status

        if response is not None and response["loadStatus"] == "RUNNING":
            self._logger.warning("A Data Model Reload is already in progress. No new load is triggered.")
            return response

        if not tables:
            reload_type = "Complete"
            payload = {"forceComplete": True}
            self.celonis.api_request(f"{self.url}/reload", method=HttpMethod.POST, params=payload)
        else:
            reload_type = "Partial"
            payload = self._get_table_ids(tables)  # type: ignore
            url = (
                f"{self.celonis.url}/integration/api/v1/data-pools/"
                f"{self.pool.id}/data-models/{self.id}/load/partial-sync"
            )
            self.celonis.api_request(url, payload)

        response = self.load_status
        if wait_for_reload:
            response = self._wait_for_reload(reload_type)

        return response

    def _get_table_ids(self, tables: typing.List) -> typing.List:
        table_ids = []

        tables = [tables] if not isinstance(tables, list) else tables
        for tab in tables:
            if isinstance(tab, DatamodelTable):
                table_ids += [tab.id]
            else:
                dm_table = self.tables.names.get(tab, None)
                if dm_table is None:
                    raise PyCelonisValueError(f"The table {tab} does not exist in the datamodel.")
                table_ids += [dm_table.id]
        return table_ids

    def _wait_for_reload(self, reload_type="") -> typing.Dict:
        iterations = 0
        error_count = 0
        while True:
            try:
                load_status = self.load_status
                if load_status["loadStatus"] == "ERROR" and load_status["message"] == "Lost connection to load":
                    time.sleep(3)
                elif load_status["loadStatus"] != "RUNNING":
                    self._logger.info("Data Model reload done")
                    break

                error_count = 0
                iterations += 1
                if iterations % 5 == 0:
                    self._logger.info(f"{reload_type} Data Model reload running...")
                time.sleep(1)
            except (PyCelonisHTTPError, PyCelonisPermissionError) as e:
                error_count += 1
                time.sleep(3)
                self._logger.exception("Failed to request load status, trying again...")
                if error_count > 5:
                    raise e

        if load_status["loadStatus"] not in ["SUCCESS", "WARNING"]:
            raise PyCelonisHTTPError(f"Data Model reload did not succeed, status: {load_status}")

        return load_status

    @property
    def load_status(self) -> typing.Dict:
        """Get the Datamodels Load Status.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/load-history/load-info-sync`

        Returns:
            Load status.
        """
        response = self.celonis.api_request(f"{self.url}/load-history/load-info-sync")
        return response["loadInfo"]["currentComputeLoad"]

    def _payload_add_table_from_pool(
        self, connection: typing.Optional[typing.Union['DataConnection', str]], table_name: str, alias: str = None
    ) -> typing.Tuple[str, typing.List]:
        if connection:
            from pycelonis.celonis_api.event_collection.data_pool import DataConnection

            if isinstance(connection, DataConnection):
                connection = connection.id
            # reject strings that are not valid DataConnection uuids
            elif isinstance(connection, str) and not check_uuid(connection):
                raise PyCelonisTypeError(
                    "Argument 'connection' should either be of type "
                    "DataConnection or str (valid uuid of DataConnection)."
                )

        # Add table from pool to datamodel
        url = f"{self.url.replace('/data-models/', '/data-model/')}/tables"
        payload = [{"name": table_name, "alias": alias, "dataModelId": self.id, "dataSourceId": connection}]

        return url, payload

    def _payload_new_foreign_key(
        self, target_table: 'DatamodelTable', source_table: 'DatamodelTable', foreign_key_columns: typing.List
    ) -> typing.Tuple[str, typing.Dict]:
        columns = []
        for foreign_key_tuple in foreign_key_columns:
            columns.append(
                {"sourceColumnName": f"{foreign_key_tuple[0]}", "targetColumnName": f"{foreign_key_tuple[1]}"}
            )

        if not source_table and not target_table:
            raise PyCelonisValueError("Arguments 'source_table' and 'target_table' must not be None.")

        url = f"{self.url}/foreign-keys"
        payload = {
            "dataModelId": self.id,
            "sourceTableId": source_table.id,
            "targetTableId": target_table.id,
            "columns": columns,
        }
        return url, payload

    def add_table_from_pool(
        self,
        table_name: str,  # objects_base.BaseDatamodelTable
        alias: str = None,
        connection: typing.Union['DataConnection', str] = None,
        new_foreign_key_to_table: str = None,
        added_table_join_type: str = "source",
        foreign_key_columns: typing.List[typing.Tuple[str, str]] = None,
        reload: str = None,
    ) -> "DatamodelTable":
        """Adds a Table from Pool to Datamodel. If the Table names and the ey parameters are provided
        it also can connect two tables directly.

        Args:
            table_name: Name of the table to be added in the Pool.
            alias: An alias name for the new table, by default None
            added_table_join_type: Add table to be pushed as 'source' table or as 'target' table regarding
                the connection. One of [`source`, `target`].
            new_foreign_key_to_table: Set connection to or from this table.
            foreign_key_columns: List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName'
                which represent the foreign key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..]
                between two tables.
            connection: DataConnection object or ID, uses Global if not specified.
            reload: Reload type. One of [`FORCE_COMPLETE`, `PARTIAL_ON_TABLE`],
                if not specified doesn't reload.

        Returns:
            DatamodelTable.
        """
        # check if table exists in pool
        pool_table_names = [table["name"] for table in self.pool.tables if "name" in table]
        if table_name not in pool_table_names:
            raise PyCelonisNotFoundError(
                f"table_name '{table_name}' not found in Data Pool tables: {', '.join(pool_table_names)}"
            )

        # check if table already added to dm
        self._check_table_already_added_to_dm(alias, table_name)

        url, payload = self._payload_add_table_from_pool(connection, table_name, alias)
        response = self.celonis.api_request(url, payload)

        # Connect tables via foreign keys
        if new_foreign_key_to_table:
            if added_table_join_type == 'source':
                source_table = alias if alias else table_name
                target_table = new_foreign_key_to_table
            elif added_table_join_type == 'target':
                source_table = new_foreign_key_to_table
                target_table = table_name
            else:
                raise PyCelonisValueError(
                    f"Entered value '{added_table_join_type}' for "
                    "'added_table_join_type' is invalid. Please use 'source' or 'target'."
                )

            if foreign_key_columns is None:
                raise PyCelonisValueError("Foreign key columns can't be none if new_foreign_key_to_table is set")

            self.create_foreign_key(source_table=source_table, target_table=target_table, columns=foreign_key_columns)

        if reload == "FORCE_COMPLETE":
            self.reload(from_cache=False)
        elif reload == "PARTIAL_ON_TABLE":
            self.reload(tables=response[0].get("aliasOrName"))

        return response

    def _check_table_already_added_to_dm(self, alias: typing.Optional[str], table_name: str):
        alias_or_name = alias if alias else table_name
        for table in self.tables:
            _table_name = table.data.get("aliasOrName")
            if alias_or_name == _table_name:
                raise PyCelonisValueError(f"Table with name {alias_or_name} already exists in the Data Model.")

    @property
    def workspaces(self) -> 'CelonisCollection[Workspace]':
        """Get all Process Mining Workspaces. Uses [celonis_api.celonis.Celonis.workspaces][].

        Returns:
            Collection of Workspaces.
        """
        return self.celonis.workspaces.filter(self.id, "dataModelId")

    def create_workspace(self, name) -> 'Workspace':
        """Creates a new Workspace.

        !!! api "API"
            - `POST: /process-mining/api/processes`
                ```json
                    {
                        "name": name,
                        "dataModelId": self.id
                    }
                ```

        Args:
            name : Name of the Workspace.

        Returns:
            The newly created Workspace object.
        """
        from pycelonis.celonis_api.process_analytics.workspace import Workspace

        if name in self.celonis.workspaces.names:
            raise PyCelonisValueError(f"Workspace with name {name} already exists!")

        payload = {"name": name, "dataModelId": self.id}
        response = self.celonis.api_request(f"{self.celonis.url}/process-mining/api/processes", payload)
        return Workspace(self.celonis, response)

    @property
    def last_change_dates(self) -> pd.DataFrame:
        """Returns the latest change date found in tables."""
        tables = self.tables
        df = pd.DataFrame()
        for t in tables:
            # Find maximum value of _CELONIS_CHANGE_DATE for table
            date_df = self._get_data_frame(
                pql.PQLColumn(f'MAX("{t.name}"."_CELONIS_CHANGE_DATE")', "MAX_CELONIS_CHANGE_DATE")
            )
            date_df.index = [t.name]
            df = df.append(date_df)  # type: ignore
        return df

    def _get_data_file(self, pql_query, file_path=None, chunksize=None, **kwargs) -> pathlib.Path:
        # needed for compatibility with cpm4 and base classes
        return self._compute_node.get_data_file(pql_query=pql_query, file_path=file_path, **kwargs)

    def get_data_file(
        self,
        pql_query: pql.PQL,
        file_path: typing.Union[str, pathlib.Path] = None,
        verify_query: bool = False,
        **kwargs,
    ) -> pathlib.Path:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and returns the path to the exported file.
        Uses [celonis_api.event_collection.compute_node.ComputeNode.get_data_file][].

        Args:
            pql_query: The table query to be executed.
            file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.
            verify_query: Whether query should be verified before execution.

        Returns:
            Path to downloaded file containing the results of the query.
        """
        if verify_query:
            self.verify_query(pql_query)
        return self._compute_node.get_data_file(pql_query=pql_query, file_path=file_path, **kwargs)

    @property
    def name_mapping(self) -> typing.Dict:
        """Get the Datamodels Name Mappings.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/name-mapping`

        Returns:
            Dictionary containing the Name Mappings.
        """
        return self.celonis.api_request(f"{self.url}/name-mapping")

    def download_name_mapping(self, file_path: typing.Union[str, pathlib.Path] = NAME_MAPPING_XLSX) -> pathlib.Path:
        """Downloads the Datamodels Name Mappings.

        Args:
            file_path: Download path to Excel File.

        Returns:
            Path to downloaded Excel File.
        """
        return self.celonis.api_request(f"{self.url}/name-mapping/template", pathlib.Path(file_path), method="GET")

    def upload_name_mapping(self, file_path: typing.Union[str, pathlib.Path]) -> typing.Dict:
        """Uploads the Datamodels Name Mappings Excel File.

        Args:
            file_path: Upload path to Excel File.

        Returns:
            Upload Result.
        """
        file_path = pathlib.Path(file_path)
        if not file_path.is_file():
            raise PyCelonisValueError("Argument 'file_path' must refer to a file.")

        additional_headers = {"x-csrf-token": self.celonis._session.cookies.get("XSRF-TOKEN")}
        return self.celonis.api_request(f"{self.url}/name-mapping/file", file_path, headers=additional_headers)

    def _backup_content(self, backup_path: str = ".", include_data: bool = True):
        """Creates a backup of the datamodel."""
        path = pathlib.Path(backup_path) / f"Backup of Datamodel - {self.name}"
        if path.exists():
            shutil.rmtree(path)
        path.mkdir()
        config_information = {
            "tables": [{"alias": t.alias, "source_name": t.source_name} for t in self.tables],
            "foreign_keys": self.foreign_keys,
            "process_configurations": [],
        }
        for pc in self.process_configurations:
            config_information["process_configurations"].append(
                {
                    "activity_table": getattr(pc.activity_table, "name", None),
                    "case_table": getattr(pc.case_table, "name", None),
                    "case_column": pc.case_column,
                    "activity_column": pc.activity_column,
                    "timestamp_column": pc.timestamp_column,
                    "sorting_column": pc.sorting_column,
                }
            )

        self.download_name_mapping(path / NAME_MAPPING_XLSX)

        (path / "datamodel_information.json").write_text(json.dumps(config_information, sort_keys=True, indent=2))

        if include_data:
            data_path = path / "data"
            data_path.mkdir()
            # download tables
            for t in self.tables:
                t._get_data_file(file_path=(data_path / f"{t.source_name}.parquet"), include_change_date=False)
        return path

    # TODO: refactor complex method
    def _rebuild_content_from_backup(self, backup_path, overwrite_existing_tables=False):  # noqa: C901
        """Rebuilds a datamodel from a backup."""
        path = pathlib.Path(backup_path)
        if not path.is_dir():
            raise PyCelonisValueError("Argument 'backup_path' must be directory")

        match = re.search("Backup of Datamodel - (.+)", str(path))
        if not (match and len(match.groups()) == 1):
            raise PyCelonisValueError("Name of the datamodel not found in backup_path")

        self.name = match[1]
        info = json.loads((path / "datamodel_information.json").read_text())
        data_path = path / "data"

        def push(table):
            alias, source_name = table["alias"], table["source_name"]
            name = alias or source_name
            if name in self.tables.names and not overwrite_existing_tables:
                return {}
            else:
                table_file = data_path / f"{source_name}.parquet"
                if table_file.is_file() and (
                    overwrite_existing_tables
                    or source_name not in [t["name"] for t in self.parent.tables if not t["dataSourceId"]]
                ):
                    self._logger.info(f"Pushing table: {source_name}")
                    return self.parent.create_table(table_file, source_name, if_exists="drop", wait_for_finish=False)
                else:
                    return {}

        jobs = list(map(push, info["tables"]))
        for j in jobs:
            job_id = j.get("id")
            if job_id:
                iterations = 0
                error_count = 0
                while True:
                    try:
                        status = self.parent.check_push_status(job_id)["status"]
                        if status not in ["RUNNING", "QUEUED"]:
                            self._logger.info(f"Data push job status: {status}")
                            break

                        error_count = 0
                        iterations += 1
                        if iterations % 10 == 0:
                            self._logger.info(f"Data push job status: {status}...")
                        time.sleep(1)
                    except PyCelonisError as e:
                        error_count += 1
                        time.sleep(3)
                        self._logger.exception("Data push job failed, trying again...")
                        if error_count > 5:
                            raise e

        for table in info["tables"]:
            alias, source_name = table["alias"], table["source_name"]
            _ = self.add_table_from_pool(source_name, alias)

        for fk in info["foreign_keys"]:
            self.create_foreign_key(**fk)

        for pc in info["process_configurations"]:
            self.create_process_configuration(**pc)

        name_mapping = path / NAME_MAPPING_XLSX
        if name_mapping.is_file():
            self.upload_name_mapping(name_mapping)

        self.reload(from_cache=False)
        return self

    def _get_data_frame(self, query, **kwargs):
        """Queries the datamodel and returns the results of the query in pandas DataFrame

        Parameters
        ----------
        query : PQL or PQLColumn
            The query to be executed.

        Returns
        -------
        pandas.DataFrame
            The results of the query.
        """
        return self._compute_node.get_data_frame(pql_query=query, **kwargs)

    def get_data_frame(self, query: pql.PQL, verify_query: bool = False, **kwargs) -> pd.DataFrame:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
        Uses [ComputeNode.get_data_frame][celonis_api.event_collection.compute_node.ComputeNode.get_data_frame].

        Args:
            query: The table query to be executed.
            verify_query: Whether query should be verified before execution.

        Returns:
            Dataframe containing the results of the query.
        """
        if verify_query:
            self.verify_query(query)
        return self._compute_node.get_data_frame(pql_query=query, **kwargs)

    def verify_query(self, query: typing.Union[pql.PQLColumn, pql.PQLFilter, pql.PQL, typing.List]):
        errors = PQLDebugger.debug(query, self)
        if len(errors) > 0:
            raise PyCelonisValueError("\n".join(errors))

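The member documentation below assumes a Datamodel handle named dm. A minimal, hedged setup sketch (URL, API token, and the lookup value are placeholders for your own environment):

    from pycelonis import get_celonis

    celonis = get_celonis(url="https://<team>.<cluster>.celonis.cloud", api_token="<api-token>")
    dm = celonis.datamodels.find("<datamodel name or id>")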
case_table_key: List property readonly

Get all Case Table Keys.

Returns:

  List: Sorted List of Case Table Keys.

default_activity_table: Optional[DatamodelTable] property writable

Get/Set the default Activity Table via Process Configuration.

# Get
default_at = dm.default_activity_table
# Set
dm.default_activity_table = default_at

API

  • PUT: /integration/api/pools/{pool_id}/data-models/process-configurations/default-activity-table
    {
        "id": datamodel_table.id,
        "dataSourceId": datamodel_table.data["dataSourceId"],
        "aliasOrName": datamodel_table.alias,
        "alias": datamodel_table.alias,
        "dataModelId": self.id,
        "name": datamodel_table.source_name,
        "useDirectStorage": False,
        "columns": [],
    }
    

Returns:

  Optional[DatamodelTable]: The default Activity Table.

foreign_keys: ForeignKeyList property readonly

Get all Foreign Keys.

Returns:

  ForeignKeyList: A List of Foreign Keys.

last_change_dates: DataFrame property readonly

Returns the latest change date found in each Datamodel Table.

load_status: Dict property readonly

Get the Datamodel's Load Status.

API

  • GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/load-history/load-info-sync

Returns:

  Dict: Load status.

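For example, a minimal pre-flight check, assuming dm is the Datamodel handle from the setup sketch near the top of this page:

    status = dm.load_status
    if status is not None and status["loadStatus"] == "RUNNING":
        print("A Data Model reload is already in progress.")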
name_mapping: Dict property readonly

Get the Datamodel's Name Mappings.

API

  • GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/name-mapping

Returns:

  Dict: Dictionary containing the Name Mappings.

process_configuration: DatamodelProcessConfiguration property readonly

Get the Datamodel's Process Configuration.

Warning

This method is deprecated. Use celonis_api.event_collection.data_model.Datamodel.process_configurations:

dm.process_configurations[0]

Returns:

  DatamodelProcessConfiguration: The first Process Configuration of all the Datamodel's Process Configurations.

process_configurations: CelonisCollection[DatamodelProcessConfiguration] property readonly

Get all of the Datamodel's Process Configurations.

API

  • GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations

Returns:

  CelonisCollection[DatamodelProcessConfiguration]: A Collection of the Datamodel's Process Configurations.

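A short inspection sketch; the attribute names match the process configuration fields used elsewhere on this page:

    for pc in dm.process_configurations:
        print(pc.activity_table, pc.case_table, pc.timestamp_column)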
tables: CelonisCollection[DatamodelTable] property readonly

Get all Datamodel Tables.

API

  • GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{table_id}

Returns:

  CelonisCollection[DatamodelTable]: Collection of Datamodel Tables.

url: str property readonly

API

  • /integration/api/pools/{pool_id}/data-models/{datamodel_id}

workspaces: CelonisCollection[Workspace] property readonly

Get all Process Mining Workspaces. Uses celonis_api.celonis.Celonis.workspaces.

Returns:

  CelonisCollection[Workspace]: Collection of Workspaces.

add_table_from_pool(self, table_name, alias=None, connection=None, new_foreign_key_to_table=None, added_table_join_type='source', foreign_key_columns=None, reload=None)

Adds a Table from the Pool to the Datamodel. If the table names and the key parameters are provided, it can also connect the two tables directly.

Parameters:

  table_name (str, required): Name of the table to be added in the Pool.
  alias (str, default None): An alias name for the new table.
  added_table_join_type (str, default 'source'): Add table to be pushed as 'source' table or as 'target' table regarding the connection. One of [source, target].
  new_foreign_key_to_table (str, default None): Set connection to or from this table.
  foreign_key_columns (List[Tuple[str, str]], default None): List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName' which represent the foreign key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..] between two tables.
  connection (Union[DataConnection, str], default None): DataConnection object or ID, uses Global if not specified.
  reload (str, default None): Reload type. One of [FORCE_COMPLETE, PARTIAL_ON_TABLE]; if not specified, doesn't reload.

Returns:

  DatamodelTable: The newly added Datamodel Table.

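A hedged usage sketch; all table and column names are illustrative:

    # add ACTIVITIES from the Pool, connect it to the existing CASES
    # table on CASE_ID, then partially reload the new table
    table = dm.add_table_from_pool(
        table_name="ACTIVITIES",
        new_foreign_key_to_table="CASES",
        added_table_join_type="target",
        foreign_key_columns=[("CASE_ID", "CASE_ID")],
        reload="PARTIAL_ON_TABLE",
    )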

create_foreign_key(self, source_table, target_table, columns, **kwargs)

Creates a new Foreign Key between the source_table (child(1)) and the target_table (parent(N)).

Parameters:

  source_table (Union[str, DatamodelTable], required): Name of Source Table or DatamodelTable object.
  target_table (Union[str, DatamodelTable], required): Name of Target Table or DatamodelTable object.
  columns (List[Tuple[str, str]], required): List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName' which represent the foreign key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..]

Returns:

  Dict: The newly created Foreign Key configuration as Dictionary.

Exceptions:

  PyCelonisValueError: If tables not found or columns not found or a foreign key connection already exists between the specified tables.
  PyCelonisTypeError: If table columns have different data types.

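A minimal sketch with illustrative names, where CASES is the child (1) side and ACTIVITIES the parent (N) side:

    # join CASES.CASE_ID (1) to ACTIVITIES.CASE_ID (N)
    fk = dm.create_foreign_key(
        source_table="CASES",
        target_table="ACTIVITIES",
        columns=[("CASE_ID", "CASE_ID")],
    )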

create_process_configuration(self, activity_table=None, case_table=None, case_column=None, activity_column=None, timestamp_column=None, sorting_column=None)

Creates a new Process Configuration.

API

  • PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations
    {
        "activityTableId": activity_table_id,
        "caseIdColumn": case_column,
        "activityColumn": activity_column,
        "timestampColumn": timestamp_column,
        "sortingColumn": sorting_column,
        "caseTableId": case_table_id
    }
    

Parameters:

  activity_table (Union[str, DatamodelTable], default None): Name of Activity Table or DatamodelTable object.
  case_table (Union[str, DatamodelTable], default None): Name of Case Table or DatamodelTable object.
  case_column (str, default None): Case Column Name referring to column in activity table.
  activity_column (str, default None): Activity Column Name referring to column in activity table.
  timestamp_column (str, default None): Timestamp Column Name referring to column in activity table.
  sorting_column (str, default None): Sorting Column Name.

Returns:

  CelonisCollection[DatamodelProcessConfiguration]: A Collection of the Datamodel's Process Configurations.

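A hedged sketch; the column names must exist in the activity table and are illustrative here:

    configs = dm.create_process_configuration(
        activity_table="ACTIVITIES",
        case_table="CASES",
        case_column="CASE_ID",
        activity_column="ACTIVITY",
        timestamp_column="EVENTTIME",
    )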

create_workspace(self, name)

Creates a new Workspace.

API

  • POST: /process-mining/api/processes
        {
            "name": name,
            "dataModelId": self.id
        }
    

Parameters:

  name (required): Name of the Workspace.

Returns:

  Workspace: The newly created Workspace object.

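For example (the workspace name is arbitrary):

    workspace = dm.create_workspace("My Analysis Workspace")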

download_name_mapping(self, file_path='name_mapping.xlsx')

Downloads the Datamodel's Name Mappings.

Parameters:

  file_path (Union[str, pathlib.Path], default 'name_mapping.xlsx'): Download path to Excel File.

Returns:

  Path: Path to downloaded Excel File.

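For example:

    # downloads the mapping template to ./name_mapping.xlsx
    path = dm.download_name_mapping("name_mapping.xlsx")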

get_data_file(self, pql_query, file_path=None, verify_query=False, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile and returns the path to the exported file. Uses celonis_api.event_collection.compute_node.ComputeNode.get_data_file.

Parameters:

  pql_query (PQL, required): The table query to be executed.
  file_path (Union[str, pathlib.Path], default None): The output path for the export. Defaults to tmpdir/celonis_pql_export_<current_time>.parquet.
  verify_query (bool, default False): Whether query should be verified before execution.

Returns:

  Path: Path to downloaded file containing the results of the query.

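A minimal export sketch, assuming the pql module used throughout this file and an illustrative ACTIVITIES table:

    query = pql.PQL()
    query += pql.PQLColumn('"ACTIVITIES"."ACTIVITY"', "Activity")
    parquet_path = dm.get_data_file(query, verify_query=True)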

get_data_frame(self, query, verify_query=False, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile and converts it to a pandas.DataFrame. Uses ComputeNode.get_data_frame.

Parameters:

  query (PQL, required): The table query to be executed.
  verify_query (bool, default False): Whether query should be verified before execution.

Returns:

  DataFrame: Dataframe containing the results of the query.

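A minimal query sketch with illustrative table and column names:

    query = pql.PQL()
    query += pql.PQLColumn('"CASES"."CASE_ID"', "Case ID")
    df = dm.get_data_frame(query)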

push_table(self, df_or_path, table_name, reload_datamodel=True, **kwargs)

Pushes a table to a Pool, adds it to the Datamodel, and by default reloads the Datamodel.

Warning

This method is deprecated. Use create_table.

Returns:

  DatamodelTable: The newly added Datamodel Table.

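A hedged sketch of the recommended replacement path, assuming df is a pandas DataFrame and the table name is illustrative:

    # push to the parent pool, then register and reload in the datamodel
    dm.pool.create_table(df, "MY_TABLE")
    table = dm.add_table_from_pool("MY_TABLE")
    dm.reload(tables="MY_TABLE")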
Source code in celonis_api/event_collection/data_model.py
@deprecated(
    "Use 'celonis_api.event_collection.data_pool.Pool.create_table'. "
    "After the table is created, you can add the new table from the "
    "pool to your datamodel via 'add_table_from_pool'."
)
def push_table(self, df_or_path, table_name, reload_datamodel=True, **kwargs) -> 'DatamodelTable':
    """Pushes a table to a Pool, adds it the Datamodel and by default reloads the Datamodel.

    !!! warning
        This method is deprecated. Use [create_table][celonis_api.event_collection.data_pool.Pool.create_table].

    Returns:
        The newly added Datamodel Table.
    """
    self.parent.push_table(df_or_path, table_name, **kwargs)
    table = self.tables.names.get(table_name) or self.add_table_from_pool(table_name)
    if reload_datamodel:
        self.reload(tables=table_name)
    return table
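
The deprecation message above suggests this replacement flow; a sketch, assuming `dm` is a connected Datamodel, `df` is a pandas.DataFrame, and the table name is hypothetical:

    pool = dm.pool
    pool.create_table(df, "MY_TABLE")           # push the data to the Data Pool
    table = dm.add_table_from_pool("MY_TABLE")  # attach the pool table to the Datamodel
    dm.reload(tables="MY_TABLE")                # partial reload of just the new table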

reload(self, from_cache=False, tables=None, wait_for_reload=True)

Reloads the Datamodel only if a reload is not already running.

API

  • POST: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/reload
        { "forceComplete": from_cache }
    

Parameters:

    from_cache (bool): Deprecated, as it is no longer supported for data models. Will always be set to False. Default: False.
    tables (List): List of Datamodel Tables or table ids. If given, a partial reload will be performed on these tables and from_cache will be ignored. Default: None.
    wait_for_reload (bool): If True, waits until the Datamodel is reloaded. Default: True.

Returns:

    Dict: Report of the reload.

Source code in celonis_api/event_collection/data_model.py
def reload(self, from_cache: bool = False, tables: typing.List = None, wait_for_reload: bool = True) -> typing.Dict:
    """Reloads the Datamodel only if a reload is not already running.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/reload`
            ```json
                { "forceComplete": from_cache }
            ```

    Args:
        from_cache: Deprecated, as it is no longer supported for data models. Will always be set to False.
        tables: List of Datamodel Tables or table ids. If given, a partial reload will be performed on these
            tables and from_cache will be ignored.
        wait_for_reload: If True, waits until the Datamodel is reloaded.

    Returns:
        Report of the reload.
    """
    if from_cache:
        self._logger.warning(
            "Reloading data models from cache is no longer supported. Defaulting to complete reload."
        )

    response = self.load_status

    if response is not None and response["loadStatus"] == "RUNNING":
        self._logger.warning("A Data Model Reload is already in progress. No new load is triggered.")
        return response

    if not tables:
        reload_type = "Complete"
        payload = {"forceComplete": True}
        self.celonis.api_request(f"{self.url}/reload", method=HttpMethod.POST, params=payload)
    else:
        reload_type = "Partial"
        payload = self._get_table_ids(tables)  # type: ignore
        url = (
            f"{self.celonis.url}/integration/api/v1/data-pools/"
            f"{self.pool.id}/data-models/{self.id}/load/partial-sync"
        )
        self.celonis.api_request(url, payload)

    response = self.load_status
    if wait_for_reload:
        response = self._wait_for_reload(reload_type)

    return response
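
Two common invocations, assuming `dm` is a connected Datamodel and `table` is one of its DatamodelTables:

    # Complete reload, blocking until the load finishes:
    report = dm.reload()

    # Partial reload of selected tables, returning immediately:
    report = dm.reload(tables=[table], wait_for_reload=False)
    print(report["loadStatus"])  # e.g. "RUNNING", as checked in the source above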

upload_name_mapping(self, file_path)

Uploads the Datamodel's Name Mapping Excel file.

Parameters:

    file_path (Union[str, pathlib.Path]): Path to the Excel file to upload. Required.

Returns:

    Dict: Upload Result.

Source code in celonis_api/event_collection/data_model.py
def upload_name_mapping(self, file_path: typing.Union[str, pathlib.Path]) -> typing.Dict:
    """Uploads the Datamodels Name Mappings Excel File.

    Args:
        file_path: Path to the Excel file to upload.

    Returns:
        Upload Result.
    """
    file_path = pathlib.Path(file_path)
    if not file_path.is_file():
        raise PyCelonisValueError("Argument 'file_path' must refer to a file.")

    additional_headers = {"x-csrf-token": self.celonis._session.cookies.get("XSRF-TOKEN")}
    return self.celonis.api_request(f"{self.url}/name-mapping/file", file_path, headers=additional_headers)
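
A sketch, assuming `dm` is a connected Datamodel and the file name is hypothetical:

    import pathlib

    result = dm.upload_name_mapping(pathlib.Path("name_mapping.xlsx"))
    print(result)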

DatamodelProcessConfiguration (CelonisDataObject)

Datamodel ProcessConfiguration object.

Source code in celonis_api/event_collection/data_model.py
class DatamodelProcessConfiguration(CelonisDataObject):
    """Datamodel ProcessConfiguration object."""

    def __init__(self, parent, data):
        super().__init__(parent, data)
        self._name = self.activity_table.name

    @property
    def data(self):
        return self._data

    @property
    def activity_table(self) -> DatamodelTable:
        """Get the Datamodels Process Configuration Activity Table.

        Returns:
            Activity Table.
        """
        _id = self.data["activityTableId"]
        return _id and self.parent.tables.find(_id)

    @property
    def case_table(self) -> DatamodelTable:
        """Get the Datamodels Process Configuration Case Table.

        Returns:
            Case Table.
        """
        _id = self.data["caseTableId"]
        return _id and self.parent.tables.find(_id)

    @property
    def case_column(self) -> str:
        """Get the Datamodels Process Configuration Case Column.

        Returns:
            Case Column.
        """
        return self.data["caseIdColumn"]

    @property
    def activity_column(self) -> str:
        """Get the Datamodels Process Configuration Activity Column.

        Returns:
            Activity Column.
        """
        return self.data["activityColumn"]

    @property
    def timestamp_column(self) -> str:
        """Get the Datamodels Process Configuration Timestamp Column.

        Returns:
            Timestamp Column.
        """
        return self.data["timestampColumn"]

    @property
    def sorting_column(self) -> str:
        """Get the Datamodels Process Configuration Sorting Column.

        Returns:
            Sorting Column.
        """
        return self.data["sortingColumn"]

    @property
    def default_configuration(self) -> bool:
        """Get whether the Datamodels Process Configuration is set to default.

        Returns:
            Process Configuration default status.
        """
        return self.data["defaultConfiguration"]

    def edit_configuration(
        self,
        activity_table: 'DatamodelTable' = None,
        case_table: 'DatamodelTable' = None,
        case_column: str = None,
        activity_column: str = None,
        timestamp_column: str = None,
        sorting_column: str = None,
    ) -> 'DatamodelProcessConfiguration':
        """Edit the Datamodels Process Configuration.

        !!! api "API"
            - `PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
                ```json
                {
                    "activityTableId": activity_table_id,
                    "caseIdColumn": case_column,
                    "activityColumn": activity_column,
                    "timestampColumn": timestamp_column,
                    "sortingColumn": sorting_column,
                    "caseTableId": case_table_id
                }
                ```

        Args:
            activity_table: Name of Activity Table or DatamodelTable object.
            case_table: Name of Case Table or DatamodelTable object.
            case_column: Case Column Name referring to column in activity table.
            activity_column: Activity Column Name referring to column in activity table.
            timestamp_column: Timestamp Column Name referring to column in activity table.
            sorting_column: Sorting Column Name.

        Returns:
            Updated Process Configuration.
        """
        keys = [
            "id",
            "activityTableId",
            "caseIdColumn",
            "activityColumn",
            "timestampColumn",
            "sortingColumn",
            "caseTableId",
        ]
        payload = {key: self.data[key] for key in keys}

        if activity_table:
            _columns = {case_column, activity_column, timestamp_column}
            _activity_table_columns = set(col["name"] for col in activity_table.columns)
            check_columns = None in _columns or not _columns.issubset(_activity_table_columns)

            if check_columns:
                raise PyCelonisValueError("Problem finding case, activity or timestamp column.")

            payload.update(
                {
                    "activityTableId": activity_table.id,
                    "caseIdColumn": case_column,
                    "activityColumn": activity_column,
                    "timestampColumn": timestamp_column,
                    "sortingColumn": sorting_column,
                }
            )

        if case_table:
            payload.update({"caseTableId": case_table.id})

        try:
            self.parent.celonis.api_request(f"{self.parent.url}/process-configurations", payload, method=HttpMethod.PUT)
        except exceptions.HTTPError:
            self.parent.celonis.api_request(f"{self.parent.url}/process-configuration", payload, method=HttpMethod.PUT)
        return self

    def __repr__(self):
        case = self.case_table and f"\nCase table: {self.case_table}"
        act = self.activity_table and (
            f"\nActivity table: {self.activity_table},"
            + f"\nCase column: {self.case_column},"
            + f"\nActivity column: {self.activity_column},"
            + f"\nTimestamp column: {self.timestamp_column},"
            + f"\nSorting column: {self.sorting_column}"
        )
        return f"<DatamodelProcessConfiguration of {self.parent}{case}{act}>"

activity_column: str property readonly

Get the Datamodel's Process Configuration Activity Column.

Returns:

    str: Activity Column.

activity_table: DatamodelTable property readonly

Get the Datamodel's Process Configuration Activity Table.

Returns:

    DatamodelTable: Activity Table.

case_column: str property readonly

Get the Datamodel's Process Configuration Case Column.

Returns:

    str: Case Column.

case_table: DatamodelTable property readonly

Get the Datamodel's Process Configuration Case Table.

Returns:

    DatamodelTable: Case Table.

data property readonly

A reference to the data of this object.

default_configuration: bool property readonly

Get whether the Datamodel's Process Configuration is set to default.

Returns:

    bool: Process Configuration default status.

sorting_column: str property readonly

Get the Datamodel's Process Configuration Sorting Column.

Returns:

    str: Sorting Column.

timestamp_column: str property readonly

Get the Datamodel's Process Configuration Timestamp Column.

Returns:

    str: Timestamp Column.

edit_configuration(self, activity_table=None, case_table=None, case_column=None, activity_column=None, timestamp_column=None, sorting_column=None)

Edit the Datamodel's Process Configuration.

API

  • PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations
    {
        "activityTableId": activity_table_id,
        "caseIdColumn": case_column,
        "activityColumn": activity_column,
        "timestampColumn": timestamp_column,
        "sortingColumn": sorting_column,
        "caseTableId": case_table_id
    }
    

Parameters:

    activity_table (DatamodelTable): Name of Activity Table or DatamodelTable object. Default: None.
    case_table (DatamodelTable): Name of Case Table or DatamodelTable object. Default: None.
    case_column (str): Case Column Name, referring to a column in the activity table. Default: None.
    activity_column (str): Activity Column Name, referring to a column in the activity table. Default: None.
    timestamp_column (str): Timestamp Column Name, referring to a column in the activity table. Default: None.
    sorting_column (str): Sorting Column Name. Default: None.

Returns:

    DatamodelProcessConfiguration: Updated Process Configuration.

Source code in celonis_api/event_collection/data_model.py
def edit_configuration(
    self,
    activity_table: 'DatamodelTable' = None,
    case_table: 'DatamodelTable' = None,
    case_column: str = None,
    activity_column: str = None,
    timestamp_column: str = None,
    sorting_column: str = None,
) -> 'DatamodelProcessConfiguration':
    """Edit the Datamodels Process Configuration.

    !!! api "API"
        - `PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
            ```json
            {
                "activityTableId": activity_table_id,
                "caseIdColumn": case_column,
                "activityColumn": activity_column,
                "timestampColumn": timestamp_column,
                "sortingColumn": sorting_column,
                "caseTableId": case_table_id
            }
            ```

    Args:
        activity_table: Name of Activity Table or DatamodelTable object.
        case_table: Name of Case Table or DatamodelTable object.
        case_column: Case Column Name referring to column in activity table.
        activity_column: Activity Column Name referring to column in activity table.
        timestamp_column: Timestamp Column Name referring to column in activity table.
        sorting_column: Sorting Column Name.

    Returns:
        Updated Process Configuration.
    """
    keys = [
        "id",
        "activityTableId",
        "caseIdColumn",
        "activityColumn",
        "timestampColumn",
        "sortingColumn",
        "caseTableId",
    ]
    payload = {key: self.data[key] for key in keys}

    if activity_table:
        _columns = {case_column, activity_column, timestamp_column}
        _activity_table_columns = set(col["name"] for col in activity_table.columns)
        check_columns = None in _columns or not _columns.issubset(_activity_table_columns)

        if check_columns:
            raise PyCelonisValueError("Problem finding case, activity or timestamp column.")

        payload.update(
            {
                "activityTableId": activity_table.id,
                "caseIdColumn": case_column,
                "activityColumn": activity_column,
                "timestampColumn": timestamp_column,
                "sortingColumn": sorting_column,
            }
        )

    if case_table:
        payload.update({"caseTableId": case_table.id})

    try:
        self.parent.celonis.api_request(f"{self.parent.url}/process-configurations", payload, method=HttpMethod.PUT)
    except exceptions.HTTPError:
        self.parent.celonis.api_request(f"{self.parent.url}/process-configuration", payload, method=HttpMethod.PUT)
    return self
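
A sketch of pointing the configuration `pc` at a different activity table, assuming `dm` is a connected Datamodel; the table and column names are hypothetical. As the source above shows, a PyCelonisValueError is raised if any of the given columns is missing from the activity table:

    activities = dm.tables.names["ACTIVITIES"]
    pc = pc.edit_configuration(
        activity_table=activities,
        case_column="CASE_ID",
        activity_column="ACTIVITY",
        timestamp_column="EVENTTIME",
    )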

DatamodelTable (CelonisApiObjectChild)

Celonis DatamodelTable object to interact with Celonis Event Collection API.

Source code in celonis_api/event_collection/data_model.py
class DatamodelTable(CelonisApiObjectChild):
    """Celonis DatamodelTable object to interact with Celonis Event Collection API."""

    _name_data_key = "aliasOrName"

    @property
    def _parent_class(self):
        return Datamodel

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}`
        """
        return f"{self.parent.url.replace('/data-models/', '/data-model/')}/tables/{self.id}"

    @property
    def columns(self) -> typing.List:
        """Get all Datamodel Table Column Definitions
        (e.g.: `{'name': 'EVENTTIME_START', 'length': 26, 'type': 'DATE'}`).

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}/columns`

        Returns:
            List of Column definitions.
        """
        return self.celonis.api_request(self.url + "/columns")

    @property
    def alias(self):
        """Datamodel Table Alias."""
        return self.data["alias"]

    @alias.setter
    def alias(self, value):
        try:
            self.data["alias"] = value
        except ValueError:
            pass

    @property
    def source_name(self):
        """Datamodel Table Name."""
        return self.data["name"]

    @source_name.setter
    def source_name(self, value):
        try:
            self.data["name"] = value
        except ValueError:
            pass

    def delete(self):
        """Deletes the Datamodel Table.

        !!! api "API"
            - `DELETE: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}`
        """
        self.celonis.api_request(self.url, HttpMethod.DELETE)

    def get_data_frame(
        self,
        max_rows: typing.Optional[int] = None,
        verify_query: bool = False,
        include_change_date: bool = False,
        **kwargs,
    ) -> pd.DataFrame:
        """Exports the results of the Datamodel Table Columns PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
        Uses [Datamodel.get_data_frame][celonis_api.event_collection.data_model.Datamodel.get_data_frame].

        Args:
            max_rows: Limits the number of rows (e.g. max_rows = 100) that are queried.
            verify_query: Whether query should be verified before execution.
            include_change_date: Whether '_CELONIS_CHANGE_DATE' column is included or not.

        Returns:
            Dataframe containing the results of the query.
        """
        query = self._get_table_query(include_change_date=include_change_date)

        if max_rows:
            query.limit = max_rows

        return self.parent.get_data_frame(query=query, verify_query=verify_query, **kwargs)

    def _get_table_query(self, include_change_date=False):
        query = pql.PQL()
        cols = self.columns
        for c in cols:
            colname = c["name"] if isinstance(c, dict) else c.name
            if include_change_date or colname != "_CELONIS_CHANGE_DATE":
                query += pql.PQLColumn(f'"{self.name}"."{colname}"', colname)

        return query

    def _get_data_file(self, file_path=None, include_change_date=False):
        query = self._get_table_query(include_change_date=include_change_date)
        return self.parent._get_data_file(query, file_path)

alias property writable

Datamodel Table Alias.

columns: List property readonly

Get all Datamodel Table Column Definitions (e.g.: {'name': 'EVENTTIME_START', 'length': 26, 'type': 'DATE'}).

API

  • GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}/columns

Returns:

    List: List of Column definitions.
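
For example, assuming `table` is a DatamodelTable:

    for col in table.columns:
        print(col["name"], col["type"])  # e.g. EVENTTIME_START DATE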

source_name property writable

Datamodel Table Name.

url: str property readonly

API

  • /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}

delete(self)

Deletes the Datamodel Table.

API

  • DELETE: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}

Source code in celonis_api/event_collection/data_model.py
def delete(self):
    """Deletes the Datamodel Table.

    !!! api "API"
        - `DELETE: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}`
    """
    self.celonis.api_request(self.url, HttpMethod.DELETE)

get_data_frame(self, max_rows=None, verify_query=False, include_change_date=False, **kwargs)

Exports the results of the Datamodel Table Columns PQL query as pyarrow.parquet.ParquetFile and converts it to a pandas.DataFrame. Uses Datamodel.get_data_frame.

Parameters:

    max_rows (Optional[int]): Limits the number of rows that are queried (e.g. max_rows=100). Default: None.
    verify_query (bool): Whether the query should be verified before execution. Default: False.
    include_change_date (bool): Whether the '_CELONIS_CHANGE_DATE' column is included or not. Default: False.

Returns:

    DataFrame: Dataframe containing the results of the query.

Source code in celonis_api/event_collection/data_model.py
def get_data_frame(
    self,
    max_rows: typing.Optional[int] = None,
    verify_query: bool = False,
    include_change_date: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """Exports the results of the Datamodel Table Columns PQL query as
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
    Uses [Datamodel.get_data_frame][celonis_api.event_collection.data_model.Datamodel.get_data_frame].

    Args:
        max_rows: Limits the number of rows (e.g. max_rows = 100) that are queried.
        verify_query: Whether query should be verified before execution.
        include_change_date: Whether '_CELONIS_CHANGE_DATE' column is included or not.

    Returns:
        Dataframe containing the results of the query.
    """
    query = self._get_table_query(include_change_date=include_change_date)

    if max_rows:
        query.limit = max_rows

    return self.parent.get_data_frame(query=query, verify_query=verify_query, **kwargs)
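
A sketch, assuming `table` is a DatamodelTable of a loaded Datamodel:

    # Preview the first 100 rows; _CELONIS_CHANGE_DATE is excluded by default.
    df = table.get_data_frame(max_rows=100)
    print(df.shape)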

ForeignKey (dict)

Datamodel Foreign Key object.

Parameters:

    source_table (DatamodelTable): Source table of the foreign key. Required.
    target_table (DatamodelTable): Target table of the foreign key. Required.
    columns (List[Tuple[str, str]]): List of column tuples for the foreign key. Required.

Source code in celonis_api/event_collection/data_model.py
class ForeignKey(dict):
    """Datamodel Foreign Key object.

    Args:
        source_table: Source table of foreign key.
        target_table: Target table of foreign key.
        columns: List of column tuples for foreign key.
    """

    def __init__(
        self, source_table: DatamodelTable, target_table: DatamodelTable, columns: typing.List[typing.Tuple[str, str]]
    ):
        self.source_table_object = source_table
        self.target_table_object = target_table
        self.columns = columns

        super().__init__(
            {
                "source_table": (
                    self.source_table_object.alias
                    if self.source_table_object.alias is not None
                    else self.source_table_object.name
                ),
                "target_table": (
                    self.target_table_object.alias
                    if self.target_table_object.alias is not None
                    else self.target_table_object.name
                ),
                "columns": self.columns,
            }
        )
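
Since ForeignKey subclasses dict, instances serialize and compare like plain dictionaries; a sketch with hypothetical DatamodelTable objects `activities` and `cases`:

    fk = ForeignKey(
        source_table=activities,
        target_table=cases,
        columns=[("CASE_ID", "CASE_ID")],
    )
    print(fk["source_table"])  # alias if set, otherwise the table name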

ForeignKeyList (list)

List of ForeignKey.

Parameters:

    foreign_keys_raw (Dict): Dictionary of raw foreign keys. Required.
    tables (CelonisCollection): Collection of DatamodelTable. Required.

Source code in celonis_api/event_collection/data_model.py
class ForeignKeyList(list):
    """List of [ForeignKey][celonis_api.event_collection.data_model.ForeignKey].

    Args:
        foreign_keys_raw: Dictionary of raw foreign keys.
        tables: Collection of [DatamodelTable][celonis_api.event_collection.data_model.DatamodelTable].
    """

    def __init__(self, foreign_keys_raw: typing.Dict, tables: CelonisCollection):
        foreign_keys = []

        for fk in foreign_keys_raw:
            source_table = tables.ids[fk["sourceTableId"]]
            target_table = tables.ids[fk["targetTableId"]]
            columns = [(col["sourceColumnName"], col["targetColumnName"]) for col in fk["columns"]]

            foreign_keys.append(ForeignKey(source_table=source_table, target_table=target_table, columns=columns))

        super().__init__(foreign_keys)

    def find_keys_by_source_name(self, name: str) -> typing.List[ForeignKey]:
        """Get all foreign keys with given source name.

        Args:
            name: Source name to search for (source name of table, not alias).

        Returns:
            List of foreign keys with the same source name.
        """
        return [fk for fk in self if fk.source_table_object.source_name == name]

    def find_keys_by_target_name(self, name: str) -> typing.List[ForeignKey]:
        """Returns all foreign keys with given target name

        Args:
            name: Target name to search for (source name of table, not alias).

        Returns:
            List of foreign keys with the same target name.
        """
        return [fk for fk in self if fk.target_table_object.source_name == name]

    def find_keys_by_name(self, name: str) -> typing.List[ForeignKey]:
        """Get all foreign keys with given target name or source name.

        Args:
            name: Target and/or source name to search for (source name of table, not alias).

        Returns:
            List of foreign keys with the same target name or same source name.
        """
        return self.find_keys_by_source_name(name) + self.find_keys_by_target_name(name)

find_keys_by_name(self, name)

Get all foreign keys with given target name or source name.

Parameters:

    name (str): Target and/or source name to search for (source name of table, not alias). Required.

Returns:

    List[celonis_api.event_collection.data_model.ForeignKey]: List of foreign keys with the same target name or same source name.

Source code in celonis_api/event_collection/data_model.py
def find_keys_by_name(self, name: str) -> typing.List[ForeignKey]:
    """Get all foreign keys with given target name or source name.

    Args:
        name: Target and/or source name to search for (source name of table, not alias).

    Returns:
        List of foreign keys with the same target name or same source name.
    """
    return self.find_keys_by_source_name(name) + self.find_keys_by_target_name(name)

find_keys_by_source_name(self, name)

Get all foreign keys with given source name.

Parameters:

    name (str): Source name to search for (source name of table, not alias). Required.

Returns:

    List[celonis_api.event_collection.data_model.ForeignKey]: List of foreign keys with the same source name.

Source code in celonis_api/event_collection/data_model.py
def find_keys_by_source_name(self, name: str) -> typing.List[ForeignKey]:
    """Get all foreign keys with given source name.

    Args:
        name: Source name to search for (source name of table, not alias).

    Returns:
        List of foreign keys with the same source name.
    """
    return [fk for fk in self if fk.source_table_object.source_name == name]

find_keys_by_target_name(self, name)

Get all foreign keys with given target name.

Parameters:

    name (str): Target name to search for (source name of table, not alias). Required.

Returns:

    List[celonis_api.event_collection.data_model.ForeignKey]: List of foreign keys with the same target name.

Source code in celonis_api/event_collection/data_model.py
def find_keys_by_target_name(self, name: str) -> typing.List[ForeignKey]:
    """Returns all foreign keys with given target name

    Args:
        name: Target name to search for (source name of table, not alias).

    Returns:
        List of foreign keys with the same target name.
    """
    return [fk for fk in self if fk.target_table_object.source_name == name]
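
A filtering sketch, assuming `foreign_keys_raw` is the raw foreign-key payload of a Datamodel (hypothetical here), `dm.tables` is the matching table collection, and the table names are hypothetical:

    fks = ForeignKeyList(foreign_keys_raw, dm.tables)
    outgoing = fks.find_keys_by_source_name("ACTIVITIES")
    incoming = fks.find_keys_by_target_name("CASES")
    all_for_table = fks.find_keys_by_name("ACTIVITIES")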