data_job.py
DataJob (CelonisApiObjectChild)
¶
Source code in celonis_api/event_collection/data_job.py
class DataJob(CelonisApiObjectChild):
    """A Data Job of a Data Pool: access to its tasks, table schemas and execution."""

    @property
    def _parent_class(self):
        # Imported lazily, presumably to avoid a circular import with data_pool — NOTE(review): confirm.
        from pycelonis.celonis_api.event_collection.data_pool import Pool

        return Pool

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/jobs/{job_id}`
        """
        return f"{self.parent.url}/jobs/{self.id}"

    @property
    def transformations(self) -> 'CelonisCollection[Transformation]':
        """Get all Data Job Tasks and filter for `taskType = TRANSFORMATION`.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`

        Returns:
            Collection of Data Job Transformations.
        """
        response = self.celonis.api_request(f"{self.url}/tasks/")
        return CelonisCollection([Transformation(self, d, self.celonis) for d in response]).filter(
            "TRANSFORMATION", "taskType"
        )

    @property
    def extractions(self) -> 'CelonisCollection[Extraction]':
        """Get all Data Job Tasks and filter for `taskType = EXTRACTION`,
        only if the Data Job's `dataSourceId` is available.

        !!! note
            Data Jobs in global scope do not have a `dataSourceId` defined.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`

        Returns:
            Collection of Data Job Extractions or empty Collection if global scope.
        """
        if self.data['dataSourceId'] is None:  # global data jobs do not contain extractions
            extraction_list: CelonisCollection[Extraction] = CelonisCollection([])
        else:
            response = self.celonis.api_request(f"{self.url}/tasks/")
            extraction_list = CelonisCollection([Extraction(self, d, self.celonis) for d in response]).filter(
                "EXTRACTION", "taskType"
            )
        return extraction_list

    @property
    def data_connection(self) -> 'CelonisCollection[DataConnection]':
        """Get all Data Job DataConnections via
        [Pool.data_connections][celonis_api.event_collection.data_pool.Pool.data_connections]
        and filter for the Data Job's `dataSourceId`, only if that `dataSourceId` is available.

        !!! note
            Data Jobs in global scope do not have a `dataSourceId` defined.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/data-sources/?excludeUnconfigured=False`

        Raises:
            PyCelonisValueError: If `dataSourceId` of Data Job is not available (global scope).

        Returns:
            Collection of Data Job Connections.
        """
        if self.data["dataSourceId"] is None:
            raise PyCelonisValueError("No Data Connection available (Data Job in global scope).")
        return self.parent.data_connections.filter(self.data["dataSourceId"], "id")

    @property
    def tables(self) -> typing.List:
        """Get the table schemas of the Data Job.

        The endpoint returns a list of schemas; for every schema its `tables` value is
        collected (`None` if a schema has no `tables` key). The result therefore has one
        element per schema. Example of a table entry:
        ```json
        {
            'name': 'pycelonis',
            'loaderSource': None,
            'available': False,
            'dataSourceId': None,
            'dataSourceName': None,
            'columns': [
                {'name': 'colA', 'length': 0, 'type': None},
                {'name': 'colB', 'length': 0, 'type': None},
                {'name': 'colC', 'length': 0, 'type': None},
                {'name': '_CELONIS_CHANGE_DATE', 'length': 0, 'type': None}
            ],
            'type': 'TABLE',
            'schemaName': '9ecdb9e0-4c30-46c6-b861-d805860c763c'
        }
        ```

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/schemas`

        Returns:
            List with the `tables` value of each schema.
        """
        response = self.celonis.api_request(f"{self.url}/schemas")
        return [d.get("tables") for d in response]

    @property
    def _templates(self) -> typing.Dict:
        # Raw response of the pool-level templates endpoint.
        return self.celonis.api_request(f"{self.parent.url}/templates/")

    def execute(self, mode: str = "DELTA", wait_for_execution: bool = False):
        """Executes the Data Job with all extractions and transformations.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
            ```json
            {
                "jobId": self.id,
                "poolId": self.parent.id,
                "executeOnlySubsetOfExtractions": False,
                "executeOnlySubsetOfTransformations": False,
                "extractions": [{
                    "extractionId": extraction.id,
                    "tables": None,
                    "loadOnlySubsetOfTables": False
                    },... (for extraction in self.extractions)
                ],
                "mode": mode,
                "transformations": None
            }
            ```

        Args:
            mode: Execution mode of the Data Job, either full execution or delta. One of [`DELTA`,`FULL`].
            wait_for_execution: Wait for the Data Job to complete, else only triggers.

        Raises:
            PyCelonisHTTPError: When Data Job failed (only if `wait_for_execution=True`).
        """
        extractions = [
            {"extractionId": e.id, "tables": None, "loadOnlySubsetOfTables": False} for e in self.extractions
        ]
        payload = {
            "executeOnlySubsetOfExtractions": False,
            "executeOnlySubsetOfTransformations": False,
            "extractions": extractions,
            "jobId": self.id,
            "mode": mode,
            "poolId": self.parent.id,
            "transformations": None,
        }
        self.celonis.api_request(f"{self.parent.url}/jobs/{self.id}/execute", payload)
        if wait_for_execution:
            self._wait_for_execution()

    def _wait_for_execution(self):
        """Polls the pool's execution status once per second until this Data Job has finished.

        Failed status requests are retried every 3 seconds; the 6th consecutive failure is raised.

        Raises:
            PyCelonisHTTPError: If the status cannot be requested, no execution of this Data Job
                can be found, or any execution of this Data Job ended with status `FAIL`.
        """
        iterations = 0
        error_count = 0
        while True:
            try:
                log_status = self.parent.check_data_job_execution_status()
                jobs = [job for job in log_status if job["id"] == self.id]
                if not jobs:
                    # all() over an empty list is True: without this check the job would be
                    # reported as done before its execution shows up at all. Raised inside the
                    # try so that it is retried like a failed status request.
                    raise PyCelonisHTTPError(f"Failed to find any executions related to data job: {self.id}.")
                if all(job["status"] not in ["RUNNING", "QUEUED", "NEW"] for job in jobs):
                    self._logger.info("Data Jobs done")
                    break
                error_count = 0
                iterations += 1
                if iterations % 5 == 0:
                    self._logger.info("Execution of Data Jobs running...")
                time.sleep(1)
            except PyCelonisHTTPError as e:
                error_count += 1
                time.sleep(3)
                self._logger.exception("Failed to request job status, trying again...")
                if error_count > 5:
                    raise e
        if any(job["status"] == "FAIL" for job in jobs):
            failed_jobs = [job for job in jobs if job["status"] == "FAIL"]
            raise PyCelonisHTTPError(f"Failed data job(s): {failed_jobs}")

    def create_extraction(self, name: str, task_id: str = None) -> Extraction:
        """Creates a new Extraction in Data Job.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
            ```json
            {
                "jobId": self.id,
                "name": name,
                "taskId": task_id,
                "taskType": "EXTRACTION"
            }
            ```

        Args:
            name: Name of the Extraction (not sent when `task_id` is given).
            task_id: Task ID if the Extraction should be created from template.

        Returns:
            The newly created Extraction.
        """
        if task_id is None:
            payload = {"jobId": str(self.id), "name": name, "taskId": None, "taskType": "EXTRACTION"}
        else:
            payload = {"jobId": str(self.id), "taskId": task_id, "taskType": "EXTRACTION"}
        response = self.celonis.api_request(f"{self.url}/tasks/", payload)
        return Extraction(self, response, self.celonis)

    def create_transformation(
        self, name: str, statement: str = "", description: str = "", task_id: str = None
    ) -> 'Transformation':
        """Creates a new transformation in Data Job.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
            ```json
            {
                "jobId": self.id,
                "description": description,
                "name": name,
                "taskId": task_id,
                "taskType": "TRANSFORMATION",
            }
            ```

        Args:
            name: Name of the transformation (not sent when `task_id` is given).
            description: Description of the transformation (not sent when `task_id` is given).
            statement: SQL statement as a string. For a transformation created from a template
                it is only written if non-empty, so the template's statement is kept by default.
            task_id : Task ID if the transformation should be created from template.

        Returns:
            The newly created Transformation.
        """
        if task_id is None:
            payload = {
                "description": description,
                "jobId": str(self.id),
                "name": name,
                "taskId": None,
                "taskType": "TRANSFORMATION",
            }
        else:
            payload = {"jobId": str(self.id), "taskId": task_id, "taskType": "TRANSFORMATION"}
        response = self.celonis.api_request(f"{self.url}/tasks/", payload)
        transformation = Transformation(self, response, self.celonis)
        # Unconditionally writing the default "" would wipe the statement of a
        # template-based transformation.
        if task_id is None or statement:
            transformation.statement = statement
        return transformation
data_connection: CelonisCollection[DataConnection]
property
readonly
¶
Get all Data Job DataConnections via
Pool.data_connections
and filter for the Data Job's `dataSourceId`, only if the Data Job's `dataSourceId` is available.
Note
Data Jobs in global scope do not have a `dataSourceId` defined.
API
GET: /integration/api/pools/{pool_id}/data-sources/?excludeUnconfigured=False
Exceptions:
Type | Description |
---|---|
PyCelonisValueError |
If `dataSourceId` of Data Job is not available (global scope). |
Returns:
Type | Description |
---|---|
CelonisCollection[DataConnection] |
Collection of Data Job Connections. |
extractions: CelonisCollection[Extraction]
property
readonly
¶
Get all Data Job Tasks and filter for `taskType = EXTRACTION`, only if the Data Job's `dataSourceId` is available.
Note
Data Jobs in global scope do not have a `dataSourceId` defined.
API
GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
Returns:
Type | Description |
---|---|
CelonisCollection[Extraction] |
Collection of Data Job Extractions or empty Collection if global scope. |
tables: List
property
readonly
¶
Get all Data Jobs table schemas as list of dictionaries, e.g.
[{
'name': 'pycelonis',
'loaderSource': None,
'available': False,
'dataSourceId': None,
'dataSourceName': None,
'columns': [
{'name': 'colA', 'length': 0, 'type': None},
{'name': 'colB', 'length': 0, 'type': None},
{'name': 'colC', 'length': 0, 'type': None},
{'name': '_CELONIS_CHANGE_DATE', 'length': 0, 'type': None}
],
'type': 'TABLE',
'schemaName': '9ecdb9e0-4c30-46c6-b861-d805860c763c'
},...]
API
GET: /integration/api/pools/{pool_id}/jobs/{job_id}/schemas
Returns:
Type | Description |
---|---|
List |
List of Table Schemas. |
transformations: CelonisCollection[Transformation]
property
readonly
¶
Get all Data Job Tasks and filter for `taskType = TRANSFORMATION`.
API
GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
Returns:
Type | Description |
---|---|
CelonisCollection[Transformation] |
Collection of Data Job Transformations. |
url: str
property
readonly
¶
API
/integration/api/pools/{pool_id}/jobs/{job_id}
create_extraction(self, name, task_id=None)
¶
Creates a new Extraction in Data Job.
API
POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the Extraction. |
required |
task_id |
str |
Task ID if the Extraction should be created from template. |
None |
Returns:
Type | Description |
---|---|
Extraction |
The newly created Extraction. |
Source code in celonis_api/event_collection/data_job.py
def create_extraction(self, name: str, task_id: str = None) -> Extraction:
    """Create a new Extraction task in this Data Job.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
        ```json
        {
            "jobId": self.id,
            "name": name,
            "taskId": task_id,
            "taskType": "EXTRACTION"
        }
        ```

    Args:
        name: Name of the Extraction; only sent when no `task_id` is given.
        task_id: Task ID if the Extraction should be created from template.

    Returns:
        The newly created Extraction.
    """
    job_id = str(self.id)
    payload = (
        {"jobId": job_id, "name": name, "taskId": None, "taskType": "EXTRACTION"}
        if task_id is None
        # Template-based creation: the name is not part of the request.
        else {"jobId": job_id, "taskId": task_id, "taskType": "EXTRACTION"}
    )
    created = self.celonis.api_request(f"{self.url}/tasks/", payload)
    return Extraction(self, created, self.celonis)
create_transformation(self, name, statement='', description='', task_id=None)
¶
Creates a new transformation in Data Job.
API
POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the transformation. |
required |
description |
str |
Description of the transformation. |
'' |
statement |
str |
SQL statement as a string. |
'' |
task_id |
str |
Task ID if the transformation should be created from template. |
None |
Returns:
Type | Description |
---|---|
Transformation |
The newly created Transformation. |
Source code in celonis_api/event_collection/data_job.py
def create_transformation(
    self, name: str, statement: str = "", description: str = "", task_id: str = None
) -> 'Transformation':
    """Creates a new transformation in Data Job.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/`
        ```json
        {
            "jobId": self.id,
            "description": description,
            "name": name,
            "taskId": task_id,
            "taskType": "TRANSFORMATION",
        }
        ```

    Args:
        name: Name of the transformation (not sent when `task_id` is given).
        description: Description of the transformation (not sent when `task_id` is given).
        statement: SQL statement as a string. For a transformation created from a template
            it is only written if non-empty, so the template's statement is kept by default.
        task_id : Task ID if the transformation should be created from template.

    Returns:
        The newly created Transformation.
    """
    if task_id is None:
        payload = {
            "description": description,
            "jobId": str(self.id),
            "name": name,
            "taskId": None,
            "taskType": "TRANSFORMATION",
        }
    else:
        payload = {"jobId": str(self.id), "taskId": task_id, "taskType": "TRANSFORMATION"}
    response = self.celonis.api_request(f"{self.url}/tasks/", payload)
    transformation = Transformation(self, response, self.celonis)
    # Unconditionally writing the default "" would wipe the statement of a
    # template-based transformation.
    if task_id is None or statement:
        transformation.statement = statement
    return transformation
execute(self, mode='DELTA', wait_for_execution=False)
¶
Executes the Data Job with all extractions and transformations.
API
POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute
{ "jobId": self.id, "poolId": self.parent.id, "executeOnlySubsetOfExtractions": False, "executeOnlySubsetOfTransformations": False, "extractions": [{ "extractionId": extraction.id, "tables": None, "loadOnlySubsetOfTables": False },... (for extraction in self.extractions) ], "mode": mode, "transformations": None }
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mode |
str |
Execution mode of the Data Job, either full execution or delta. One of [`DELTA`, `FULL`]. |
'DELTA' |
wait_for_execution |
bool |
Wait for the Data Job to complete, else only triggers. |
False |
Exceptions:
Type | Description |
---|---|
PyCelonisHTTPError |
When Data Job failed (only if `wait_for_execution=True`). |
Source code in celonis_api/event_collection/data_job.py
def execute(self, mode: str = "DELTA", wait_for_execution: bool = False):
    """Trigger a run of this Data Job covering every extraction and all transformations.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
        ```json
        {
            "jobId": self.id,
            "poolId": self.parent.id,
            "executeOnlySubsetOfExtractions": False,
            "executeOnlySubsetOfTransformations": False,
            "extractions": [{
                "extractionId": extraction.id,
                "tables": None,
                "loadOnlySubsetOfTables": False
                },... (for extraction in self.extractions)
            ],
            "mode": mode,
            "transformations": None
        }
        ```

    Args:
        mode: Execution mode of the Data Job, either full execution or delta. One of [`DELTA`,`FULL`].
        wait_for_execution: Block until the Data Job completes instead of only triggering it.

    Raises:
        PyCelonisHTTPError: When Data Job failed (only if `wait_for_execution=True`).
    """
    extraction_entries = []
    for extraction in self.extractions:
        # Every extraction runs with all of its tables.
        extraction_entries.append(
            {"extractionId": extraction.id, "tables": None, "loadOnlySubsetOfTables": False}
        )
    pool = self.parent
    request_body = dict(
        executeOnlySubsetOfExtractions=False,
        executeOnlySubsetOfTransformations=False,
        extractions=extraction_entries,
        jobId=self.id,
        mode=mode,
        poolId=pool.id,
        transformations=None,
    )
    self.celonis.api_request(f"{pool.url}/jobs/{self.id}/execute", request_body)
    if not wait_for_execution:
        return
    self._wait_for_execution()
Transformation (CelonisApiObjectChild)
¶
Source code in celonis_api/event_collection/data_job.py
class Transformation(CelonisApiObjectChild):
    """A transformation task of a Data Job: SQL statement, description, variables and execution."""

    @property
    def _parent_class(self):
        return DataJob

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `/integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}`
        """
        return f"{self.parent.url}/transformations/{self.id}"

    @property
    def variables(self) -> typing.Dict:
        """Get all Transformation Variables.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables/`

        Returns:
            Collection of Transformation Variables.
        """
        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/")

    @property
    def description(self) -> str:
        """Get/Set Transformation Description.

        ```py
        # Get
        description = transformation.description
        # Set
        transformation.description = "Some description"
        ```

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}`
            - `PUT: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}`
            ```json
            {
                "description": desc,
                "name": self.name
            }
            ```

        Returns:
            Transformation Description.
        """
        return self.celonis.api_request(f"{self.url}")["description"]

    @description.setter
    def description(self, desc):
        # The current name is sent along with the new description (presumably required by the endpoint).
        payload = {"description": desc, "name": self.name}
        self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}", payload, method=HttpMethod.PUT)

    @property
    def statement(self) -> str:
        """Get/Set the Transformation's SQL Statement.

        A `legalNote` contained in the GET response is logged as a warning.

        !!! api "API"
            - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}/statement`
            - `PUT: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}/statement`

        Returns:
            Transformation's SQL Statement.
        """
        response = self.celonis.api_request(f"{self.url}/statement")
        if response.get("legalNote") is not None:
            self._logger.warning(response.get("legalNote"))
        return response["statement"]

    @statement.setter
    def statement(self, stmt):
        # The statement itself is the request body.
        self.celonis.api_request(f"{self.url}/statement", stmt, method=HttpMethod.PUT)

    def execute(self, wait_for_execution=False):
        """Executes the Transformation (and nothing else of its Data Job).

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
            ```json
            {
                "jobId": self.parent.id,
                "poolId": self.parent.parent.id,
                "executeOnlySubsetOfExtractions": True,
                "executeOnlySubsetOfTransformations": True,
                "extractions": [],
                "mode": "DELTA",
                "transformations": [self.id]
            }
            ```

        Args:
            wait_for_execution: Wait for the Transformation to complete, else only triggers.

        Raises:
            PyCelonisHTTPError: When Transformation failed (only if `wait_for_execution=True`).
        """
        payload = {
            "executeOnlySubsetOfTransformations": True,
            "executeOnlySubsetOfExtractions": True,
            "extractions": [],
            "jobId": self.parent.id,
            "poolId": self.parent.parent.id,
            "mode": "DELTA",
            "transformations": [self.id],
        }
        self.celonis.api_request(f"{self.parent.url}/execute", payload)
        if wait_for_execution:
            self._wait_for_execution()

    def _wait_for_execution(self):
        """Polls the pool's execution status once per second until the parent Data Job has finished.

        Failed status requests (including "no execution found") are retried every 3 seconds;
        the 6th consecutive failure is raised.

        Raises:
            PyCelonisHTTPError: If the status cannot be requested, no execution of the parent
                Data Job can be found, or any of its executions ended with status `FAIL`.
        """
        iterations = 0
        error_count = 0
        while True:
            try:
                log_status = self.parent.parent.check_data_job_execution_status()
                jobs = [job for job in log_status if job["id"] == self.parent.id]
                if not jobs:
                    raise PyCelonisHTTPError(f"Failed to find any executions related to data job: {self.parent.id}.")
                if all(job["status"] not in ["RUNNING", "QUEUED", "NEW"] for job in jobs):
                    self._logger.info("Transformation done")
                    break
                error_count = 0
                if iterations % 5 == 0:
                    self._logger.info("Transformation running...")
                iterations += 1
                time.sleep(1)
            except PyCelonisHTTPError as e:
                error_count += 1
                time.sleep(3)
                self._logger.exception("Failed to request Transformation status, trying again...")
                if error_count > 5:
                    raise e
        if any(job["status"] == "FAIL" for job in jobs):
            failed_jobs = [job for job in jobs if job["status"] == "FAIL"]
            raise PyCelonisHTTPError(f"Failed Transformation: {failed_jobs}")

    def execute_from_workbench(self, statement: str = None) -> typing.Dict:
        """Executes the Transformation Statement using the `Edit Transformation` Workbench and waits for the execution.

        This is useful to test SQL statements.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/execute`
            ```json
            {
                "jobId": self.parent.id,
                "transformationId": self.id,
                "statement": statement,
            }
            ```

        Args:
            statement: An SQL Statement. Defaults to the Transformation's stored statement.

        Raises:
            PyCelonisHTTPError: When Transformation failed.

        Returns:
            The Transformation Statement result.
        """
        if statement is None:
            statement = self.statement
        url = self.url.replace("/transformations/", "/workbench/")
        payload = {"jobId": self.parent.id, "transformationId": self.id, "statement": statement}
        execution_id = self.celonis.api_request(f"{url}/execute", payload)["id"]
        execution_url = f"{url}/{execution_id}"
        iterations = 0
        error_count = 0
        while True:
            try:
                status = self.celonis.api_request(execution_url, params={"fullCheck": False})["status"]
                if status not in ["RUNNING", "QUEUED", "NEW"]:
                    self._logger.info(f"Transformation status: {status}")
                    break
                error_count = 0
                iterations += 1
                if iterations % 10 == 0:
                    self._logger.info(f"Transformation status: {status}...")
                time.sleep(1)
            except PyCelonisHTTPError as e:
                error_count += 1
                time.sleep(3)
                self._logger.exception("Failed to request transformation status, trying again...")
                if error_count > 5:
                    raise e
        results = self.celonis.api_request(f"{execution_url}/results")
        if not (results and results[0]["status"] == "SUCCESS"):
            raise PyCelonisHTTPError(f"Transformation did not succeed, status: {results or status}")
        return results

    @deprecated(
        "This function will be removed in future versions because it only returns a sample of the actual result. "
        "For further information see the docs."
    )
    def get_data_frame(self, statement: str = None) -> pd.DataFrame:
        """Executes the Transformation Statement using
        [execute_from_workbench][celonis_api.event_collection.data_job.Transformation.execute_from_workbench]
        and converts the result into a
        [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        !!! warning
            This method is deprecated because of the following reason:
            The execution in the workbench implies that the DataFrame will only contain a sample of the actual result.
            This function should only be used for debugging/exploring the transformation.

        Args:
            statement: An SQL Statement.

        Returns:
            The Transformation Statement result as DataFrame.
        """
        result = self.execute_from_workbench(statement=statement)
        table = result[0]["result"]["tableContent"]
        columns = [d["name"] for d in result[0]["result"]["columns"]]
        return pd.DataFrame(table, columns=columns)

    def create_transformation_parameter(self, variable: typing.Dict) -> typing.Dict:
        """Creates a new Transformation Parameter with the specified variable properties.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables`
            ```json
            {
                "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                            LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
                "name": "",
                "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
                "description": "",
                "settings": {
                    "poolVariableId": ""
                },
                "placeholder": "",
                "values": [
                    {"value": ""},...
                ],
                "defaultValues": [],
                "dynamicColumn": "",
                "dynamicTable": "",
                "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>"
            }
            ```

        Args:
            variable: Parameter definition. Must contain `dataType`, `name`, `type`, `description`,
                `settings`, `placeholder`, `values`, `defaultSettings`, `defaultValues`,
                `dynamicColumn` and `dynamicTable`; `poolId` is additionally required when
                `settings["poolVariableId"]` is set. `dynamicVariableOpType` is forwarded when
                present and not None.

        Raises:
            PyCelonisValueError: If the parameter references a Data Pool parameter
                (`settings["poolVariableId"]`) of a different pool.

        Returns:
            The newly created Transformation Parameter as Dictionary.
        """
        if variable['settings'].get('poolVariableId', None) is not None and variable["poolId"] != self.data["poolId"]:
            raise PyCelonisValueError(
                "The transformation parameter you are trying to create is referencing a data pool parameter "
                f"from another pool with id: {variable['poolId']}. "
                "(see variable['settings']['poolVariableId']). This is not supported."
            )
        payload = {
            "dataType": variable["dataType"],
            "name": variable["name"],
            "type": variable["type"],
            "description": variable["description"],
            "settings": variable["settings"],
            "placeholder": variable["placeholder"],
            "poolId": self.data["poolId"],
            "values": variable["values"],
            "id": None,
            "taskId": self.data["taskId"],
            "defaultSettings": variable["defaultSettings"],
            "defaultValues": variable["defaultValues"],
            "dynamicColumn": variable["dynamicColumn"],
            "dynamicTable": variable["dynamicTable"],
        }
        # Documented in the payload above but previously dropped; only sent when provided so
        # requests for non-dynamic parameters stay unchanged.
        if variable.get("dynamicVariableOpType") is not None:
            payload["dynamicVariableOpType"] = variable["dynamicVariableOpType"]
        return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)

    def to_template(self, protection: str = "OPEN") -> str:
        """Transforms a Transformation to a Template and sets the protection status.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/templates/`
            ```json
            {"taskInstanceId": self.id}
            ```
            - `POST /integration/api/pools/{pool_id}/templates/{template_id}/protection`
            ```json
            {"protectionStatus": protection}
            ```

        Args:
            protection: Protection status of the transformation template, one of [`OPEN`, `LOCKED`, `VIEWABLE`].

        Returns:
            The ID of the Task Template.
        """
        response = self.celonis.api_request(f"{self.parent.parent.url}/templates/", {"taskInstanceId": self.id})
        # No protection request is sent for `OPEN`.
        if protection != "OPEN":
            payload = {"protectionStatus": protection}
            self.celonis.api_request(f"{self.parent.parent.url}/templates/{response['id']}/protection", payload)
        return response["id"]

    def backup_content(self, backup_path: str = ".") -> pathlib.Path:
        """Creates a backup of the transformation as an SQL file.

        The file `Backup of Transformation - <name>.sql` contains the description (wrapped in a
        `/*DESCRIPTION: ... */` comment, if any) followed by the SQL statement. An existing file
        with that name is deleted first; if the transformation has neither a description nor a
        statement, no new file is written.

        Args:
            backup_path: Directory in which to write the backup file.

        Returns:
            Path to the backup file.
        """
        path = pathlib.Path(backup_path) / f"Backup of Transformation - {pathify(self.name)}.sql"
        if path.exists():
            path.unlink()
        # Each property access is a separate API request (and `statement` may log a legal note),
        # so fetch both values exactly once.
        statement = self.statement
        description = self.description
        if statement or description:
            text = f"/*DESCRIPTION:\n{description}\n*/\n" if description else ""
            text += statement if statement else ""
            path.write_text(text)
        return path

    def enable(self) -> typing.Dict:
        """Enables the Transformation.

        !!! api "API"
            - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

        Returns:
            The updated Transformation as Dictionary.
        """
        return self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}/enabled", method=HttpMethod.POST)

    def disable(self) -> typing.Dict:
        """Disables the Transformation.

        !!! api "API"
            - `DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

        Returns:
            The updated Transformation as Dictionary.
        """
        return self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}/enabled", method=HttpMethod.DELETE)

    def delete(self):
        """Deletes the Transformation.

        !!! api "API"
            - `DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}`
        """
        self.celonis.api_request(f"{self.parent.url}/tasks/{self.id}", method=HttpMethod.DELETE)
