Skip to content

data_extraction.py

Extraction (CelonisApiObjectChild)

Extraction object to interact with Celonis Event Collection Extraction API.

Source code in celonis_api/event_collection/data_extraction.py
class Extraction(CelonisApiObjectChild):
    """Extraction object to interact with Celonis Event Collection Extraction API."""

    @property
    def _parent_class(self):
        # Imported inside the property rather than at module level
        # (presumably to avoid a circular import with data_job -- TODO confirm).
        from pycelonis.celonis_api.event_collection.data_job import DataJob

        return DataJob

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/jobs/{job_id}/extractions/{extraction_id}/expanded`
        """
        return f"{self.parent.url}/extractions/{self.id}/expanded"

    @property
    def tables(self) -> 'CelonisCollection[ExtractionTable]':
        """Get all Tables from Extraction.

        Returns:
            Collection of Extraction Tables (empty if the Extraction data has no `tables` entry).
        """
        return CelonisCollection([ExtractionTable(self, d) for d in self.data.get("tables", [])])

    @property
    def variables(self) -> typing.Dict:
        """Get all variables registered for this Extraction's task.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/tasks/{self.id}/variables/`

        Returns:
            The API response containing the variables.
        """
        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/")

    def add_table(
        self, table: typing.Union[str, typing.Dict, 'ExtractionTable'], schema_name: str = None
    ) -> 'ExtractionTable':
        """Adds a table to the Extraction.

        The table is first created from its name and schema. If `table` is a dictionary
        (or an ExtractionTable), its remaining settings are then applied on top of the
        created table with a follow-up `PUT` request.

        Args:
            table:
                * str: should be table name.
                * dict: should be dictionary with values:
                        ```json
                        {
                            "schemaName": "",
                            "tableName": ""
                        }
                        ```
                  Any `taskId`, `id` and `jobId` entries are ignored. The dictionary is not modified.
                * ExtractionTable: extracts from `data`
            schema_name: Schema name, as for example necessary for JDBC connections.
                If given, it takes precedence over a `schemaName` contained in `table`.

        Returns:
            The newly created Extraction Table.
        """
        if isinstance(table, ExtractionTable):
            table = table._data

        if isinstance(table, dict):
            # Work on a filtered copy: the identifiers belong to the source table and must
            # not be carried over, and the caller's dict (or the source ExtractionTable's
            # data) must not be mutated. Missing identifier keys are simply skipped.
            table_settings = {key: value for key, value in table.items() if key not in ("taskId", "id", "jobId")}
            table_name = table_settings.get("tableName")
            # Explicit parentheses/branches: an explicit schema_name always wins.
            resolved_schema = schema_name or table_settings.get("schemaName")
        else:
            table_settings = None
            table_name = table
            # A plain table name has no embedded schema, so only schema_name applies.
            resolved_schema = schema_name

        tables_url = f"{self.parent.url}/extractions/{self.id}/tables/"
        payload = [
            {
                "schemaName": resolved_schema,
                "tableName": table_name,
                "taskId": self._data["taskId"],
                "id": None,
                "jobId": self.parent.id,
            }
        ]
        response = self.celonis.api_request(tables_url, payload)

        if table_settings is not None:
            # Apply the remaining settings of the given table onto the freshly created one.
            response[0].update(table_settings)
            if schema_name:
                # Keep the explicit override from being replaced by the dict's schemaName.
                response[0]["schemaName"] = schema_name
            response[0]["jobId"] = self.parent.id
            response = self.celonis.api_request(tables_url, response, method=HttpMethod.PUT)

        return ExtractionTable(self, response[0])

    def create_extraction_parameter(self, variable: typing.Dict) -> typing.Dict:
        """Creates a new Variable with the specified properties in the Extraction.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/tasks/{extraction_id}/variables`
                ```json
                    {
                        "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                                    LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                        "name": "",
                        "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                        "description": "",
                        "settings": {
                            "poolVariableId": ""
                        },
                        "placeholder": "",
                        "values": [
                            {"value": ""},...
                        ],
                        "defaultValues": [],
                        "dynamicColumn": "",
                        "dynamicTable": "",
                        "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>",
                        "parameterType": "<CUSTOM|DATASOURCE>"
                    }
                ```

        Args:
            variable: Dictionary describing the variable. The keys `dataType`, `name`, `type`,
                `description`, `settings`, `placeholder`, `values`, `defaultSettings`,
                `defaultValues`, `dynamicColumn`, `dynamicTable` and `parameterType` are required
                (a missing key raises `KeyError`). `settings` may be `None`. `poolId` is only
                read when `settings` references a pool variable.

        Returns:
            The newly created Variable as Dictionary.

        Raises:
            PyCelonisValueError: If the variable references a data pool parameter
                (`settings["poolVariableId"]`) and its `poolId` differs from this Extraction's pool.
        """
        # `settings` can be None; treat that like "no pool variable referenced"
        # instead of failing with AttributeError on `.get`.
        settings = variable["settings"] or {}
        if settings.get("poolVariableId") is not None and variable["poolId"] != self.data["poolId"]:
            raise PyCelonisValueError(
                "The extraction parameter you are trying to create is referencing a data pool parameter "
                f"from another pool with id: {variable['poolId']}. "
                "(see variable['settings']['poolVariableId']). This is not supported."
            )
        # NOTE(review): `dynamicVariableOpType` appears in the documented schema but is not
        # forwarded here -- confirm whether that is intentional.
        payload = {
            "dataType": variable["dataType"],
            "name": variable["name"],
            "type": variable["type"],
            "description": variable["description"],
            "settings": variable["settings"],
            "placeholder": variable["placeholder"],
            # Pool and task always come from this Extraction, never from the input.
            "poolId": self.data["poolId"],
            "values": variable["values"],
            "id": None,
            "taskId": self.data["taskId"],
            "defaultSettings": variable["defaultSettings"],
            "defaultValues": variable["defaultValues"],
            "dynamicColumn": variable["dynamicColumn"],
            "dynamicTable": variable["dynamicTable"],
            "parameterType": variable["parameterType"],
        }

        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)

tables: CelonisCollection[ExtractionTable] property readonly

Get all Tables from Extraction.

Returns:

Type Description
CelonisCollection[ExtractionTable]

Collection of Extraction Tables.

url: str property readonly

API

  • /integration/api/pools/{pool_id}/jobs/{job_id}/extractions/{extraction_id}/expanded

variables: Dict property readonly

Get all Pool Variables.

API

  • GET: /integration/api/pools/{pool_id}/tasks/{self.id}/variables/

Returns:

Type Description
Dict

Dictionary of Pool variables.

add_table(self, table, schema_name=None)

Adds a table to the Extraction.

Parameters:

Name Type Description Default
table Union[str, Dict, ExtractionTable]
  • str: should be table name.
  • dict: should be dictionary with values:
    {
        "schemaName": "",
        "tableName": ""
    }
    
  • ExtractionTable: extracts from data
required
schema_name str

Schema name, required for example by JDBC connections.

None

Returns:

Type Description
ExtractionTable

The newly created Extraction Table.

Source code in celonis_api/event_collection/data_extraction.py
def add_table(
    self, table: typing.Union[str, typing.Dict, 'ExtractionTable'], schema_name: str = None
) -> 'ExtractionTable':
    """Adds a table to the Extraction.

    The table is first created from its name and schema. If `table` is a dictionary
    (or an ExtractionTable), its remaining settings are then applied on top of the
    created table with a follow-up `PUT` request.

    Args:
        table:
            * str: should be table name.
            * dict: should be dictionary with values:
                    ```json
                    {
                        "schemaName": "",
                        "tableName": ""
                    }
                    ```
              Any `taskId`, `id` and `jobId` entries are ignored. The dictionary is not modified.
            * ExtractionTable: extracts from `data`
        schema_name: Schema name, as for example necessary for JDBC connections.
            If given, it takes precedence over a `schemaName` contained in `table`.

    Returns:
        The newly created Extraction Table.
    """
    if isinstance(table, ExtractionTable):
        table = table._data

    if isinstance(table, dict):
        # Work on a filtered copy: the identifiers belong to the source table and must
        # not be carried over, and the caller's dict (or the source ExtractionTable's
        # data) must not be mutated. Missing identifier keys are simply skipped.
        table_settings = {key: value for key, value in table.items() if key not in ("taskId", "id", "jobId")}
        table_name = table_settings.get("tableName")
        # Explicit parentheses/branches: an explicit schema_name always wins.
        resolved_schema = schema_name or table_settings.get("schemaName")
    else:
        table_settings = None
        table_name = table
        # A plain table name has no embedded schema, so only schema_name applies.
        resolved_schema = schema_name

    tables_url = f"{self.parent.url}/extractions/{self.id}/tables/"
    payload = [
        {
            "schemaName": resolved_schema,
            "tableName": table_name,
            "taskId": self._data["taskId"],
            "id": None,
            "jobId": self.parent.id,
        }
    ]
    response = self.celonis.api_request(tables_url, payload)

    if table_settings is not None:
        # Apply the remaining settings of the given table onto the freshly created one.
        response[0].update(table_settings)
        if schema_name:
            # Keep the explicit override from being replaced by the dict's schemaName.
            response[0]["schemaName"] = schema_name
        response[0]["jobId"] = self.parent.id
        response = self.celonis.api_request(tables_url, response, method=HttpMethod.PUT)

    return ExtractionTable(self, response[0])

create_extraction_parameter(self, variable)

Creates a new Variable with the specified properties in the Extraction.

API

  • POST: /integration/api/pools/{pool_id}/tasks/{extraction_id}/variables
        {
            "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                        LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
            "name": "",
            "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
            "description": "",
            "settings": {
                "poolVariableID": ""
            },
            "placeholder": "",
            "values": [
                {"value": ""},...
            ],
            "defaultValues": [],
            "dynamicColumn": "",
            "dynamicTable": "",
            "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>",
            "parameterType": "<CUSTOM|DATASOURCE>"
        }
    

Parameters:

Name Type Description Default
variable Dict

Dictionary.

required

Returns:

Type Description
Dict

The newly created Variable as Dictionary.

Source code in celonis_api/event_collection/data_extraction.py
def create_extraction_parameter(self, variable: typing.Dict) -> typing.Dict:
    """Creates a new Variable with the specified properties in the Extraction.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/tasks/{extraction_id}/variables`
            ```json
                {
                    "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                                LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                    "name": "",
                    "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                    "description": "",
                    "settings": {
                        "poolVariableId": ""
                    },
                    "placeholder": "",
                    "values": [
                        {"value": ""},...
                    ],
                    "defaultValues": [],
                    "dynamicColumn": "",
                    "dynamicTable": "",
                    "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>",
                    "parameterType": "<CUSTOM|DATASOURCE>"
                }
            ```

    Args:
        variable: Dictionary describing the variable. The keys `dataType`, `name`, `type`,
            `description`, `settings`, `placeholder`, `values`, `defaultSettings`,
            `defaultValues`, `dynamicColumn`, `dynamicTable` and `parameterType` are required
            (a missing key raises `KeyError`). `settings` may be `None`. `poolId` is only
            read when `settings` references a pool variable.

    Returns:
        The newly created Variable as Dictionary.

    Raises:
        PyCelonisValueError: If the variable references a data pool parameter
            (`settings["poolVariableId"]`) and its `poolId` differs from this Extraction's pool.
    """
    # `settings` can be None; treat that like "no pool variable referenced"
    # instead of failing with AttributeError on `.get`.
    settings = variable["settings"] or {}
    if settings.get("poolVariableId") is not None and variable["poolId"] != self.data["poolId"]:
        raise PyCelonisValueError(
            "The extraction parameter you are trying to create is referencing a data pool parameter "
            f"from another pool with id: {variable['poolId']}. "
            "(see variable['settings']['poolVariableId']). This is not supported."
        )
    # NOTE(review): `dynamicVariableOpType` appears in the documented schema but is not
    # forwarded here -- confirm whether that is intentional.
    payload = {
        "dataType": variable["dataType"],
        "name": variable["name"],
        "type": variable["type"],
        "description": variable["description"],
        "settings": variable["settings"],
        "placeholder": variable["placeholder"],
        # Pool and task always come from this Extraction, never from the input.
        "poolId": self.data["poolId"],
        "values": variable["values"],
        "id": None,
        "taskId": self.data["taskId"],
        "defaultSettings": variable["defaultSettings"],
        "defaultValues": variable["defaultValues"],
        "dynamicColumn": variable["dynamicColumn"],
        "dynamicTable": variable["dynamicTable"],
        "parameterType": variable["parameterType"],
    }

    return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)

ExtractionTable (CelonisDataObject)

Source code in celonis_api/event_collection/data_extraction.py
class ExtractionTable(CelonisDataObject):
    """A single table entry belonging to an `Extraction`."""

    # Key of the table data that holds the table's name.
    _name_data_key = "tableName"

    @property
    def _parent_class(self):
        return Extraction

    @property
    def data(self) -> typing.Dict:
        """Extraction table data, looked up by id among the parent Extraction's tables."""
        parent_tables = self.parent.data["tables"]
        return find_object_from_list(parent_tables, self.id)

    @property
    def filter(self) -> typing.Dict:
        """Extraction table filter Definition (the `filterDefinition` entry)."""
        filter_definition = self._data["filterDefinition"]
        return filter_definition

    @property
    def delta_filter(self) -> typing.Dict:
        """Extraction table delta filter Definition (the `deltaFilterDefinition` entry)."""
        delta_filter_definition = self._data["deltaFilterDefinition"]
        return delta_filter_definition

data: Dict property readonly

Extraction table data.

delta_filter: Dict property readonly

Extraction table delta filter Definition.

filter: Dict property readonly

Extraction table filter Definition.