Data Pool: Push data¶
#To get a copy of this notebook in your current working directory, run:
from pycelonis.notebooks import api_tutorial
1. Connect to Celonis¶
from pycelonis import get_celonis
celonis = get_celonis()
WARNING:root:The module pycelonis.utils.api_utils is deprecated and will be removed with pycelonis version 2.0. Please use the following import statement instead: from pycelonis.celonis_api import api_utils
2021-02-08 13:19:20 - pycelonis: Login successful! Hello John Doe
2. Find the data pool¶
data_pool = celonis.pools.find("f8e841e3-26f5-494a-93fd-842252aef176")
data_pool
<Pool, id f8e841e3-26f5-494a-93fd-842252aef176, name SAP ECC - Purchase to Pay (SE HANA SolEng)>
# see existing tables
data_pool.tables
[{'name': '_CEL_P2P_ACTIVITIES', 'loaderSource': None, 'available': False, 'dataSourceId': 'f9387f64-7074-43c7-b043-79a362e00e11', 'dataSourceName': 'SE HANA (SolEng)', 'columns': [], 'type': 'TABLE', 'schemaName': 'f8e841e3-26f5-494a-93fd-842252aef176_f9387f64-7074-43c7-b043-79a362e00e11'}, {'name': '_CEL_P2P_ACTIVITY_MASTER_DATA', 'loaderSource': None, 'available': False, 'dataSourceId': 'f9387f64-7074-43c7-b043-79a362e00e11', 'dataSourceName': 'SE HANA (SolEng)', 'columns': [], 'type': 'TABLE', 'schemaName': 'f8e841e3-26f5-494a-93fd-842252aef176_f9387f64-7074-43c7-b043-79a362e00e11'}]
3. Create a pandas dataframe.¶
import pandas as pd
df = pd.DataFrame({'A': [2, 4, 8, 0], 'B': [2, 0, 0, 0], 'C': [10, 2, 1, 8]})
df.head()
A | B | C | |
---|---|---|---|
0 | 2 | 2 | 10 |
1 | 4 | 0 | 2 |
2 | 8 | 0 | 1 |
3 | 0 | 0 | 8 |
4. Data Pool¶
The following functions have required and optional parameters. The required parameters are listed and explained in detail. The optional parameters can be checked by pressing SHIFT + TAB, while the curser is in the function
1. Create new table
The create_table(...) function creates a new table in the current data pool (with name: table_name) and inserts the provdied data from df_or_path into this table.
If a table of the same name already exists, an error will be raised. You can see how to edit an existing table further down in this tutorial.
Required parameters:
- df_or_path: Either a pandas data frame or a path to a parquet file(s) that should be pushed to Celonis.
- table_name: The name that the table in the data pool should have.
data_pool.create_table(table_name="MY_PUSH_TABLE",
df_or_path=df,
if_exists="error")
2020-12-11 11:07:22 - pycelonis: Data push job started... 2020-12-11 11:07:36 - pycelonis: Data push job status: RUNNING... 2020-12-11 11:07:51 - pycelonis: Data push job status: RUNNING...
{'id': '521daa71-440e-4364-a511-8945ada3f84d', 'targetName': 'MY_PUSH_TABLE', 'lastModified': 1607684841860, 'lastPing': None, 'status': 'DONE', 'type': None, 'fileType': None, 'targetSchema': 'f8e841e3-26f5-494a-93fd-842252aef176', 'upsertStrategy': 'UPSERT_WITH_UNCHANGED_METADATA', 'fallbackVarcharLength': None, 'dataPoolId': 'f8e841e3-26f5-494a-93fd-842252aef176', 'connectionId': None, 'keys': [], 'tableSchema': None, 'logs': ['2020-12-11T11:07:21 - Starting execution, status set to RUNNING', '2020-12-11T11:07:58 - Done loading chunks', '2020-12-11T11:07:58 - Status set to DONE', '2020-12-11T11:07:58 - Execution done'], 'csvParsingOptions': None, 'mirrorTargetNames': [], 'optionalTenantId': None}
The table is now in the data pool and can be added to any data model in that pool.
2. Append table
The append function appends a new table to the target table. In order to succesfully execute this operation, the column names and and column types of the new table must match with the target tables respective properties.
Required parameters:
- df_or_path: Either a pandas data frame or a path to a parquet file(s) that has the same columns (names and types) as the target table.
- table_name: The target table name in the data pool.
append_df = pd.DataFrame({'A': [91, 42], 'B': [72, 54], 'C': [80, 38]})
append_df.head()
A | B | C | |
---|---|---|---|
0 | 91 | 72 | 80 |
1 | 42 | 54 | 38 |
data_pool.append_table(table_name="MY_PUSH_TABLE", df_or_path=append_df)
2020-12-11 11:08:53 - pycelonis: Data push job started...
{'id': '03391ec8-f1b4-4ae3-8434-98e8923404bd', 'targetName': 'MY_PUSH_TABLE', 'lastModified': 1607684933911, 'lastPing': None, 'status': 'DONE', 'type': 'DELTA', 'fileType': None, 'targetSchema': 'f8e841e3-26f5-494a-93fd-842252aef176', 'upsertStrategy': 'UPSERT_WITH_UNCHANGED_METADATA', 'fallbackVarcharLength': None, 'dataPoolId': 'f8e841e3-26f5-494a-93fd-842252aef176', 'connectionId': None, 'keys': [], 'tableSchema': None, 'mirrorTargetNames': [], 'logs': ['2020-12-11T11:08:53 - Starting execution, status set to RUNNING', '2020-12-11T11:09:14 - Done loading chunks', '2020-12-11T11:09:14 - Status set to DONE', '2020-12-11T11:09:14 - Execution done'], 'csvParsingOptions': None, 'optionalTenantId': None}
3. Upsert table
The upsert_table function work similarily as the append_table function, only here the new data is upserted into the table in the data pool, by the help of the "primary_keys" parameter. In our example the "primary_keys" columns are 'C'. In this case, every row in the pool table which has the same value in the column 'C' as a corresponding row in the new table is replaced by this respective row.
Required parameters:
- df_or_path: Either a pandas data frame or a path to a parquet file(s) that has the same columns (names and types) as the target table.
- table_name: The target table name in the data pool.
- primary_keys: A list of column names (strings).
upsert_df = pd.DataFrame({'A': [91, 42], 'B': [72, 54], 'C': [80, 40]})
upsert_df.head()
A | B | C | |
---|---|---|---|
0 | 91 | 72 | 80 |
1 | 42 | 54 | 40 |
data_pool.upsert_table(table_name="MY_PUSH_TABLE",
df_or_path=upsert_df,
primary_keys=['C'])
2020-12-11 11:09:45 - pycelonis: Data push job started... 2020-12-11 11:10:04 - pycelonis: Data push job status: RUNNING...
{'id': 'fa557797-9e67-4736-9816-99e9f1c65831', 'targetName': 'MY_PUSH_TABLE', 'lastModified': 1607684985177, 'lastPing': None, 'status': 'DONE', 'type': 'DELTA', 'fileType': None, 'targetSchema': 'f8e841e3-26f5-494a-93fd-842252aef176', 'upsertStrategy': 'UPSERT_WITH_UNCHANGED_METADATA', 'fallbackVarcharLength': None, 'dataPoolId': 'f8e841e3-26f5-494a-93fd-842252aef176', 'connectionId': None, 'keys': ['C'], 'mirrorTargetNames': [], 'tableSchema': None, 'logs': ['2020-12-11T11:09:45 - Starting execution, status set to RUNNING', '2020-12-11T11:10:10 - Done loading chunks', '2020-12-11T11:10:10 - Status set to DONE', '2020-12-11T11:10:10 - Execution done'], 'csvParsingOptions': None, 'optionalTenantId': None}
4. Drop and Replace table
The following function replaces a whole (target) table in the data pool with the new table/data frame "replace_df".
Required parameters:
- df_or_path: Either a pandas data frame or a path to a parquet file(s) that has the same columns (names and types) as the target table.
- table_name: The target table name in the data pool.
- if_exists: To replace a complete table, set if_exists='drop'
replace_df = pd.DataFrame({'A': [2, 4, 8, 0], 'B': [2, 0, 0, 0], 'C': [10, 2, 1, 8]})
df.head()
A | B | C | |
---|---|---|---|
0 | 2 | 2 | 10 |
1 | 4 | 0 | 2 |
2 | 8 | 0 | 1 |
3 | 0 | 0 | 8 |
data_pool.create_table(table_name="Test_Data",
df_or_path=replace_df,
if_exists="drop")
2020-12-11 11:10:21 - pycelonis: Data push job started... 2020-12-11 11:10:37 - pycelonis: Data push job status: RUNNING...
{'id': '9316497c-e07f-4116-8f90-cbddb465bba2', 'targetName': 'Test_Data', 'lastModified': 1607685021400, 'lastPing': None, 'status': 'DONE', 'type': 'REPLACE', 'fileType': None, 'targetSchema': 'f8e841e3-26f5-494a-93fd-842252aef176', 'upsertStrategy': 'UPSERT_WITH_UNCHANGED_METADATA', 'fallbackVarcharLength': None, 'dataPoolId': 'f8e841e3-26f5-494a-93fd-842252aef176', 'connectionId': None, 'keys': [], 'tableSchema': None, 'mirrorTargetNames': [], 'logs': ['2020-12-11T11:10:21 - Starting execution, status set to RUNNING', '2020-12-11T11:10:46 - Done loading chunks', '2020-12-11T11:10:46 - Status set to DONE', '2020-12-11T11:10:46 - Execution done'], 'csvParsingOptions': None, 'optionalTenantId': None}
5. Drop and Replace table (keeping the table schema)
The following replaces a whole (target) table in the data pool with the new table/data frame "replace_df". BUT it keeps the old table schema, i.e. columns + column types. This can be very important, especially in productive environments.
Additional parameters:
- column_config: config of the pool table, with column names and data types
column_config = data_pool.get_column_config('Test_Data')
2021-02-08 13:25:54 - pycelonis: Execution of Transformation started...
column_config
[{'columnName': 'A', 'columnType': 'INTEGER'}, {'columnName': 'B', 'columnType': 'INTEGER'}, {'columnName': 'C', 'columnType': 'INTEGER'}]
data_pool.create_table(table_name="Test_Data",
df_or_path=replace_df,
if_exists="drop",
column_config=column_config)
2021-02-08 13:27:43 - pycelonis: Dropping existing table Test_Data. 2021-02-08 13:27:45 - pycelonis: Data push job started... 2021-02-08 13:28:04 - pycelonis: Data push job status finished: DONE
{'id': 'f88909f6-ed18-4288-97da-b3f8dc6f3167', 'targetName': 'Test_Data', 'lastModified': 1612790865301, 'lastPing': None, 'status': 'DONE', 'type': 'REPLACE', 'fileType': None, 'targetSchema': 'f8e841e3-26f5-494a-93fd-842252aef176', 'upsertStrategy': 'UPSERT_WITH_UNCHANGED_METADATA', 'fallbackVarcharLength': None, 'dataPoolId': 'f8e841e3-26f5-494a-93fd-842252aef176', 'connectionId': None, 'keys': [], 'tableSchema': {'tableName': 'Test_Data', 'columns': [{'columnName': 'A', 'columnType': 'INTEGER', 'fieldLength': 0, 'decimals': 0, 'pkField': False}, {'columnName': 'B', 'columnType': 'INTEGER', 'fieldLength': 0, 'decimals': 0, 'pkField': False}, {'columnName': 'C', 'columnType': 'INTEGER', 'fieldLength': 0, 'decimals': 0, 'pkField': False}]}, 'mirrorTargetNames': [], 'csvParsingOptions': None, 'logs': ['2021-02-08T13:27:45 - Starting execution, status set to RUNNING', '2021-02-08T13:28:04 - Done loading chunks', '2021-02-08T13:28:04 - Status set to DONE', '2021-02-08T13:28:04 - Execution done'], 'optionalTenantId': None}