Data Upload & Export Advanced¶
In this tutorial, you will dive deeper into more advanced topics of uploading data into the EMS and exporting data to your local project. More specifically, you will learn:
- How to upload tables with a custom column configuration into the EMS
- How to upload tables as Parquet files
- How to perform a data upload with chunking
- How to export data via manual data exports
Prerequisites¶
To follow this tutorial, you should have created a data model and uploaded data into it. As we continue working with the SAP Purchase-to-Pay (P2P) tables from the Data Push tutorial, it is recommended to complete the Data Push tutorial first. Further, it is recommended to complete the Data Export tutorial to gain a basic understanding of how data is retrieved from a data model via PQL.
Tutorial¶
1. Import PyCelonis and connect to Celonis API¶
from pycelonis import get_celonis
celonis = get_celonis(permissions=False)
[2023-06-26 13:10:16,050] INFO: No `base_url` given. Using environment variable 'CELONIS_URL' [2023-06-26 13:10:16,052] INFO: No `api_token` given. Using environment variable 'CELONIS_API_TOKEN'
[2023-06-26 13:10:16,133] WARNING: KeyType is not set. Defaulted to 'APP_KEY'.
[2023-06-26 13:10:16,135] INFO: Initial connect successful! PyCelonis Version: 2.3.0
2. Find data pool and model to use for data upload and export¶
data_pool = celonis.data_integration.get_data_pools().find("PyCelonis Tutorial Data Pool")
data_pool
DataPool(id='6c178afe-21e2-4f77-b862-e37653ae0b2e', name='PyCelonis Tutorial Data Pool')
data_model = data_pool.get_data_models().find("PyCelonis Tutorial Data Model")
data_model
DataModel(id='0caea823-104c-4555-9b58-678a727c62b2', name='PyCelonis Tutorial Data Model', pool_id='6c178afe-21e2-4f77-b862-e37653ae0b2e')
3. Advanced Data Upload¶
3.1 Upload table with custom column configuration¶
When uploading data into the EMS via create_table(), we have, besides the standard input arguments df, table_name, and drop_if_exists, the option to pass a column_config as input parameter, which specifies properties such as name and data type for each column of the table to be pushed into the EMS. Column configurations are used to make table properties explicit and to enforce a consistent data push (e.g. when replacing tables), which is especially crucial in production environments.
A column_config is essentially a list of ColumnTransport objects, where each object specifies the properties of a single column. To create these objects, we have to import ColumnTransport and ColumnType from PyCelonis:
from pycelonis.ems import ColumnTransport, ColumnType
A ColumnTransport object takes the following input arguments:
Name | Type | Description | Default |
---|---|---|---|
column_name | str | Name of the column | None |
column_type | ColumnType | Data type of the column in the format ColumnType.<Type> (supported data types: INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING) | None |
field_length | int | Max. number of characters per field (required for the data type STRING). By default, the max. number of characters is 80, so longer text columns may get cut off when the table is uploaded into the EMS. To avoid this, we can specify a greater field_length. Note: The number specified in field_length is multiplied by 4 to obtain the max. number of characters. | None |
Let's start by pushing a new activity table with a custom column configuration into the EMS. For this, we import our P2P activity table:
import pandas as pd
activity_df = pd.read_parquet("../../../assets/_CEL_P2P_ACTIVITIES_EN.parquet", engine="pyarrow")
print(activity_df.shape)
activity_df.head()
(60, 13)
_CASE_KEY | ACTIVITY_EN | ACTIVITY_DE | EVENTTIME | _SORTING | USER_TYPE | CHANGED_TABLE | CHANGED_FIELD | CHANGED_FROM | CHANGED_TO | CHANGED_FROM_FLOAT | CHANGED_TO_FLOAT | CHANGE_NUMBER | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 800000000006800001 | Create Purchase Requisition Item | Lege BANF Position an | 2008-12-31 07:44:05 | 0.0 | B | None | None | None | None | NaN | NaN | None |
1 | 800000000006800001 | Create Purchase Order Item | Lege Bestellposition an | 2009-01-02 07:44:05 | 10.0 | B | None | None | None | None | NaN | NaN | None |
2 | 800000000006800001 | Print and Send Purchase Order | Sende Bestellung | 2009-01-05 07:44:05 | NaN | B | None | None | None | None | NaN | NaN | None |
3 | 800000000006800001 | Receive Goods | Wareneingang | 2009-01-12 07:44:05 | 30.0 | A | None | None | None | None | NaN | NaN | None |
4 | 800000000006800001 | Scan Invoice | Scanne Rechnung | 2009-01-20 07:44:05 | NaN | A | None | None | None | None | NaN | NaN | None |
We now create a column_config by creating a ColumnTransport object for each column of the activity table and wrapping them in a list. The input arguments depend on the data type of the column:
column_config = [
ColumnTransport(column_name="_CASE_KEY", column_type=ColumnType.STRING, field_length=20),
ColumnTransport(column_name="ACTIVITY_EN", column_type=ColumnType.STRING, field_length=100),
ColumnTransport(column_name="ACTIVITY_DE", column_type=ColumnType.STRING, field_length=100),
ColumnTransport(column_name="EVENTTIME", column_type=ColumnType.DATETIME),
ColumnTransport(column_name="_SORTING", column_type=ColumnType.FLOAT),
ColumnTransport(column_name="USER_TYPE", column_type=ColumnType.STRING, field_length=1),
ColumnTransport(column_name="CHANGED_TABLE", column_type=ColumnType.STRING, field_length=20),
ColumnTransport(column_name="CHANGED_FIELD", column_type=ColumnType.STRING, field_length=20),
ColumnTransport(column_name="CHANGED_FROM", column_type=ColumnType.STRING, field_length=20),
ColumnTransport(column_name="CHANGED_TO", column_type=ColumnType.STRING, field_length=20),
ColumnTransport(column_name="CHANGED_FROM_FLOAT", column_type=ColumnType.FLOAT),
ColumnTransport(column_name="CHANGED_TO_FLOAT", column_type=ColumnType.FLOAT),
ColumnTransport(column_name="CHANGE_NUMBER", column_type=ColumnType.STRING, field_length=20)
]
We can now push the activity table as a dataframe along with our custom column_config:
table = data_pool.create_table(df=activity_df, table_name="ACTIVITIES_2", drop_if_exists=True, column_config=column_config)
[2023-06-26 13:10:16,271] INFO: Successfully created data push job with id '9a4011e8-acc8-4ef8-ae46-94252708d48c' [2023-06-26 13:10:16,273] INFO: Add data frame as file chunks to data push job with id '9a4011e8-acc8-4ef8-ae46-94252708d48c'
[2023-06-26 13:10:16,298] INFO: Successfully upserted file chunk to data push job with id '9a4011e8-acc8-4ef8-ae46-94252708d48c' [2023-06-26 13:10:16,307] INFO: Successfully triggered execution for data push job with id '9a4011e8-acc8-4ef8-ae46-94252708d48c' [2023-06-26 13:10:16,308] INFO: Wait for execution of data push job with id '9a4011e8-acc8-4ef8-ae46-94252708d48c'
[2023-06-26 13:10:16,334] INFO: Successfully created table 'ACTIVITIES_2' in data pool [2023-06-26 13:10:16,338] INFO: Successfully deleted data push job with id '9a4011e8-acc8-4ef8-ae46-94252708d48c'
To verify the column configuration, we can print the column properties of a table with get_columns():
table.get_columns()
[2023-06-26 13:10:16,350] WARNING: For string columns, the field length is automatically multiplied by 4. For creating new tables based on the given columns, make sure to divide the field length for each column by 4 again in the column config before pushing.
[ PoolColumn(name='_CASE_KEY', length=80, type_=<PoolColumnType.STRING: 'STRING'>), PoolColumn(name='ACTIVITY_EN', length=400, type_=<PoolColumnType.STRING: 'STRING'>), PoolColumn(name='ACTIVITY_DE', length=400, type_=<PoolColumnType.STRING: 'STRING'>), PoolColumn(name='EVENTTIME', length=26, type_=<PoolColumnType.DATE: 'DATE'>), PoolColumn(name='_SORTING', length=15, type_=<PoolColumnType.FLOAT: 'FLOAT'>), PoolColumn(name='USER_TYPE', length=4, type_=<PoolColumnType.STRING: 'STRING'>), PoolColumn(name='CHANGED_TABLE', length=19, type_=<PoolColumnType.INTEGER: 'INTEGER'>), PoolColumn(name='CHANGED_FIELD', length=19, type_=<PoolColumnType.INTEGER: 'INTEGER'>), PoolColumn(name='CHANGED_FROM', length=19, type_=<PoolColumnType.INTEGER: 'INTEGER'>), PoolColumn(name='CHANGED_TO', length=19, type_=<PoolColumnType.INTEGER: 'INTEGER'>), PoolColumn(name='CHANGED_FROM_FLOAT', length=15, type_=<PoolColumnType.FLOAT: 'FLOAT'>), PoolColumn(name='CHANGED_TO_FLOAT', length=15, type_=<PoolColumnType.FLOAT: 'FLOAT'>), PoolColumn(name='CHANGE_NUMBER', length=19, type_=<PoolColumnType.INTEGER: 'INTEGER'>), PoolColumn(name='__index_level_0__', length=19, type_=<PoolColumnType.INTEGER: 'INTEGER'>), PoolColumn(name='_CELONIS_CHANGE_DATE', length=23, type_=<PoolColumnType.DATE: 'DATE'>) ]
We can see that certain columns, such as ACTIVITY_EN, now have a higher max. number of characters than the default of 80 (namely a field_length of 100 * 4 = 400).
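Writing such a configuration out by hand can become tedious for wide tables. As a rough alternative, a column_config could also be derived from the dataframe's dtypes. The following is only a sketch: the helper function and its dtype-to-ColumnType mapping are assumptions for illustration, not part of the PyCelonis API:
import pandas as pd
from pycelonis.ems import ColumnTransport, ColumnType

def infer_column_config(df: pd.DataFrame, default_field_length: int = 100) -> list:
    # Hypothetical helper: map pandas dtypes to ColumnType values.
    config = []
    for name, dtype in df.dtypes.items():
        if pd.api.types.is_bool_dtype(dtype):
            config.append(ColumnTransport(column_name=name, column_type=ColumnType.BOOLEAN))
        elif pd.api.types.is_integer_dtype(dtype):
            config.append(ColumnTransport(column_name=name, column_type=ColumnType.INTEGER))
        elif pd.api.types.is_float_dtype(dtype):
            config.append(ColumnTransport(column_name=name, column_type=ColumnType.FLOAT))
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            config.append(ColumnTransport(column_name=name, column_type=ColumnType.DATETIME))
        else:
            # Treat everything else as STRING; field_length should cover the longest expected value.
            config.append(ColumnTransport(column_name=name, column_type=ColumnType.STRING, field_length=default_field_length))
    return config

inferred_config = infer_column_config(activity_df)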
When pushing a table with a custom column configuration into the EMS, it is no longer possible to replace this table with create_table(drop_if_exists=True) unless the new table also has a column_config specified. This ensures that productive tables, on whose columns other Celonis objects rely, are not accidentally replaced:
data_pool.create_table(df=activity_df, table_name="ACTIVITIES_2", drop_if_exists=True)
--------------------------------------------------------------------------- PyCelonisValueError Traceback (most recent call last) Cell In[11], line 1 ----> 1 data_pool.create_table(df=activity_df, table_name="ACTIVITIES_2", drop_if_exists=True) File ~/work/pycelonis/pycelonis/pycelonis/pycelonis/ems/data_integration/data_pool.py:334, in DataPool.create_table(self, df, table_name, drop_if_exists, column_config, chunk_size, force, data_source_id, index, **kwargs) 332 raise PyCelonisTableAlreadyExistsError(table_name) 333 if column_config is None and not force: --> 334 raise PyCelonisValueError( 335 f"Replacing table '{table_name}' without specifying column_config resets table schema to default. " 336 f"Especially STRING columns are affected where the default data type is VARCHAR(80) which causes" 337 f" longer strings to be cut at 80 characters. Either specify column_config or set `force=True`." 338 ) 340 if column_config is None: 341 logger.warning( 342 "STRING columns are by default stored as VARCHAR(80) and therefore cut after 80 characters. You can " 343 "specify a custom field length for each column using the `column_config` parameter." 344 ) PyCelonisValueError: Replacing table 'ACTIVITIES_2' without specifying column_config resets table schema to default. Especially STRING columns are affected where the default data type is VARCHAR(80) which causes longer strings to be cut at 80 characters. Either specify column_config or set `force=True`.
If we want to disable this functionality and replace a table without specifying a custom column_config, we can set the input argument force=True:
data_pool.create_table(df=activity_df, table_name="ACTIVITIES_2", drop_if_exists=True, force=True)
[2023-06-26 13:10:16,799] WARNING: STRING columns are by default stored as VARCHAR(80) and therefore cut after 80 characters. You can specify a custom field length for each column using the `column_config` parameter.
[2023-06-26 13:10:16,805] INFO: Successfully created data push job with id 'bbbe1ecd-b759-458b-9251-d1acfd9db2aa' [2023-06-26 13:10:16,806] INFO: Add data frame as file chunks to data push job with id 'bbbe1ecd-b759-458b-9251-d1acfd9db2aa'
[2023-06-26 13:10:16,829] INFO: Successfully upserted file chunk to data push job with id 'bbbe1ecd-b759-458b-9251-d1acfd9db2aa' [2023-06-26 13:10:16,841] INFO: Successfully triggered execution for data push job with id 'bbbe1ecd-b759-458b-9251-d1acfd9db2aa' [2023-06-26 13:10:16,842] INFO: Wait for execution of data push job with id 'bbbe1ecd-b759-458b-9251-d1acfd9db2aa'
[2023-06-26 13:10:16,869] INFO: Successfully created table 'ACTIVITIES_2' in data pool [2023-06-26 13:10:16,874] INFO: Successfully deleted data push job with id 'bbbe1ecd-b759-458b-9251-d1acfd9db2aa'
DataPoolTable(name='ACTIVITIES_2', data_source_id=None, columns=[], schema_name='6c178afe-21e2-4f77-b862-e37653ae0b2e', data_pool_id='6c178afe-21e2-4f77-b862-e37653ae0b2e')
3.2 Push table in specific data connection¶
To push a table within a specific data connection, simply pass the data_source_id parameter:
data_connection = data_pool.get_data_connection("<DATA_CONNECTION_ID>")
data_pool.create_table(df=activity_df, table_name="ACTIVITIES_IN_CONNECTION", data_source_id=data_connection.id)
3.3 Push table as Parquet file¶
Besides pushing data as Pandas dataframes into the EMS, it is also possible to push data as Parquet files. For this use case, we need to create a manual data push job.
In general, pushing data as Parquet files into the EMS is done via the following workflow:
- Create a manual data push job
- Add file chunks into the data push job
- Execute the data push job
3.3.1 Create a manual data push job¶
A manual data push job can be created with the create_data_push_job() method of a data pool. The method takes the following input arguments:
Name | Type | Description | Default |
---|---|---|---|
target_name | str | Name of the data pool table into which the data should be pushed | Required |
type_ | JobType | Type of data push job in the format JobType.<type> (supported types: REPLACE = create table, DELTA = append or upsert table) | None |
column_config | List[ColumnTransport] | Custom column configuration to specify column names and types | None |
keys | List[str] | List of columns that are used in an upsert operation to check for equality (not relevant for create and append) | None |
Let's start by pushing our activity table from the Data Push tutorial into the EMS once again, but this time as a Parquet file. For this, we first have to create a manual data push job. We specify JobType.REPLACE to create a new table and use the custom column configuration that we created in the previous section:
from pycelonis.ems import JobType
data_push_job = data_pool.create_data_push_job(target_name="ACTIVITIES_3",
type_=JobType.REPLACE,
column_config=column_config)
[2023-06-26 13:10:16,897] INFO: Successfully created data push job with id 'f30c07bd-f6ac-48c6-9cb9-f71d6042d4f1'
We can verify that the data push job was created inside our data pool by calling the get_data_push_jobs() method:
data_pool.get_data_push_jobs()
[ DataPushJob(id='f30c07bd-f6ac-48c6-9cb9-f71d6042d4f1', target_name='ACTIVITIES_3', status=<JobStatus.NEW: 'NEW'>, keys=[]) ]
3.3.2 Add file chunks into the data push job¶
The newly created data push job does not yet contain any data to be pushed. To add data to the data push job, we need to call the add_file_chunk() method. This method takes the Parquet file as a BytesIO byte stream as input argument. Hence, we first open the file as a binary stream with open(file, "rb") and then call the add_file_chunk() method:
with open("../../../assets/_CEL_P2P_ACTIVITIES_EN.parquet", "rb") as file:
data_push_job.add_file_chunk(file)
[2023-06-26 13:10:16,930] INFO: Successfully upserted file chunk to data push job with id 'f30c07bd-f6ac-48c6-9cb9-f71d6042d4f1'
To verify that the data has been added to the data push job, we can call the get_chunks() method:
data_push_job.get_chunks()
[ DataPushChunk(id='dcc8a644-652f-4902-8868-916abffcb780', tenant_id=None, creation_date=datetime.datetime(2023, 2, 1, 10, 52, 31, 730000, tzinfo=datetime.timezone.utc), type_=<ChunkType.UPSERT: 'UPSERT'>, push_job_id='f30c07bd-f6ac-48c6-9cb9-f71d6042d4f1', checksum='A34B94BE90681E2DD7B619C7E6C2A2A5', optional_tenant_id=None) ]
3.3.3 Execute the data push job¶
Lastly, we can execute the data push job in order to push the file into the EMS. With the wait argument, we can specify whether we want to wait for the data push job to finish:
- If wait=True, the method waits for the data push job to complete and raises an error if the data push job fails
- If wait=False, the method does not wait for the data push job and does not raise an error
Note:
A data push job can only be executed once!
data_push_job.execute(wait=True)
[2023-06-26 13:10:16,968] INFO: Successfully triggered execution for data push job with id 'f30c07bd-f6ac-48c6-9cb9-f71d6042d4f1' [2023-06-26 13:10:16,969] INFO: Wait for execution of data push job with id 'f30c07bd-f6ac-48c6-9cb9-f71d6042d4f1'
We can verify that the pushed table exists in the data pool by calling the get_tables().find() method:
data_pool.get_tables().find("ACTIVITIES_3")
DataPoolTable(name='ACTIVITIES_3', data_source_id=None, columns=[], schema_name='6c178afe-21e2-4f77-b862-e37653ae0b2e', data_pool_id='6c178afe-21e2-4f77-b862-e37653ae0b2e')
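So far we have only used JobType.REPLACE. According to the argument table above, a DELTA job can be used instead to append to an existing table, or, together with keys, to upsert into it. A minimal sketch along the same lines (reusing ACTIVITIES_3 and the commented-out key columns are purely illustrative choices):
# Sketch: push additional rows into the existing ACTIVITIES_3 table via a DELTA job.
# For an upsert, keys=["_CASE_KEY", "EVENTTIME", "_SORTING"] (illustrative) would be passed in addition.
delta_push_job = data_pool.create_data_push_job(target_name="ACTIVITIES_3",
                                                type_=JobType.DELTA,
                                                column_config=column_config)
with open("../../../assets/_CEL_P2P_ACTIVITIES_EN.parquet", "rb") as file:
    delta_push_job.add_file_chunk(file)
delta_push_job.execute(wait=True)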
3.4 Perform data push with chunking¶
3.4.1 Chunking for Parquet files¶
Celonis only supports Parquet files of up to 1 GB per file chunk. However, a data push job can contain multiple chunks, which are pushed separately and later combined into one single table inside the EMS. Hence, if we want to push more than 1 GB of data, we have to split it into multiple Parquet files (i.e. chunks), add these chunks to our data push job, and then execute the data push job.
Note:
Even though 1 GB is the upper limit, it is recommended to stay significantly below that limit to ensure a smooth data push job execution.
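If the data starts out as one large dataframe, it first has to be split into several smaller Parquet files. A minimal sketch of how files like the ones used below could be produced from activity_df (the number of chunks and the output directory are assumptions for illustration):
# Sketch: split the dataframe into 3 roughly equal parts and write each part as a Parquet file.
output_dir = "../../../assets/chunked_files"
n_chunks = 3
rows_per_chunk = -(-len(activity_df) // n_chunks)  # ceiling division
for i in range(n_chunks):
    part = activity_df.iloc[i * rows_per_chunk : (i + 1) * rows_per_chunk]
    part.to_parquet(f"{output_dir}/ACTIVITIES_PUSH_CHUNKED_{i + 1}.parquet", engine="pyarrow", index=False)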
Let's suppose we want to push our P2P activity table again into the EMS. However, to reduce the data load, we have split up our activity table into 3 equal Parquet files. To push these chunks into the EMS, we first have to create a manual data push job:
data_push_job = data_pool.create_data_push_job(target_name="ACTIVITIES_4",
type_=JobType.REPLACE,
column_config=column_config)
[2023-06-26 13:10:17,039] INFO: Successfully created data push job with id '880dc5c4-8c44-4816-be0d-053c3c6c5458'
Now, we can iterate over the different Parquet files and add each file as a separate chunk to the data push job by calling the add_file_chunk() method:
file_path = "../../../assets/chunked_files"
file_names = ["ACTIVITIES_PUSH_CHUNKED_1", "ACTIVITIES_PUSH_CHUNKED_2", "ACTIVITIES_PUSH_CHUNKED_3"]
for file_name in file_names:
with open(f"{file_path}/{file_name}.parquet", "rb") as f:
data_push_job.add_file_chunk(f)
[2023-06-26 13:10:17,057] INFO: Successfully upserted file chunk to data push job with id '880dc5c4-8c44-4816-be0d-053c3c6c5458' [2023-06-26 13:10:17,064] INFO: Successfully upserted file chunk to data push job with id '880dc5c4-8c44-4816-be0d-053c3c6c5458' [2023-06-26 13:10:17,073] INFO: Successfully upserted file chunk to data push job with id '880dc5c4-8c44-4816-be0d-053c3c6c5458'
We can verify that all chunks are in our data push job by calling the get_chunks() method:
data_push_job.get_chunks()
[ DataPushChunk(id='8498111c-c895-4e19-8432-41b19dc3c5b6', tenant_id=None, creation_date=datetime.datetime(2023, 2, 1, 10, 52, 45, 602000, tzinfo=datetime.timezone.utc), type_=<ChunkType.UPSERT: 'UPSERT'>, push_job_id='880dc5c4-8c44-4816-be0d-053c3c6c5458', checksum='286E3BD7D31BE38D2D5AA6DDAA50AD74', optional_tenant_id=None), DataPushChunk(id='9feb4a3f-2645-4b4e-afae-4afd59ac2e98', tenant_id=None, creation_date=datetime.datetime(2023, 2, 1, 10, 52, 45, 447000, tzinfo=datetime.timezone.utc), type_=<ChunkType.UPSERT: 'UPSERT'>, push_job_id='880dc5c4-8c44-4816-be0d-053c3c6c5458', checksum='A8E14A9CEFD265EFE85D34552CF74CA7', optional_tenant_id=None), DataPushChunk(id='01c389a2-a065-497a-aaf8-7ff675e0a920', tenant_id=None, creation_date=datetime.datetime(2023, 2, 1, 10, 52, 45, 122000, tzinfo=datetime.timezone.utc), type_=<ChunkType.UPSERT: 'UPSERT'>, push_job_id='880dc5c4-8c44-4816-be0d-053c3c6c5458', checksum='56137942D23232755875C0CFE8450D60', optional_tenant_id=None) ]
Lastly, we can execute the data push job:
data_push_job.execute(wait=True)
[2023-06-26 13:10:17,114] INFO: Successfully triggered execution for data push job with id '880dc5c4-8c44-4816-be0d-053c3c6c5458' [2023-06-26 13:10:17,115] INFO: Wait for execution of data push job with id '880dc5c4-8c44-4816-be0d-053c3c6c5458'
The EMS will take the different chunks and construct a single table out of them. We can call the get_tables().find_all() method inside our data pool to verify that, indeed, only a single table with the name ACTIVITIES_4 was created:
data_pool.get_tables().find_all("ACTIVITIES_4")
[ DataPoolTable(name='ACTIVITIES_4', data_source_id=None, columns=[], schema_name='6c178afe-21e2-4f77-b862-e37653ae0b2e', data_pool_id='6c178afe-21e2-4f77-b862-e37653ae0b2e') ]
3.4.2 Chunking for Pandas dataframes¶
Splitting data into multiple chunks can also be performed when pushing Pandas dataframes into the EMS. This is done by specifying the input argument chunk_size in the methods create_table(), append(), or upsert(). The chunk_size argument specifies how many rows of the dataframe are pushed into the EMS in one chunk.
By default, this value is set to 100,000 rows. However, the optimal value for chunk_size depends on the number of columns and the type of data in our table. For instance, if our table only contains a few columns and data with small memory requirements, such as booleans or integers, we can increase the chunk_size for a faster data push. If our table contains many columns and data with high memory requirements, such as long strings, it might be a good idea to decrease the chunk_size to avoid reaching the data push limit of Celonis.
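One rough way to choose a value is to estimate how much memory an average row occupies and derive the chunk_size from a target chunk size in bytes. This is only a heuristic sketch, not an official guideline; the 100 MB target is an assumption:
# Heuristic sketch: derive a chunk_size from the dataframe's in-memory footprint.
target_chunk_bytes = 100 * 1024 * 1024  # aim for roughly 100 MB per chunk (assumption)
bytes_per_row = activity_df.memory_usage(deep=True).sum() / len(activity_df)
estimated_chunk_size = max(1, int(target_chunk_bytes / bytes_per_row))
print(f"~{bytes_per_row:.0f} bytes per row -> chunk_size of about {estimated_chunk_size} rows")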
Let's push our P2P activity table as a Pandas dataframe, but limit the chunk_size to 10,000 rows:
data_pool.create_table(df=activity_df, table_name="ACTIVITIES_5", drop_if_exists=True, chunk_size=10000)
[2023-06-26 13:10:17,193] WARNING: STRING columns are by default stored as VARCHAR(80) and therefore cut after 80 characters. You can specify a custom field length for each column using the `column_config` parameter.
[2023-06-26 13:10:17,204] INFO: Successfully created data push job with id 'e89462cd-1f20-4c2c-b3af-eec0ecf46d2a' [2023-06-26 13:10:17,205] INFO: Add data frame as file chunks to data push job with id 'e89462cd-1f20-4c2c-b3af-eec0ecf46d2a'
[2023-06-26 13:10:17,227] INFO: Successfully upserted file chunk to data push job with id 'e89462cd-1f20-4c2c-b3af-eec0ecf46d2a' [2023-06-26 13:10:17,248] INFO: Successfully triggered execution for data push job with id 'e89462cd-1f20-4c2c-b3af-eec0ecf46d2a' [2023-06-26 13:10:17,248] INFO: Wait for execution of data push job with id 'e89462cd-1f20-4c2c-b3af-eec0ecf46d2a'
[2023-06-26 13:10:17,291] INFO: Successfully created table 'ACTIVITIES_5' in data pool [2023-06-26 13:10:17,303] INFO: Successfully deleted data push job with id 'e89462cd-1f20-4c2c-b3af-eec0ecf46d2a'
DataPoolTable(name='ACTIVITIES_5', data_source_id=None, columns=[], schema_name='6c178afe-21e2-4f77-b862-e37653ae0b2e', data_pool_id='6c178afe-21e2-4f77-b862-e37653ae0b2e')
Internally, PyCelonis will iterate over the dataframe using a sliding window of chunk_size rows, convert these rows into Parquet files, add these files as chunks to a data push job, and execute the data push job.
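The chunk_size argument can be used in the same way when adding rows to an existing table. As a sketch, and assuming that the append() and upsert() methods mentioned above are called on the DataPoolTable object (the key columns shown for the upsert are an illustrative choice):
# Sketch: append more rows to the existing table, again in chunks of 10,000 rows.
table_5 = data_pool.get_tables().find("ACTIVITIES_5")
table_5.append(df=activity_df, chunk_size=10000)
# An upsert would additionally require key columns, e.g.:
# table_5.upsert(df=activity_df, keys=["_CASE_KEY", "EVENTTIME", "_SORTING"], chunk_size=10000)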
4. Advanced Data Export¶
4.1 Export data in alternative file formats¶
Besides retrieving data as a Pandas dataframe via the export_data_frame() method, it is also possible to export data from the EMS in alternative file formats, such as .csv, .xlsx, or .parquet files. Similarly to the manual data push job, we have to create a manual data export to achieve this.
In general, exporting data in alternative file formats from the EMS is done via the following workflow:
- Specify the data to be exported via PQL
- Create a manual data export with the PQL query as argument
- Write chunks into files
4.1.1 Specify data to be exported via PQL¶
First, we need to specify which data to export by constructing a PQL query. Here, we will use the query from the Data Export tutorial. However, instead of exporting the result table as a Pandas dataframe, we will export it as Parquet files.
from pycelonis.pql import PQL, PQLColumn, PQLFilter, OrderByColumn
query = PQL(distinct=False, limit=3, offset=3)
query += PQLColumn(name="_CASE_KEY", query=""" "ACTIVITIES"."_CASE_KEY" """)
query += PQLColumn(name="ACTIVITY_EN", query=""" "ACTIVITIES"."ACTIVITY_EN" """)
query += PQLColumn(name="EVENTTIME", query=""" "ACTIVITIES"."EVENTTIME" """)
query += PQLColumn(name="_SORTING", query=""" "ACTIVITIES"."_SORTING" """)
query += PQLFilter(query=""" FILTER "ACTIVITIES"."_CASE_KEY" = '800000000006800001'; """)
query += OrderByColumn(query=""" "ACTIVITIES"."EVENTTIME" """)
query += OrderByColumn(query=""" "ACTIVITIES"."_SORTING" """)
query
PQL(columns=[PQLColumn(name='_CASE_KEY', query=' "ACTIVITIES"."_CASE_KEY" '), PQLColumn(name='ACTIVITY_EN', query=' "ACTIVITIES"."ACTIVITY_EN" '), PQLColumn(name='EVENTTIME', query=' "ACTIVITIES"."EVENTTIME" '), PQLColumn(name='_SORTING', query=' "ACTIVITIES"."_SORTING" ')], filters=[PQLFilter(query=' FILTER "ACTIVITIES"."_CASE_KEY" = \'800000000006800001\'; ')], order_by_columns=[OrderByColumn(query=' "ACTIVITIES"."EVENTTIME" ', ascending=True), OrderByColumn(query=' "ACTIVITIES"."_SORTING" ', ascending=True)], distinct=False, limit=3, offset=3)
4.1.2 Create manual data export¶
Next, we need to put the PQL query into a manual data export by calling the create_data_export() method. The method takes the following input arguments:
Name | Type | Description | Default |
---|---|---|---|
query | PQL | PQL query that specifies which data to retrieve | Required |
export_type | ExportType | File format in which the data should be retrieved (format: ExportType.<type>, supported types: PARQUET, EXCEL, CSV) | Required |
from pycelonis.ems import ExportType
data_export = data_model.create_data_export(query=query, export_type=ExportType.PARQUET)
[2023-06-26 13:10:17,348] INFO: Successfully created data export with id '46471448-e511-4c20-8f36-cbe08018ebec'
The method will export the data specified in the PQL query from the EMS and store it in one or multiple chunks (depending on the data size) inside a DataExport object.
To access the chunks inside the DataExport object, we first have to make sure that the data export is completed. To achieve this, we call the wait_for_execution() method, which shows the current status of the data export as a progress bar. Once the method returns the status DONE, we are able to retrieve the chunks from the DataExport object.
data_export.wait_for_execution()
[2023-06-26 13:10:17,357] INFO: Wait for execution of data export with id '46471448-e511-4c20-8f36-cbe08018ebec'
4.1.3 Write chunks into files¶
Lastly, we retrieve the chunks inside the DataExport object by calling the get_chunks() method. This method returns the data chunks as a generator, over which we can iterate in a for-loop:
chunks = data_export.get_chunks()
chunks
<generator object DataExport.get_chunks at 0x7f55a81a6510>
Now, we can read the content of each chunk and write it into a Parquet file:
file_path = "../../../assets/chunked_files"
file_name = "ACTIVITIES_PULL_CHUNKED"
file_counter = 1
for chunk in chunks:
with open(f"{file_path}/{file_name}_{file_counter}.parquet", "wb") as f:
f.write(chunk.read())
file_counter += 1
[2023-06-26 13:10:17,398] INFO: Export result chunks for data export with id '46471448-e511-4c20-8f36-cbe08018ebec'
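To double-check the export, the written Parquet files could be read back and combined into a single dataframe, for example (a quick sketch reusing the file names from the loop above):
# Sketch: read the exported chunk files back into one dataframe.
exported_chunks = [
    pd.read_parquet(f"{file_path}/{file_name}_{i}.parquet", engine="pyarrow")
    for i in range(1, file_counter)
]
exported_df = pd.concat(exported_chunks, ignore_index=True)
exported_df.head()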
Conclusion¶
Congratulations! You have successfully mastered the more advanced topics of pushing and exporting data via PyCelonis. More specifically, you have learned how to specify custom column configurations, how to use alternative file formats, and how to perform chunking. In the next tutorial Data Jobs, you will learn how to use PyCelonis to interact with data jobs and tasks in order to modify data pool tables.