Data Jobs¶
In this tutorial, you will learn how to interact with data jobs and tasks via PyCelonis. More specifically, you will learn:
- What data jobs and tasks are used for in the EMS
- How to create new data jobs and tasks
- How to create and retrieve statements from tasks
Prerequisites¶
To follow this tutorial, you should have already created a data pool inside the EMS and should have uploaded data into it. If you haven't done this yet, please complete at a minimum the Data Integration - Introduction and the Data Push tutorials first. However, it is recommended to also complete the other Data Integration tutorials, as this tutorial will build upon the tables created in the previous tutorials.
Tutorial¶
Data jobs are used to define ETL pipelines for data pools. They consist of one or multiple tasks that specify how data is extracted from source systems and how it is transformed to yield the final data pool tables, which can be loaded into a data model.
Note:
PyCelonis currently does not support extraction tasks. Hence, we will focus mainly on transformation tasks in this tutorial.
Note:
Transformation tasks are used to modify data pool tables and cannot be used to modify data model tables. Data model tables are simply read-only references to the corresponding data pool tables and can therefore only be used to retrieve data. If we want to modify tables, we need to do this via the corresponding data pool tables. Once the data model is reloaded, the changes from the data pool tables are ingested into the data model tables.
1. Import PyCelonis and connect to Celonis API¶
from pycelonis import get_celonis
celonis = get_celonis(permissions=False)
[2024-08-09 08:58:04,434] INFO: No `base_url` given. Using environment variable 'CELONIS_URL'
[2024-08-09 08:58:04,435] INFO: No `api_token` given. Using environment variable 'CELONIS_API_TOKEN'
[2024-08-09 08:58:04,470] WARNING: KeyType is not set. Defaulted to 'APP_KEY'.
[2024-08-09 08:58:04,471] INFO: Initial connect successful! PyCelonis Version: 2.10.1
No access key provided. Please set the CELONIS_MLOPS_ACCESS_KEY environment variable.
2. Find corresponding data pool¶
Since data jobs and tasks are used to modify tables inside a data pool, we have to navigate to the data pool in which we want to create our data jobs and tasks. Here, we will use the data pool created in the previous tutorials:
data_pool = celonis.data_integration.get_data_pools().find("PyCelonis Tutorial Data Pool")
data_pool
DataPool(id='1bcc67dc-935a-4b7f-940a-9c0b81a7135a', name='PyCelonis Tutorial Data Pool')
3. Create data jobs and tasks¶
The first step in working with data jobs and tasks is to have an empty data job in place, which serves as a container to store and organize different extraction and transformation tasks. A new data job can be created with the create_job() method:
data_job = data_pool.create_job("PyCelonis Tutorial Data Job")
data_job
Since we have already created an empty data job in the Data Integration - Introduction tutorial, we can simply access this data job by calling the get_jobs().find() method:
data_job = data_pool.get_jobs().find("PyCelonis Tutorial Data Job")
data_job
Job(id='a2c77d73-4758-47f1-a59e-157f061fa35f', name='PyCelonis Tutorial Data Job', data_pool_id='1bcc67dc-935a-4b7f-940a-9c0b81a7135a')
After having an empty data job in place, we can start adding tasks to it. PyCelonis currently only supports creating transformation tasks. This can be done by calling the create_transformation() method, which takes the following input arguments:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the transformation task | Required |
description | str | Custom description of what the transformation task does | None |
task = data_job.create_transformation(name="PyCelonis Tutorial Task",
description="This is an example task created for the PyCelonis tutorial")
task
[2024-08-09 08:58:04,526] INFO: Successfully created task of type 'TRANSFORMATION' with id 'a351a632-8d16-4e9d-976a-6445ca41f3cf'
Transformation(id='a351a632-8d16-4e9d-976a-6445ca41f3cf', pool_id='1bcc67dc-935a-4b7f-940a-9c0b81a7135a', task_id='5164a569-9a7c-4f8c-bece-2021e828b8eb', task_type='TRANSFORMATION', name='PyCelonis Tutorial Task', job_id='a2c77d73-4758-47f1-a59e-157f061fa35f', disabled=False)
To verify that the transformation task exists in the data job, we can call the get_tasks() method:
data_job.get_tasks()
[ Transformation(id='a351a632-8d16-4e9d-976a-6445ca41f3cf', pool_id='1bcc67dc-935a-4b7f-940a-9c0b81a7135a', task_id='5164a569-9a7c-4f8c-bece-2021e828b8eb', task_type='TRANSFORMATION', name='PyCelonis Tutorial Task', job_id='a2c77d73-4758-47f1-a59e-157f061fa35f', disabled=False) ]
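If a data job contains several tasks, a single task can also be looked up by name with the same find() helper used above for data pools and jobs. A minimal sketch:
task = data_job.get_tasks().find("PyCelonis Tutorial Task")
task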
4. Create and retrieve statements from tasks¶
The newly created transformation task is currently empty. Hence, we first need to specify a statement that defines how the data should be transformed. Transformation statements in Celonis are written in Vertica SQL. A transformation statement for a specific task can be created and updated by calling the update_statement() method and passing the Vertica SQL query as input argument.
Here, we will create a simple Vertica SQL statement that creates a new activity table ACTIVITIES_6 and inserts, for each case (i.e. purchase order item) with purchasing document category 'F', the activity 'Create Purchase Requisition Item'.
task.update_statement("""
DROP TABLE IF EXISTS ACTIVITIES_6;
CREATE TABLE ACTIVITIES_6 (
_CASE_KEY VARCHAR(100),
ACTIVITY_EN VARCHAR(300)
);
INSERT INTO ACTIVITIES_6(
_CASE_KEY,
ACTIVITY_EN
)
SELECT
EKPO.MANDT || EKPO.EBELN || EKPO.EBELP AS _CASE_KEY,
'Create Purchase Requisition Item' AS ACTIVITY_EN
FROM EKPO
JOIN EKKO ON 1=1
AND EKPO.MANDT = EKKO.MANDT
AND EKKO.EBELN = EKPO.EBELN
AND EKKO.BSTYP = 'F'
;
""")
[2024-08-09 08:58:04,546] INFO: Successfully updated statement of task with id 'a351a632-8d16-4e9d-976a-6445ca41f3cf'
However, it is also possible to create far more complex transformation tasks in Vertica SQL. Some of the available Vertica constructs include CREATE VIEW, CREATE TABLE, SELECT FROM, DROP TABLE, INSERT INTO, JOIN, GROUP BY, CASE WHEN, ROW_NUMBER() OVER, COALESCE, etc.
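As an illustration of how some of these constructs can be combined, the following sketch assigns a statement using CASE WHEN and ROW_NUMBER() OVER to a separate, hypothetical task. The task name, the output table ACTIVITIES_6_RANKED, and the reuse of the EKPO/EKKO tables from the previous tutorials are assumptions for illustration only; this statement is not part of this tutorial's data job.
# Illustrative sketch only: create a separate task so that the statement of
# "PyCelonis Tutorial Task" above stays unchanged.
example_task = data_job.create_transformation(
    name="PyCelonis Tutorial Example Task",
    description="Sketch combining CASE WHEN and ROW_NUMBER() OVER"
)
example_task.update_statement("""
DROP TABLE IF EXISTS ACTIVITIES_6_RANKED;
CREATE TABLE ACTIVITIES_6_RANKED AS
SELECT
    EKPO.MANDT || EKPO.EBELN || EKPO.EBELP AS _CASE_KEY,
    CASE
        WHEN EKKO.BSTYP = 'F' THEN 'Create Purchase Requisition Item'
        ELSE 'Other Activity'
    END AS ACTIVITY_EN,
    ROW_NUMBER() OVER (
        PARTITION BY EKPO.MANDT || EKPO.EBELN
        ORDER BY EKPO.EBELP
    ) AS _SORTING
FROM EKPO
JOIN EKKO ON 1=1
    AND EKPO.MANDT = EKKO.MANDT
    AND EKPO.EBELN = EKKO.EBELN
;
""")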
We can verify that the task includes a Vertica SQL statement by calling the get_statement() method:
task.get_statement()
"\nDROP TABLE IF EXISTS ACTIVITIES_6;\nCREATE TABLE ACTIVITIES_6 (\n _CASE_KEY VARCHAR(100),\n ACTIVITY_EN VARCHAR(300)\n);\nINSERT INTO ACTIVITIES_6(\n _CASE_KEY,\n ACTIVITY_EN\n)\nSELECT\nEKPO.MANDT || EKPO.EBELN || EKPO.EBELP AS _CASE_KEY,\n'Create Purchase Requisition Item' AS ACTIVITY_EN\nFROM EKPO\nJOIN EKKO ON 1=1\n AND EKPO.MANDT = EKKO.MANDT\n AND EKKO.EBELN = EKPO.EBELN\n AND EKKO.BSTYP = 'F'\n;\n"
By default, the newly created task is enabled. This means that when the data job is triggered in the next step, the transformation will be executed. We can also disable the task to exclude the transformation from the execution.
task.disable()
[2024-08-09 08:58:04,563] INFO: Successfully disabled task with id 'a351a632-8d16-4e9d-976a-6445ca41f3cf'
If we want the transformation to be executed again, we can enable it.
task.enable()
[2024-08-09 08:58:04,571] INFO: Successfully enabled task with id 'a351a632-8d16-4e9d-976a-6445ca41f3cf'
5. Execute the data job¶
To execute the data job, simply call the execute() method, which by default executes all transformations, extractions, and data model reloads:
data_job.execute()
[2024-08-09 08:58:04,583] INFO: Successfully started execution for job with id 'a2c77d73-4758-47f1-a59e-157f061fa35f'
[2024-08-09 08:58:04,584] INFO: Wait for execution of job with id 'a2c77d73-4758-47f1-a59e-157f061fa35f'
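At this point, the ACTIVITIES_6 table exists only in the data pool. To use it in analyses, it would still have to be added to a data model and the data model reloaded. A minimal sketch, assuming a data model named "PyCelonis Tutorial Data Model" already exists in this data pool (the name is an assumption; adjust it to your own setup):
# Sketch: reload an existing data model so it picks up the changed data pool tables.
data_model = data_pool.get_data_models().find("PyCelonis Tutorial Data Model")
data_model.reload()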
Conclusion¶
Congratulations! You have learned how to create and retrieve data jobs and transformation tasks via PyCelonis and how to specify transformation statements in Vertica SQL. You have now reached the end of the Data Integration tutorials and should be equipped with enough knowledge to get data from and upload data into the EMS with ease. In the next tutorial, Studio Introduction, you will learn how to perform basic interactions with Celonis Studio.