data_model

Module to interact with data models.

This module contains the class used to interact with a data model in EMS data integration.

Typical usage example:

```python
data_model = data_pool.get_data_model(data_model_id)
data_model.name = "NEW_NAME"
data_model.update()
data_model.reload()
data_model.delete()
```

internal_tracking_logger module-attribute

internal_tracking_logger = getLogger(
    INTERNAL_TRACKING_LOGGER
)

DataModel

Bases: DataModelTransport

Data model object to interact with data model specific data integration endpoints.

permissions class-attribute instance-attribute

permissions = Field(None, alias='permissions')

description class-attribute instance-attribute

description = Field(None, alias='description')

create_date class-attribute instance-attribute

create_date = Field(None, alias='createDate')

changed_date class-attribute instance-attribute

changed_date = Field(None, alias='changedDate')

configuration_skipped class-attribute instance-attribute

configuration_skipped = Field(
    None, alias="configurationSkipped"
)

unavailable class-attribute instance-attribute

unavailable = Field(None, alias='unavailable')

editable class-attribute instance-attribute

editable = Field(None, alias='editable')

creator_user_id class-attribute instance-attribute

creator_user_id = Field(None, alias='creatorUserId')

tables class-attribute instance-attribute

tables = Field(None, alias='tables')

foreign_keys class-attribute instance-attribute

foreign_keys = Field(None, alias='foreignKeys')

process_configurations class-attribute instance-attribute

process_configurations = Field(
    None, alias="processConfigurations"
)

data_model_calendar_type class-attribute instance-attribute

data_model_calendar_type = Field(
    None, alias="dataModelCalendarType"
)

factory_calendar class-attribute instance-attribute

factory_calendar = Field(None, alias='factoryCalendar')

custom_calendar class-attribute instance-attribute

custom_calendar = Field(None, alias='customCalendar')

original_id class-attribute instance-attribute

original_id = Field(None, alias='originalId')

eventlog_automerge_enabled class-attribute instance-attribute

eventlog_automerge_enabled = Field(
    None, alias="eventlogAutomergeEnabled"
)

auto_merge_execution_mode class-attribute instance-attribute

auto_merge_execution_mode = Field(
    None, alias="autoMergeExecutionMode"
)

event_log_count class-attribute instance-attribute

event_log_count = Field(None, alias='eventLogCount')

object_id class-attribute instance-attribute

object_id = Field(None, alias='objectId')

client class-attribute instance-attribute

client = Field(..., exclude=True)

id instance-attribute

id

Id of data model.

pool_id instance-attribute

pool_id

Id of data pool where data model is located.

name instance-attribute

name

Name of data model.
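
The `Field(None, alias=...)` declarations listed for this class pair each snake_case attribute with a camelCase alias. As a rough illustration of that naming convention only (this is not pycelonis code, and `to_camel` is a hypothetical helper), the mapping can be sketched in plain Python:

```python
def to_camel(snake_name: str) -> str:
    """Convert a snake_case attribute name to a camelCase alias."""
    head, *tail = snake_name.split("_")
    return head + "".join(part.capitalize() for part in tail)


# Attribute names taken from the field list of this class.
attributes = ["create_date", "foreign_keys", "auto_merge_execution_mode"]
aliases = {name: to_camel(name) for name in attributes}
```

Single-word attributes such as `permissions` map to themselves, which matches the aliases shown for those fields.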

data_pool_id property writable

data_pool_id

Returns id of data pool for given data model.

Returns:

  • str

    Id of data pool.

from_transport classmethod

from_transport(client, data_model_transport)

Creates high-level data model object from given DataModelTransport.

Parameters:

  • client (Client) –

    Client to use to make API calls for given data model.

  • data_model_transport (DataModelTransport) –

    DataModelTransport object containing properties of data model.

Returns:

  • DataModel

    A DataModel object with properties from transport and given client.
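
The pattern described here (copy the transport's properties, then attach a client) can be sketched with stand-in classes. `TransportSketch` and `ModelSketch` are hypothetical names invented for this illustration; this is not how pycelonis itself is implemented.

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class TransportSketch:
    """Stand-in for a transport object: plain data, no client."""

    id: str
    pool_id: str
    name: str


@dataclass
class ModelSketch(TransportSketch):
    """Stand-in for a high-level object: transport data plus a client."""

    client: Any = None

    @classmethod
    def from_transport(cls, client: Any, transport: TransportSketch) -> "ModelSketch":
        # Copy every transport property and attach the client.
        return cls(client=client, **asdict(transport))


fake_client = object()  # placeholder for a real API client
transport = TransportSketch(id="model-id", pool_id="pool-id", name="MY_MODEL")
model = ModelSketch.from_transport(fake_client, transport)
```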

update

update()

Pushes local changes of data model to EMS and updates properties with response from EMS.

sync

sync()

Syncs data model properties with EMS.

delete

delete()

Deletes data model.

reload

reload(force_complete=True, wait=True)

Reloads given data model.

Parameters:

  • force_complete (bool, default: True ) –

    If true, a complete reload is triggered; otherwise a reload from cache is triggered.

  • wait (bool, default: True ) –

    If true, the function returns only once the data model has been reloaded and raises an error if the reload fails. If false, the function returns right after triggering the reload and does not raise an error if the load fails.
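
With `wait=False`, the caller has to check whether the load finished. A minimal sketch of such a polling loop follows. `wait_for_load`, the `fetch_status` callable, and the status strings are all assumptions made for illustration; in practice the callable would wrap `data_model.get_load_status()`, whose return shape is not documented on this page.

```python
import time
from typing import Callable, Iterable


def wait_for_load(
    fetch_status: Callable[[], str],
    done: Iterable[str] = ("SUCCESS",),
    failed: Iterable[str] = ("ERROR",),
    interval: float = 1.0,
    timeout: float = 600.0,
) -> str:
    """Poll fetch_status until it reports a done state.

    Raises RuntimeError on a failed state and TimeoutError on timeout.
    The status names are placeholders, not confirmed EMS values.
    """
    done_states, failed_states = set(done), set(failed)
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in done_states:
            return status
        if status in failed_states:
            raise RuntimeError(f"Load failed with status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError("Load did not finish in time")
        time.sleep(interval)


# Stand-in status source: reports RUNNING twice, then SUCCESS.
fake_statuses = iter(["RUNNING", "RUNNING", "SUCCESS"])
result = wait_for_load(lambda: next(fake_statuses), interval=0.0)
```

Passing the status source as a callable keeps the loop independent of any particular client, so it can be exercised without a connection to EMS.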

Raises:

partial_reload

partial_reload(data_model_table_ids, wait=True)

Only reloads given tables in data model.

Parameters:

  • data_model_table_ids (List[str]) –

    List of ids of the data model tables that need to be reloaded.

  • wait (bool, default: True ) –

    If true, the function returns only once the data model tables have been reloaded and raises an error if the reload fails. If false, the function returns right after triggering the partial reload and does not raise an error if the load fails.
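
`partial_reload` takes table ids rather than names. One way to collect them, sketched here with stand-in objects, is to filter the result of `get_tables()` by name. The helper `ids_for` is hypothetical, and the sketch assumes each table object exposes `id` and `name` attributes.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def ids_for(tables: Iterable[Any], names: Iterable[str]) -> List[str]:
    """Return the ids of the tables whose name is in names, in table order."""
    wanted = set(names)
    found = [table for table in tables if table.name in wanted]
    missing = wanted - {table.name for table in found}
    if missing:
        raise KeyError(f"Tables not found: {sorted(missing)}")
    return [table.id for table in found]


# Stand-ins for data model tables; real ones would come from data_model.get_tables().
tables = [
    SimpleNamespace(id="id-1", name="ACTIVITIES"),
    SimpleNamespace(id="id-2", name="EKPO"),
    SimpleNamespace(id="id-3", name="EKKO"),
]
table_ids = ids_for(tables, ["EKPO", "ACTIVITIES"])
# With a real data model: data_model.partial_reload(data_model_table_ids=table_ids)
```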

Raises:

get_load_status

get_load_status()

Gets the data model's load status.

API

  • GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/load-history/load-info-sync

Returns:

add_table

add_table(name, alias=None, **kwargs)

Adds an existing data pool table to the given data model as a new data model table.

Parameters:

  • name (str) –

    Name of existing table in data pool.

  • alias (Optional[str], default: None ) –

    Alias of new data model table.

  • **kwargs (Any, default: {} ) –

    Additional parameters set for DataModelTableTransport object.

Returns:

  • DataModelTable

    A DataModelTable object for newly created data model table.

Examples:

Create a data pool table and add it to the data model:

```python
pool_table = data_pool.create_table(data_frame, "TEST_TABLE")
data_model.add_table(name=pool_table.name, alias="ACTIVITIES")
```

get_table

get_table(id_)

Gets data model table with given id.

Parameters:

  • id_ (str) –

    Id of data model table.

Returns:

  • DataModelTable

    A DataModelTable object for data model table with given id.

get_tables

get_tables()

Gets all data model tables of given data model.

Returns:

export_data_frame_from staticmethod

export_data_frame_from(
    client,
    pool_id,
    data_model_id,
    query,
    query_environment=None,
)

Creates new data export and downloads exported data as data frame for given data model.

Warning

The method DataModel.export_data_frame_from has been deprecated and will be removed in future versions. Please use SaolaPy from now on to export PQL queries:

```python
import pycelonis.pql as pql

# <pool_id> and <data_model_id> are placeholders for your own ids.
df = pql.DataFrame.from_pql(
    query,
    data_model=DataModel(client=celonis.client, pool_id=<pool_id>, id=<data_model_id>)
)
df.head()
```

This method should be used in case you only have USE ALL DATA MODELS or USE DATA MODEL permissions.

Parameters:

  • client (Client) –

    Client to use to make API calls for data export.

  • pool_id (str) –

    Id of data pool.

  • data_model_id (str) –

    Id of data model.

  • query (Union[DataQuery, PQL]) –

    PQL query to export.

  • query_environment (Optional[QueryEnvironment], default: None ) –

    Query environment KPIs.

Returns:

  • DataFrame

    A data frame containing the exported data.

Examples:

Export data into dataframe:

```python
from pycelonis.pql import PQL, PQLColumn
from pycelonis.ems import DataModel

# Build a PQL query with two columns from the ACTIVITIES table.
query = PQL(distinct=False, limit=None, offset=None)
query += PQLColumn(name="_CASE_KEY", query='"ACTIVITIES"."_CASE_KEY"')
query += PQLColumn(name="ACTIVITY_EN", query='"ACTIVITIES"."ACTIVITY_EN"')

# <data_pool_id> and <data_model_id> are placeholders for your own ids.
df = DataModel.export_data_frame_from(
    client=celonis.client,
    pool_id=<data_pool_id>,
    data_model_id=<data_model_id>,
    query=query,
)
```

create_data_export

create_data_export(
    query, export_type, query_environment=None
)

Creates new data export in given data model.

Parameters:

Returns:

  • DataExport

    A DataExport object for newly created data export.

Examples:

Manually run data export and wait for it to finish:

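```python
data_export = data_model.create_data_export(query=query, export_type=ExportType.PARQUET)
data_export.wait_for_execution()
chunks = data_export.get_chunks()

# Write each chunk to its own file so later chunks do not overwrite earlier ones.
# <file_name> is a placeholder for your own file name.
for i, chunk in enumerate(chunks):
    with open(f"<file_name>_{i}.parquet", "wb") as f:
        f.write(chunk.read())
```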

export_data_frame

export_data_frame(query, query_environment=None)

Creates new data export and downloads exported data as data frame.

Warning

The method data_model.export_data_frame has been deprecated and will be removed in future versions. Please use SaolaPy from now on to export PQL queries:

```python
import pycelonis.pql as pql

df = pql.DataFrame.from_pql(query, data_model=data_model)
df.head()
```

Parameters:

  • query (Union[DataQuery, PQL]) –

    PQL query to export.

  • query_environment (Optional[QueryEnvironment], default: None ) –

    Query environment KPIs.

Returns:

  • DataFrame

    A data frame containing the exported data.

Examples:

Export data into dataframe:

```python
from pycelonis.pql import PQL, PQLColumn

# Build a PQL query with two columns from the ACTIVITIES table.
query = PQL(distinct=False, limit=None, offset=None)
query += PQLColumn(name="_CASE_KEY", query='"ACTIVITIES"."_CASE_KEY"')
query += PQLColumn(name="ACTIVITY_EN", query='"ACTIVITIES"."ACTIVITY_EN"')

df = data_model.export_data_frame(query)
```

create_foreign_key

create_foreign_key(
    source_table_id, target_table_id, columns
)

Creates new foreign key between two tables in given data model.

Parameters:

  • source_table_id (str) –

    Id of source table.

  • target_table_id (str) –

    Id of target table.

  • columns (List[Tuple[str, str]]) –

    List of 2-tuples, each consisting of a 'sourceColumnName' and a 'targetColumnName', which together define the foreign key, e.g. columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ...]

Returns:

  • ForeignKey

    A ForeignKey object for newly created foreign key.

Examples:

Create foreign key between two tables:

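```python
# Look up the two tables by name among the data model tables.
tables = data_model.get_tables()
ekpo = tables.find("EKPO")
activities = tables.find("ACTIVITIES")

foreign_key = data_model.create_foreign_key(
    source_table_id=ekpo.id,
    target_table_id=activities.id,
    columns=[("_CASE_KEY", "_CASE_KEY")]
)
```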
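
For a composite key, `columns` holds one `(source, target)` pair per column. A small sketch of preparing that list from a mapping follows; `to_column_pairs` is a hypothetical helper, and the column names are invented for illustration.

```python
from typing import Dict, List, Tuple


def to_column_pairs(mapping: Dict[str, str]) -> List[Tuple[str, str]]:
    """Turn {source_column: target_column} into a list of (source, target) pairs."""
    pairs = list(mapping.items())
    if not pairs:
        raise ValueError("A foreign key needs at least one column pair")
    return pairs


# Three-column key; the result can be passed as the columns argument.
columns = to_column_pairs({"MANDT": "MANDT", "EBELN": "EBELN", "EBELP": "EBELP"})
```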

get_foreign_key

get_foreign_key(id_)

Gets foreign key with given id.

Parameters:

  • id_ (str) –

    Id of foreign key.

Returns:

  • ForeignKey

    A ForeignKey object for foreign key with given id.

get_foreign_keys

get_foreign_keys()

Gets all foreign keys of given data model.

Returns:

add_name_mappings

add_name_mappings(file_path)

Adds name mappings from the given file to the data model.

Parameters:

  • file_path (Union[str, Path]) –

    Path of name mapping Excel file.

Returns:

get_name_mappings

get_name_mappings()

Gets name mappings of given data model.

Returns:

delete_name_mappings

delete_name_mappings()

Deletes name mappings of given data model.

create_process_configuration

create_process_configuration(
    activity_table_id,
    case_id_column,
    activity_column,
    timestamp_column,
    sorting_column=None,
    case_table_id=None,
    **kwargs
)

Creates new process configuration in given data model.

Parameters:

  • activity_table_id (str) –

    Id of activity table.

  • case_id_column (str) –

    Column name of case id column.

  • activity_column (str) –

    Column name of activity column.

  • timestamp_column (str) –

    Column name of timestamp column.

  • sorting_column (Optional[str], default: None ) –

    Column name of sorting column.

  • case_table_id (Optional[str], default: None ) –

    Id of case table.

  • **kwargs (Any, default: {} ) –

    Additional parameters set for DataModelConfiguration object.

Returns:

Examples:

Create process configuration for given data model:

```python
# Look up the case table and the activity table by name.
tables = data_model.get_tables()
ekpo = tables.find("EKPO")
activities = tables.find("ACTIVITIES")

process_configuration = data_model.create_process_configuration(
    activity_table_id=activities.id,
    case_id_column="_CASE_KEY",
    activity_column="ACTIVITY_EN",
    timestamp_column="EVENTTIME",
    sorting_column="_SORTING",
    case_table_id=ekpo.id
)
```

get_process_configuration

get_process_configuration(id_)

Gets process configuration with given id.

Parameters:

  • id_ (str) –

    Id of process configuration.

Returns:

Raises:

get_process_configurations

get_process_configurations()

Gets all process configurations of given data model.

Returns: