data_extraction.py
Extraction (CelonisApiObjectChild)
¶
Extraction object to interact with Celonis Event Collection Extraction API.
Source code in celonis_api/event_collection/data_extraction.py
class Extraction(CelonisApiObjectChild):
    """Extraction object to interact with Celonis Event Collection Extraction API."""

    @property
    def _parent_class(self):
        # Imported inside the property rather than at module level
        # (presumably to avoid a circular import with data_job — TODO confirm).
        from pycelonis.celonis_api.event_collection.data_job import DataJob

        return DataJob

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/jobs/{job_id}/extractions/{extraction_id}/expanded`
        """
        return f"{self.parent.url}/extractions/{self.id}/expanded"

    @property
    def tables(self) -> 'CelonisCollection[ExtractionTable]':
        """Get all Tables from Extraction.

        Returns:
            Collection of Extraction Tables (empty if the Extraction data has no `tables` entry).
        """
        return CelonisCollection([ExtractionTable(self, d) for d in self.data.get("tables", [])])

    @property
    def variables(self) -> typing.Dict:
        """Get all variables registered for this Extraction's task.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/tasks/{self.id}/variables/`

        Returns:
            The variables as returned by the API.
        """
        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/")

    def add_table(
        self, table: typing.Union[str, typing.Dict, 'ExtractionTable'], schema_name: typing.Optional[str] = None
    ) -> 'ExtractionTable':
        """Adds a table to the Extraction.

        Args:
            table:
                * str: should be table name.
                * dict: should be dictionary with values:
                    ```json
                    {
                        "schemaName": "",
                        "tableName": ""
                    }
                    ```
                * ExtractionTable: extracts from `data`
            schema_name: Schema name, as for example necessary for JDBC connections.
                Takes precedence over a `schemaName` contained in `table`.

        Returns:
            The newly created Extraction Table.
        """
        if isinstance(table, ExtractionTable):
            table = table._data

        is_dict = isinstance(table, dict)
        if is_dict:
            # Shallow copy: the keys removed below must not disappear from the
            # caller's dict (or from the source ExtractionTable's data).
            table = dict(table)

        # Parenthesised on purpose: without the parentheses the conditional
        # expression swallows the `or`, discarding `schema_name` for str tables.
        schema = schema_name or (table.get("schemaName") if is_dict else None)
        tables_url = f"{self.parent.url}/extractions/{self.id}/tables/"

        payload = [
            {
                "schemaName": schema,
                "tableName": table if isinstance(table, str) else table.get("tableName"),
                "taskId": self._data["taskId"],
                "id": None,
                "jobId": self.parent.id,
            }
        ]
        response = self.celonis.api_request(tables_url, payload)

        if is_dict:
            # Identifiers of the source table must not overwrite the ones the
            # API just assigned; they may be absent for hand-written dicts.
            for key in ("taskId", "id", "jobId"):
                table.pop(key, None)
            response[0].update(table)
            response[0]["jobId"] = self.parent.id
            if schema_name:
                # Keep the explicit schema even if `table` carried its own.
                response[0]["schemaName"] = schema_name
            response = self.celonis.api_request(tables_url, response, method=HttpMethod.PUT)

        return ExtractionTable(self, response[0])

    def create_extraction_parameter(self, variable: typing.Dict) -> typing.Dict:
        """Creates a new Variable with the specified properties in the Extraction.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/tasks/{extraction_id}/variables`
                ```json
                {
                    "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                                LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                    "name": "",
                    "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                    "description": "",
                    "settings": {
                        "poolVariableId": ""
                    },
                    "placeholder": "",
                    "values": [
                        {"value": ""},...
                    ],
                    "defaultValues": [],
                    "dynamicColumn": "",
                    "dynamicTable": "",
                    "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>",
                    "parameterType": "<CUSTOM|DATASOURCE>"
                }
                ```

        Args:
            variable: Dictionary. May additionally contain `poolId` (checked against this
                Extraction's pool when a pool variable is referenced) and `defaultSettings`.

        Returns:
            The newly created Variable as Dictionary.

        Raises:
            PyCelonisValueError: If the variable references a pool variable and its `poolId`
                differs from the pool of this Extraction.
        """
        settings = variable["settings"] or {}
        source_pool_id = variable.get("poolId")
        # The cross-pool check is only possible when the variable states its pool;
        # hand-written dicts following the documented payload have no `poolId`.
        if (
            settings.get("poolVariableId") is not None
            and source_pool_id is not None
            and source_pool_id != self.data["poolId"]
        ):
            raise PyCelonisValueError(
                "The extraction parameter you are trying to create is referencing a data pool parameter "
                f"from another pool with id: {variable['poolId']}. "
                "(see variable['settings']['poolVariableId']). This is not supported."
            )

        # NOTE(review): the documented `dynamicVariableOpType` field is not forwarded
        # to the API — confirm whether that is intentional.
        payload = {
            "dataType": variable["dataType"],
            "name": variable["name"],
            "type": variable["type"],
            "description": variable["description"],
            "settings": variable["settings"],
            "placeholder": variable["placeholder"],
            "poolId": self.data["poolId"],
            "values": variable["values"],
            "id": None,
            "taskId": self.data["taskId"],
            # Not part of the documented payload, so it may be missing.
            "defaultSettings": variable.get("defaultSettings"),
            "defaultValues": variable["defaultValues"],
            "dynamicColumn": variable["dynamicColumn"],
            "dynamicTable": variable["dynamicTable"],
            "parameterType": variable["parameterType"],
        }
        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)
tables: CelonisCollection[ExtractionTable]
property
readonly
¶
Get all Tables from Extraction.
Returns:
Type | Description |
---|---|
CelonisCollection[ExtractionTable] |
Collection of Extraction Tables. |
url: str
property
readonly
¶
API
/integration/api/pools/{pool_id}/jobs/{job_id}/extractions/{extraction_id}/expanded
variables: Dict
property
readonly
¶
Get all Pool Variables.
API
GET: /integration/api/pools/{pool_id}/tasks/{self.id}/variables/
Returns:
Type | Description |
---|---|
Dict |
Dictionary of Pool variables. |
add_table(self, table, schema_name=None)
¶
Adds a table to the Extraction.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
Union[str, Dict, ExtractionTable] |
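Table name (str), a dict with `schemaName` and `tableName`, or an ExtractionTable whose data is used. |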
required |
schema_name |
str |
Schema name, as for example necessary for JDBC connections. |
None |
Returns:
Type | Description |
---|---|
ExtractionTable |
The newly created Extraction Table. |
Source code in celonis_api/event_collection/data_extraction.py
def add_table(
    self, table: typing.Union[str, typing.Dict, 'ExtractionTable'], schema_name: typing.Optional[str] = None
) -> 'ExtractionTable':
    """Adds a table to the Extraction.

    Args:
        table:
            * str: should be table name.
            * dict: should be dictionary with values:
                ```json
                {
                    "schemaName": "",
                    "tableName": ""
                }
                ```
            * ExtractionTable: extracts from `data`
        schema_name: Schema name, as for example necessary for JDBC connections.
            Takes precedence over a `schemaName` contained in `table`.

    Returns:
        The newly created Extraction Table.
    """
    if isinstance(table, ExtractionTable):
        table = table._data

    is_dict = isinstance(table, dict)
    if is_dict:
        # Shallow copy: the keys removed below must not disappear from the
        # caller's dict (or from the source ExtractionTable's data).
        table = dict(table)

    # Parenthesised on purpose: without the parentheses the conditional
    # expression swallows the `or`, discarding `schema_name` for str tables.
    schema = schema_name or (table.get("schemaName") if is_dict else None)
    tables_url = f"{self.parent.url}/extractions/{self.id}/tables/"

    payload = [
        {
            "schemaName": schema,
            "tableName": table if isinstance(table, str) else table.get("tableName"),
            "taskId": self._data["taskId"],
            "id": None,
            "jobId": self.parent.id,
        }
    ]
    response = self.celonis.api_request(tables_url, payload)

    if is_dict:
        # Identifiers of the source table must not overwrite the ones the
        # API just assigned; they may be absent for hand-written dicts.
        for key in ("taskId", "id", "jobId"):
            table.pop(key, None)
        response[0].update(table)
        response[0]["jobId"] = self.parent.id
        if schema_name:
            # Keep the explicit schema even if `table` carried its own.
            response[0]["schemaName"] = schema_name
        response = self.celonis.api_request(tables_url, response, method=HttpMethod.PUT)

    return ExtractionTable(self, response[0])
create_extraction_parameter(self, variable)
¶
Creates a new Variable with the specified properties in the Extraction.
API
POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables
{ "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN| LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>", "name": "", "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>", "description": "", "settings": { "poolVariableID": "" }, "placeholder": "", "values": [ {"value": ""},... ], "defaultValues": [], "dynamicColumn": "", "dynamicTable": "", "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>", "parameterType": "<CUSTOM|DATASOURCE>" }
Parameters:
Name | Type | Description | Default |
---|---|---|---|
variable |
Dict |
Dictionary. |
required |
Returns:
Type | Description |
---|---|
Dict |
The newly created Variable as Dictionary. |
Source code in celonis_api/event_collection/data_extraction.py
def create_extraction_parameter(self, variable: typing.Dict) -> typing.Dict:
    """Creates a new Variable with the specified properties in the Extraction.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/tasks/{extraction_id}/variables`
            ```json
            {
                "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                            LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                "name": "",
                "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                "description": "",
                "settings": {
                    "poolVariableId": ""
                },
                "placeholder": "",
                "values": [
                    {"value": ""},...
                ],
                "defaultValues": [],
                "dynamicColumn": "",
                "dynamicTable": "",
                "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>",
                "parameterType": "<CUSTOM|DATASOURCE>"
            }
            ```

    Args:
        variable: Dictionary. May additionally contain `poolId` (checked against this
            Extraction's pool when a pool variable is referenced) and `defaultSettings`.

    Returns:
        The newly created Variable as Dictionary.

    Raises:
        PyCelonisValueError: If the variable references a pool variable and its `poolId`
            differs from the pool of this Extraction.
    """
    settings = variable["settings"] or {}
    source_pool_id = variable.get("poolId")
    # The cross-pool check is only possible when the variable states its pool;
    # hand-written dicts following the documented payload have no `poolId`.
    if (
        settings.get("poolVariableId") is not None
        and source_pool_id is not None
        and source_pool_id != self.data["poolId"]
    ):
        raise PyCelonisValueError(
            "The extraction parameter you are trying to create is referencing a data pool parameter "
            f"from another pool with id: {variable['poolId']}. "
            "(see variable['settings']['poolVariableId']). This is not supported."
        )

    # NOTE(review): the documented `dynamicVariableOpType` field is not forwarded
    # to the API — confirm whether that is intentional.
    payload = {
        "dataType": variable["dataType"],
        "name": variable["name"],
        "type": variable["type"],
        "description": variable["description"],
        "settings": variable["settings"],
        "placeholder": variable["placeholder"],
        "poolId": self.data["poolId"],
        "values": variable["values"],
        "id": None,
        "taskId": self.data["taskId"],
        # Not part of the documented payload, so it may be missing.
        "defaultSettings": variable.get("defaultSettings"),
        "defaultValues": variable["defaultValues"],
        "dynamicColumn": variable["dynamicColumn"],
        "dynamicTable": variable["dynamicTable"],
        "parameterType": variable["parameterType"],
    }
    return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)
ExtractionTable (CelonisDataObject)
¶
Source code in celonis_api/event_collection/data_extraction.py
class ExtractionTable(CelonisDataObject):
    """Table entry belonging to an Extraction."""

    # Key of the data dictionary that holds the table's name.
    _name_data_key = "tableName"

    @property
    def _parent_class(self):
        return Extraction

    @property
    def data(self) -> typing.Dict:
        """Extraction table data, looked up in the parent Extraction's `tables` by this table's id."""
        parent_tables = self.parent.data["tables"]
        return find_object_from_list(parent_tables, self.id)

    @property
    def filter(self) -> typing.Dict:
        """Extraction table filter definition (`filterDefinition` entry of the table data)."""
        table_data = self._data
        return table_data["filterDefinition"]

    @property
    def delta_filter(self) -> typing.Dict:
        """Extraction table delta filter definition (`deltaFilterDefinition` entry of the table data)."""
        table_data = self._data
        return table_data["deltaFilterDefinition"]