description: str
property
writable
¶
Get/Set Transformation Description.
# Get
description = transformation.description
# Set
transformation.description = "Some description"
API
GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}
PUT: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}
Returns:
Type | Description |
---|---|
str |
Transformation Description. |
statement: str
property
writable
¶
Get/Set the Transformation's SQL Statement.
API
GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}/statement
Returns:
Type | Description |
---|---|
str |
Transformation's SQL Statement. |
url: str
property
readonly
¶
API
/integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}
variables: Dict
property
readonly
¶
Get all Transformation Variables.
API
GET: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables/
Returns:
Type | Description |
---|---|
Dict |
Collection of Transformation Variables. |
backup_content(self, backup_path='.')
¶
Creates a backup of the transformation as an SQL file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
backup_path |
str |
Directory in which to write the backup file. |
'.' |
Returns:
Type | Description |
---|---|
Path |
Path to the backup file. |
Source code in celonis_api/event_collection/data_job.py
def backup_content(self, backup_path: str = ".") -> pathlib.Path:
    """Creates a backup of the transformation as an SQL file.

    The file `Backup of Transformation - <name>.sql` contains the description (wrapped in a
    `/*DESCRIPTION: ... */` comment, if any) followed by the SQL statement. An existing file
    with that name is deleted first; if the transformation has neither a description nor a
    statement, no new file is written.

    Args:
        backup_path: Directory in which to write the backup file.

    Returns:
        Path to the backup file.
    """
    path = pathlib.Path(backup_path) / f"Backup of Transformation - {pathify(self.name)}.sql"
    if path.exists():
        path.unlink()
    # Each property access is a separate API request (and `statement` may log a legal note),
    # so fetch both values exactly once.
    statement = self.statement
    description = self.description
    if statement or description:
        text = f"/*DESCRIPTION:\n{description}\n*/\n" if description else ""
        text += statement if statement else ""
        path.write_text(text)
    return path
