table_push_manager.py
TablePushManager
¶
Table Push Manager object to interact with Celonis Event Collection API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pool_table_name |
str |
Name of Table. |
required |
if_exists |
str |
How to push if the table already exists: one of `upsert`, `drop` or `append`. |
required |
primary_keys |
List[str] |
List of Table primary keys. |
None |
column_config |
List[Dict] |
Can be used to specify column types and string field length, with columnType one of [INTEGER , DATE , TIME , DATETIME , FLOAT , BOOLEAN , STRING ]. |
None |
alias |
str |
Alias of the Table. |
None |
foreign_keys |
List[Tuple[str, str]] |
Foreign keys to connect pool_table_name to either source_table or target_table in the Datamodel. |
None |
source_table |
str |
If given, the join created will be pool_table_name: N (parent) <-> (child) 1 :source_table |
None |
target_table |
str |
If given, the join created will be target_table: N (parent) <-> (child) 1 :pool_table_name |
None |
partial_reload_on_execution |
bool |
If True, Datamodel will be partially reloaded after each push. |
False |
Source code in celonis_api/event_collection/table_push_manager.py
class TablePushManager:
    """Table Push Manager object to interact with Celonis Event Collection API.

    Args:
        pool_table_name: Name of Table.
        if_exists: How data is pushed when the table already exists. One of
            [`upsert`, `drop`, `append`] (see `PUSH_TYPES`):

            * `upsert` -> rows are upserted based on `primary_keys` (which is then required).
            * `drop` -> the existing table is dropped completely and a new table is created.
            * `append` -> the pushed data is appended to the existing table.
        primary_keys: List of Table primary keys.
        column_config: Can be used to specify column types and string field length.
            ```json
            [
                {
                    "columnName":"BUKRS",
                    "fieldLength":100,
                    "columnType":"STRING"
                }...
            ]
            ```
            with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
        alias: Alias of the Table.
        foreign_keys: Foreign keys to connect `pool_table_name` to either source_table or target_table in Datamodel.
        source_table: If given, the join created will be `pool_table_name`: N (parent) <-> (child) 1 :`source_table`
        target_table: If given, the join created will be `target_table`: N (parent) <-> (child) 1 :`pool_table_name`
        partial_reload_on_execution: If True, Datamodel will be partially reloaded after each push.
    """

    # Valid values for the `if_exists` argument (enforced in `_verify_inputs`).
    PUSH_TYPES = ["upsert", "drop", "append"]

    def __init__(
        self,
        pool_table_name: str,
        if_exists: str,
        primary_keys: typing.List[str] = None,
        column_config: typing.List[typing.Dict] = None,
        alias: str = None,
        foreign_keys: typing.List[typing.Tuple[str, str]] = None,
        source_table: str = None,
        target_table: str = None,
        partial_reload_on_execution: bool = False,
    ):
        # NOTE(review): `self._logger` is not defined anywhere in this class;
        # presumably it is provided by the surrounding module/framework -- verify.
        self._logger.warning("The TablePushManager is deprecated and will be removed with the next release.")
        self._verify_inputs(
            pool_table_name=pool_table_name,
            alias=alias,
            if_exists=if_exists,
            primary_keys=primary_keys,
            foreign_keys=foreign_keys,
            source_table=source_table,
            target_table=target_table,
        )
        self.pool_table_name = pool_table_name
        # The Datamodel table is looked up under its alias if one is given.
        self.alias_or_name = alias if alias is not None else pool_table_name
        self.if_exists = if_exists
        self.primary_keys = primary_keys
        self.foreign_keys = foreign_keys
        self.partial_reload_on_execution = partial_reload_on_execution
        self.column_config = column_config
        self._define_source_target(source_table=source_table, target_table=target_table, foreign_keys=foreign_keys)

    def _define_source_target(
        self, source_table: str, target_table: str, foreign_keys: typing.List[typing.Tuple[str, str]]
    ) -> None:
        """Assigns source and target tables and join keys according to given arguments in `__init__`."""
        if source_table is not None:
            # The pushed pool table becomes the join target (parent side).
            self._target_table = self.pool_table_name
            self._source_table = source_table
            self._connected_table = source_table
        elif target_table is not None:
            # The pushed pool table becomes the join source (child side).
            self._target_table = target_table
            self._source_table = self.pool_table_name
            self._connected_table = target_table
        else:
            self._target_table = None
            self._source_table = None
            self._connected_table = None
        if foreign_keys is not None:
            # Each foreign key may be a (source_column, target_column) pair or a
            # single column name used on both sides. The guard previously was
            # missing on the source side, so a bare string key yielded only its
            # first character (k[0]).
            self.source_keys = [k[0] if isinstance(k, (tuple, list)) else k for k in self.foreign_keys]
            self.target_keys = [k[1] if isinstance(k, (tuple, list)) else k for k in self.foreign_keys]
            self._connected_table_keys = (
                self.source_keys if self._connected_table == self._source_table else self.target_keys
            )
            self._local_table_keys = (
                self.source_keys if self._connected_table != self._source_table else self.target_keys
            )

    def _verify_inputs(
        self,
        pool_table_name: str,
        alias: str,
        if_exists: str,
        primary_keys: list,
        foreign_keys: typing.List[typing.Tuple[str, str]],
        source_table: str,
        target_table: str,
    ) -> None:
        """Checks if given arguments are valid and raises `PyCelonisValueError` otherwise."""
        if if_exists not in self.PUSH_TYPES:
            raise PyCelonisValueError(f"Argument 'if_exists' must be one of {', '.join(self.PUSH_TYPES)}.")
        if if_exists == "upsert" and primary_keys is None:
            raise PyCelonisValueError("Argument 'primary_keys' must be set if argument \"if_exists='upsert'\".")
        if source_table == (alias if alias else pool_table_name):
            raise PyCelonisValueError("A table cannot be connected to itself please, give a different source_table.")
        if target_table == (alias if alias else pool_table_name):
            raise PyCelonisValueError("A table cannot be connected to itself please, give a different target_table.")
        if foreign_keys is not None:
            if source_table is None and target_table is None:
                raise PyCelonisValueError("Specify either 'source_table' or 'target_table' if 'foreign_keys' is set.")
        if (source_table is not None or target_table is not None) and foreign_keys is None:
            raise PyCelonisValueError("Specify 'foreign_keys' if either 'source_table' or 'target_table' is set.")
        if source_table is not None and target_table is not None:
            raise PyCelonisValueError("Specify either 'source_table' or 'target_table', not both.")

    def get_existing_table_status(self, datamodel: Datamodel) -> typing.Dict:
        """Checks the Status of an existing Table in Event Collection.

        Status Dictionary:
        ```json
        {
            "pool_table": {...},
            "datamodel_table": DatamodelTable,
            "foreign_keys": [("KEY","KEY"),...],
            "table_columns": ["KEY","ACTIVITY_EN",..]
        }
        ```

        Args:
            datamodel: Datamodel the table is supposed to be added to.

        Returns:
            Status Dictionary; `pool_table` and `datamodel_table` are None when missing.
        """
        dm_table = self._get_datamodel_table(datamodel)
        fks = self._get_existing_foreign_key(datamodel, dm_table)
        pool_table = self._get_pool_table(datamodel)
        table_columns = self._get_datamodel_table_columns(dm_table)
        return {
            "pool_table": pool_table,
            "datamodel_table": dm_table,
            "foreign_keys": fks,
            "table_columns": table_columns,
        }

    def _get_datamodel_table(self, datamodel):
        """Returns the Datamodel table of name `alias_or_name`, or None if absent.

        Raises:
            PyCelonisValueError: If a table of that name exists but is backed by
                a different pool source table.
        """
        dm_table = datamodel.tables.find(self.alias_or_name, None)
        if dm_table is not None:
            if dm_table.source_name != self.pool_table_name:
                raise PyCelonisValueError(
                    f'A table of name {self.alias_or_name} already exists with a '
                    f'different pool source table, called: {dm_table.source_name}'
                )
        return dm_table

    def _get_pool_table(self, datamodel):
        """Returns the pool-table info dict for `pool_table_name`, or None if absent."""
        table = [tab for tab in datamodel.pool.tables if tab["name"] == self.pool_table_name]
        if not table:
            table = None
        else:
            table = table[0]
        return table

    @staticmethod
    def _get_datamodel_table_columns(dm_table):
        """Returns the column names of `dm_table` without the internal
        _CELONIS_CHANGE_DATE column, or an empty list if `dm_table` is None."""
        if dm_table is not None:
            table_cols = [col['name'] for col in dm_table.columns if col['name'] != "_CELONIS_CHANGE_DATE"]
        else:
            table_cols = []
        return table_cols

    def _get_existing_foreign_key(self, datamodel: Datamodel, dm_table: DatamodelTable) -> typing.List:
        """Returns foreign keys already defined between the pushed table and the connected table.

        Raises:
            PyCelonisValueError: If an existing foreign key points in the
                opposite direction of the requested join.
        """
        if dm_table is None:
            existing_foreign_keys = []
        elif self._connected_table is not None:
            connect_table = datamodel.tables.find(self._connected_table)
            all_foreign_keys = datamodel.foreign_keys
            # Resolve which table alias is the join source and which the target.
            source_temp = connect_table.alias if self._connected_table == self._source_table else dm_table.alias
            target_temp = connect_table.alias if self._connected_table == self._target_table else dm_table.alias
            tables = [source_temp, target_temp]
            existing_foreign_keys = [
                fk for fk in all_foreign_keys if (fk["source_table"] in tables and fk["target_table"] in tables)
            ]
            if existing_foreign_keys and existing_foreign_keys[0]['source_table'] != source_temp:
                raise PyCelonisValueError(
                    f'The existing foreign key connection is opposite of what you want to do. '
                    f'Either change source and target tables here or in the the datamodel.tables'
                    f'Existing Foreign Key Connection: {existing_foreign_keys[0]} '
                )
            # NOTE(review): with exactly one match this returns a list, with
            # several matches only the first dict -- inconsistent return shape,
            # kept as-is because callers only rely on truthiness; verify intent.
            if len(existing_foreign_keys) > 1:
                existing_foreign_keys = existing_foreign_keys[0]
        else:
            existing_foreign_keys = []
        return existing_foreign_keys

    def create_table(self, df: pd.DataFrame, datamodel: Datamodel, partial_reload: bool = False) -> DatamodelTable:
        """Creates the Table by pushing it to the Pool, adding it to the Datamodel and creating the Foreign Key.

        Args:
            df: Table to be pushed.
            datamodel: Target Datamodel.
            partial_reload: If True Datamodel is partially reloaded after Table is added.

        Returns:
            The newly created Datamodel Table.
        """
        existing_table = self.get_existing_table_status(datamodel=datamodel)
        if self._connected_table is not None:
            self._verify_join_columns_exist(df, datamodel)
            self._verify_join_compatibility(df, datamodel)
        datamodel.pool.create_table(df, self.pool_table_name, if_exists="drop", column_config=self.column_config)
        if existing_table["datamodel_table"] is None:
            table = datamodel.add_table_from_pool(table_name=self.pool_table_name, alias=self.alias_or_name)
        else:
            table = datamodel.tables.find(self.alias_or_name, None)
        # Only create the join if it does not exist yet.
        if (not existing_table["foreign_keys"]) and self.foreign_keys:
            datamodel.create_foreign_key(
                source_table=self._source_table, target_table=self._target_table, columns=self.foreign_keys
            )
        if partial_reload:
            datamodel.reload(tables=table)
        return table

    def push_data_into_table(self, df: pd.DataFrame, datamodel: Datamodel) -> DatamodelTable:
        """Executes the data push with the given df, creating the table first if it does not exist.

        Args:
            df: Table data to be pushed.
            datamodel: Target Datamodel.

        Returns:
            The Datamodel Table the data was pushed into.
        """
        existing_table = self.get_existing_table_status(datamodel=datamodel)
        if existing_table["pool_table"] is None and existing_table["datamodel_table"] is None:
            table = self.create_table(df=df, datamodel=datamodel, partial_reload=self.partial_reload_on_execution)
        else:
            self._verify_before_push_data_into_table(existing_table, datamodel, df)
            if self.if_exists == "upsert":
                datamodel.pool.upsert_table(df, self.pool_table_name, primary_keys=self.primary_keys)
            elif self.if_exists == "append":
                datamodel.pool.append_table(df, self.pool_table_name)
            elif self.if_exists == "drop":
                datamodel.pool.create_table(df, self.pool_table_name, self.if_exists, column_config=self.column_config)
            table = datamodel.tables.find(self.alias_or_name)
            if self.partial_reload_on_execution:
                table = datamodel.reload(tables=table)
        return table

    def _verify_before_push_data_into_table(self, existing_table: typing.Dict, datamodel: Datamodel, df: pd.DataFrame):
        """Validates that the existing table state is consistent with df before pushing."""
        if existing_table["datamodel_table"] is not None and existing_table["pool_table"] is None:
            raise PyCelonisValueError(
                f"The specified table {self.alias_or_name} exists in the datamodel "
                f"but the specified pool_table_name {self.pool_table_name} does not exist in the pool."
            )
        elif existing_table["table_columns"] and (set(existing_table["table_columns"]) != set(df.columns.tolist())):
            raise PyCelonisValueError(
                "The columns of df and the existing table in the datamodel don't match: "
                f"df columns: {', '.join(df.columns.tolist())}"
                f" columns in datamodel: {', '.join(existing_table['table_columns'])}"
            )
        # Join compatibility only needs re-checking when this table is the join source.
        if self._connected_table is None or self.alias_or_name != self._source_table:
            pass
        else:
            self._verify_join_compatibility(df, datamodel)

    def _verify_join_columns_exist(self, df: pd.DataFrame, datamodel: Datamodel) -> None:
        """Verifies whether the specified join columns exist on both sides."""
        self._verify_local_foreign_keys_exist(df=df)
        self._verify_remote_foreign_keys_exist(datamodel)

    def _verify_remote_foreign_keys_exist(self, datamodel: Datamodel) -> None:
        """Raises if the connected Datamodel table is missing any join column."""
        dm_table = datamodel.tables.find(self._connected_table)
        cols = [col["name"] for col in dm_table.columns]
        if any(k not in cols for k in self._connected_table_keys):
            raise PyCelonisValueError(
                f"Not all of the columns {', '.join(self._connected_table_keys)} are "
                f"present in the table to connect to {self._connected_table}"
            )

    def _verify_local_foreign_keys_exist(self, df: pd.DataFrame) -> None:
        """Raises if df is missing any of the local join columns."""
        if any(k not in df.columns for k in self._local_table_keys):
            raise PyCelonisValueError(f"Not all of the columns {', '.join(self._local_table_keys)} are present in df")

    def _verify_join_compatibility(self, df: pd.DataFrame, datamodel: Datamodel) -> None:
        """Verifies whether the specified join is compatible with the tables.

        Raises:
            PyCelonisValueError: If the source-side foreign-key columns contain
                duplicate entries (a 1:N join would crash the Datamodel).
        """
        contains_duplicates = False
        if self.alias_or_name == self._source_table:
            contains_duplicates = self._source_table_key_local_contains_duplicates(df_source=df, keys=self.source_keys)
        elif self._target_table:
            contains_duplicates = self._source_table_key_remote_contains_duplicates(
                table_name=self._connected_table, foreign_keys=self._connected_table_keys, datamodel=datamodel
            )
        if contains_duplicates:
            raise PyCelonisValueError(
                f"There are duplicate entries in the columns: {', '.join(self.source_keys)}"
                f" of the source_table: {self._source_table}. "
                "A foreign key connection would cause the datamodel to crash"
            )

    @staticmethod
    def _source_table_key_remote_contains_duplicates(table_name: str, foreign_keys: list, datamodel: Datamodel) -> bool:
        """Pulls the foreign key columns of the table to connect to and checks for duplicates via PQL."""
        q = pql.PQL()
        # Concatenate the key columns so compound keys are checked as one value.
        q_string = "||".join([f'"{table_name}"."{col}"' for col in foreign_keys])
        q += pql.PQLColumn(f"""MAX(PU_COUNT(DOMAIN_TABLE({q_string}),{q_string}))""", "key_count")
        df_connect = datamodel.get_data_frame(q)
        contains_duplicates = df_connect["key_count"].max() > 1
        return contains_duplicates

    @staticmethod
    def _source_table_key_local_contains_duplicates(df_source: pd.DataFrame, keys: list) -> bool:
        """Checks for duplicate entries in the source table foreign key columns."""
        n_duplicates = df_source[keys].duplicated().sum()
        contains_duplicates = n_duplicates > 0
        return contains_duplicates
