data_pool

Module to interact with data pools.

This module contains the class used to interact with a data pool in EMS Data Integration.

Typical usage example:

```python
data_pool = celonis.data_integration.get_data_pool(data_pool_id)
data_pool.name = "NEW_NAME"
data_pool.update()
data_pool.delete()
```

DataPool

Bases: DataPoolTransport

Data pool object to interact with data pool specific data integration endpoints.

permissions class-attribute instance-attribute

permissions = Field(alias='permissions')

description class-attribute instance-attribute

description = Field(alias='description')

time_stamp class-attribute instance-attribute

time_stamp = Field(alias='timeStamp')

configuration_status class-attribute instance-attribute

configuration_status = Field(alias='configurationStatus')

locked class-attribute instance-attribute

locked = Field(alias='locked')

content_id class-attribute instance-attribute

content_id = Field(alias='contentId')

content_version class-attribute instance-attribute

content_version = Field(alias='contentVersion')

tags class-attribute instance-attribute

tags = Field(alias='tags')

original_id class-attribute instance-attribute

original_id = Field(alias='originalId')

monitoring_target class-attribute instance-attribute

monitoring_target = Field(alias='monitoringTarget')

custom_monitoring_target class-attribute instance-attribute

custom_monitoring_target = Field(
    alias="customMonitoringTarget"
)

custom_monitoring_target_active class-attribute instance-attribute

custom_monitoring_target_active = Field(
    alias="customMonitoringTargetActive"
)

exported class-attribute instance-attribute

exported = Field(alias='exported')

monitoring_message_columns_migrated class-attribute instance-attribute

monitoring_message_columns_migrated = Field(
    alias="monitoringMessageColumnsMigrated"
)

creator_user_id class-attribute instance-attribute

creator_user_id = Field(alias='creatorUserId')

object_id class-attribute instance-attribute

object_id = Field(alias='objectId')

client class-attribute instance-attribute

client = Field(..., exclude=True)

id instance-attribute

id

Id of data pool.

name instance-attribute

name

Name of data pool.

from_transport classmethod

from_transport(client, data_pool_transport)

Creates high-level data pool object from given DataPoolTransport.

Parameters:

  • client (Client) –

    Client to use to make API calls for given data pool.

  • data_pool_transport (DataPoolTransport) –

    DataPoolTransport object containing properties of data pool.

Returns:

  • DataPool

    A DataPool object with properties from transport and given client.
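
Examples:

Wrap an existing transport object in a high-level data pool (a minimal sketch; it assumes a `client` and a `data_pool_transport` are already available, e.g. from a raw API response):

```python
from pycelonis.ems import DataPool

data_pool = DataPool.from_transport(client, data_pool_transport)
print(data_pool.id, data_pool.name)
```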

update

update()

Pushes local changes of data pool to EMS and updates properties with response from EMS.

sync

sync()

Syncs data pool properties with EMS.

delete

delete()

Deletes data pool.

create_data_model

create_data_model(name, **kwargs)

Creates new data model with name in given data pool.

Parameters:

  • name (str) –

    Name of new data model.

  • **kwargs (Any, default: {} ) –

    Additional parameters set for DataModelTransport object.

Returns:

  • DataModel

    A DataModel object for newly created data model.

Examples:

Create a data model and add tables:

```python
data_model = data_pool.create_data_model("TEST_DATA_MODEL")
data_model.add_table(name="ACTIVITIES", alias="ACTIVITIES")
data_model.add_table(name="EKPO", alias="EKPO")
```

get_data_model

get_data_model(id_)

Gets data model with given id.

Parameters:

  • id_ (str) –

    Id of data model.

Returns:

  • DataModel

    A DataModel object for data model with given id.

get_data_models

get_data_models()

Gets all data models of given data pool.

Returns:
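
Examples:

List the data models of the data pool and fetch a single one by id (a minimal sketch; `<data_model_id>` is a placeholder and it assumes each data model exposes a `name`):

```python
# Iterate over all data models of the data pool and print their names.
for data_model in data_pool.get_data_models():
    print(data_model.name)

# Fetch one specific data model by its id.
data_model = data_pool.get_data_model("<data_model_id>")
```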

create_data_push_job_from staticmethod

create_data_push_job_from(
    client,
    data_pool_id,
    target_name,
    type_=None,
    column_config=None,
    keys=None,
    **kwargs
)

Creates new data push job in given data pool.

Parameters:

  • client (Client) –

    Client to use to make API calls for data export.

  • data_pool_id (str) –

    Id of data pool where data push job will be created.

  • target_name (str) –

    Table name to which job will push data.

  • type_ (Optional[JobType], default: None ) –

    Type of data push job.

  • column_config (Optional[List[ColumnTransport]], default: None ) –

    Can be used to specify column types and string field length in number of characters.

  • keys (Optional[List[str]], default: None ) –

    Primary keys to use in case of upsert data push job.

  • **kwargs (Any, default: {} ) –

    Additional parameters set for DataPushJob object.

Returns:

  • DataPushJob

    A DataPushJob object for newly created data push job.

Examples:

Create data push job to replace table:

```python
from pycelonis.ems import DataPool, JobType

data_push_job = DataPool.create_data_push_job_from(
    client=celonis.client,
    data_pool_id="<data_pool_id>",
    target_name="ACTIVITIES",
    type_=JobType.REPLACE
)

with open("ACTIVITIES.parquet", "rb") as file:
    data_push_job.add_file_chunk(file)

data_push_job.execute()
```

create_data_push_job

create_data_push_job(
    target_name,
    type_=None,
    column_config=None,
    keys=None,
    connection_id=None,
    **kwargs
)

Creates new data push job in given data pool.

Parameters:

  • target_name (str) –

    Table name to which job will push data.

  • type_ (Optional[JobType], default: None ) –

    Type of data push job.

  • column_config (Optional[List[ColumnTransport]], default: None ) –

    Can be used to specify column types and string field length in number of characters.

  • keys (Optional[List[str]], default: None ) –

    Primary keys to use in case of upsert data push job.

  • connection_id (Optional[str], default: None ) –

    Connection id of connection for data push job (Equivalent to data_source_id for pool tables).

  • **kwargs (Any, default: {} ) –

    Additional parameters set for DataPushJob object.

Returns:

  • DataPushJob

    A DataPushJob object for newly created data push job.

Examples:

Create data push job to replace table:

```python
from pycelonis.ems import JobType

data_push_job = data_pool.create_data_push_job(
    target_name="ACTIVITIES",
    type_=JobType.REPLACE
)

with open("ACTIVITIES.parquet", "rb") as file:
    data_push_job.add_file_chunk(file)

data_push_job.execute()
```

get_data_push_job

get_data_push_job(id_)

Gets data push job with given id.

Parameters:

  • id_ (str) –

    Id of data push job.

Returns:

  • DataPushJob

    A DataPushJob object for data push job with given id.

get_data_push_jobs

get_data_push_jobs()

Gets all data push jobs of given data pool.

Returns:
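
Examples:

Retrieve data push jobs of the data pool (a minimal sketch; `<data_push_job_id>` is a placeholder):

```python
# List all data push jobs of the data pool.
data_push_jobs = data_pool.get_data_push_jobs()

# Fetch one specific data push job by its id.
data_push_job = data_pool.get_data_push_job("<data_push_job_id>")
```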

create_table

create_table(
    df,
    table_name,
    drop_if_exists=False,
    column_config=None,
    chunk_size=100000,
    force=False,
    data_source_id=None,
    index=False,
    **kwargs
)

Creates new table in given data pool.

Parameters:

  • df (DataFrame) –

    DataFrame to push to new table.

  • table_name (str) –

    Name of new table.

  • drop_if_exists (bool, default: False ) –

    If true, drops existing table if it exists. If false, raises PyCelonisTableAlreadyExistsError if table already exists.

  • column_config (Optional[List[ColumnTransport]], default: None ) –

    Can be used to specify column types and string field length in number of characters.

  • chunk_size (int, default: 100000 ) –

    Number of rows to push in one chunk.

  • force (bool, default: False ) –

    If true, replacing table without column config is possible. Otherwise, error is raised if table would be replaced without column config.

  • data_source_id (Optional[str], default: None ) –

    Id of data connection where table will be created (Equivalent to connection_id for data push jobs).

  • index (Optional[bool], default: False ) –

    Whether index is included in parquet file that is pushed. Default False. See pandas documentation.

  • **kwargs (Any, default: {} ) –

    Additional parameters set for DataPushJob object.

Returns:

Raises:

  • PyCelonisTableAlreadyExistsError

    Raised if drop_if_exists=False and table already exists.

  • PyCelonisDataPushExecutionFailedError

    Raised when table creation fails.

  • PyCelonisValueError

    Raised when table already exists and no column config is given.

Examples:

Create new table:

```python
import pandas as pd

df = pd.DataFrame({"TEST_COLUMN": [1, 2, 3]})

pool_table = data_pool.create_table(df, "TEST_TABLE")
```

Replace table:

```python
from pycelonis.ems import ColumnTransport, ColumnType

df = pd.DataFrame({"TEST_COLUMN": [1, 2, 3]})
column_config = [ColumnTransport(column_name="TEST_COLUMN", column_type=ColumnType.INTEGER)]
pool_table = data_pool.create_table(
    df, "TEST_TABLE", drop_if_exists=True, column_config=column_config
)
```

get_table

get_table(name, data_source_id=None)

Gets table located in data pool with given name and data source id.

Parameters:

  • name (str) –

    Name of table.

  • data_source_id (Optional[str], default: None ) –

    Id of data connection where table is located (Equivalent to connection_id for data push jobs).

Returns:

Raises:

get_tables

get_tables()

Gets all data pool tables of given data pool.

Returns:
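
Examples:

Look up pool tables (a minimal sketch; `<data_connection_id>` is a placeholder and the tables are assumed to exist already):

```python
# List all tables of the data pool.
tables = data_pool.get_tables()

# Get a table from the global scope of the data pool.
activities_table = data_pool.get_table("ACTIVITIES")

# Get a table located in a specific data connection.
ekpo_table = data_pool.get_table("EKPO", data_source_id="<data_connection_id>")
```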

get_data_connection

get_data_connection(id_)

Gets data connection with given id.

Parameters:

  • id_ (str) –

    Id of data connection.

Returns:

  • DataConnection

    A DataConnection object for data connection with given id.

get_data_connections

get_data_connections()

Gets all data connections of given data pool.

Returns:
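
Examples:

Retrieve data connections of the data pool (a minimal sketch; `<data_connection_id>` is a placeholder and it assumes each connection exposes a `name`):

```python
# Iterate over all data connections and print their names (assumed attribute).
for data_connection in data_pool.get_data_connections():
    print(data_connection.name)

# Fetch one specific data connection by its id.
data_connection = data_pool.get_data_connection("<data_connection_id>")
```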

create_job

create_job(name, data_source_id=None, **kwargs)

Creates new job with name in given data pool.

Parameters:

  • name (str) –

    Name of new job.

  • data_source_id (Optional[str], default: None ) –

    Data connection id to use for job scope (Equivalent to connection_id for data push jobs).

Returns:

  • Job

    A Job object for newly created job.

Examples:

Create data job with transformation statement and execute it:

```python
data_job = data_pool.create_job("PyCelonis Tutorial Job")

task = data_job.create_transformation(
    name="PyCelonis Tutorial Task",
    description="This is an example task"
)

task.update_statement("""
    DROP TABLE IF EXISTS ACTIVITIES;
    CREATE TABLE ACTIVITIES (
        _CASE_KEY VARCHAR(100),
        ACTIVITY_EN VARCHAR(300)
    );
""")

data_job.execute()
```

get_job

get_job(id_)

Gets job with given id.

Parameters:

  • id_ (str) –

    Id of job.

Returns:

  • Job

    A Job object for job with given id.

get_jobs

get_jobs()

Gets all jobs of given data pool.

Returns:
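
Examples:

Retrieve data jobs of the data pool (a minimal sketch; `<job_id>` is a placeholder):

```python
# List all data jobs of the data pool and print their names.
for job in data_pool.get_jobs():
    print(job.name)

# Fetch one specific data job by its id.
data_job = data_pool.get_job("<job_id>")
```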

get_pool_variables

get_pool_variables()

Gets data pool variables.

Returns:

get_pool_variable

get_pool_variable(id_)

Gets pool variable with given id.

Parameters:

  • id_ (str) –

    Id of pool variable.

Returns:

  • PoolVariable

    A PoolVariable object for pool variable with given id.

create_pool_variable

create_pool_variable(
    name,
    placeholder,
    description=None,
    var_type=VariableType.PUBLIC_CONSTANT,
    data_type=FilterParserDataType.STRING,
    values=None,
    **kwargs
)

Creates new pool variable and returns it.

Parameters:

  • name (str) –

    Name of variable.

  • placeholder (str) –

    Placeholder of variable.

  • description (Optional[str], default: None ) –

    Description of variable.

  • var_type (VariableType, default: PUBLIC_CONSTANT ) –

    Type of variable.

  • data_type (FilterParserDataType, default: STRING ) –

    Data type of variable value.

  • values (Optional[List[VariableValueTransport]], default: None ) –

    List of variable values.

  • **kwargs (Any, default: {} ) –

    Additional parameters
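
Examples:

Create a pool variable and read it back (a minimal sketch that relies on the default var_type and data_type; the name and placeholder values are illustrative, and it assumes the returned PoolVariable exposes an `id`):

```python
# Create a public constant string variable (the default var_type and data_type).
variable = data_pool.create_pool_variable(
    name="currency",
    placeholder="currency_placeholder",
    description="Currency used in transformations",
)

# Read the variable back by its id (assumed attribute) and list all variables of the data pool.
same_variable = data_pool.get_pool_variable(variable.id)
all_variables = data_pool.get_pool_variables()
```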