create_transformation_parameter(self, variable)
¶
Creates a new Transformation Parameter with the specified variable properties.
API
POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables
{ "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>", "name": "", "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>", "description": "", "settings": { "poolVariableId": "" }, "placeholder": "", "values": [ {"value": ""},... ], "defaultValues": [], "dynamicColumn": "", "dynamicTable": "", "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>" }
Parameters:
Name | Type | Description | Default |
---|---|---|---|
variable |
Dict |
Dictionary. |
required |
Returns:
Type | Description |
---|---|
Dict |
The newly created Transformation Parameter as Dictionary. |
Source code in celonis_api/event_collection/data_job.py
def create_transformation_parameter(self, variable: typing.Dict) -> typing.Dict:
    """Creates a new Transformation Parameter with the specified variable properties.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables`
        ```json
        {
            "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                        LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
            "name": "",
            "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
            "description": "",
            "settings": {
                "poolVariableId": ""
            },
            "placeholder": "",
            "values": [
                {"value": ""},...
            ],
            "defaultValues": [],
            "dynamicColumn": "",
            "dynamicTable": "",
            "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>"
        }
        ```

    Args:
        variable: Parameter definition. Must contain `dataType`, `name`, `type`, `description`,
            `settings`, `placeholder`, `values`, `defaultSettings`, `defaultValues`,
            `dynamicColumn` and `dynamicTable`; `poolId` is additionally required when
            `settings["poolVariableId"]` is set. `dynamicVariableOpType` is forwarded when
            present and not None.

    Raises:
        PyCelonisValueError: If the parameter references a Data Pool parameter
            (`settings["poolVariableId"]`) of a different pool.

    Returns:
        The newly created Transformation Parameter as Dictionary.
    """
    if variable['settings'].get('poolVariableId', None) is not None and variable["poolId"] != self.data["poolId"]:
        raise PyCelonisValueError(
            "The transformation parameter you are trying to create is referencing a data pool parameter "
            f"from another pool with id: {variable['poolId']}. "
            "(see variable['settings']['poolVariableId']). This is not supported."
        )
    payload = {
        "dataType": variable["dataType"],
        "name": variable["name"],
        "type": variable["type"],
        "description": variable["description"],
        "settings": variable["settings"],
        "placeholder": variable["placeholder"],
        "poolId": self.data["poolId"],
        "values": variable["values"],
        "id": None,
        "taskId": self.data["taskId"],
        "defaultSettings": variable["defaultSettings"],
        "defaultValues": variable["defaultValues"],
        "dynamicColumn": variable["dynamicColumn"],
        "dynamicTable": variable["dynamicTable"],
    }
    # Documented in the payload above but previously dropped; only sent when provided so
    # requests for non-dynamic parameters stay unchanged.
    if variable.get("dynamicVariableOpType") is not None:
        payload["dynamicVariableOpType"] = variable["dynamicVariableOpType"]
    return self.celonis.api_request(f"{self.parent.parent.url}/tasks/{self.id}/variables/", payload)
