Skip to content

compute_node_factory.py

ComputeNodeFactory

Factory to create a ComputeNote for the Datamodel.

Source code in celonis_api/event_collection/compute_node_factory.py
class ComputeNodeFactory:
    """Factory to create a ComputeNote for the Datamodel."""

    @staticmethod
    def create_compute_node(
        id: str, datamodel: 'Datamodel', celonis: 'Celonis'
    ) -> typing.Union[ComputeNode, HybridPseudoComputeNode]:
        """Creates a new ComputeNode object for the Datamodel (Hybrid or Cloud).
        Args:
            id: 'ID'
            datamodel: Datamodel object.
            celonis: Celonis Base object.

        Returns:
            The respective ComputeNode object for the Datamodel.
        """
        from pycelonis.celonis_api.event_collection.data_pool import HybridPool

        if isinstance(datamodel.parent, HybridPool):
            return HybridPseudoComputeNode(datamodel=datamodel, id=id)
        else:
            return ComputeNode(celonis, {"id": id})

create_compute_node(id, datamodel, celonis) staticmethod

Creates a new ComputeNode object for the Datamodel (Hybrid or Cloud).

Parameters:

Name Type Description Default
id str

'ID'

required
datamodel Datamodel

Datamodel object.

required
celonis Celonis

Celonis Base object.

required

Returns:

Type Description
Union[pycelonis.celonis_api.event_collection.compute_node.ComputeNode, pycelonis.celonis_api.event_collection.compute_node.HybridPseudoComputeNode]

The respective ComputeNode object for the Datamodel.

Source code in celonis_api/event_collection/compute_node_factory.py
@staticmethod
def create_compute_node(
    id: str, datamodel: 'Datamodel', celonis: 'Celonis'
) -> typing.Union[ComputeNode, HybridPseudoComputeNode]:
    """Creates a new ComputeNode object for the Datamodel (Hybrid or Cloud).
    Args:
        id: 'ID'
        datamodel: Datamodel object.
        celonis: Celonis Base object.

    Returns:
        The respective ComputeNode object for the Datamodel.
    """
    from pycelonis.celonis_api.event_collection.data_pool import HybridPool

    if isinstance(datamodel.parent, HybridPool):
        return HybridPseudoComputeNode(datamodel=datamodel, id=id)
    else:
        return ComputeNode(celonis, {"id": id})