create_table(self, df, datamodel, partial_reload=False)
¶
Creates the Table by pushing it to the Pool, adding it to the Datamodel and creating the Foreign Key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
DataFrame |
Table to be pushed. |
required |
datamodel |
Datamodel |
Target Datamodel |
required |
partial_reload |
bool |
If True Datamodel is partially reloaded after Table is added. |
False |
Returns:
Type | Description |
---|---|
DatamodelTable |
The newly created Datamodel Table. |
Source code in celonis_api/event_collection/table_push_manager.py
def create_table(self, df: pd.DataFrame, datamodel: Datamodel, partial_reload: bool = False) -> DatamodelTable:
    """Creates the Table by pushing it to the Pool, adding it to the Datamodel and creating the Foreign Key.

    Args:
        df: Table to be pushed.
        datamodel: Target Datamodel
        partial_reload: If True Datamodel is partially reloaded after Table is added.

    Returns:
        The newly created Datamodel Table.
    """
    status = self.get_existing_table_status(datamodel=datamodel)
    # Validate the join columns before anything is written to the pool.
    if self._connected_table is not None:
        self._verify_join_columns_exist(df, datamodel)
        self._verify_join_compatibility(df, datamodel)
    datamodel.pool.create_table(df, self.pool_table_name, if_exists="drop", column_config=self.column_config)
    if status["datamodel_table"] is not None:
        table = datamodel.tables.find(self.alias_or_name, None)
    else:
        table = datamodel.add_table_from_pool(table_name=self.pool_table_name, alias=self.alias_or_name)
    # Only create the foreign key when one is requested and none exists yet.
    if self.foreign_keys and not status["foreign_keys"]:
        datamodel.create_foreign_key(
            source_table=self._source_table, target_table=self._target_table, columns=self.foreign_keys
        )
    if partial_reload:
        datamodel.reload(tables=table)
    return table
