compute_node.py
ComputeNode (CelonisApiObject)
¶
ComputeNode object to interact with Celonis Event Collection API.
Source code in celonis_api/event_collection/compute_node.py
class ComputeNode(CelonisApiObject):
    """ComputeNode object to interact with Celonis Event Collection API."""

    @property
    def data(self) -> typing.Dict:
        """ComputeNode has no data. Always returns an empty dict.

        Raises:
            PyCelonisNotImplementedError: when trying to set data.
        """
        return {}

    @data.setter
    def data(self, value):
        # Setting data is explicitly unsupported on a compute node.
        raise PyCelonisNotImplementedError("Set 'data' not implemented.")

    @property
    def url(self) -> str:
        """
        !!! api "API"
            - `GET: /integration/api/v1/compute/{datamodel_id}`
        """
        return f"{self.celonis.url}/integration/api/v1/compute/{self.id}"

    def get_data_file(
        self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
    ) -> pathlib.Path:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and returns the path to the exported file.

        !!! api "API"
            - `POST: /integration/api/v1/compute/{datamodel_id}/export/query`
                ```json
                {
                    "dataCommand": {
                        "commands": [
                            {"queries": pql_query.query}
                        ]
                    }
                }
                ```

        Args:
            pql_query: The table query to be executed.
            file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

        Returns:
            Path to downloaded file containing the results of the query.
        """
        return self._compute(pql_query=pql_query, file_path=file_path)

    def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        Uses [ComputeNode.get_data_file][celonis_api.event_collection.compute_node.ComputeNode.get_data_file].

        Args:
            pql_query: The table query to be executed.

        Returns:
            Dataframe containing the results of the query.
        """
        file_path = self._compute(pql_query=pql_query)
        return parquet_utils.read_parquet(file_path)

    def _compute(self, pql_query, file_path=None) -> pathlib.Path:
        """Run `pql_query` on the compute endpoint and download the result to `file_path`.

        Args:
            pql_query: str, PQLColumn or PQL query to execute.
            file_path: Target path; defaults to a timestamped parquet file in the temp dir.

        Returns:
            Path of the downloaded parquet file.
        """
        self.celonis._tracker.track("Get data file", extra={"tracking_type": "DATA_PULL"})
        if file_path is None:
            # Default to a unique, timestamped parquet file in the system temp directory.
            tmp_file_path = pathlib.Path(tempfile.gettempdir())
            file_path = tmp_file_path / self._tmp_file_name()
        elif isinstance(file_path, str):
            file_path = pathlib.Path(file_path)
        pql_query = self._escalate_query_type(pql_query)
        export_status_result = self._send_query_to_server(pql_query)
        self._download_query_result(export_status_result=export_status_result, file_path=file_path)
        return file_path

    @staticmethod
    def _escalate_query_type(pql_query) -> pql.PQL:
        """Coerce a str or PQLColumn argument into a full PQL object.

        str -> PQLColumn -> PQL; a PQL input is returned unchanged.

        Raises:
            PyCelonisTypeError: if `pql_query` is not PQL, PQLColumn or str.
        """
        if isinstance(pql_query, str):
            pql_query = pql.PQLColumn(pql_query)
        if isinstance(pql_query, pql.PQLColumn):
            pql_query = pql.PQL(pql_query)
        if isinstance(pql_query, pql.PQL):
            return pql_query
        else:
            raise PyCelonisTypeError(f"Argument 'pql_query' must be PQL, PQLColumn or str, but got {type(pql_query)}.")

    @staticmethod
    def _tmp_file_name(i=None) -> str:
        """Return a parquet file name, unique by timestamp, or by chunk index `i` if given."""
        if i is None:
            return f"celonis_pql_export_{time.time()}.parquet"
        else:
            return f"celonis_pql_export_{i}.parquet"

    def _download_query_result(self, export_status_result: typing.Dict, file_path: pathlib.Path) -> pathlib.Path:
        """Download the finished export to `file_path`, preferring the chunked endpoint."""
        if file_path.is_file():
            # Remove a stale file so the download starts fresh.
            file_path.unlink()
        if "exportChunks" in export_status_result:
            # The export supports server-side chunking
            try:
                download_query_result = self._download_chunks(export_status_result, file_path)
                return download_query_result
            except PyCelonisHTTPError as e:
                # Chunked download failed; fall through to the unchunked endpoint below.
                self._logger.info(f"Could not download chunked export due to {str(e)}, trying unchunked endpoint.")
        export_id = export_status_result["id"]
        return self.celonis.api_request(f"{self.url}/export/{export_id}/result", file_path)

    def _send_query_to_server(self, pql_query: pql.PQL) -> typing.Dict:
        """Submit the export query and wait for the export to finish.

        Raises:
            PyCelonisDataExportNotEnabledError: if the export request is rejected.
        """
        payload = {"dataCommand": {"commands": [{"queries": pql_query.query}]}}
        try:
            export_query_result = self.celonis.api_request(f"{self.url}/export/query", payload)
        except PyCelonisHTTPError as e:
            raise PyCelonisDataExportNotEnabledError(e.message if e.message else "")
        return self._get_export_status_result(export_query_result, pql_query)

    def _get_export_status_result(self, export_query_result: typing.Dict, pql_query: pql.PQL) -> typing.Dict:
        """Poll the export status once per second until it leaves the RUNNING state.

        Transient request errors are retried; more than 5 consecutive failures re-raise.

        Raises:
            PyCelonisHTTPError: if the final export status is not DONE.
        """
        self._logger.info("PQL Export started...")
        iterations = 0
        error_count = 0
        while True:
            try:
                result = self.celonis.api_request(f"{self.url}/export/{export_query_result['id']}")
                status = result["exportStatus"]
                if status != "RUNNING":
                    self._logger.info(f"PQL Export status: {status}")
                    break
                # Successful poll resets the consecutive-error counter.
                error_count = 0
                iterations += 1
                if iterations % 5 == 0:
                    # Log progress only every 5th poll to keep the log quiet.
                    self._logger.info(f"PQL Export status: {status}...")
                time.sleep(1)
            except PyCelonisError as e:
                error_count += 1
                time.sleep(3)
                self._logger.exception("Failed to request export status, trying again...")
                if error_count > 5:
                    raise e
        if status != "DONE":
            raise PyCelonisHTTPError(
                f"Export failed. Status: {status} \n\n Query: {pql_query.query} \n\n Message "
                f"{result.get('message', '')} \n\n Export ID: {result.get('id', '')}"
            )
        return result

    def _download_chunks(self, export_status_result: typing.Dict, file_path: pathlib.Path) -> pathlib.Path:
        """Download all export chunks into a temp dir and unite them at `file_path`.

        Each chunk download is retried; more than 5 consecutive failures re-raise.
        """
        num_chunks = export_status_result["exportChunks"]
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_dir_path = pathlib.Path(tmp_dir)
            downloaded_chunks = 0
            error_count = 0
            while downloaded_chunks < num_chunks:
                try:
                    chunk_path = self._get_chunk_path(tmp_dir_path, downloaded_chunks)
                    download_url = f"{self.url}/export/{export_status_result['id']}/{downloaded_chunks}/result"
                    self.celonis.api_request(download_url, chunk_path)
                    downloaded_chunks += 1
                    # Successful download resets the consecutive-error counter.
                    error_count = 0
                except PyCelonisError as e:
                    error_count += 1
                    time.sleep(3)
                    self._logger.exception(f"Failed to download chunk {downloaded_chunks}, trying again...")
                    if error_count > 5:
                        raise e
            # Must run inside the `with`: the chunk files live in the temp dir.
            self._unite_chunks(number_of_chunks=downloaded_chunks, file_path=file_path, tmp_dir_path=tmp_dir_path)
        return file_path

    def _get_chunk_path(self, tmp_dir_path: pathlib.Path, chunk: int) -> pathlib.Path:
        """Return the temp-file path for chunk index `chunk`."""
        return tmp_dir_path / self._tmp_file_name(chunk)

    def _unite_chunks(self, number_of_chunks: int, file_path: pathlib.Path, tmp_dir_path: pathlib.Path):
        """Concatenate the downloaded chunk files into one parquet file at `file_path`."""
        chunk_file = self._get_chunk_path(tmp_dir_path, 0)
        if number_of_chunks > 1:
            table = read_table(source=chunk_file)
            for chunk in range(1, number_of_chunks, 1):
                chunk_file = self._get_chunk_path(tmp_dir_path, chunk)
                chunk_table = read_table(source=chunk_file)
                table = concat_tables([table, chunk_table])
            # Timestamps are coerced to microseconds; truncation of finer precision is allowed.
            write_table(table=table, where=file_path, allow_truncated_timestamps=True, coerce_timestamps="us")
        else:
            # Single chunk: just copy it to the destination.
            shutil.copy(chunk_file, file_path)
