data_push_job

Module to interact with data push jobs.

This module contains a class to interact with a data push job in EMS data integration.

Typical usage example:

```python
data_push_job = data_pool.create_data_push_job("TABLE_NAME", JobType.REPLACE)
data_push_job.add_file_chunk(file)
data_push_job.execute()
```

DataPushJob

Bases: DataPushJob

Data push job object to interact with data push job specific data integration endpoints.

tenant_id class-attribute instance-attribute

tenant_id = Field(alias='tenantId')

last_modified class-attribute instance-attribute

last_modified = Field(alias='lastModified')

last_ping class-attribute instance-attribute

last_ping = Field(alias='lastPing')

status class-attribute instance-attribute

status = Field(alias='status')

type_ class-attribute instance-attribute

type_ = Field(alias='type')

file_type class-attribute instance-attribute

file_type = Field(alias='fileType')

target_schema class-attribute instance-attribute

target_schema = Field(alias='targetSchema')

upsert_strategy class-attribute instance-attribute

upsert_strategy = Field(alias='upsertStrategy')

fallback_varchar_length class-attribute instance-attribute

fallback_varchar_length = Field(
    alias="fallbackVarcharLength"
)

post_execution_query class-attribute instance-attribute

post_execution_query = Field(alias='postExecutionQuery')

sanitized_post_execution_query class-attribute instance-attribute

sanitized_post_execution_query = Field(
    alias="sanitizedPostExecutionQuery"
)

allow_duplicate class-attribute instance-attribute

allow_duplicate = Field(alias='allowDuplicate')

foreign_keys class-attribute instance-attribute

foreign_keys = Field(alias='foreignKeys')

keys class-attribute instance-attribute

keys = Field(alias='keys')

logs class-attribute instance-attribute

logs = Field(alias='logs')

table_schema class-attribute instance-attribute

table_schema = Field(alias='tableSchema')

csv_parsing_options class-attribute instance-attribute

csv_parsing_options = Field(alias='csvParsingOptions')

mirror_target_names class-attribute instance-attribute

mirror_target_names = Field(alias='mirrorTargetNames')

change_date class-attribute instance-attribute

change_date = Field(alias='changeDate')

optional_tenant_id class-attribute instance-attribute

optional_tenant_id = Field(alias='optionalTenantId')

client class-attribute instance-attribute

client = Field(..., exclude=True)

id instance-attribute

id

Id of data push job.

data_pool_id instance-attribute

data_pool_id

Id of data pool where data push job is located.

target_name instance-attribute

target_name

Name of table where data is pushed to.

connection_id instance-attribute

connection_id

Id of data connection where data is pushed to.

from_transport classmethod

from_transport(client, data_push_job_transport)

Creates high-level data push job object from given DataPushJobTransport.

Parameters:

  • client (Client) –

    Client to use to make API calls for given data push job.

  • data_push_job_transport (DataPushJob) –

    DataPushJobTransport object containing properties of data push job.

Returns:

  • DataPushJob

    A DataPushJob object with properties from transport and given client.
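
Examples:

A minimal sketch of constructing the high-level object from a transport; `client` and `data_push_job_transport` are placeholders here for an authenticated Client and a DataPushJobTransport obtained elsewhere:

```python
# client and data_push_job_transport are assumed to exist already
data_push_job = DataPushJob.from_transport(client, data_push_job_transport)
```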

sync

sync()

Syncs data push job properties with EMS.
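
Examples:

Refresh the job's properties, for example to inspect its current status; a minimal sketch assuming `data_push_job` was created as in the examples below:

```python
data_push_job.sync()          # refresh properties from EMS
print(data_push_job.status)   # inspect the current job status
```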

delete

delete()

Deletes data push job.
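
Examples:

Remove a data push job that is no longer needed; a minimal sketch assuming `data_push_job` was created as in the examples below:

```python
data_push_job.delete()  # removes the job from the data pool
```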

add_file_chunk

add_file_chunk(file)

Adds file chunk to data push job.

Parameters:

  • file (Union[BytesIO, BufferedReader]) –

    File stream to be upserted within data push job.

Examples:

Create data push job to replace table:

```python
data_push_job = data_pool.create_data_push_job(
    target_name="ACTIVITIES",
    type_=JobType.REPLACE
)

with open("ACTIVITIES.parquet", "rb") as file:
    data_push_job.add_file_chunk(file)

data_push_job.execute()
```

delete_file_chunk

delete_file_chunk(file)

Deletes file chunk from data push job.

Parameters:

  • file (Union[BytesIO, BufferedReader]) –

    File stream to be deleted within data push job.

Examples:

Create data push job to delete table:

```python
data_push_job = data_pool.create_data_push_job(
    target_name="ACTIVITIES",
    type_=JobType.DELTA,
    keys=["_CASE_KEY"]
)

with open("ACTIVITIES.parquet", "rb") as file:
    data_push_job.delete_file_chunk(file)

data_push_job.execute()
```

add_data_frame

add_data_frame(df, chunk_size=100000, index=False)

Splits data frame into chunks of size chunk_size and adds each chunk to data push job.

Parameters:

  • df (DataFrame) –

    Data frame to push with given data push job.

  • chunk_size (int, default: 100000 ) –

    Number of rows for each chunk.

  • index (Optional[bool], default: False ) –

    Whether index is included in parquet file. See pandas documentation.

Examples:

Create data push job to replace table:

```python
data_push_job = data_pool.create_data_push_job(
    target_name="ACTIVITIES",
    type_=JobType.REPLACE
)

df = pd.DataFrame({"ID": ["aa", "bb", "cc"], "TEST_COLUMN": [1, 2, 3]})
data_push_job.add_data_frame(df)

data_push_job.execute()
```

delete_data_frame

delete_data_frame(df, chunk_size=100000, index=False)

Splits data frame into chunks of size chunk_size and adds each chunk to the data push job for deletion.

Parameters:

  • df (DataFrame) –

    Data frame containing the rows to delete with given data push job.

  • chunk_size (int, default: 100000 ) –

    Number of rows for each chunk.

  • index (Optional[bool], default: False ) –

    Whether index is included in parquet file. See pandas documentation.

Examples:

Create data push job to delete table:

```python
data_push_job = data_pool.create_data_push_job(
    target_name="ACTIVITIES",
    type_=JobType.DELTA,
    keys=["_CASE_KEY"]
)

df = pd.DataFrame({"_CASE_KEY": ["aa", "bb", "cc"], "TEST_COLUMN": [1, 2, 3]})
data_push_job.delete_data_frame(df)

data_push_job.execute()
```

get_chunks

get_chunks()

Gets all chunks of given data push job.

Returns:
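
Examples:

Inspect the chunks uploaded so far; a minimal sketch assuming chunks were added beforehand and that the returned value behaves like a list:

```python
# assumes file chunks or data frames were already added to the job
chunks = data_push_job.get_chunks()
print(f"Number of uploaded chunks: {len(chunks)}")
```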

execute

execute(wait=True)

Executes given data push job.

Parameters:

  • wait (bool, default: True ) –

    If true, the function returns only once the data push job has finished executing and raises an error if the execution fails. If false, the function returns immediately after triggering the execution and does not raise an error if it fails.

Raises:

Examples:

Create data push job to replace table:

```python
data_push_job = data_pool.create_data_push_job(
    target_name="ACTIVITIES",
    type_=JobType.REPLACE
)

df = pd.DataFrame({"ID": ["aa", "bb", "cc"], "TEST_COLUMN": [1, 2, 3]})
data_push_job.add_data_frame(df)

data_push_job.execute()
```
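
Trigger execution without blocking and check on the job later; a minimal sketch reusing the job from the example above:

```python
data_push_job.execute(wait=False)  # returns immediately after triggering execution

# later: refresh properties from EMS and inspect the job status
data_push_job.sync()
print(data_push_job.status)
```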