analysis.py
Analysis (BaseAnalysis)
Analysis object to interact with Celonis Process Mining API.
Source code in celonis_api/process_analytics/analysis.py
class Analysis(BaseAnalysis):
"""Analysis object to interact with Celonis Process Mining API."""
@property
def url(self) -> str:
"""
!!! api "API"
- `/process-mining/analysis/v1.2/api/analysis/{analysis_id}`
"""
return f"{self.celonis.url}/process-mining/analysis/v1.2/api/analysis/{self.id}"
@property
def web_link(self):
"""Get the web link to the Analysis."""
url = f"{self.celonis.url}/process-mining/analysis/{self.id}/#/"
return get_link(url, self.name)
@property
def data(self) -> typing.Dict:
return super().data
@data.setter
def data(self, value: typing.Dict):
self._data = value
self.celonis.api_request(self.url.replace("/analysis/v1.2", ""), self._data, method=HttpMethod.PUT)
@property
def workspace(self) -> 'Workspace':
"""Get the Analysis Workspace.
Returns:
The Analysis Workspace object.
"""
from pycelonis.celonis_api.process_analytics.workspace import Workspace
return Workspace(self.celonis, self.data["processId"])
@property
def datamodel(self) -> 'Datamodel':
"""Get the Analysis Datamodel.
Returns:
The Analysis Datamodel object.
"""
try:
dm = self.workspace.datamodel
except (PyCelonisError, AttributeError):
try:
response = self.celonis.api_request(f"{self.url}/data_model")
dm_id = response.get("id")
dm = self.celonis.datamodels.find(dm_id)
except PyCelonisError:
raise PyCelonisPermissionError(
"Datamodel not accessible. App/API key needs access to Pool and Datamodel."
)
return dm
def delete(self):
"""Deletes the Analysis.
!!! api "API"
- `DELETE: /process-mining/analysis/v1.2/api/analysis/{analysis_id}`
"""
self.celonis.api_request(f"{self.celonis.url}/process-mining/api/analysis/{self.id}", message=HttpMethod.DELETE)
def move(self, to: str, target_workspace: typing.Union[str, 'Workspace']):
"""Moves the Analysis to the specified Workspace in another Team.
!!! api "API"
- `POST: /process-mining/api/analysis/move`
```json
{
"analysisIdsToReplace": [],
"analysisId": self.id,
"processId": target_workspace,
"teamDomain": to
}
```
Args:
to: Team name (e.g. `move` for https://move.eu-1.celonis.cloud)
target_workspace: Workspace ID or Workspace object where the analysis should be copied to.
"""
from pycelonis.celonis_api.process_analytics.workspace import Workspace
if isinstance(target_workspace, Workspace):
target_workspace = target_workspace.id
payload = {"analysisIdsToReplace": [], "analysisId": self.id, "processId": target_workspace, "teamDomain": to}
self.celonis.api_request(f"{self.celonis.url}/process-mining/api/analysis/move", payload)
def get_data_frame(self, pql_query: typing.Union[PQL, PQLColumn], **kwargs) -> pandas.DataFrame:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
Uses [get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file].
Args:
pql_query: The table query to be executed.
**kwargs: optional arguments passed to
[get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file].
Returns:
Dataframe containing the results of the query.
"""
file = self.get_data_file(pql_query, **kwargs)
df = parquet_utils.read_parquet(file)
return df
def get_data_file(
self,
pql_query: typing.Union[PQL, PQLColumn, str],
file_path: typing.Union[str, pathlib.Path] = None,
export_type: str = "PARQUET",
variables: typing.List[typing.Dict] = None,
chunked_download: bool = False,
) -> pathlib.Path:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and returns the path to the exported file.
Args:
pql_query: The table query to be executed.
file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.
export_type: Export filetype. One of [`PARQUET`, `EXCEL`, `CSV`].
variables: Only needed when PQL query contains saved formulas that contain variables.
chunked_download: Can be used to download big data sets in chunks (Datamodel permissions required).
Returns:
Path to downloaded file containing the results of the query.
"""
variables = variables or []
if isinstance(pql_query, str):
pql_query = PQLColumn(pql_query)
if isinstance(pql_query, PQLColumn):
pql_query = PQL(pql_query) # type: ignore
if isinstance(pql_query, PQL):
pql_query = pql_query.query # type: ignore
else:
raise TypeError("pql_query must be PQL, PQLColumn or str.")
file_type = export_type.lower() if export_type != "EXCEL" else "xlsx"
file_name = f"celonis_pql_export_{time.time()}.{file_type}"
if not file_path:
file_path = pathlib.Path(tempfile.gettempdir()) / file_name
file_path = pathlib.Path(file_path)
if file_path.is_dir():
file_path = file_path / file_name
elif file_path.is_file():
file_path.unlink()
dm_data = self.celonis.api_request(f"{self.url}/data_model")
payload = {
"dataCommandRequest": {
"variables": variables,
"request": {
"commands": [{"queries": pql_query, "computationId": 0, "isTransient": False}],
"cubeId": dm_data.get("id") if dm_data else self.datamodel.id,
},
},
"exportType": export_type,
}
export_url = f"{self.url}/exporting/query"
export_query_result = self.celonis.api_request(export_url, payload, params={"v": "false"})
self._raise_error_if_uplinkmodel_not_supported(export_query_result)
if chunked_download:
self._logger.info("For chunked downloads all permissions on datamodel level are needed.")
export_status_result = self.datamodel._compute_node._get_export_status_result(
export_query_result, pql_query
)
self.datamodel._compute_node._download_query_result(
export_status_result=export_status_result, file_path=file_path
)
else:
self._download_query_results_from_analysis_endpoint(export_query_result, pql_query, file_path, export_type)
return file_path
def _raise_error_if_uplinkmodel_not_supported(self, response: dict):
if ("exportStatus" in response) and ("message" in response):
if (response["exportStatus"] == "FAILED") and ("not supported for uplinked models" in response["message"]):
raise PyCelonisNotSupportedError(
"Operation not supported. This could occur when you are working on Demo-Data using an uplinked data"
" model or if data export is not enabled for your team. Please contact Celonis customer support."
)
def _download_query_results_from_analysis_endpoint(self, export_query_result, pql_query, file_path, export_type):
exp_id = export_query_result["id"]
export_url = f"{self.url}/exporting/query"
iterations = 0
error_count = 0
while True:
try:
export_status = self.celonis.api_request(f"{export_url}/{exp_id}/status")
if export_status["exportStatus"] != "RUNNING":
self._logger.info(f"PQL Export status: {export_status}")
break
error_count = 0
iterations += 1
if iterations % 10 == 0:
self._logger.info(f"PQL Export status: {export_status}...")
time.sleep(1)
except PyCelonisHTTPError as e:
error_count += 1
self._logger.exception("Failed to request PQL Export status, trying again...")
time.sleep(3)
if error_count > 5:
raise e
if export_status["exportStatus"] != "DONE":
raise PyCelonisHTTPError(f"PQL Export failed, status: {export_status} \n\n Query: {pql_query}")
export_results = self.celonis.api_request(f"{export_url}/{exp_id}/download", file_path)
return export_results
data: Dict (property, writable)
Response data from the Celonis API.
If `static` is set to `False`, every time you set the `data` property a `POST` request is executed against the resource API endpoint to update the remote resource in Celonis EMS.
Examples:
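A minimal sketch, assuming an existing connection via `get_celonis` and a placeholder analysis ID:

```python
from pycelonis import get_celonis

celonis = get_celonis(url="https://<team>.celonis.cloud", api_token="<api-token>")
analysis = celonis.analyses.find("<analysis-id>")

# Read the current resource data, change a field, and reassign it.
# Reassigning `data` triggers an update request to the Celonis API.
d = analysis.data
d["name"] = "Renamed Analysis"
analysis.data = d
```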
datamodel: Datamodel (property, readonly)
Get the Analysis Datamodel.
Returns:
Type | Description
---|---
`Datamodel` | The Analysis Datamodel object.
url: str (property, readonly)
API
`/process-mining/analysis/v1.2/api/analysis/{analysis_id}`
web_link (property, readonly)
Get the web link to the Analysis.
workspace: Workspace (property, readonly)
Get the Analysis Workspace.
Returns:
Type | Description
---|---
`Workspace` | The Analysis Workspace object.
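The `workspace`, `datamodel`, and `web_link` properties navigate from an Analysis to its surrounding objects. A short sketch, reusing the `analysis` object from the example above (resolving the Datamodel requires access to the Pool and Datamodel):

```python
workspace = analysis.workspace   # parent Workspace
datamodel = analysis.datamodel   # Datamodel the Analysis runs on

print(workspace.name, datamodel.name)
print(analysis.web_link)  # link to open the Analysis in the browser
```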
delete(self)
Deletes the Analysis.
API
`DELETE: /process-mining/analysis/v1.2/api/analysis/{analysis_id}`
get_data_file(self, pql_query, file_path=None, export_type='PARQUET', variables=None, chunked_download=False)
Exports the results of a PQL query as pyarrow.parquet.ParquetFile and returns the path to the exported file.
Parameters:
Name | Type | Description | Default
---|---|---|---
pql_query | Union[pycelonis.celonis_api.pql.pql.PQL, pycelonis.celonis_api.pql.pql.PQLColumn, str] | The table query to be executed. | required
file_path | Union[str, pathlib.Path] | The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`. | None
export_type | str | Export filetype. One of [`PARQUET`, `EXCEL`, `CSV`]. | 'PARQUET'
variables | List[Dict] | Only needed when the PQL query contains saved formulas that contain variables. | None
chunked_download | bool | Can be used to download big data sets in chunks (Datamodel permissions required). | False
Returns:
Type | Description
---|---
Path | Path to the downloaded file containing the results of the query.
Source code in celonis_api/process_analytics/analysis.py
def get_data_file(
self,
pql_query: typing.Union[PQL, PQLColumn, str],
file_path: typing.Union[str, pathlib.Path] = None,
export_type: str = "PARQUET",
variables: typing.List[typing.Dict] = None,
chunked_download: bool = False,
) -> pathlib.Path:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and returns the path to the exported file.
Args:
pql_query: The table query to be executed.
file_path: The output path for the export. Defaults to `tmpdir/celonis_pql_export_<current_time>.parquet`.
export_type: Export filetype. One of [`PARQUET`, `EXCEL`, `CSV`].
variables: Only needed when PQL query contains saved formulas that contain variables.
chunked_download: Can be used to download big data sets in chunks (Datamodel permissions required).
Returns:
Path to downloaded file containing the results of the query.
"""
variables = variables or []
if isinstance(pql_query, str):
pql_query = PQLColumn(pql_query)
if isinstance(pql_query, PQLColumn):
pql_query = PQL(pql_query) # type: ignore
if isinstance(pql_query, PQL):
pql_query = pql_query.query # type: ignore
else:
raise TypeError("pql_query must be PQL, PQLColumn or str.")
file_type = export_type.lower() if export_type != "EXCEL" else "xlsx"
file_name = f"celonis_pql_export_{time.time()}.{file_type}"
if not file_path:
file_path = pathlib.Path(tempfile.gettempdir()) / file_name
file_path = pathlib.Path(file_path)
if file_path.is_dir():
file_path = file_path / file_name
elif file_path.is_file():
file_path.unlink()
dm_data = self.celonis.api_request(f"{self.url}/data_model")
payload = {
"dataCommandRequest": {
"variables": variables,
"request": {
"commands": [{"queries": pql_query, "computationId": 0, "isTransient": False}],
"cubeId": dm_data.get("id") if dm_data else self.datamodel.id,
},
},
"exportType": export_type,
}
export_url = f"{self.url}/exporting/query"
export_query_result = self.celonis.api_request(export_url, payload, params={"v": "false"})
self._raise_error_if_uplinkmodel_not_supported(export_query_result)
if chunked_download:
self._logger.info("For chunked downloads all permissions on datamodel level are needed.")
export_status_result = self.datamodel._compute_node._get_export_status_result(
export_query_result, pql_query
)
self.datamodel._compute_node._download_query_result(
export_status_result=export_status_result, file_path=file_path
)
else:
self._download_query_results_from_analysis_endpoint(export_query_result, pql_query, file_path, export_type)
return file_path
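A usage sketch for `get_data_file`, reusing the `analysis` object from above; the column name is a placeholder for a column in your Datamodel:

```python
import pathlib

# Export the query result as CSV. A plain string is wrapped into a
# PQLColumn internally; passing an existing directory as file_path
# makes the method append a timestamped file name.
file_path = analysis.get_data_file(
    '"ACTIVITIES"."ACTIVITY_EN"',
    file_path=pathlib.Path("exports"),
    export_type="CSV",
)
print(file_path)
```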
get_data_frame(self, pql_query, **kwargs)
Exports the results of a PQL query as pyarrow.parquet.ParquetFile and converts it to a pandas.DataFrame. Uses get_data_file.
Parameters:
Name | Type | Description | Default
---|---|---|---
pql_query | Union[pycelonis.celonis_api.pql.pql.PQL, pycelonis.celonis_api.pql.pql.PQLColumn] | The table query to be executed. | required
**kwargs | | Optional arguments passed to get_data_file. | {}
Returns:
Type | Description
---|---
DataFrame | Dataframe containing the results of the query.
Source code in celonis_api/process_analytics/analysis.py
def get_data_frame(self, pql_query: typing.Union[PQL, PQLColumn], **kwargs) -> pandas.DataFrame:
"""Exports the results of a PQL query as
[pyarrow.parquet.ParquetFile](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)
and converts it to a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).
Uses [get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file].
Args:
pql_query: The table query to be executed.
**kwargs: optional arguments passed to
[get_data_file][celonis_api.process_analytics.analysis.Analysis.get_data_file].
Returns:
Dataframe containing the results of the query.
"""
file = self.get_data_file(pql_query, **kwargs)
df = parquet_utils.read_parquet(file)
return df
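A sketch of building a `PQL` query and pulling the result into a DataFrame; the table and column names are placeholders for your Datamodel:

```python
from pycelonis.celonis_api.pql.pql import PQL, PQLColumn, PQLFilter

query = PQL()
query.add(PQLColumn('"CASES"."CASE_ID"', "Case ID"))
query.add(PQLColumn('"ACTIVITIES"."ACTIVITY_EN"', "Activity"))
query.add(PQLFilter("FILTER \"ACTIVITIES\".\"ACTIVITY_EN\" != 'Reject Invoice'"))

df = analysis.get_data_frame(query)
print(df.head())
```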
move(self, to, target_workspace)
Moves the Analysis to the specified Workspace in another Team.
API
`POST: /process-mining/api/analysis/move`
Parameters:
Name | Type | Description | Default
---|---|---|---
to | str | Team name (e.g. `move` for https://move.eu-1.celonis.cloud). | required
target_workspace | Union[str, Workspace] | Workspace ID or Workspace object where the analysis should be copied to. | required
Source code in celonis_api/process_analytics/analysis.py
def move(self, to: str, target_workspace: typing.Union[str, 'Workspace']):
"""Moves the Analysis to the specified Workspace in another Team.
!!! api "API"
- `POST: /process-mining/api/analysis/move`
```json
{
"analysisIdsToReplace": [],
"analysisId": self.id,
"processId": target_workspace,
"teamDomain": to
}
```
Args:
to: Team name (e.g. `move` for https://move.eu-1.celonis.cloud)
target_workspace: Workspace ID or Workspace object where the analysis should be copied to.
"""
from pycelonis.celonis_api.process_analytics.workspace import Workspace
if isinstance(target_workspace, Workspace):
target_workspace = target_workspace.id
payload = {"analysisIdsToReplace": [], "analysisId": self.id, "processId": target_workspace, "teamDomain": to}
self.celonis.api_request(f"{self.celonis.url}/process-mining/api/analysis/move", payload)
BaseAnalysis (CelonisApiObject)
Base Analysis object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysis(CelonisApiObject):
"""Base Analysis object to interact with Celonis Process Analytics API."""
@property
def published(self) -> 'BasePublishedDocument':
"""Get/Set the Base Published Document of the Analysis."""
return BasePublishedDocument(self)
@published.setter
def published(self, value):
if not isinstance(value, BasePublishedDocument):
raise PyCelonisTypeError(f"Cannot set {type(value)}. Must be of type 'BasePublishedDocument'.")
self.published.data["document"] = value.data["document"]
@property
def draft(self):
"""Get/Set the Base Draft Document of the Analysis."""
return BaseDraftDocument(self)
@draft.setter
def draft(self, value):
if not isinstance(value, BaseDraftDocument):
raise PyCelonisTypeError(f"Cannot set {type(value)}. Must be of type 'BaseDraftDocument'.")
self.draft.data["document"] = value.data["document"]
@property
def saved_formulas(self) -> 'CelonisCollection[BaseAnalysisSavedFormula]':
"""Get all saved formulas of the Analysis.
!!! api "API"
- `GET: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/kpi`
Returns:
A Collection of Analysis Saved Formulas.
"""
return CelonisCollection(BaseAnalysisSavedFormula(self, f) for f in self._saved_formula_data)
@property
def images(self) -> 'CelonisCollection[BaseAnalysisImage]':
"""Get all images of the Analysis.
!!! api "API"
- `GET: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/assets`
Returns:
A Collection of Analysis Images.
"""
return CelonisCollection(BaseAnalysisImage(self, i) for i in self._saved_images)
def create_saved_formula(
self, name: str, description: str = "", template: str = "", parameters: typing.List[str] = None, **kwargs
) -> 'BaseAnalysisSavedFormula':
"""Creates a new Saved Formula.
!!! api "API"
- `POST: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/kpi`
```json
{
"name": name,
"description": description,
"template": template,
"parameters": parameters
}
```
Args:
name: Name of the Saved Formula.
description: Description of the KPI.
template: Template of the Saved Formula.
parameters : Parameters of the Saved Formula.
**kwargs : optional
Returns:
The newly created Saved Formula.
"""
parameters = parameters or []
payload = {"name": name, "description": description, "template": template, "parameters": parameters}
payload.update(kwargs)
response = self.celonis.api_request(f"{self.url}/kpi", payload)
return BaseAnalysisSavedFormula(self, response)
def create_image_upload(self, path: typing.Union[str, pathlib.Path]) -> typing.Dict:
"""Upload an image to the Analysis.
!!! api "API"
- `POST: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/assets`
```json
{
"file": <bytes>
}
```
Args:
path: Path to the image file.
Returns:
Upload result.
"""
path = pathlib.Path(path)
return self.celonis.api_request(f"{self.url}/assets", path)
def backup_content(self, backup_path: typing.Union[str, pathlib.Path] = ".") -> pathlib.Path:
"""Make a Backup of the Analysis in JSON format.
Args:
backup_path: The path where the Backup Folder will be created.
Returns:
Path to the Backup Folder.
"""
path = pathlib.Path(backup_path) / f"Backup of Analysis - {pathify(self.name)}"
if path.exists():
shutil.rmtree(path)
path.mkdir()
files = {"saved_formulas.json": self._saved_formula_data, "name.json": self.name}
for i in ["draft", "published"]:
data = getattr(self, i).data
if data["document"]:
files[f"{i}_variables.json"] = data["document"].pop("variables", [])
sheets = data["document"].pop("components", [])
for n, s in enumerate(sheets):
files[f"{i}_sheet_{n:02}.json"] = s
files[f"{i}.json"] = data
for k, v in files.items():
(path / k).write_text(json.dumps(v, sort_keys=True, indent=2))
if len(self.images) > 0:
img_path = path / "images"
img_path.mkdir()
for img in self.images:
img.download(img_path / f"{img.id}.png")
return path
def rebuild_content_from_backup( # noqa: C901 TODO: refactor method
self, backup_path: typing.Union[str, pathlib.Path], keep_analysis_name: bool = True
):
"""Overwrites the contents of the Analysis object with content from the Backup Folder.
Args:
backup_path: The path where the Backup Folder is located.
keep_analysis_name: If `True` keeps the Analysis name, else take from Backup.
"""
path = pathlib.Path(backup_path)
assert path.is_dir(), "backup_path must be directory"
if not keep_analysis_name:
match = re.search("Backup of Analysis - (.+)", str(path))
assert match and len(match.groups()) == 1, "name of analysis not found in backup_path"
if (path / "name.json").is_file():
self.name = json.loads((path / "name.json").read_text())
else:
self.name = match[1]
# Upload saved formulas
formulas = json.loads((path / "saved_formulas.json").read_text())
for saved_formula in self.saved_formulas:
saved_formula.delete()
for formula in formulas:
self.create_saved_formula(**formula)
# Upload images and create mapping of ids (stored/old id => new generated id after upload)
img_path = path / "images"
if img_path.is_dir():
old2new_image_id_mapping = dict()
not_uploaded_images = list()
for p in img_path.iterdir():
old_id = p.name.split(".")[0]
try:
img_response = self.create_image_upload(p)
except PyCelonisHTTPError:
not_uploaded_images.append(p.name)
continue
new_id = img_response["id"]
old2new_image_id_mapping[old_id] = new_id
if not_uploaded_images:
self._logger.warning(
f"Image(s): {not_uploaded_images}\n"
"Could not be uploaded since 'Uploaded file format is not supported'. "
"Please verify the image file(s) format or check for damaged file(s)."
)
# Upload components and saved variables
for i in ["published", "draft"]:
if (path / f"{i}.json").is_file():
doc = getattr(self, i)
data = json.loads((path / f"{i}.json").read_text())
data["document"]["variables"] = json.loads((path / f"{i}_variables.json").read_text())
data["document"]["components"] = []
for s in sorted(path.glob(f"{i}_sheet_*.json")):
comp_data = json.loads(s.read_text())
# Update 'imageId' in image_component / mapping old imageId to new generated imageID (assets)
if img_path.is_dir():
for comp in comp_data["components"]:
if "imageId" in comp:
comp["imageId"] = old2new_image_id_mapping[comp["imageId"]]
data["document"]["components"].append(comp_data)
doc.data["document"] = data["document"]
else:
self._logger.warning(f"{i} document of analyses not found in Backup folder '{path}' ")
@deprecated("")
def process_shared_selection_url(self, shared_url: str) -> PQL: # noqa: C901 TODO: refactor method
"""Returns PQL object containing variables and filters from shared selection url.
Args:
shared_url: The shared url.
Returns:
A PQL query containing variables and filters.
"""
bookmark = re.search(r"/b/([A-z0-9-]+)", shared_url)
if not bookmark:
raise PyCelonisValueError("No bookmark reference found in URL.")
response = self.celonis.api_request(f"{self.url}/bookmarks/{bookmark[1]}")
query = PQL()
query.add(self.published.calculate_variables(response["analysisState"]["variables"]))
for s in response["analysisState"]["selections"]:
if s.get("kind") == "SELECTION":
if s.get("type") == "DATE_FILTER":
if s["configuration"].get("dateSearchStart"):
start_date = s["configuration"].get("dateSearchStart") / 1000
start_date_converted = datetime.datetime.utcfromtimestamp(start_date).strftime("%Y-%m-%d")
condition = f"FILTER ROUND_DAY({s['expression']}) >= TO_TIMESTAMP('{start_date_converted}', 'YYYY-MM-DD')" # noqa: E501
query.add(PQLFilter(condition))
if s["configuration"].get("dateSearchEnd"):
end_date = s["configuration"].get("dateSearchEnd") / 1000
end_date_converted = datetime.datetime.utcfromtimestamp(end_date).strftime("%Y-%m-%d")
condition = f"FILTER ROUND_DAY({s['expression']}) <= TO_TIMESTAMP('{end_date_converted}', 'YYYY-MM-DD')" # noqa: E501
query.add(PQLFilter(condition))
else:
if s.get("type") == "STRING":
values = [f"'{v}'" for v in s["values"] if v is not None]
else:
values = [f"{v}" for v in s["values"] if v is not None]
addition = " DOMAIN " if not (values and None in s["values"]) else ""
# all not None values
if values:
values = ",".join(values) # type: ignore
inverse = " NOT" if s.get("revertSelection") else ""
condition = f"FILTER{addition} {s['expression'].split(' AS ')[0]}{inverse} IN ({values})"
# add NULL filter
if None in s["values"]:
inverse = "0" if s.get("revertSelection") else "1"
inverse_text = "AND" if s.get("revertSelection") else "OR"
if not values:
condition = f"FILTER{addition} ISNULL({s['expression'].split(' AS ')[0]})={inverse}"
else:
condition += f" {inverse_text} ISNULL({s['expression'].split(' AS ')[0]})={inverse}"
query.add(PQLFilter(condition))
if s.get("kind") == "FILTER":
cel_operator = (
" "
if s.get("configuration").get("operator") is None
else f""" ON {s.get("configuration").get('operator')}"""
)
# Activity Selection
if s["configuration"].get("type") == "ACTIVITY":
condition = f""" FILTER PROCESS {cel_operator} """
if s.get("configuration").get("noFlowThrough") != []:
condition += "NOT EQUALS " + "'" + "','".join(s.get("configuration").get("noFlowThrough")) + "'"
elif s.get("configuration").get("flowThrough") != []:
condition += "EQUALS " + "'" + "','".join(s.get("configuration").get("flowThrough")) + "'"
elif s.get("configuration").get("end") != []:
condition += f""" EQUALS '{s.get("configuration").get('end')[0]}' end """
elif s.get("configuration").get("start") != []:
condition += f""" EQUALS start '{s.get("configuration").get('start')[0]}' """
# Process Flow Selection
elif s["configuration"].get("type") == "PROCESS_FLOW":
condition = f""" FILTER PROCESS """ # noqa: F541
# never followed directly
if s["configuration"].get("mode") == "not_direct":
condition += f""" {cel_operator} """
condition += f""" NOT EQUALS '{s["configuration"].get("source")}' TO '{s["configuration"].get("target")}' """ # noqa: E501
# followed directly
elif s["configuration"].get("mode") == "direct":
condition += f""" {cel_operator} """
condition += f""" EQUALS '{s["configuration"].get("source")}' TO '{s["configuration"].get("target")}' """ # noqa: E501
# eventually followed by
elif s["configuration"].get("mode") == "eventually":
# TODO: Here there is no operator given back
condition += f""" EQUALS '{s["configuration"].get("source")}' TO ANY TO '{s["configuration"].get("target")}' """ # noqa: E501
# never followed by
elif s["configuration"].get("mode") == "not_eventually":
# TODO: Here there is no operator given back
condition += f""" NOT EQUALS '{s["configuration"].get("source")}' TO ANY TO '{s["configuration"].get("target")}' """ # noqa: E501
query.add(PQLFilter(condition))
return query
@property
def _saved_formula_data(self):
return self.celonis.api_request(f"{self.url}/kpi")
@property
def _saved_images(self):
return self.celonis.api_request(f"{self.url}/assets")
draft (property, writable)
Get/Set the Base Draft Document of the Analysis.
images: CelonisCollection[BaseAnalysisImage] (property, readonly)
Get all images of the Analysis.
API
`GET: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/assets`
Returns:
Type | Description
---|---
CelonisCollection[BaseAnalysisImage] | A Collection of Analysis Images.
published: BasePublishedDocument (property, writable)
Get/Set the Base Published Document of the Analysis.
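A sketch of copying documents between two Analyses via the writable `draft` and `published` properties; both IDs are placeholders:

```python
source_analysis = celonis.analyses.find("<source-analysis-id>")
target_analysis = celonis.analyses.find("<target-analysis-id>")

# The setters type-check the value and copy the underlying document data.
target_analysis.draft = source_analysis.draft
target_analysis.published = source_analysis.published
```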
saved_formulas: CelonisCollection[BaseAnalysisSavedFormula] (property, readonly)
Get all saved formulas of the Analysis.
API
`GET: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/kpi`
Returns:
Type | Description
---|---
CelonisCollection[BaseAnalysisSavedFormula] | A Collection of Analysis Saved Formulas.
backup_content(self, backup_path='.')
Make a Backup of the Analysis in JSON format.
Parameters:
Name | Type | Description | Default
---|---|---|---
backup_path | Union[str, pathlib.Path] | The path where the Backup Folder will be created. | '.'
Returns:
Type | Description
---|---
Path | Path to the Backup Folder.
Source code in celonis_api/process_analytics/analysis.py
def backup_content(self, backup_path: typing.Union[str, pathlib.Path] = ".") -> pathlib.Path:
"""Make a Backup of the Analysis in JSON format.
Args:
backup_path: The path where the Backup Folder will be created.
Returns:
Path to the Backup Folder.
"""
path = pathlib.Path(backup_path) / f"Backup of Analysis - {pathify(self.name)}"
if path.exists():
shutil.rmtree(path)
path.mkdir()
files = {"saved_formulas.json": self._saved_formula_data, "name.json": self.name}
for i in ["draft", "published"]:
data = getattr(self, i).data
if data["document"]:
files[f"{i}_variables.json"] = data["document"].pop("variables", [])
sheets = data["document"].pop("components", [])
for n, s in enumerate(sheets):
files[f"{i}_sheet_{n:02}.json"] = s
files[f"{i}.json"] = data
for k, v in files.items():
(path / k).write_text(json.dumps(v, sort_keys=True, indent=2))
if len(self.images) > 0:
img_path = path / "images"
img_path.mkdir()
for img in self.images:
img.download(img_path / f"{img.id}.png")
return path
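A backup sketch; the parent directory is a placeholder and must already exist:

```python
# Creates 'backups/Backup of Analysis - <name>/' containing draft and
# published documents, sheets, variables, saved formulas, and images.
backup_folder = analysis.backup_content(backup_path="backups")
print(backup_folder)
```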
create_image_upload(self, path)
Upload an image to the Analysis.
Parameters:
Name | Type | Description | Default
---|---|---|---
path | Union[str, pathlib.Path] | Path to the image file. | required
Returns:
Type | Description
---|---
Dict | Upload result.
Source code in celonis_api/process_analytics/analysis.py
def create_image_upload(self, path: typing.Union[str, pathlib.Path]) -> typing.Dict:
"""Upload an image to the Analysis.
!!! api "API"
- `POST: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/assets`
```json
{
"file": <bytes>
}
```
Args:
path: Path to the image file.
Returns:
Upload result.
"""
path = pathlib.Path(path)
return self.celonis.api_request(f"{self.url}/assets", path)
create_saved_formula(self, name, description='', template='', parameters=None, **kwargs)
Creates a new Saved Formula.
API
`POST: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/kpi`
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Name of the Saved Formula. | required
description | str | Description of the KPI. | ''
template | str | Template of the Saved Formula. | ''
parameters | List[str] | Parameters of the Saved Formula. | None
**kwargs | | Optional keyword arguments added to the request payload. | {}
Returns:
Type | Description
---|---
BaseAnalysisSavedFormula | The newly created Saved Formula.
Source code in celonis_api/process_analytics/analysis.py
def create_saved_formula(
self, name: str, description: str = "", template: str = "", parameters: typing.List[str] = None, **kwargs
) -> 'BaseAnalysisSavedFormula':
"""Creates a new Saved Formula.
!!! api "API"
- `POST: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/kpi`
```json
{
"name": name,
"description": description,
"template": template,
"parameters": parameters
}
```
Args:
name: Name of the Saved Formula.
description: Description of the KPI.
template: Template of the Saved Formula.
parameters : Parameters of the Saved Formula.
**kwargs : optional
Returns:
The newly created Saved Formula.
"""
parameters = parameters or []
payload = {"name": name, "description": description, "template": template, "parameters": parameters}
payload.update(kwargs)
response = self.celonis.api_request(f"{self.url}/kpi", payload)
return BaseAnalysisSavedFormula(self, response)
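A sketch of creating a saved formula; the PQL template is a placeholder for your Datamodel:

```python
formula = analysis.create_saved_formula(
    name="case_count",
    description="Number of cases",
    template='COUNT_TABLE("CASES")',
)
print(formula.name)
```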
process_shared_selection_url(self, shared_url) (deprecated)
Returns a PQL object containing the variables and filters from a shared selection URL.
Parameters:
Name | Type | Description | Default
---|---|---|---
shared_url | str | The shared URL. | required
Returns:
Type | Description
---|---
PQL | A PQL query containing variables and filters.
Source code in celonis_api/process_analytics/analysis.py
@deprecated("")
def process_shared_selection_url(self, shared_url: str) -> PQL: # noqa: C901 TODO: refactor method
"""Returns PQL object containing variables and filters from shared selection url.
Args:
shared_url: The shared url.
Returns:
A PQL query containing variables and filters.
"""
bookmark = re.search(r"/b/([A-z0-9-]+)", shared_url)
if not bookmark:
raise PyCelonisValueError("No bookmark reference found in URL.")
response = self.celonis.api_request(f"{self.url}/bookmarks/{bookmark[1]}")
query = PQL()
query.add(self.published.calculate_variables(response["analysisState"]["variables"]))
for s in response["analysisState"]["selections"]:
if s.get("kind") == "SELECTION":
if s.get("type") == "DATE_FILTER":
if s["configuration"].get("dateSearchStart"):
start_date = s["configuration"].get("dateSearchStart") / 1000
start_date_converted = datetime.datetime.utcfromtimestamp(start_date).strftime("%Y-%m-%d")
condition = f"FILTER ROUND_DAY({s['expression']}) >= TO_TIMESTAMP('{start_date_converted}', 'YYYY-MM-DD')" # noqa: E501
query.add(PQLFilter(condition))
if s["configuration"].get("dateSearchEnd"):
end_date = s["configuration"].get("dateSearchEnd") / 1000
end_date_converted = datetime.datetime.utcfromtimestamp(end_date).strftime("%Y-%m-%d")
condition = f"FILTER ROUND_DAY({s['expression']}) <= TO_TIMESTAMP('{end_date_converted}', 'YYYY-MM-DD')" # noqa: E501
query.add(PQLFilter(condition))
else:
if s.get("type") == "STRING":
values = [f"'{v}'" for v in s["values"] if v is not None]
else:
values = [f"{v}" for v in s["values"] if v is not None]
addition = " DOMAIN " if not (values and None in s["values"]) else ""
# all not None values
if values:
values = ",".join(values) # type: ignore
inverse = " NOT" if s.get("revertSelection") else ""
condition = f"FILTER{addition} {s['expression'].split(' AS ')[0]}{inverse} IN ({values})"
# add NULL filter
if None in s["values"]:
inverse = "0" if s.get("revertSelection") else "1"
inverse_text = "AND" if s.get("revertSelection") else "OR"
if not values:
condition = f"FILTER{addition} ISNULL({s['expression'].split(' AS ')[0]})={inverse}"
else:
condition += f" {inverse_text} ISNULL({s['expression'].split(' AS ')[0]})={inverse}"
query.add(PQLFilter(condition))
if s.get("kind") == "FILTER":
cel_operator = (
" "
if s.get("configuration").get("operator") is None
else f""" ON {s.get("configuration").get('operator')}"""
)
# Activity Selection
if s["configuration"].get("type") == "ACTIVITY":
condition = f""" FILTER PROCESS {cel_operator} """
if s.get("configuration").get("noFlowThrough") != []:
condition += "NOT EQUALS " + "'" + "','".join(s.get("configuration").get("noFlowThrough")) + "'"
elif s.get("configuration").get("flowThrough") != []:
condition += "EQUALS " + "'" + "','".join(s.get("configuration").get("flowThrough")) + "'"
elif s.get("configuration").get("end") != []:
condition += f""" EQUALS '{s.get("configuration").get('end')[0]}' end """
elif s.get("configuration").get("start") != []:
condition += f""" EQUALS start '{s.get("configuration").get('start')[0]}' """
# Process Flow Selection
elif s["configuration"].get("type") == "PROCESS_FLOW":
condition = f""" FILTER PROCESS """ # noqa: F541
# never followed directly
if s["configuration"].get("mode") == "not_direct":
condition += f""" {cel_operator} """
condition += f""" NOT EQUALS '{s["configuration"].get("source")}' TO '{s["configuration"].get("target")}' """ # noqa: E501
# followed directly
elif s["configuration"].get("mode") == "direct":
condition += f""" {cel_operator} """
condition += f""" EQUALS '{s["configuration"].get("source")}' TO '{s["configuration"].get("target")}' """ # noqa: E501
# eventually followed by
elif s["configuration"].get("mode") == "eventually":
# TODO: Here there is no operator given back
condition += f""" EQUALS '{s["configuration"].get("source")}' TO ANY TO '{s["configuration"].get("target")}' """ # noqa: E501
# never followed by
elif s["configuration"].get("mode") == "not_eventually":
# TODO: Here there is no operator given back
condition += f""" NOT EQUALS '{s["configuration"].get("source")}' TO ANY TO '{s["configuration"].get("target")}' """ # noqa: E501
query.add(PQLFilter(condition))
return query
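A sketch of turning a shared selection link into filters for an export (keep in mind the method is deprecated); the URL is a placeholder and must contain a `/b/<bookmark>` reference:

```python
from pycelonis.celonis_api.pql.pql import PQLColumn

shared_query = analysis.process_shared_selection_url(
    "https://<team>.celonis.cloud/process-mining/analysis/<analysis-id>/#/b/<bookmark-id>"
)
# Add a column and export with the shared filters and variables applied.
shared_query.add(PQLColumn('"CASES"."CASE_ID"', "Case ID"))
df = analysis.get_data_frame(shared_query)
```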
rebuild_content_from_backup(self, backup_path, keep_analysis_name=True)
Overwrites the contents of the Analysis object with content from the Backup Folder.
Parameters:
Name | Type | Description | Default
---|---|---|---
backup_path | Union[str, pathlib.Path] | The path where the Backup Folder is located. | required
keep_analysis_name | bool | If `True` keeps the Analysis name, else takes it from the Backup. | True
Source code in celonis_api/process_analytics/analysis.py
def rebuild_content_from_backup( # noqa: C901 TODO: refactor method
self, backup_path: typing.Union[str, pathlib.Path], keep_analysis_name: bool = True
):
"""Overwrites the contents of the Analysis object with content from the Backup Folder.
Args:
backup_path: The path where the Backup Folder is located.
keep_analysis_name: If `True` keeps the Analysis name, else take from Backup.
"""
path = pathlib.Path(backup_path)
assert path.is_dir(), "backup_path must be directory"
if not keep_analysis_name:
match = re.search("Backup of Analysis - (.+)", str(path))
assert match and len(match.groups()) == 1, "name of analysis not found in backup_path"
if (path / "name.json").is_file():
self.name = json.loads((path / "name.json").read_text())
else:
self.name = match[1]
# Upload saved formulas
formulas = json.loads((path / "saved_formulas.json").read_text())
for saved_formula in self.saved_formulas:
saved_formula.delete()
for formula in formulas:
self.create_saved_formula(**formula)
# Upload images and create mapping of ids (stored/old id => new generated id after upload)
img_path = path / "images"
if img_path.is_dir():
old2new_image_id_mapping = dict()
not_uploaded_images = list()
for p in img_path.iterdir():
old_id = p.name.split(".")[0]
try:
img_response = self.create_image_upload(p)
except PyCelonisHTTPError:
not_uploaded_images.append(p.name)
continue
new_id = img_response["id"]
old2new_image_id_mapping[old_id] = new_id
if not_uploaded_images:
self._logger.warning(
f"Image(s): {not_uploaded_images}\n"
"Could not be uploaded since 'Uploaded file format is not supported'. "
"Please verify the image file(s) format or check for damaged file(s)."
)
# Upload components and saved variables
for i in ["published", "draft"]:
if (path / f"{i}.json").is_file():
doc = getattr(self, i)
data = json.loads((path / f"{i}.json").read_text())
data["document"]["variables"] = json.loads((path / f"{i}_variables.json").read_text())
data["document"]["components"] = []
for s in sorted(path.glob(f"{i}_sheet_*.json")):
comp_data = json.loads(s.read_text())
# Update 'imageId' in image_component / mapping old imageId to new generated imageID (assets)
if img_path.is_dir():
for comp in comp_data["components"]:
if "imageId" in comp:
comp["imageId"] = old2new_image_id_mapping[comp["imageId"]]
data["document"]["components"].append(comp_data)
doc.data["document"] = data["document"]
else:
self._logger.warning(f"{i} document of analyses not found in Backup folder '{path}' ")
BaseAnalysisComponent (CelonisDataObject)
Base Analysis Components object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysisComponent(CelonisDataObject):
"""Base Analysis Components object to interact with Celonis Process Analytics API."""
_name_data_key = "title"
_ignore_name_mapping = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._name = self.data.get("title")
@property
def data(self):
return find_object_from_list(self.parent.data["components"], self.id)
@property
def pql_filter(self) -> typing.Optional[PQLFilter]:
"""Get the PQLFilter query of the Analysis filter (load script)."""
f = self.data.get("componentFilter")
return PQLFilter(f) if f else None
@property
def pql_columns(self) -> typing.List[PQLColumn]:
"""Get all PQL Columns with all dimensions and KPIs of the Analysis Component.
Returns:
A List of PQL Columns.
"""
d = self.data
if d.get("type") == "pql-table":
pql_list = []
for col in d.get("axis0", []) + d.get("axis2", []):
name = re.sub(r"#{([^}]+)}", r"\1", col["name"]).replace('"', "")
if (
not self._ignore_name_mapping
and "." in col["name"]
and col.get("translatedName") is not None
and "expression" not in col.get("translatedName").lower()
):
name = col.get("translatedName")
pql_list += [
PQLColumn(
query=col["text"],
name=name,
format=col.get("valueFormat"),
sorting_index=col.get("sortingIndex"),
sorting_direction=col.get("sorting"),
not_included=col.get("notIncluded"),
)
]
return pql_list
elif d.get("type") == "single-kpi":
return [PQLColumn(d.get("formula", {}).get("text"), self.name)]
else:
return []
@property
def pql_query(self) -> typing.List[typing.Dict]:
"""Get the PQL query needed to recreate the component including filters and variables.
Returns:
A List of Dictionaries containing
"""
temp = PQL(
[col for col in self.pql_columns if not col.not_included],
self.pql_filter,
self.parent.pql_filter,
self.parent.parent.pql_filter,
)
pattern = "<%=([^%]+)%>"
compvars = PQL(self.parent.parent.variables).variables
def find_vars_recursive(s):
for var in re.findall(pattern, s):
yield var.strip()
if var.strip() in compvars:
yield from find_vars_recursive(compvars[var.strip()])
var_names = list(
list(i)
for i in map(
find_vars_recursive,
[col.query for col in temp.columns]
+ [col.name for col in temp.columns]
+ [str(f) for f in temp.filters],
)
)
variables = [var for var in self.parent.parent.variables if var["name"] in list(itertools.chain(*var_names))]
return temp + self.parent.parent.calculate_variables(variables)
def get_data_frame(self, ignore_name_mapping: bool = False, chunked_download=False) -> pandas.DataFrame:
"""Get a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
with the Analysis components data.
Args:
ignore_name_mapping: If True, uses the raw column names instead of the translated display names.
chunked_download: Can be used to download big data sets in chunks (Datamodel permissions required).
This might be necessary if the data of the component is too large to be downloaded in a single chunk.
Returns:
Dataframe containing the Analysis components data.
"""
self._ignore_name_mapping = ignore_name_mapping
try:
return self.parent.parent.parent.get_data_frame(
self.pql_query, variables=self.parent.parent.variables, chunked_download=chunked_download
)
except PyCelonisHTTPError as ex:
if not chunked_download and (ex.response.reason == "Internal Server Error"):
self._logger.info(
"Exception raised while retrieving a DataFrame from an AnalysisComponent. "
"If the underlying data is big, this might be solved by activating chunking."
)
raise ex
data (property, readonly)
A reference to the data of this object.
pql_columns: List[pycelonis.celonis_api.pql.pql.PQLColumn] (property, readonly)
Get all PQL Columns with all dimensions and KPIs of the Analysis Component.
Returns:
Type | Description
---|---
List[pycelonis.celonis_api.pql.pql.PQLColumn] | A List of PQL Columns.
pql_filter: Optional[pycelonis.celonis_api.pql.pql.PQLFilter] (property, readonly)
Get the PQLFilter query of the Analysis filter (load script).
pql_query: List[Dict] (property, readonly)
Get the PQL query needed to recreate the component including filters and variables.
Returns:
Type | Description
---|---
List[Dict] | A List of Dictionaries containing
get_data_frame(self, ignore_name_mapping=False, chunked_download=False)
Get a pandas.DataFrame with the Analysis component's data.
Parameters:
Name | Type | Description | Default
---|---|---|---
ignore_name_mapping | bool | If `True`, uses the raw column names instead of the translated display names. | False
chunked_download | bool | Can be used to download big data sets in chunks (Datamodel permissions required). This might be necessary if the data of the component is too large to be downloaded in a single chunk. | False
Returns:
Type | Description
---|---
DataFrame | Dataframe containing the Analysis component's data.
Source code in celonis_api/process_analytics/analysis.py
def get_data_frame(self, ignore_name_mapping: bool = False, chunked_download=False) -> pandas.DataFrame:
"""Get a [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
with the Analysis components data.
Args:
ignore_name_mapping: If True, uses the raw column names instead of the translated display names.
chunked_download: Can be used to download big data sets in chunks (Datamodel permissions required).
This might be necessary if the data of the component is too large to be downloaded in a single chunk.
Returns:
Dataframe containing the Analysis components data.
"""
self._ignore_name_mapping = ignore_name_mapping
try:
return self.parent.parent.parent.get_data_frame(
self.pql_query, variables=self.parent.parent.variables, chunked_download=chunked_download
)
except PyCelonisHTTPError as ex:
if not chunked_download and (ex.response.reason == "Internal Server Error"):
self._logger.info(
"Exception raised while retrieving a DataFrame from an AnalysisComponent. "
"If the underlying data is big, this might be solved by activating chunking."
)
raise ex
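A sketch of exporting the data behind a single component of the published document; sheet and component titles are placeholders:

```python
# Components live on the sheets of the published (or draft) document.
sheet = analysis.published.sheets.find("<sheet name>")
table = sheet.components.find("<component title>")

# Reuses the analysis-level export, including sheet and component filters.
df = table.get_data_frame()
```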
BaseAnalysisDocument (CelonisApiObject)
Base Analysis Document object to interact with Celonis Process Analytics API.
Parameters:
Name | Type | Description | Default
---|---|---|---
parent | CelonisApiObject | Parent Analysis object. | required
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysisDocument(CelonisApiObject):
"""Base Analysis Document object to interact with Celonis Process Analytics API.
Args:
parent: Parent Analysis object.
"""
def __init__(self, parent: CelonisApiObject):
self.parent = parent
self.celonis = parent.celonis
self.static = False
self._data = None # type: ignore
@property
def sheets(self) -> 'CelonisCollection[BaseAnalysisSheet]':
"""Get/Set all Analysis Document Sheets.
Returns:
A Collection of Analysis Sheets.
"""
d = self.data.get("document", {}).get("components", [])
return CelonisCollection(BaseAnalysisSheet(self, s) for s in d)
@sheets.setter
def sheets(self, value: 'CelonisCollection[BaseAnalysisSheet]'):
if not isinstance(value, CelonisCollection):
raise PyCelonisTypeError("Argument 'value' must be type CelonisCollection.")
for v in value:
if not isinstance(v, BaseAnalysisSheet):
raise PyCelonisTypeError(
"Items of argument 'value' must be of type BaseAnalysisSheet, " f"but got {type(v)}."
)
self.data["components"] = [v.data for v in value]
def create_sheet(self, name: str) -> 'BaseAnalysisSheet':
"""Create a new Analysis Document Sheet.
Args:
name: Name of the new Analysis Document Sheet.
Returns:
The newly created Analysis Document Sheet.
"""
new = {
"name": name,
"components": [],
"position": {"top": 0, "left": 0, "height": 800, "width": 1200},
"id": str(uuid.uuid4()),
"contentType": "blank-sheet",
"format": "FULLSCREEN",
"sheetFilter": {"text": ""},
}
components_list = self.data["document"].get("components", [])
components_list.append(new)
self.data["document"]["components"] = components_list
return BaseAnalysisSheet(self, new)
@property
def components(self) -> 'CelonisCollection[BaseAnalysisComponent]':
"""Get all components of all Analysis Document Sheets.
Returns:
A Collection of Analysis Components.
"""
return CelonisCollection(c for s in self.sheets for c in s.components)
@property
def variables(self) -> typing.List[typing.Dict]:
"""Get/Set all Variables of the Analysis Document."""
return self.data["document"].get("variables", [])
@variables.setter
def variables(self, value: typing.List[typing.Dict]):
if not (isinstance(value, list) and all(isinstance(v, dict) for v in value)):
raise PyCelonisTypeError("Can only set list of dicts.")
self.data["document"]["variables"] = value
def calculate_variables(self, variables: typing.List[typing.Dict] = None) -> typing.List[typing.Dict]:
"""Calculates static values of all Variables of the Analysis Document.
Args:
variables: Optional list of variables to calculate.
Returns:
Variables with results of static variable formulas.
"""
doc_vars = self.variables
if not isinstance(variables, list):
variables = doc_vars
def calculate(v):
doc_var = next((i for i in doc_vars if i["name"] == v["name"]), {})
if v.get("type") == "static_value" or doc_var.get("type") == "static_value":
v = dict(**v)
value = self.parent.get_data_frame(v["value"], variables=variables).iloc[0, 0]
if getattr(value, "item", None):
value = value.item()
v["value"] = value
return v
return list(threaded(variables, calculate))
def create_variable(self, name: str, value: str, var_type: str = "text_replacement") -> typing.Dict:
"""Creates a new Analysis Document Variable.
Args:
name: Name of the new Variable.
value: Value of the new Variable.
var_type: Type of the new Variable.
Returns:
The newly created Analysis Document Variable.
"""
data = self.data
var = {"name": name, "type": var_type, "value": value}
data["document"]["variables"].append(var)
self.data = data
return var
@property
def pql_filter(self) -> typing.Optional[PQLFilter]:
"""Get the PQLFilter query of the Analysis filter (load script)."""
f = self.data["document"].get("statelessLoadScript")
return PQLFilter(f) if f else None
def __repr__(self):
return f"<{self.__class__.__name__} of {str(self.parent)}>"
components: CelonisCollection[BaseAnalysisComponent] (property, readonly)
Get all components of all Analysis Document Sheets.
Returns:
Type | Description
---|---
CelonisCollection[BaseAnalysisComponent] | A Collection of Analysis Components.
pql_filter: Optional[pycelonis.celonis_api.pql.pql.PQLFilter] (property, readonly)
Get the PQLFilter query of the Analysis filter (load script).
sheets: CelonisCollection[BaseAnalysisSheet] (property, writable)
Get/Set all Analysis Document Sheets.
Returns:
Type | Description
---|---
CelonisCollection[BaseAnalysisSheet] | A Collection of Analysis Sheets.
variables: List[Dict] (property, writable)
Get/Set all Variables of the Analysis Document.
calculate_variables(self, variables=None)
Calculates static values of all Variables of the Analysis Document.
Parameters:
Name | Type | Description | Default
---|---|---|---
variables | List[Dict] | Optional list of variables to calculate. | None
Returns:
Type | Description
---|---
List[Dict] | Variables with the results of static variable formulas.
Source code in celonis_api/process_analytics/analysis.py
def calculate_variables(self, variables: typing.List[typing.Dict] = None) -> typing.List[typing.Dict]:
"""Calculates static values of all Variables of the Analysis Document.
Args:
variables: Optional list of variables to calculate.
Returns:
Variables with results of static variable formulas.
"""
doc_vars = self.variables
if not isinstance(variables, list):
variables = doc_vars
def calculate(v):
doc_var = next((i for i in doc_vars if i["name"] == v["name"]), {})
if v.get("type") == "static_value" or doc_var.get("type") == "static_value":
v = dict(**v)
value = self.parent.get_data_frame(v["value"], variables=variables).iloc[0, 0]
if getattr(value, "item", None):
value = value.item()
v["value"] = value
return v
return list(threaded(variables, calculate))
create_sheet(self, name)
Create a new Analysis Document Sheet.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Name of the new Analysis Document Sheet. | required
Returns:
Type | Description
---|---
BaseAnalysisSheet | The newly created Analysis Document Sheet.
Source code in celonis_api/process_analytics/analysis.py
def create_sheet(self, name: str) -> 'BaseAnalysisSheet':
"""Create a new Analysis Document Sheet.
Args:
name: Name of the new Analysis Document Sheet.
Returns:
The newly created Analysis Document Sheet.
"""
new = {
"name": name,
"components": [],
"position": {"top": 0, "left": 0, "height": 800, "width": 1200},
"id": str(uuid.uuid4()),
"contentType": "blank-sheet",
"format": "FULLSCREEN",
"sheetFilter": {"text": ""},
}
components_list = self.data["document"].get("components", [])
components_list.append(new)
self.data["document"]["components"] = components_list
return BaseAnalysisSheet(self, new)
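A sketch of adding an empty sheet to the draft document:

```python
# The new sheet is appended to the document's component list.
sheet = analysis.draft.create_sheet("KPI Overview")
print(sheet.name)
```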
create_variable(self, name, value, var_type='text_replacement')
Creates a new Analysis Document Variable.
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | Name of the new Variable. | required
value | str | Value of the new Variable. | required
var_type | str | Type of the new Variable. | 'text_replacement'
Returns:
Type | Description
---|---
Dict | The newly created Analysis Document Variable.
Source code in celonis_api/process_analytics/analysis.py
def create_variable(self, name: str, value: str, var_type: str = "text_replacement") -> typing.Dict:
"""Creates a new Analysis Document Variable.
Args:
name: Name of the new Variable.
value: Value of the new Variable.
var_type: Type of the new Variable.
Returns:
The newly created Analysis Document Variable.
"""
data = self.data
var = {"name": name, "type": var_type, "value": value}
data["document"]["variables"].append(var)
self.data = data
return var
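A sketch of creating a variable and resolving static variables; the value is a placeholder column:

```python
# Create a simple text-replacement variable on the draft document.
var = analysis.draft.create_variable(name="activity_col", value='"ACTIVITIES"."ACTIVITY_EN"')

# Resolve all static_value variables to concrete values.
resolved = analysis.draft.calculate_variables()
print(var, resolved)
```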
BaseAnalysisImage (CelonisDataObject)
Analysis Image object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysisImage(CelonisDataObject):
"""Analysis Image object to interact with Celonis Process Analytics API."""
@property
def data(self):
return find_object_from_list(self.parent._saved_images, self.id)
def download(self, file_path: typing.Union[str, pathlib.Path] = "./image.png") -> pathlib.Path:
"""Downloads the Analysis Image to the specified path.
!!! api "API"
- `GET: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/assets/{asset_id}/image`
Args:
file_path: Path where the image should be downloaded.
Returns:
Path to the downloaded file.
"""
file_path = pathlib.Path(file_path)
if file_path.is_file():
file_path.unlink()
img_file = self.parent.celonis.api_request(f"{self.parent.url}/assets/{self.id}/image", file_path)
return img_file
data (property, readonly)
A reference to the data of this object.
download(self, file_path='./image.png')
Downloads the Analysis Image to the specified path.
API
`GET: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/assets/{asset_id}/image`
Parameters:
Name | Type | Description | Default
---|---|---|---
file_path | Union[str, pathlib.Path] | Path where the image should be downloaded. | './image.png'
Returns:
Type | Description
---|---
Path | Path to the downloaded file.
Source code in celonis_api/process_analytics/analysis.py
def download(self, file_path: typing.Union[str, pathlib.Path] = "./image.png") -> pathlib.Path:
"""Downloads the Analysis Image to the specified path.
!!! api "API"
- `GET: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/assets/{asset_id}/image`
Args:
file_path: Path where the image should be downloaded.
Returns:
Path to the downloaded file.
"""
file_path = pathlib.Path(file_path)
if file_path.is_file():
file_path.unlink()
img_file = self.parent.celonis.api_request(f"{self.parent.url}/assets/{self.id}/image", file_path)
return img_file
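A sketch of downloading every image asset of an Analysis into a local folder:

```python
import pathlib

out = pathlib.Path("analysis_images")
out.mkdir(exist_ok=True)
for img in analysis.images:
    img.download(out / f"{img.id}.png")
```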
BaseAnalysisImageComponent (BaseAnalysisComponent)
Analysis Image Components object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysisImageComponent(BaseAnalysisComponent):
"""Analysis Image Components object to interact with Celonis Process Analytics API."""
def set_image(self, image: 'BaseAnalysisImage'):
"""Sets the image of the image component.
Args:
image: The image that the component should be set to.
"""
if isinstance(image, BaseAnalysisImage):
self.data["imageId"] = image.id
set_image(self, image)
Sets the image of the image component.
Parameters:
Name | Type | Description | Default
---|---|---|---
image | BaseAnalysisImage | The image that the component should be set to. | required
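A sketch of uploading an image and pointing an image component at it; the file name and component title are placeholders:

```python
# Upload a local image as an Analysis asset ...
upload = analysis.create_image_upload("logo.png")

# ... then assign it to an image component of the draft document.
image = analysis.images.find(upload["id"])
component = analysis.draft.components.find("<image component title>")
component.set_image(image)
```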
BaseAnalysisSavedFormula (CelonisDataObject)
Analysis Saved Formula object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysisSavedFormula(CelonisDataObject):
"""Analysis Saved Formula object to interact with Celonis Process Analytics API."""
@property
def data(self):
return find_object_from_list(self.parent._saved_formula_data, self.id)
def delete(self):
"""
!!! api "API"
- `DELETE: /process-mining/analysis/v1.2/api/analysis/{analysis_id}/kpi/{formula_id}`
"""
self.parent.celonis.api_request(f"{self.parent.url}/kpi/{self.id}", HttpMethod.DELETE)
BaseAnalysisSheet (CelonisDataObject)
Analysis Sheet object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysisSheet(CelonisDataObject):
"""Analysis Sheet object to interact with Celonis Process Analytics API."""
@property
def data(self):
return find_object_from_list(self.parent.data["document"]["components"], self.id)
@property
def components(self) -> 'CelonisCollection[BaseAnalysisComponent]':
"""Get/Set all Analysis Sheet Components.
Returns:
A Collection of Analysis Components.
"""
component_types = {"pql-table": BaseAnalysisTable, "image": BaseAnalysisImageComponent}
d = self.data.get("components", [])
if self.data.get("contentType", "") == "conformance-checking":
# TODO: Define BaseAnalysis Conformance-Checking Class
self._logger.warning("Warning: Conformance Sheets are not supported yet, and will be skipped.")
d = []
return CelonisCollection(component_types.get(s["type"], BaseAnalysisComponent)(self, s) for s in d)
@components.setter
def components(self, value):
if not isinstance(value, CelonisCollection):
raise PyCelonisTypeError("Argument 'value' must be type CelonisCollection.")
for v in value:
if not isinstance(v, BaseAnalysisComponent):
raise PyCelonisTypeError(
"Items of argument 'value' must be of type BaseAnalysisComponent, " f"but got {type(v)}."
)
self.data["components"] = [v.data for v in value]
@property
def pql_filter(self) -> typing.Optional[PQLFilter]:
"""Get the PQLFilter query of the Analysis filter (load script)."""
f = self.data.get("sheetFilter", {}).get("text")
return PQLFilter(f) if f else None
components: CelonisCollection[BaseAnalysisComponent] (property, writable)
Get/Set all Analysis Sheet Components.
Returns:
Type | Description
---|---
CelonisCollection[BaseAnalysisComponent] | A Collection of Analysis Components.
data (property, readonly)
A reference to the data of this object.
pql_filter: Optional[pycelonis.celonis_api.pql.pql.PQLFilter] (property, readonly)
Get the PQLFilter query of the Analysis filter (load script).
BaseAnalysisTable (BaseAnalysisComponent)
Analysis Table Components object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseAnalysisTable(BaseAnalysisComponent):
"""Analysis Table Components object to interact with Celonis Process Analytics API."""
def add_column(self, column: typing.Union[str, PQLColumn], column_type: str, **kwargs):
"""Adds a PQL Column to the Analysis Table.
Args:
column: The column name to add or the PQLColumn with the column to add.
column_type: Type of the column added. One of [`dimension`, `kpi`].
"""
if isinstance(column, PQLColumn):
column_text = column.query
column_name = column.name
data_type = column.format
if column.sorting_index:
kwargs["sortingIndex"] = column.sorting_index
kwargs["sorting"] = column.sorting_direction
elif isinstance(column, str):
# check if column_type is valid
if column_type.lower() not in ["kpi", "dimension"]:
self._logger.info(
f" {column_type} is not a valid column type. Please select either 'kpi' or 'dimension'"
)
return
# check if column exists in datamodel
table_name_split, column_name_split = column.split(".")
datamodel_tables = self.parent.parent.parent.datamodel.tables
datamodel_table = next((table for table in datamodel_tables if table.name == table_name_split), None)
if not datamodel_table:
self._logger.info(f"No result found for table: {table_name_split}")
return
datamodel_column = next(
(column for column in datamodel_table.columns if column["name"] == column_name_split), None
)
if not datamodel_column:
self._logger.info(f"No result found for column {column_name_split} in table {table_name_split}")
return
column_text = column
column_name = column
data_type = datamodel_column["type"]
else:
raise PyCelonisTypeError(f"Wrong type for {column} (supported types: str and PQLColumn)")
column_format = {
"flex": kwargs.get("flex", 1),
"name": kwargs.get("name", f"#{{{column_name}}}"),
"notIncluded": kwargs.get("notIncluded", False),
"ratioFormula": kwargs.get("ratioFormula", {}),
"seriesColor": kwargs.get("seriesColor", ""),
"seriesColorMapping": kwargs.get("seriesColorMapping", ""),
"sortingIndex": kwargs.get("sortingIndex", None),
"sorting": kwargs.get("sorting", None),
"text": kwargs.get("text", column_text),
"textColor": kwargs.get("textColor", ""),
"textColorMapping": kwargs.get("textColorMapping", ""),
"translatedName": kwargs.get("translatedName", None),
"units": kwargs.get("units", ""),
"valueFormat": kwargs.get("valueFormat", None),
"width": kwargs.get("width", 100),
}
if data_type == "DATE":
column_format["valueFormat"] = kwargs.get("valueFormat", "%Y-%m-%d")
column_format["function"] = kwargs.get(
"function",
{
"DATE": True,
"displayName": "Round Day",
"example": "2015-02-22",
"name": "Round_Day",
"valueFormat": "%Y-%m-%d",
},
)
if data_type == "FLOAT":
column_format["valueFormat"] = kwargs.get("valueFormat", ",f")
if column_type.lower() == "dimension":
column_format["type"] = kwargs.get("type", "dimension")
if column_type.lower() == "kpi":
column_format["dataLabelStyle"] = kwargs.get("dataLabelStyle", {})
column_format["dataLabelsPosition"] = kwargs.get("dataLabelsPosition", "custom")
column_format["fillStyle"] = kwargs.get("fillStyle", {"opacity": 1})
column_format["ignoreSeriesPalettes"] = kwargs.get("ignoreSeriesPalettes", True)
column_format["seriesPalette"] = kwargs.get("seriesPalette", "")
column_format["showDataLabels"] = kwargs.get("showDataLabels", False)
column_format["sorting"] = kwargs.get("sorting", "DESC")
column_format["text"] = (
kwargs.get("text", f"SUM({column_text})") if data_type == "FLOAT" else column_format["text"]
)
# add column: dimensions go to axis0, KPIs to axis2
axis = "axis0" if column_type.lower() == "dimension" else "axis2"
self.data[axis] = self.data[axis] + [column_format]
def remove_column(self, column_name):
"""Removes a Column to the Analysis Table.
Args:
column_name: The column name to remove.
"""
def get_column_name(column_text):
cleaned_text = re.split("[()]", column_text)
column_name = column_text if len(cleaned_text) == 1 else cleaned_text[int(len(cleaned_text) / 2)]
return column_name.lower()
for axis in ["axis2", "axis0"]:
column = next((i for i in self.data[axis] if get_column_name(i["text"]) in column_name.lower()), False)
if column:
self.data[axis] = [
table_column for table_column in self.data[axis] if table_column["text"] != column["text"]
]
self._logger.info(f"Removed column {column_name} from table {self.name}")
return True
self._logger.info(f"No result found for column: {column_name}")
add_column(self, column, column_type, **kwargs)
¶
Adds a PQL Column to the Analysis Table.
Parameters:

Name | Type | Description | Default
---|---|---|---
`column` | `Union[str, pycelonis.celonis_api.pql.pql.PQLColumn]` | The column name to add or the PQLColumn with the column to add. | required
`column_type` | `str` | Type of the column added. One of [`dimension`, `kpi`]. | required
Source code in celonis_api/process_analytics/analysis.py
def add_column(self, column: typing.Union[str, PQLColumn], column_type: str, **kwargs):
"""Adds a PQL Column to the Analysis Table.
Args:
column: The column name to add or the PQLColumn with the column to add.
column_type: Type of the column added. One of [`dimension`, `kpi`].
"""
if isinstance(column, PQLColumn):
column_text = column.query
column_name = column.name
data_type = column.format
if column.sorting_index:
kwargs["sortingIndex"] = column.sorting_index
kwargs["sorting"] = column.sorting_direction
elif isinstance(column, str):
# check if column_type is valid
if column_type.lower() not in ["kpi", "dimension"]:
self._logger.info(
f" {column_type} is not a valid column type. Please select either 'kpi' or 'dimension'"
)
return
# check if column exists in datamodel
table_name_split, column_name_split = column.split(".")
datamodel_tables = self.parent.parent.parent.datamodel.tables
datamodel_table = next((table for table in datamodel_tables if table.name == table_name_split), None)
if not datamodel_table:
self._logger.info(f"No result found for table: {table_name_split}")
return
datamodel_column = next(
(column for column in datamodel_table.columns if column["name"] == column_name_split), None
)
if not datamodel_column:
self._logger.info(f"No result found for column {column_name_split} in table {table_name_split}")
return
column_text = column
column_name = column
data_type = datamodel_column["type"]
else:
raise PyCelonisTypeError(f"Wrong type for {column} (supported types: str and PQLColumn)")
column_format = {
"flex": kwargs.get("flex", 1),
"name": kwargs.get("name", f"#{{{column_name}}}"),
"notIncluded": kwargs.get("notIncluded", False),
"ratioFormula": kwargs.get("ratioFormula", {}),
"seriesColor": kwargs.get("seriesColor", ""),
"seriesColorMapping": kwargs.get("seriesColorMapping", ""),
"sortingIndex": kwargs.get("sortingIndex", None),
"sorting": kwargs.get("sorting", None),
"text": kwargs.get("text", column_text),
"textColor": kwargs.get("textColor", ""),
"textColorMapping": kwargs.get("textColorMapping", ""),
"translatedName": kwargs.get("translatedName", None),
"units": kwargs.get("units", ""),
"valueFormat": kwargs.get("valueFormat", None),
"width": kwargs.get("width", 100),
}
if data_type == "DATE":
column_format["valueFormat"] = kwargs.get("valueFormat", "%Y-%m-%d")
column_format["function"] = kwargs.get(
"function",
{
"DATE": True,
"displayName": "Round Day",
"example": "2015-02-22",
"name": "Round_Day",
"valueFormat": "%Y-%m-%d",
},
)
if data_type == "FLOAT":
column_format["valueFormat"] = kwargs.get("valueFormat", ",f")
if column_type.lower() == "dimension":
column_format["type"] = kwargs.get("type", "dimension")
if column_type.lower() == "kpi":
column_format["dataLabelStyle"] = kwargs.get("dataLabelStyle", {})
column_format["dataLabelsPosition"] = kwargs.get("dataLabelsPosition", "custom")
column_format["fillStyle"] = kwargs.get("fillStyle", {"opacity": 1})
column_format["ignoreSeriesPalettes"] = kwargs.get("ignoreSeriesPalettes", True)
column_format["seriesPalette"] = kwargs.get("seriesPalette", "")
column_format["showDataLabels"] = kwargs.get("showDataLabels", False)
column_format["sorting"] = kwargs.get("sorting", "DESC")
column_format["text"] = (
kwargs.get("text", f"SUM({column_text})") if data_type == "FLOAT" else column_format["text"]
)
# add column: dimensions go to axis0, KPIs to axis2
axis = "axis0" if column_type.lower() == "dimension" else "axis2"
self.data[axis] = self.data[axis] + [column_format]
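A usage sketch for `add_column`. The table lookup, the `ACTIVITIES.ACTIVITY_EN` column, and the `PQLColumn` keyword signature are assumptions for illustration:

```python
# Hedged sketch: add a dimension and a KPI column to an analysis table.
table = sheet.components.find("My Table")  # a BaseAnalysisTable, hypothetical name
table.add_column("ACTIVITIES.ACTIVITY_EN", column_type="dimension")  # -> axis0
table.add_column(
    PQLColumn(query="COUNT(ACTIVITIES.ACTIVITY_EN)", name="Count"),
    column_type="kpi",  # -> axis2
)
```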
remove_column(self, column_name)
¶
Removes a column from the Analysis Table.
Parameters:

Name | Type | Description | Default
---|---|---|---
`column_name` | | The column name to remove. | required
Source code in celonis_api/process_analytics/analysis.py
def remove_column(self, column_name):
"""Removes a Column to the Analysis Table.
Args:
column_name: The column name to remove.
"""
def get_column_name(column_text):
cleaned_text = re.split("[()]", column_text)
column_name = column_text if len(cleaned_text) == 1 else cleaned_text[int(len(cleaned_text) / 2)]
return column_name.lower()
for axis in ["axis2", "axis0"]:
column = next((i for i in self.data[axis] if get_column_name(i["text"]) in column_name.lower()), False)
if column:
self.data[axis] = [
table_column for table_column in self.data[axis] if table_column["text"] != column["text"]
]
self._logger.info(f"Removed column {column_name} from table {self.name}")
return True
self._logger.info(f"No result found for column: {column_name}")
BaseDraftDocument (BaseAnalysisDocument)
¶
Analysis Draft Document object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BaseDraftDocument(BaseAnalysisDocument):
"""Analysis Draft Document object to interact with Celonis Process Analytics API."""
@property
def url(self) -> str:
"""
!!! api "API"
- `/process-mining/analysis/v1.2/api/analysis/{analysis_id}/autosave`
"""
return f"{self.parent.url}/autosave"
@property
def data(self):
return super().data
@data.setter
def data(self, value):
self._data = value
self.celonis.api_request(self.url, self._data, method=HttpMethod.PUT, params={"release": False})
def publish(self):
"""Converts the Analysis Draft Document into a Published Document."""
self.parent.published.data = self.data
return self.parent.published
data
property
writable
¶
Response data from the Celonis API. If `static` is set to `False`, every time you set the `data` property a `POST` request is executed against the resource API endpoint to update the remote resource in Celonis EMS.
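A draft round-trip sketch; the `name` field edited here is hypothetical:

```python
# Hedged sketch: edit the draft and save it without publishing.
doc = analysis.draft
d = doc.data
d["name"] = "Reworked draft"  # hypothetical field
doc.data = d                  # PUT .../autosave with release=False
```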
url: str
property
readonly
¶
API
/process-mining/analysis/v1.2/api/analysis/{analysis_id}/autosave
publish(self)
¶
Converts the Analysis Draft Document into a Published Document.
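A one-line sketch, assuming `analysis` is the parent `Analysis` object:

```python
# Hedged sketch: release the current draft as the published document.
published = analysis.draft.publish()  # copies draft data to analysis.published
```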
BasePublishedDocument (BaseAnalysisDocument)
¶
Analysis Published Document object to interact with Celonis Process Analytics API.
Source code in celonis_api/process_analytics/analysis.py
class BasePublishedDocument(BaseAnalysisDocument):
"""Analysis Published Document object to interact with Celonis Process Analytics API."""
@property
def url(self) -> str:
"""
!!! api "API"
- `/process-mining/analysis/v1.2/api/analysis/{analysis_id}/published`
"""
return f"{self.parent.url}/published"
@property
def data(self):
return super().data
@data.setter
def data(self, value):
self._data = value
self.celonis.api_request(f"{self.parent.url}/history/", self._data)
self.celonis.api_request(self.parent.draft.url, self._data, method=HttpMethod.PUT, params={"release": True})
data
property
writable
¶
Response data from the Celonis API. If `static` is set to `False`, every time you set the `data` property a `POST` request is executed against the resource API endpoint to update the remote resource in Celonis EMS.
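Writing to the published document differs from the draft: a history entry is stored first, then the draft is released. A sketch, again with a hypothetical `name` field:

```python
# Hedged sketch: update the published document directly.
pub = analysis.published
d = pub.data
d["name"] = "Final analysis"  # hypothetical field
pub.data = d                  # POST to /history/, then PUT draft with release=True
```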
url: str
property
readonly
¶
API
/process-mining/analysis/v1.2/api/analysis/{analysis_id}/published
StudioAnalysis (Analysis)
¶
Analysis object to interact with Celonis Process Analytics API. Use celonis_api.studio.analysis.Analysis to interact with Studio Analysis.
Source code in celonis_api/process_analytics/analysis.py
class StudioAnalysis(Analysis):
"""Analysis object to interact with Celonis Process Analytics API.
Use [celonis_api.studio.analysis.Analysis][] to interact with Studio Analysis.
"""
@property
def url(self) -> str:
"""
!!! api "API"
- `/process-analytics/analysis/v2/api/analysis/{analysis_id}`
"""
return f"{self.celonis.url}/process-analytics/analysis/v2/api/analysis/{self.id}"
@property
def web_link(self):
"""This method is not implemented."""
raise PyCelonisNotImplementedError
def delete(self):
"""This method is not implemented."""
raise PyCelonisNotImplementedError
def move(self, to: str, target_workspace: typing.Union[str, 'Workspace']):
"""This method is not implemented."""
raise PyCelonisNotImplementedError
def backup_content(self, backup_path: typing.Union[str, pathlib.Path] = "."):
"""This method is not implemented."""
raise PyCelonisNotImplementedError
def rebuild_content_from_backup(
self, backup_path: typing.Union[str, pathlib.Path], keep_analysis_name: bool = True
):
"""This method is not implemented."""
raise PyCelonisNotImplementedError
def process_shared_selection_url(self, shared_url: str):
"""This method is not implemented."""
raise PyCelonisNotImplementedError
url: str
property
readonly
¶
API
/process-analytics/analysis/v2/api/analysis/{analysis_id}
web_link
property
readonly
¶
This method is not implemented.