How to implement get_data_for_table

The get_data_for_table method returns data for each record of each table defined in your metadata. Its output is an ExtractionData object which is imported directly from the celoxtractor.types module. As displayed below, ExtractionData contains a list (all records) of lists (all fields for a specific record).

Important: the order of columns in the output of get_data_for_table must be identical to the order of columns in the output of get_metadata.

Here is a sample get_data_for_table implementation to illustrate the data structure:
from celoxtractor.extractor import CelonisExtractor
from celoxtractor.types import Table, Column, ExtractionData
from celoxtractor.utils import get_variable_from_filter

class TestExtractor(CelonisExtractor):

    # Metadata is defined as explained in the "How to implement get_metadata" section
    def get_metadata(self, parameters):
        users_table = Table("users")
        users_table.columns = [Column("name", "STRING"), Column("user_id", "INTEGER"), Column("birthDate", "DATETIME")]

        departments_table = Table("departments")
        departments_table.columns = [Column("department_name", "STRING"), Column("department_id", "INTEGER")]

        return [users_table, departments_table]

    # The get_data_for table method can leverage additional arguments which are discussed in the "Advanced" sections below
    def get_data_for_table(self, table_name, parameters, is_delta_load, extraction_filter):
        # Initiate an ExtractionData object
        extraction_data = ExtractionData()
        # Depending on which table the Python Extractor user selects, define which data will be returned
        if table_name == "users":
            # Here, the data for "users" table records is hard-coded to illustrate the data structure.
            # Note that you could equally send a specific API request here, e.g. (see "Real-Life Example" below)
            extraction_data.records = [
                ["user1", 1, datetime.strptime("1990-06-04 14:32:45", '%Y-%m-%d %H:%M:%S')],
                ["user2", 2, datetime.strptime("1987-02-34 12:47:21", '%Y-%m-%d %H:%M:%S')],
            ];
        if table_name == "departments":
            extraction_data.records = [
                ["Engineering", 32],
                ["Product Management", 12]
            ];
        # Return the ExtractionData object containing the table's data as a list of lists
        return extraction_data
And here is how it would look like in the IBC after the extraction:
Table Structure Table Structure

Real-Life Example

This sample Python script extracts the “sys_id” and “sys_updated_on” columns from every table of the ServiceNow API dynamically.

from celoxtractor.extractor import CelonisExtractor
from celoxtractor.types import Table, Column, ExtractionData
from celoxtractor.utils import get_variable_from_filter

# Import additional libraries that make it easier to send API requests
import requests
from requests.auth import HTTPBasicAuth
import datetime

class ServiceNowSysIdExtractor(CelonisExtractor):

    # Get metadata using ServiceNow's REST API
    def get_metadata(self, parameters):
        tables = []
        response = requests.get('https://dev80458.service-now.com/api/now/table/sys_db_object', auth = HTTPBasicAuth(parameters["username"], parameters["password"])).json()
        for record in list(response.values())[0]:
            table = Table(record["name"])
            table.columns = [Column("sys_id", "STRING"), Column("sys_updated_on", "DATETIME")]
            tables.append(table)
        return tables

    # Get metadata using ServiceNow's REST API
    def get_data_for_table(self, table_name, parameters, is_delta_load, extraction_filter):
        extraction_data = ExtractionData()
        response = requests.get('https://dev80458.service-now.com/api/now/table/' + table_name, auth = HTTPBasicAuth(parameters["username"], parameters["password"])).json()
        for record in list(response.values())[0]:
            extraction_data.records.append([record["sys_id"], datetime.datetime.strptime(record["sys_updated_on"], '%Y-%m-%d %H:%M:%S')])
        return extraction_data

Advanced: How to use the is_delta_load parameter

Each extraction is executed as either full or delta load. To enable your Python Extractor to handle both full and delta loads, you can implement the is_delta_load parameter (boolean; also comes directly with Celoxtractor). For example:

