data_pool.py

DataConnection (CelonisApiObjectChild)

Data Connection object.

Source code in celonis_api/event_collection/data_pool.py
class DataConnection(CelonisApiObjectChild):
    """Data Connection object."""

    @property
    def _parent_class(self):
        return Pool

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/data-sources/{data_connection_id}`
        """
        return f"{self.parent.url}/data-sources/{self.id}"

url: str property readonly

API

  • /integration/api/pools/{pool_id}/data-sources/{data_connection_id}
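
A Data Connection is usually obtained through Pool.data_connections rather than constructed directly. A minimal sketch of reading the composed endpoint URL, assuming pool is an already retrieved Pool object (see the Pool class below):

    # Sketch: list the endpoint URLs of all Data Connections of a pool.
    for data_connection in pool.data_connections:
        # url resolves to {pool.url}/data-sources/{data_connection.id}
        print(data_connection.id, data_connection.url)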

HybridPool (Pool)

Source code in celonis_api/event_collection/data_pool.py
class HybridPool(Pool):
    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration-hybrid/api/pools/{pool_id}`
        """
        return f"{self.celonis.url}/integration-hybrid/api/pools/{self.id}"

    @property
    def url_data_push(self):
        """
        !!! api "API"
            - `/integration-hybrid/api/v1/data-push/{pool_id}/jobs/`
        """
        return f"{self.celonis.url}/integration-hybrid/api/v1/data-push/{self.id}/jobs/"

    @property
    def url_connection_creation(self):
        """
        !!! api "API"
            - `/integration-hybrid/api/datasource/`
        """
        return f"{self.celonis.url}/integration-hybrid/api/datasource/"

url: str property readonly

API

  • /integration-hybrid/api/pools/{pool_id}

url_connection_creation property readonly

API

  • /integration-hybrid/api/datasource/

url_data_push property readonly

API

  • /integration-hybrid/api/v1/data-push/{pool_id}/jobs/

Pool (CelonisApiObject)

Pool object to interact with the Celonis Event Collection API.
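
A minimal sketch of obtaining a Pool object, assuming a connection established via pycelonis.get_celonis() and that pools can be looked up by name or ID through the pools collection (URL, token and pool name are illustrative):

    from pycelonis import get_celonis

    # Illustrative credentials; replace with your team URL and API token.
    celonis = get_celonis(url="https://my-team.celonis.cloud", api_token="***")
    pool = celonis.pools.find("My Data Pool")  # lookup by name or ID (assumed helper)
    print(pool.url)  # .../integration/api/pools/{pool_id}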

Source code in celonis_api/event_collection/data_pool.py
class Pool(CelonisApiObject):
    """Pool object to interact to interact with Celonis Event Collection API."""

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}`
        """
        return f"{self.celonis.url}/integration/api/pools/{self.id}"

    @property
    def url_data_push(self):
        """
        !!! api "API"
            - `/integration/api/v1/data-push/{pool_id}/jobs/`
        """
        return f"{self.celonis.url}/integration/api/v1/data-push/{self.id}/jobs/"

    @property
    def url_connection_creation(self):
        """
        !!! api "API"
            - `/integration/api/datasource/`
        """
        return f"{self.celonis.url}/integration/api/datasource/"

    @property
    def datamodels(self) -> CelonisCollection:
        """Get all Datamodels of the Pool.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-models`

        Returns:
            Collection of Pool Datamodels.
        """
        response = self.celonis.api_request(f"{self.url}/data-models")
        return CelonisCollection([Datamodel(parent=self, id_or_data=data) for data in response])

    @property
    def tables(self) -> typing.List[typing.Dict]:
        """Get all Pool Tables.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/tables`

        Returns:
             A List of dictionaries containing Pool tables.
        """
        return self.celonis.api_request(f"{self.url}/tables")

    @property
    def data_connections(self) -> 'CelonisCollection[DataConnection]':
        """Get all Pool Data Connections.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-sources/`

        Returns:
            A Collection of Pool Data Connections.
        """
        response = self.celonis.api_request(f"{self.url}/data-sources/", params={"excludeUnconfigured": False})
        return CelonisCollection(DataConnection(self, data) for data in response)

    @property
    def data_jobs(self) -> 'CelonisCollection[DataJob]':
        """Get all Pool Data Jobs.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs`

        Returns:
            A Collection of Pool Data Jobs.
        """
        response = self.celonis.api_request(f"{self.url}/jobs")
        return CelonisCollection([DataJob(self, d) for d in response])

    @property
    def variables(self) -> 'CelonisCollection[PoolParameter]':
        """Get all Pool Variables.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/variables`

        Returns:
            A Collection of Pool Variables.
        """
        response = self.celonis.api_request(self.url + "/variables/")
        return CelonisCollection([PoolParameter(self, v, self.celonis) for v in response])

    def find_table(self, table_name: str, data_source_id: str = None) -> typing.Optional[typing.Dict]:
        """Find a Table in the Pool.

        Args:
            table_name: Name of the Pool Table.
            data_source_id: ID of the Data Source.

        Returns:
            The Pool Table, if found.
        """
        table_found = None
        for table in self.tables:
            if table["name"].lower() == table_name.lower() and table["dataSourceId"] == data_source_id:
                table_found = table
                break

        return table_found

    def create_table(
        self,
        df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
        table_name: str,
        if_exists: str = "error",  # drop, replace_data_only
        column_config: typing.List[typing.Dict[str, typing.Any]] = None,
        connection: typing.Union["DataConnection", str] = None,
        wait_for_finish: bool = True,
        chunksize: int = 100_000,
    ) -> typing.Dict:
        """Creates a new Table in the Pool from a
        [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html).

        Args:
            df_or_path:
                * If DataFrame, df is chunked, written to parquet and uploaded.
                * If Path to parquet file, file is uploaded.
                * If Path to folder, any parquet file in folder is uploaded.
                * If str, value is converted to Path and handled as described above.
                (The index of the data frame is ignored and NOT pushed to Celonis.)
            table_name: Name of Table.
            if_exists:
                * `error` -> an error is thrown if a table of the same name already exists in the pool (default).
                * `drop` -> the existing table is dropped completely and a new table is created.
                * `replace_data_only` -> the column names and types of the existing table are kept and only the data is replaced.
            column_config: Can be used to specify column types and string field length.
                ```json
                    [
                        {
                            "columnName":"BUKRS",
                            "fieldLength":100,
                            "columnType":"STRING"
                        }...
                    ]
                ```
                with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
            connection: The Data Connection to upload to, else uploads to Global.
            wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
            chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
                that are uploaded. If set to a value <1, no chunking is applied.

        Returns:
            The Data Job Status.

        Raises:
            PyCelonisValueError: If Table already exists and `if_exists='error'`.
            PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
            PyCelonisTypeError: If Path is not a valid file or folder.
        """
        self.celonis._tracker.track("Create table", extra={"tracking_type": "DATA_PUSH"})

        if if_exists not in ["error", "drop", "replace_data_only"]:
            raise PyCelonisValueError("Argument 'if_exists' must be one of ['error', 'drop', 'replace_data_only'].")

        data_source_id = self._get_data_source_id(connection)
        job_type = None

        if if_exists == "drop":
            job_type = "REPLACE"
        else:
            table = self.find_table(table_name, data_source_id)
            if table is not None:
                if if_exists == "error":
                    raise PyCelonisValueError(
                        f"Table with name {table_name} already exists in the "
                        "Data Pool. If you want to drop it and create a new table, set if_exists='drop'."
                    )
                elif if_exists == "replace_data_only":
                    if column_config is not None:
                        raise PyCelonisValueError(
                            "When argument if_exists='replace_data_only', "
                            "you cannot give an additional column_config. "
                            "The column_config is inferred from the pool table."
                        )
                    column_config = self.get_column_config(table, raise_error=True)

        # Create payload for data job
        payload, table_name = self._create_payload_for_data_job(
            table_name=table_name,
            column_config=column_config,
            data_source_id=data_source_id,
            job_type=job_type,
        )

        # Create data push job
        response = self.celonis.api_request(self.url_data_push, payload)
        job_id = response["id"]
        # Create parquet uploader
        chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
        chunk_url = f"{chunk_base_url}/upserted"

        # Create df chunks and upload
        if isinstance(df_or_path, pd.DataFrame):
            self._upload_dataframe(
                df=df_or_path,
                table_name=table_name,
                chunksize=chunksize,
                chunk_url=chunk_url,
                chunk_base_url=chunk_base_url,
            )
        # Or upload file(s)
        elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
            self._upload_parquet_files(path=df_or_path, chunk_url=chunk_url, chunk_base_url=chunk_base_url)

        # Start job execution
        _ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
        response = self.check_push_status(job_id)

        # Wait for job to finish
        if wait_for_finish and response.get("status", "") != "DONE":
            response = self._wait_until_data_job_finish(job_id=job_id)

        return response

    def append_table(
        self,
        df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
        table_name: str,
        column_config: typing.List[typing.Dict[str, typing.Any]] = None,
        connection: typing.Union["DataConnection", str] = None,
        wait_for_finish: bool = True,
        chunksize: int = 100_000,
    ) -> typing.Dict:
        """Appends a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        to an existing Table in the Pool.

        Args:
            df_or_path:
                * If DataFrame, df is chunked, written to parquet and uploaded.
                * If Path to parquet file, file is uploaded.
                * If Path to folder, any parquet file in folder is uploaded.
                * If str, value is converted to Path and handled as described above.
            table_name: Name of Table.
            column_config: Can be used to specify column types and string field length.
                ```json
                    [
                        {
                            "columnName":"BUKRS",
                            "fieldLength":100,
                            "columnType":"STRING"
                        }...
                    ]
                ```
                with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
            connection: The Data Connection to upload to, else uploads to Global.
            wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
            chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
                that are uploaded. If set to a value <1, no chunking is applied.

        Returns:
            The Data Job Status.

        Raises:
            PyCelonisNotFoundError: If Table does not exist in the Data Pool.
            PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
            PyCelonisTypeError: If Path is not a valid file or folder.
        """
        self.celonis._tracker.track("Append table", extra={"tracking_type": "DATA_PUSH"})

        data_source_id = self._get_data_source_id(connection)
        table = self.find_table(table_name, data_source_id)
        if table is None:
            raise PyCelonisNotFoundError(
                f"The target table \"{table_name}\" could not be found in the Data Pool. "
                "Please check spelling of table name."
            )

        if not column_config:
            column_config = self.get_column_config(table)

        # Create payload for data job
        payload, table_name = self._create_payload_for_data_job(
            table_name=table_name, column_config=column_config, data_source_id=data_source_id, job_type="DELTA"
        )

        # Create data push job
        response = self.celonis.api_request(self.url_data_push, payload)
        job_id = response["id"]

        # Create parquet uploader
        chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
        chunk_url = f"{chunk_base_url}/upserted"

        # Create df chunks and upload
        if isinstance(df_or_path, pd.DataFrame):
            self._upload_dataframe(
                df=df_or_path,
                table_name=table_name,
                chunksize=chunksize,
                chunk_url=chunk_url,
                chunk_base_url=chunk_base_url,
                check_column=None,
            )
        # Or upload file(s)
        elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
            self._upload_parquet_files(
                path=df_or_path,
                chunk_url=chunk_url,
                chunk_base_url=chunk_base_url,
                check_column=None,
            )

        # Start job execution
        _ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
        response = self.check_push_status(job_id)

        # Wait for job to finish
        if wait_for_finish and response.get("status", "") != "DONE":
            response = self._wait_until_data_job_finish(job_id=job_id)

        return response

    def upsert_table(
        self,
        df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
        table_name: str,
        primary_keys: typing.List[str],
        column_config: typing.List[typing.Dict[str, typing.Any]] = None,
        connection: typing.Union["DataConnection", str] = None,
        wait_for_finish: bool = True,
        chunksize: int = 100_000,
    ) -> typing.Dict:
        """Upserts the [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        into an existing Table in the Pool.

        Args:
            df_or_path:
                * If DataFrame, df is chunked, written to parquet and uploaded.
                * If Path to parquet file, file is uploaded.
                * If Path to folder, any parquet file in folder is uploaded.
                * If str, value is converted to Path and handled as described above.
            table_name: Name of Table.
            primary_keys: List of Table primary keys.
            column_config: Can be used to specify column types and string field length.
                ```json
                    [
                        {
                            "columnName":"BUKRS",
                            "fieldLength":100,
                            "columnType":"STRING"
                        }...
                    ]
                ```
                with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
            connection: The Data Connection to upload to, else uploads to Global.
            wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
            chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
                that are uploaded. If set to a value <1, no chunking is applied.

        Returns:
            The Data Job Status.

        Raises:
            PyCelonisNotFoundError: If Table does not exist in the Data Pool.
            PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
            PyCelonisTypeError: If Path is not a valid file or folder.
        """
        self.celonis._tracker.track("Upsert table", extra={"tracking_type": "DATA_PUSH"})

        data_source_id = self._get_data_source_id(connection)
        table = self.find_table(table_name, data_source_id)
        if table is None:
            raise PyCelonisNotFoundError(
                f"The target table \"{table_name}\" could not be found in the Data Pool. "
                "Please check spelling of table name."
            )

        if not column_config:
            column_config = self.get_column_config(table)
        # Create payload for data job
        payload, table_name = self._create_payload_for_data_job(
            table_name=table_name,
            column_config=column_config,
            data_source_id=data_source_id,
            job_type="DELTA",
            keys=primary_keys,
        )

        # Create data push job
        response = self.celonis.api_request(self.url_data_push, payload)
        job_id = response["id"]

        # Create parquet uploader
        chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
        chunk_url = f"{chunk_base_url}/upserted"

        # Create df chunks and upload
        if isinstance(df_or_path, pd.DataFrame):
            self._upload_dataframe(
                df=df_or_path,
                table_name=table_name,
                chunksize=chunksize,
                chunk_url=chunk_url,
                chunk_base_url=chunk_base_url,
                check_column=None,
            )
        # Or upload file(s)
        elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
            self._upload_parquet_files(
                path=df_or_path,
                chunk_url=chunk_url,
                chunk_base_url=chunk_base_url,
                check_column=None,
            )

        # Start job execution
        _ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
        response = self.check_push_status(job_id)

        # Wait for job to finish
        if wait_for_finish and response.get("status", "") != "DONE":
            response = self._wait_until_data_job_finish(job_id=job_id)

        return response

    @deprecated("Use 'create_table', 'append_table' or 'upsert_table'.")
    def push_table(
        self,
        df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
        table_name: str,
        if_exists: str = "error",
        primary_keys: typing.Optional[typing.List[str]] = None,
        column_config: typing.List[typing.Dict[str, typing.Any]] = None,
        connection: typing.Union["DataConnection", str] = None,
        wait_for_finish: bool = True,
        chunksize: int = 100_000,
    ) -> typing.Dict:
        """Pushes a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        to the specified Table in the Pool.

        !!! warning
            Deprecation: The method 'push_table' is deprecated and will be removed in the next release.
            Use one of:
            [create_table][celonis_api.event_collection.data_pool.Pool.create_table],
            [append_table][celonis_api.event_collection.data_pool.Pool.append_table],
            [upsert_table][celonis_api.event_collection.data_pool.Pool.upsert_table].

        Args:
            df_or_path:
                * If DataFrame, df is chunked, written to parquet and uploaded.
                * If Path to parquet file, file is uploaded.
                * If Path to folder, any parquet file in folder is uploaded.
                * If str, value is converted to Path and handled as described above.
            table_name: Name of Table.
            if_exists:
                * `error` -> an error is thrown if a table of the same name already exists in the pool (default).
                * `replace` or `replace_data_only` -> the existing table is dropped completely and a new table is created.
                * `append` -> the data is appended to the existing table.
                * `upsert` -> the data is upserted into the existing table using `primary_keys`.
            primary_keys: List of Table primary keys.
            column_config: Can be used to specify column types and string field length.
                ```json
                    [
                        {
                            "columnName":"BUKRS",
                            "fieldLength":100,
                            "columnType":"STRING"
                        }...
                    ]
                ```
                with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
            connection: The Data Connection to upload to, else uploads to Global.
            wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
            chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
                that are uploaded. If set to a value <1, no chunking is applied.

        Returns:
            The Data Job Status.
        """
        if if_exists == "error":
            response = self.create_table(
                df_or_path=df_or_path,
                table_name=table_name,
                if_exists=if_exists,
                column_config=column_config,
                connection=connection,
                wait_for_finish=wait_for_finish,
                chunksize=chunksize,
            )
        elif if_exists in ["replace", "replace_data_only"]:
            response = self.create_table(
                df_or_path=df_or_path,
                table_name=table_name,
                if_exists="drop",
                column_config=column_config,
                connection=connection,
                wait_for_finish=wait_for_finish,
                chunksize=chunksize,
            )
        elif if_exists == "append":
            response = self.append_table(
                df_or_path=df_or_path,
                table_name=table_name,
                connection=connection,
                wait_for_finish=wait_for_finish,
                chunksize=chunksize,
            )
        elif if_exists == "upsert":
            if primary_keys is None:
                raise PyCelonisValueError("Argument 'primary_keys' must be set for upsert.")
            else:
                response = self.upsert_table(
                    df_or_path=df_or_path,
                    primary_keys=primary_keys,
                    table_name=table_name,
                    connection=connection,
                    wait_for_finish=wait_for_finish,
                    chunksize=chunksize,
                )
        else:
            raise PyCelonisValueError(f"if_exists {if_exists} not supported")
        return response

    def _check_matching_columns(self, local_table: pa.Table, remote_table: pd.DataFrame):
        """Compares column names and column types of the local table to be pushed with the remote table.

        Args:
            local_table: The local table to be pushed.
            remote_table: The remote table which is located in the current data pool.

        Raises:
            PyCelonisValueError: If the table to be pushed has column names which
                are not included in the remote table (table in data pool).
        """
        local_col_names = set(local_table.column_names)
        remote_col_names = set(remote_table.columns.values.tolist())
        different_names = local_col_names.difference(remote_col_names)
        if len(different_names) != 0:
            raise PyCelonisValueError(
                "The table you want to push has column names which are "
                "not represented in the remote table,"
                f"wrong column names: {different_names}. "
                "Please check spelling or change column names!"
            )

    def _upload_file(
        self,
        filepath: pathlib.Path,
        chunk_url: str,
        chunk_base_url: str,
        df_remote_table: typing.Optional[pd.DataFrame] = None,
        threaded: bool = True,
    ):
        """Uploads parquet file(s) to pool.

        Parameters
        ----------
        filepath : pathlib.Path
            Path to parquet file.
        chunk_url : str
            Designated chunk url.
        chunk_base_url : str
            Designated chunk base url.
        df_remote_table : pd.DataFrame, optional
            The target table as a data frame used to check column names
            in a sub-method, by default None.
        threaded : bool, optional
            Set threaded true, if request needs to be parallelized,
            by default True.

        Returns
        -------
        Response
            The response from the file upload data job.

        Raises
        ------
        exceptions.HTTPError
            Raises when upload job failed.
        """

        parquet_table = pq.read_table(filepath)

        if df_remote_table is not None:
            self._check_matching_columns(local_table=parquet_table, remote_table=df_remote_table)

        try:
            parquet_table = parquet_table.drop(["_CELONIS_CHANGE_DATE"])
            pq.write_table(parquet_table, filepath)
        except KeyError:
            pass  # This happens if the column does not exist on the pq file

        del parquet_table
        n_chunks_before = len(self.celonis.api_request(chunk_base_url))
        time_before = time.time()

        try:
            return self.celonis.api_request(chunk_url, filepath)
        except PyCelonisError as e:
            error = True
            if time.time() - time_before > 110:
                for _ in range(24):
                    self._logger.info("Checking upload status in 10 seconds")
                    time.sleep(10)
                    n_chunks_after = len(self.celonis.api_request(chunk_base_url))
                    if n_chunks_after > n_chunks_before:
                        error = False
            if error:
                self._logger.exception(
                    f"File upload problem."
                    f"\n Maybe the file <{filepath}> "
                    f"is not a supported parquet/csv file.\n"
                    f"You could try: \n\n\tfrom pycelonis.utils.parquet_utils"
                    f" import read_parquet\n\tread_parquet('{filepath}')"
                    f"\n\nto check whether your parquet file is valid."
                )
                raise e

    def _upload_df_chunk(
        self,
        chunk,
        table_name: str,
        chunk_url: str,
        chunk_base_url: str,
        df_remote_table: typing.Optional[pd.DataFrame] = None,
    ):
        """Stores the cunk as a parquet file
           and uploads it then via _upload_file.

        Parameters
        ----------
        chunk : tuple
            The chunk to upload as a tuple of (DataFrame slice, offset position).
        table_name : str
            The target table name in pool.
        chunk_url : str,
            Designated chunk url.
        chunk_base_url : str
            Designated chunk base url.
        df_remote_table : pd.DataFrame, optional
            The target table as a data frame used to
            check column names in a sub-method, by default None.
        """

        file_name = f"celonis_push_{table_name}_{time.time()}_{chunk[1]}.parquet"
        tmp_file = pathlib.Path(tempfile.gettempdir()) / file_name
        parquet_utils.write_parquet(chunk[0], tmp_file)
        if df_remote_table is not None:
            self._upload_file(
                tmp_file, chunk_url=chunk_url, chunk_base_url=chunk_base_url, df_remote_table=df_remote_table
            )
        else:
            self._upload_file(tmp_file, chunk_url=chunk_url, chunk_base_url=chunk_base_url)
        os.remove(tmp_file)

    def _get_data_source_id(self, connection: typing.Union[str, 'DataConnection'] = None):
        if connection is None:
            data_source_id = None
        else:
            if isinstance(connection, DataConnection):
                data_source_id = connection.id
            elif isinstance(connection, str) and utils.check_uuid(connection):
                data_source_id = connection
            else:
                raise PyCelonisTypeError("Argument 'connection' should be of type DataConnection or str (valid uuid).")

        return data_source_id

    def _create_payload_for_data_job(
        self,
        table_name: str,
        column_config: typing.List[typing.Dict[str, typing.Any]] = None,
        data_source_id: str = None,
        job_type: str = None,
        keys: list = None,
    ):

        if "." in table_name:
            table_name = table_name.replace(".", "_")
            self._logger.warning(f'"." replaced by "_" in {table_name}.')

        payload = {"targetName": table_name, "dataPoolId": self.id}

        if data_source_id:
            payload["connectionId"] = data_source_id
        if column_config:
            column_config = copy.deepcopy(column_config)
            column_config = self._convert_column_config_fieldlength(column_config)
            payload["tableSchema"] = {"tableName": table_name, "columns": column_config}
        if job_type:
            payload["type"] = job_type
        if keys:
            payload["keys"] = keys

        return payload, table_name

    def _upload_dataframe(
        self,
        df: pd.DataFrame,
        table_name: str,
        chunk_url: str,
        chunk_base_url: str,
        chunksize: int,
        check_column: typing.Optional[pd.DataFrame] = None,
    ):
        """Takes the local table (DataFrame) and splits it in chunks if
        demanded. Then thoses chunks are passed to parallalized method
        to store them as parquet files. Those parquet files are then pushed
        to the remote data pool.

        Parameters
        ----------
        df : pd.DataFrame
            The local table to be pushed.
        table_name : str
            The name of the local table.
        chunk_url : str,
            Designated chunk url.
        chunk_base_url : str
            Designated chunk base url.
        check_column : pd.DataFrame, optional
            The target table as a data frame used to check column names
            in a sub-method, by default None.
        chunksize : int
            Sets how large one chunk (of the table to be pushed) should be.
        """

        if chunksize > 0:
            chunks = [(df[pos : pos + chunksize], pos) for pos in range(0, len(df), chunksize)]
        else:
            chunks = [(df, 0)]

        [
            r
            for r in utils.threaded(
                chunks,
                self._upload_df_chunk,
                table_name=table_name,
                chunk_url=chunk_url,
                chunk_base_url=chunk_base_url,
                df_remote_table=check_column,
            )
        ]

    def _upload_parquet_files(
        self,
        path: typing.Union[pathlib.Path, str],
        chunk_url: str,
        chunk_base_url: str,
        check_column: typing.Optional[pd.DataFrame] = None,
    ) -> None:
        """Upload parquet files based on the path to the file or to the
           directory.

        Parameters
        ----------
        path : pathlib.Path or str
            Path to parquet file or path to folder,
            which contains one or more parquet files.
        chunk_url : str
            Designated chunk url.
        chunk_base_url : str
            Designated chunk base url.
        check_column : pd.DataFrame, optional
            The target table as a data frame used to check column names
            in a sub-method, by default None.

        Raises
        ------
        PyCelonisTypeError
            Raised when path is neither a file nor a folder.
        """

        path = pathlib.Path(path)  # type: ignore
        if path.is_file():
            self._upload_file(
                path,
                chunk_url=chunk_url,
                chunk_base_url=chunk_base_url,
                df_remote_table=check_column,
                threaded=False,
            )
        elif path.is_dir():
            [
                r
                for r in utils.threaded(
                    path.glob("*.parquet"),
                    self._upload_file,
                    chunk_url=chunk_url,
                    chunk_base_url=chunk_base_url,
                    df_remote_table=check_column,
                )
            ]
        else:
            raise PyCelonisTypeError("Argument 'path' must be either file or directory.")

    def _convert_column_config_fieldlength(self, column_config):
        """Vertica multiplies the provided column length in column config  with 4 silently, so we divide the
        one provided by the user such that what actually is seen in vertica is the same as what will end
        up in vertica"""

        for col in column_config:
            if "fieldLength" in col.keys() and col['columnType'] == 'STRING':
                conv_len = math.ceil(int(col["fieldLength"]) / 4)
                if col["fieldLength"] % 4 != 0:
                    self._logger.warning(
                        f"The field length parameter of column config needs to be multiples of 4."
                        f"field length of {col['columnName']} will be rounded to {conv_len}."
                    )
                col["fieldLength"] = conv_len
        return column_config

    def get_column_config(
        self, table: typing.Union[str, typing.Dict], raise_error: bool = False
    ) -> typing.Optional[typing.List[typing.Dict[str, typing.Any]]]:
        """Get a Column Configuration of a Pool Table.

        Column Config List:
            ```json
            [
                {'columnName': 'colA', 'columnType': 'DATETIME'},
                {'columnName': 'colB', 'columnType': 'FLOAT'},
                {'columnName': 'colC', 'columnType': 'STRING', 'fieldLength': 80}
            ]
            ```


        Args:
            table: Name of the Pool Table or dictionary with `{'name': '', 'schemaName': ''}`.
            raise_error: Raises a [celonis_api.errors.PyCelonisValueError][] if Table data types are `None` or table
                has 99+ columns, else only logs warning.

        Returns:
            The Column Configuration of the Pool Table (Always ignoring '_CELONIS_CHANGE_DATE').
        """
        if isinstance(table, str):
            table_list = [t for t in self.tables if t["name"] == table]
            if not table_list:
                raise PyCelonisNotFoundError(f'Table {table} not found in pool')
            table = table_list[0]
        try:
            column_config = self._get_raw_column_config(table_name=table['name'], data_source_id=table['dataSourceId'])
        except Exception as e:
            if raise_error:
                raise e
            self._logger.warning(str(e))
            return None
        column_config = self._clean_column_config(column_config=column_config)
        return column_config

    def _get_raw_column_config(
        self, table_name: str, data_source_id: str = None
    ) -> typing.List[typing.Dict[str, typing.Any]]:
        url = f'{self.url}/columns?tableName={table_name}'
        if data_source_id is not None:
            url += f"&schemaName={data_source_id}"  # Inconsistency in naming on integration side
        return self.celonis.api_request(url)

    @staticmethod
    def _clean_column_config(
        column_config: typing.List[typing.Dict[str, typing.Any]]
    ) -> typing.List[typing.Dict[str, typing.Any]]:
        """function to clean config from /columns endpoint because it is different than config needed for push api"""
        clean_config = []
        for column in column_config:
            if column["name"] != "_CELONIS_CHANGE_DATE":
                clean_column = {"columnName": column["name"], "columnType": column["type"]}
                if column["type"] == "STRING" and column.get("length") is not None:
                    clean_column["fieldLength"] = column["length"]
                clean_config += [clean_column]
        return clean_config

    def check_push_status(self, job_id: str = "") -> typing.Dict:
        """Checks the Status of a Data Push Job.

        !!! api "API"
            - `GET: /integration/api/v1/data-push/{pool_id}/jobs/{job_id}`

        Args:
            job_id: The ID of the job to check. If empty returns all job status.

        Returns:
            Status of Data Push Job(s).
        """
        return self.celonis.api_request(f"{self.url_data_push}{job_id}")

    def check_data_job_execution_status(self) -> typing.List:
        """Checks the Status of Data Job Executions.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/logs/status`

        Returns:
            Status of all Data Job Executions.
        """
        return self.celonis.api_request(f"{self.url}/logs/status")

    def create_datamodel(self, name: str) -> 'Datamodel':
        """Creates a new Datamodel in the Pool.

        Args:
            name: Name of the Datamodel.

        Returns:
            The newly created Datamodel object.
        """
        url = f"{self.url}/data-models"
        payload = {"name": name, "poolId": self.id, "configurationSkipped": True}
        response = self.celonis.api_request(url, payload)
        return Datamodel(self, response)

    @deprecated("Use the online wizard to set up Data Connections.")
    def create_data_connection(
        self,
        client: str,
        host: str,
        password: str,
        system_number: str,
        user: str,
        name: str,
        connector_type: str,
        uplink_id: str = None,
        use_uplink: bool = True,
        compression_type: str = "GZIP",
        **kwargs,
    ) -> 'DataConnection':
        """Creates a new Data Connection (Currently, only SAP connection are supported).

        !!! warning
            This method is deprecated and will be removed in the next release.
            Use the online wizard to set up Data Connections.

        Args:
            client: Client.
            host: Host.
            user: Username.
            password: Password.
            system_number: System Number.
            name: Name of the Data Connection.
            connector_type: Type of the Data Connection. One of ['SAP'].
            uplink_id: ID of an Uplink Connection.
            use_uplink: Whether to use an Uplink Connection or not.
            compression_type: Compression Type.
            **kwargs:

        Returns:
            The newly created Data Connection.
        """
        if connector_type == "sap":
            url = f"{self.url_connection_creation}/sap"
            payload = {
                "config": {
                    "client": client,
                    "compressionType": compression_type,
                    "groupName": "",
                    "host": host,
                    "parallelTables": 4,
                    "password": password,
                    "r3Name": "",
                    "serviceName": "",
                    "sncPartnerName": "",
                    "systemNumber": system_number,
                    "type": connector_type,
                    "useLogonGroup": False,
                    "useSnc": False,
                    "user": user,
                },
                "name": name,
                "poolId": self.id,
                "targetSchemaName": "",
                "type": connector_type,
                "uplinkId": uplink_id,
                "useUplink": use_uplink,
            }
        else:
            raise PyCelonisError("Currently, only SAP connections are supported.")

        response = self.celonis.api_request(url, payload)
        return DataConnection(self, response, self.celonis)

    def move(self, to: str):
        """Moves the Pool to another team.

        !!! api "API"
            - `POST: /integration/api/pools/move`
                ```json
                {
                    "subsetOfDataModels": False,
                    "dataPoolId": self.id,
                    "selectedDataModels": [],
                    "moveToDomain": to
                }
                ```

        Args:
            to: Name of the host domain (e.g. `move` for https://move.eu-1.celonis.cloud).
        """
        url = f"{self.celonis.url}/integration/api/pools/move"
        payload = {
            "subsetOfDataModels": False,
            "dataPoolId": self.id,
            "selectedDataModels": [k for k, v in self.datamodels.ids.items()],
            "moveToDomain": to,
        }
        self.celonis.api_request(url, payload)

    def create_pool_parameter(
        self,
        pool_variable: typing.Union[typing.Dict, 'PoolParameter'] = None,
        name: str = None,
        placeholder: str = None,
        description: str = None,
        data_type: str = "STRING",
        var_type: str = "PUBLIC_CONSTANT",
        values: typing.List = None,
    ) -> 'PoolParameter':
        """Creates a new Variable with the specified properties in the Pool.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/variables/`
                ```json
                {
                    "poolId": self.id,
                    "dataType":"<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                    "name": "",
                    "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                    "description": "",
                    "placeholder": "",
                    "values": [
                        {"value": ""},...
                    ],
                }
                ```

        Args:
            pool_variable: Pool Parameter object or dictionary (see API), if `None` all other arguments must be set.
            name: Name of the Variable (same as `pool_variable["name"]`).
            placeholder: Placeholder of the Variable.
            description: Description of the Variable.
            data_type: Data type of the Variable (see options `pool_variable`).
            var_type: Type of the Variable (see options `pool_variable`).
            values: List of Variable values.

        Returns:
            The newly created Pool Parameter object.
        """
        payload = None
        if isinstance(pool_variable, PoolParameter):
            payload = {
                "dataType": pool_variable.data["dataType"],
                "name": pool_variable.data["name"],
                "type": pool_variable.data["type"],
                "description": pool_variable.data["description"],
                "placeholder": pool_variable.data["placeholder"],
                "poolId": self.id,
                "values": pool_variable.data["values"],
            }
        elif isinstance(pool_variable, dict):
            payload = pool_variable
            payload["poolId"] = self.id
        elif pool_variable is None:
            if name is None:
                raise PyCelonisValueError("name can't be None if no pool_variable is given")

            payload = {
                "dataType": data_type,
                "name": name,
                "type": var_type,
                "description": description,
                "placeholder": placeholder if placeholder is not None else name.upper(),
                "poolId": self.id,
                "values": values if values is not None else [{"value": "Undefined"}],
            }

        created_parameter = self.celonis.api_request(f"{self.url}/variables/", payload)
        return PoolParameter(self, created_parameter, self.celonis)

    def create_data_job(self, name: str, data_source_id: str = None) -> 'DataJob':
        """Creates a new Data Job with the specified name in the Pool.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/`
                ```json
                {
                    "dataPoolId": self.id,
                    "dataSourceId": data_source_id,
                    "name": name
                }
                ```

        Args:
            name: Name of the Data Job.
            data_source_id: ID of the Data Source that the new Data Job will be connected to.
                If not specified, the default global source will be connected to.

        Returns:
            The newly created Data Job object.
        """
        payload = {"dataPoolId": self.id, "dataSourceId": data_source_id, "name": name}
        response = self.celonis.api_request(f"{self.url}/jobs", payload)
        return DataJob(self, response, self.celonis)

    def _drop_remote_table(self, table_name: str, data_source_id: str = None):
        rand_number = random.randint(1000, 9999)
        data_job, transformation = None, None
        try:
            data_job = self.create_data_job(f"PyCelonis_Drop_Table_{rand_number}", data_source_id=data_source_id)
            transformation = data_job.create_transformation(f"drop_{rand_number}")
            transformation.statement = f"DROP TABLE IF EXISTS {table_name};"
            transformation.execute_from_workbench()
        except PyCelonisError as e:
            if transformation:
                transformation.delete()
            if data_job:
                data_job.delete()

            raise e

        table = self.find_table(table_name, data_source_id)
        if table is not None:
            raise PyCelonisNotFoundError(
                f"FAILED: Dropping table - '{table_name}' failed. Remote table still exists in data pool."
            )
        self._logger.info(f"SUCCESS: Table - '{table_name}' from data pool was successful dropped.")

    def _pull_remote_table_from_pool(
        self,
        table_name: str,
        data_source_id: str = None,
        row_limit: int = 1,
        sort_by_column: typing.Optional[str] = None,
    ) -> pd.DataFrame:
        rand_number = random.randint(1000, 9999)
        data_job, transformation = None, None
        try:
            data_job = self.create_data_job(f"PyCelonis_Extract_Table_{rand_number}", data_source_id=data_source_id)
            transformation = data_job.create_transformation(f"extract_{rand_number}")
            df_remote_table = transformation.get_data_frame(f"SELECT * FROM {table_name} LIMIT {row_limit};")
        except PyCelonisError as e:
            if transformation:
                transformation.delete()
            if data_job:
                data_job.delete()

            raise e

        if "_CELONIS_CHANGE_DATE" in df_remote_table:
            df_remote_table.drop(columns=["_CELONIS_CHANGE_DATE"], inplace=True)

        if sort_by_column:
            df_remote_table.sort_values(by=[sort_by_column], inplace=True, ignore_index=True)

        return df_remote_table

    def _wait_until_data_job_finish(self, job_id) -> typing.Dict:
        self._logger.info("Data push job started...")

        iterations = 0
        error_count = 0
        while True:
            try:
                response = self.check_push_status(job_id)
                status = response.get("status", "")
                if status not in ["RUNNING", "QUEUED", "NEW"]:
                    self._logger.info(f"Data push job status: {status}")
                    break

                error_count = 0
                iterations += 1
                if iterations % 5 == 0:
                    self._logger.info(f"Data push job status: {status}...")
                time.sleep(1)
            except PyCelonisHTTPError as e:
                error_count += 1
                self._logger.exception("Failed to request status, trying again...")
                time.sleep(3)
                if error_count > 5:
                    raise e

        if status == "ERROR":
            error_logs = response.get("logs", [])
            error_logs = "\n".join(str(log) for log in error_logs)
            raise PyCelonisHTTPError(f"Data push job failed. Error logs:\n{error_logs}")

        return response

data_connections: CelonisCollection[DataConnection] property readonly

Get all Pool Data Connections.

API

  • GET: /integration/api/pools/{pool_id}/data-sources/

Returns:

  • CelonisCollection[DataConnection]: A Collection of Pool Data Connections.
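
A Data Connection from this collection can be passed as the connection argument of create_table, append_table or upsert_table to push into that data source instead of the Global schema. A sketch, assuming pool is an already retrieved Pool object and the connection ID is illustrative:

    # Sketch: push a DataFrame into the scope of a specific Data Connection.
    import pandas as pd

    df = pd.DataFrame({"BUKRS": ["1000"], "AMOUNT": [42.0]})
    sap_connection = next(dc for dc in pool.data_connections if dc.id == "<data-connection-id>")
    status = pool.create_table(df, "MY_TABLE", if_exists="drop", connection=sap_connection)
    print(status.get("status"))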

data_jobs: CelonisCollection[DataJob] property readonly

Get all Pool Data Jobs.

API

  • GET: /integration/api/pools/{pool_id}/jobs

Returns:

  • CelonisCollection[DataJob]: A Collection of Pool Data Jobs.
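
A sketch of inspecting the existing Data Jobs and creating a new one via create_data_job, assuming pool is an already retrieved Pool object and that .data holds the raw API payload of each job (the job name is illustrative):

    # Sketch: list existing Data Jobs, then create one in the Global scope.
    for job in pool.data_jobs:
        print(job.id, job.data.get("name"))  # .data is the raw API payload (assumed)

    nightly_job = pool.create_data_job("PyCelonis_Nightly_Load")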

datamodels: CelonisCollection property readonly

Get all Datamodels of the Pool.

API

  • GET: /integration/api/pools/{pool_id}/data-models

Returns:

  • CelonisCollection: Collection of Pool Datamodels.
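
A sketch of reusing an existing Datamodel by name or creating a new one via create_datamodel, assuming pool is an already retrieved Pool object (the Datamodel name is illustrative):

    # Sketch: reuse a Datamodel if it exists, otherwise create it.
    datamodel = next(
        (dm for dm in pool.datamodels if dm.data.get("name") == "P2P_Datamodel"),
        None,
    )
    if datamodel is None:
        datamodel = pool.create_datamodel("P2P_Datamodel")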

tables: List[Dict] property readonly

Get all Pool Tables.

API

  • GET: /integration/api/pools/{pool_id}/tables

Returns:

  • List[Dict]: A List of dictionaries containing Pool tables.
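
Because each entry is a plain dictionary, the list can be filtered directly; find_table and get_column_config build on it. A sketch, assuming pool is an already retrieved Pool object (the table name is illustrative):

    # Sketch: inspect the raw table metadata and resolve a single table.
    for table in pool.tables:
        print(table["name"], table["dataSourceId"])

    activities = pool.find_table("ACTIVITIES", data_source_id=None)  # None = Global schema
    if activities is not None:
        column_config = pool.get_column_config(activities)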

url: str property readonly

API

  • /integration/api/pools/{pool_id}

url_connection_creation property readonly

API

  • /integration/api/datasource/

url_data_push property readonly

API

  • /integration/api/v1/data-push/{pool_id}/jobs/

variables: CelonisCollection[PoolParameter] property readonly

Get all Pool Variables.

API

  • GET: /integration/api/pools/{pool_id}/variables

Returns:

  • CelonisCollection[PoolParameter]: A Collection of Pool Variables.
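
A sketch of listing Pool Variables and adding a new one via create_pool_parameter, assuming pool is an already retrieved Pool object (names and values are illustrative):

    # Sketch: list existing Pool Variables, then create a public constant.
    for parameter in pool.variables:
        print(parameter.data.get("name"), parameter.data.get("values"))

    currency = pool.create_pool_parameter(
        name="currency",
        placeholder="CURRENCY",
        description="Default currency filter",
        data_type="STRING",
        var_type="PUBLIC_CONSTANT",
        values=[{"value": "EUR"}],
    )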

append_table(self, df_or_path, table_name, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Appends a pandas.DataFrame or pyarrow.parquet.ParquetFile to an existing Table in the Pool.

Parameters:

  • df_or_path (Union[pandas.core.frame.DataFrame, pathlib.Path, str], required):
      • If DataFrame, df is chunked, written to parquet and uploaded.
      • If Path to parquet file, file is uploaded.
      • If Path to folder, any parquet file in folder is uploaded.
      • If str, value is converted to Path and handled as described above.
  • table_name (str, required): Name of Table.
  • column_config (List[Dict[str, Any]], default None): Can be used to specify column types and string field length.

        [
            {
                "columnName":"BUKRS",
                "fieldLength":100,
                "columnType":"STRING"
            }...
        ]

    with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].
  • connection (Union[DataConnection, str], default None): The Data Connection to upload to, else uploads to Global.
  • wait_for_finish (bool, default True): Waits for the upload to finish processing, set to False to trigger only.
  • chunksize (int, default 100000): If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

Returns:

  • Dict: The Data Job Status.

Exceptions:

  • PyCelonisNotFoundError: If Table does not exist in the Data Pool.
  • PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
  • PyCelonisTypeError: If Path is not a valid file or folder.
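
A usage sketch, assuming pool is an already retrieved Pool object, the target table was previously created with create_table, and the DataFrame matches its schema (names and values are illustrative):

    # Sketch: append new rows to an existing pool table.
    import pandas as pd

    df_new_rows = pd.DataFrame({"BUKRS": ["2000"], "AMOUNT": [13.37]})
    status = pool.append_table(df_new_rows, table_name="MY_TABLE")
    print(status.get("status"))  # e.g. "DONE" on success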

Source code in celonis_api/event_collection/data_pool.py
def append_table(
    self,
    df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
    table_name: str,
    column_config: typing.List[typing.Dict[str, typing.Any]] = None,
    connection: typing.Union["DataConnection", str] = None,
    wait_for_finish: bool = True,
    chunksize: int = 100_000,
) -> typing.Dict:
    """Appends a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    to an existing Table in the Pool.

    Args:
        df_or_path:
            * If DataFrame, df is chunked, written to parquet and uploaded.
            * If Path to parquet file, file is uploaded.
            * If Path to folder, any parquet file in folder is uploaded.
            * If str, value is converted to Path and handled as described above.
        table_name: Name of Table.
        column_config: Can be used to specify column types and string field length.
            ```json
                [
                    {
                        "columnName":"BUKRS",
                        "fieldLength":100,
                        "columnType":"STRING"
                    }...
                ]
            ```
            with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
        connection: The Data Connection to upload to, else uploads to Global.
        wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
        chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
            that are uploaded. If set to a value <1, no chunking is applied.

    Returns:
        The Data Job Status.

    Raises:
        PyCelonisNotFoundError: If Table does not exist in the Data Pool.
        PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
        PyCelonisTypeError: If Path is not a valid file or folder.
    """
    self.celonis._tracker.track("Append table", extra={"tracking_type": "DATA_PUSH"})

    data_source_id = self._get_data_source_id(connection)
    table = self.find_table(table_name, data_source_id)
    if table is None:
        raise PyCelonisNotFoundError(
            f"The target table \"{table_name}\" could not be found in the Data Pool. "
            "Please check spelling of table name."
        )

    if not column_config:
        column_config = self.get_column_config(table)

    # Create payload for data job
    payload, table_name = self._create_payload_for_data_job(
        table_name=table_name, column_config=column_config, data_source_id=data_source_id, job_type="DELTA"
    )

    # Create data push job
    response = self.celonis.api_request(self.url_data_push, payload)
    job_id = response["id"]

    # Create parquet uploader
    chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
    chunk_url = f"{chunk_base_url}/upserted"

    # Create df chunks and upload
    if isinstance(df_or_path, pd.DataFrame):
        self._upload_dataframe(
            df=df_or_path,
            table_name=table_name,
            chunksize=chunksize,
            chunk_url=chunk_url,
            chunk_base_url=chunk_base_url,
            check_column=None,
        )
    # Or upload file(s)
    elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
        self._upload_parquet_files(
            path=df_or_path,
            chunk_url=chunk_url,
            chunk_base_url=chunk_base_url,
            check_column=None,
        )

    # Start job execution
    _ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
    response = self.check_push_status(job_id)

    # Wait for job to finish
    if wait_for_finish and response.get("status", "") != "DONE":
        response = self._wait_until_data_job_finish(job_id=job_id)

    return response

check_data_job_execution_status(self)

Checks the Status of Data Job Executions.

API

  • GET: /integration/api/pools/{pool_id}/logs/status

Returns:

  • List: Status of all Data Job Executions.
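
A sketch of reading the execution logs, assuming pool is an already retrieved Pool object; the entries are returned as raw dictionaries whose exact keys depend on the API response:

    # Sketch: print the execution status entries for all Data Jobs of the pool.
    for execution in pool.check_data_job_execution_status():
        print(execution)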

Source code in celonis_api/event_collection/data_pool.py
def check_data_job_execution_status(self) -> typing.List:
    """Checks the Status of Data Job Executions.

    !!! api "API"
        - `GET: /integration/api/pools/{pool_id}/logs/status`

    Returns:
        Status of all Data Job Executions.
    """
    return self.celonis.api_request(f"{self.url}/logs/status")
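
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object.

```python
# One status entry per Data Job execution in the Pool.
for execution in pool.check_data_job_execution_status():
    print(execution)
```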

check_push_status(self, job_id='')

Checks the Status of a Data Push Job.

API

  • GET: /integration/api/v1/data-push/{pool_id}/jobs/{job_id}

Parameters:

  • job_id (str, default ''): The ID of the job to check. If empty returns all job status.

Returns:

  • Dict: Status of Data Push Job(s).

Source code in celonis_api/event_collection/data_pool.py
def check_push_status(self, job_id: str = "") -> typing.Dict:
    """Checks the Status of a Data Push Job.

    !!! api "API"
        - `GET: /integration/api/v1/data-push/{pool_id}/jobs/{job_id}`

    Args:
        job_id: The ID of the job to check. If empty returns all job status.

    Returns:
        Status of Data Push Job(s).
    """
    return self.celonis.api_request(f"{self.url_data_push}{job_id}")
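
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object; the job ID is hypothetical.

```python
# Status of a single push job (the job ID is hypothetical) ...
job_status = pool.check_push_status("00000000-0000-0000-0000-000000000000")
print(job_status.get("status"))

# ... or of all push jobs in the Pool when no ID is given.
all_jobs = pool.check_push_status()
```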

create_data_connection(self, client, host, password, system_number, user, name, connector_type, uplink_id=None, use_uplink=True, compression_type='GZIP', **kwargs)

Creates a new Data Connection (Currently, only SAP connections are supported).

Warning

This method is deprecated and will be removed in the next release. Use the online wizard to set up Data Connections.

Parameters:

  • client (str, required): Client.
  • host (str, required): Host.
  • user (str, required): Username.
  • password (str, required): Password.
  • system_number (str, required): System Number.
  • name (str, required): Name of the Data Connection.
  • connector_type (str, required): Type of the Data Connection. One of ['SAP'].
  • uplink_id (str, default None): ID of an Uplink Connection.
  • use_uplink (bool, default True): Whether to use an Uplink Connection or not.
  • compression_type (str, default 'GZIP'): Compression Type.
  • **kwargs

Returns:

  • DataConnection: The newly created Data Connection.

Source code in celonis_api/event_collection/data_pool.py
@deprecated("Use the online wizard to set up Data Connections.")
def create_data_connection(
    self,
    client: str,
    host: str,
    password: str,
    system_number: str,
    user: str,
    name: str,
    connector_type: str,
    uplink_id: str = None,
    use_uplink: bool = True,
    compression_type: str = "GZIP",
    **kwargs,
) -> 'DataConnection':
    """Creates a new Data Connection (Currently, only SAP connection are supported).

    !!! warning
        This method is deprecated and will be removed in the next release.
        Use the online wizard to set up Data Connections.

    Args:
        client: Client.
        host: Host.
        user: Username.
        password: Password.
        system_number: System Number.
        name: Name of the Data Connection.
        connector_type: Type of the Data Connection. One of ['SAP'].
        uplink_id: ID of an Uplink Connection.
        use_uplink: Whether to use an Uplink Connection or not.
        compression_type: Compression Type.
        **kwargs:

    Returns:
        The newly created Data Connection.
    """
    if connector_type == "sap":
        url = f"{self.url_connection_creation}/sap"
        payload = {
            "config": {
                "client": client,
                "compressionType": compression_type,
                "groupName": "",
                "host": host,
                "parallelTables": 4,
                "password": password,
                "r3Name": "",
                "serviceName": "",
                "sncPartnerName": "",
                "systemNumber": system_number,
                "type": connector_type,
                "useLogonGroup": False,
                "useSnc": False,
                "user": user,
            },
            "name": name,
            "poolId": self.id,
            "targetSchemaName": "",
            "type": connector_type,
            "uplinkId": uplink_id,
            "useUplink": use_uplink,
        }
    else:
        raise PyCelonisError("Currently, only SAP connections are supported.")

    response = self.celonis.api_request(url, payload)
    return DataConnection(self, response, self.celonis)

create_data_job(self, name, data_source_id=None)

Creates a new Data Job with the specified name in the Pool.

API

  • POST: /integration/api/pools/{pool_id}/jobs/
    {
        "dataPoolId": self.id,
        "dataSourceId": data_source_id,
        "name": name
    }
    

Parameters:

  • name (str, required): Name of the Data Job.
  • data_source_id (str, default None): ID of the Data Source that the new Data Job will be connected to. If not specified, the Data Job is connected to the default global source.

Returns:

  • DataJob: The newly created Data Job object.

Source code in celonis_api/event_collection/data_pool.py
def create_data_job(self, name: str, data_source_id: str = None) -> 'DataJob':
    """Creates a new Data Job with the specified name in the Pool.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/`
            ```json
            {
                "dataPoolId": self.id,
                "dataSourceId": data_source_id,
                "name": name
            }
            ```

    Args:
        name: Name of the Data Job.
        data_source_id: ID of the Data Source that the new Data Job will be connected to.
            If not specified, the Data Job is connected to the default global source.

    Returns:
        The newly created Data Job object.
    """
    payload = {"dataPoolId": self.id, "dataSourceId": data_source_id, "name": name}
    response = self.celonis.api_request(f"{self.url}/jobs", payload)
    return DataJob(self, response, self.celonis)
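
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object; the job names and Data Connection ID are hypothetical.

```python
# Data Job against the default global source (data_source_id=None).
job = pool.create_data_job("My Extraction Job")

# Data Job bound to a specific Data Connection (the ID is hypothetical).
sap_job = pool.create_data_job("SAP Extraction Job", data_source_id="<data-connection-id>")
```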

create_datamodel(self, name)

Creates a new Datamodel in the Pool.

Parameters:

  • name (str, required): Name of the Datamodel.

Returns:

  • Datamodel: The newly created Datamodel object.

Source code in celonis_api/event_collection/data_pool.py
def create_datamodel(self, name: str) -> 'Datamodel':
    """Creates a new Datamodel in the Pool.

    Args:
        name: Name of the Datamodel.

    Returns:
        The newly created Datamodel object.
    """
    url = f"{self.url}/data-models"
    payload = {"name": name, "poolId": self.id, "configurationSkipped": True}
    response = self.celonis.api_request(url, payload)
    return Datamodel(self, response)
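
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object; the Datamodel name is hypothetical.

```python
datamodel = pool.create_datamodel("Purchase-to-Pay Datamodel")
print(datamodel.id)  # ID assigned by the Event Collection API
```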

create_pool_parameter(self, pool_variable=None, name=None, placeholder=None, description=None, data_type='STRING', var_type='PUBLIC_CONSTANT', values=None)

Creates a new Variable with the specified properties in the Pool.

API

  • POST: /integration/api/pools/{pool_id}/variables/
    {
        "poolId": self.id,
        "dataType":"<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
        "name": "",
        "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
        "description": "",
        "placeholder": "",
        "values": [
            {"value": ""},...
        ],
    }
    

Parameters:

  • pool_variable (Union[Dict, PoolParameter], default None): Pool Parameter object or dictionary (see API), if None all other arguments must be set.
  • name (str, default None): Name of the Variable (same as pool_variable["name"]).
  • placeholder (str, default None): Placeholder of the Variable.
  • description (str, default None): Description of the Variable.
  • data_type (str, default 'STRING'): Data type of the Variable (see options in pool_variable).
  • var_type (str, default 'PUBLIC_CONSTANT'): Type of the Variable (see options in pool_variable).
  • values (List, default None): List of Variable values.

Returns:

  • PoolParameter: The newly created Pool Parameter object.

Source code in celonis_api/event_collection/data_pool.py
def create_pool_parameter(
    self,
    pool_variable: typing.Union[typing.Dict, 'PoolParameter'] = None,
    name: str = None,
    placeholder: str = None,
    description: str = None,
    data_type: str = "STRING",
    var_type: str = "PUBLIC_CONSTANT",
    values: typing.List = None,
) -> 'PoolParameter':
    """Creates a new Variable with the specified properties in the Pool.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/variables/`
            ```json
            {
                "poolId": self.id,
                "dataType":"<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                "name": "",
                "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                "description": "",
                "placeholder": "",
                "values": [
                    {"value": ""},...
                ],
            }
            ```

    Args:
        pool_variable: Pool Parameter object or dictionary (see API), if `None` all other arguments must be set.
        name: Name of the Variable (same as `pool_variable["name"]`).
        placeholder: Placeholder of the Variable.
        description: Description of the Variable.
        data_type: Data type of the Variable (see options `pool_variable`).
        var_type: Type of the Variable (see options `pool_variable`).
        values: List of Variable values.

    Returns:
        The newly created Pool Parameter object.
    """
    payload = None
    if isinstance(pool_variable, PoolParameter):
        payload = {
            "dataType": pool_variable.data["dataType"],
            "name": pool_variable.data["name"],
            "type": pool_variable.data["type"],
            "description": pool_variable.data["description"],
            "placeholder": pool_variable.data["placeholder"],
            "poolId": self.id,
            "values": pool_variable.data["values"],
        }
    elif isinstance(pool_variable, dict):
        payload = pool_variable
        payload["poolId"] = self.id
    elif pool_variable is None:
        if name is None:
            raise PyCelonisValueError("name can't be None if no pool_variable is given")

        payload = {
            "dataType": data_type,
            "name": name,
            "type": var_type,
            "description": description,
            "placeholder": placeholder if placeholder is not None else name.upper(),
            "poolId": self.id,
            "values": values if values is not None else [{"value": "Undefined"}],
        }

    created_parameter = self.celonis.api_request(f"{self.url}/variables/", payload)
    return PoolParameter(self, created_parameter, self.celonis)
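
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object; the variable names and values are hypothetical. The second variant mirrors the payload shape shown in the API block above.

```python
# Built from keyword arguments; names and values are hypothetical.
param = pool.create_pool_parameter(
    name="country_filter",
    description="Country code used in extraction filters",
    data_type="STRING",
    var_type="PUBLIC_CONSTANT",
    values=[{"value": "DE"}],
)

# Alternatively, pass the payload dictionary directly (poolId is added automatically).
param = pool.create_pool_parameter(
    pool_variable={
        "dataType": "INT",
        "name": "lookback_days",
        "type": "PUBLIC_CONSTANT",
        "description": "Extraction lookback window in days",
        "placeholder": "LOOKBACK_DAYS",
        "values": [{"value": "30"}],
    }
)
```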

create_table(self, df_or_path, table_name, if_exists='error', column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Creates a new Table in the Pool from a pandas.DataFrame or pyarrow.parquet.ParquetFile.

Parameters:

  • df_or_path (Union[pandas.core.frame.DataFrame, pathlib.Path, str], required):
      • If DataFrame, df is chunked, written to parquet and uploaded.
      • If Path to parquet file, file is uploaded.
      • If Path to folder, any parquet file in folder is uploaded.
      • If str, value is converted to Path and handled as described above.
    (The index of the data frame is ignored and NOT pushed to Celonis.)
  • table_name (str, required): Name of Table.
  • if_exists (str, default 'error'):
      • error -> an error is thrown if a table of the same name already exists in the pool.
      • drop -> the existing table is dropped completely and a new table is created.
      • replace_data_only -> the column names and types of the old tables are not overwritten.
  • column_config (List[Dict[str, Any]], default None): Can be used to specify column types and string field length, e.g.

        [
            {
                "columnName":"BUKRS",
                "fieldLength":100,
                "columnType":"STRING"
            }...
        ]

    with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].
  • connection (Union[DataConnection, str], default None): The Data Connection to upload to, else uploads to Global.
  • wait_for_finish (bool, default True): Waits for the upload to finish processing, set to False to trigger only.
  • chunksize (int, default 100000): If a DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

Returns:

  • Dict: The Data Job Status.

Exceptions:

  • PyCelonisValueError: If Table already exists and if_exists='error'.
  • PyCelonisTypeError: When connection is not a DataConnection object or the ID of a Data Connection.
  • PyCelonisTypeError: If Path is not a valid file or folder.

Source code in celonis_api/event_collection/data_pool.py
def create_table(
    self,
    df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
    table_name: str,
    if_exists: str = "error",  # drop, replace_data_only
    column_config: typing.List[typing.Dict[str, typing.Any]] = None,
    connection: typing.Union["DataConnection", str] = None,
    wait_for_finish: bool = True,
    chunksize: int = 100_000,
) -> typing.Dict:
    """Creates a new Table in the Pool from a
    [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html).

    Args:
        df_or_path:
            * If DataFrame, df is chunked, written to parquet and uploaded.
            * If Path to parquet file, file is uploaded.
            * If Path to folder, any parquet file in folder is uploaded.
            * If str, value is converted to Path and handled as described above.
            (The index of the data frame is ignored and NOT pushed to Celonis.)
        table_name: Name of Table.
        if_exists:
            * `error` -> an error is thrown if a table of the same name already exists in the pool.
            * `drop` -> the existing table is dropped completely and a new table is created, by default `error`.
            * `replace_data_only` -> the column names and types of the old tables are not overwritten.
        column_config: Can be used to specify column types and string field length.
            ```json
                [
                    {
                        "columnName":"BUKRS",
                        "fieldLength":100,
                        "columnType":"STRING"
                    }...
                ]
            ```
            with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
        connection: The Data Connection to upload to, else uploads to Global.
        wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
        chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
            that are uploaded. If set to a value <1, no chunking is applied.

    Returns:
        The Data Job Status.

    Raises:
        PyCelonisValueError: If Table already exists and `if_exists='error'`.
        PyCelonisTypeError: When connection is not a DataConnection object or the ID of a Data Connection.
        PyCelonisTypeError: If Path is not a valid file or folder.
    """
    self.celonis._tracker.track("Create table", extra={"tracking_type": "DATA_PUSH"})

    if if_exists not in ["error", "drop", "replace_data_only"]:
        raise PyCelonisValueError("Argument 'if_exists' must be one of ['error', 'drop', 'replace_data_only'].")

    data_source_id = self._get_data_source_id(connection)
    job_type = None

    if if_exists == "drop":
        job_type = "REPLACE"
    else:
        table = self.find_table(table_name, data_source_id)
        if table is not None:
            if if_exists == "error":
                raise PyCelonisValueError(
                    f"Table with name {table_name} already exists in the "
                    "Data Pool. If you want to drop it and create a new table, set if_exists='drop'."
                )
            elif if_exists == "replace_data_only":
                if column_config is not None:
                    raise PyCelonisValueError(
                        "When argument if_exists='replace_data_only', "
                        "you cannot give an additional column_config. "
                        "The column_config is inferred from the pool table."
                    )
                column_config = self.get_column_config(table, raise_error=True)

    # Create payload for data job
    payload, table_name = self._create_payload_for_data_job(
        table_name=table_name,
        column_config=column_config,
        data_source_id=data_source_id,
        job_type=job_type,
    )

    # Create data push job
    response = self.celonis.api_request(self.url_data_push, payload)
    job_id = response["id"]
    # Create parquet uploader
    chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
    chunk_url = f"{chunk_base_url}/upserted"

    # Create df chunks and upload
    if isinstance(df_or_path, pd.DataFrame):
        self._upload_dataframe(
            df=df_or_path,
            table_name=table_name,
            chunksize=chunksize,
            chunk_url=chunk_url,
            chunk_base_url=chunk_base_url,
        )
    # Or upload file(s)
    elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
        self._upload_parquet_files(path=df_or_path, chunk_url=chunk_url, chunk_base_url=chunk_base_url)

    # Start job execution
    _ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
    response = self.check_push_status(job_id)

    # Wait for job to finish
    if wait_for_finish and response.get("status", "") != "DONE":
        response = self._wait_until_data_job_finish(job_id=job_id)

    return response
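
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object; the table name, columns, and column_config values are hypothetical.

```python
import pandas as pd

df = pd.DataFrame({"BUKRS": ["1000", "2000"], "AMOUNT": [12.5, 99.0]})

# Optional column_config pinning types and string field lengths (values are hypothetical).
column_config = [
    {"columnName": "BUKRS", "columnType": "STRING", "fieldLength": 100},
    {"columnName": "AMOUNT", "columnType": "FLOAT"},
]

# if_exists="drop" replaces any existing table of the same name.
status = pool.create_table(
    df_or_path=df,
    table_name="COMPANY_CODES",
    if_exists="drop",
    column_config=column_config,
)
```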

find_table(self, table_name, data_source_id=None)

Find a Table in the Pool.

Parameters:

  • table_name (str, required): Name of the Pool Table.
  • data_source_id (str, default None): ID of the Data Source.

Returns:

  • Optional[Dict]: The Pool Table, if found.

Source code in celonis_api/event_collection/data_pool.py
def find_table(self, table_name: str, data_source_id: str = None) -> typing.Optional[typing.Dict]:
    """Find a Table in the Pool.

    Args:
        table_name: Name of the Pool Table.
        data_source_id: ID of the Data Source.

    Returns:
        The Pool Table, if found.
    """
    table_found = None
    for table in self.tables:
        if table["name"].lower() == table_name.lower() and table["dataSourceId"] == data_source_id:
            table_found = table
            break

    return table_found
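
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object; the table name is hypothetical.

```python
# Look up a table in the global source (data_source_id=None); returns None if not found.
table = pool.find_table("COMPANY_CODES")
if table is not None:
    print(table["name"], table["dataSourceId"])
```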

get_column_config(self, table, raise_error=False)

Get a Column Configuration of a Pool Table.

Column Config List:

[
    {'columnName': 'colA', 'columnType': 'DATETIME'},
    {'columnName': 'colB', 'columnType': 'FLOAT'},
    {'columnName': 'colC', 'columnType': 'STRING', 'fieldLength': 80}
]

Parameters:

  • table (Union[str, Dict], required): Name of the Pool Table or dictionary with {'name': '', 'schemaName': ''}.
  • raise_error (bool, default False): Raises a celonis_api.errors.PyCelonisValueError if Table data types are None or table has 99+ columns, else only logs warning.

Returns:

  • Optional[List[Dict[str, Any]]]: The Column Configuration of the Pool Table (Always ignoring '_CELONIS_CHANGE_DATE').

Source code in celonis_api/event_collection/data_pool.py
def get_column_config(
    self, table: typing.Union[str, typing.Dict], raise_error: bool = False
) -> typing.Optional[typing.List[typing.Dict[str, typing.Any]]]:
    """Get a Column Configuration of a Pool Table.

    Column Config List:
        ```json
        [
            {'columnName': 'colA', 'columnType': 'DATETIME'},
            {'columnName': 'colB', 'columnType': 'FLOAT'},
            {'columnName': 'colC', 'columnType': 'STRING', 'fieldLength': 80}
        ]
        ```


    Args:
        table: Name of the Pool Table or dictionary with `{'name': '', 'schemaName': ''}`.
        raise_error: Raises a [celonis_api.errors.PyCelonisValueError][] if Table data types are `None` or table
            has 99+ columns, else only logs warning.

    Returns:
        The Column Configuration of the Pool Table (Always ignoring '_CELONIS_CHANGE_DATE').
    """
    if isinstance(table, str):
        table_list = [t for t in self.tables if t["name"] == table]
        if not table_list:
            raise PyCelonisNotFoundError(f'Table {table} not found in pool')
        table = table_list[0]
    try:
        column_config = self._get_raw_column_config(table_name=table['name'], data_source_id=table['dataSourceId'])
    except Exception as e:
        if raise_error:
            raise e
        self._logger.warning(str(e))
        return None
    column_config = self._clean_column_config(column_config=column_config)
    return column_config
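
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object and "COMPANY_CODES" is an existing Pool table (both hypothetical).

```python
table = pool.find_table("COMPANY_CODES")
config = pool.get_column_config(table, raise_error=False)
# e.g. [{'columnName': 'BUKRS', 'columnType': 'STRING', 'fieldLength': 100}, ...]
```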

move(self, to)

Moves the Pool to another team.

API

  • POST: /integration/api/pools/move
    {
        "subsetOfDataModels": False,
        "dataPoolId": self.id,
        "selectedDataModels": [],
        "moveToDomain": to
    }
    

Parameters:

  • to (str, required): Name of the host domain (e.g. move for https://move.eu-1.celonis.cloud).
Source code in celonis_api/event_collection/data_pool.py
def move(self, to: str):
    """Moves the Pool to another team.

    !!! api "API"
        - `POST: /integration/api/pools/move`
            ```json
            {
                "subsetOfDataModels": False,
                "dataPoolId": self.id,
                "selectedDataModels": [],
                "moveToDomain": to
            }
            ```

    Args:
        to: Name of the host domain (e.g. `move` for https://move.eu-1.celonis.cloud).
    """
    url = f"{self.celonis.url}/integration/api/pools/move"
    payload = {
        "subsetOfDataModels": False,
        "dataPoolId": self.id,
        "selectedDataModels": [k for k, v in self.datamodels.ids.items()],
        "moveToDomain": to,
    }
    self.celonis.api_request(url, payload)
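
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object; the target domain name is hypothetical.

```python
# Moves the Pool (with all of its Datamodels) to the "move" team,
# i.e. https://move.eu-1.celonis.cloud; the domain name is hypothetical.
pool.move("move")
```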

push_table(self, df_or_path, table_name, if_exists='error', primary_keys=None, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Pushes a pandas.DataFrame or pyarrow.parquet.ParquetFile to the specified Table in the Pool.

Warning

Deprecation: The method 'push_table' is deprecated and will be removed in the next release. Use one of: create_table, append_table, upsert_table.

Parameters:

  • df_or_path (Union[pandas.core.frame.DataFrame, pathlib.Path, str], required):
      • If DataFrame, df is chunked, written to parquet and uploaded.
      • If Path to parquet file, file is uploaded.
      • If Path to folder, any parquet file in folder is uploaded.
      • If str, value is converted to Path and handled as described above.
  • table_name (str, required): Name of Table.
  • if_exists (str, default 'error'):
      • error -> an error is thrown if a table of the same name already exists in the pool.
      • drop -> the existing table is dropped completely and a new table is created.
      • replace_data_only -> the column names and types of the old tables are not overwritten.
  • primary_keys (Optional[List[str]], default None): List of Table primary keys.
  • column_config (List[Dict[str, Any]], default None): Can be used to specify column types and string field length, e.g.

        [
            {
                "columnName":"BUKRS",
                "fieldLength":100,
                "columnType":"STRING"
            }...
        ]

    with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].
  • connection (Union[DataConnection, str], default None): The Data Connection to upload to, else uploads to Global.
  • wait_for_finish (bool, default True): Waits for the upload to finish processing, set to False to trigger only.
  • chunksize (int, default 100000): If a DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

Returns:

  • Dict: The Data Job Status.

Source code in celonis_api/event_collection/data_pool.py
@deprecated("Use 'create_table', 'append_table' or 'upsert_table'.")
def push_table(
    self,
    df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
    table_name: str,
    if_exists: str = "error",
    primary_keys: typing.Optional[typing.List[str]] = None,
    column_config: typing.List[typing.Dict[str, typing.Any]] = None,
    connection: typing.Union["DataConnection", str] = None,
    wait_for_finish: bool = True,
    chunksize: int = 100_000,
) -> typing.Dict:
    """Pushes a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    to the specified Table in the Pool.

    !!! warning
        Deprecation: The method 'push_table' is deprecated and will be removed in the next release.
        Use one of:
        [create_table][celonis_api.event_collection.data_pool.Pool.create_table],
        [append_table][celonis_api.event_collection.data_pool.Pool.append_table],
        [upsert_table][celonis_api.event_collection.data_pool.Pool.upsert_table].

    Args:
        df_or_path:
            * If DataFrame, df is chunked, written to parquet and uploaded.
            * If Path to parquet file, file is uploaded.
            * If Path to folder, any parquet file in folder is uploaded.
            * If str, value is converted to Path and handled as described above.
        table_name: Name of Table.
        if_exists:
            * `error` -> an error is thrown if a table of the same name already exists in the pool.
            * `drop` -> the existing table is dropped completely and a new table is created, by default `error`.
            * `replace_data_only` -> the column names and types of the old tables are not overwritten.
        primary_keys: List of Table primary keys.
        column_config: Can be used to specify column types and string field length.
            ```json
                [
                    {
                        "columnName":"BUKRS",
                        "fieldLength":100,
                        "columnType":"STRING"
                    }...
                ]
            ```
            with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
        connection: The Data Connection to upload to, else uploads to Global.
        wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
        chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
            that are uploaded. If set to a value <1, no chunking is applied.

    Returns:
        The Data Job Status.
    """
    if if_exists == "error":
        response = self.create_table(
            df_or_path=df_or_path,
            table_name=table_name,
            if_exists=if_exists,
            column_config=column_config,
            connection=connection,
            wait_for_finish=wait_for_finish,
            chunksize=chunksize,
        )
    elif if_exists in ["replace", "replace_data_only"]:
        response = self.create_table(
            df_or_path=df_or_path,
            table_name=table_name,
            if_exists="drop",
            column_config=column_config,
            connection=connection,
            wait_for_finish=wait_for_finish,
            chunksize=chunksize,
        )
    elif if_exists == "append":
        response = self.append_table(
            df_or_path=df_or_path,
            table_name=table_name,
            connection=connection,
            wait_for_finish=wait_for_finish,
            chunksize=chunksize,
        )
    elif if_exists == "upsert":
        if primary_keys is None:
            raise PyCelonisValueError("Argument 'primary_keys' must be set for upsert.")
        else:
            response = self.upsert_table(
                df_or_path=df_or_path,
                primary_keys=primary_keys,
                table_name=table_name,
                connection=connection,
                wait_for_finish=wait_for_finish,
                chunksize=chunksize,
            )
    else:
        raise PyCelonisValueError(f"if_exists {if_exists} not supported")
    return response

upsert_table(self, df_or_path, table_name, primary_keys, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Upserts the pandas.DataFrame or pyarrow.parquet.ParquetFile into an existing Table in the Pool.

Parameters:

  • df_or_path (Union[pandas.core.frame.DataFrame, pathlib.Path, str], required):
      • If DataFrame, df is chunked, written to parquet and uploaded.
      • If Path to parquet file, file is uploaded.
      • If Path to folder, any parquet file in folder is uploaded.
      • If str, value is converted to Path and handled as described above.
  • table_name (str, required): Name of Table.
  • primary_keys (List[str], required): List of Table primary keys.
  • column_config (List[Dict[str, Any]], default None): Can be used to specify column types and string field length, e.g.

        [
            {
                "columnName":"BUKRS",
                "fieldLength":100,
                "columnType":"STRING"
            }...
        ]

    with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].
  • connection (Union[DataConnection, str], default None): The Data Connection to upload to, else uploads to Global.
  • wait_for_finish (bool, default True): Waits for the upload to finish processing, set to False to trigger only.
  • chunksize (int, default 100000): If a DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

Returns:

  • Dict: The Data Job Status.

Exceptions:

  • PyCelonisValueError: If Table already exists and if_exists='error'.
  • PyCelonisTypeError: When connection is not a DataConnection object or the ID of a Data Connection.
  • PyCelonisTypeError: If Path is not a valid file or folder.

Source code in celonis_api/event_collection/data_pool.py
def upsert_table(
    self,
    df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
    table_name: str,
    primary_keys: typing.List[str],
    column_config: typing.List[typing.Dict[str, typing.Any]] = None,
    connection: typing.Union["DataConnection", str] = None,
    wait_for_finish: bool = True,
    chunksize: int = 100_000,
) -> typing.Dict:
    """Upserts the [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    into an existing Table in the Pool.

    Args:
        df_or_path:
            * If DataFrame, df is chunked, written to parquet and uploaded.
            * If Path to parquet file, file is uploaded.
            * If Path to folder, any parquet file in folder is uploaded.
            * If str, value is converted to Path and handled as described above.
        table_name: Name of Table.
        primary_keys: List of Table primary keys.
        column_config: Can be used to specify column types and string field length.
            ```json
                [
                    {
                        "columnName":"BUKRS",
                        "fieldLength":100,
                        "columnType":"STRING"
                    }...
                ]
            ```
            with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
        connection: The Data Connection to upload to, else uploads to Global.
        wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
        chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
            that are uploaded. If set to a value <1, no chunking is applied.

    Returns:
        The Data Job Status.

    Raises:
        PyCelonisValueError: If Table already exists and `if_exists='error'`.
        PyCelonisTypeError: When connection is not a DataConnection object or the ID of a Data Connection.
        PyCelonisTypeError: If Path is not a valid file or folder.
    """
    self.celonis._tracker.track("Upsert table", extra={"tracking_type": "DATA_PUSH"})

    data_source_id = self._get_data_source_id(connection)
    table = self.find_table(table_name, data_source_id)
    if table is None:
        raise PyCelonisNotFoundError(
            f"The target table \"{table_name}\" could not be found in the Data Pool. "
            "Please check spelling of table name."
        )

    if not column_config:
        column_config = self.get_column_config(table)
    # Create payload for data job
    payload, table_name = self._create_payload_for_data_job(
        table_name=table_name,
        column_config=column_config,
        data_source_id=data_source_id,
        job_type="DELTA",
        keys=primary_keys,
    )

    # Create data push job
    response = self.celonis.api_request(self.url_data_push, payload)
    job_id = response["id"]

    # Create parquet uploader
    chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
    chunk_url = f"{chunk_base_url}/upserted"

    # Create df chunks and upload
    if isinstance(df_or_path, pd.DataFrame):
        self._upload_dataframe(
            df=df_or_path,
            table_name=table_name,
            chunksize=chunksize,
            chunk_url=chunk_url,
            chunk_base_url=chunk_base_url,
            check_column=None,
        )
    # Or upload file(s)
    elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
        self._upload_parquet_files(
            path=df_or_path,
            chunk_url=chunk_url,
            chunk_base_url=chunk_base_url,
            check_column=None,
        )

    # Start job execution
    _ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
    response = self.check_push_status(job_id)

    # Wait for job to finish
    if wait_for_finish and response.get("status", "") != "DONE":
        response = self._wait_until_data_job_finish(job_id=job_id)

    return response
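
Example

A minimal sketch, assuming `pool` is an already-retrieved Pool object and a table named "CASES" with a `_CASE_KEY` column already exists; all names and values are hypothetical.

```python
import pandas as pd

# Rows whose _CASE_KEY already exists are updated, new keys are inserted.
updates = pd.DataFrame({"_CASE_KEY": ["C-1001"], "STATUS": ["CLOSED"]})

status = pool.upsert_table(
    df_or_path=updates,
    table_name="CASES",
    primary_keys=["_CASE_KEY"],
)
```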

PoolParameter (CelonisApiObjectChild)

Pool Parameter object.

Source code in celonis_api/event_collection/data_pool.py
class PoolParameter(CelonisApiObjectChild):
    """Pool Parameter object."""

    @property
    def _parent_class(self):
        return Pool

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/variables/{variable_id}`
        """
        return f"{self.parent.url}/variables/{self.id}"

url: str property readonly

API

  • /integration/api/pools/{pool_id}/variables/{variable_id}