Skip to content

data_job.py

DataJob (CelonisApiObjectChild)

url: str property readonly

API

  • /integration/api/pools/{pool_id}/jobs/{job_id}

transformations: CelonisCollection[Transformation] property readonly

Get all Data Job Tasks and filter for taskType = TRANSFORMATION.

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/

Returns:

Type Description
CelonisCollection[Transformation]

Collection of Data Job Transformations.

extractions: CelonisCollection[Extraction] property readonly

Get all Data Job Tasks and filter for taskType = EXTRACTION, only if the Data Jobs dataSourceId is available.

Note

Data Job in global scope do not have a dataSourceId defined.

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/

Returns:

Type Description
CelonisCollection[Extraction]

Collection of Data Job Extractions or empty Collection if global scope.

data_connection: CelonisCollection[DataConnection] property readonly

Get all Data Job DataConnections via Pool.data_connections and filter for Data Jobs dataSourceId, only if the Data Jobs dataSourceId is available.

Note

Data Job in global scope do not have a dataSourceId defined.

API

  • GET: /integration/api/pools/{pool_id}/data-sources/?excludeUnconfigured=False

Exceptions:

Type Description
PyCelonisValueError

If dataSourceId of Data Job is not available (global scope).

Returns:

Type Description
CelonisCollection[DataConnection]

Collection of Data Job Connections.

tables: List property readonly

Get all Data Jobs table schemas as list of dictionaries, e.g.

[{
    'name': 'pycelonis',
    'loaderSource': None,
    'available': False,
    'dataSourceId': None,
    'dataSourceName': None,
    'columns': [
        {'name': 'colA', 'length': 0, 'type': None},
        {'name': 'colB', 'length': 0, 'type': None},
        {'name': 'colC', 'length': 0, 'type': None},
        {'name': '_CELONIS_CHANGE_DATE', 'length': 0, 'type': None}
    ],
    'type': 'TABLE',
    'schemaName': '9ecdb9e0-4c30-46c6-b861-d805860c763c'
},...]

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/schemas

Returns:

Type Description
List

List of Table Schemas.

execute(self, mode='DELTA', wait_for_execution=False)

Executes the Data Job with all extractions and transformations.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute
    {
        "jobId": self.id,
        "poolId": self.parent.id,
        "executeOnlySubsetOfExtractions": False,
        "executeOnlySubsetOfTransformations": False,
        "extractions": [{
            "extractionId": extraction.id,
            "tables": None,
            "loadOnlySubsetOfTables": False
        },... (for extraction in self.extractions)
        ],
        "mode": mode,
        "transformations": None
    }
    

Parameters:

Name Type Description Default
mode str

Execution mode of the Data Job, either full execution or delta. One of [DELTA,FULL].

'DELTA'
wait_for_execution bool

Wait for the Data Job to complete, else only triggers.

False

Exceptions:

Type Description
PyCelonisHTTPError

When Data Job failed (only if wait_for_execution=True).

create_extraction(self, name, task_id=None)

Creates a new Extraction in Data Job.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
    {
        "jobId": self.id,
        "name": name,
        "taskId": task_id,
        "taskType": "EXTRACTION"
    }
    

Parameters:

Name Type Description Default
name str

Name of the Extraction.

required
task_id str

Task ID if the Extraction should be created from template.

None

Returns:

Type Description
Extraction

The newly created Extraction.

create_transformation(self, name, statement='', description='', task_id=None)

Creates a new transformation in Data Job.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/
    {
        "jobId": self.id,
        "description": description,
        "name": name,
        "taskId": task_id,
        "taskType": "TRANSFORMATION",
    }
    

Parameters:

Name Type Description Default
name str

Name of the transformation.

required
description str

Description of the transformation.

''
statement str

SQL statement as a string.

''
task_id

Task ID if the transformation should be created from template.

None

Returns:

Type Description
Transformation

The newly created Transformation.

Transformation (CelonisApiObjectChild)

url: str property readonly

API

  • /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}

variables: Dict property readonly

Get all Transformation Variables.

API

  • GET: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables/

Returns:

Type Description
Dict

