Skip to content

table_push_manager.py

TablePushManager

Table Push Manager object to interact with Celonis Event Collection API.

Parameters:

Name Type Description Default
pool_table_name str

Name of Table.

required
if_exists str
  • error -> an error is thrown if a table of the same name already exists in the pool.
  • drop -> the existing table is dropped completely and a new table is created, by default error.
  • replace_data_only -> the column names and types of the old tables are not overwritten.
required
primary_keys List[str]

List of Table primary keys.

None
column_config List[Dict]

Can be used to specify column types and string field length.

    [
        {
            "columnName":"BUKRS",
            "fieldLength":100,
            "columnType":"STRING"
        }...
    ]
with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].
None
alias str

Alias of the Table.

None
foreign_keys List[Tuple[str, str]]

Foreign keys to connect pool_table_name to either source_table or target_table in Datamodel.

None
source_table str

If given, the join created will be pool_table_name: N (parent) <-> (child) 1 :source_table

None
target_table str

If given, the join created will be target_table: N (parent) <-> (child) 1 :pool_table_name

None
partial_reload_on_execution bool

If True, Datamodel will be partially reloaded after each push.

False
Source code in celonis_api/event_collection/table_push_manager.py
class TablePushManager:
    """Table Push Manager object to interact with Celonis Event Collection API.

    Args:
        pool_table_name: Name of Table.
        if_exists:
            * `error` -> an error is thrown if a table of the same name already exists in the pool.
            * `drop` -> the existing table is dropped completely and a new table is created, by default `error`.
            * `replace_data_only` -> the column names and types of the old tables are not overwritten.
        primary_keys: List of Table primary keys.
        column_config: Can be used to specify column types and string field length.
            ```json
                [
                    {
                        "columnName":"BUKRS",
                        "fieldLength":100,
                        "columnType":"STRING"
                    }...
                ]
            ```
            with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
        alias: Alias of the Table.
        foreign_keys: Foreign keys to connect `pool_table_name` to either source_table or target_table in Datamodel.
        source_table: If given, the join created will be `pool_table_name`: N (parent) <-> (child) 1 :`source_table`
        target_table: If given, the join created will be `target_table`: N (parent) <-> (child) 1 :`pool_table_name`
        partial_reload_on_execution: If True, Datamodel will be partially reloaded after each push.
    """

    # NOTE(review): the class docstring documents `if_exists` values
    # (error / drop / replace_data_only) while `_verify_inputs` actually
    # validates against this list (upsert / drop / append) -- confirm which
    # set of values is the intended contract.
    PUSH_TYPES = ["upsert", "drop", "append"]

    def __init__(
        self,
        pool_table_name: str,
        if_exists: str,
        primary_keys: typing.Optional[typing.List[str]] = None,
        column_config: typing.Optional[typing.List[typing.Dict]] = None,
        alias: typing.Optional[str] = None,
        foreign_keys: typing.Optional[typing.List[typing.Tuple[str, str]]] = None,
        source_table: typing.Optional[str] = None,
        target_table: typing.Optional[str] = None,
        partial_reload_on_execution: bool = False,
    ):
        # NOTE(review): `self._logger` is not defined anywhere on this class
        # and there is no base class -- presumably it is expected to be
        # provided elsewhere; verify this does not raise AttributeError.
        self._logger.warning("The TablePushManager is deprecated and will be removed with the next release.")

        # Validate the argument combination before storing anything;
        # raises PyCelonisValueError on any invalid combination.
        self._verify_inputs(
            pool_table_name=pool_table_name,
            alias=alias,
            if_exists=if_exists,
            primary_keys=primary_keys,
            foreign_keys=foreign_keys,
            source_table=source_table,
            target_table=target_table,
        )
        self.pool_table_name = pool_table_name
        # Name under which the table appears in the Datamodel (alias wins).
        self.alias_or_name = alias if alias is not None else pool_table_name
        self.if_exists = if_exists
        self.primary_keys = primary_keys
        self.foreign_keys = foreign_keys
        self.partial_reload_on_execution = partial_reload_on_execution
        self.column_config = column_config
        self._define_source_target(source_table=source_table, target_table=target_table, foreign_keys=foreign_keys)

    def _define_source_target(
        self, source_table: str, target_table: str, foreign_keys: typing.List[typing.Tuple[str, str]]
    ) -> None:
        """Assigns source/target tables and key lists according to the arguments given in `__init__`.

        Exactly one of `source_table`/`target_table` may be set (enforced by
        `_verify_inputs`); the managed table takes the other role of the join.
        """
        if source_table is not None:
            self._target_table = self.pool_table_name
            self._source_table = source_table
            self._connected_table = source_table
        elif target_table is not None:
            self._target_table = target_table
            self._source_table = self.pool_table_name
            self._connected_table = target_table
        else:
            # No join requested: all join-related attributes stay None.
            self._target_table = None
            self._source_table = None
            self._connected_table = None
        if foreign_keys is not None:
            # NOTE(review): target_keys tolerates plain-string entries
            # (falls back to `k` when k is not a tuple/list), but source_keys
            # always takes `k[0]` -- for a string entry that would be its
            # first character. Confirm the expected element type of
            # `foreign_keys` (tuples only?) and align both branches.
            self.source_keys = [k[0] for k in self.foreign_keys]
            self.target_keys = [k[1] if (isinstance(k, tuple) or isinstance(k, list)) else k for k in self.foreign_keys]
            # Keys on the already-existing (connected) table vs. keys on the
            # table managed by this push manager.
            self._connected_table_keys = (
                self.source_keys if self._connected_table == self._source_table else self.target_keys
            )
            self._local_table_keys = (
                self.source_keys if self._connected_table != self._source_table else self.target_keys
            )

    def _verify_inputs(
        self,
        pool_table_name: str,
        alias: str,
        if_exists: str,
        primary_keys: list,
        foreign_keys: typing.List[typing.Tuple[str, str]],
        source_table: str,
        target_table: str,
    ) -> None:
        """Checks if the given arguments form a valid combination and raises PyCelonisValueError otherwise."""
        if if_exists not in self.PUSH_TYPES:
            raise PyCelonisValueError(f"Argument 'if_exists' must be one of {', '.join(self.PUSH_TYPES)}.")
        # Upserts need primary keys to identify rows to update.
        if if_exists == "upsert" and primary_keys is None:
            raise PyCelonisValueError("Argument 'primary_keys' must be set if argument \"if_exists='upsert'\".")
        if source_table == (alias if alias else pool_table_name):
            raise PyCelonisValueError("A table cannot be connected to itself please, give a different source_table.")
        if target_table == (alias if alias else pool_table_name):
            raise PyCelonisValueError("A table cannot be connected to itself please, give a different target_table.")
        # foreign_keys and exactly one of source_table/target_table must be
        # given together, or none of them.
        if foreign_keys is not None:
            if source_table is None and target_table is None:
                raise PyCelonisValueError("Specify either 'source_table' or 'target_table' if 'foreign_keys' is set.")
        if (source_table is not None or target_table is not None) and foreign_keys is None:
            raise PyCelonisValueError("Specify 'foreign_keys' if either 'source_table' or 'target_table' is set.")
        if source_table is not None and target_table is not None:
            raise PyCelonisValueError("Specify either 'source_table' or 'target_table', not both.")

    def get_existing_table_status(self, datamodel: Datamodel) -> typing.Dict:
        """Checks the Status of an existing Table in Event Collection.

        Status Dictionary:
        ```json
        {
            "pool_table": {...} or None,
            "datamodel_table": DatamodelTable or None,
            "foreign_keys": [{...},...],
            "table_columns": ["KEY","ACTIVITY_EN",..]
        }
        ```


        Args:
            datamodel: Datamodel the table is supposed to be added to.

        Returns:
           Status Dictionary.
        """
        dm_table = self._get_datamodel_table(datamodel)
        fks = self._get_existing_foreign_key(datamodel, dm_table)
        pool_table = self._get_pool_table(datamodel)
        table_columns = self._get_datamodel_table_columns(dm_table)

        return {
            "pool_table": pool_table,
            "datamodel_table": dm_table,
            "foreign_keys": fks,
            "table_columns": table_columns,
        }

    def _get_datamodel_table(self, datamodel: Datamodel):
        """Returns the Datamodel table matching `alias_or_name`, or None if it does not exist.

        Raises:
            PyCelonisValueError: If a table of that name exists but is backed
                by a different pool source table.
        """
        dm_table = datamodel.tables.find(self.alias_or_name, None)
        if dm_table is not None:
            if dm_table.source_name != self.pool_table_name:
                raise PyCelonisValueError(
                    f'A table of name {self.alias_or_name} already exists with a '
                    f'different pool source table, called: {dm_table.source_name}'
                )
        return dm_table

    def _get_pool_table(self, datamodel: Datamodel):
        """Returns the pool table dict named `pool_table_name`, or None if the pool has no such table."""
        table = [tab for tab in datamodel.pool.tables if tab["name"] == self.pool_table_name]
        if not table:
            table = None
        else:
            table = table[0]
        return table

    @staticmethod
    def _get_datamodel_table_columns(dm_table) -> typing.List[str]:
        """Returns the column names of `dm_table` (excluding the internal _CELONIS_CHANGE_DATE column), or [] if None."""
        if dm_table is not None:
            table_cols = [col['name'] for col in dm_table.columns if col['name'] != "_CELONIS_CHANGE_DATE"]
        else:
            table_cols = []
        return table_cols

    def _get_existing_foreign_key(self, datamodel: Datamodel, dm_table: DatamodelTable) -> typing.List:
        """Returns any existing foreign-key connections between `dm_table` and the connected table.

        Raises:
            PyCelonisValueError: If an existing foreign key points in the
                opposite direction of the requested join.
        """
        if dm_table is None:
            existing_foreign_keys = []
        elif self._connected_table is not None:
            connect_table = datamodel.tables.find(self._connected_table)
            all_foreign_keys = datamodel.foreign_keys
            # Resolve the alias of each side of the requested join.
            source_temp = connect_table.alias if self._connected_table == self._source_table else dm_table.alias
            target_temp = connect_table.alias if self._connected_table == self._target_table else dm_table.alias
            tables = [source_temp, target_temp]
            # Keep only foreign keys that connect exactly these two tables
            # (in either direction).
            existing_foreign_keys = [
                fk for fk in all_foreign_keys if (fk["source_table"] in tables and fk["target_table"] in tables)
            ]
            if existing_foreign_keys and existing_foreign_keys[0]['source_table'] != source_temp:
                raise PyCelonisValueError(
                    f'The existing foreign key connection is opposite of what you want to do. '
                    f'Either change source and target tables here or in the the datamodel.tables'
                    f'Existing Foreign Key Connection: {existing_foreign_keys[0]} '
                )
            if len(existing_foreign_keys) > 1:
                # NOTE(review): this collapses the list to its FIRST element
                # (a single dict), so the return type differs when several
                # matches exist; callers only truth-test the result, but the
                # declared return type (typing.List) is then wrong -- verify.
                existing_foreign_keys = existing_foreign_keys[0]
        else:
            existing_foreign_keys = []
        return existing_foreign_keys

    def create_table(self, df: pd.DataFrame, datamodel: Datamodel, partial_reload: bool = False) -> DatamodelTable:
        """Creates the Table by pushing it to the Pool, adding it to the Datamodel and creating the Foreign Key.

        Args:
            df: Table to be pushed.
            datamodel: Target Datamodel
            partial_reload: If True Datamodel is partially reloaded after Table is added.

        Returns:
            The newly created Datamodel Table.
        """
        existing_table = self.get_existing_table_status(datamodel=datamodel)
        if self._connected_table is not None:
            self._verify_join_columns_exist(df, datamodel)
            self._verify_join_compatibility(df, datamodel)
        # The pool table is always (re)created with drop semantics here,
        # regardless of `self.if_exists`.
        datamodel.pool.create_table(df, self.pool_table_name, if_exists="drop", column_config=self.column_config)
        if existing_table["datamodel_table"] is None:
            table = datamodel.add_table_from_pool(table_name=self.pool_table_name, alias=self.alias_or_name)
        else:
            table = datamodel.tables.find(self.alias_or_name, None)
        # Only create the foreign key if no matching one exists yet.
        if (not existing_table["foreign_keys"]) and self.foreign_keys:
            datamodel.create_foreign_key(
                source_table=self._source_table, target_table=self._target_table, columns=self.foreign_keys
            )
        if partial_reload:
            datamodel.reload(tables=table)
        return table

    def push_data_into_table(self, df: pd.DataFrame, datamodel: Datamodel) -> DatamodelTable:
        """Executes the data push manager with the given df.

        Creates the table if it exists neither in the pool nor in the
        Datamodel; otherwise pushes `df` according to `self.if_exists`
        (upsert / append / drop).

        Args:
            df: Table data to be pushed.
            datamodel: Target Datamodel.

        Returns:
            The Datamodel Table the data was pushed into.
        """
        existing_table = self.get_existing_table_status(datamodel=datamodel)
        if existing_table["pool_table"] is None and existing_table["datamodel_table"] is None:
            table = self.create_table(df=df, datamodel=datamodel, partial_reload=self.partial_reload_on_execution)
        else:
            self._verify_before_push_data_into_table(existing_table, datamodel, df)
            if self.if_exists == "upsert":
                datamodel.pool.upsert_table(df, self.pool_table_name, primary_keys=self.primary_keys)
            elif self.if_exists == "append":
                datamodel.pool.append_table(df, self.pool_table_name)
            elif self.if_exists == "drop":
                datamodel.pool.create_table(df, self.pool_table_name, self.if_exists, column_config=self.column_config)

            table = datamodel.tables.find(self.alias_or_name)
            if self.partial_reload_on_execution:
                table = datamodel.reload(tables=table)
        return table

    def _verify_before_push_data_into_table(self, existing_table: typing.Dict, datamodel: Datamodel, df: pd.DataFrame):
        """Validates that `df` is compatible with the existing table state before pushing; raises otherwise."""
        if existing_table["datamodel_table"] is not None and existing_table["pool_table"] is None:
            raise PyCelonisValueError(
                f"The specified table {self.alias_or_name} exists in the datamodel "
                f"but the specified pool_table_name {self.pool_table_name} does not exist in the pool."
            )
        elif existing_table["table_columns"] and (set(existing_table["table_columns"]) != set(df.columns.tolist())):
            raise PyCelonisValueError(
                "The columns of df and the existing table in the datamodel don't match: "
                f"df columns: {', '.join(df.columns.tolist())}"
                f" columns in datamodel: {', '.join(existing_table['table_columns'])}"
            )
        # Join compatibility only needs re-checking when this table is the
        # source (1-side) of the join.
        if self._connected_table is None or self.alias_or_name != self._source_table:
            pass
        else:
            self._verify_join_compatibility(df, datamodel)

    def _verify_join_columns_exist(self, df: pd.DataFrame, datamodel: Datamodel) -> None:
        """verifies whether the specified join columns exist"""
        self._verify_local_foreign_keys_exist(df=df)
        self._verify_remote_foreign_keys_exist(datamodel)

    def _verify_remote_foreign_keys_exist(self, datamodel: Datamodel) -> None:
        """Raises if the connected Datamodel table is missing any of the join key columns."""
        dm_table = datamodel.tables.find(self._connected_table)
        cols = [col["name"] for col in dm_table.columns]
        if any(k not in cols for k in self._connected_table_keys):
            raise PyCelonisValueError(
                f"Not all of the columns {', '.join(self._connected_table_keys)} are "
                f"present in the table to connect to {self._connected_table}"
            )

    def _verify_local_foreign_keys_exist(self, df: pd.DataFrame) -> None:
        """Raises if `df` is missing any of the local join key columns."""
        if any(k not in df.columns for k in self._local_table_keys):
            raise PyCelonisValueError(f"Not all of the columns {', '.join(self._local_table_keys)} are present in df")

    def _verify_join_compatibility(self, df: pd.DataFrame, datamodel: Datamodel) -> None:
        """Verifies that the 1-side (source) of the join has no duplicate key rows.

        Raises:
            PyCelonisValueError: If duplicate key combinations are found.
        """
        contains_duplicates = False
        if self.alias_or_name == self._source_table:
            # This table is the source: check the pushed DataFrame locally.
            contains_duplicates = self._source_table_key_local_contains_duplicates(df_source=df, keys=self.source_keys)
        elif self._target_table:
            # The connected table is the source: check it remotely via PQL.
            contains_duplicates = self._source_table_key_remote_contains_duplicates(
                table_name=self._connected_table, foreign_keys=self._connected_table_keys, datamodel=datamodel
            )
        if contains_duplicates:
            raise PyCelonisValueError(
                f"There are duplicate entries in the columns: {', '.join(self.source_keys)}"
                f" of the  source_table: {self._source_table}. "
                "A foreign key connection would cause the datamodel to crash"
            )

    @staticmethod
    def _source_table_key_remote_contains_duplicates(table_name: str, foreign_keys: list, datamodel: Datamodel) -> bool:
        """Pulls the foreign key columns of the table to connect to"""
        q = pql.PQL()
        # Concatenate all key columns so the duplicate check covers the
        # combined key, not each column individually.
        q_string = "||".join([f'"{table_name}"."{col}"' for col in foreign_keys])
        q += pql.PQLColumn(f"""MAX(PU_COUNT(DOMAIN_TABLE({q_string}),{q_string}))""", "key_count")
        df_connect = datamodel.get_data_frame(q)
        contains_duplicates = df_connect["key_count"].max() > 1
        return contains_duplicates

    @staticmethod
    def _source_table_key_local_contains_duplicates(df_source: pd.DataFrame, keys: list) -> bool:
        """checks for duplicate entries in the source table foreign key columns"""
        n_duplicates = df_source[keys].duplicated().sum()
        contains_duplicates = n_duplicates > 0
        return contains_duplicates