get_existing_table_status(self, datamodel)
¶
Checks the Status of an existing Table in Event Collection.
Status Dictionary:
{
"pool_table": {...},
"datamodel_table": DatamodelTable,
"foreign_keys": [("KEY","KEY"),...],
"table_columns": ["KEY","ACTIVITY_EN",..]
}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
datamodel |
Datamodel |
Datamodel the table is supposed to be added to. |
required |
Returns:
Type | Description |
---|---|
Dict |
Status Dictionary. |
Source code in celonis_api/event_collection/table_push_manager.py
def get_existing_table_status(self, datamodel: Datamodel) -> typing.Dict:
    """Checks the Status of an existing Table in Event Collection.

    Status Dictionary:
    ```json
    {
        "pool_table": {...},
        "datamodel_table": DatamodelTable,
        "foreign_keys": [("KEY","KEY"),...],
        "table_columns": ["KEY","ACTIVITY_EN",..]
    }
    ```

    Args:
        datamodel: Datamodel the table is supposed to be added to.

    Returns:
        Status Dictionary; `pool_table` and `datamodel_table` are None when missing.
    """
    dm_table = self._get_datamodel_table(datamodel)
    fks = self._get_existing_foreign_key(datamodel, dm_table)
    pool_table = self._get_pool_table(datamodel)
    table_columns = self._get_datamodel_table_columns(dm_table)
    return {
        "pool_table": pool_table,
        "datamodel_table": dm_table,
        "foreign_keys": fks,
        "table_columns": table_columns,
    }
