
data_pool.py

Pool (CelonisApiObject)

Pool object to interact with the Celonis Event Collection API.

url: str property readonly

API

  • /integration/api/pools/{pool_id}

url_data_push property readonly

API

  • /integration/api/v1/data-push/{pool_id}/jobs/

url_connection_creation property readonly

API

  • /integration/api/datasource/

datamodels: CelonisCollection property readonly

Get all Datamodels of the Pool.

API

  • GET: /integration/api/pools/{pool_id}/data-models

Returns:

Type Description
CelonisCollection

Collection of Pool Datamodels.

tables: List[Dict] property readonly

Get all Pool Tables.

API

  • GET: /integration/api/pools/{pool_id}/tables

Returns:

Type Description
List[Dict]

A List of dictionaries containing Pool tables.

data_connections: CelonisCollection[DataConnection] property readonly

Get all Pool Data Connections.

API

  • GET: /integration/api/pools/{pool_id}/data-sources/

Returns:

Type Description
CelonisCollection[DataConnection]

A Collection of Pool Data Connections.

data_jobs: CelonisCollection[DataJob] property readonly

Get all Pool Data Jobs.

API

  • GET: /integration/api/pools/{pool_id}/jobs

Returns:

Type Description
CelonisCollection[DataJob]

A Collection of Pool Data Jobs.

variables: CelonisCollection[PoolParameter] property readonly

Get all Pool Variables.

API

  • GET: /integration/api/pools/{pool_id}/variables

Returns:

Type Description
CelonisCollection[PoolParameter]

A Collection of Pool Variables.
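
Example

A minimal sketch of reading these read-only properties. The connection setup (get_celonis and the pools lookup) is an assumption and not part of this page; all names are illustrative:

    from pycelonis import get_celonis  # assumed entry point, not documented here

    # Assumed setup: connect and locate an existing Data Pool by name or ID.
    celonis = get_celonis(url="https://<team>.<realm>.celonis.cloud", api_token="<API_TOKEN>")
    pool = celonis.pools.find("<POOL_NAME_OR_ID>")

    # Read-only properties documented above.
    for datamodel in pool.datamodels:
        print(datamodel.name)
    print([t["name"] for t in pool.tables])  # tables is a List[Dict]; the "name" key follows the table dictionaries shown below
    print(len(pool.data_connections), len(pool.data_jobs), len(pool.variables))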

find_table(self, table_name, data_source_id=None)

Find a Table in the Pool.

Parameters:

Name Type Description Default
table_name str

Name of the Pool Table.

required
data_source_id str

ID of the Data Source.

None

Returns:

Type Description
Optional[Dict]

The Pool Table, if found.
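
Example

A short usage sketch; the pool object is assumed to come from a setup like the one shown under the properties above, and the table and Data Source names are illustrative:

    # Look up a table by name.
    table = pool.find_table("ACTIVITIES")
    if table is None:
        print("Table not found")
    else:
        print(table["name"])

    # Restrict the search to a specific Data Source.
    table = pool.find_table("ACTIVITIES", data_source_id="<DATA_SOURCE_ID>")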

create_table(self, df_or_path, table_name, if_exists='error', column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Creates a new Table in the Pool from a pandas.DataFrame or pyarrow.parquet.ParquetFile.

Parameters:

Name Type Description Default
df_or_path Union[pandas.core.frame.DataFrame, pathlib.Path, str]
  • If DataFrame, df is chunked, written to parquet and uploaded.
  • If Path to parquet file, file is uploaded.
  • If Path to folder, any parquet file in folder is uploaded.
  • If str, value is converted to Path and handled as described above. (The index of the data frame is ignored and NOT pushed to Celonis.)
required
table_name str

Name of Table.

required
if_exists str
  • error -> an error is raised if a table of the same name already exists in the pool (the default).
  • drop -> the existing table is dropped completely and a new table is created.
  • replace_data_only -> the table data is replaced, but the column names and types of the existing table are not overwritten.
'error'
column_config List[Dict[str, Any]]

Can be used to specify column types and string field length.

    [
        {
            "columnName":"BUKRS",
            "fieldLength":100,
            "columnType":"STRING"
        }...
    ]
with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].

None
connection Union[DataConnection, str]

The Data Connection to upload to, else uploads to Global.

None
wait_for_finish bool

Waits for the upload to finish processing; set to False to only trigger the upload.

True
chunksize int

If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

100000

Returns:

Type Description
Dict

The Data Job Status.

Exceptions:

Type Description
PyCelonisValueError

If Table already exists and if_exists='error'.

PyCelonisTypeError

When connection is not DataConnection object or ID of Data Connection.

PyCelonisTypeError

If Path is not a valid file or folder.
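
Example

A minimal sketch of creating a table from a pandas.DataFrame with an optional column_config; table and column names are illustrative:

    import pandas as pd

    df = pd.DataFrame({"BUKRS": ["1000", "2000"], "AMOUNT": [10.5, 20.0]})

    column_config = [
        {"columnName": "BUKRS", "fieldLength": 100, "columnType": "STRING"},
        {"columnName": "AMOUNT", "columnType": "FLOAT"},
    ]

    status = pool.create_table(
        df_or_path=df,
        table_name="MY_TABLE",
        if_exists="error",            # raise if MY_TABLE already exists
        column_config=column_config,  # optional
        wait_for_finish=True,
    )
    print(status)  # the Data Job Status dictionary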

append_table(self, df_or_path, table_name, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Appends a pandas.DataFrame or pyarrow.parquet.ParquetFile to an existing Table in the Pool.

Parameters:

Name Type Description Default
df_or_path Union[pandas.core.frame.DataFrame, pathlib.Path, str]
  • If DataFrame, df is chunked, written to parquet and uploaded.
  • If Path to parquet file, file is uploaded.
  • If Path to folder, any parquet file in folder is uploaded.
  • If str, value is converted to Path and handled as described above.
required
table_name str

Name of Table.

required
column_config List[Dict[str, Any]]

Can be used to specify column types and string field length.

    [
        {
            "columnName":"BUKRS",
            "fieldLength":100,
            "columnType":"STRING"
        }...
    ]
with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].

None
connection Union[DataConnection, str]

The Data Connection to upload to, else uploads to Global.

None
wait_for_finish bool

Waits for the upload to finish processing; set to False to only trigger the upload.

True
chunksize int

If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

100000

Returns:

Type Description
Dict

The Data Job Status.

Exceptions:

Type Description
PyCelonisValueError

If Table already exists and if_exists='error'.

PyCelonisTypeError

When connection is not DataConnection object or ID of Data Connection.

PyCelonisTypeError

If Path is not a valid file or folder.
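
Example

A minimal sketch of appending rows to an existing table; the DataFrame columns are expected to match the target table, and the table name is illustrative:

    import pandas as pd

    new_rows = pd.DataFrame({"BUKRS": ["3000"], "AMOUNT": [5.25]})
    status = pool.append_table(new_rows, table_name="MY_TABLE")
    print(status)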

upsert_table(self, df_or_path, table_name, primary_keys, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Upserts a pandas.DataFrame or pyarrow.parquet.ParquetFile into an existing Table in the Pool.

Parameters:

Name Type Description Default
df_or_path Union[pandas.core.frame.DataFrame, pathlib.Path, str]
  • If DataFrame, df is chunked, written to parquet and uploaded.
  • If Path to parquet file, file is uploaded.
  • If Path to folder, any parquet file in folder is uploaded.
  • If str, value is converted to Path and handled as described above.
required
table_name str

Name of Table.

required
primary_keys List[str]

List of Table primary keys.

required
column_config List[Dict[str, Any]]

Can be used to specify column types and string field length.

    [
        {
            "columnName":"BUKRS",
            "fieldLength":100,
            "columnType":"STRING"
        }...
    ]
with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].

None
connection Union[DataConnection, str]

The Data Connection to upload to, else uploads to Global.

None
wait_for_finish bool

Waits for the upload to finish processing; set to False to only trigger the upload.

True
chunksize int

If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

100000

Returns:

Type Description
Dict

The Data Job Status.

Exceptions:

Type Description
PyCelonisValueError

If Table already exists and if_exists='error'.

PyCelonisTypeError

When connection is not DataConnection object or ID of Data Connection.

PyCelonisTypeError

If Path is not a valid file or folder.
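
Example

A minimal sketch of an upsert keyed on one primary key column; rows with a matching key are updated, new keys are inserted. Table and column names are illustrative:

    import pandas as pd

    changed = pd.DataFrame({"BUKRS": ["1000", "4000"], "AMOUNT": [99.0, 1.0]})
    status = pool.upsert_table(changed, table_name="MY_TABLE", primary_keys=["BUKRS"])
    print(status)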

push_table(self, df_or_path, table_name, if_exists='error', primary_keys=None, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)

Pushes a pandas.DataFrame or pyarrow.parquet.ParquetFile to the specified Table in the Pool.

Warning

Deprecation: The method 'push_table' is deprecated and will be removed in the next release. Use one of: create_table, append_table, upsert_table.

Parameters:

Name Type Description Default
df_or_path Union[pandas.core.frame.DataFrame, pathlib.Path, str]
  • If DataFrame, df is chunked, written to parquet and uploaded.
  • If Path to parquet file, file is uploaded.
  • If Path to folder, any parquet file in folder is uploaded.
  • If str, value is converted to Path and handled as described above.
required
table_name str

Name of Table.

required
if_exists str
  • error -> an error is raised if a table of the same name already exists in the pool (the default).
  • drop -> the existing table is dropped completely and a new table is created.
  • replace_data_only -> the table data is replaced, but the column names and types of the existing table are not overwritten.
'error'
primary_keys Optional[List[str]]

List of Table primary keys.

None
column_config List[Dict[str, Any]]

Can be used to specify column types and string field length.

    [
        {
            "columnName":"BUKRS",
            "fieldLength":100,
            "columnType":"STRING"
        }...
    ]
with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING].

None
connection Union[DataConnection, str]

The Data Connection to upload to, else uploads to Global.

None
wait_for_finish bool

Waits for the upload to finish processing; set to False to only trigger the upload.

True
chunksize int

If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied.

100000

Returns:

Type Description
Dict

The Data Job Status.

get_column_config(self, table, raise_error=False)

Get a Column Configuration of a Pool Table.

Column Config List:

[
    {'columnName': 'colA', 'columnType': 'DATETIME'},
    {'columnName': 'colB', 'columnType': 'FLOAT'},
    {'columnName': 'colC', 'columnType': 'STRING', 'fieldLength': 80}
]

Parameters:

Name Type Description Default
table Union[str, Dict]

Name of the Pool Table or dictionary with {'name': '', 'schemaName': ''}.

required
raise_error bool

Raises a celonis_api.errors.PyCelonisValueError if Table data types are None or the table has 99+ columns; otherwise only logs a warning.

False

Returns:

Type Description
Optional[List[Dict[str, Any]]]

The Column Configuration of the Pool Table (Always ignoring '_CELONIS_CHANGE_DATE').
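
Example

A sketch of retrieving a column configuration; the returned list has the shape shown above and can be passed back as column_config in the push methods. The table name is illustrative:

    config = pool.get_column_config("MY_TABLE")
    print(config)
    # e.g. [{'columnName': 'BUKRS', 'columnType': 'STRING', 'fieldLength': 100}, ...]

    # Reuse when pushing, for example:
    # pool.create_table(df, "MY_TABLE_COPY", column_config=config)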

check_push_status(self, job_id='')

Checks the Status of a Data Push Job.

API

  • GET: /integration/api/v1/data-push/{pool_id}/jobs/{job_id}

Parameters:

Name Type Description Default
job_id str

The ID of the job to check. If empty, the status of all push jobs is returned.

''

Returns:

Type Description
Dict

Status of Data Push Job(s).
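
Example

A short sketch; the job ID is illustrative:

    # Status of all push jobs in the Pool.
    all_jobs = pool.check_push_status()

    # Status of a single push job.
    job_status = pool.check_push_status(job_id="<JOB_ID>")
    print(job_status)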

check_data_job_execution_status(self)

Checks the Status of Data Job Executions.

API

  • GET: /integration/api/pools/{pool_id}/logs/status

Returns:

Type Description
List

Status of all Data Job Executions.
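
Example

A one-call sketch; the method takes no parameters:

    executions = pool.check_data_job_execution_status()
    print(executions)  # one status entry per Data Job Execution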

create_datamodel(self, name)

Creates a new Datamodel in the Pool.

Parameters:

Name Type Description Default
name str

Name of the Datamodel.

required

Returns:

Type Description
Datamodel

The newly created Datamodel object.
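
Example

A minimal sketch; the Datamodel name is illustrative and attribute access on the returned object is assumed:

    datamodel = pool.create_datamodel("My New Datamodel")
    print(datamodel.id)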

create_data_connection(self, client, host, password, system_number, user, name, connector_type, uplink_id=None, use_uplink=True, compression_type='GZIP', **kwargs)

Creates a new Data Connection (currently, only SAP connections are supported).

Warning

This method is deprecated and will be removed in the next release. Use the online wizard to set up Data Connections.

Parameters:

Name Type Description Default
client str

Client.

required
host str

Host.

required
user str

Username.

required
password str

Password.

required
system_number str

System Number.

required
name str

Name of the Data Connection.

required
connector_type str

Type of the Data Connection. One of ['SAP'].

required
uplink_id str

ID of an Uplink Connection.

None
use_uplink bool

Whether to use an Uplink Connection or not.

True
compression_type str

Compression Type.

'GZIP'
**kwargs {}

Returns:

Type Description
DataConnection

The newly created Data Connection.

move(self, to)

Moves the Pool to another team.

API

  • POST: /integration/api/pools/move
    {
        "subsetOfDataModels": False,
        "dataPoolId": self.id,
        "selectedDataModels": [],
        "moveToDomain": to
    }
    

Parameters:

Name Type Description Default
to str

Name of the host domain (e.g. move for https://move.eu-1.celonis.cloud).

required
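
Example

A minimal sketch, assuming the target team is reachable at https://move.eu-1.celonis.cloud:

    pool.move(to="move")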

create_pool_parameter(self, pool_variable=None, name=None, placeholder=None, description=None, data_type='STRING', var_type='PUBLIC_CONSTANT', values=None)

Creates a new Variable with the specified properties in the Pool.

API

  • POST: /integration/api/pools/{pool_id}/variables/
    {
        "poolId": self.id,
        "dataType":"<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
        "name": "",
        "type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
        "description": "",
        "placeholder": "",
        "values": [
            {"value": ""},...
        ],
    }
    

Parameters:

Name Type Description Default
pool_variable Union[Dict, PoolParameter]

Pool Parameter object or dictionary (see API); if None, all other arguments must be set.

None
name str

Name of the Variable (same as pool_variable["name"]).

None
placeholder str

Placeholder of the Variable.

None
description str

Description of the Variable.

None
data_type str

Data type of the Variable (see options pool_variable).

'STRING'
var_type str

Type of the Variable (see options pool_variable).

'PUBLIC_CONSTANT'
values List

List of Variable values.

None

Returns:

Type Description
PoolParameter

The newly created Pool Parameter object.
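
Example

A minimal sketch using the keyword arguments documented above instead of a pool_variable dictionary; all names are illustrative:

    parameter = pool.create_pool_parameter(
        name="country_filter",
        placeholder="<country>",
        description="Example public constant",
        data_type="STRING",
        var_type="PUBLIC_CONSTANT",
    )

    # Alternatively, pass a dictionary mirroring the API payload shown above via pool_variable.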

create_data_job(self, name, data_source_id=None)

Creates a new Data Job with the specified name in the Pool.

API

  • POST: /integration/api/pools/{pool_id}/jobs/
    {
        "dataPoolId": self.id,
        "dataSourceId": data_source_id,
        "name": name
    }
    

Parameters:

Name Type Description Default
name str

Name of the Data Job.

required
data_source_id str

ID of the Data Source that the new Data Job will be connected to. If not specified, the Data Job is connected to the default global source.

None

Returns:

Type Description
DataJob

The newly created Data Job object.
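
Example

A minimal sketch; the Data Connection lookup via data_connections.find and the .id attribute are assumptions, and all names are illustrative:

    # A Data Job on the default global source.
    job = pool.create_data_job("My Data Job")

    # A Data Job attached to a specific Data Connection.
    connection = pool.data_connections.find("<CONNECTION_NAME_OR_ID>")
    sap_job = pool.create_data_job("SAP Extraction", data_source_id=connection.id)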

HybridPool (Pool)

url: str property readonly

API

  • /integration-hybrid/api/pools/{pool_id}

url_data_push property readonly

API

  • /integration-hybrid/api/v1/data-push/{pool_id}/jobs/

url_connection_creation property readonly

API

  • /integration-hybrid/api/datasource/

PoolParameter (CelonisApiObjectChild)

Pool Parameter object.

url: str property readonly

API

  • /integration/api/pools/{pool_id}/variables/{variable_id}

DataConnection (CelonisApiObjectChild)

Data Connection object.

url: str property readonly

API

  • /integration/api/pools/{pool_id}/data-sources/{data_connection_id}