delete(self)
¶
Deletes the Transformation.
API
DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}
disable(self)
¶
Disables the Transformation.
API
DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled
Returns:
Type | Description |
---|---|
Dict |
The updated Transformation as Dictionary. |
Source code in celonis_api/event_collection/data_job.py
def disable(self) -> typing.Dict:
    """Switch the Transformation off.

    !!! api "API"
        - `DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

    Returns:
        The updated Transformation as Dictionary.
    """
    enabled_endpoint = f"{self.parent.url}/tasks/{self.id}/enabled"
    # A DELETE on the task's `enabled` endpoint disables it.
    return self.celonis.api_request(enabled_endpoint, method=HttpMethod.DELETE)
enable(self)
¶
Enables the Transformation.
API
POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled
Returns:
Type | Description |
---|---|
Dict |
The updated Transformation as Dictionary. |
Source code in celonis_api/event_collection/data_job.py
def enable(self) -> typing.Dict:
    """Switch the Transformation on.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled`

    Returns:
        The updated Transformation as Dictionary.
    """
    enabled_endpoint = f"{self.parent.url}/tasks/{self.id}/enabled"
    # A POST (without body) on the task's `enabled` endpoint enables it.
    return self.celonis.api_request(enabled_endpoint, method=HttpMethod.POST)
execute(self, wait_for_execution=False)
¶
Executes the Transformation.
API
POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute
Parameters:
Name | Type | Description | Default |
---|---|---|---|
wait_for_execution |
Wait for the Transformation to complete, else only triggers. |
False |
Exceptions:
Type | Description |
---|---|
PyCelonisHTTPError |
When Transformation failed (only if `wait_for_execution=True`). |
Source code in celonis_api/event_collection/data_job.py
def execute(self, wait_for_execution=False):
    """Run only this Transformation of its Data Job (no extractions, no other transformations).

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute`
        ```json
        {
            "jobId": self.parent.id,
            "poolId": self.parent.parent.id,
            "executeOnlySubsetOfExtractions": True,
            "executeOnlySubsetOfTransformations": True,
            "extractions": [],
            "mode": "DELTA",
            "transformations": [self.id]
        }
        ```

    Args:
        wait_for_execution: Block until the Transformation completes instead of only triggering it.

    Raises:
        PyCelonisHTTPError: When Transformation failed (only if `wait_for_execution=True`).
    """
    job = self.parent
    # Restrict the run to a subset that consists of this single transformation.
    body = dict(
        executeOnlySubsetOfTransformations=True,
        executeOnlySubsetOfExtractions=True,
        extractions=[],
        jobId=job.id,
        poolId=job.parent.id,
        mode="DELTA",
        transformations=[self.id],
    )
    self.celonis.api_request(f"{job.url}/execute", body)
    if not wait_for_execution:
        return
    self._wait_for_execution()
