data_pool.py
DataConnection (CelonisApiObjectChild)
¶
Data Connection object.
Source code in celonis_api/event_collection/data_pool.py
url: str
property
readonly
¶
API
/integration/api/pools/{pool_id}/data-sources/{data_connection_id}
HybridPool (Pool)
¶
Source code in celonis_api/event_collection/data_pool.py
class HybridPool(Pool):
@property
def url(self) -> str:
"""
!!! api "API"
- `/integration-hybrid/api/pools/{pool_id}`
"""
return f"{self.celonis.url}/integration-hybrid/api/pools/{self.id}"
@property
def url_data_push(self):
"""
!!! api "API"
- `/integration-hybrid/api/v1/data-push/{pool_id}/jobs/`
"""
return f"{self.celonis.url}/integration-hybrid/api/v1/data-push/{self.id}/jobs/"
@property
def url_connection_creation(self):
"""
!!! api "API"
- `/integration-hybrid/api/datasource/`
"""
return f"{self.celonis.url}/integration-hybrid/api/datasource/"
Pool (CelonisApiObject)
¶
Pool object to interact with the Celonis Event Collection API.
Source code in celonis_api/event_collection/data_pool.py
class Pool(CelonisApiObject):
"""Pool object to interact to interact with Celonis Event Collection API."""
@property
def url(self) -> str:
"""
!!! api "API"
- `/integration/api/pools/{pool_id}`
"""
return f"{self.celonis.url}/integration/api/pools/{self.id}"
@property
def url_data_push(self):
"""
!!! api "API"
- `/integration/api/v1/data-push/{pool_id}/jobs/`
"""
return f"{self.celonis.url}/integration/api/v1/data-push/{self.id}/jobs/"
@property
def url_connection_creation(self):
"""
!!! api "API"
- `/integration/api/datasource/`
"""
return f"{self.celonis.url}/integration/api/datasource/"
@property
def datamodels(self) -> CelonisCollection:
"""Get all Datamodels of the Pool.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/data-models`
Returns:
Collection of Pool Datamodels.
"""
response = self.celonis.api_request(f"{self.url}/data-models")
return CelonisCollection([Datamodel(parent=self, id_or_data=data) for data in response])
@property
def tables(self) -> typing.List[typing.Dict]:
"""Get all Pool Tables.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/tables`
Returns:
A List of dictionaries containing Pool tables.
"""
return self.celonis.api_request(f"{self.url}/tables")
@property
def data_connections(self) -> 'CelonisCollection[DataConnection]':
"""Get all Pool Data Connections.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/data-sources/`
Returns:
A Collection of Pool Data Connections.
"""
response = self.celonis.api_request(f"{self.url}/data-sources/", params={"excludeUnconfigured": False})
return CelonisCollection(DataConnection(self, data) for data in response)
@property
def data_jobs(self) -> 'CelonisCollection[DataJob]':
"""Get all Pool Data Jobs.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/jobs`
Returns:
A Collection of Pool Data Jobs.
"""
response = self.celonis.api_request(f"{self.url}/jobs")
return CelonisCollection([DataJob(self, d) for d in response])
@property
def variables(self) -> 'CelonisCollection[PoolParameter]':
"""Get all Pool Variables.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/variables`
Returns:
A Collection of Pool Variables.
"""
response = self.celonis.api_request(self.url + "/variables/")
return CelonisCollection([PoolParameter(self, v, self.celonis) for v in response])
def find_table(self, table_name: str, data_source_id: str = None) -> typing.Optional[typing.Dict]:
"""Find a Table in the Pool.
Args:
table_name: Name of the Pool Table.
data_source_id: ID of the Data Source.
Returns:
The Pool Table, if found.
"""
table_found = None
for table in self.tables:
if table["name"].lower() == table_name.lower() and table["dataSourceId"] == data_source_id:
table_found = table
break
return table_found
def create_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
if_exists: str = "error", # drop, replace_data_only
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Creates a new Table in the Pool from a
[pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html).
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
(The index of the data frame is ignored and NOT pushed to Celonis.)
table_name: Name of Table.
if_exists:
* `error` -> an error is thrown if a table of the same name already exists in the pool.
* `drop` -> the existing table is dropped completely and a new table is created, by default `error`.
* `replace_data_only` -> the column names and types of the old tables are not overwritten.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
Raises:
PyCelonisValueError: If Table already exists and `if_exists='error'`.
PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
PyCelonisTypeError: If Path is not a valid file or folder.
"""
self.celonis._tracker.track("Create table", extra={"tracking_type": "DATA_PUSH"})
if if_exists not in ["error", "drop", "replace_data_only"]:
raise PyCelonisValueError("Argument 'if_exists' must be one of ['error', 'drop', 'replace_data_only'].")
data_source_id = self._get_data_source_id(connection)
job_type = None
if if_exists == "drop":
job_type = "REPLACE"
else:
table = self.find_table(table_name, data_source_id)
if table is not None:
if if_exists == "error":
raise PyCelonisValueError(
f"Table with name {table_name} already exists in the "
"Data Pool. If you want to drop it and create a new table, set if_exists='drop'."
)
elif if_exists == "replace_data_only":
if column_config is not None:
raise PyCelonisValueError(
"When argument if_exists='replace_data_only', "
"you cannot give an additional column_config. "
"The column_config is inferred from the pool table."
)
column_config = self.get_column_config(table, raise_error=True)
# Create payload for data job
payload, table_name = self._create_payload_for_data_job(
table_name=table_name,
column_config=column_config,
data_source_id=data_source_id,
job_type=job_type,
)
# Create data push job
response = self.celonis.api_request(self.url_data_push, payload)
job_id = response["id"]
# Create parquet uploader
chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
chunk_url = f"{chunk_base_url}/upserted"
# Create df chunks and upload
if isinstance(df_or_path, pd.DataFrame):
self._upload_dataframe(
df=df_or_path,
table_name=table_name,
chunksize=chunksize,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
)
# Or upload file(s)
elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
self._upload_parquet_files(path=df_or_path, chunk_url=chunk_url, chunk_base_url=chunk_base_url)
# Start job execution
_ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
response = self.check_push_status(job_id)
# Wait for job to finish
if wait_for_finish and response.get("status", "") != "DONE":
response = self._wait_until_data_job_finish(job_id=job_id)
return response
def append_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Appends a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
to an existing Table in the Pool.
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
table_name: Name of Table.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
Raises:
PyCelonisValueError: If Table already exists and `if_exists='error'`.
PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
PyCelonisTypeError: If Path is not a valid file or folder.
"""
self.celonis._tracker.track("Append table", extra={"tracking_type": "DATA_PUSH"})
data_source_id = self._get_data_source_id(connection)
table = self.find_table(table_name, data_source_id)
if table is None:
raise PyCelonisNotFoundError(
f"The target table \"{table_name}\" could not be found in the Data Pool. "
"Please check spelling of table name."
)
if not column_config:
column_config = self.get_column_config(table)
# Create payload for data job
payload, table_name = self._create_payload_for_data_job(
table_name=table_name, column_config=column_config, data_source_id=data_source_id, job_type="DELTA"
)
# Create data push job
response = self.celonis.api_request(self.url_data_push, payload)
job_id = response["id"]
# Create parquet uploader
chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
chunk_url = f"{chunk_base_url}/upserted"
# Create df chunks and upload
if isinstance(df_or_path, pd.DataFrame):
self._upload_dataframe(
df=df_or_path,
table_name=table_name,
chunksize=chunksize,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Or upload file(s)
elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
self._upload_parquet_files(
path=df_or_path,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Start job execution
_ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
response = self.check_push_status(job_id)
# Wait for job to finish
if wait_for_finish and response.get("status", "") != "DONE":
response = self._wait_until_data_job_finish(job_id=job_id)
return response
def upsert_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
primary_keys: typing.List[str],
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Upserts the [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
into an existing Table in the Pool.
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
table_name: Name of Table.
primary_keys: List of Table primary keys.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
Raises:
PyCelonisValueError: If Table already exists and `if_exists='error'`.
PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
PyCelonisTypeError: If Path is not a valid file or folder.
"""
self.celonis._tracker.track("Upsert table", extra={"tracking_type": "DATA_PUSH"})
data_source_id = self._get_data_source_id(connection)
table = self.find_table(table_name, data_source_id)
if table is None:
raise PyCelonisNotFoundError(
f"The target table \"{table_name}\" could not be found in the Data Pool. "
"Please check spelling of table name."
)
if not column_config:
column_config = self.get_column_config(table)
# Create payload for data job
payload, table_name = self._create_payload_for_data_job(
table_name=table_name,
column_config=column_config,
data_source_id=data_source_id,
job_type="DELTA",
keys=primary_keys,
)
# Create data push job
response = self.celonis.api_request(self.url_data_push, payload)
job_id = response["id"]
# Create parquet uploader
chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
chunk_url = f"{chunk_base_url}/upserted"
# Create df chunks and upload
if isinstance(df_or_path, pd.DataFrame):
self._upload_dataframe(
df=df_or_path,
table_name=table_name,
chunksize=chunksize,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Or upload file(s)
elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
self._upload_parquet_files(
path=df_or_path,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Start job execution
_ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
response = self.check_push_status(job_id)
# Wait for job to finish
if wait_for_finish and response.get("status", "") != "DONE":
response = self._wait_until_data_job_finish(job_id=job_id)
return response
@deprecated("Use 'create_table', 'append_table' or 'upsert_table'.")
def push_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
if_exists: str = "error",
primary_keys: typing.Optional[typing.List[str]] = None,
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Pushes a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
to the specified Table in the Pool.
!!! warning
Deprecation: The method 'push_table' is deprecated and will be removed in the next release.
Use one of:
[create_table][celonis_api.event_collection.data_pool.Pool.create_table],
[append_table][celonis_api.event_collection.data_pool.Pool.append_table],
[upsert_table][celonis_api.event_collection.data_pool.Pool.upsert_table].
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
table_name: Name of Table.
if_exists:
* `error` -> an error is thrown if a table of the same name already exists in the pool.
* `drop` -> the existing table is dropped completely and a new table is created, by default `error`.
* `replace_data_only` -> the column names and types of the old tables are not overwritten.
primary_keys: List of Table primary keys.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
"""
if if_exists == "error":
response = self.create_table(
df_or_path=df_or_path,
table_name=table_name,
if_exists=if_exists,
column_config=column_config,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
elif if_exists in ["replace", "replace_data_only"]:
response = self.create_table(
df_or_path=df_or_path,
table_name=table_name,
if_exists="drop",
column_config=column_config,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
elif if_exists == "append":
response = self.append_table(
df_or_path=df_or_path,
table_name=table_name,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
elif if_exists == "upsert":
if primary_keys is None:
raise PyCelonisValueError("Argument 'primary_keys' must be set for upsert.")
else:
response = self.upsert_table(
df_or_path=df_or_path,
primary_keys=primary_keys,
table_name=table_name,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
else:
raise PyCelonisValueError(f"if_exists {if_exists} not supported")
return response
def _check_matching_columns(self, local_table: pa.Table, remote_table: pd.DataFrame):
"""Compares column names and column types of the local table to be pushed with the remote table.
Args:
local_table: The local table to be pushed.
remote_table: The remote table which is located in the current data pool.
Raises:
PyCelonisValueError: If the table to be pushed has column names which
are not included in the remote table (table in data pool).
"""
local_col_names = set(local_table.column_names)
remote_col_names = set(remote_table.columns.values.tolist())
different_names = local_col_names.difference(remote_col_names)
if len(different_names) != 0:
raise PyCelonisValueError(
"The table you want to push has column names which are "
"not represented in the remote table,"
f"wrong column names: {different_names}. "
"Please check spelling or change column names!"
)
def _upload_file(
self,
filepath: pathlib.Path,
chunk_url: str,
chunk_base_url: str,
df_remote_table: typing.Optional[pd.DataFrame] = None,
threaded: bool = True,
):
"""Uploads parquet file(s) to pool.
Parameters
----------
filepath : pathlib.Path
Path to parquet file.
chunk_url : str
Designated chunk url.
chunk_base_url : str
Designated chunk base url.
df_remote_table : pd.DataFrame, optional
The target table as a data frame used to check column names
in a sub-method, by default None.
threaded : bool, optional
Set threaded true, if request needs to be parallelized,
by default True.
Returns
-------
Response
The response from the file upload data job.
Raises
------
exceptions.HTTPError
Raises when upload job failed.
"""
parquet_table = pq.read_table(filepath)
if df_remote_table is not None:
self._check_matching_columns(local_table=parquet_table, remote_table=df_remote_table)
try:
parquet_table = parquet_table.drop(["_CELONIS_CHANGE_DATE"])
pq.write_table(parquet_table, filepath)
except KeyError:
pass # This happens if the column does not exist on the pq file
del parquet_table
n_chunks_before = len(self.celonis.api_request(chunk_base_url))
time_before = time.time()
try:
return self.celonis.api_request(chunk_url, filepath)
except PyCelonisError as e:
error = True
if time.time() - time_before > 110:
for _ in range(24):
self._logger.info("Checking upload status in 10 seconds")
time.sleep(10)
n_chunks_after = len(self.celonis.api_request(chunk_base_url))
if n_chunks_after > n_chunks_before:
error = False
if error:
self._logger.exception(
f"File upload problem."
f"\n Maybe the file <{filepath}> "
f"is not a supported parquet/csv file.\n"
f"You could try: \n\n\tfrom pycelonis.utils.parquet_utils"
f" import read_parquet\n\tread_parquet('{filepath}')"
f"\n\nto check whether your parquet file is valid."
)
raise e
def _upload_df_chunk(
self,
chunk,
table_name: str,
chunk_url: str,
chunk_base_url: str,
df_remote_table: typing.Optional[pd.DataFrame] = None,
):
"""Stores the cunk as a parquet file
and uploads it then via _upload_file.
Parameters
----------
chunk : [type]
[description]
table_name : str
The target table name in pool.
chunk_url : str,
Designated chunk url.
chunk_base_url : str
Designated chunk base url.
df_remote_table : pd.DataFrame, optional
The target table as a data frame used to
check column names in a sub-method, by default None.
"""
file_name = f"celonis_push_{table_name}_{time.time()}_{chunk[1]}.parquet"
tmp_file = pathlib.Path(tempfile.gettempdir()) / file_name
parquet_utils.write_parquet(chunk[0], tmp_file)
if df_remote_table is not None:
self._upload_file(
tmp_file, chunk_url=chunk_url, chunk_base_url=chunk_base_url, df_remote_table=df_remote_table
)
else:
self._upload_file(tmp_file, chunk_url=chunk_url, chunk_base_url=chunk_base_url)
os.remove(tmp_file)
def _get_data_source_id(self, connection: typing.Union[str, 'DataConnection'] = None):
if connection is None:
data_source_id = None
else:
if isinstance(connection, DataConnection):
data_source_id = connection.id
elif isinstance(connection, str) and utils.check_uuid(connection):
data_source_id = connection
else:
raise PyCelonisTypeError("Argument 'connection' should be of type DataConnection or str (valid uuid).")
return data_source_id
def _create_payload_for_data_job(
self,
table_name: str,
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
data_source_id: str = None,
job_type: str = None,
keys: list = None,
):
if "." in table_name:
table_name = table_name.replace(".", "_")
self._logger.warning(f'"." replaced by "_" in {table_name}.')
payload = {"targetName": table_name, "dataPoolId": self.id}
if data_source_id:
payload["connectionId"] = data_source_id
if column_config:
column_config = copy.deepcopy(column_config)
column_config = self._convert_column_config_fieldlength(column_config)
payload["tableSchema"] = {"tableName": table_name, "columns": column_config}
if job_type:
payload["type"] = job_type
if keys:
payload["keys"] = keys
return payload, table_name
def _upload_dataframe(
self,
df: pd.DataFrame,
table_name: str,
chunk_url: str,
chunk_base_url: str,
chunksize: int,
check_column: typing.Optional[pd.DataFrame] = None,
):
"""Takes the local table (DataFrame) and splits it in chunks if
demanded. Then thoses chunks are passed to parallalized method
to store them as parquet files. Those parquet files are then pushed
to the remote data pool.
Parameters
----------
df : pd.DataFrame
The local table to be pushed.
table_name : str
The name of the local table.
chunk_url : str,
Designated chunk url.
chunk_base_url : str
Designated chunk base url.
check_column : pd.DataFrame, optional
The target table as a data frame used to check column names
in a sub-method, by default None.
chunksize : int
Sets how large one chunk (of the table to be pushed) should be.
"""
if chunksize > 0:
chunks = [(df[pos : pos + chunksize], pos) for pos in range(0, len(df), chunksize)]
else:
chunks = [(df, 0)]
[
r
for r in utils.threaded(
chunks,
self._upload_df_chunk,
table_name=table_name,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
df_remote_table=check_column,
)
]
def _upload_parquet_files(
self,
path: typing.Union[pd.DataFrame, pathlib.Path, str],
chunk_url: str,
chunk_base_url: str,
check_column: typing.Optional[pd.DataFrame] = None,
) -> None:
"""Upload parquet files based on the path to the file or to the
directory.
Parameters
----------
path : pathlib.Path or str
Path to parquet file or path to folder,
which contains one or more parquet files.
table_name : str
The name of the local table.
chunk_url : str,
Designated chunk url.
chunk_base_url : str
Designated chunk base url.
check_column : pd.DataFrame, optional
The target table as a data frame used to check column names
in a sub-method, by default None.
Raises
------
FileNotFoundError
Raised when path is not file and not folder.
"""
path = pathlib.Path(path) # type: ignore
if path.is_file():
self._upload_file(
path,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
df_remote_table=check_column,
threaded=False,
)
elif path.is_dir():
[
r
for r in utils.threaded(
path.glob("*.parquet"),
self._upload_file,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
df_remote_table=check_column,
)
]
else:
raise PyCelonisTypeError("Argument 'path' must be either file or directory.")
def _convert_column_config_fieldlength(self, column_config):
"""Vertica multiplies the provided column length in column config with 4 silently, so we divide the
one provided by the user such that what actually is seen in vertica is the same as what will end
up in vertica"""
for col in column_config:
if "fieldLength" in col.keys() and col['columnType'] == 'STRING':
conv_len = math.ceil(int(col["fieldLength"]) / 4)
if col["fieldLength"] % 4 != 0:
self._logger.warning(
f"The field length parameter of column config needs to be multiples of 4."
f"field length of {col['columnName']} will be rounded to {conv_len}."
)
col["fieldLength"] = conv_len
return column_config
def get_column_config(
self, table: typing.Union[str, typing.Dict], raise_error: bool = False
) -> typing.Optional[typing.List[typing.Dict[str, typing.Any]]]:
"""Get a Column Configuration of a Pool Table.
Column Config List:
```json
[
{'columnName': 'colA', 'columnType': 'DATETIME'},
{'columnName': 'colB', 'columnType': 'FLOAT'},
{'columnName': 'colC', 'columnType': 'STRING', 'fieldLength': 80}
]
```
Args:
table: Name of the Pool Table or dictionary with `{'name': '', 'schemaName': ''}`.
raise_error: Raises a [celonis_api.errors.PyCelonisValueError][] if Table data types are `None` or table
has 99+ columns, else only logs warning.
Returns:
The Column Configuration of the Pool Table (Always ignoring '_CELONIS_CHANGE_DATE').
"""
if isinstance(table, str):
table_list = [t for t in self.tables if t["name"] == table]
if not table_list:
raise PyCelonisNotFoundError(f'Table {table} not found in pool')
table = table_list[0]
try:
column_config = self._get_raw_column_config(table_name=table['name'], data_source_id=table['dataSourceId'])
except Exception as e:
if raise_error:
raise e
self._logger.warning(str(e))
return None
column_config = self._clean_column_config(column_config=column_config)
return column_config
def _get_raw_column_config(
self, table_name: str, data_source_id: str = None
) -> typing.List[typing.Dict[str, typing.Any]]:
url = f'{self.url}/columns?tableName={table_name}'
if data_source_id is not None:
url += f"&schemaName={data_source_id}" # Inconsistency in naming on integration side
return self.celonis.api_request(url)
@staticmethod
def _clean_column_config(
column_config: typing.List[typing.Dict[str, typing.Any]]
) -> typing.List[typing.Dict[str, typing.Any]]:
"""function to clean config from /columns endpoint because it is different than config needed for push api"""
clean_config = []
for column in column_config:
if column["name"] != "_CELONIS_CHANGE_DATE":
clean_column = {"columnName": column["name"], "columnType": column["type"]}
if column["type"] == "STRING" and column.get("length") is not None:
clean_column["fieldLength"] = column["length"]
clean_config += [clean_column]
return clean_config
def check_push_status(self, job_id: str = "") -> typing.Dict:
"""Checks the Status of a Data Push Job.
!!! api "API"
- `GET: /integration/api/v1/data-push/{pool_id}/jobs/{job_id}`
Args:
job_id: The ID of the job to check. If empty returns all job status.
Returns:
Status of Data Push Job(s).
"""
return self.celonis.api_request(f"{self.url_data_push}{job_id}")
def check_data_job_execution_status(self) -> typing.List:
"""Checks the Status of Data Job Executions.
!!! api "API"
- `GET: /integration/api/pools/{pool_id}/logs/status`
Returns:
Status of all Data Job Executions.
"""
return self.celonis.api_request(f"{self.url}/logs/status")
def create_datamodel(self, name: str) -> 'Datamodel':
"""Creates a new Datamodel in the Pool.
Args:
name: Name of the Datamodel.
Returns:
The newly created Datamodel object.
"""
url = f"{self.url}/data-models"
payload = {"name": name, "poolId": self.id, "configurationSkipped": True}
response = self.celonis.api_request(url, payload)
return Datamodel(self, response)
@deprecated("Use the online wizard to set up Data Connections.")
def create_data_connection(
self,
client: str,
host: str,
password: str,
system_number: str,
user: str,
name: str,
connector_type: str,
uplink_id: str = None,
use_uplink: bool = True,
compression_type: str = "GZIP",
**kwargs,
) -> 'DataConnection':
"""Creates a new Data Connection (Currently, only SAP connection are supported).
!!! warning
This method is deprecated and will be removed in the next release.
Use the online wizard to set up Data Connections.
Args:
client: Client.
host: Host.
user: Username.
password: Password.
system_number: System Number.
name: Name of the Data Connection.
connector_type: Type of the Data Connection. One of ['SAP'].
uplink_id: ID of an Uplink Connection.
use_uplink: Whether to use an Uplink Connection or not.
compression_type: Compression Type.
**kwargs:
Returns:
The newly created Data Connection.
"""
if connector_type == "sap":
url = f"{self.url_connection_creation}/sap"
payload = {
"config": {
"client": client,
"compressionType": compression_type,
"groupName": "",
"host": host,
"parallelTables": 4,
"password": password,
"r3Name": "",
"serviceName": "",
"sncPartnerName": "",
"systemNumber": system_number,
"type": connector_type,
"useLogonGroup": False,
"useSnc": False,
"user": user,
},
"name": name,
"poolId": self.id,
"targetSchemaName": "",
"type": connector_type,
"uplinkId": uplink_id,
"useUplink": use_uplink,
}
else:
raise PyCelonisError("Currently, only SAP connection are supported.")
response = self.celonis.api_request(url, payload)
return DataConnection(self, response, self.celonis)
def move(self, to: str):
"""Moves the Pool to another team.
!!! api "API"
- `POST: /integration/api/pools/move`
```json
{
"subsetOfDataModels": False,
"dataPoolId": self.id,
"selectedDataModels": [],
"moveToDomain": to
}
```
Args:
to: Name of the host domain (e.g. `move` for https://move.eu-1.celonis.cloud).
"""
url = f"{self.celonis.url}/integration/api/pools/move"
payload = {
"subsetOfDataModels": False,
"dataPoolId": self.id,
"selectedDataModels": [k for k, v in self.datamodels.ids.items()],
"moveToDomain": to,
}
self.celonis.api_request(url, payload)
def create_pool_parameter(
self,
pool_variable: typing.Union[typing.Dict, 'PoolParameter'] = None,
name: str = None,
placeholder: str = None,
description: str = None,
data_type: str = "STRING",
var_type: str = "PUBLIC_CONSTANT",
values: typing.List = None,
) -> 'PoolParameter':
"""Creates a new Variable with the specified properties in the Pool.
!!! api "API"
- `POST: /integration/api/pools/{pool_id}/variables/`
```json
{
"poolId": self.id,
"dataType":"<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
"name": "",
"type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
"description": "",
"placeholder": "",
"values": [
{"value": ""},...
],
}
```
Args:
pool_variable: Pool Parameter object or dictionary (see API), if `None` all other arguments must be set.
name: Name of the Variable (same as `pool_variable["name"]`).
placeholder: Placeholder of the Variable.
description: Description of the Variable.
data_type: Data type of the Variable (see options `pool_variable`).
var_type: Type of the Variable (see options `pool_variable`).
values: List of Variable values.
Returns:
The newly created Pool Parameter object.
"""
payload = None
if isinstance(pool_variable, PoolParameter):
payload = {
"dataType": pool_variable.data["dataType"],
"name": pool_variable.data["name"],
"type": pool_variable.data["type"],
"description": pool_variable.data["description"],
"placeholder": pool_variable.data["placeholder"],
"poolId": self.id,
"values": pool_variable.data["values"],
}
elif isinstance(pool_variable, dict):
payload = pool_variable
payload["poolId"] = self.id
elif pool_variable is None:
if name is None:
raise PyCelonisValueError("name can't be None if no pool_variable is given")
payload = {
"dataType": data_type,
"name": name,
"type": var_type,
"description": description,
"placeholder": placeholder if placeholder is not None else name.upper(),
"poolId": self.id,
"values": values if values is not None else [{"value": "Undefined"}],
}
created_parameter = self.celonis.api_request(f"{self.url}/variables/", payload)
return PoolParameter(self, created_parameter, self.celonis)
def create_data_job(self, name: str, data_source_id: str = None) -> 'DataJob':
"""Creates a new Data Job with the specified name in the Pool.
!!! api "API"
- `POST: /integration/api/pools/{pool_id}/jobs/`
```json
{
"dataPoolId": self.id,
"dataSourceId": data_source_id,
"name": name
}
```
Args:
name: Name of the Data Job.
data_source_id: ID of the Data Source that the new Data Job will be connected to.
If not specified, the default global source will be connected to.
Returns:
The newly created Data Job object.
"""
payload = {"dataPoolId": self.id, "dataSourceId": data_source_id, "name": name}
response = self.celonis.api_request(f"{self.url}/jobs", payload)
return DataJob(self, response, self.celonis)
def _drop_remote_table(self, table_name: str, data_source_id: str = None):
rand_number = random.randint(1000, 9999)
data_job, transformation = None, None
try:
data_job = self.create_data_job(f"PyCelonis_Drop_Table_{rand_number}", data_source_id=data_source_id)
transformation = data_job.create_transformation(f"drop_{rand_number}")
transformation.statement = f"DROP TABLE IF EXISTS {table_name};"
transformation.execute_from_workbench()
except PyCelonisError as e:
if transformation:
transformation.delete()
if data_job:
data_job.delete()
raise e
table = self.find_table(table_name, data_source_id)
if table is not None:
raise PyCelonisNotFoundError(
f"FAILED: Dropping table - '{table_name}' failed. Remote table still exists in data pool."
)
self._logger.info(f"SUCCESS: Table - '{table_name}' from data pool was successful dropped.")
def _pull_remote_table_from_pool(
self,
table_name: str,
data_source_id: str = None,
row_limit: int = 1,
sort_by_column: typing.Optional[str] = None,
) -> pd.DataFrame:
rand_number = random.randint(1000, 9999)
data_job, transformation = None, None
try:
data_job = self.create_data_job(f"PyCelonis_Extract_Table_{rand_number}", data_source_id=data_source_id)
transformation = data_job.create_transformation(f"extract_{rand_number}")
df_remote_table = transformation.get_data_frame(f"SELECT * FROM {table_name} LIMIT {row_limit};")
except PyCelonisError as e:
if transformation:
transformation.delete()
if data_job:
data_job.delete()
raise e
if "_CELONIS_CHANGE_DATE" in df_remote_table:
df_remote_table.drop(columns=["_CELONIS_CHANGE_DATE"], inplace=True)
if sort_by_column:
df_remote_table.sort_values(by=[sort_by_column], inplace=True, ignore_index=True)
return df_remote_table
def _wait_until_data_job_finish(self, job_id) -> typing.Dict:
self._logger.info("Data push job started...")
iterations = 0
error_count = 0
while True:
try:
response = self.check_push_status(job_id)
status = response.get("status", "")
if status not in ["RUNNING", "QUEUED", "NEW"]:
self._logger.info(f"Data push job status: {status}")
break
error_count = 0
iterations += 1
if iterations % 5 == 0:
self._logger.info(f"Data push job status: {status}...")
time.sleep(1)
except PyCelonisHTTPError as e:
error_count += 1
self._logger.exception("Failed to request status, trying again...")
time.sleep(3)
if error_count > 5:
raise e
if status == "ERROR":
error_logs = response.get("logs", [])
error_logs = "\n".join(str(log) for log in error_logs)
raise PyCelonisHTTPError(f"Data push job failed. Error logs:\n{error_logs}")
return response
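The typical workflow combines the properties and push methods above. Below is a minimal usage sketch (not from the source); it assumes an already-connected Celonis client named `celonis` whose `pools` collection allows a `find` lookup by name or ID, both of which are assumptions not shown in this module.
```python
import pandas as pd

# `celonis` is assumed to be an already-connected Celonis client; looking up
# a Pool by name via `pools.find` is also an assumption not shown in this module.
pool = celonis.pools.find("My Data Pool")

# Push a small DataFrame into a new pool table and wait for the push job.
df = pd.DataFrame({"BUKRS": ["1000", "2000"], "AMOUNT": [12.5, 7.0]})
status = pool.create_table(df_or_path=df, table_name="DEMO_TABLE", if_exists="error")
print(status.get("status"))  # e.g. "DONE" once the data push job has finished
```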
data_connections: CelonisCollection[DataConnection]
property
readonly
¶
Get all Pool Data Connections.
API
GET: /integration/api/pools/{pool_id}/data-sources/
Returns:
Type | Description
---|---
CelonisCollection[DataConnection] | A Collection of Pool Data Connections.
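A short sketch of how the collection is typically used, e.g. picking one Data Connection by name and pushing into its schema. Accessing the connection name through the raw `data` dictionary is an assumption about the API payload; the `connection` parameter of `create_table` is documented further below.
```python
# Pick a Data Connection by its configured name (the "name" key of the raw
# `data` payload is an assumption) and push a parquet file into its schema.
sap_connection = next(
    dc for dc in pool.data_connections if dc.data.get("name") == "SAP ECC"
)
pool.create_table(
    df_or_path="purchase_orders.parquet",
    table_name="EKPO",
    connection=sap_connection,
)
```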
data_jobs: CelonisCollection[DataJob]
property
readonly
¶
Get all Pool Data Jobs.
API
GET: /integration/api/pools/{pool_id}/jobs
Returns:
Type | Description
---|---
CelonisCollection[DataJob] | A Collection of Pool Data Jobs.
datamodels: CelonisCollection
property
readonly
¶
Get all Datamodels of the Pool.
API
GET: /integration/api/pools/{pool_id}/data-models
Returns:
Type | Description
---|---
CelonisCollection | Collection of Pool Datamodels.
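A brief sketch of iterating the collection; the `ids` mapping used here is the same one `Pool.move()` relies on internally, while the `data` dictionary on each Datamodel is an assumption about the raw API payload.
```python
# List all Datamodels of the pool by ID and (assumed) name.
for dm in pool.datamodels:
    print(dm.id, dm.data.get("name"))

# The collection also exposes an `ids` mapping, as used by Pool.move().
print(list(pool.datamodels.ids))
```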
tables: List[Dict]
property
readonly
¶
Get all Pool Tables.
API
GET: /integration/api/pools/{pool_id}/tables
Returns:
Type | Description
---|---
List[Dict] | A List of dictionaries containing Pool tables.
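Because `tables` returns plain dictionaries, the result can be inspected directly; the `name` and `dataSourceId` keys used below are the same ones `find_table` matches on.
```python
# Print every pool table together with the Data Source it belongs to
# (None means the table lives in the global schema).
for table in pool.tables:
    print(table["name"], table["dataSourceId"])
```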
url: str
property
readonly
¶
API
/integration/api/pools/{pool_id}
url_connection_creation
property
readonly
¶
API
/integration/api/datasource/
url_data_push
property
readonly
¶
API
/integration/api/v1/data-push/{pool_id}/jobs/
variables: CelonisCollection[PoolParameter]
property
readonly
¶
Get all Pool Variables.
API
GET: /integration/api/pools/{pool_id}/variables
Returns:
Type | Description
---|---
CelonisCollection[PoolParameter] | A Collection of Pool Variables.
append_table(self, df_or_path, table_name, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)
¶
Appends a pandas.DataFrame or pyarrow.parquet.ParquetFile to an existing Table in the Pool.
Parameters:
Name | Type | Description | Default
---|---|---|---
df_or_path | Union[pandas.core.frame.DataFrame, pathlib.Path, str] | If DataFrame, df is chunked, written to parquet and uploaded. If Path to a parquet file, the file is uploaded. If Path to a folder, any parquet file in the folder is uploaded. If str, the value is converted to Path and handled as described above. | required
table_name | str | Name of Table. | required
column_config | List[Dict[str, Any]] | Can be used to specify column types and string field length, with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING]. | None
connection | Union[DataConnection, str] | The Data Connection to upload to, else uploads to Global. | None
wait_for_finish | bool | Waits for the upload to finish processing, set to False to trigger only. | True
chunksize | int | If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied. | 100000

Returns:

Type | Description
---|---
Dict | The Data Job Status.

Exceptions:

Type | Description
---|---
PyCelonisValueError | If Table already exists and if_exists='error'.
PyCelonisTypeError | When connection is not a DataConnection object or ID of a Data Connection.
PyCelonisTypeError | If Path is not a valid file or folder.
Source code in celonis_api/event_collection/data_pool.py
def append_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Appends a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
to an existing Table in the Pool.
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
table_name: Name of Table.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
Raises:
PyCelonisValueError: If Table already exists and `if_exists='error'`.
PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
PyCelonisTypeError: If Path is not a valid file or folder.
"""
self.celonis._tracker.track("Append table", extra={"tracking_type": "DATA_PUSH"})
data_source_id = self._get_data_source_id(connection)
table = self.find_table(table_name, data_source_id)
if table is None:
raise PyCelonisNotFoundError(
f"The target table \"{table_name}\" could not be found in the Data Pool. "
"Please check spelling of table name."
)
if not column_config:
column_config = self.get_column_config(table)
# Create payload for data job
payload, table_name = self._create_payload_for_data_job(
table_name=table_name, column_config=column_config, data_source_id=data_source_id, job_type="DELTA"
)
# Create data push job
response = self.celonis.api_request(self.url_data_push, payload)
job_id = response["id"]
# Create parquet uploader
chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
chunk_url = f"{chunk_base_url}/upserted"
# Create df chunks and upload
if isinstance(df_or_path, pd.DataFrame):
self._upload_dataframe(
df=df_or_path,
table_name=table_name,
chunksize=chunksize,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Or upload file(s)
elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
self._upload_parquet_files(
path=df_or_path,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Start job execution
_ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
response = self.check_push_status(job_id)
# Wait for job to finish
if wait_for_finish and response.get("status", "") != "DONE":
response = self._wait_until_data_job_finish(job_id=job_id)
return response
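A minimal usage sketch, assuming `pool` is a `Pool` object and a table named `DEMO_TABLE` already exists in it (otherwise `PyCelonisNotFoundError` is raised as documented above).
```python
import pandas as pd

# Append two new rows to an existing pool table and wait for the push job.
new_rows = pd.DataFrame({"BUKRS": ["3000", "4000"], "AMOUNT": [3.2, 8.1]})
status = pool.append_table(df_or_path=new_rows, table_name="DEMO_TABLE")
print(status.get("status"))
```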
check_data_job_execution_status(self)
¶
Checks the Status of Data Job Executions.
API
GET: /integration/api/pools/{pool_id}/logs/status
Returns:
Type | Description
---|---
List | Status of all Data Job Executions.
Source code in celonis_api/event_collection/data_pool.py
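A one-line sketch of checking execution logs, assuming `pool` is a `Pool` object; each entry is whatever the `/logs/status` endpoint returns.
```python
# Print the status of every Data Job execution in the pool.
for execution_status in pool.check_data_job_execution_status():
    print(execution_status)
```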
check_push_status(self, job_id='')
¶
Checks the Status of a Data Push Job.
API
GET: /integration/api/v1/data-push/{pool_id}/jobs/{job_id}
Parameters:
Name | Type | Description | Default
---|---|---|---
job_id | str | The ID of the job to check. If empty returns all job status. | ''

Returns:

Type | Description
---|---
Dict | Status of Data Push Job(s).
Source code in celonis_api/event_collection/data_pool.py
def check_push_status(self, job_id: str = "") -> typing.Dict:
"""Checks the Status of a Data Push Job.
!!! api "API"
- `GET: /integration/api/v1/data-push/{pool_id}/jobs/{job_id}`
Args:
job_id: The ID of the job to check. If empty returns all job status.
Returns:
Status of Data Push Job(s).
"""
return self.celonis.api_request(f"{self.url_data_push}{job_id}")
create_data_connection(self, client, host, password, system_number, user, name, connector_type, uplink_id=None, use_uplink=True, compression_type='GZIP', **kwargs)
¶
Creates a new Data Connection (Currently, only SAP connections are supported).
Warning
This method is deprecated and will be removed in the next release. Use the online wizard to set up Data Connections.
Parameters:
Name | Type | Description | Default
---|---|---|---
client | str | Client. | required
host | str | Host. | required
user | str | Username. | required
password | str | Password. | required
system_number | str | System Number. | required
name | str | Name of the Data Connection. | required
connector_type | str | Type of the Data Connection. One of ['SAP']. | required
uplink_id | str | ID of an Uplink Connection. | None
use_uplink | bool | Whether to use an Uplink Connection or not. | True
compression_type | str | Compression Type. | 'GZIP'
**kwargs | | | {}

Returns:

Type | Description
---|---
DataConnection | The newly created Data Connection.
Source code in celonis_api/event_collection/data_pool.py
@deprecated("Use the online wizard to set up Data Connections.")
def create_data_connection(
self,
client: str,
host: str,
password: str,
system_number: str,
user: str,
name: str,
connector_type: str,
uplink_id: str = None,
use_uplink: bool = True,
compression_type: str = "GZIP",
**kwargs,
) -> 'DataConnection':
"""Creates a new Data Connection (Currently, only SAP connection are supported).
!!! warning
This method is deprecated and will be removed in the next release.
Use the online wizard to set up Data Connections.
Args:
client: Client.
host: Host.
user: Username.
password: Password.
system_number: System Number.
name: Name of the Data Connection.
connector_type: Type of the Data Connection. One of ['SAP'].
uplink_id: ID of an Uplink Connection.
use_uplink: Whether to use an Uplink Connection or not.
compression_type: Compression Type.
**kwargs:
Returns:
The newly created Data Connection.
"""
if connector_type == "sap":
url = f"{self.url_connection_creation}/sap"
payload = {
"config": {
"client": client,
"compressionType": compression_type,
"groupName": "",
"host": host,
"parallelTables": 4,
"password": password,
"r3Name": "",
"serviceName": "",
"sncPartnerName": "",
"systemNumber": system_number,
"type": connector_type,
"useLogonGroup": False,
"useSnc": False,
"user": user,
},
"name": name,
"poolId": self.id,
"targetSchemaName": "",
"type": connector_type,
"uplinkId": uplink_id,
"useUplink": use_uplink,
}
else:
raise PyCelonisError("Currently, only SAP connection are supported.")
response = self.celonis.api_request(url, payload)
return DataConnection(self, response, self.celonis)
create_data_job(self, name, data_source_id=None)
¶
Creates a new Data Job with the specified name in the Pool.
API
POST: /integration/api/pools/{pool_id}/jobs/
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Name of the Data Job. | required
data_source_id | str | ID of the Data Source that the new Data Job will be connected to. If not specified, the default global source will be connected to. | None

Returns:

Type | Description
---|---
DataJob | The newly created Data Job object.
Source code in celonis_api/event_collection/data_pool.py
def create_data_job(self, name: str, data_source_id: str = None) -> 'DataJob':
"""Creates a new Data Job with the specified name in the Pool.
!!! api "API"
- `POST: /integration/api/pools/{pool_id}/jobs/`
```json
{
"dataPoolId": self.id,
"dataSourceId": data_source_id,
"name": name
}
```
Args:
name: Name of the Data Job.
data_source_id: ID of the Data Source that the new Data Job will be connected to.
If not specified, the default global source will be connected to.
Returns:
The newly created Data Job object.
"""
payload = {"dataPoolId": self.id, "dataSourceId": data_source_id, "name": name}
response = self.celonis.api_request(f"{self.url}/jobs", payload)
return DataJob(self, response, self.celonis)
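A short sketch, assuming `pool` is a `Pool` object; the Data Connection ID is a placeholder.
```python
# Create a Data Job in the global schema ...
global_job = pool.create_data_job("Nightly transformations")

# ... and one bound to a specific Data Connection (placeholder ID).
connection_job = pool.create_data_job(
    "SAP extraction job", data_source_id="<data-connection-id>"
)
```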
create_datamodel(self, name)
¶
Creates a new Datamodel in the Pool.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Name of the Datamodel. | required

Returns:

Type | Description
---|---
Datamodel | The newly created Datamodel object.
Source code in celonis_api/event_collection/data_pool.py
def create_datamodel(self, name: str) -> 'Datamodel':
"""Creates a new Datamodel in the Pool.
Args:
name: Name of the Datamodel.
Returns:
The newly created Datamodel object.
"""
url = f"{self.url}/data-models"
payload = {"name": name, "poolId": self.id, "configurationSkipped": True}
response = self.celonis.api_request(url, payload)
return Datamodel(self, response)
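A minimal sketch, assuming `pool` is a `Pool` object.
```python
# Create an empty Datamodel in the pool; tables can be added to it afterwards.
datamodel = pool.create_datamodel("P2P Datamodel")
print(datamodel.id)
```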
create_pool_parameter(self, pool_variable=None, name=None, placeholder=None, description=None, data_type='STRING', var_type='PUBLIC_CONSTANT', values=None)
¶
Creates a new Variable with the specified properties in the Pool.
API
POST: /integration/api/pools/{pool_id}/variables/
Parameters:
Name | Type | Description | Default
---|---|---|---
pool_variable | Union[Dict, PoolParameter] | Pool Parameter object or dictionary (see API), if None all other arguments must be set. | None
name | str | Name of the Variable (same as pool_variable["name"]). | None
placeholder | str | Placeholder of the Variable. | None
description | str | Description of the Variable. | None
data_type | str | Data type of the Variable (see options pool_variable). | 'STRING'
var_type | str | Type of the Variable (see options pool_variable). | 'PUBLIC_CONSTANT'
values | List | List of Variable values. | None

Returns:

Type | Description
---|---
PoolParameter | The newly created Pool Parameter object.
Source code in celonis_api/event_collection/data_pool.py
def create_pool_parameter(
self,
pool_variable: typing.Union[typing.Dict, 'PoolParameter'] = None,
name: str = None,
placeholder: str = None,
description: str = None,
data_type: str = "STRING",
var_type: str = "PUBLIC_CONSTANT",
values: typing.List = None,
) -> 'PoolParameter':
"""Creates a new Variable with the specified properties in the Pool.
!!! api "API"
- `POST: /integration/api/pools/{pool_id}/variables/`
```json
{
"poolId": self.id,
"dataType":"<DATE|DOUBLE|INT|STRING|COLUMN|QUALIFIED_COLUMN|LIST_DOUBLE|LIST_INT|LIST_STRING|NULL>",
"name": "",
"type": "<PRIVATE_CONSTANT|PUBLIC_CONSTANT|DYNAMIC>",
"description": "",
"placeholder": "",
"values": [
{"value": ""},...
],
}
```
Args:
pool_variable: Pool Parameter object or dictionary (see API), if `None` all other arguments must be set.
name: Name of the Variable (same as `pool_variable["name"]`).
placeholder: Placeholder of the Variable.
description: Description of the Variable.
data_type: Data type of the Variable (see options `pool_variable`).
var_type: Type of the Variable (see options `pool_variable`).
values: List of Variable values.
Returns:
The newly created Pool Parameter object.
"""
payload = None
if isinstance(pool_variable, PoolParameter):
payload = {
"dataType": pool_variable.data["dataType"],
"name": pool_variable.data["name"],
"type": pool_variable.data["type"],
"description": pool_variable.data["description"],
"placeholder": pool_variable.data["placeholder"],
"poolId": self.id,
"values": pool_variable.data["values"],
}
elif isinstance(pool_variable, dict):
payload = pool_variable
payload["poolId"] = self.id
elif pool_variable is None:
if name is None:
raise PyCelonisValueError("name can't be None if no pool_variable is given")
payload = {
"dataType": data_type,
"name": name,
"type": var_type,
"description": description,
"placeholder": placeholder if placeholder is not None else name.upper(),
"poolId": self.id,
"values": values if values is not None else [{"value": "Undefined"}],
}
created_parameter = self.celonis.api_request(f"{self.url}/variables/", payload)
return PoolParameter(self, created_parameter, self.celonis)
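A sketch of the keyword-argument form (`pool_variable=None`), mirroring the payload documented above; all values are illustrative.
```python
# Create a public STRING constant with a single default value.
currency_param = pool.create_pool_parameter(
    name="currency",
    placeholder="CURRENCY",
    description="Default currency used in transformations",
    data_type="STRING",
    var_type="PUBLIC_CONSTANT",
    values=[{"value": "EUR"}],
)
```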
create_table(self, df_or_path, table_name, if_exists='error', column_config=None, connection=None, wait_for_finish=True, chunksize=100000)
¶
Creates a new Table in the Pool from a pandas.DataFrame or pyarrow.parquet.ParquetFile.
Parameters:
Name | Type | Description | Default
---|---|---|---
df_or_path | Union[pandas.core.frame.DataFrame, pathlib.Path, str] | If DataFrame, df is chunked, written to parquet and uploaded. If Path to a parquet file, the file is uploaded. If Path to a folder, any parquet file in the folder is uploaded. If str, the value is converted to Path and handled as described above. (The index of the data frame is ignored and NOT pushed to Celonis.) | required
table_name | str | Name of Table. | required
if_exists | str | 'error' -> an error is thrown if a table of the same name already exists in the pool. 'drop' -> the existing table is dropped completely and a new table is created. 'replace_data_only' -> the column names and types of the old table are not overwritten. | 'error'
column_config | List[Dict[str, Any]] | Can be used to specify column types and string field length, with columnType one of [INTEGER, DATE, TIME, DATETIME, FLOAT, BOOLEAN, STRING]. | None
connection | Union[DataConnection, str] | The Data Connection to upload to, else uploads to Global. | None
wait_for_finish | bool | Waits for the upload to finish processing, set to False to trigger only. | True
chunksize | int | If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value <1, no chunking is applied. | 100000

Returns:

Type | Description
---|---
Dict | The Data Job Status.

Exceptions:

Type | Description
---|---
PyCelonisValueError | If Table already exists and if_exists='error'.
PyCelonisTypeError | When connection is not a DataConnection object or ID of a Data Connection.
PyCelonisTypeError | If Path is not a valid file or folder.
Source code in celonis_api/event_collection/data_pool.py
def create_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
if_exists: str = "error", # drop, replace_data_only
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Creates a new Table in the Pool from a
[pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html).
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
(The index of the data frame is ignored and NOT pushed to Celonis.)
table_name: Name of Table.
if_exists:
* `error` -> an error is thrown if a table of the same name already exists in the pool.
* `drop` -> the existing table is dropped completely and a new table is created, by default `error`.
* `replace_data_only` -> the column names and types of the old tables are not overwritten.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
Raises:
PyCelonisValueError: If Table already exists and `if_exists='error'`.
PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
PyCelonisTypeError: If Path is not a valid file or folder.
"""
self.celonis._tracker.track("Create table", extra={"tracking_type": "DATA_PUSH"})
if if_exists not in ["error", "drop", "replace_data_only"]:
raise PyCelonisValueError("Argument 'if_exists' must be one of ['error', 'drop', 'replace_data_only'].")
data_source_id = self._get_data_source_id(connection)
job_type = None
if if_exists == "drop":
job_type = "REPLACE"
else:
table = self.find_table(table_name, data_source_id)
if table is not None:
if if_exists == "error":
raise PyCelonisValueError(
f"Table with name {table_name} already exists in the "
"Data Pool. If you want to drop it and create a new table, set if_exists='drop'."
)
elif if_exists == "replace_data_only":
if column_config is not None:
raise PyCelonisValueError(
"When argument if_exists='replace_data_only', "
"you cannot give an additional column_config. "
"The column_config is inferred from the pool table."
)
column_config = self.get_column_config(table, raise_error=True)
# Create payload for data job
payload, table_name = self._create_payload_for_data_job(
table_name=table_name,
column_config=column_config,
data_source_id=data_source_id,
job_type=job_type,
)
# Create data push job
response = self.celonis.api_request(self.url_data_push, payload)
job_id = response["id"]
# Create parquet uploader
chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
chunk_url = f"{chunk_base_url}/upserted"
# Create df chunks and upload
if isinstance(df_or_path, pd.DataFrame):
self._upload_dataframe(
df=df_or_path,
table_name=table_name,
chunksize=chunksize,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
)
# Or upload file(s)
elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
self._upload_parquet_files(path=df_or_path, chunk_url=chunk_url, chunk_base_url=chunk_base_url)
# Start job execution
_ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
response = self.check_push_status(job_id)
# Wait for job to finish
if wait_for_finish and response.get("status", "") != "DONE":
response = self._wait_until_data_job_finish(job_id=job_id)
return response
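The snippet below is a minimal usage sketch for `create_table`, assuming you already hold a connected `Pool` object named `pool` (obtained elsewhere from your Celonis client); the table name, data and column configuration are purely illustrative.
```python
import pandas as pd

# Illustrative data; the DataFrame index is ignored and not pushed.
df = pd.DataFrame({"BUKRS": ["1000", "2000"], "NETWR": [100.5, 250.0]})

# Optional: pin column types and string field lengths explicitly.
column_config = [
    {"columnName": "BUKRS", "columnType": "STRING", "fieldLength": 100},
    {"columnName": "NETWR", "columnType": "FLOAT"},
]

# Push into the pool's Global scope (no Data Connection passed).
# if_exists='drop' replaces an existing table of the same name.
status = pool.create_table(
    df_or_path=df,
    table_name="EXAMPLE_TABLE",
    if_exists="drop",
    column_config=column_config,
    wait_for_finish=True,
)
print(status.get("status"))  # e.g. 'DONE' once the push job has finished
```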
find_table(self, table_name, data_source_id=None)
¶
Find a Table in the Pool.
Parameters:
Name | Type | Description | Default
---|---|---|---
`table_name` | `str` | Name of the Pool Table. | *required*
`data_source_id` | `str` | ID of the Data Source. | `None`
Returns:
Type | Description
---|---
`Optional[Dict]` | The Pool Table, if found.
Source code in celonis_api/event_collection/data_pool.py
def find_table(self, table_name: str, data_source_id: str = None) -> typing.Optional[typing.Dict]:
"""Find a Table in the Pool.
Args:
table_name: Name of the Pool Table.
data_source_id: ID of the Data Source.
Returns:
The Pool Table, if found.
"""
table_found = None
for table in self.tables:
if table["name"].lower() == table_name.lower() and table["dataSourceId"] == data_source_id:
table_found = table
break
return table_found
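A short sketch of `find_table`, again assuming an existing `Pool` object `pool`; note that the name comparison in the source above is case-insensitive, while the data source ID must match exactly, so tables in the Global scope are found with the default `data_source_id=None`.
```python
# Look up a table in the Global scope (data_source_id defaults to None).
table = pool.find_table("EXAMPLE_TABLE")
if table is not None:
    print(table["name"], table["dataSourceId"])

# To search inside a specific Data Connection, pass its ID explicitly.
# (Assumes the pool has at least one Data Connection.)
connection = pool.data_connections[0]
table_in_connection = pool.find_table("EXAMPLE_TABLE", data_source_id=connection.id)
```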
get_column_config(self, table, raise_error=False)
¶
Get a Column Configuration of a Pool Table.
Column Config List:
[
{'columnName': 'colA', 'columnType': 'DATETIME'},
{'columnName': 'colB', 'columnType': 'FLOAT'},
{'columnName': 'colC', 'columnType': 'STRING', 'fieldLength': 80}
]
Parameters:
Name | Type | Description | Default
---|---|---|---
`table` | `Union[str, Dict]` | Name of the Pool Table or dictionary with `{'name': '', 'schemaName': ''}`. | *required*
`raise_error` | `bool` | Raises a `celonis_api.errors.PyCelonisValueError` if Table data types are `None` or the table has 99+ columns, else only logs a warning. | `False`
Returns:
Type | Description
---|---
`Optional[List[Dict[str, Any]]]` | The Column Configuration of the Pool Table (always ignoring `_CELONIS_CHANGE_DATE`).
Source code in celonis_api/event_collection/data_pool.py
def get_column_config(
self, table: typing.Union[str, typing.Dict], raise_error: bool = False
) -> typing.Optional[typing.List[typing.Dict[str, typing.Any]]]:
"""Get a Column Configuration of a Pool Table.
Column Config List:
```json
[
{'columnName': 'colA', 'columnType': 'DATETIME'},
{'columnName': 'colB', 'columnType': 'FLOAT'},
{'columnName': 'colC', 'columnType': 'STRING', 'fieldLength': 80}
]
```
Args:
table: Name of the Pool Table or dictionary with `{'name': '', 'schemaName': ''}`.
raise_error: Raises a [celonis_api.errors.PyCelonisValueError][] if Table data types are `None` or table
has 99+ columns, else only logs warning.
Returns:
The Column Configuration of the Pool Table (Always ignoring '_CELONIS_CHANGE_DATE').
"""
if isinstance(table, str):
table_list = [t for t in self.tables if t["name"] == table]
if not table_list:
raise PyCelonisNotFoundError(f'Table {table} not found in pool')
table = table_list[0]
try:
column_config = self._get_raw_column_config(table_name=table['name'], data_source_id=table['dataSourceId'])
except Exception as e:
if raise_error:
raise e
self._logger.warning(str(e))
return None
column_config = self._clean_column_config(column_config=column_config)
return column_config
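A usage sketch for `get_column_config`, assuming the pool already contains a table named `EXAMPLE_TABLE`; the returned list has exactly the shape expected by the `column_config` argument of `create_table` and `upsert_table`.
```python
# Fetch the column configuration of an existing pool table.
# With raise_error=False (the default), problems are only logged and None is returned.
config = pool.get_column_config("EXAMPLE_TABLE", raise_error=False)
if config is not None:
    for col in config:
        print(col["columnName"], col["columnType"], col.get("fieldLength"))
```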
move(self, to)
¶
Moves the Pool to another team.
API
POST: /integration/api/pools/move
Parameters:
Name | Type | Description | Default
---|---|---|---
`to` | `str` | Name of the host domain (e.g. `move` for https://move.eu-1.celonis.cloud). | *required*
Source code in celonis_api/event_collection/data_pool.py
def move(self, to: str):
"""Moves the Pool to another team.
!!! api "API"
- `POST: /integration/api/pools/move`
```json
{
"subsetOfDataModels": False,
"dataPoolId": self.id,
"selectedDataModels": [],
"moveToDomain": to
}
```
Args:
to: Name of the host domain (e.g. `move` for https://move.eu-1.celonis.cloud).
"""
url = f"{self.celonis.url}/integration/api/pools/move"
payload = {
"subsetOfDataModels": False,
"dataPoolId": self.id,
"selectedDataModels": [k for k, v in self.datamodels.ids.items()],
"moveToDomain": to,
}
self.celonis.api_request(url, payload)
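A one-line sketch of `move`, assuming a `Pool` object `pool` and a target team called `move`; as the payload above shows, all data models of the pool are always included in the move.
```python
# Move the pool (including all of its data models) to the team
# reachable at https://move.eu-1.celonis.cloud.
pool.move(to="move")
```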
push_table(self, df_or_path, table_name, if_exists='error', primary_keys=None, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)
¶
Pushes a pandas.DataFrame or pyarrow.parquet.ParquetFile to the specified Table in the Pool.
Warning
Deprecation: The method 'push_table' is deprecated and will be removed in the next release. Use one of: create_table, append_table, upsert_table.
Parameters:
Name | Type | Description | Default
---|---|---|---
`df_or_path` | `Union[pandas.core.frame.DataFrame, pathlib.Path, str]` | If DataFrame, the df is chunked, written to parquet and uploaded. If path to a parquet file, the file is uploaded. If path to a folder, every parquet file in the folder is uploaded. If str, the value is converted to Path and handled as described above. | *required*
`table_name` | `str` | Name of Table. | *required*
`if_exists` | `str` | One of `error` (an error is thrown if a table of the same name already exists in the pool), `drop` (the existing table is dropped completely and a new table is created), `replace_data_only` (the column names and types of the old table are not overwritten). | `'error'`
`primary_keys` | `Optional[List[str]]` | List of Table primary keys. | `None`
`column_config` | `List[Dict[str, Any]]` | Can be used to specify column types and string field length, with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`]. | `None`
`connection` | `Union[DataConnection, str]` | The Data Connection to upload to, else uploads to Global. | `None`
`wait_for_finish` | `bool` | Waits for the upload to finish processing, set to False to trigger only. | `True`
`chunksize` | `int` | If a DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value < 1, no chunking is applied. | `100000`
Returns:
Type | Description
---|---
`Dict` | The Data Job Status.
Source code in celonis_api/event_collection/data_pool.py
@deprecated("Use 'create_table', 'append_table' or 'upsert_table'.")
def push_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
if_exists: str = "error",
primary_keys: typing.Optional[typing.List[str]] = None,
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Pushes a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
to the specified Table in the Pool.
!!! warning
Deprecation: The method 'push_table' is deprecated and will be removed in the next release.
Use one of:
[create_table][celonis_api.event_collection.data_pool.Pool.create_table],
[append_table][celonis_api.event_collection.data_pool.Pool.append_table],
[upsert_table][celonis_api.event_collection.data_pool.Pool.upsert_table].
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
table_name: Name of Table.
if_exists:
* `error` -> an error is thrown if a table of the same name already exists in the pool.
* `drop` -> the existing table is dropped completely and a new table is created, by default `error`.
* `replace_data_only` -> the column names and types of the old tables are not overwritten.
primary_keys: List of Table primary keys.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
"""
if if_exists == "error":
response = self.create_table(
df_or_path=df_or_path,
table_name=table_name,
if_exists=if_exists,
column_config=column_config,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
elif if_exists in ["replace", "replace_data_only"]:
response = self.create_table(
df_or_path=df_or_path,
table_name=table_name,
if_exists="drop",
column_config=column_config,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
elif if_exists == "append":
response = self.append_table(
df_or_path=df_or_path,
table_name=table_name,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
elif if_exists == "upsert":
if primary_keys is None:
raise PyCelonisValueError("Argument 'primary_keys' must be set for upsert.")
else:
response = self.upsert_table(
df_or_path=df_or_path,
primary_keys=primary_keys,
table_name=table_name,
connection=connection,
wait_for_finish=wait_for_finish,
chunksize=chunksize,
)
else:
raise PyCelonisValueError(f"if_exists {if_exists} not supported")
return response
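Because `push_table` is deprecated, the sketch below shows the non-deprecated calls it dispatches to (see the branches in the source above), assuming a `Pool` object `pool` and a DataFrame `df`; table and key names are illustrative.
```python
# push_table(..., if_exists='error')   ->  create_table with if_exists='error'
pool.create_table(df_or_path=df, table_name="EXAMPLE_TABLE", if_exists="error")

# push_table(..., if_exists='replace') ->  create_table with if_exists='drop'
pool.create_table(df_or_path=df, table_name="EXAMPLE_TABLE", if_exists="drop")

# push_table(..., if_exists='append')  ->  append_table
pool.append_table(df_or_path=df, table_name="EXAMPLE_TABLE")

# push_table(..., if_exists='upsert')  ->  upsert_table (primary_keys is required)
pool.upsert_table(df_or_path=df, table_name="EXAMPLE_TABLE", primary_keys=["BUKRS"])
```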
upsert_table(self, df_or_path, table_name, primary_keys, column_config=None, connection=None, wait_for_finish=True, chunksize=100000)
¶
Upserts the pandas.DataFrame or pyarrow.parquet.ParquetFile to an existing Table in the Pool.
Parameters:
Name | Type | Description | Default
---|---|---|---
`df_or_path` | `Union[pandas.core.frame.DataFrame, pathlib.Path, str]` | If DataFrame, the df is chunked, written to parquet and uploaded. If path to a parquet file, the file is uploaded. If path to a folder, every parquet file in the folder is uploaded. If str, the value is converted to Path and handled as described above. | *required*
`table_name` | `str` | Name of Table. | *required*
`primary_keys` | `List[str]` | List of Table primary keys. | *required*
`column_config` | `List[Dict[str, Any]]` | Can be used to specify column types and string field length, with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`]. | `None`
`connection` | `Union[DataConnection, str]` | The Data Connection to upload to, else uploads to Global. | `None`
`wait_for_finish` | `bool` | Waits for the upload to finish processing, set to False to trigger only. | `True`
`chunksize` | `int` | If a DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files that are uploaded. If set to a value < 1, no chunking is applied. | `100000`
Returns:
Type | Description
---|---
`Dict` | The Data Job Status.
Exceptions:
Type | Description
---|---
`PyCelonisValueError` | If Table already exists and `if_exists='error'`.
`PyCelonisTypeError` | When connection is not a DataConnection object or the ID of a Data Connection.
`PyCelonisTypeError` | If Path is not a valid file or folder.
Source code in celonis_api/event_collection/data_pool.py
def upsert_table(
self,
df_or_path: typing.Union[pd.DataFrame, pathlib.Path, str],
table_name: str,
primary_keys: typing.List[str],
column_config: typing.List[typing.Dict[str, typing.Any]] = None,
connection: typing.Union["DataConnection", str] = None,
wait_for_finish: bool = True,
chunksize: int = 100_000,
) -> typing.Dict:
"""Upserts the [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
to an existing Table in the Pool.
Args:
df_or_path:
* If DataFrame, df is chunked, written to parquet and uploaded.
* If Path to parquet file, file is uploaded.
* If Path to folder, any parquet file in folder is uploaded.
* If str, value is converted to Path and handled as described above.
table_name: Name of Table.
primary_keys: List of Table primary keys.
column_config: Can be used to specify column types and string field length.
```json
[
{
"columnName":"BUKRS",
"fieldLength":100,
"columnType":"STRING"
}...
]
```
with `columnType` one of [`INTEGER`, `DATE`, `TIME`, `DATETIME`, `FLOAT`, `BOOLEAN`, `STRING`].
connection: The Data Connection to upload to, else uploads to Global.
wait_for_finish: Waits for the upload to finish processing, set to False to trigger only.
chunksize: If DataFrame is passed, the value is used to chunk the dataframe into multiple parquet files
that are uploaded. If set to a value <1, no chunking is applied.
Returns:
The Data Job Status.
Raises:
PyCelonisValueError: If Table already exists and `if_exists='error'`.
PyCelonisTypeError: When connection is not DataConnection object or ID of Data Connection.
PyCelonisTypeError: If Path is not a valid file or folder.
"""
self.celonis._tracker.track("Upsert table", extra={"tracking_type": "DATA_PUSH"})
data_source_id = self._get_data_source_id(connection)
table = self.find_table(table_name, data_source_id)
if table is None:
raise PyCelonisNotFoundError(
f"The target table \"{table_name}\" could not be found in the Data Pool. "
"Please check spelling of table name."
)
if not column_config:
column_config = self.get_column_config(table)
# Create payload for data job
payload, table_name = self._create_payload_for_data_job(
table_name=table_name,
column_config=column_config,
data_source_id=data_source_id,
job_type="DELTA",
keys=primary_keys,
)
# Create data push job
response = self.celonis.api_request(self.url_data_push, payload)
job_id = response["id"]
# Create parquet uploader
chunk_base_url = f"{self.url_data_push}{job_id}/chunks"
chunk_url = f"{chunk_base_url}/upserted"
# Create df chunks and upload
if isinstance(df_or_path, pd.DataFrame):
self._upload_dataframe(
df=df_or_path,
table_name=table_name,
chunksize=chunksize,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Or upload file(s)
elif isinstance(df_or_path, str) or isinstance(df_or_path, pathlib.Path):
self._upload_parquet_files(
path=df_or_path,
chunk_url=chunk_url,
chunk_base_url=chunk_base_url,
check_column=None,
)
# Start job execution
_ = self.celonis.api_request(f"{self.url_data_push}{job_id}", method=HttpMethod.POST)
response = self.check_push_status(job_id)
# Wait for job to finish
if wait_for_finish and response.get("status", "") != "DONE":
response = self._wait_until_data_job_finish(job_id=job_id)
return response
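Finally, a minimal sketch of `upsert_table`, assuming the target table already exists in the pool (the data is pushed as a `DELTA` job, so rows whose primary keys match are updated and new keys are inserted); data and names are illustrative.
```python
import pandas as pd

# Delta data: existing key '1000' is updated, new key '3000' is inserted.
delta_df = pd.DataFrame({"BUKRS": ["1000", "3000"], "NETWR": [120.0, 80.0]})

status = pool.upsert_table(
    df_or_path=delta_df,
    table_name="EXAMPLE_TABLE",
    primary_keys=["BUKRS"],
    wait_for_finish=True,
)
print(status.get("status"))
```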