create_table(self, df, datamodel, partial_reload=False)

Creates the Table by pushing it to the Pool, adding it to the Datamodel and creating the Foreign Key.

Parameters:

Name Type Description Default
df DataFrame

Table to be pushed.

required
datamodel Datamodel

Target Datamodel

required
partial_reload bool

If True Datamodel is partially reloaded after Table is added.

False

Returns:

Type Description
DatamodelTable

The newly created Datamodel Table.

Source code in celonis_api/event_collection/table_push_manager.py
def create_table(self, df: pd.DataFrame, datamodel: Datamodel, partial_reload: bool = False) -> DatamodelTable:
    """Creates the Table by pushing it to the Pool, adding it to the Datamodel and creating the Foreign Key.

    Args:
        df: Table to be pushed.
        datamodel: Target Datamodel
        partial_reload: If True Datamodel is partially reloaded after Table is added.

    Returns:
        The newly created Datamodel Table.
    """
    existing_table = self.get_existing_table_status(datamodel=datamodel)
    if self._connected_table is not None:
        self._verify_join_columns_exist(df, datamodel)
        self._verify_join_compatibility(df, datamodel)
    # The pool table is always (re)created with drop semantics here,
    # regardless of `self.if_exists`.
    datamodel.pool.create_table(df, self.pool_table_name, if_exists="drop", column_config=self.column_config)
    if existing_table["datamodel_table"] is None:
        table = datamodel.add_table_from_pool(table_name=self.pool_table_name, alias=self.alias_or_name)
    else:
        table = datamodel.tables.find(self.alias_or_name, None)
    # Only create the foreign key if no matching one exists yet.
    if (not existing_table["foreign_keys"]) and self.foreign_keys:
        datamodel.create_foreign_key(
            source_table=self._source_table, target_table=self._target_table, columns=self.foreign_keys
        )
    if partial_reload:
        datamodel.reload(tables=table)
    return table

