Skip to content

compute_node.py

ComputeNode (CelonisApiObject)

ComputeNode object to interact with Celonis Event Collection API.

Source code in celonis_api/event_collection/compute_node.py
class ComputeNode(CelonisApiObject):
    """ComputeNode object to interact with Celonis Event Collection API.

    Exports PQL query results via the `/integration/api/v1/compute` endpoint,
    downloading them as parquet files (in server-side chunks when supported).
    """

    @property
    def data(self) -> typing.Dict:
        """ComputeNode has no data. Always returns empty dict.

        Raises:
            PyCelonisNotImplementedError: when trying to set data.
        """
        return {}

    @data.setter
    def data(self, value):
        # A ComputeNode carries no mutable resource state, so assignment is rejected.
        raise PyCelonisNotImplementedError("Set 'data' not implemented.")

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `GET: /integration/api/v1/compute/{datamodel_id}`
        """
        return f"{self.celonis.url}/integration/api/v1/compute/{self.id}"

    def get_data_file(
        self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
    ) -> pathlib.Path:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and returns the path to the exported file.

        !!! api "API"
            - `POST: /integration/api/v1/compute/{datamodel_id}/export/query`
                ```json
                    {
                        "dataCommand": {
                            "commands": [
                                {"queries": pql_query.query}
                            ]
                        }
                    }
                ```

        Args:
            pql_query: The table query to be executed.
            file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

        Returns:
            Path to downloaded file containing the results of the query.
        """
        return self._compute(pql_query=pql_query, file_path=file_path)

    def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
        Uses [ComputeNode.get_data_file][celonis_api.event_collection.compute_node.ComputeNode.get_data_file].

        Args:
            pql_query: The table query to be executed.

        Returns:
            Dataframe containing the results of the query.
        """
        # Export to an auto-named temp file (file_path omitted), then load it.
        file_path = self._compute(pql_query=pql_query)
        return parquet_utils.read_parquet(file_path)

    def _compute(self, pql_query, file_path=None) -> pathlib.Path:
        """Run the export end to end: submit query, poll until done, download to `file_path`."""
        self.celonis._tracker.track("Get data file", extra={"tracking_type": "DATA_PULL"})

        # Default target is a timestamped parquet file in the system temp directory.
        if file_path is None:
            tmp_file_path = pathlib.Path(tempfile.gettempdir())
            file_path = tmp_file_path / self._tmp_file_name()
        elif isinstance(file_path, str):
            file_path = pathlib.Path(file_path)

        # Accept str / PQLColumn / PQL and normalize to a PQL object.
        pql_query = self._escalate_query_type(pql_query)
        export_status_result = self._send_query_to_server(pql_query)
        self._download_query_result(export_status_result=export_status_result, file_path=file_path)

        return file_path

    @staticmethod
    def _escalate_query_type(pql_query) -> pql.PQL:
        """Promote `pql_query` along str -> PQLColumn -> PQL.

        Raises:
            PyCelonisTypeError: if `pql_query` is not a str, PQLColumn or PQL.
        """
        if isinstance(pql_query, str):
            pql_query = pql.PQLColumn(pql_query)
        if isinstance(pql_query, pql.PQLColumn):
            pql_query = pql.PQL(pql_query)
        if isinstance(pql_query, pql.PQL):
            return pql_query
        else:
            raise PyCelonisTypeError(f"Argument 'pql_query' must be PQL, PQLColumn or str, but got {type(pql_query)}.")

    @staticmethod
    def _tmp_file_name(i=None) -> str:
        """Build a parquet file name: timestamped when `i` is None, chunk-indexed otherwise."""
        if i is None:
            return f"celonis_pql_export_{time.time()}.parquet"
        else:
            return f"celonis_pql_export_{i}.parquet"

    def _download_query_result(self, export_status_result: typing.Dict, file_path: pathlib.Path) -> pathlib.Path:
        """Download a finished export to `file_path`, preferring the chunked endpoint."""
        # Remove any stale file so the download starts cleanly.
        if file_path.is_file():
            file_path.unlink()
        if "exportChunks" in export_status_result:
            #  The export supports server-side chunking
            try:
                download_query_result = self._download_chunks(export_status_result, file_path)
                return download_query_result
            except PyCelonisHTTPError as e:
                # Chunked download failed; fall through to the single-file endpoint below.
                self._logger.info(f"Could not download chunked export due to {str(e)}, trying unchunked endpoint.")

        export_id = export_status_result["id"]
        return self.celonis.api_request(f"{self.url}/export/{export_id}/result", file_path)

    def _send_query_to_server(self, pql_query: pql.PQL) -> typing.Dict:
        """Submit the export query, then wait for the export to reach a final status.

        Raises:
            PyCelonisDataExportNotEnabledError: if the export request is rejected.
        """
        payload = {"dataCommand": {"commands": [{"queries": pql_query.query}]}}

        try:
            export_query_result = self.celonis.api_request(f"{self.url}/export/query", payload)
        except PyCelonisHTTPError as e:
            # NOTE(review): an HTTP error here is surfaced as "export not enabled";
            # presumably that is the dominant cause -- confirm against API behavior.
            raise PyCelonisDataExportNotEnabledError(e.message if e.message else "")

        return self._get_export_status_result(export_query_result, pql_query)

    def _get_export_status_result(self, export_query_result: typing.Dict, pql_query: pql.PQL) -> typing.Dict:
        """Poll the export status about once per second until it leaves RUNNING.

        Tolerates up to 5 consecutive failed status requests before re-raising.

        Raises:
            PyCelonisHTTPError: if the final status is not DONE.
            PyCelonisError: if polling fails more than 5 times in a row.
        """
        self._logger.info("PQL Export started...")

        iterations = 0
        error_count = 0
        while True:
            try:
                result = self.celonis.api_request(f"{self.url}/export/{export_query_result['id']}")
                status = result["exportStatus"]
                if status != "RUNNING":
                    self._logger.info(f"PQL Export status: {status}")
                    break

                # Successful poll: reset the consecutive-error counter.
                error_count = 0
                iterations += 1
                # Log progress only every 5th poll to avoid flooding the log.
                if iterations % 5 == 0:
                    self._logger.info(f"PQL Export status: {status}...")
                time.sleep(1)
            except PyCelonisError as e:
                error_count += 1
                time.sleep(3)
                self._logger.exception("Failed to request export status, trying again...")
                if error_count > 5:
                    raise e

        if status != "DONE":
            raise PyCelonisHTTPError(
                f"Export failed. Status: {status} \n\n Query: {pql_query.query} \n\n Message "
                f"{result.get('message', '')} \n\n Export ID: {result.get('id', '')}"
            )

        return result

    def _download_chunks(self, export_status_result: typing.Dict, file_path: pathlib.Path) -> pathlib.Path:
        """Download every export chunk into a temp dir, then merge them into `file_path`.

        Each chunk download is retried, allowing up to 5 consecutive failures.
        """
        num_chunks = export_status_result["exportChunks"]
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_dir_path = pathlib.Path(tmp_dir)

            downloaded_chunks = 0
            error_count = 0
            while downloaded_chunks < num_chunks:
                try:
                    chunk_path = self._get_chunk_path(tmp_dir_path, downloaded_chunks)
                    download_url = f"{self.url}/export/{export_status_result['id']}/{downloaded_chunks}/result"
                    self.celonis.api_request(download_url, chunk_path)
                    downloaded_chunks += 1
                    error_count = 0
                except PyCelonisError as e:
                    error_count += 1
                    time.sleep(3)
                    self._logger.exception(f"Failed to download chunk {downloaded_chunks}, trying again...")
                    if error_count > 5:
                        raise e

            # Merge while the TemporaryDirectory is still alive.
            self._unite_chunks(number_of_chunks=downloaded_chunks, file_path=file_path, tmp_dir_path=tmp_dir_path)
            return file_path

    def _get_chunk_path(self, tmp_dir_path: pathlib.Path, chunk: int) -> pathlib.Path:
        """Path of chunk number `chunk` inside the temporary download directory."""
        return tmp_dir_path / self._tmp_file_name(chunk)

    def _unite_chunks(self, number_of_chunks: int, file_path: pathlib.Path, tmp_dir_path: pathlib.Path):
        """Concatenate the downloaded chunk files into one parquet file at `file_path`."""
        chunk_file = self._get_chunk_path(tmp_dir_path, 0)

        if number_of_chunks > 1:
            # Read and concatenate all chunk tables with pyarrow, then write a single file.
            table = read_table(source=chunk_file)
            for chunk in range(1, number_of_chunks, 1):
                chunk_file = self._get_chunk_path(tmp_dir_path, chunk)
                chunk_table = read_table(source=chunk_file)
                table = concat_tables([table, chunk_table])
            write_table(table=table, where=file_path, allow_truncated_timestamps=True, coerce_timestamps="us")
        else:
            # Single chunk: the chunk already is the complete result, just copy it.
            shutil.copy(chunk_file, file_path)

data: Dict property writable

ComputeNode has no data. Always returns empty dict.

Exceptions:

Type Description
PyCelonisNotImplementedError

when trying to set data.

url: str property readonly

API

  • GET: /integration/api/v1/compute/{datamodel_id}

get_data_file(self, pql_query, file_path=None, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile and returns the path to the exported file.

API

  • POST: /integration/api/v1/compute/{datamodel_id}/export/query
        {
            "dataCommand": {
                "commands": [
                    {"queries": pql_query.query}
                ]
            }
        }
    

Parameters:

Name Type Description Default
pql_query PQL

The table query to be executed.

required
file_path Union[str, pathlib.Path]

The output path for the export. Defaults to tmpdir/celonis_pql_export_<current_time>.parquet.

None

Returns:

Type Description
Path

Path to downloaded file containing the results of the query.

Source code in celonis_api/event_collection/compute_node.py
def get_data_file(
    self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
) -> pathlib.Path:
    """Run a PQL query on the compute node and export the result as a
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html),
    returning the location of the exported file.

    !!! api "API"
        - `POST: /integration/api/v1/compute/{datamodel_id}/export/query`
            ```json
                {
                    "dataCommand": {
                        "commands": [
                            {"queries": pql_query.query}
                        ]
                    }
                }
            ```

    Args:
        pql_query: The table query to be executed.
        file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

    Returns:
        Path to downloaded file containing the results of the query.
    """
    # All export mechanics (submission, polling, download) live in _compute.
    exported_path = self._compute(pql_query=pql_query, file_path=file_path)
    return exported_path

get_data_frame(self, pql_query, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile and converts it to a pandas.DataFrame. Uses ComputeNode.get_data_file.

Parameters:

Name Type Description Default
pql_query PQL

The table query to be executed.

required

Returns:

Type Description
DataFrame

Dataframe containing the results of the query.

Source code in celonis_api/event_collection/compute_node.py
def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
    """Run a PQL query, export the result to a temporary
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    and load it as a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
    Uses [ComputeNode.get_data_file][celonis_api.event_collection.compute_node.ComputeNode.get_data_file].

    Args:
        pql_query: The table query to be executed.

    Returns:
        Dataframe containing the results of the query.
    """
    # Export first (auto-named temp file), then read the parquet into memory.
    exported_path = self._compute(pql_query=pql_query)
    return parquet_utils.read_parquet(exported_path)

HybridPseudoComputeNode

Hybrid ComputeNode object to interact with Celonis Hybrid Event Collection API.

Source code in celonis_api/event_collection/compute_node.py
class HybridPseudoComputeNode:
    """Hybrid ComputeNode object to interact with Celonis Hybrid Event Collection API.

    Hybrid datamodels expose no direct compute endpoint, so exports are routed
    through a temporary Studio Space/Package/Analysis.
    """

    def __init__(self, id: str, datamodel: 'Datamodel'):
        # id: the datamodel id; datamodel: the Datamodel this pseudo node queries.
        self.id = id
        self.datamodel = datamodel

    @property
    def data(self) -> typing.Dict:
        """HybridPseudoComputeNode has no data. Always returns empty dict."""
        return {}

    @data.setter
    def data(self, value):
        # No mutable state; assignment is explicitly unsupported.
        raise PyCelonisNotImplementedError("Set 'data' not implemented.")

    @property
    def url(self) -> str:
        # There is no direct compute URL for Hybrid datamodels.
        raise PyCelonisNotImplementedError("Property 'url' not implemented.")

    @property
    def _parent_class(self):
        # Imported locally to avoid a circular import with data_model.
        from pycelonis.celonis_api.event_collection.data_model import Datamodel

        return Datamodel

    def get_data_file(
        self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
    ) -> pathlib.Path:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and returns the path to the exported file.

        This method will create a temporary Studio Space, Package and Analysis to then export the result via
        [Analysis.get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file].
        The temporary space will be deleted before the result is returned.
        It is recommended to set up an Analysis in Studio yourself and use it to export the query result.

        Args:
            pql_query: The table query to be executed.
            file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

        Returns:
            Path to downloaded file containing the results of the query.

        Raises:
            PyCelonisPermissionError: if the temporary Studio objects cannot be created.
        """
        # Random suffix keeps concurrently created temp spaces from colliding.
        name = f"zzz___TEMP___{random.randint(1, 1000000)}"
        temp_space = None
        try:
            temp_space = self.datamodel.celonis.create_space(name=name + "_SPACE")
            temp_package = temp_space.create_package(name=name + "_PACKAGE")
            temp_analysis = temp_package.create_analysis(name=name + "_ANALYSIS", data_model_id=self.datamodel.id)
            file_path = temp_analysis.get_data_file(pql_query, file_path, **kwargs)
        except PyCelonisPermissionError:
            raise PyCelonisPermissionError(
                "For querying Hybrid datamodel you need ALL Studio Permissions:\n"
                "[CREATE SPACE, EDIT ALL SPACES, DELETE ALL SPACES, "
                "USE ALL PACKAGES, CREATE PACKAGE, EDIT ALL PACKAGES, DELETE ALL PACKAGES]."
            )
        finally:
            # Always remove the temporary space (and its contents), even on failure.
            if temp_space:
                temp_space.delete()

        return file_path

    def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        file and converts it to a
        [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
        Uses [get_data_file][celonis_api.event_collection.compute_node.HybridPseudoComputeNode.get_data_file].

        Args:
            pql_query: The table query to be executed.

        Returns:
            Dataframe containing the results of the query.
        """
        file_path = self.get_data_file(pql_query=pql_query, **kwargs)
        return parquet_utils.read_parquet(file_path)

    def _get_export_status_result(self, *args, **kwargs):
        # Status polling happens inside the temporary Analysis, not here.
        raise PyCelonisNotImplementedError("_get_export_status_result not implemented for HybridPseudoComputeNode")

    def _download_query_result(self, *args, **kwargs):
        # Result download happens inside the temporary Analysis, not here.
        raise PyCelonisNotImplementedError("_download_query_result not implemented for HybridPseudoComputeNode")

get_data_file(self, pql_query, file_path=None, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile and returns the path to the exported file.

This method will create a temporary Studio Space, Package and Analysis to then export the result via Analysis.get_data_file. The temporary space will be deleted before the result is returned. It is recommended to set up an Analysis in Studio yourself and use it to export the query result.

Parameters:

Name Type Description Default
pql_query PQL

The table query to be executed.

required
file_path Union[str, pathlib.Path]

The output path for the export. Defaults to tmpdir/celonis_pql_export_<current_time>.parquet.

None

Returns:

Type Description
Path

Path to downloaded file containing the results of the query.

Source code in celonis_api/event_collection/compute_node.py
def get_data_file(
    self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
) -> pathlib.Path:
    """Export a PQL query result from a Hybrid datamodel as a
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    and return the path to the exported file.

    A temporary Studio Space, Package and Analysis are created so the export can go through
    [Analysis.get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file];
    the temporary space is deleted before the result is returned.
    It is recommended to set up an Analysis in Studio yourself and use it to export the query result.

    Args:
        pql_query: The table query to be executed.
        file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

    Returns:
        Path to downloaded file containing the results of the query.
    """
    # Random suffix avoids name collisions between concurrent temporary spaces.
    base_name = f"zzz___TEMP___{random.randint(1, 1000000)}"
    temp_space = None
    try:
        celonis = self.datamodel.celonis
        temp_space = celonis.create_space(name=f"{base_name}_SPACE")
        package = temp_space.create_package(name=f"{base_name}_PACKAGE")
        analysis = package.create_analysis(name=f"{base_name}_ANALYSIS", data_model_id=self.datamodel.id)
        file_path = analysis.get_data_file(pql_query, file_path, **kwargs)
    except PyCelonisPermissionError:
        raise PyCelonisPermissionError(
            "For querying Hybrid datamodel you need ALL Studio Permissions:\n"
            "[CREATE SPACE, EDIT ALL SPACES, DELETE ALL SPACES, "
            "USE ALL PACKAGES, CREATE PACKAGE, EDIT ALL PACKAGES, DELETE ALL PACKAGES]."
        )
    finally:
        # Clean up the temporary space (and everything inside it) in every case.
        if temp_space:
            temp_space.delete()

    return file_path

get_data_frame(self, pql_query, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile file and converts it to a pandas.DataFrame. Uses get_data_file.

Parameters:

Name Type Description Default
pql_query PQL

The table query to be executed.

required

Returns:

Type Description
DataFrame

Dataframe containing the results of the query.

Source code in celonis_api/event_collection/compute_node.py
def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
    """Export a PQL query result from a Hybrid datamodel to a
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    and load it as a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
    Uses [get_data_file][celonis_api.event_collection.compute_node.HybridPseudoComputeNode.get_data_file].

    Args:
        pql_query: The table query to be executed.

    Returns:
        Dataframe containing the results of the query.
    """
    # Export via the temporary-Analysis path, then read the parquet into memory.
    exported_path = self.get_data_file(pql_query=pql_query, **kwargs)
    return parquet_utils.read_parquet(exported_path)