execute_from_workbench(self, statement=None)
¶
Executes the Transformation Statement using the Edit Transformation
Workbench and waits for the execution.
This is useful to test SQL statements.
API
POST: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/execute
Parameters:
Name | Type | Description | Default |
---|---|---|---|
statement |
str |
An SQL Statement. |
None |
Exceptions:
Type | Description |
---|---|
PyCelonisHTTPError |
When Transformation failed. |
Returns:
Type | Description |
---|---|
Dict |
The Transformation Statement result. |
Source code in celonis_api/event_collection/data_job.py
def execute_from_workbench(self, statement: typing.Optional[str] = None) -> typing.List[typing.Dict]:
    """Executes the Transformation Statement using the `Edit Transformation` Workbench and waits for the execution.

    This is useful to test SQL statements. After starting the execution, its
    status is polled about once per second until it leaves the
    `RUNNING`/`QUEUED`/`NEW` states. Up to five consecutive failed status
    requests are tolerated, each retried after 3 seconds. The sixth
    consecutive failure is re-raised immediately.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/execute`
            ```json
            {
                "jobId": self.parent.id,
                "transformationId": self.id,
                "statement": statement,
            }
            ```
        - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/{execution_id}`
        - `GET: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/{execution_id}/results`

    Args:
        statement: An SQL Statement. Defaults to the Transformation's own
            `statement` when `None`.

    Raises:
        PyCelonisHTTPError: When the status request fails six times in a row,
            or when the Transformation failed (no results, or the first
            result's status is not `SUCCESS`).

    Returns:
        The Transformation Statement results as returned by the `results`
        endpoint (a list of result dicts; the first one is checked for success).
    """
    pending_states = ("RUNNING", "QUEUED", "NEW")
    max_consecutive_errors = 5

    if statement is None:
        statement = self.statement
    # The workbench endpoints mirror the transformation URL under `/workbench/`.
    url = self.url.replace("/transformations/", "/workbench/")
    payload = {"jobId": self.parent.id, "transformationId": self.id, "statement": statement}
    execution_id = self.celonis.api_request(f"{url}/execute", payload)["id"]
    execution_url = f"{url}/{execution_id}"

    iterations = 0
    error_count = 0
    while True:
        # Only the status request is guarded; a transient HTTP error is retried.
        try:
            status = self.celonis.api_request(execution_url, params={"fullCheck": False})["status"]
        except PyCelonisHTTPError:
            error_count += 1
            self._logger.exception("Failed to request transformation status, trying again...")
            if error_count > max_consecutive_errors:
                # Give up right away instead of sleeping before the re-raise.
                raise
            time.sleep(3)
            continue

        error_count = 0
        if status not in pending_states:
            self._logger.info(f"Transformation status: {status}")
            break
        iterations += 1
        if iterations % 10 == 0:
            self._logger.info(f"Transformation status: {status}...")
        time.sleep(1)

    results = self.celonis.api_request(f"{execution_url}/results")
    if not (results and results[0]["status"] == "SUCCESS"):
        raise PyCelonisHTTPError(f"Transformation did not succeed, status: {results or status}")
    return results
