data_pool_table

Module to interact with data pool tables.

This module contains the class used to interact with data pool tables in EMS data integration.

Typical usage example:

```python
tables = data_pool.get_tables()
data_pool_table = data_pool.create_table(df, "TEST_TABLE")
data_pool_table.append(df)
data_pool_table.upsert(df, keys=["PRIMARY_KEY_COLUMN"])
```

DataPoolTable

Bases: PoolTable

Data pool table object to interact with data pool table specific data integration endpoints.

loader_source class-attribute instance-attribute

loader_source = Field(None, alias='loaderSource')

available class-attribute instance-attribute

available = Field(None, alias='available')

data_source_name class-attribute instance-attribute

data_source_name = Field(None, alias='dataSourceName')

type_ class-attribute instance-attribute

type_ = Field(None, alias='type')

schema_name class-attribute instance-attribute

schema_name = Field(None, alias='schemaName')

client class-attribute instance-attribute

client = Field(..., exclude=True)
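The `alias` arguments above suggest that the API payload uses camelCase keys while the Python attributes use snake_case, with `None` as the default for missing values. The following is a minimal stdlib sketch of that mapping, assuming this reading is correct; `ALIASES` and `attributes_from_payload` are hypothetical names for illustration and are not part of PyCelonis.

```python
from typing import Any, Dict, Optional

# Attribute name -> payload key, copied from the alias values listed above.
ALIASES: Dict[str, str] = {
    "loader_source": "loaderSource",
    "available": "available",
    "data_source_name": "dataSourceName",
    "type_": "type",
    "schema_name": "schemaName",
}


def attributes_from_payload(payload: Dict[str, Any]) -> Dict[str, Optional[Any]]:
    """Hypothetical helper: read aliased keys, defaulting to None when absent."""
    return {attr: payload.get(alias) for attr, alias in ALIASES.items()}


attrs = attributes_from_payload({"schemaName": "PUBLIC", "type": "TABLE"})
```

Keys missing from the payload, such as `loaderSource` here, end up as `None`, mirroring the `Field(None, ...)` defaults.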

name instance-attribute

name

Name of data pool table.

data_pool_id instance-attribute

data_pool_id

Id of data pool where table is located.

data_source_id instance-attribute

data_source_id

Id of data connection where table is located.

columns instance-attribute

columns

Columns of data pool table.

from_transport classmethod

from_transport(client, data_pool_id, pool_table_transport)

Creates high-level data pool table object from given PoolTable.

Parameters:

  • client (Client) –

    Client to use to make API calls for given data pool table.

  • data_pool_id (str) –

    Id of data pool where table is located.

  • pool_table_transport (PoolTable) –

    PoolTable object containing properties of data pool table.

Returns:

  • DataPoolTable

    A DataPoolTable object with properties from transport and given client.
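The factory pattern described here, combining a transport object with a client and a data pool id, can be sketched with plain dataclasses. `TransportSketch` and `TableSketch` are hypothetical stand-ins rather than PyCelonis classes; the real objects carry more fields and validation.

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class TransportSketch:
    """Hypothetical stand-in for a PoolTable transport object."""

    name: str
    data_source_id: str


@dataclass
class TableSketch:
    """Hypothetical stand-in for a high-level table object."""

    client: Any
    data_pool_id: str
    name: str
    data_source_id: str

    @classmethod
    def from_transport(
        cls, client: Any, data_pool_id: str, transport: TransportSketch
    ) -> "TableSketch":
        # Copy the transport's properties and attach the client and pool id.
        return cls(client=client, data_pool_id=data_pool_id, **asdict(transport))


table = TableSketch.from_transport(
    object(), "pool-1", TransportSketch(name="TEST_TABLE", data_source_id="source-1")
)
```

Keeping the client out of the transport object lets the transport stay a pure data container, while the high-level object holds what it needs to make API calls.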

sync

sync()

Syncs data pool table properties with EMS.

get_columns

get_columns()

Gets all table columns of given table.

Returns:

upsert

upsert(
    df,
    keys,
    chunk_size=100000,
    index=False,
    column_config=None,
    **kwargs
)

Upserts data frame to existing table in data pool.

Parameters:

  • df (DataFrame) –

    DataFrame to push to existing table.

  • keys (List[str]) –

    Primary keys of table.

  • chunk_size (int, default: 100000 ) –

    Number of rows to push in one chunk.

  • index (Optional[bool], default: False ) –

    Whether the DataFrame index is included in the parquet file that is pushed. See pandas documentation.

  • column_config (Optional[dict], default: None ) –

    Configuration for the columns.

  • **kwargs (Any, default: {} ) –

    Additional parameters set for DataPushJob object.

Returns:

  • None

    The updated table object.

Raises:

  • PyCelonisTableDoesNotExistError

    Raised if table does not exist in data pool.

  • PyCelonisDataPushExecutionFailedError

    Raised when the data push execution fails.

Examples:

Upsert new data to table:

```python
import pandas as pd

df = pd.DataFrame({"ID": ["aa", "bb", "cc"], "TEST_COLUMN": [1, 2, 3]})

data_pool_table = data_pool.get_table("TEST_TABLE")
data_pool_table.upsert(df, keys=["ID"])
```
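To make the role of `keys` concrete, here is a minimal pure-Python sketch of conventional upsert semantics, assuming that an incoming row replaces an existing row with the same key values and is inserted otherwise. The helper `upsert_rows` is hypothetical and works on plain dictionaries; it is not part of PyCelonis and does not describe how the data push job is executed in EMS.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def upsert_rows(existing: List[Row], incoming: List[Row], keys: List[str]) -> List[Row]:
    """Hypothetical helper: merge `incoming` into `existing` by key columns."""
    # Index existing rows by their key tuple; dicts preserve insertion order.
    merged: Dict[Tuple[Any, ...], Row] = {
        tuple(row[k] for k in keys): row for row in existing
    }
    for row in incoming:
        # A matching key overwrites the old row; a new key adds a row.
        merged[tuple(row[k] for k in keys)] = row
    return list(merged.values())


table = [{"ID": "aa", "TEST_COLUMN": 1}, {"ID": "bb", "TEST_COLUMN": 2}]
new_rows = [{"ID": "bb", "TEST_COLUMN": 20}, {"ID": "cc", "TEST_COLUMN": 3}]
result = upsert_rows(table, new_rows, keys=["ID"])
```

In this sketch the row with `ID` "bb" is updated in place and the row with `ID` "cc" is added, whereas an append would simply add both incoming rows.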

append

append(
    df,
    chunk_size=100000,
    index=False,
    column_config=None,
    **kwargs
)

Appends data frame to existing table in data pool.

Parameters:

  • df (DataFrame) –

    DataFrame to push to existing table.

  • chunk_size (int, default: 100000 ) –

    Number of rows to push in one chunk.

  • index (Optional[bool], default: False ) –

    Whether the DataFrame index is included in the parquet file that is pushed. See pandas documentation.

  • column_config (Optional[dict], default: None ) –

    Configuration for the columns.

  • **kwargs (Any, default: {} ) –

    Additional parameters set for NewTaskInstanceTransport object.

Returns:

  • None

    The updated table object.

Raises:

  • PyCelonisTableDoesNotExistError

    Raised if table does not exist in data pool.

  • PyCelonisDataPushExecutionFailedError

    Raised when the data push execution fails.

Examples:

Append new data to table:

```python
import pandas as pd

df = pd.DataFrame({"ID": ["aa", "bb", "cc"], "TEST_COLUMN": [1, 2, 3]})

data_pool_table = data_pool.get_table("TEST_TABLE")
data_pool_table.append(df)
```
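The `chunk_size` parameter limits the number of rows pushed in one chunk. As a rough illustration, assuming rows are split sequentially into fixed-size slices, the chunking could look like the sketch below. `iter_chunks` is a hypothetical helper, not a PyCelonis function, and the library's internal chunking may differ.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_chunks(rows: Sequence[T], chunk_size: int = 100000) -> Iterator[Sequence[T]]:
    """Hypothetical helper: yield consecutive slices of at most `chunk_size` rows."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


rows = list(range(250000))
chunk_lengths = [len(chunk) for chunk in iter_chunks(rows)]
```

With the default of 100000, a table of 250000 rows splits into two full chunks and one chunk of 50000 rows. A smaller `chunk_size` means more, smaller uploads.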