data_model.py
Datamodel (CelonisApiObjectChild)
Datamodel object to interact with Celonis Event Collection API.
Source code in celonis_api/event_collection/data_model.py
class Datamodel(CelonisApiObjectChild):
"""Datamodel object to interact with Celonis Event Collection API."""
def __init__(self, parent: typing.Union[CelonisApiObject, str], id_or_data, **kwargs):
super().__init__(parent=parent, id_or_data=id_or_data, **kwargs)
self._compute_node = ComputeNodeFactory.create_compute_node(
id=self.id, datamodel=self, celonis=self.parent.celonis
)
@property
def _parent_class(self):
from pycelonis.celonis_api.event_collection.data_pool import Pool
return Pool
@property
def pool(self) -> 'Pool':
from pycelonis.celonis_api.event_collection.data_pool import Pool
return typing.cast(Pool, self.parent)
@property
def url(self) -> str:
"""
!!! api "API"
- `/integration/api/pools/{pool_id}/data-models/{datamodel_id}`
"""
return f"{self.parent.url}/data-models/{self.id}"
@property
def tables(self) -> 'CelonisCollection[DatamodelTable]':
"""Get all Datamodel Tables.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{table_id}`
Returns:
Collection of Datamodel Tables.
"""
return CelonisCollection([DatamodelTable(self, t) for t in self.data["tables"]])
@deprecated("Use 'celonis_api.event_collection.data_model.Datamodel.process_configurations'.") # type: ignore
@property
def process_configuration(self) -> 'DatamodelProcessConfiguration':
"""Get the Datamodels Process Configuration.
!!! warning
This method is deprecated. Use [celonis_api.event_collection.data_model.Datamodel.process_configurations][]:
```py
dm.process_configurations[0]
```
Returns:
The first Process Configuration of all the Datamodel's Process Configurations.
"""
configs = self.process_configurations
if len(configs) > 0:
return configs[0]
else:
raise PyCelonisNotFoundError("Couldn't find any process configuration.")
@property
def process_configurations(self) -> 'CelonisCollection[DatamodelProcessConfiguration]':
"""Get all Datamodels Process Configurations.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
Returns:
A Collection of the Datamodel's Process Configurations.
"""
return CelonisCollection(DatamodelProcessConfiguration(self, pc) for pc in self._process_configuration_data)
@property
def _process_configuration_data(self) -> typing.List:
try:
return self.celonis.api_request(f"{self.url}/process-configurations")
except PyCelonisHTTPError:
return [self.celonis.api_request(f"{self.url}/process-configuration")]
@property
def default_activity_table(self) -> 'typing.Optional[DatamodelTable]':
"""Get/Set the default Activity Table via Process Configuration.
```py
# Get
default_at = dm.default_activity_table
# Set
dm.default_activity_table = default_at
```
!!! api "API"
- `PUT: /integration/api/pools/{pool_id}/data-models/process-configurations/default-activity-table`
```json
{
"id": datamodel_table.id,
"dataSourceId": datamodel_table.data["dataSourceId"],
"aliasOrName": datamodel_table.alias,
"alias": datamodel_table.alias,
"dataModelId": self.id,
"name": datamodel_table.source_name,
"useDirectStorage": False,
"columns": [],
}
```
Returns:
The default Activity Table.
"""
activity_tables = self._process_configuration_data
default_act = [a for a in activity_tables if a.get("defaultConfiguration", False)]
if len(default_act) > 0:
return self.tables.find(default_act[0]["activityTableId"])
elif len(activity_tables) == 1:
return self.tables.find(activity_tables[0]["activityTableId"])
else:
self._logger.info("No default activity table set.")
return None
@default_activity_table.setter
def default_activity_table(self, table: typing.Union[str, 'DatamodelTable']):
if isinstance(table, str):
datamodel_table = self.tables.find(table)
elif isinstance(table, DatamodelTable):
datamodel_table = table
else:
raise PyCelonisTypeError("default_activity_table must be of type string or DatamodelTable")
if datamodel_table.id not in [act["activityTableId"] for act in self._process_configuration_data]:
raise PyCelonisValueError("default_activity_table must be an activity table")
payload = {
"id": datamodel_table.id,
"dataSourceId": datamodel_table.data["dataSourceId"],
"aliasOrName": datamodel_table.alias,
"alias": datamodel_table.alias,
"dataModelId": self.id,
"name": datamodel_table.source_name,
"useDirectStorage": False,
"columns": [],
}
url = f"{self.url}/process-configurations/default-activity-table"
self.celonis.api_request(url, message=payload, method=HttpMethod.PUT)
def create_process_configuration(
self,
activity_table: typing.Union[str, 'DatamodelTable'] = None,
case_table: typing.Union[str, 'DatamodelTable'] = None,
case_column: str = None,
activity_column: str = None,
timestamp_column: str = None,
sorting_column: str = None,
) -> 'CelonisCollection[DatamodelProcessConfiguration]':
"""Creates a new Process Configuration.
!!! api "API"
- `PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
```json
{
"activityTableId": activity_table_id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
"caseTableId": case_table_id
}
```
Args:
activity_table: Name of Activity Table or DatamodelTable object.
case_table: Name of Case Table or DatamodelTable object.
case_column: Case Column Name referring to column in activity table.
activity_column: Activity Column Name referring to column in activity table.
timestamp_column: Timestamp Column Name referring to column in activity table.
sorting_column: Sorting Column Name.
Returns:
A Collection of the Datamodel's Process Configurations.
"""
payload = {}
if activity_table:
if not isinstance(activity_table, DatamodelTable):
activity_table = self.tables.find(activity_table)
_columns = {case_column, activity_column, timestamp_column}
_activity_table_columns = set(col["name"] for col in activity_table.columns)
check_columns = None in _columns or not _columns.issubset(_activity_table_columns)
if check_columns:
raise PyCelonisNotFoundError("Problem finding case, activity or timestamp column.")
payload.update(
{
"activityTableId": activity_table.id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
}
)
if case_table:
if not isinstance(case_table, DatamodelTable):
case_table = self.tables.find(case_table)
payload.update({"caseTableId": case_table.id})
try:
self.celonis.api_request(f"{self.url}/process-configurations", payload, method=HttpMethod.PUT)
except PyCelonisHTTPError:
self.celonis.api_request(f"{self.url}/process-configuration", payload, method=HttpMethod.PUT)
return self.process_configurations
@property
def case_table_key(self) -> typing.List:
"""Get all Case Table Keys.
Returns:
Sorted List of Case Table Keys.
"""
keys = []
if len(self.process_configurations) > 0:
config = self.process_configurations[0]
tables = [config.case_table.id, config.activity_table.id] # pylint: disable=no-member
for fk in self.data["foreignKeys"]:
if [fk["sourceTableId"], fk["targetTableId"]] == tables:
for key in fk["columns"]:
keys.append(key["sourceColumnName"])
break
elif [fk["sourceTableId"], fk["targetTableId"]] == tables[::-1]:
for key in fk["columns"]:
keys.append(key["targetColumnName"])
break
return sorted(keys)
@property
def foreign_keys(self) -> "ForeignKeyList":
"""Get all Foreign Keys.
Returns:
A List of Foreign Keys.
"""
return ForeignKeyList(self.data["foreignKeys"], self.tables)
def _find_table_object(self, table_name) -> 'DatamodelTable':
if isinstance(table_name, DatamodelTable):
return table_name
else:
table = self.tables.names.get(table_name)
if table is None:
table_list = list(self.tables.filter(lambda x: x.source_name == table_name))
if table_list:
table = table_list[0]
else:
raise PyCelonisNotFoundError(f"Table with name {table_name} can't be found in Data Model.")
return table
def _get_foreign_key_connection(self, source_table, target_table) -> typing.List:
old_columns = []
source_name = source_table.alias if source_table.alias is not None else source_table.name
target_name = target_table.alias if target_table.alias is not None else target_table.name
for fk in self.foreign_keys:
if sorted((fk["source_table"], fk["target_table"])) == sorted((source_name, target_name)):
for colset in fk["columns"]:
if (colset[0], colset[1]) not in old_columns:
old_columns.append(colset)
return old_columns
def create_foreign_key(
self,
source_table: typing.Union[str, "DatamodelTable"],
target_table: typing.Union[str, "DatamodelTable"],
columns: typing.List[typing.Tuple[str, str]],
**kwargs,
) -> typing.Dict:
"""Creates a new Foreign Key between the source_table (child(1)) and the target_table (parent(N)).
Args:
source_table: Name of Source Table or DatamodelTable object.
target_table: Name of Target Table or DatamodelTable object.
columns: List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName' which represents
the foreign_key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..]
Returns:
The newly created Foreign Key configuration as Dictionary.
Raises:
PyCelonisValueError: If tables not found or columns not found or foreign key connection
already exists between the specified tables.
PyCelonisTypeError: If table columns have different data types.
"""
# convert source_table and target_table to DatamodelTable
source_table, target_table = map(self._find_table_object, [source_table, target_table])
# assure that foreign keys are made between pairs of columns
columns = [columns] if isinstance(columns, str) else columns # type: ignore
columns = [(c, c) if isinstance(c, str) else c for c in columns]
if not all(map(lambda pair: len(pair) == 2, columns)):
raise PyCelonisValueError("A pair must have exactly two elements.")
# check if the foreign key already exists
old_fk = self._get_foreign_key_connection(source_table, target_table)
if old_fk:
raise PyCelonisValueError(
"A foreign key already exists between the two "
f"given tables {source_table}, {target_table}: \n{old_fk}"
)
source_cols = {
col["name"]: col["type"]
for col in source_table.columns
if col["name"] in list(map(operator.itemgetter(0), columns))
}
target_cols = {
col["name"]: col["type"]
for col in target_table.columns
if col["name"] in list(map(operator.itemgetter(1), columns))
}
# check if given columns present
for i, table_cols in enumerate([source_cols, target_cols]):
key_cols = set(map(operator.itemgetter(i), columns))
if not key_cols.issubset(set(table_cols)):
raise PyCelonisValueError(
f"Please check if specified columns"
f" {key_cols.difference(table_cols)} "
f"are in either source or target table."
)
# check that data types coincide
for tuple_item in columns:
if source_cols.get(tuple_item[0]) != target_cols.get(tuple_item[1]):
raise PyCelonisTypeError(
f"The specified columns {tuple_item[0]} and "
f"{tuple_item[1]} have different data types: "
f"{source_cols.get(tuple_item[0])} and "
f"{target_cols.get(tuple_item[1])}. "
"Join on columns with different data types not possible."
)
url, payload = self._payload_new_foreign_key(target_table, source_table, columns)
return self.celonis.api_request(url, payload)
@deprecated(
"Use 'celonis_api.event_collection.data_pool.Pool.create_table'. "
"After the table is created, you can add the new table from the "
"pool to your datamodel via 'add_table_from_pool'."
)
def push_table(self, df_or_path, table_name, reload_datamodel=True, **kwargs) -> 'DatamodelTable':
"""Pushes a table to a Pool, adds it the Datamodel and by default reloads the Datamodel.
!!! warning
This method is deprecated. Use [create_table][celonis_api.event_collection.data_pool.Pool.create_table].
Returns:
The newly added Datamodel Table.
"""
self.parent.push_table(df_or_path, table_name, **kwargs)
table = self.tables.names.get(table_name) or self.add_table_from_pool(table_name)
if reload_datamodel:
self.reload(tables=table_name)
return table
def reload(self, from_cache: bool = False, tables: typing.List = None, wait_for_reload: bool = True) -> typing.Dict:
"""Reloads the Datamodel only if a reload is not already running.
!!! api "API"
- `POST: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/reload`
```json
{ "forceComplete": from_cache }
```
Args:
from_cache: Deprecated, as it is no longer supported for data models. Will always be set to False.
tables: List of Datamodel Tables or table ids. If given a partial reload will be performed on these tables,
from_cache will be ignored.
wait_for_reload: If True waits until the Datamodel is reloaded.
Returns:
Report of the reload.
"""
if from_cache:
self._logger.warning(
"Reloading data models from cache is no longer supported. Defaulting to complete reload."
)
response = self.load_status
if response is not None and response["loadStatus"] == "RUNNING":
self._logger.warning("A Data Model Reload is already in progress. No new load is triggered.")
return response
if not tables:
reload_type = "Complete"
payload = {"forceComplete": True}
self.celonis.api_request(f"{self.url}/reload", method=HttpMethod.POST, params=payload)
else:
reload_type = "Partial"
payload = self._get_table_ids(tables) # type: ignore
url = (
f"{self.celonis.url}/integration/api/v1/data-pools/"
f"{self.pool.id}/data-models/{self.id}/load/partial-sync"
)
self.celonis.api_request(url, payload)
response = self.load_status
if wait_for_reload:
response = self._wait_for_reload(reload_type)
return response
def _get_table_ids(self, tables: typing.List) -> typing.List:
table_ids = []
tables = [tables] if not isinstance(tables, list) else tables
for tab in tables:
if isinstance(tab, DatamodelTable):
table_ids += [tab.id]
else:
dm_table = self.tables.names.get(tab, None)
if dm_table is None:
raise PyCelonisValueError(f"The table {tab} does not exist in the datamodel.")
table_ids += [dm_table.id]
return table_ids
def _wait_for_reload(self, reload_type="") -> typing.Dict:
iterations = 0
error_count = 0
while True:
try:
load_status = self.load_status
if load_status["loadStatus"] == "ERROR" and load_status["message"] == "Lost connection to load":
time.sleep(3)
elif load_status["loadStatus"] != "RUNNING":
self._logger.info("Data Model reload done")
break
error_count = 0
iterations += 1
if iterations % 5 == 0:
self._logger.info(f"{reload_type} Data Model reload running...")
time.sleep(1)
except (PyCelonisHTTPError, PyCelonisPermissionError) as e:
error_count += 1
time.sleep(3)
self._logger.exception("Failed to request load status, trying again...")
if error_count > 5:
raise e
if load_status["loadStatus"] not in ["SUCCESS", "WARNING"]:
raise PyCelonisHTTPError(f"Data Model reload did not succeed, status: {load_status}")
return load_status
@property
def load_status(self) -> typing.Dict:
"""Get the Datamodels Load Status.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/load-history/load-info-sync`
Returns:
Load status.
"""
response = self.celonis.api_request(f"{self.url}/load-history/load-info-sync")
return response["loadInfo"]["currentComputeLoad"]
def _payload_add_table_from_pool(
self, connection: typing.Optional[typing.Union['DataConnection', str]], table_name: str, alias: str = None
) -> typing.Tuple[str, typing.List]:
if connection:
from pycelonis.celonis_api.event_collection.data_pool import DataConnection
if isinstance(connection, DataConnection):
connection = connection.id
elif not (isinstance(connection, str) and check_uuid(connection)):
raise PyCelonisTypeError(
"Argument 'connection' should either be of type "
"DataConnection or str (valid uuid of DataConnection)."
)
# Add table from pool to datamodel
url = f"{self.url.replace('/data-models/', '/data-model/')}/tables"
payload = [{"name": table_name, "alias": alias, "dataModelId": self.id, "dataSourceId": connection}]
return url, payload
def _payload_new_foreign_key(
self, target_table: 'DatamodelTable', source_table: 'DatamodelTable', foreign_key_columns: typing.List
) -> typing.Tuple[str, typing.Dict]:
columns = []
for foreign_key_tuple in foreign_key_columns:
columns.append(
{"sourceColumnName": f"{foreign_key_tuple[0]}", "targetColumnName": f"{foreign_key_tuple[1]}"}
)
if not source_table and not target_table:
raise PyCelonisValueError("Arguments 'source_table' and 'target_table' must not be None.")
url = f"{self.url}/foreign-keys"
payload = {
"dataModelId": self.id,
"sourceTableId": source_table.id,
"targetTableId": target_table.id,
"columns": columns,
}
return url, payload
def add_table_from_pool(
self,
table_name: str, # objects_base.BaseDatamodelTable
alias: str = None,
connection: typing.Union['DataConnection', str] = None,
new_foreign_key_to_table: str = None,
added_table_join_type: str = "source",
foreign_key_columns: typing.List[typing.Tuple[str, str]] = None,
reload: str = None,
) -> "DatamodelTable":
"""Adds a Table from Pool to Datamodel. If the Table names and the ey parameters are provided
it also can connect two tables directly.
Args:
table_name: Name of the table to be added in the Pool.
alias: An alias name for the new table, by default None
added_table_join_type: Add table to be pushed as 'source' table or as 'target' table regarding
the connection. One of [`source`, `target`].
new_foreign_key_to_table: Set connection to or from this table.
foreign_key_columns: List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName'
which represents the foreign_key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..]
between two tables.
connection: DataConnection object or ID, uses Global if not specified.
reload: Reload type. One of [`FORCE_COMPLETE`, `PARTIAL_ON_TABLE`],
if not specified doesn't reload.
Returns:
DatamodelTable.
"""
# check if table exists in pool
pool_table_names = [table["name"] for table in self.pool.tables if "name" in table]
if table_name not in pool_table_names:
raise PyCelonisNotFoundError(
f"table_name '{table_name}' not found in Data Pool tables: {', '.join(pool_table_names)}"
)
# check if table already added to dm
self._check_table_already_added_to_dm(alias, table_name)
url, payload = self._payload_add_table_from_pool(connection, table_name, alias)
response = self.celonis.api_request(url, payload)
# Connect tables via foreign keys
if new_foreign_key_to_table:
if added_table_join_type == 'source':
source_table = alias if alias else table_name
target_table = new_foreign_key_to_table
elif added_table_join_type == 'target':
source_table = new_foreign_key_to_table
target_table = table_name
else:
raise PyCelonisValueError(
f"Entered value '{added_table_join_type}' for "
"'added_table_join_type' is invalid. Please use 'source' or 'target'."
)
if foreign_key_columns is None:
raise PyCelonisValueError("Foreign key columns can't be none if new_foreign_key_to_table is set")
self.create_foreign_key(source_table=source_table, target_table=target_table, columns=foreign_key_columns)
if reload == "FORCE_COMPLETE":
self.reload(from_cache=False)
elif reload == "PARTIAL_ON_TABLE":
self.reload(tables=response[0].get("aliasOrName"))
return response
def _check_table_already_added_to_dm(self, alias: typing.Optional[str], table_name: str):
alias_or_name = alias if alias else table_name
for table in self.tables:
_table_name = table.data.get("aliasOrName")
if alias_or_name == _table_name:
raise PyCelonisValueError(f"Table with name {alias_or_name} already exists in the Data Model.")
@property
def workspaces(self) -> 'CelonisCollection[Workspace]':
"""Get all Process Mining Workspaces. Uses [celonis_api.celonis.Celonis.workspaces][].
Returns:
Collection of Workspaces.
"""
return self.celonis.workspaces.filter(self.id, "dataModelId")
def create_workspace(self, name) -> 'Workspace':
"""Creates a new Workspace.
!!! api "API"
- `POST: /process-mining/api/processes`
```json
{
"name": name,
"dataModelId": self.id
}
```
Args:
name : Name of the Workspace.
Returns:
The newly created Workspace object.
"""
from pycelonis.celonis_api.process_analytics.workspace import Workspace
if name in self.celonis.workspaces.names:
raise PyCelonisValueError(f"Workspace with name {name} already exists!")
payload = {"name": name, "dataModelId": self.id}
response = self.celonis.api_request(f"{self.celonis.url}/process-mining/api/processes", payload)
return Workspace(self.celonis, response)
@property
def last_change_dates(self) -> pd.DataFrame:
"""Returns the latest change date found in tables."""
tables = self.tables
df = pd.DataFrame()
for t in tables:
# Find maximum value of _CELONIS_CHANGE_DATE for table
date_df = self._get_data_frame(
pql.PQLColumn(f'MAX("{t.name}"."_CELONIS_CHANGE_DATE")', "MAX_CELONIS_CHANGE_DATE")
)
date_df.index = [t.name]
df = df.append(date_df) # type: ignore
return df
def _get_data_file(self, pql_query, file_path=None, chunksize=None, **kwargs) -> pathlib.Path:
# needed for compatibility with cpm4 and base classes
return self._compute_node.get_data_file(pql_query=pql_query, file_path=file_path, **kwargs)
def get_data_file(
self,
pql_query: pql.PQL,
file_path: typing.Union[str, pathlib.Path] = None,
verify_query: bool = False,
**kwargs,
) -> pathlib.Path:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and returns the path to the exported file.
Uses [celonis_api.event_collection.compute_node.ComputeNode.get_data_file][].
Args:
pql_query: The table query to be executed.
file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.
verify_query: Whether query should be verified before execution.
Returns:
Path to downloaded file containing the results of the query.
"""
if verify_query:
self.verify_query(pql_query)
return self._compute_node.get_data_file(pql_query=pql_query, file_path=file_path, **kwargs)
@property
def name_mapping(self) -> typing.Dict:
"""Get the Datamodels Name Mappings.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/name-mapping`
Returns:
Dictionary containing the Name Mappings.
"""
return self.celonis.api_request(f"{self.url}/name-mapping")
def download_name_mapping(self, file_path: typing.Union[str, pathlib.Path] = NAME_MAPPING_XLSX) -> pathlib.Path:
"""Downloads the Datamodels Name Mappings.
Args:
file_path: Download path to Excel File.
Returns:
Path to downloaded Excel File.
"""
return self.celonis.api_request(f"{self.url}/name-mapping/template", pathlib.Path(file_path), method="GET")
def upload_name_mapping(self, file_path: typing.Union[str, pathlib.Path]) -> typing.Dict:
"""Uploads the Datamodels Name Mappings Excel File.
Args:
file_path: Upload path to Excel File.
Returns:
Upload Result.
"""
file_path = pathlib.Path(file_path)
if not file_path.is_file():
raise PyCelonisValueError("Argument 'file_path' must refer to a file.")
additional_headers = {"x-csrf-token": self.celonis._session.cookies.get("XSRF-TOKEN")}
return self.celonis.api_request(f"{self.url}/name-mapping/file", file_path, headers=additional_headers)
def _backup_content(self, backup_path: str = ".", include_data: bool = True):
"""Creates a backup of the datamodel."""
path = pathlib.Path(backup_path) / f"Backup of Datamodel - {self.name}"
if path.exists():
shutil.rmtree(path)
path.mkdir()
config_information = {
"tables": [{"alias": t.alias, "source_name": t.source_name} for t in self.tables],
"foreign_keys": self.foreign_keys,
"process_configurations": [],
}
for pc in self.process_configurations:
config_information["process_configurations"].append(
{
"activity_table": getattr(pc.activity_table, "name", None),
"case_table": getattr(pc.case_table, "name", None),
"case_column": pc.case_column,
"activity_column": pc.activity_column,
"timestamp_column": pc.timestamp_column,
"sorting_column": pc.sorting_column,
}
)
self.download_name_mapping(path / NAME_MAPPING_XLSX)
(path / "datamodel_information.json").write_text(json.dumps(config_information, sort_keys=True, indent=2))
if include_data:
data_path = path / "data"
data_path.mkdir()
# download tables
for t in self.tables:
t._get_data_file(file_path=(data_path / f"{t.source_name}.parquet"), include_change_date=False)
return path
# TODO: refactor complex method
def _rebuild_content_from_backup(self, backup_path, overwrite_existing_tables=False): # noqa: C901
"""Rebuilds a datamodel from a backup."""
path = pathlib.Path(backup_path)
if not path.is_dir():
raise PyCelonisValueError("Argument 'backup_path' must be directory")
match = re.search("Backup of Datamodel - (.+)", str(path))
if not (match and len(match.groups()) == 1):
raise PyCelonisValueError("Name of the datamodel not found in backup_path")
self.name = match[1]
info = json.loads((path / "datamodel_information.json").read_text())
data_path = path / "data"
def push(table):
alias, source_name = table["alias"], table["source_name"]
name = alias or source_name
if name in self.tables.names and not overwrite_existing_tables:
return {}
else:
table_file = data_path / f"{source_name}.parquet"
if table_file.is_file() and (
overwrite_existing_tables
or source_name not in [t["name"] for t in self.parent.tables if not t["dataSourceId"]]
):
self._logger.info(f"Pushing table: {source_name}")
return self.parent.create_table(table_file, source_name, if_exists="drop", wait_for_finish=False)
else:
return {}
jobs = list(map(push, info["tables"]))
for j in jobs:
job_id = j.get("id")
if job_id:
iterations = 0
error_count = 0
while True:
try:
status = self.parent.check_push_status(job_id)["status"]
if status not in ["RUNNING", "QUEUED"]:
self._logger.info(f"Data push job status: {status}")
break
error_count = 0
iterations += 1
if iterations % 10 == 0:
self._logger.info(f"Data push job status: {status}...")
time.sleep(1)
except PyCelonisError as e:
error_count += 1
time.sleep(3)
self._logger.exception("Data push job failed, trying again...")
if error_count > 5:
raise e
for table in info["tables"]:
alias, source_name = table["alias"], table["source_name"]
_ = self.add_table_from_pool(source_name, alias)
for fk in info["foreign_keys"]:
self.create_foreign_key(**fk)
for pc in info["process_configurations"]:
self.create_process_configuration(**pc)
name_mapping = path / NAME_MAPPING_XLSX
if name_mapping.is_file():
self.upload_name_mapping(name_mapping)
self.reload(from_cache=False)
return self
def _get_data_frame(self, query, **kwargs):
"""Queries the datamodel and returns the results of the query in pandas DataFrame
Parameters
----------
query : PQL or PQLColumn
The query to be executed.
Returns
-------
pandas.DataFrame
The results of the query.
"""
return self._compute_node.get_data_frame(pql_query=query, **kwargs)
def get_data_frame(self, query: pql.PQL, verify_query: bool = False, **kwargs) -> pd.DataFrame:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
Uses [ComputeNode.get_data_frame][celonis_api.event_collection.compute_node.ComputeNode.get_data_frame].
Args:
query: The table query to be executed.
verify_query: Whether query should be verified before execution.
Returns:
Dataframe containing the results of the query.
"""
if verify_query:
self.verify_query(query)
return self._compute_node.get_data_frame(pql_query=query, **kwargs)
def verify_query(self, query: typing.Union[pql.PQLColumn, pql.PQLFilter, pql.PQL, typing.List]):
errors = PQLDebugger.debug(query, self)
if len(errors) > 0:
raise PyCelonisValueError("\n".join(errors))
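A minimal usage sketch for obtaining a Datamodel and listing its tables. The pool and datamodel names are hypothetical, and the connection setup follows the usual `get_celonis` pattern:
```py
from pycelonis import get_celonis

celonis = get_celonis()  # e.g. reads URL and API token from the environment
pool = celonis.pools.find("My Pool")        # hypothetical Data Pool name
dm = pool.datamodels.find("My Datamodel")   # hypothetical Datamodel name
print(dm.tables.names)                      # Datamodel Tables keyed by alias or name
```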
case_table_key: List (property, readonly)

Get all Case Table Keys.

Returns:

Type | Description
---|---
List | Sorted List of Case Table Keys.

default_activity_table: Optional[DatamodelTable] (property, writable)

Get/Set the default Activity Table via Process Configuration.

API

PUT: /integration/api/pools/{pool_id}/data-models/process-configurations/default-activity-table

Returns:

Type | Description
---|---
Optional[DatamodelTable] | The default Activity Table.

foreign_keys: ForeignKeyList (property, readonly)

Get all Foreign Keys.

Returns:

Type | Description
---|---
ForeignKeyList | A List of Foreign Keys.

last_change_dates: DataFrame (property, readonly)

Returns the latest change date found in tables.

load_status: Dict (property, readonly)

Get the Datamodel's Load Status.

API

GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/load-history/load-info-sync

Returns:

Type | Description
---|---
Dict | Load status.

name_mapping: Dict (property, readonly)

Get the Datamodel's Name Mappings.

API

GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/name-mapping

Returns:

Type | Description
---|---
Dict | Dictionary containing the Name Mappings.

process_configuration: DatamodelProcessConfiguration (property, readonly)

Get the Datamodel's Process Configuration.

Warning

This method is deprecated. Use celonis_api.event_collection.data_model.Datamodel.process_configurations.

Returns:

Type | Description
---|---
DatamodelProcessConfiguration | The first Process Configuration of all the Datamodel's Process Configurations.

process_configurations: CelonisCollection[DatamodelProcessConfiguration] (property, readonly)

Get all the Datamodel's Process Configurations.

API

GET: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations

Returns:

Type | Description
---|---
CelonisCollection[DatamodelProcessConfiguration] | A Collection of the Datamodel's Process Configurations.

tables: CelonisCollection[DatamodelTable] (property, readonly)

Get all Datamodel Tables.

API

GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{table_id}

Returns:

Type | Description
---|---
CelonisCollection[DatamodelTable] | Collection of Datamodel Tables.

url: str (property, readonly)

API

/integration/api/pools/{pool_id}/data-models/{datamodel_id}

workspaces: CelonisCollection[Workspace] (property, readonly)

Get all Process Mining Workspaces. Uses celonis_api.celonis.Celonis.workspaces.

Returns:

Type | Description
---|---
CelonisCollection[Workspace] | Collection of Workspaces.
add_table_from_pool(self, table_name, alias=None, connection=None, new_foreign_key_to_table=None, added_table_join_type='source', foreign_key_columns=None, reload=None)

Adds a Table from Pool to Datamodel. If the table names and the key parameters are provided, it can also connect the two tables directly.

Parameters:

Name | Type | Description | Default
---|---|---|---
table_name | str | Name of the table to be added in the Pool. | required
alias | str | An alias name for the new table. | None
added_table_join_type | str | Add table to be pushed as 'source' table or as 'target' table regarding the connection. One of [`source`, `target`]. | 'source'
new_foreign_key_to_table | str | Set connection to or from this table. | None
foreign_key_columns | List[Tuple[str, str]] | List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName' which represents the foreign key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..] between two tables. | None
connection | Union[DataConnection, str] | DataConnection object or ID, uses Global if not specified. | None
reload | str | Reload type. One of [`FORCE_COMPLETE`, `PARTIAL_ON_TABLE`], if not specified doesn't reload. | None

Returns:

Type | Description
---|---
DatamodelTable | The newly added Datamodel Table.
Source code in celonis_api/event_collection/data_model.py
def add_table_from_pool(
self,
table_name: str, # objects_base.BaseDatamodelTable
alias: str = None,
connection: typing.Union['DataConnection', str] = None,
new_foreign_key_to_table: str = None,
added_table_join_type: str = "source",
foreign_key_columns: typing.List[typing.Tuple[str, str]] = None,
reload: str = None,
) -> "DatamodelTable":
"""Adds a Table from Pool to Datamodel. If the Table names and the ey parameters are provided
it also can connect two tables directly.
Args:
table_name: Name of the table to be added in the Pool.
alias: An alias name for the new table, by default None
added_table_join_type: Add table to be pushed as 'source' table or as 'target' table regarding
the connection. One of [`source`, `target`].
new_foreign_key_to_table: Set connection to or from this table.
foreign_key_columns: List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName'
which represents the foreign_key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..]
between two tables.
connection: DataConnection object or ID, uses Global if not specified.
reload: Reload type. One of [`FORCE_COMPLETE`, `PARTIAL_ON_TABLE`],
if not specified doesn't reload.
Returns:
DatamodelTable.
"""
# check if table exists in pool
pool_table_names = [table["name"] for table in self.pool.tables if "name" in table]
if table_name not in pool_table_names:
raise PyCelonisNotFoundError(
f"table_name '{table_name}' not found in Data Pool tables: {', '.join(pool_table_names)}"
)
# check if table already added to dm
self._check_table_already_added_to_dm(alias, table_name)
url, payload = self._payload_add_table_from_pool(connection, table_name, alias)
response = self.celonis.api_request(url, payload)
# Connect tables via foreign keys
if new_foreign_key_to_table:
if added_table_join_type == 'source':
source_table = alias if alias else table_name
target_table = new_foreign_key_to_table
elif added_table_join_type == 'target':
source_table = new_foreign_key_to_table
target_table = table_name
else:
raise PyCelonisValueError(
f"Entered value '{added_table_join_type}' for "
"'added_table_join_type' is invalid. Please use 'source' or 'target'."
)
if foreign_key_columns is None:
raise PyCelonisValueError("Foreign key columns can't be none if new_foreign_key_to_table is set")
self.create_foreign_key(source_table=source_table, target_table=target_table, columns=foreign_key_columns)
if reload == "FORCE_COMPLETE":
self.reload(from_cache=False)
elif reload == "PARTIAL_ON_TABLE":
self.reload(tables=response[0].get("aliasOrName"))
return response
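A minimal sketch of adding a pool table and wiring it to an existing table in one call; all table and column names are hypothetical:
```py
# Add the "INVOICES" pool table as source of a new foreign key to "CASES",
# then trigger a partial reload of the newly added table.
dm.add_table_from_pool(
    table_name="INVOICES",
    alias="Invoices",
    new_foreign_key_to_table="CASES",
    added_table_join_type="source",
    foreign_key_columns=[("CASE_ID", "CASE_ID")],
    reload="PARTIAL_ON_TABLE",
)
```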
create_foreign_key(self, source_table, target_table, columns, **kwargs)

Creates a new Foreign Key between the source_table (child(1)) and the target_table (parent(N)).

Parameters:

Name | Type | Description | Default
---|---|---|---
source_table | Union[str, DatamodelTable] | Name of Source Table or DatamodelTable object. | required
target_table | Union[str, DatamodelTable] | Name of Target Table or DatamodelTable object. | required
columns | List[Tuple[str, str]] | List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName' which represents the foreign_key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..] | required

Returns:

Type | Description
---|---
Dict | The newly created Foreign Key configuration as Dictionary.

Exceptions:

Type | Description
---|---
PyCelonisValueError | If tables not found or columns not found or foreign key connection already exists between the specified tables.
PyCelonisTypeError | If table columns have different data types.
Source code in celonis_api/event_collection/data_model.py
def create_foreign_key(
self,
source_table: typing.Union[str, "DatamodelTable"],
target_table: typing.Union[str, "DatamodelTable"],
columns: typing.List[typing.Tuple[str, str]],
**kwargs,
) -> typing.Dict:
"""Creates a new Foreign Key between the source_table (child(1)) and the target_table (parent(N)).
Args:
source_table: Name of Source Table or DatamodelTable object.
target_table: Name of Target Table or DatamodelTable object.
columns: List of 2D-tuples consisting of a 'sourceColumnName' and 'targetColumnName' which represents
the foreign_key, e.g. foreign_key_columns=[('Col1', 'Col3'), ('Col2', 'Col2'), ..]
Returns:
The newly created Foreign Key configuration as Dictionary.
Raises:
PyCelonisValueError: If tables not found or columns not found or foreign key connection
already exists between the specified tables.
PyCelonisTypeError: If table columns have different data types.
"""
# convert source_table and target_table to DatamodelTable
source_table, target_table = map(self._find_table_object, [source_table, target_table])
# assure that foreign keys are made between pairs of columns
columns = [columns] if isinstance(columns, str) else columns # type: ignore
columns = [(c, c) if isinstance(c, str) else c for c in columns]
if not all(map(lambda pair: len(pair) == 2, columns)):
raise PyCelonisValueError("A pair must have exactly two elements.")
# check if the foreign key already exists
old_fk = self._get_foreign_key_connection(source_table, target_table)
if old_fk:
raise PyCelonisValueError(
"A foreign key already exists between the two "
f"given tables {source_table}, {target_table}: \n{old_fk}"
)
source_cols = {
col["name"]: col["type"]
for col in source_table.columns
if col["name"] in list(map(operator.itemgetter(0), columns))
}
target_cols = {
col["name"]: col["type"]
for col in target_table.columns
if col["name"] in list(map(operator.itemgetter(1), columns))
}
# check if given columns present
for i, table_cols in enumerate([source_cols, target_cols]):
key_cols = set(map(operator.itemgetter(i), columns))
if not key_cols.issubset(set(table_cols)):
raise PyCelonisValueError(
f"Please check if specified columns"
f" {key_cols.difference(table_cols)} "
f"are in either source or target table."
)
# check that data types coincide
for tuple_item in columns:
if source_cols.get(tuple_item[0]) != target_cols.get(tuple_item[1]):
raise PyCelonisTypeError(
f"The specified columns {tuple_item[0]} and "
f"{tuple_item[1]} have different data types: "
f"{source_cols.get(tuple_item[0])} and "
f"{target_cols.get(tuple_item[1])}. "
"Join on columns with different data types not possible."
)
url, payload = self._payload_new_foreign_key(target_table, source_table, columns)
return self.celonis.api_request(url, payload)
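A minimal sketch, assuming both tables are already part of the Datamodel; table and column names are hypothetical:
```py
# Per the docstring, source_table is the child(1) side and
# target_table the parent(N) side of the new 1:N connection.
fk = dm.create_foreign_key(
    source_table="CASES",
    target_table="ACTIVITIES",
    columns=[("CASE_ID", "CASE_ID")],
)
```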
create_process_configuration(self, activity_table=None, case_table=None, case_column=None, activity_column=None, timestamp_column=None, sorting_column=None)

Creates a new Process Configuration.

API

PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations

Parameters:

Name | Type | Description | Default
---|---|---|---
activity_table | Union[str, DatamodelTable] | Name of Activity Table or DatamodelTable object. | None
case_table | Union[str, DatamodelTable] | Name of Case Table or DatamodelTable object. | None
case_column | str | Case Column Name referring to column in activity table. | None
activity_column | str | Activity Column Name referring to column in activity table. | None
timestamp_column | str | Timestamp Column Name referring to column in activity table. | None
sorting_column | str | Sorting Column Name. | None

Returns:

Type | Description
---|---
CelonisCollection[DatamodelProcessConfiguration] | A Collection of the Datamodel's Process Configurations.
Source code in celonis_api/event_collection/data_model.py
def create_process_configuration(
self,
activity_table: typing.Union[str, 'DatamodelTable'] = None,
case_table: typing.Union[str, 'DatamodelTable'] = None,
case_column: str = None,
activity_column: str = None,
timestamp_column: str = None,
sorting_column: str = None,
) -> 'CelonisCollection[DatamodelProcessConfiguration]':
"""Creates a new Process Configuration.
!!! api "API"
- `PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
```json
{
"activityTableId": activity_table_id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
"caseTableId": case_table_id
}
```
Args:
activity_table: Name of Activity Table or DatamodelTable object.
case_table: Name of Case Table or DatamodelTable object.
case_column: Case Column Name referring to column in activity table.
activity_column: Activity Column Name referring to column in activity table.
timestamp_column: Timestamp Column Name referring to column in activity table.
sorting_column: Sorting Column Name.
Returns:
A Collection of the Datamodel's Process Configurations.
"""
payload = {}
if activity_table:
if not isinstance(activity_table, DatamodelTable):
activity_table = self.tables.find(activity_table)
_columns = {case_column, activity_column, timestamp_column}
_activity_table_columns = set(col["name"] for col in activity_table.columns)
check_columns = None in _columns or not _columns.issubset(_activity_table_columns)
if check_columns:
raise PyCelonisNotFoundError("Problem finding case, activity or timestamp column.")
payload.update(
{
"activityTableId": activity_table.id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
}
)
if case_table:
if not isinstance(case_table, DatamodelTable):
case_table = self.tables.find(case_table)
payload.update({"caseTableId": case_table.id})
try:
self.celonis.api_request(f"{self.url}/process-configurations", payload, method=HttpMethod.PUT)
except PyCelonisHTTPError:
self.celonis.api_request(f"{self.url}/process-configuration", payload, method=HttpMethod.PUT)
return self.process_configurations
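A minimal sketch; table and column names are hypothetical, and the column arguments must refer to columns of the activity table:
```py
configs = dm.create_process_configuration(
    activity_table="ACTIVITIES",
    case_table="CASES",
    case_column="CASE_ID",
    activity_column="ACTIVITY",
    timestamp_column="EVENTTIME",
)
```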
create_workspace(self, name)

Creates a new Workspace.

Parameters:

Name | Type | Description | Default
---|---|---|---
name | | Name of the Workspace. | required

Returns:

Type | Description
---|---
Workspace | The newly created Workspace object.
Source code in celonis_api/event_collection/data_model.py
def create_workspace(self, name) -> 'Workspace':
"""Creates a new Workspace.
!!! api "API"
- `POST: /process-mining/api/processes`
```json
{
"name": name,
"dataModelId": self.id
}
```
Args:
name : Name of the Workspace.
Returns:
The newly created Workspace object.
"""
from pycelonis.celonis_api.process_analytics.workspace import Workspace
if name in self.celonis.workspaces.names:
raise PyCelonisValueError(f"Workspace with name {name} already exists!")
payload = {"name": name, "dataModelId": self.id}
response = self.celonis.api_request(f"{self.celonis.url}/process-mining/api/processes", payload)
return Workspace(self.celonis, response)
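A minimal sketch; the workspace name is hypothetical and must not already be taken:
```py
workspace = dm.create_workspace("My Workspace")
```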
download_name_mapping(self, file_path='name_mapping.xlsx')

Downloads the Datamodel's Name Mappings.

Parameters:

Name | Type | Description | Default
---|---|---|---
file_path | Union[str, pathlib.Path] | Download path to Excel File. | 'name_mapping.xlsx'

Returns:

Type | Description
---|---
Path | Path to downloaded Excel File.
Source code in celonis_api/event_collection/data_model.py
def download_name_mapping(self, file_path: typing.Union[str, pathlib.Path] = NAME_MAPPING_XLSX) -> pathlib.Path:
"""Downloads the Datamodels Name Mappings.
Args:
file_path: Download path to Excel File.
Returns:
Path to downloaded Excel File.
"""
return self.celonis.api_request(f"{self.url}/name-mapping/template", pathlib.Path(file_path), method="GET")
get_data_file(self, pql_query, file_path=None, verify_query=False, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile and returns the path to the exported file. Uses celonis_api.event_collection.compute_node.ComputeNode.get_data_file.

Parameters:

Name | Type | Description | Default
---|---|---|---
pql_query | PQL | The table query to be executed. | required
file_path | Union[str, pathlib.Path] | The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`. | None
verify_query | bool | Whether query should be verified before execution. | False

Returns:

Type | Description
---|---
Path | Path to downloaded file containing the results of the query.
Source code in celonis_api/event_collection/data_model.py
def get_data_file(
self,
pql_query: pql.PQL,
file_path: typing.Union[str, pathlib.Path] = None,
verify_query: bool = False,
**kwargs,
) -> pathlib.Path:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and returns the path to the exported file.
Uses [celonis_api.event_collection.compute_node.ComputeNode.get_data_file][].
Args:
pql_query: The table query to be executed.
file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.
verify_query: Whether query should be verified before execution.
Returns:
Path to downloaded file containing the results of the query.
"""
if verify_query:
self.verify_query(pql_query)
return self._compute_node.get_data_file(pql_query=pql_query, file_path=file_path, **kwargs)
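A minimal sketch of a PQL export; the table and column names are hypothetical, and the `pql` import from the package root follows the pycelonis 1.x convention:
```py
from pycelonis import pql

query = pql.PQL()
query += pql.PQLColumn('"ACTIVITIES"."ACTIVITY"', "Activity")
path = dm.get_data_file(query)  # pathlib.Path to the exported parquet file
```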
get_data_frame(self, query, verify_query=False, **kwargs)

Exports the results of a PQL query as pyarrow.parquet.ParquetFile and converts it to a pandas.DataFrame. Uses ComputeNode.get_data_frame.

Parameters:

Name | Type | Description | Default
---|---|---|---
query | PQL | The table query to be executed. | required
verify_query | bool | Whether query should be verified before execution. | False

Returns:

Type | Description
---|---
DataFrame | Dataframe containing the results of the query.
Source code in celonis_api/event_collection/data_model.py
def get_data_frame(self, query: pql.PQL, verify_query: bool = False, **kwargs) -> pd.DataFrame:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
Uses [ComputeNode.get_data_frame][celonis_api.event_collection.compute_node.ComputeNode.get_data_frame].
Args:
query: The table query to be executed.
verify_query: Whether query should be verified before execution.
Returns:
Dataframe containing the results of the query.
"""
if verify_query:
self.verify_query(query)
return self._compute_node.get_data_frame(pql_query=query, **kwargs)
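A minimal sketch mirroring the query pattern used in `last_change_dates` above; table and column names are hypothetical:
```py
from pycelonis import pql

query = pql.PQL()
query += pql.PQLColumn('"CASES"."CASE_ID"', "Case ID")
query += pql.PQLFilter('FILTER "CASES"."CASE_ID" IS NOT NULL;')
df = dm.get_data_frame(query, verify_query=True)
```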
push_table(self, df_or_path, table_name, reload_datamodel=True, **kwargs)

Pushes a table to a Pool, adds it to the Datamodel and by default reloads the Datamodel.

Warning

This method is deprecated. Use create_table.

Returns:

Type | Description
---|---
DatamodelTable | The newly added Datamodel Table.
Source code in celonis_api/event_collection/data_model.py
@deprecated(
"Use 'celonis_api.event_collection.data_pool.Pool.create_table'. "
"After the table is created, you can add the new table from the "
"pool to your datamodel via 'add_table_from_pool'."
)
def push_table(self, df_or_path, table_name, reload_datamodel=True, **kwargs) -> 'DatamodelTable':
"""Pushes a table to a Pool, adds it the Datamodel and by default reloads the Datamodel.
!!! warning
This method is deprecated. Use [create_table][celonis_api.event_collection.data_pool.Pool.create_table].
Returns:
The newly added Datamodel Table.
"""
self.parent.push_table(df_or_path, table_name, **kwargs)
table = self.tables.names.get(table_name) or self.add_table_from_pool(table_name)
if reload_datamodel:
self.reload(tables=table_name)
return table
reload(self, from_cache=False, tables=None, wait_for_reload=True)

Reloads the Datamodel only if a reload is not already running.

API

POST: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/reload

Parameters:

Name | Type | Description | Default
---|---|---|---
from_cache | bool | Deprecated, as it is no longer supported for data models. Will always be set to False. | False
tables | List | List of Datamodel Tables or table ids. If given, a partial reload will be performed on these tables and from_cache will be ignored. | None
wait_for_reload | bool | If True waits until the Datamodel is reloaded. | True

Returns:

Type | Description
---|---
Dict | Report of the reload.
Source code in celonis_api/event_collection/data_model.py
def reload(self, from_cache: bool = False, tables: typing.List = None, wait_for_reload: bool = True) -> typing.Dict:
"""Reloads the Datamodel only if a reload is not already running.
!!! api "API"
- `POST: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/reload`
```json
{ "forceComplete": from_cache }
```
Args:
from_cache: Deprecated, as it is no longer supported for data models. Will always be set to False.
tables: List of Datamodel Tables or table ids. If given a partial reload will be performed on these tables,
from_cache will be ignored.
wait_for_reload: If True waits until the Datamodel is reloaded.
Returns:
Report of the reload.
"""
if from_cache:
self._logger.warning(
"Reloading data models from cache is no longer supported. Defaulting to complete reload."
)
response = self.load_status
if response is not None and response["loadStatus"] == "RUNNING":
self._logger.warning("A Data Model Reload is already in progress. No new load is triggered.")
return response
if not tables:
reload_type = "Complete"
payload = {"forceComplete": True}
self.celonis.api_request(f"{self.url}/reload", method=HttpMethod.POST, params=payload)
else:
reload_type = "Partial"
payload = self._get_table_ids(tables) # type: ignore
url = (
f"{self.celonis.url}/integration/api/v1/data-pools/"
f"{self.pool.id}/data-models/{self.id}/load/partial-sync"
)
self.celonis.api_request(url, payload)
response = self.load_status
if wait_for_reload:
response = self._wait_for_reload(reload_type)
return response
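A minimal sketch of both reload variants; the table name is hypothetical:
```py
report = dm.reload()                       # complete reload, blocks until done
report = dm.reload(tables=["ACTIVITIES"])  # partial reload of a single table
status = dm.load_status                    # inspect the current load status
```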
upload_name_mapping(self, file_path)

Uploads the Datamodel's Name Mappings Excel File.

Parameters:

Name | Type | Description | Default
---|---|---|---
file_path | Union[str, pathlib.Path] | Upload path to Excel File. | required

Returns:

Type | Description
---|---
Dict | Upload Result.
Source code in celonis_api/event_collection/data_model.py
def upload_name_mapping(self, file_path: typing.Union[str, pathlib.Path]) -> typing.Dict:
"""Uploads the Datamodels Name Mappings Excel File.
Args:
file_path: Upload path to Excel File.
Returns:
Upload Result.
"""
file_path = pathlib.Path(file_path)
if not file_path.is_file():
raise PyCelonisValueError("Argument 'file_path' must refer to a file.")
additional_headers = {"x-csrf-token": self.celonis._session.cookies.get("XSRF-TOKEN")}
return self.celonis.api_request(f"{self.url}/name-mapping/file", file_path, headers=additional_headers)
DatamodelProcessConfiguration (CelonisDataObject)
Datamodel ProcessConfiguration object.
Source code in celonis_api/event_collection/data_model.py
class DatamodelProcessConfiguration(CelonisDataObject):
"""Datamodel ProcessConfiguration object."""
def __init__(self, parent, data):
super().__init__(parent, data)
self._name = self.activity_table.name
@property
def data(self):
return self._data
@property
def activity_table(self) -> DatamodelTable:
"""Get the Datamodels Process Configuration Activity Table.
Returns:
Activity Table.
"""
_id = self.data["activityTableId"]
return _id and self.parent.tables.find(_id)
@property
def case_table(self) -> DatamodelTable:
"""Get the Datamodels Process Configuration Case Table.
Returns:
Case Table.
"""
_id = self.data["caseTableId"]
return _id and self.parent.tables.find(_id)
@property
def case_column(self) -> str:
"""Get the Datamodels Process Configuration Case Column.
Returns:
Case Column.
"""
return self.data["caseIdColumn"]
@property
def activity_column(self) -> str:
"""Get the Datamodels Process Configuration Activity Column.
Returns:
Activity Column.
"""
return self.data["activityColumn"]
@property
def timestamp_column(self) -> str:
"""Get the Datamodels Process Configuration Timestamp Column.
Returns:
Timestamp Column.
"""
return self.data["timestampColumn"]
@property
def sorting_column(self) -> str:
"""Get the Datamodels Process Configuration Sorting Column.
Returns:
Sorting Column.
"""
return self.data["sortingColumn"]
@property
def default_configuration(self) -> bool:
"""Get whether the Datamodels Process Configuration is set to default.
Returns:
Process Configuration default status.
"""
return self.data["defaultConfiguration"]
def edit_configuration(
self,
activity_table: 'DatamodelTable' = None,
case_table: 'DatamodelTable' = None,
case_column: str = None,
activity_column: str = None,
timestamp_column: str = None,
sorting_column: str = None,
) -> 'DatamodelProcessConfiguration':
"""Edit the Datamodels Process Configuration.
!!! api "API"
- `PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
```json
{
"activityTableId": activity_table_id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
"caseTableId": case_table_id
}
```
Args:
activity_table: Name of Activity Table or DatamodelTable object.
case_table: Name of Case Table or DatamodelTable object.
case_column: Case Column Name referring to column in activity table.
activity_column: Activity Column Name referring to column in activity table.
timestamp_column: Timestamp Column Name referring to column in activity table.
sorting_column: Sorting Column Name.
Returns:
Updated Process Configuration.
"""
keys = [
"id",
"activityTableId",
"caseIdColumn",
"activityColumn",
"timestampColumn",
"sortingColumn",
"caseTableId",
]
payload = {key: self.data[key] for key in keys}
if activity_table:
_columns = {case_column, activity_column, timestamp_column}
_activity_table_columns = set(col["name"] for col in activity_table.columns)
check_columns = None in _columns or not _columns.issubset(_activity_table_columns)
if check_columns:
raise PyCelonisValueError("Problem finding case, activity or timestamp column.")
payload.update(
{
"activityTableId": activity_table.id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
}
)
if case_table:
payload.update({"caseTableId": case_table.id})
try:
self.parent.celonis.api_request(f"{self.parent.url}/process-configurations", payload, method=HttpMethod.PUT)
except exceptions.HTTPError:
self.parent.celonis.api_request(f"{self.parent.url}/process-configuration", payload, method=HttpMethod.PUT)
return self
def __repr__(self):
case = self.case_table and f"\nCase table: {self.case_table}"
act = self.activity_table and (
f"\nActivity table: {self.activity_table},"
+ f"\nCase column: {self.case_column},"
+ f"\nActivity column: {self.activity_column},"
+ f"\nTimestamp column: {self.timestamp_column},"
+ f"\nSorting column: {self.sorting_column}"
)
return f"<DatamodelProcessConfiguration of {self.parent}{case}{act}>"
activity_column: str (property, readonly)

Get the Datamodel's Process Configuration Activity Column.

Returns:

Type | Description
---|---
str | Activity Column.

activity_table: DatamodelTable (property, readonly)

Get the Datamodel's Process Configuration Activity Table.

Returns:

Type | Description
---|---
DatamodelTable | Activity Table.

case_column: str (property, readonly)

Get the Datamodel's Process Configuration Case Column.

Returns:

Type | Description
---|---
str | Case Column.

case_table: DatamodelTable (property, readonly)

Get the Datamodel's Process Configuration Case Table.

Returns:

Type | Description
---|---
DatamodelTable | Case Table.

data (property, readonly)

A reference to the data of this object.

default_configuration: bool (property, readonly)

Get whether the Datamodel's Process Configuration is set to default.

Returns:

Type | Description
---|---
bool | Process Configuration default status.

sorting_column: str (property, readonly)

Get the Datamodel's Process Configuration Sorting Column.

Returns:

Type | Description
---|---
str | Sorting Column.

timestamp_column: str (property, readonly)

Get the Datamodel's Process Configuration Timestamp Column.

Returns:

Type | Description
---|---
str | Timestamp Column.
edit_configuration(self, activity_table=None, case_table=None, case_column=None, activity_column=None, timestamp_column=None, sorting_column=None)

Edit the Datamodel's Process Configuration.

API

PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations

Parameters:

Name | Type | Description | Default
---|---|---|---
activity_table | DatamodelTable | Name of Activity Table or DatamodelTable object. | None
case_table | DatamodelTable | Name of Case Table or DatamodelTable object. | None
case_column | str | Case Column Name referring to column in activity table. | None
activity_column | str | Activity Column Name referring to column in activity table. | None
timestamp_column | str | Timestamp Column Name referring to column in activity table. | None
sorting_column | str | Sorting Column Name. | None

Returns:

Type | Description
---|---
DatamodelProcessConfiguration | Updated Process Configuration.
Source code in celonis_api/event_collection/data_model.py
def edit_configuration(
self,
activity_table: 'DatamodelTable' = None,
case_table: 'DatamodelTable' = None,
case_column: str = None,
activity_column: str = None,
timestamp_column: str = None,
sorting_column: str = None,
) -> 'DatamodelProcessConfiguration':
"""Edit the Datamodels Process Configuration.
!!! api "API"
- `PUT: /integration/api/pools/{pool_id}/data-models/{datamodel_id}/process-configurations`
```json
{
"activityTableId": activity_table_id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
"caseTableId": case_table_id
}
```
Args:
activity_table: Name of Activity Table or DatamodelTable object.
case_table: Name of Case Table or DatamodelTable object.
case_column: Case Column Name referring to column in activity table.
activity_column: Activity Column Name referring to column in activity table.
timestamp_column: Timestamp Column Name referring to column in activity table.
sorting_column: Sorting Column Name.
Returns:
Updated Process Configuration.
"""
keys = [
"id",
"activityTableId",
"caseIdColumn",
"activityColumn",
"timestampColumn",
"sortingColumn",
"caseTableId",
]
payload = {key: self.data[key] for key in keys}
if activity_table:
_columns = {case_column, activity_column, timestamp_column}
_activity_table_columns = set(col["name"] for col in activity_table.columns)
check_columns = None in _columns or not _columns.issubset(_activity_table_columns)
if check_columns:
raise PyCelonisValueError("Problem finding case, activity or timestamp column.")
payload.update(
{
"activityTableId": activity_table.id,
"caseIdColumn": case_column,
"activityColumn": activity_column,
"timestampColumn": timestamp_column,
"sortingColumn": sorting_column,
}
)
if case_table:
payload.update({"caseTableId": case_table.id})
try:
self.parent.celonis.api_request(f"{self.parent.url}/process-configurations", payload, method=HttpMethod.PUT)
except exceptions.HTTPError:
self.parent.celonis.api_request(f"{self.parent.url}/process-configuration", payload, method=HttpMethod.PUT)
return self
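A hedged usage sketch of edit_configuration, assuming dm is a Datamodel whose tables include "ACTIVITIES" and "CASES" (all table and column names here are hypothetical):
```py
activities = dm.tables.find("ACTIVITIES")
cases = dm.tables.find("CASES")

config = dm.process_configurations[0].edit_configuration(
    activity_table=activities,
    case_table=cases,
    case_column="_CASE_KEY",        # must be a column of the activity table
    activity_column="ACTIVITY_EN",  # must be a column of the activity table
    timestamp_column="EVENTTIME",   # must be a column of the activity table
)
```
As the source above shows, case_column, activity_column and timestamp_column must all name existing columns of the activity table, otherwise a PyCelonisValueError is raised; sorting_column is not validated against the table.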
DatamodelTable (CelonisApiObjectChild)
¶
Celonis DatamodelTable object to interact with Celonis Event Collection API.
Source code in celonis_api/event_collection/data_model.py
class DatamodelTable(CelonisApiObjectChild):
"""Celonis DatamodelTable object to interact with Celonis Event Collection API."""
_name_data_key = "aliasOrName"
@property
def _parent_class(self):
return Datamodel
@property
def url(self) -> str:
"""
!!! api "API"
- `/integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}`
"""
return f"{self.parent.url.replace('/data-models/', '/data-model/')}/tables/{self.id}"
@property
def columns(self) -> typing.List:
"""Get all Datamodel Table Column Definitions
(e.g.: `{'name': 'EVENTTIME_START', 'length': 26, 'type': 'DATE'}`).
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}/columns`
Returns:
List of Column definitions.
"""
return self.celonis.api_request(self.url + "/columns")
@property
def alias(self):
"""Datamodel Table Alias."""
return self.data["alias"]
@alias.setter
def alias(self, value):
try:
self.data["alias"] = value
except ValueError:
pass
@property
def source_name(self):
"""Datamodel Table Name."""
return self.data["name"]
@source_name.setter
def source_name(self, value):
try:
self.data["name"] = value
except ValueError:
pass
def delete(self):
"""Deletes the Datamodel Table.
!!! api "API"
- `DELETE: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}`
"""
self.celonis.api_request(self.url, HttpMethod.DELETE)
def get_data_frame(
self,
max_rows: typing.Optional[int] = None,
verify_query: bool = False,
include_change_date: bool = False,
**kwargs,
) -> pd.DataFrame:
"""Exports the results of the Datamodel Table Columns PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
Uses [Datamodel.get_data_frame][celonis_api.event_collection.data_model.Datamodel.get_data_frame].
Args:
max_rows: Limits the number of rows (e.g. max_rows = 100) that are queried.
verify_query: Whether query should be verified before execution.
include_change_date: Whether '_CELONIS_CHANGE_DATE' column is included or not.
Returns:
Dataframe containing the results of the query.
"""
query = self._get_table_query(include_change_date=include_change_date)
if max_rows:
query.limit = max_rows
return self.parent.get_data_frame(query=query, verify_query=verify_query, **kwargs)
def _get_table_query(self, include_change_date=False):
query = pql.PQL()
cols = self.columns
for c in cols:
colname = c["name"] if isinstance(c, dict) else c.name
if include_change_date or colname != "_CELONIS_CHANGE_DATE":
query += pql.PQLColumn(f'"{self.name}"."{colname}"', colname)
return query
def _get_data_file(self, file_path=None, include_change_date=False):
query = self._get_table_query(include_change_date=include_change_date)
return self.parent._get_data_file(query, file_path)
alias
property
writable
¶
Datamodel Table Alias.
columns: List
property
readonly
¶
Get all Datamodel Table Column Definitions
(e.g.: {'name': 'EVENTTIME_START', 'length': 26, 'type': 'DATE'}).
API
GET: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}/columns
Returns:
Type | Description |
---|---|
List | List of Column definitions. |
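A short sketch of inspecting the column definitions, assuming table is a DatamodelTable obtained via dm.tables.find(...):
```py
for col in table.columns:
    # Each entry is a plain dict, e.g. {'name': 'EVENTTIME', 'length': 26, 'type': 'DATE'}
    print(f"{col['name']}: {col['type']}")
```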
source_name
property
writable
¶
Datamodel Table Name.
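Both alias and source_name are writable. A sketch of renaming a table's display name via its alias, assuming that assigning to the property persists the change through the object's data setter (table is a DatamodelTable):
```py
table.alias = "EVENTS"                 # display name used in the Datamodel
print(table.alias, table.source_name)  # the alias changes; the source name stays
```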
url: str
property
readonly
¶
API
/integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}
delete(self)
¶
Deletes the Datamodel Table.
API
DELETE: /integration/api/pools/{pool_id}/data-model/{datamodel_id}/tables/{datamodel_table_id}
get_data_frame(self, max_rows=None, verify_query=False, include_change_date=False, **kwargs)
¶
Exports the results of the Datamodel Table Columns PQL query as pyarrow.parquet.ParquetFile and converts it to a pandas.DataFrame. Uses Datamodel.get_data_frame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_rows | Optional[int] | Limits the number of rows (e.g. max_rows = 100) that are queried. | None |
verify_query | bool | Whether query should be verified before execution. | False |
include_change_date | bool | Whether '_CELONIS_CHANGE_DATE' column is included or not. | False |
Returns:
Type | Description |
---|---|
DataFrame | Dataframe containing the results of the query. |
Source code in celonis_api/event_collection/data_model.py
def get_data_frame(
self,
max_rows: typing.Optional[int] = None,
verify_query: bool = False,
include_change_date: bool = False,
**kwargs,
) -> pd.DataFrame:
"""Exports the results of the Datamodel Table Columns PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
Uses [Datamodel.get_data_frame][celonis_api.event_collection.data_model.Datamodel.get_data_frame].
Args:
max_rows: Limits the number of rows (e.g. max_rows = 100) that are queried.
verify_query: Whether query should be verified before execution.
include_change_date: Whether '_CELONIS_CHANGE_DATE' column is included or not.
Returns:
Dataframe containing the results of the query.
"""
query = self._get_table_query(include_change_date=include_change_date)
if max_rows:
query.limit = max_rows
return self.parent.get_data_frame(query=query, verify_query=verify_query, **kwargs)
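A minimal usage sketch, assuming table is a loaded DatamodelTable:
```py
# Pull at most 1000 rows; _CELONIS_CHANGE_DATE is excluded by default
df = table.get_data_frame(max_rows=1000)
print(df.shape)
print(df.columns.tolist())
```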
ForeignKey (dict)
¶
Datamodel Foreign Key object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_table | DatamodelTable | Source table of foreign key. | required |
target_table | DatamodelTable | Target table of foreign key. | required |
columns | List[Tuple[str, str]] | List of column tuples for foreign key. | required |
Source code in celonis_api/event_collection/data_model.py
class ForeignKey(dict):
"""Datamodel Foreign Key object.
Args:
source_table: Source table of foreign key.
target_table: Target table of foreign key.
columns: List of column tuples for foreign key.
"""
def __init__(
self, source_table: DatamodelTable, target_table: DatamodelTable, columns: typing.List[typing.Tuple[str, str]]
):
self.source_table_object = source_table
self.target_table_object = target_table
self.columns = columns
super().__init__(
{
"source_table": (
self.source_table_object.alias
if self.source_table_object.alias is not None
else self.source_table_object.name
),
"target_table": (
self.target_table_object.alias
if self.target_table_object.alias is not None
else self.target_table_object.name
),
"columns": self.columns,
}
)
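Because ForeignKey subclasses dict, a constructed key behaves like a plain mapping whose source_table/target_table values fall back from alias to name. A sketch, assuming both tables come from dm.tables (table and column names are hypothetical):
```py
fk = ForeignKey(
    source_table=dm.tables.find("CASES"),
    target_table=dm.tables.find("ACTIVITIES"),
    columns=[("_CASE_KEY", "_CASE_KEY")],
)
print(fk["source_table"], "->", fk["target_table"], fk["columns"])
```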
ForeignKeyList (list)
¶
List of ForeignKey.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
foreign_keys_raw | Dict | Dictionary of raw foreign keys. | required |
tables | CelonisCollection | Collection of DatamodelTable. | required |
Source code in celonis_api/event_collection/data_model.py
class ForeignKeyList(list):
"""List of [ForeignKey][celonis_api.event_collection.data_model.ForeignKey].
Args:
foreign_keys_raw: Dictionary of raw foreign keys
tables: Collection of [DatamodelTable][celonis_api.event_collection.data_model.DatamodelTable]
"""
def __init__(self, foreign_keys_raw: typing.Dict, tables: CelonisCollection):
foreign_keys = []
for fk in foreign_keys_raw:
source_table = tables.ids[fk["sourceTableId"]]
target_table = tables.ids[fk["targetTableId"]]
columns = [(col["sourceColumnName"], col["targetColumnName"]) for col in fk["columns"]]
foreign_keys.append(ForeignKey(source_table=source_table, target_table=target_table, columns=columns))
super().__init__(foreign_keys)
def find_keys_by_source_name(self, name: str) -> typing.List[ForeignKey]:
"""Get all foreign keys with given source name.
Args:
name: Source name to search for (source name of table, not alias).
Returns:
List of foreign keys with the same source name.
"""
return [fk for fk in self if fk.source_table_object.source_name == name]
def find_keys_by_target_name(self, name: str) -> typing.List[ForeignKey]:
"""Returns all foreign keys with given target name
Args:
name: Target name to search for (source name of table, not alias)
Returns:
List of foreign keys with the same target name.
"""
return [fk for fk in self if fk.target_table_object.source_name == name]
def find_keys_by_name(self, name: str) -> typing.List[ForeignKey]:
"""Get all foreign keys with given target name or source name.
Args:
name: Target and/or source name to search for (source name of table, not alias).
Returns:
List of foreign keys with the same target name or same source name.
"""
return self.find_keys_by_source_name(name) + self.find_keys_by_target_name(name)
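A sketch of the lookup helpers, assuming foreign_keys is a ForeignKeyList built from a datamodel's raw foreign keys (how the list is obtained from a Datamodel is not shown here):
```py
# All keys where "CASES" appears on either side of the relationship;
# matching is done on the table's source name, not its alias
for fk in foreign_keys.find_keys_by_name("CASES"):
    print(fk["source_table"], "->", fk["target_table"], fk["columns"])

# Direction-specific lookups
outgoing = foreign_keys.find_keys_by_source_name("CASES")
incoming = foreign_keys.find_keys_by_target_name("CASES")
```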
find_keys_by_name(self, name)
¶
Get all foreign keys with given target name or source name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Target and/or source name to search for (source name of table, not alias). | required |
Returns:
Type | Description |
---|---|
List[celonis_api.event_collection.data_model.ForeignKey] | List of foreign keys with the same target name or same source name. |
Source code in celonis_api/event_collection/data_model.py
def find_keys_by_name(self, name: str) -> typing.List[ForeignKey]:
"""Get all foreign keys with given target name or source name.
Args:
name: Target and/or source name to search for (source name of table, not alias).
Returns:
List of foreign keys with the same target name or same source name.
"""
return self.find_keys_by_source_name(name) + self.find_keys_by_target_name(name)
find_keys_by_source_name(self, name)
¶
Get all foreign keys with given source name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Source name to search for (source name of table, not alias). | required |
Returns:
Type | Description |
---|---|
List[celonis_api.event_collection.data_model.ForeignKey] | List of foreign keys with the same source name. |
Source code in celonis_api/event_collection/data_model.py
def find_keys_by_source_name(self, name: str) -> typing.List[ForeignKey]:
"""Get all foreign keys with given source name.
Args:
name: Source name to search for (source name of table, not alias).
Returns:
List of foreign keys with the same source name.
"""
return [fk for fk in self if fk.source_table_object.source_name == name]
find_keys_by_target_name(self, name)
¶
Get all foreign keys with given target name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Target name to search for (source name of table, not alias). | required |
Returns:
Type | Description |
---|---|
List[celonis_api.event_collection.data_model.ForeignKey] | List of foreign keys with the same target name. |
Source code in celonis_api/event_collection/data_model.py
def find_keys_by_target_name(self, name: str) -> typing.List[ForeignKey]:
"""Returns all foreign keys with given target name
Args:
name: Target name to search for (source name of table, not alias)
Returns:
List of foreign keys with the same target name.
"""
return [fk for fk in self if fk.target_table_object.source_name == name]