get_data_frame(self, statement=None)
¶
Executes the Transformation Statement using execute_from_workbench and converts the result into a pandas.DataFrame.
Warning
This method is deprecated because executing the statement in the workbench means the DataFrame contains only a sample of the actual result. Use this function only for debugging or exploring the transformation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
statement |
str |
An SQL Statement. |
None |
Returns:
Type | Description |
---|---|
DataFrame |
The Transformation Statement result as a DataFrame. |
Source code in celonis_api/event_collection/data_job.py
@deprecated(
    "This function will be removed in future versions because it only returns a sample of the actual result. "
    "For further information see the docs."
)
def get_data_frame(self, statement: str = None) -> pd.DataFrame:
    """Runs the Transformation Statement in the workbench and returns the first result as a DataFrame.

    The statement is executed via
    [execute_from_workbench][celonis_api.event_collection.data_job.Transformation.execute_from_workbench],
    and the first entry of its results is turned into a
    [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

    !!! warning
        Deprecated: a workbench execution only yields a sample of the actual result,
        so the DataFrame is incomplete. Use this only to debug or explore the transformation.

    Args:
        statement: An SQL Statement. When `None`, the Transformation's own statement is used.

    Returns:
        The Transformation Statement result as a DataFrame.
    """
    first_entry = self.execute_from_workbench(statement=statement)[0]["result"]
    # Rows are read before the column headers, mirroring the payload layout.
    rows = first_entry["tableContent"]
    column_names = [column["name"] for column in first_entry["columns"]]
    return pd.DataFrame(rows, columns=column_names)