get_existing_table_status(self, datamodel)

Checks the Status of an existing Table in Event Collection.

Status Dictionary:

{
    "pool_table": {...} or None,
    "datamodel_table": DatamodelTable or None,
    "foreign_keys": [{...},...],
    "table_columns": ["KEY","ACTIVITY_EN",..]
}

Parameters:

Name Type Description Default
datamodel Datamodel

Datamodel the table is supposed to be added to.

required

Returns:

Type Description
Dict

Status Dictionary.

Source code in celonis_api/event_collection/table_push_manager.py
def get_existing_table_status(self, datamodel: Datamodel) -> typing.Dict:
    """Checks the Status of an existing Table in Event Collection.

    Status Dictionary:
    ```json
    {
        "pool_table": {...} or None,
        "datamodel_table": DatamodelTable or None,
        "foreign_keys": [{...},...],
        "table_columns": ["KEY","ACTIVITY_EN",..]
    }
    ```


    Args:
        datamodel: Datamodel the table is supposed to be added to.

    Returns:
       Status Dictionary.
    """
    dm_table = self._get_datamodel_table(datamodel)
    fks = self._get_existing_foreign_key(datamodel, dm_table)
    pool_table = self._get_pool_table(datamodel)
    table_columns = self._get_datamodel_table_columns(dm_table)

    return {
        "pool_table": pool_table,
        "datamodel_table": dm_table,
        "foreign_keys": fks,
        "table_columns": table_columns,
    }

