Skip to content

data_job.py

DataJob (CelonisApiObjectChild)

Source code in celonis_api/event_collection/data_job.py
class DataJob(CelonisApiObjectChild):
    """Data Job of a Data Pool, giving access to its Extraction and Transformation tasks."""

    @property
    def _parent_class(self):
        # Imported inside the property instead of at module level
        # (presumably to avoid a circular import with data_pool — TODO confirm).
        from pycelonis.celonis_api.event_collection.data_pool import Pool

        return Pool

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/jobs/{job_id}`
        """
        return f"{self.parent.url}/jobs/{self.id}"

    @property
    def transformations(self) -> 'CelonisCollection[Transformation]':
        """Get all Data Job Tasks and filter for `taskType = TRANSFORMATION`.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`

        Returns:
            Collection of Data Job Transformations.
        """
        response = self.celonis.api_request(f"{self.url}/tasks/")
        return CelonisCollection([Transformation(self, d, self.celonis) for d in response]).filter(
            "TRANSFORMATION", "taskType"
        )

    @property
    def extractions(self) -> 'CelonisCollection[Extraction]':
        """Get all Data Job Tasks and filter for `taskType = EXTRACTION`,
        only if the Data Job's `dataSourceId` is available.

        !!! note
            Data Jobs in global scope do not have a `dataSourceId` defined.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`

        Returns:
            Collection of Data Job Extractions or empty Collection if global scope.
        """
        if self.data['dataSourceId'] is None:  # global data jobs do not contain extractions
            extraction_list: CelonisCollection[Extraction] = CelonisCollection([])
        else:
            response = self.celonis.api_request(f"{self.url}/tasks/")
            extraction_list = CelonisCollection([Extraction(self, d, self.celonis) for d in response]).filter(
                "EXTRACTION", "taskType"
            )
        return extraction_list

    @property
    def data_connection(self) -> 'CelonisCollection[DataConnection]':
        """Get all Data Job DataConnections via
        [Pool.data_connections][celonis_api.event_collection.data_pool.Pool.data_connections]
        and filter for the Data Job's `dataSourceId`, only if the Data Job's `dataSourceId` is available.

        !!! note
            Data Jobs in global scope do not have a `dataSourceId` defined.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-sources/?excludeUnconfigured=False`

        Raises:
            PyCelonisValueError: If `dataSourceId` of Data Job is not available (global scope).

        Returns:
            Collection of Data Job Connections.
        """
        if self.data["dataSourceId"] is None:
            raise PyCelonisValueError("No Data Connection available (Data Job in global scope).")

        return self.parent.data_connections.filter(self.data["dataSourceId"], "id")

    @property
    def tables(self) -> typing.List:
        """Get all Data Job table schemas, e.g.

        ```json
        [{
            'name': 'pycelonis',
            'loaderSource': None,
            'available': False,
            'dataSourceId': None,
            'dataSourceName': None,
            'columns': [
                {'name': 'colA', 'length': 0, 'type': None},
                {'name': 'colB', 'length': 0, 'type': None},
                {'name': 'colC', 'length': 0, 'type': None},
                {'name': '_CELONIS_CHANGE_DATE', 'length': 0, 'type': None}
            ],
            'type': 'TABLE',
            'schemaName': '9ecdb9e0-4c30-46c6-b861-d805860c763c'
        },...]
        ```

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/schemas`

        Returns:
            List with one entry per schema returned by the API; each entry is that schema's
            `tables` value (`None` if the schema has no `tables` key).
        """
        response = self.celonis.api_request(f"{self.url}/schemas")
        return [d.get("tables") for d in response]

    @property
    def _templates(self) -> typing.Dict:
        # Templates are requested on the parent (pool) URL, not on the job URL.
        return self.celonis.api_request(f"{self.parent.url}/templates/")

    def execute(self, mode: str = "DELTA", wait_for_execution: bool = False):
        """Executes the Data Job with all extractions and transformations.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
                ```json
                {
                    "jobId": self.id,
                    "poolId": self.parent.id,
                    "executeOnlySubsetOfExtractions": False,
                    "executeOnlySubsetOfTransformations": False,
                    "extractions": [{
                        "extractionId": extraction.id,
                        "tables": None,
                        "loadOnlySubsetOfTables": False
                    },... (for extraction in self.extractions)
                    ],
                    "mode": mode,
                    "transformations": None
                }
                ```

        Args:
            mode: Execution mode of the Data Job, either full execution or delta. One of [`DELTA`,`FULL`].
            wait_for_execution: Wait for the Data Job to complete, else only triggers.

        Raises:
            PyCelonisHTTPError: When Data Job failed (only if `wait_for_execution=True`).
        """
        extractions = [
            {"extractionId": e.id, "tables": None, "loadOnlySubsetOfTables": False} for e in self.extractions
        ]
        payload = {
            "executeOnlySubsetOfExtractions": False,
            "executeOnlySubsetOfTransformations": False,
            "extractions": extractions,
            "jobId": self.id,
            "mode": mode,
            "poolId": self.parent.id,
            "transformations": None,
        }
        # self.url already resolves to "{pool_url}/jobs/{job_id}".
        self.celonis.api_request(f"{self.url}/execute", payload)

        if wait_for_execution:
            self._wait_for_execution()

    def _wait_for_execution(self):
        """Polls the parent's execution status until this Data Job is no longer running.

        Status requests that fail with `PyCelonisHTTPError` are retried; after more than
        five consecutive failures the last error is re-raised.

        Raises:
            PyCelonisHTTPError: If the status cannot be retrieved, no execution of this
                Data Job is found, or an execution finished with status `FAIL`.
        """
        iterations = 0
        error_count = 0
        while True:
            try:
                log_status = self.parent.check_data_job_execution_status()
                jobs = [job for job in log_status if job["id"] == self.id]
                if not jobs:
                    # all() over an empty list is True, so without this guard a missing
                    # status entry would be reported as a finished execution.
                    raise PyCelonisHTTPError(f"Failed to find any executions related to data job: {self.id}.")
                if all(job["status"] not in ["RUNNING", "QUEUED", "NEW"] for job in jobs):
                    self._logger.info("Data Jobs done")
                    break

                error_count = 0
                iterations += 1
                if iterations % 5 == 0:
                    self._logger.info("Execution of Data Jobs running...")
                time.sleep(1)
            except PyCelonisHTTPError:
                error_count += 1
                self._logger.exception("Failed to request job status, trying again...")
                if error_count > 5:
                    # Bare raise keeps the original traceback; no point sleeping before giving up.
                    raise
                time.sleep(3)

        if any(job["status"] == "FAIL" for job in jobs):
            failed_jobs = [job for job in jobs if job["status"] == "FAIL"]
            raise PyCelonisHTTPError(f"Failed data job(s): {failed_jobs}")

    def create_extraction(self, name: str, task_id: typing.Optional[str] = None) -> Extraction:
        """Creates a new Extraction in Data Job.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
                ```json
                {
                    "jobId": self.id,
                    "name": name,
                    "taskId": task_id,
                    "taskType": "EXTRACTION"
                }
                ```

        Args:
            name: Name of the Extraction. Only sent when `task_id` is None.
            task_id: Task ID if the Extraction should be created from template.

        Returns:
            The newly created Extraction.
        """
        if task_id is None:
            payload = {"jobId": str(self.id), "name": name, "taskId": None, "taskType": "EXTRACTION"}
        else:
            # Created from a template: the request carries no name.
            payload = {"jobId": str(self.id), "taskId": task_id, "taskType": "EXTRACTION"}

        response = self.celonis.api_request(f"{self.url}/tasks/", payload)
        return Extraction(self, response, self.celonis)

    def create_transformation(
        self, name: str, statement: str = "", description: str = "", task_id: typing.Optional[str] = None
    ) -> 'Transformation':
        """Creates a new transformation in Data Job.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
                ```json
                {
                    "jobId": self.id,
                    "description": description,
                    "name": name,
                    "taskId": task_id,
                    "taskType": "TRANSFORMATION",
                }
                ```

        Args:
            name: Name of the transformation. Only sent when `task_id` is None.
            statement: SQL statement as a string.
            description: Description of the transformation. Only sent when `task_id` is None.
            task_id: Task ID if the transformation should be created from template.

        Returns:
            The newly created Transformation.
        """
        if task_id is None:
            payload = {
                "description": description,
                "jobId": str(self.id),
                "name": name,
                "taskId": None,
                "taskType": "TRANSFORMATION",
            }
        else:
            # Created from a template: the request carries neither name nor description.
            payload = {"jobId": str(self.id), "taskId": task_id, "taskType": "TRANSFORMATION"}

        response = self.celonis.api_request(f"{self.url}/tasks/", payload)
        transformation = Transformation(self, response, self.celonis)
        # NOTE(review): the statement is written in every case, including the default ""
        # for template-based transformations — confirm this is intended.
        transformation.statement = statement
        return transformation

data_connection: CelonisCollection[DataConnection] property readonly

Get all Data Job DataConnections via Pool.data_connections and filter for Data Jobs dataSourceId, only if the Data Jobs dataSourceId is available.

Note

Data Jobs in global scope do not have a dataSourceId defined.

API

  • GET: /integration/api/pools/{pool_id}/data-sources/?excludeUnconfigured=False

Exceptions:

Type Description
PyCelonisValueError

If dataSourceId of Data Job is not available (global scope).

Returns:

Type Description
CelonisCollection[DataConnection]

Collection of Data Job Connections.

extractions: CelonisCollection[Extraction] property readonly

Get all Data Job Tasks and filter for taskType = EXTRACTION, only if the Data Jobs dataSourceId is available.

Note

Data Jobs in global scope do not have a dataSourceId defined.

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/

Returns:

Type Description
CelonisCollection[Extraction]

Collection of Data Job Extractions or empty Collection if global scope.

tables: List property readonly

Get all Data Jobs table schemas as list of dictionaries, e.g.

[{
    'name': 'pycelonis',
    'loaderSource': None,
    'available': False,
    'dataSourceId': None,
    'dataSourceName': None,
    'columns': [
        {'name': 'colA', 'length': 0, 'type': None},
        {'name': 'colB', 'length': 0, 'type': None},
        {'name': 'colC', 'length': 0, 'type': None},
        {'name': '_CELONIS_CHANGE_DATE', 'length': 0, 'type': None}
    ],
    'type': 'TABLE',
    'schemaName': '9ecdb9e0-4c30-46c6-b861-d805860c763c'
},...]

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/schemas

Returns:

Type Description
List

List of Table Schemas.

transformations: CelonisCollection[Transformation] property readonly

Get all Data Job Tasks and filter for taskType = TRANSFORMATION.

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/

Returns:

Type Description
CelonisCollection[Transformation]

Collection of Data Job Transformations.

url: str property readonly

API

  • /integration/api/pools/{pool_id}/jobs/{job_id}

create_extraction(self, name, task_id=None)

Creates a new Extraction in Data Job.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
    {
        "jobId": self.id,
        "name": name,
        "taskId": task_id,
        "taskType": "EXTRACTION"
    }
    

Parameters:

Name Type Description Default
name str

Name of the Extraction.

required
task_id str

Task ID if the Extraction should be created from template.

None

Returns:

Type Description
Extraction

The newly created Extraction.

Source code in celonis_api/event_collection/data_job.py
def create_extraction(self, name: str, task_id: str = None) -> Extraction:
    """Creates a new Extraction in Data Job.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
            ```json
            {
                "jobId": self.id,
                "name": name,
                "taskId": task_id,
                "taskType": "EXTRACTION"
            }
            ```

    Args:
        name: Name of the Extraction. Only sent when `task_id` is None.
        task_id: Task ID if the Extraction should be created from template.

    Returns:
        The newly created Extraction.
    """
    from_template = task_id is not None
    if from_template:
        # Template-based request carries no name.
        request_body = {"jobId": str(self.id), "taskId": task_id, "taskType": "EXTRACTION"}
    else:
        request_body = {"jobId": str(self.id), "name": name, "taskId": None, "taskType": "EXTRACTION"}

    tasks_url = f"{self.url}/tasks/"
    created = self.celonis.api_request(tasks_url, request_body)
    return Extraction(self, created, self.celonis)

create_transformation(self, name, statement='', description='', task_id=None)

Creates a new transformation in Data Job.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
    {
        "jobId": self.id,
        "description": description,
        "name": name,
        "taskId": task_id,
        "taskType": "TRANSFORMATION",
    }
    

Parameters:

Name Type Description Default
name str

Name of the transformation.

required
description str

Description of the transformation.

''
statement str

SQL statement as a string.

''
task_id str

Task ID if the transformation should be created from template.

None

Returns:

Type Description
Transformation

The newly created Transformation.

Source code in celonis_api/event_collection/data_job.py
def create_transformation(
    self, name: str, statement: str = "", description: str = "", task_id: str = None
) -> 'Transformation':
    """Creates a new transformation in Data Job.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
            ```json
            {
                "jobId": self.id,
                "description": description,
                "name": name,
                "taskId": task_id,
                "taskType": "TRANSFORMATION",
            }
            ```

    Args:
        name: Name of the transformation. Only sent when `task_id` is None.
        statement: SQL statement as a string, written to the new transformation after creation.
        description: Description of the transformation. Only sent when `task_id` is None.
        task_id: Task ID if the transformation should be created from template.

    Returns:
        The newly created Transformation.
    """
    from_template = task_id is not None
    if from_template:
        # Template-based request carries neither name nor description.
        request_body = {"jobId": str(self.id), "taskId": task_id, "taskType": "TRANSFORMATION"}
    else:
        request_body = {
            "description": description,
            "jobId": str(self.id),
            "name": name,
            "taskId": None,
            "taskType": "TRANSFORMATION",
        }

    tasks_url = f"{self.url}/tasks/"
    created = self.celonis.api_request(tasks_url, request_body)
    new_transformation = Transformation(self, created, self.celonis)
    # The statement is set through the Transformation's own setter, not the create request.
    new_transformation.statement = statement
    return new_transformation

execute(self, mode='DELTA', wait_for_execution=False)

Executes the Data Job with all extractions and transformations.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute
    {
        "jobId": self.id,
        "poolId": self.parent.id,
        "executeOnlySubsetOfExtractions": False,
        "executeOnlySubsetOfTransformations": False,
        "extractions": [{
            "extractionId": extraction.id,
            "tables": None,
            "loadOnlySubsetOfTables": False
        },... (for extraction in self.extractions)
        ],
        "mode": mode,
        "transformations": None
    }
    

Parameters:

Name Type Description Default
mode str

Execution mode of the Data Job, either full execution or delta. One of [DELTA,FULL].

'DELTA'
wait_for_execution bool

Wait for the Data Job to complete, else only triggers.

False

Exceptions:

Type Description
PyCelonisHTTPError

When Data Job failed (only if wait_for_execution=True).

Source code in celonis_api/event_collection/data_job.py
def execute(self, mode: str = "DELTA", wait_for_execution: bool = False):
    """Executes the Data Job with all extractions and transformations.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
            ```json
            {
                "jobId": self.id,
                "poolId": self.parent.id,
                "executeOnlySubsetOfExtractions": False,
                "executeOnlySubsetOfTransformations": False,
                "extractions": [{
                    "extractionId": extraction.id,
                    "tables": None,
                    "loadOnlySubsetOfTables": False
                },... (for extraction in self.extractions)
                ],
                "mode": mode,
                "transformations": None
            }
            ```

    Args:
        mode: Execution mode of the Data Job, either full execution or delta. One of [`DELTA`,`FULL`].
        wait_for_execution: Wait for the Data Job to complete, else only triggers.

    Raises:
        PyCelonisHTTPError: When Data Job failed (only if `wait_for_execution=True`).
    """
    # One entry per extraction of this job; every extraction loads all of its tables.
    extraction_specs = []
    for extraction in self.extractions:
        extraction_specs.append(
            {"extractionId": extraction.id, "tables": None, "loadOnlySubsetOfTables": False}
        )

    request_body = {
        "executeOnlySubsetOfExtractions": False,
        "executeOnlySubsetOfTransformations": False,
        "extractions": extraction_specs,
        "jobId": self.id,
        "mode": mode,
        "poolId": self.parent.id,
        "transformations": None,
    }
    execute_url = f"{self.parent.url}/jobs/{self.id}/execute"
    self.celonis.api_request(execute_url, request_body)

    if not wait_for_execution:
        return
    self._wait_for_execution()

Transformation (CelonisApiObjectChild)

Source code in celonis_api/event_collection/data_job.py
class Transformation(CelonisApiObjectChild):
    """Transformation task of a Data Job."""

    @property
    def _parent_class(self):
        return DataJob

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}`
        """
        return f"{self.parent.url}/transformations/{self.id}"

    @property
    def variables(self) -> typing.Dict:
        """Get all Transformation Variables.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables/`

        Returns:
            Transformation Variables as returned by the API.
        """
        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/")

    @property
    def description(self) -> str:
        """Get/Set Transformation Description.

        ```py
        # Get
        description = transformation.description
        # Set
        transformation.description = "Some description"
        ```

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}`
            - `PUT: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}`
                ```json
                {
                    "description": desc,
                    "name": self.name
                }
                ```

        Returns:
            Transformation Description.
        """
        return self.celonis.api_request(f"{self.url}")["description"]

    @description.setter
    def description(self, desc):
        # The update goes to the job's tasks endpoint, not to the transformations URL used for reading.
        payload = {"description": desc, "name": self.name}
        self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}", payload, method=HttpMethod.PUT)

    @property
    def statement(self) -> str:
        """Get/Set the Transformation's SQL Statement.

        If the response contains a `legalNote`, it is logged as a warning on every read.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}/statement`
            - `PUT: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}/statement`
                (request body is the statement itself)

        Returns:
            Transformation's SQL Statement.
        """
        response = self.celonis.api_request(f"{self.url}/statement")
        if response.get("legalNote") is not None:
            self._logger.warning(response.get("legalNote"))
        return response["statement"]

    @statement.setter
    def statement(self, stmt):
        self.celonis.api_request(f"{self.url}/statement", stmt, method=HttpMethod.PUT)

    def execute(self, wait_for_execution=False):
        """Executes the Transformation.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
                ```json
                {
                    "jobId": self.parent.id,
                    "poolId": self.parent.parent.id,
                    "executeOnlySubsetOfExtractions": True,
                    "executeOnlySubsetOfTransformations": True,
                    "extractions": [],
                    "mode": "DELTA",
                    "transformations": [self.id]
                }
                ```

        Args:
            wait_for_execution: Wait for the Transformation to complete, else only triggers.

        Raises:
            PyCelonisHTTPError: When Transformation failed (only if `wait_for_execution=True`).
        """
        payload = {
            "executeOnlySubsetOfTransformations": True,
            "executeOnlySubsetOfExtractions": True,
            "extractions": [],
            "jobId": self.parent.id,
            "poolId": self.parent.parent.id,
            "mode": "DELTA",
            "transformations": [self.id],
        }
        self.celonis.api_request(f"{self.parent.url}/execute", payload)

        if wait_for_execution:
            self._wait_for_execution()

    def _wait_for_execution(self):
        """Polls the pool's execution status until the parent Data Job is no longer running.

        Status requests that fail with `PyCelonisHTTPError` (including the case that no
        execution of the parent Data Job is listed) are retried; after more than five
        consecutive failures the last error is re-raised.

        Raises:
            PyCelonisHTTPError: If the status cannot be retrieved or an execution finished
                with status `FAIL`.
        """
        iterations = 0
        error_count = 0
        while True:
            try:
                log_status = self.parent.parent.check_data_job_execution_status()
                jobs = [job for job in log_status if job["id"] == self.parent.id]
                if not jobs:
                    raise PyCelonisHTTPError(f"Failed to find any executions related to data job: {self.parent.id}.")
                if all(job["status"] not in ["RUNNING", "QUEUED", "NEW"] for job in jobs):
                    self._logger.info("Transformation done")
                    break

                error_count = 0
                if iterations % 5 == 0:
                    self._logger.info("Transformation running...")
                iterations += 1
                time.sleep(1)
            except PyCelonisHTTPError:
                error_count += 1
                self._logger.exception("Failed to request Transformation status, trying again...")
                if error_count > 5:
                    # Bare raise keeps the original traceback; no point sleeping before giving up.
                    raise
                time.sleep(3)

        if any(job["status"] == "FAIL" for job in jobs):
            failed_jobs = [job for job in jobs if job["status"] == "FAIL"]
            raise PyCelonisHTTPError(f"Failed Transformation: {failed_jobs}")

    def execute_from_workbench(self, statement: typing.Optional[str] = None) -> typing.Dict:
        """Executes the Transformation Statement using the `Edit Transformation` Workbench and waits for the execution.
        This is useful to test SQL statements.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/execute`
                ```json
                {
                    "jobId": self.parent.id,
                    "transformationId": self.id,
                    "statement": statement,
                }
                ```

        Args:
            statement: An SQL Statement. If None, the Transformation's current statement is used.

        Raises:
            PyCelonisHTTPError: When Transformation failed.

        Returns:
            The Transformation Statement result.
        """
        if statement is None:
            statement = self.statement

        # The workbench endpoints mirror the transformation URL with a different path segment.
        url = self.url.replace("/transformations/", "/workbench/")
        payload = {"jobId": self.parent.id, "transformationId": self.id, "statement": statement}
        execution_id = self.celonis.api_request(f"{url}/execute", payload)["id"]
        execution_url = f"{url}/{execution_id}"

        iterations = 0
        error_count = 0
        while True:
            try:
                status = self.celonis.api_request(execution_url, params={"fullCheck": False})["status"]
                if status not in ["RUNNING", "QUEUED", "NEW"]:
                    self._logger.info(f"Transformation status: {status}")
                    break

                error_count = 0
                iterations += 1
                if iterations % 10 == 0:
                    self._logger.info(f"Transformation status: {status}...")
                time.sleep(1)
            except PyCelonisHTTPError:
                error_count += 1
                self._logger.exception("Failed to request transformation status, trying again...")
                if error_count > 5:
                    # Bare raise keeps the original traceback; no point sleeping before giving up.
                    raise
                time.sleep(3)

        results = self.celonis.api_request(f"{execution_url}/results")
        # Only the first result entry decides success; an empty result counts as failure.
        if not (results and results[0]["status"] == "SUCCESS"):
            raise PyCelonisHTTPError(f"Transformation did not succeed, status: {results or status}")

        return results

    @deprecated(
        "This function will be removed in future versions because it only returns a sample of the actual result. "
        "For further information see the docs."
    )
    def get_data_frame(self, statement: typing.Optional[str] = None) -> pd.DataFrame:
        """Executes the Transformation Statement using
        [execute_from_workbench][celonis_api.event_collection.data_job.Transformation.execute_from_workbench]
        and converts the result into a
        [pandas.Dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        !!! warning
            This method is deprecated because of the following reason:
            The execution in the workbench implies that the DataFrame will only contain a sample of the actual result.
            This function should only be used for debugging/exploring the transformation.

        Args:
            statement: An SQL Statement.

        Returns:
            The Transformation Statement result as Dataframe.
        """
        result = self.execute_from_workbench(statement=statement)
        # Only the first result entry is converted.
        table = result[0]["result"]["tableContent"]
        columns = [d["name"] for d in result[0]["result"]["columns"]]
        return pd.DataFrame(table, columns=columns)

    def create_transformation_parameter(self, variable: typing.Dict) -> typing.Dict:
        """Creates a new Transformation Parameter with the specified variable properties.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables`
                ```json
                    {
                        "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                                    LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                        "name": "",
                        "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                        "description": "",
                        "settings": {
                            "poolVariableId": ""
                        },
                        "placeholder": "",
                        "values": [
                            {"value": ""},...
                        ],
                        "defaultValues": [],
                        "dynamicColumn": "",
                        "dynamicTable": "",
                        "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>"
                    }
                ```

        Args:
            variable: Dictionary describing the parameter. `dataType`, `name` and `type` are required;
                all other keys are optional and sent as `None` when missing. `poolId` and `taskId` of
                the request are always taken from this Transformation.

        Raises:
            PyCelonisValueError: If `variable` references a data pool parameter
                (`settings.poolVariableId`) and carries a `poolId` different from this
                Transformation's pool.

        Returns:
            The newly created Transformation Parameter as Dictionary.
        """
        # `settings` may be missing or None; treat both as "no pool variable referenced".
        settings = variable.get("settings") or {}
        references_pool_variable = settings.get("poolVariableId") is not None
        if references_pool_variable and "poolId" in variable and variable["poolId"] != self.data["poolId"]:
            raise PyCelonisValueError(
                "The transformation parameter you are trying to create is referencing a data pool parameter "
                f"from another pool with id: {variable['poolId']}. "
                "(see variable['settings']['poolVariableId']). This is not supported."
            )
        payload = {
            "dataType": variable["dataType"],
            "name": variable["name"],
            "type": variable["type"],
            "description": variable.get("description"),
            "settings": variable.get("settings"),
            "placeholder": variable.get("placeholder"),
            "poolId": self.data["poolId"],
            "values": variable.get("values"),
            "id": None,
            "taskId": self.data["taskId"],
            "defaultSettings": variable.get("defaultSettings"),
            "defaultValues": variable.get("defaultValues"),
            "dynamicColumn": variable.get("dynamicColumn"),
            "dynamicTable": variable.get("dynamicTable"),
        }
        # Forward the documented operation type of dynamic variables when the caller supplies one.
        if variable.get("dynamicVariableOpType") is not None:
            payload["dynamicVariableOpType"] = variable["dynamicVariableOpType"]
        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)

    def to_template(self, protection: str = "OPEN") -> str:
        """Transforms a Transformation to a Template and sets the protection status.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/templates/`
                ```json
                    {"taskInstanceId": self.id}
                ```
            - `POST /integration/api/pools/{pool_id}/templates/{template_id}/protection`
                ```json
                    {"protectionStatus": protection}
                ```

        Args:
            protection: Protection status of the transformation template, one of [`OPEN`, `LOCKED`, `VIEWABLE`].

        Returns:
            The ID of the Task Template.
        """
        response = self.celonis.api_request(f"{self.parent.parent.url}/templates/", {"taskInstanceId": self.id})
        # The protection request is only sent for a status other than "OPEN".
        if protection != "OPEN":
            payload = {"protectionStatus": protection}
            self.celonis.api_request(f"{self.parent.parent.url}/templates/{response['id']}/protection", payload)

        return response["id"]

    def backup_content(self, backup_path: str = ".") -> pathlib.Path:
        """Creates a backup of the Transformation as an SQL file.

        The file contains the description as a leading `/*DESCRIPTION: ... */` comment (if any),
        followed by the statement (if any). An existing backup file of the same name is removed
        first; no file is written when both statement and description are empty.

        Args:
            backup_path: Directory in which the backup file is stored.

        Returns:
            Path to the backup file.
        """
        path = pathlib.Path(backup_path) / f"Backup of Transformation - {pathify(self.name)}.sql"
        if path.exists():
            path.unlink()

        # Both properties trigger an API request on every access, so read each one only once.
        statement = self.statement
        description = self.description
        if statement or description:
            text = f"/*DESCRIPTION:\n{description}\n*/\n" if description else ""
            text += statement if statement else ""
            path.write_text(text)
        return path

    def enable(self) -> typing.Dict:
        """Enables the Transformation.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

        Returns:
            The updated Transformation as Dictionary.
        """
        return self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}/enabled", method=HttpMethod.POST)

    def disable(self) -> typing.Dict:
        """Disables the Transformation.

        !!! api "API"
            - `DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

        Returns:
            The updated Transformation as Dictionary.
        """
        return self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}/enabled", method=HttpMethod.DELETE)

    def delete(self):
        """Deletes the Transformation.

        !!! api "API"
            - `DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}`

        """
        self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}", method=HttpMethod.DELETE)

description: str property writable

Get/Set Transformation Description.

# Get
description = transformation.description
# Set
transformation.description = "Some description"

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}
  • PUT: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}
    {
        "description": desc,
        "name": self.name
    }
    

Returns:

Type Description
str

Transformation Description.

statement: str property writable

Get/Set the Transformation's SQL Statement.

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}/statement

Returns:

Type Description
str

Transformation's SQL Statement.

url: str property readonly

API

  • /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}

variables: Dict property readonly

Get all Transformation Variables.

API

  • GET: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables/

Returns:

Type Description
Dict

Collection of Transformation Variables.

backup_content(self, backup_path='.')

Creates a backup of the Transformation as an SQL file (the description as a leading comment, followed by the statement).

Parameters:

Name Type Description Default
backup_path str

Directory in which the backup file is stored.

'.'

Returns:

Type Description
Path

Path to the backup file.

Source code in celonis_api/event_collection/data_job.py
def backup_content(self, backup_path: str = ".") -> pathlib.Path:
    """Creates a backup of the transformation as an SQL file.

    The file is named `Backup of Transformation - <name>.sql` and is placed in `backup_path`.
    An already existing file with that name is removed first. The file is only written if the
    transformation has a statement or a description; the description is stored as a leading
    `/*DESCRIPTION: ... */` comment, followed by the statement.

    Args:
        backup_path: Path of the folder in which to store the backup file.

    Returns:
        Path to the backup file. The file does not exist if there was nothing to back up.
    """
    path = pathlib.Path(backup_path) / f"Backup of Transformation - {pathify(self.name)}.sql"
    if path.exists():
        path.unlink()

    # Read each property exactly once: they are documented as API-backed (GET), so repeated
    # access would presumably issue repeated requests and could even yield inconsistent values.
    statement = self.statement
    description = self.description
    if statement or description:
        text = f"/*DESCRIPTION:\n{description}\n*/\n" if description else ""
        text += statement if statement else ""
        path.write_text(text)
    return path

create_transformation_parameter(self, variable)

Creates a new Transformation Parameter with the specified variable properties.

API

  • POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables
        {
            "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                        LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
            "name": "",
            "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
            "description": "",
            "settings": {
                "poolVariableId": ""
            },
            "placeholder": "",
            "values": [
                {"value": ""},...
            ],
            "defaultValues": [],
            "dynamicColumn": "",
            "dynamicTable": "",
            "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>"
        }
    

Parameters:

Name Type Description Default
variable Dict

Dictionary.

required

Returns:

Type Description
Dict

The newly created Transformation Parameter as Dictionary.

Source code in celonis_api/event_collection/data_job.py
def create_transformation_parameter(self, variable: typing.Dict) -> typing.Dict:
    """Creates a new Transformation Parameter with the specified variable properties.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables`
            ```json
                {
                    "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                                LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                    "name": "",
                    "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                    "description": "",
                    "settings": {
                        "poolVariableId": ""
                    },
                    "placeholder": "",
                    "values": [
                        {"value": ""},...
                    ],
                    "defaultValues": [],
                    "dynamicColumn": "",
                    "dynamicTable": "",
                    "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>"
                }
            ```

    Args:
        variable: Dictionary with the variable properties. The keys `dataType`, `name`, `type`,
            `description`, `settings`, `placeholder`, `values`, `defaultSettings`, `defaultValues`,
            `dynamicColumn` and `dynamicTable` are read and must be present. `poolId` is additionally
            required when `settings["poolVariableId"]` is set.

    Raises:
        PyCelonisValueError: If the variable references a data pool parameter
            (`settings["poolVariableId"]` is set) and its `poolId` differs from this
            transformation's pool.

    Returns:
        The newly created Transformation Parameter as Dictionary.
    """
    # `settings` may be None for variables that are not linked to a pool parameter; treat that
    # like an empty mapping instead of failing with an AttributeError on `.get`.
    settings = variable["settings"] or {}
    if settings.get("poolVariableId") is not None and variable["poolId"] != self.data["poolId"]:
        raise PyCelonisValueError(
            "The transformation parameter you are trying to create is referencing a data pool parameter "
            f"from another pool with id: {variable['poolId']}. "
            "(see variable['settings']['poolVariableId']). This is not supported."
        )
    # NOTE(review): the documented `dynamicVariableOpType` field is not forwarded here - confirm
    # whether that is intentional.
    payload = {
        "dataType": variable["dataType"],
        "name": variable["name"],
        "type": variable["type"],
        "description": variable["description"],
        "settings": variable["settings"],
        "placeholder": variable["placeholder"],
        # The parameter is always created in this transformation's own pool and task.
        "poolId": self.data["poolId"],
        "values": variable["values"],
        "id": None,
        "taskId": self.data["taskId"],
        "defaultSettings": variable["defaultSettings"],
        "defaultValues": variable["defaultValues"],
        "dynamicColumn": variable["dynamicColumn"],
        "dynamicTable": variable["dynamicTable"],
    }
    return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)

delete(self)

Deletes the Transformation.

API

  • DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}
Source code in celonis_api/event_collection/data_job.py
def delete(self):
    """Deletes the Transformation.

    Issues a `DELETE` request for this task below the parent's URL. The response is not returned.

    !!! api "API"
        - `DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}`
    """
    task_url = f"{self.parent.url}/tasks/{self.id}"
    self.celonis.api_request(task_url, method=HttpMethod.DELETE)

disable(self)

Disables the Transformation.

API

  • DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled

Returns:

Type Description
Dict

The updated Transformation as Dictionary.

Source code in celonis_api/event_collection/data_job.py
def disable(self) -> typing.Dict:
    """Disables the Transformation.

    Issues a `DELETE` request against the task's `enabled` endpoint.

    !!! api "API"
        - `DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

    Returns:
        The updated Transformation as Dictionary.
    """
    enabled_url = f"{self.parent.url}/tasks/{self.id}/enabled"
    response = self.celonis.api_request(enabled_url, method=HttpMethod.DELETE)
    return response

enable(self)

Enables the Transformation.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled

Returns:

Type Description
Dict

The updated Transformation as Dictionary.

Source code in celonis_api/event_collection/data_job.py
def enable(self) -> typing.Dict:
    """Enables the Transformation.

    Issues a `POST` request against the task's `enabled` endpoint.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

    Returns:
        The updated Transformation as Dictionary.
    """
    enabled_url = f"{self.parent.url}/tasks/{self.id}/enabled"
    response = self.celonis.api_request(enabled_url, method=HttpMethod.POST)
    return response

execute(self, wait_for_execution=False)

Executes the Transformation.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute
    {
        "jobId": self.parent.id,
        "poolId": self.parent.parent.id,
        "executeOnlySubsetOfExtractions": True,
        "executeOnlySubsetOfTransformations": True,
        "extractions": [],
        "mode": "DELTA",
        "transformations": [self.id]
    }
    

Parameters:

Name Type Description Default
wait_for_execution

Wait for the Transformation to complete, else only triggers.

False

Exceptions:

Type Description
PyCelonisHTTPError

When Transformation failed (only if wait_for_execution=True).

Source code in celonis_api/event_collection/data_job.py
def execute(self, wait_for_execution=False):
    """Executes the Transformation.

    Triggers an execution of the parent Data Job that is restricted to this single
    Transformation (no extractions are run).

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
            ```json
            {
                "jobId": self.parent.id,
                "poolId": self.parent.parent.id,
                "executeOnlySubsetOfExtractions": True,
                "executeOnlySubsetOfTransformations": True,
                "extractions": [],
                "mode": "DELTA",
                "transformations": [self.id]
            }
            ```

    Args:
        wait_for_execution: Wait for the Transformation to complete, else only triggers.

    Raises:
        PyCelonisHTTPError: When Transformation failed (only if `wait_for_execution=True`).
    """
    job = self.parent
    payload = {
        "executeOnlySubsetOfTransformations": True,
        "executeOnlySubsetOfExtractions": True,
        "extractions": [],
        "jobId": job.id,
        "poolId": job.parent.id,
        "mode": "DELTA",
        "transformations": [self.id],
    }
    self.celonis.api_request(f"{job.url}/execute", payload)

    if not wait_for_execution:
        return
    self._wait_for_execution()

execute_from_workbench(self, statement=None)

Executes the Transformation Statement using the Edit Transformation Workbench and waits for the execution. This is useful to test SQL statements.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/execute
    {
        "jobId": self.parent.id,
        "transformationId": self.id,
        "statement": statement,
    }
    

Parameters:

Name Type Description Default
statement str

An SQL Statement.

None

Exceptions:

Type Description
PyCelonisHTTPError

When Transformation failed.

Returns:

Type Description
Dict

The Transformation Statement result.

Source code in celonis_api/event_collection/data_job.py
def execute_from_workbench(self, statement: typing.Optional[str] = None) -> typing.List[typing.Dict]:
    """Executes the Transformation Statement using the `Edit Transformation` Workbench and waits for the execution.
    This is useful to test SQL statements.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/execute`
            ```json
            {
                "jobId": self.parent.id,
                "transformationId": self.id,
                "statement": statement,
            }
            ```

    Args:
        statement: An SQL Statement. If `None`, the Transformation's own statement is executed.

    Raises:
        PyCelonisHTTPError: When Transformation failed, or when requesting the execution status
            failed more than 5 times in a row.

    Returns:
        The Transformation Statement results (a list whose first entry has status `SUCCESS`).
    """
    if statement is None:
        statement = self.statement

    # The workbench endpoints mirror the transformation URL with a different path segment.
    url = self.url.replace("/transformations/", "/workbench/")
    payload = {"jobId": self.parent.id, "transformationId": self.id, "statement": statement}
    execution_id = self.celonis.api_request(f"{url}/execute", payload)["id"]
    execution_url = f"{url}/{execution_id}"

    iterations = 0
    error_count = 0
    while True:
        try:
            status = self.celonis.api_request(execution_url, params={"fullCheck": False})["status"]
        except PyCelonisHTTPError:
            error_count += 1
            if error_count > 5:
                # Give up right away: no point in sleeping or announcing another retry.
                raise
            self._logger.exception("Failed to request transformation status, trying again...")
            time.sleep(3)
            continue

        if status not in ["RUNNING", "QUEUED", "NEW"]:
            self._logger.info(f"Transformation status: {status}")
            break

        # Only consecutive failures count towards the retry limit.
        error_count = 0
        iterations += 1
        if iterations % 10 == 0:
            self._logger.info(f"Transformation status: {status}...")
        time.sleep(1)

    results = self.celonis.api_request(f"{execution_url}/results")
    if not (results and results[0]["status"] == "SUCCESS"):
        raise PyCelonisHTTPError(f"Transformation did not succeed, status: {results or status}")

    return results

get_data_frame(self, statement=None)

Executes the Transformation Statement using execute_from_workbench and converts the result into a pandas.DataFrame.

Warning

This method is deprecated because of the following reason: The execution in the workbench implies that the DataFrame will only contain a sample of the actual result. This function should only be used for debugging/exploring the transformation.

Parameters:

Name Type Description Default
statement str

An SQL Statement.

None

Returns:

Type Description
DataFrame

The Transformation Statement result as Dataframe.

Source code in celonis_api/event_collection/data_job.py
@deprecated(
    "This function will be removed in future versions because it only returns a sample of the actual result. "
    "For further information see the docs."
)
def get_data_frame(self, statement: str = None) -> pd.DataFrame:
    """Executes the Transformation Statement using
    [execute_from_workbench][celonis_api.event_collection.data_job.Transformation.execute_from_workbench]
    and converts the result into a
    [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

    !!! warning
        This method is deprecated because of the following reason:
        The execution in the workbench implies that the DataFrame will only contain a sample of the actual result.
        This function should only be used for debugging/exploring the transformation.

    Args:
        statement: An SQL Statement.

    Returns:
        The Transformation Statement result as DataFrame.
    """
    # Only the first entry of the workbench results is converted.
    first_result = self.execute_from_workbench(statement=statement)[0]["result"]
    rows = first_result["tableContent"]
    column_names = [column["name"] for column in first_result["columns"]]
    return pd.DataFrame(rows, columns=column_names)

to_template(self, protection='OPEN')

Transforms a Transformation to a Template and sets the protection status.

API

  • POST: /integration/api/pools/{pool_id}/templates/
        {"taskInstanceId": self.id}
    
  • POST: /integration/api/pools/{pool_id}/templates/{template_id}/protection
        {"protectionStatus": protection}
    

Parameters:

Name Type Description Default
protection str

Protection status of the transformation template, one of [OPEN, LOCKED, VIEWABLE].

'OPEN'

Returns:

Type Description
str

The ID of the Task Template.

Source code in celonis_api/event_collection/data_job.py
def to_template(self, protection: str = "OPEN") -> str:
    """Transforms a Transformation to a Template and sets the protection status.

    The template is created first; the protection status is only sent in a second request
    when it differs from `OPEN`.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/templates/`
            ```json
                {"taskInstanceId": self.id}
            ```
        - `POST: /integration/api/pools/{pool_id}/templates/{template_id}/protection`
            ```json
                {"protectionStatus": protection}
            ```

    Args:
        protection: Protection status of the transformation template, one of [`OPEN`, `LOCKED`, `VIEWABLE`].

    Returns:
        The ID of the Task Template.
    """
    templates_url = f"{self.parent.parent.url}/templates/"
    template_id = self.celonis.api_request(templates_url, {"taskInstanceId": self.id})["id"]
    if protection == "OPEN":
        return template_id

    self.celonis.api_request(f"{templates_url}{template_id}/protection", {"protectionStatus": protection})
    return template_id