def get_data_for_table(self, table_name, parameters, is_delta_load, extraction_filter):
    # Define the initial API endpoint
    url = 'https://my-company.com/internal/api/get_event_logs'
    # Depending on whether you want to execute a delta or full load, add different request parameters
    if is_delta_load:
        # For delta loads, only extract records that were updated after one day ago
        url += "?updated_after=" + (datetime.datetime.now() - datetime.timedelta(days = 1)).strftime("%y%m%d")
    else:
        # For full loads, extract all records created in 2020
        url += "?created_after=20200101"
    # Fetch the data
    response = requests.get(url).json()

    # TODO parse the response and return records

After implementing is_delta_load in your script, you can flexibly trigger full or delta loads directly from the IBC:

_images/delta_load_example.png

Advanced: How to use extraction filters

Celoxtractor allows you to access all custom filters that you have defined in the IBC’s extraction configuration. To achieve that, Celoxtractor passes those filters in the extraction_filter field (SQL format). Below is an example to illustrate how that works.

We define the filters in the native IBC environment as follows:
_images/extraction_filter.png
In full mode, the value of extraction_filter for those filters will then be:
( my_create_date > '2018-01-01 00:00:00' AND my_create_date > '2022-01-01 00:00:00' )
In delta mode, its value will be:
( ( my_create_date > '2018-01-01 00:00:00' AND my_create_date < '2022-01-01 00:00:00' ) AND my_update_date > '2020-09-07 01:08:30' )
To use the filters in your Python script, you can parse the extraction_filter yourself or just use get_variable_from_filter (comes with celoxtractor.utils module). For example:
from celoxtractor.utils import get_variable_from_filter

def get_data_for_table(self, table_name, parameters, is_delta_load, extraction_filter):
    # Define the initial API endpoint
    url = 'https://my-company.com/internal/api/get_event_logs'
    # Depending on whether you want to execute a delta or full load, add different request parameters
    if is_delta_load:
            # For delta loads, only extract records that were updated after the date defined as delta filter in the IBC
            update_date_after = get_variable_from_filter(extraction_filter, 'my_update_date', '>', 'DATETIME')
            url += "?updated_after=" + update_date_after.strftime("%y%m%d")
    else:
            # For full loads, extract records that were created between the dates defined as filters in the IBC
            create_date_before = get_variable_from_filter(extraction_filter, 'my_create_date', '<', 'DATETIME')
            create_date_after =  get_variable_from_filter(extraction_filter, 'my_create_date', '>', 'DATETIME')

            url += "?created_after=" + create_date_after.strftime("%y%m%d") + "&created_before=" + create_date_before.strftime("%y%m%d")

    response = requests.get(url).json()
    # TODO parse the response and return records

Advanced: How to use batching

Celoxtractor’s batching feature allows Celonis to start processing extraction data in the background while your Python Extractor script continues to run. That can be helpful to resolve various memory exceptions and fasten your extraction.

Batching is introduced via the flush_extraction_data method as shown in the example below.

def get_data_for_table(self, table_name, parameters, is_delta_load, extraction_filter):

    extraction_data = ExtractionData()

    if table_name == "users":

            # Records for the first batch
            extraction_data.records = [
                ["user1", 1, datetime.strptime("1991-01-01 14:32:11", '%Y-%m-%d %H:%M:%S')],
                ["user2", 2, datetime.strptime("1992-02-02 20:30:22", '%Y-%m-%d %H:%M:%S')],
                ["user3", 3, datetime.strptime("1993-03-03 14:32:33", '%Y-%m-%d %H:%M:%S')]
            ]

            # Subtable records for the first batch
            extraction_data.sub_table_records = {
                "user_login": [
                    [1, datetime.strptime("2001-11-25 14:32:11", '%Y-%m-%d %H:%M:%S')]
                ]
            }

            # This line sends current data to Celonis and returns you a new and empty ExtractionData object
            extraction_data = self.flush_extraction_data(extraction_data)

            # Records for the second batch
            extraction_data.records = [
                ["user4", 4, datetime.strptime("2004-04-24 20:30:44", '%Y-%m-%d %H:%M:%S')],
                ["user5", 5, datetime.strptime("2005-05-25 14:32:55", '%Y-%m-%d %H:%M:%S')]
            ]

    return extraction_data