push_data_into_table(self, df, datamodel)
¶
Executes the data push manager with the given df.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
pd.DataFrame |
required | |
datamodel |
DataModel |
required |
Returns:
Type | Description |
---|---|
DatamodelTable |
The Datamodel Table the data was pushed into. |
Source code in celonis_api/event_collection/table_push_manager.py
def push_data_into_table(self, df: pd.DataFrame, datamodel: Datamodel) -> DatamodelTable:
    """Executes the data push with the given df, creating the table first if it does not exist.

    Args:
        df: Table data to be pushed.
        datamodel: Target Datamodel.

    Returns:
        The Datamodel Table the data was pushed into.
    """
    existing_table = self.get_existing_table_status(datamodel=datamodel)
    if existing_table["pool_table"] is None and existing_table["datamodel_table"] is None:
        table = self.create_table(df=df, datamodel=datamodel, partial_reload=self.partial_reload_on_execution)
    else:
        self._verify_before_push_data_into_table(existing_table, datamodel, df)
        if self.if_exists == "upsert":
            datamodel.pool.upsert_table(df, self.pool_table_name, primary_keys=self.primary_keys)
        elif self.if_exists == "append":
            datamodel.pool.append_table(df, self.pool_table_name)
        elif self.if_exists == "drop":
            datamodel.pool.create_table(df, self.pool_table_name, self.if_exists, column_config=self.column_config)
        table = datamodel.tables.find(self.alias_or_name)
        if self.partial_reload_on_execution:
            table = datamodel.reload(tables=table)
    return table