data: Dict
property
writable
¶
ComputeNode has no data. Always returns an empty dict.
Exceptions:
Type | Description |
---|---|
PyCelonisNotImplementedError |
when trying to set data. |
url: str
property
readonly
¶
API
GET: /integration/api/v1/compute/{datamodel_id}
get_data_file(self, pql_query, file_path=None, **kwargs)
¶
Exports the results of a PQL query as pyarrow.parquet.ParquetFile and returns the path to the exported file.
API
POST: /integration/api/v1/compute/{datamodel_id}/export/query
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pql_query |
PQL |
The table query to be executed. |
required |
file_path |
Union[str, pathlib.Path] |
The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`. |
None |
Returns:
Type | Description |
---|---|
Path |
Path to downloaded file containing the results of the query. |
Source code in celonis_api/event_collection/compute_node.py
def get_data_file(
    self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
) -> pathlib.Path:
    """Export the results of a PQL query as a
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    and return the path of the exported file.

    !!! api "API"
        - `POST: /integration/api/v1/compute/{datamodel_id}/export/query`
            ```json
            {
                "dataCommand": {
                    "commands": [
                        {"queries": pql_query.query}
                    ]
                }
            }
            ```

    Args:
        pql_query: The table query to be executed.
        file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

    Returns:
        Path to downloaded file containing the results of the query.
    """
    # All of the real work (query coercion, export, download) happens in _compute.
    return self._compute(pql_query, file_path=file_path)
get_data_frame(self, pql_query, **kwargs)
¶
Exports the results of a PQL query as pyarrow.parquet.ParquetFile and converts it to a pandas.DataFrame. Uses ComputeNode.get_data_file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pql_query |
PQL |
The table query to be executed. |
required |
Returns:
Type | Description |
---|---|
DataFrame |
Dataframe containing the results of the query. |
Source code in celonis_api/event_collection/compute_node.py
def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
    """Export the results of a PQL query and load them into a
    [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

    The export itself produces a
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html);
    see [ComputeNode.get_data_file][celonis_api.event_collection.compute_node.ComputeNode.get_data_file].

    Args:
        pql_query: The table query to be executed.

    Returns:
        Dataframe containing the results of the query.
    """
    exported_file = self._compute(pql_query=pql_query)
    return parquet_utils.read_parquet(exported_file)
HybridPseudoComputeNode
¶
Hybrid ComputeNode object to interact with Celonis Hybrid Event Collection API.
Source code in celonis_api/event_collection/compute_node.py
class HybridPseudoComputeNode:
    """Hybrid ComputeNode object to interact with Celonis Hybrid Event Collection API."""

    def __init__(self, id: str, datamodel: 'Datamodel'):
        # Identifier of this node and the datamodel it queries against.
        self.id = id
        self.datamodel = datamodel

    @property
    def data(self) -> typing.Dict:
        """Hybrid compute nodes carry no data of their own. Always returns an empty dict.

        Raises:
            PyCelonisNotImplementedError: when trying to set data.
        """
        return {}

    @data.setter
    def data(self, value):
        raise PyCelonisNotImplementedError("Set 'data' not implemented.")

    @property
    def url(self) -> str:
        # There is no direct compute endpoint for hybrid nodes.
        raise PyCelonisNotImplementedError("Property 'url' not implemented.")

    @property
    def _parent_class(self):
        # Imported at access time rather than module import time.
        from pycelonis.celonis_api.event_collection.data_model import Datamodel

        return Datamodel

    def get_data_file(
        self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
    ) -> pathlib.Path:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        and returns the path to the exported file.

        This method will create a temporary Studio Space, Package and Analysis to then export the result via
        [Analysis.get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file].
        The temporary space will be deleted before the result is returned.
        It is recommended to set up an Analysis in Studio yourself and use it to export the query result.

        Args:
            pql_query: The table query to be executed.
            file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

        Returns:
            Path to downloaded file containing the results of the query.
        """
        # Random suffix keeps concurrent temporary spaces from colliding.
        name = f"zzz___TEMP___{random.randint(1, 1000000)}"
        temp_space = None
        try:
            temp_space = self.datamodel.celonis.create_space(name=name + "_SPACE")
            temp_package = temp_space.create_package(name=name + "_PACKAGE")
            temp_analysis = temp_package.create_analysis(name=name + "_ANALYSIS", data_model_id=self.datamodel.id)
            file_path = temp_analysis.get_data_file(pql_query, file_path, **kwargs)
        except PyCelonisPermissionError:
            raise PyCelonisPermissionError(
                "For querying Hybrid datamodel you need ALL Studio Permissions:\n"
                "[CREATE SPACE, EDIT ALL SPACES, DELETE ALL SPACES, "
                "USE ALL PACKAGES, CREATE PACKAGE, EDIT ALL PACKAGES, DELETE ALL PACKAGES]."
            )
        finally:
            # Always remove the temporary space, even if the export failed.
            if temp_space:
                temp_space.delete()
        return file_path

    def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
        """Exports the results of a PQL query as
        [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
        file and converts it to a
        [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

        Uses [get_data_file][celonis_api.event_collection.compute_node.HybridPseudoComputeNode.get_data_file].

        Args:
            pql_query: The table query to be executed.

        Returns:
            Dataframe containing the results of the query.
        """
        file_path = self.get_data_file(pql_query=pql_query, **kwargs)
        return parquet_utils.read_parquet(file_path)

    def _get_export_status_result(self, *args, **kwargs):
        # Exists only to satisfy the ComputeNode-like interface.
        raise PyCelonisNotImplementedError("_get_export_status_result not implemented for HybridPseudoComputeNode")

    def _download_query_result(self, *args, **kwargs):
        # Exists only to satisfy the ComputeNode-like interface.
        raise PyCelonisNotImplementedError("_download_query_result not implemented for HybridPseudoComputeNode")
get_data_file(self, pql_query, file_path=None, **kwargs)
¶
Exports the results of a PQL query as pyarrow.parquet.ParquetFile and returns the path to the exported file.
This method will create a temporary Studio Space, Package and Analysis to then export the result via Analysis.get_data_file. The temporary space will be deleted before the result is returned. It is recommended to set up an Analysis in Studio yourself and use it to export the query result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pql_query |
PQL |
The table query to be executed. |
required |
file_path |
Union[str, pathlib.Path] |
The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`. |
None |
Returns:
Type | Description |
---|---|
Path |
Path to downloaded file containing the results of the query. |
Source code in celonis_api/event_collection/compute_node.py
def get_data_file(
    self, pql_query: pql.PQL, file_path: typing.Union[str, pathlib.Path] = None, **kwargs
) -> pathlib.Path:
    """Export the results of a PQL query as a
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    and return the path of the exported file.

    A temporary Studio Space, Package and Analysis are created so the result can be exported via
    [Analysis.get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file].
    The temporary space is deleted before the result is returned.
    It is recommended to set up an Analysis in Studio yourself and use it to export the query result.

    Args:
        pql_query: The table query to be executed.
        file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.

    Returns:
        Path to downloaded file containing the results of the query.
    """
    temp_name = f"zzz___TEMP___{random.randint(1, 1000000)}"
    temp_space = None
    try:
        celonis = self.datamodel.celonis
        temp_space = celonis.create_space(name=temp_name + "_SPACE")
        temp_package = temp_space.create_package(name=temp_name + "_PACKAGE")
        temp_analysis = temp_package.create_analysis(
            name=temp_name + "_ANALYSIS", data_model_id=self.datamodel.id
        )
        file_path = temp_analysis.get_data_file(pql_query, file_path, **kwargs)
    except PyCelonisPermissionError:
        raise PyCelonisPermissionError(
            "For querying Hybrid datamodel you need ALL Studio Permissions:\n"
            "[CREATE SPACE, EDIT ALL SPACES, DELETE ALL SPACES, "
            "USE ALL PACKAGES, CREATE PACKAGE, EDIT ALL PACKAGES, DELETE ALL PACKAGES]."
        )
    finally:
        # Always clean up the temporary space, even when the export failed.
        if temp_space:
            temp_space.delete()
    return file_path
get_data_frame(self, pql_query, **kwargs)
¶
Exports the results of a PQL query as pyarrow.parquet.ParquetFile file and converts it to a pandas.DataFrame. Uses get_data_file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pql_query |
PQL |
The table query to be executed. |
required |
Returns:
Type | Description |
---|---|
DataFrame |
Dataframe containing the results of the query. |
Source code in celonis_api/event_collection/compute_node.py
def get_data_frame(self, pql_query: pql.PQL, **kwargs) -> pd.DataFrame:
    """Export the results of a PQL query and load them into a
    [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

    The export produces a
    [pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
    via [get_data_file][celonis_api.event_collection.compute_node.HybridPseudoComputeNode.get_data_file].

    Args:
        pql_query: The table query to be executed.

    Returns:
        Dataframe containing the results of the query.
    """
    exported_file = self.get_data_file(pql_query=pql_query, **kwargs)
    return parquet_utils.read_parquet(exported_file)