push_data_into_table(self, df, datamodel)

Executes the data push manager with the given df.

Parameters:

Name Type Description Default
df pd.DataFrame required
datamodel DataModel required

Returns:

Type Description
DatamodelTable

The Datamodel Table the data was pushed into.

Source code in celonis_api/event_collection/table_push_manager.py
def push_data_into_table(self, df: pd.DataFrame, datamodel: Datamodel) -> DatamodelTable:
    """Executes the data push manager with the given df.

    Creates the table if it exists neither in the pool nor in the
    Datamodel; otherwise pushes `df` according to `self.if_exists`
    (upsert / append / drop).

    Args:
        df: Table data to be pushed.
        datamodel: Target Datamodel.

    Returns:
        The Datamodel Table the data was pushed into.
    """
    existing_table = self.get_existing_table_status(datamodel=datamodel)
    if existing_table["pool_table"] is None and existing_table["datamodel_table"] is None:
        table = self.create_table(df=df, datamodel=datamodel, partial_reload=self.partial_reload_on_execution)
    else:
        self._verify_before_push_data_into_table(existing_table, datamodel, df)
        if self.if_exists == "upsert":
            datamodel.pool.upsert_table(df, self.pool_table_name, primary_keys=self.primary_keys)
        elif self.if_exists == "append":
            datamodel.pool.append_table(df, self.pool_table_name)
        elif self.if_exists == "drop":
            datamodel.pool.create_table(df, self.pool_table_name, self.if_exists, column_config=self.column_config)

        table = datamodel.tables.find(self.alias_or_name)
        if self.partial_reload_on_execution:
            table = datamodel.reload(tables=table)
    return table