to_template(self, protection='OPEN')
¶
Transforms a Transformation to a Template and sets the protection status.
API
POST: /integration/api/pools/{pool_id}/templates/
POST: /integration/api/pools/{pool_id}/templates/{template_id}/protection
Parameters:
Name | Type | Description | Default |
---|---|---|---|
protection |
str |
Protection status of the transformation template, one of [`OPEN`, `LOCKED`, `VIEWABLE`]. |
'OPEN' |
Returns:
Type | Description |
---|---|
str |
The ID of the Task Template. |
Source code in celonis_api/event_collection/data_job.py
def to_template(self, protection: str = "OPEN") -> str:
    """Creates a Task Template from this Transformation and applies a protection status.

    A template is always created first. The protection endpoint is called only
    when `protection` differs from `OPEN`.

    !!! api "API"
        - `POST: /integration/api/pools/{pool_id}/templates/`
            ```json
            {"taskInstanceId": self.id}
            ```
        - `POST: /integration/api/pools/{pool_id}/templates/{template_id}/protection`
            ```json
            {"protectionStatus": protection}
            ```

    Args:
        protection: Protection status of the transformation template, one of [`OPEN`, `LOCKED`, `VIEWABLE`].

    Returns:
        The ID of the Task Template.
    """
    templates_url = f"{self.parent.parent.url}/templates/"
    template_id = self.celonis.api_request(templates_url, {"taskInstanceId": self.id})["id"]
    if protection == "OPEN":
        # No protection request is sent for OPEN templates.
        return template_id
    self.celonis.api_request(f"{templates_url}{template_id}/protection", {"protectionStatus": protection})
    return template_id