Collection of Transformation Variables.

description: str property writable

Get/Set Transformation Description.

# Get
description = transformation.description
# Set
transformation.description = "Some description"

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}
  • PUT: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}
    {
        "description": desc,
        "name": self.name
    }
    

Returns:

Type Description
str

Transformation Description.

statement: str property writable

Get the Transformation's SQL Statement.

API

  • GET: /integration/api/pools/{pool_id}/jobs/{job_id}/transformations/{transformation_id}

Returns:

Type Description
str

Transformation's SQL Statement.

execute(self, wait_for_execution=False)

Executes the Transformation.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/execute
    {
        "jobId": self.parent.id,
        "poolId": self.parent.parent.id,
        "executeOnlySubsetOfExtractions": True,
        "executeOnlySubsetOfTransformations": True,
        "extractions": [],
        "mode": "DELTA",
        "transformations": self.id
    }
    

Parameters:

Name Type Description Default
wait_for_execution

Wait for the Transformation to complete, else only triggers.

False

Exceptions:

Type Description
PyCelonisHTTPError

When Transformation failed (only if wait_for_execution=True).

execute_from_workbench(self, statement=None)

Executes the Transformation Statement using the Edit Transformation Workbench and waits for the execution. This is useful to test SQL statements.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/workbench/{transformation_id}/execute
    {
        "jobId": self.parent.id,
        "transformationId": self.id,
        "statement": statement,
    }
    

Parameters:

Name Type Description Default
statement str

An SQL Statement.

None

Exceptions:

Type Description
PyCelonisHTTPError

When Transformation failed.

Returns:

Type Description
Dict

The Transformation Statement result.

get_data_frame(self, statement=None)

Executes the Transformation Statement using execute_from_workbench and converts the result into a pandas.Dataframe.

Warning

This method is deprecated because of the following reason: The execution in the workbench implies that the DataFrame will only contain a sample of the actual result. This function should only be used for debugging/exploring the transformation.

Parameters:

Name Type Description Default
statement str

An SQL Statement.

None

Returns:

Type Description
DataFrame

The Transformation Statement result as Dataframe.

create_transformation_parameter(self, variable)

Creates a new Transformation Parameter with the specified variable properties.

API

  • POST: /integration/api/pools/{pool_id}/tasks/{transformation_id}/variables
        {
            "dataType": "<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|
                        LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
            "name": "",
            "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
            "description": "",
            "settings": {
                "poolVariableID": ""
            },
            "placeholder": "",
            "values": [
                {"value": ""},...
            ],
            "defaultValues": [],
            "dynamicColumn": "",
            "dynamicTable": "",
            "dynamicVariableOpType": "<FIND_MAX|FIND_MIN|LIST>"
        }
    

Parameters:

Name Type Description Default
variable Dict

Dictionary.

required

Returns:

Type Description
Dict

The newly created Transformation Parameter as Dictionary.

to_template(self, protection='OPEN')

Transforms a Transformation to a Template and sets the protection status.

API

  • POST: /integration/api/pools/{pool_id}/templates/
        {"taskInstanceId": self.id}
    
  • POST /integration/api/pools/{pool_id}/templates/{template_id}/protection
        {"protectionStatus": protection}
    

Parameters:

Name Type Description Default
protection str

Protection status of the transformation template, one of [OPEN, LOCKED, VIEWABLE].

'OPEN'

Returns:

Type Description
str

The ID of the Task Template.

backup_content(self, backup_path='.')

Creates a backup of transformation in JSON format.

Parameters:

Name Type Description Default
backup_path str

Path where to store the backup folder.

'.'

Returns:

Type Description
Path

Path to the backup folder.

enable(self)

Enables the Transformation.

API

  • POST: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled

Returns:

Type Description
Dict

The updated Transformation as Dictionary.

disable(self)

Disables the Transformation.

API

  • DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}/enabled

Returns:

Type Description
Dict

The updated Transformation as Dictionary.

delete(self)

Deletes the Transformation.

API

  • DELETE: /integration/api/pools/{pool_id}/jobs/{job_id}/tasks/{transformation_id}