
Deprecations

Deprecation Warnings

In order to keep PyCelonis up to date and guarantee support, some outdated modules are marked as deprecated and will be removed from PyCelonis in version 2.0:
- Data Deduplication: please contact the Service Desk to migrate to the official Data Deduplication product.

DuplicateChecker

Class to check for data deduplication.
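
A brief, hedged instantiation sketch; the import path below is an assumption based on the module path shown under "Source code" and may differ in your installation. Creating the object emits the DeprecationWarning described above, and the keys of celonis_patterns can be referenced by name as fuzzy matching types in a search pattern.

# Import path is an assumption based on 'data_deduplication/duplicate_checker.py';
# adjust it to your PyCelonis installation.
from pycelonis.data_deduplication.duplicate_checker import DuplicateChecker

checker = DuplicateChecker()  # emits a DeprecationWarning (module removed in 2.0)

# Built-in fuzzy matching types that can be used as values in a search pattern:
print(list(checker.celonis_patterns))
# ['CompanyNameFuzzy', 'DateFuzzy', 'InvoiceReferenceFuzzy', 'InvoiceValueFuzzy',
#  'MaterialFuzzy', 'StringFuzzy', 'ZipCodeComparer']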

Source code in data_deduplication/duplicate_checker.py
class DuplicateChecker:
    """Class to check for data deduplication."""

    def __init__(self, **kwargs) -> None:
        self._tracker = PyCelonisTracker()
        self.n_jobs = 1
        self._max_chunk_size = 5000
        self.celonis_patterns = {
            "CompanyNameFuzzy": "comparers.CompanyNameComparer(column, column,label=column,"
            "threshold=0.85,method='jarowinkler')",
            "DateFuzzy": "comparers.DateComparer(column, column, label=column)",
            "InvoiceReferenceFuzzy": "comparers.ReferenceComparer(column, column)",
            "InvoiceValueFuzzy": "comparers.InvoiceAmountComparer(column, column, "
            "method=\"linear\", offset=0.0, scale=40, label=column)",
            "MaterialFuzzy": "comparers.MaterialComparer(column, column)",
            "StringFuzzy": "recordlinkage.compare.String(column, column, "
            "method='jarowinkler', threshold=0.85, missing_value=0.0, "
            "label=column)",
            "ZipCodeComparer": "comparers.CompareZipCodes(column,column,label=column)",
        }
        # TODO: this will allow switching between catalog matching and incremental matching.
        self._include_df_internal_matches = True
        self.fast_mode = False
        self.disable_tqdm = False

        warnings.warn(
            "Deprecation: The module 'data_deduplication' is deprecated and will be removed in PyCelonis version 2.0.\n"
            "Please contact Service Desk to migrate to the official Data Deduplication product.",
            DeprecationWarning,
            stacklevel=2,
        )

    def apply(
        self,
        df: pd.DataFrame,
        search_patterns: typing.Dict,
        unique_id_columns: typing.Union[str, typing.List[str]],
        df_reference: pd.DataFrame = None,
        max_chunk_size: int = 5000,
        return_preprocessed_df: bool = False,
        allow_group_intersection: bool = False,
        fast_mode: bool = False,
        disable_tqdm: bool = False,
    ) -> typing.Union[pd.DataFrame, typing.Tuple[pd.DataFrame, pd.DataFrame]]:
        """
        Computes the duplicates on the table df, based on the provided
        search_patterns.
        If df_reference is given, the duplicates between df and df_reference
        are computed in addition.

        Args:

            df : pd.DataFrame
                DataFrame containing the unique_id_columns and the columns that
                are to be compared against each other to find duplicates.
            search_patterns: dict, optional if search_patterns was already set before.
                dict containing key-value pairs of the form pattern_name: pattern,
                where pattern_name is a string and pattern is a dict where the keys
                are columns of df and the values are the matching logic to apply. E.g.:
                search_patterns={
                "Some Pattern Name":{
                    "VENDOR_NAME": "exact",
                    "INVOICE_DATE": "exact",
                    "REFERENCE": "different",
                    "VALUE": "exact",
                    "_VENDOR_ID": "exact"},...}

                The last used search patterns will always be stored in the
                DuplicateChecker object under .search_patterns .
            unique_id_columns : str or List[str]
                The column or list of columns to be used as a unique identifier.
            df_reference : pd.DataFrame, optional
                DataFrame of the same structure containing the already processed items;
                all items of df will be checked against each other and against all
                items of df_reference. The items of df_reference are NOT checked
                against each other, by default None.
            max_chunk_size : int, optional
                Size of the chunks compared at a time; decrease if memory problems
                occur, increase for speed, by default 5000.

        Returns:
            pd.DataFrame:
                DataFrame containing the duplicate rows of df + 2 additional columns:
                * GROUP_ID : Column which uniquely identifies and maps together those
                    rows that are duplicates of each other.
                * PATTERN : Name of the search pattern that maps the items of a group.
        """
        self.allow_group_intersection = allow_group_intersection
        self.disable_tqdm = disable_tqdm
        self._tracker.track(
            "Run duplicate checking",
            extra={
                "tracking_type": "DUPLICATE_CHECKER",
            },
        )

        self.fast_mode = fast_mode
        self._max_chunk_size = max_chunk_size
        if isinstance(unique_id_columns, str):
            unique_id_columns = [unique_id_columns]
        elif isinstance(unique_id_columns, list):
            unique_id_columns = unique_id_columns
        else:
            raise TypeError("unique_id_columns must be a string or a list of strings.")

        preprocessed_df = self._preprocess_df(df, unique_id_columns, df_reference)
        processed_search_patterns = self._process_search_patterns(preprocessed_df, search_patterns)
        results = self._compute_duplicates(preprocessed_df, processed_search_patterns)
        # cleaning the results
        if not self.allow_group_intersection:
            results = self._remove_group_intersection(results)
        results = self._remove_faulty_groups(results)
        df = df.drop_duplicates(unique_id_columns, keep="first")
        results = pd.concat(
            [results, df[~df.set_index(unique_id_columns).index.isin(results.set_index(unique_id_columns).index)]],
            ignore_index=True,
        )
        results["MATCH_FOUND"] = np.where(results["GROUP_ID"].isna(), "NO", "YES")
        results = results.drop("_CASE_KEY_GENERATED", axis=1, errors="ignore")

        if return_preprocessed_df:
            return results, preprocessed_df

        return results

    def _remove_group_intersection(self, results):
        """Drops intersecting groups: if one invoice is in more than one group,
        drop all but the biggest group (the one containing the most items)."""
        if not results.empty:
            # find case_key that are duplicated + corresponding group ids
            intersect_groups = results[results["_CASE_KEY_GENERATED"].duplicated(keep=False)].GROUP_ID.drop_duplicates()
            group_sizes = (
                results[results.GROUP_ID.isin(intersect_groups)]
                .groupby("GROUP_ID")
                .count()["_CASE_KEY_GENERATED"]
                .reset_index()
            )
            group_sizes.rename(columns={"_CASE_KEY_GENERATED": "N_ITEMS_IN_GROUP"}, inplace=True)

            # of the found groups filter these containing most cases
            groups_to_keep = results[results["_CASE_KEY_GENERATED"].duplicated(keep=False)]
            groups_to_keep = groups_to_keep.merge(group_sizes, on="GROUP_ID", how="left")
            groups_to_keep = (
                groups_to_keep.sort_values(["_CASE_KEY_GENERATED", "N_ITEMS_IN_GROUP"], ascending=False)
                .drop_duplicates(['_CASE_KEY_GENERATED'], keep="first")
                .GROUP_ID
            )

            groups_to_drop = intersect_groups[~intersect_groups.isin(groups_to_keep)]
            results = results[~results.GROUP_ID.isin(groups_to_drop)]
            results = results.drop_duplicates(["_CASE_KEY_GENERATED", "GROUP_ID"], keep="first")
            results = results.drop_duplicates(["_CASE_KEY_GENERATED"], keep="first")
        return results

    def _remove_faulty_groups(self, results):
        # drop groups that contain fewer than 2 items
        # TODO: this is a bug but unclear where and how it happens
        if not results.empty:
            df_help = results.groupby("GROUP_ID").count()
            results = results[~results.GROUP_ID.isin(df_help[df_help["_CASE_KEY_GENERATED"] < 2].index)]

        return results

    def _merge_df_reference_df(self, df, df_reference, unique_id_columns):
        """Merges df and df_reference and checks if they are compatible"""
        if not all(col in df.columns for col in unique_id_columns):
            raise KeyError("Not all unique_id_columns contained in df.")
        if not all(col in df_reference.columns for col in unique_id_columns):
            raise KeyError("Not all unique_id_columns contained in df_reference.")

        for col in set(df.columns.tolist() + df_reference.columns.tolist()):
            if col not in df.columns:
                self._logger.warning(f"Column {col} contained in df_reference but not in df")
            if col not in df_reference.columns:
                self._logger.warning(f"Column {col} contained in df but not in df_reference")

        df["ONLY_IN_DF"] = True
        df_reference["ONLY_IN_DF"] = False
        df = pd.concat([df, df_reference], ignore_index=True)
        return df

    def _drop_na_with_warning(self, df, subset):
        """For record linkage, NAs need to be dropped; this function makes that verbose"""
        nas = df[subset].isna()
        for col in nas:
            number_nas = nas[col].sum()
            if number_nas > 0:
                self._logger.warning(f"{number_nas} rows of column {col} are 'NA'. They will be dropped")
        df.dropna(subset=subset, inplace=True)
        return df

    def _preprocess_df(self, df, unique_id_columns, df_reference=None):
        """Drop duplicates and separate into case and data columns"""
        self._logger.info("Preprocessing DataFrame")

        if df_reference is not None:
            df = self._merge_df_reference_df(df, df_reference, unique_id_columns)

        df = self._drop_na_with_warning(df, subset=unique_id_columns)
        # access case key
        for col in unique_id_columns:
            if col not in df.columns:
                raise KeyError(
                    f"df does not contain the column {col}. Please add a unique identifier column with this name."
                )

        if len(unique_id_columns) == 1:
            df.loc[:, "_CASE_KEY_GENERATED"] = df[unique_id_columns[0]]
        else:
            df.loc[:, "_CASE_KEY_GENERATED"] = df[unique_id_columns].astype(str).agg("-".join, axis=1)
        # drop rows with same key
        if df.duplicated(subset=unique_id_columns).sum() > 0:
            self._logger.warning("The specified unique_id_columns are not unique, duplicated ids will be dropped.")
            df = df.drop_duplicates(unique_id_columns)

        def _remove_time_zone_info(df):
            """
            Timezone information is removed from datetime columns,
            because recordlinkage doesn't accept
            datetime columns with timezones.
            """
            for column in df.select_dtypes(include=["datetimetz"]):
                df[column] = df[column].dt.tz_convert(None)

            return df

        df = _remove_time_zone_info(df)
        # self._sanity_check(df)
        return df

    def _compute_duplicates(self, df, processed_search_patterns):
        """Runs the duplicate checker with the current settings for fuzzy + exact"""
        results = pd.DataFrame()
        if df.empty:
            raise ValueError("The dataframe df is empty. No cases to be processed")
        results_exact = self._compute_exact_matches(df, processed_search_patterns)
        results_fuzzy = self._compute_fuzzy_matches(df, processed_search_patterns)

        # Implied hierarchy: fuzzy matches are more important than exact ones

        if not self.allow_group_intersection and not results_exact.empty and not results_fuzzy.empty:
            results_fuzzy = results_fuzzy[~results_fuzzy["GROUP_ID"].isin(results_exact["GROUP_ID"])]
            results_exact = results_exact[
                ~results_exact["_CASE_KEY_GENERATED"].isin(results_fuzzy["_CASE_KEY_GENERATED"])
            ]
        results = pd.concat([results_fuzzy, results_exact], ignore_index=True)

        if not results.empty:
            results = results.sort_values(by="GROUP_ID")
        if 'ONLY_IN_DF' in results.columns:
            results["ONLY_IN_DF"] = np.where(results["ONLY_IN_DF"], 1, 0)
        return results

    def _chunk_df_fuzzy_matching(self, df, patterns):
        """If df is too big to process it is cut into smaller chunks"""
        chunks = []
        for pattern_name, pattern in patterns.items():
            if pattern.get("compare_type") == "fuzzy":
                compare_columns = pattern.get("compare_columns")
                link_df = df.copy()
                link_df = self._drop_na_with_warning(link_df, compare_columns)
                blocked_columns = pattern.get("blocked_columns", False)
                if len(compare_columns) == 1 and not blocked_columns and self.fast_mode:
                    chunks += self._chunk_via_starting_character(link_df, compare_columns, pattern_name, pattern)
                elif "ONLY_IN_DF" in df.columns and blocked_columns:
                    # keep only those rows who have duplicates on blocked (exact matching) columns
                    link_df = link_df[link_df[compare_columns].duplicated(subset=blocked_columns, keep=False)]
                    contains_df = link_df[blocked_columns + ["ONLY_IN_DF"]].groupby(blocked_columns).max()
                    contains_df = contains_df[contains_df["ONLY_IN_DF"]].reset_index()
                    contains_df.rename(columns={"ONLY_IN_DF": "contains_df_entry"}, inplace=True)
                    link_df = link_df.merge(contains_df, on=blocked_columns, how="inner").drop(
                        "contains_df_entry", axis=1
                    )
                    df1 = link_df[link_df.ONLY_IN_DF]
                    if self._include_df_internal_matches:
                        df2 = link_df
                    else:
                        df2 = link_df[~link_df.ONLY_IN_DF]
                    chunks += [
                        {
                            "df1": df1,
                            "df2": df2,
                            "pattern_name": pattern_name,
                            "compare_columns": compare_columns,
                            "pattern": pattern,
                        }
                    ]
                else:
                    if blocked_columns:
                        link_df = link_df[df[compare_columns].duplicated(subset=blocked_columns, keep=False)]
                    n = link_df.shape[0]
                    if n < 2:
                        self._logger.info(f"No duplicates found for {pattern_name}")
                        continue

                    if n <= self._max_chunk_size:
                        chunks += [
                            {
                                "df1": link_df,
                                "df2": None,
                                "pattern_name": pattern_name,
                                "compare_columns": compare_columns,
                                "pattern": pattern,
                            }
                        ]
                        remaining_df = pd.DataFrame()

                    elif blocked_columns:
                        self._logger.info(f"{n} rows of df need to be compared for {pattern_name}")
                        link_df["CATEGORY"] = link_df[blocked_columns].astype(str).agg("-".join, axis=1)
                        group_counts = link_df["CATEGORY"].value_counts().copy()
                        remaining_df = link_df
                        while group_counts.shape[0] > 0 and group_counts.min() < self._max_chunk_size:
                            x = group_counts.sort_values()[
                                group_counts.sort_values().cumsum() < self._max_chunk_size
                            ].index
                            chunks += [
                                {
                                    "df1": link_df[link_df["CATEGORY"].isin(x.tolist())].drop("CATEGORY", axis=1),
                                    "df2": None,
                                    "compare_columns": compare_columns,
                                    "pattern": pattern,
                                    "pattern_name": pattern_name,
                                }
                            ]
                            remaining_df = remaining_df[~remaining_df["CATEGORY"].isin(x.tolist())]
                            group_counts = group_counts.drop(x)
                        if group_counts.shape[0] > 0:
                            remaining_df = remaining_df.drop("CATEGORY", axis=1)
                    else:
                        remaining_df = link_df

                    if not remaining_df.empty:
                        df_chunked = np.array_split(
                            remaining_df, self._calculate_optimal_chunk_size(remaining_df.shape[0])
                        )
                        processed_cases = []
                        for df_chunk in df_chunked:
                            chunks += [
                                {
                                    "df1": df_chunk,
                                    "df2": remaining_df[~remaining_df["_CASE_KEY_GENERATED"].isin(processed_cases)],
                                    "pattern": pattern,
                                    "pattern_name": pattern_name,
                                    "compare_columns": compare_columns,
                                }
                            ]
                            processed_cases += df_chunk["_CASE_KEY_GENERATED"].tolist()
        return chunks

    def _chunk_via_starting_character(self, link_df, compare_columns, pattern_name, pattern):
        df = link_df.copy()
        df["clean_string"] = df[compare_columns[0]].astype(str).str.lower().replace(r"[^a-zA-ZА-я\d]", "", regex=True)
        df_chunks = self._chunk_by_char_recursive(df, 0, [])
        chunks = []
        for df1 in df_chunks:
            chunks += [
                {
                    "df1": df1,
                    "df2": None,
                    "pattern_name": pattern_name,
                    "compare_columns": compare_columns,
                    "pattern": pattern,
                }
            ]

        return chunks

    def _chunk_by_char_recursive(self, df, i, chunks):
        """Recursively chunks the data by the i-th character from the beginning;
        relies on the assumption that the leading characters are the most important
        """
        if df.shape[0] <= self._max_chunk_size:
            chunks += [df.loc[:, df.columns != "clean_string"]]
        else:
            char_list = df["clean_string"].str[i].unique()
            for char in char_list:
                chunks = self._chunk_by_char_recursive(df[df["clean_string"].str[i] == char], i + 1, chunks)
        return chunks

    def _calculate_optimal_chunk_size(self, dfsize):
        # TODO: how to find the optimal chunk size ?
        return dfsize / self._max_chunk_size

    def _compute_exact_matches(self, df, patterns):
        """Computes duplicates for patterns only involving exact matches"""
        possible_duplicates = pd.DataFrame()
        for pattern_name, pattern in patterns.items():
            if pattern.get("compare_type") == "exact":
                compare_columns = pattern.get("compare_columns")
                differ_columns = pattern.get("differ_columns")

                exact_duplicates = df[["_CASE_KEY_GENERATED"] + compare_columns + differ_columns].copy()
                exact_duplicates = self._drop_na_with_warning(exact_duplicates, compare_columns)
                exact_duplicates = exact_duplicates[exact_duplicates.duplicated(compare_columns, keep=False)]
                if differ_columns:
                    for col in differ_columns:
                        exact_duplicates = exact_duplicates[
                            ~exact_duplicates.duplicated(compare_columns + [col], keep=False)
                        ]
                    exact_duplicates = exact_duplicates.drop(differ_columns, axis=1)
                    exact_duplicates = exact_duplicates[exact_duplicates.duplicated(compare_columns, keep=False)]
                exact_duplicates = self._create_exact_group_ids(df, exact_duplicates, compare_columns, pattern_name)
                if "ONLY_IN_DF" in df.columns:
                    # filter on only pairs that contains 'new' entries
                    contains_df_entry = exact_duplicates[["GROUP_ID", "ONLY_IN_DF"]].groupby("GROUP_ID").max()
                    contains_df_entry = contains_df_entry[contains_df_entry["ONLY_IN_DF"]].index.tolist()
                    exact_duplicates = exact_duplicates[exact_duplicates.GROUP_ID.isin(contains_df_entry)]
                possible_duplicates = possible_duplicates.append(exact_duplicates)
        return possible_duplicates

    def _compute_fuzzy_matches(self, df, patterns):
        """Creates chunks and runs record linkage on the chunks"""
        self._logger.info("Searching for fuzzy matches ...")
        possible_duplicates = pd.DataFrame()
        chunks = self._chunk_df_fuzzy_matching(df, patterns)
        for chunk in tqdm.tqdm(chunks, disable=self.disable_tqdm):
            try:
                new_results = self._run_fuzzy_matching_on_chunk(chunk)
            except ZeroDivisionError:
                continue

            possible_duplicates = self._add_new_duplicates(new_results, possible_duplicates)

        possible_duplicates.drop("id_list", axis=1, inplace=True, errors="ignore")
        return possible_duplicates

    def _add_new_duplicates(self, new_results, all_results):
        """Needed for chunking to integrate subgroups into the bigger groups found in another chunk"""
        if not new_results.empty and not all_results.empty:
            intersection = new_results["_CASE_KEY_GENERATED"].isin(all_results["_CASE_KEY_GENERATED"])
            if not self.allow_group_intersection and any(intersection):
                keys = new_results[intersection]["_CASE_KEY_GENERATED"]
                id_list_new = new_results.drop_duplicates("GROUP_ID")[["id_list", "GROUP_ID"]]
                id_list_old = all_results[all_results["_CASE_KEY_GENERATED"].isin(keys)].drop_duplicates("GROUP_ID")[
                    ["id_list", "GROUP_ID"]
                ]
                for _, id_new in id_list_new.iterrows():
                    for _, id_old in id_list_old.iterrows():
                        if set(id_new.id_list) <= set(id_old.id_list):
                            new_results = new_results[new_results["GROUP_ID"] != id_new.GROUP_ID]
                        elif set(id_old.id_list) <= set(id_new.id_list):
                            all_results = all_results[all_results["GROUP_ID"] != id_old.GROUP_ID]

        all_results = all_results.append(new_results)
        return all_results

    def _run_fuzzy_matching_on_chunk(self, chunk):
        """Given a chunk, calculates the fuzzy matches for this chunk"""
        pattern = chunk.get("pattern", None)
        pattern_name = chunk.get("pattern_name", None)
        if pattern is None or pattern_name is None:
            raise ValueError("the dict chunk needs to contain the pattern and pattern_name")
        compare_columns = chunk.get("compare_columns")
        differ_columns = pattern.get("differ_columns")
        df1 = chunk.get("df1", None)
        df2 = chunk.get("df2", None)
        if df1 is None:
            raise ValueError("No input dataframe found.")
        elif chunk.get("df2", None) is not None:
            df = pd.concat([df1, df2[~df2["_CASE_KEY_GENERATED"].isin(df1["_CASE_KEY_GENERATED"])]]).copy()
            # self._logger.info(
            #    f"running {df1.shape[0]} vs. {df2.shape[0]} for pattern: {pattern_name}"
            # )
            data_links = pattern.get("indexer").index(df1[compare_columns], df2[compare_columns])
            results = pattern.get("comparer").compute(data_links, df1[compare_columns], df2[compare_columns])
        else:
            df = df1
            data_links = pattern.get("indexer").index(df1)
            results = pattern.get("comparer").compute(data_links, df1[compare_columns])

        matches = results[results.sum(axis=1) > pattern.get("n_comparison_columns") - 1]
        groups = self._create_fuzzy_group_ids(df, matches)
        groups = groups.merge(df, on="_CASE_KEY_GENERATED")
        groups["PATTERN"] = pattern_name
        if differ_columns:
            differ_counts = (
                groups[differ_columns + ["GROUP_ID"]]
                .groupby("GROUP_ID")
                .nunique()
                .drop(columns="GROUP_ID", errors="ignore")
                .min(axis=1)
            )
            not_differing_groups = differ_counts[differ_counts == 1].index.tolist()
            groups = groups[~groups.GROUP_ID.isin(not_differing_groups)]
        id_list_df = groups.groupby('GROUP_ID')["_CASE_KEY_GENERATED"].apply(list).reset_index(name='id_list')
        groups = groups.merge(id_list_df, on="GROUP_ID", how="left")
        return groups

    def _create_fuzzy_group_ids(self, df, matches):
        """Matches from recordlinkage are turned into groups with a unique ID."""
        cases = df[["_CASE_KEY_GENERATED"]].copy()
        temp_df = pd.DataFrame(index=cases.index)
        temp_df["GROUP_ID"] = ""
        m_pivot = matches.reset_index()
        m_pivot.rename(columns={"level_0": "index_1", "level_1": "index_2"}, inplace=True)
        m_pivot["group_n"] = m_pivot.groupby("index_1").cumcount()
        m_pivot = m_pivot.pivot(index="index_1", columns="group_n", values="index_2")
        for i, r in m_pivot.drop_duplicates().iterrows():
            g = list(set(m_pivot[m_pivot.values == r.values].index.dropna().tolist() + r.dropna().tolist()))
            s = cases.loc[g, :].agg("".join, axis=1).tolist()
            s = ",".join(sorted(set(s)))
            group_name = "IDs:" + f"({s})"
            temp_df.loc[g, "GROUP_ID"] = group_name

        temp_df = temp_df[temp_df["GROUP_ID"] != ""]
        groups = cases.merge(temp_df, how="inner", left_index=True, right_index=True)
        groups.drop_duplicates(inplace=True)
        return groups

    def _create_exact_group_ids(self, df, exact_duplicates, compare_columns, pattern_name):
        """Matches from exact matching are turned into groups with a unique ID."""

        def _sort_join(x):
            return ",".join(sorted(x))

        groups = exact_duplicates.groupby(compare_columns, as_index=False).agg({"_CASE_KEY_GENERATED": _sort_join})
        groups.rename(columns={"_CASE_KEY_GENERATED": "GROUP_ID"}, inplace=True)
        groups["GROUP_ID"] = "IDs:(" + groups["GROUP_ID"] + ")"
        exact_duplicates = exact_duplicates.merge(groups, on=compare_columns)
        exact_duplicates = df.merge(
            exact_duplicates[["_CASE_KEY_GENERATED", "GROUP_ID"]], on="_CASE_KEY_GENERATED", how="inner"
        )
        exact_duplicates["PATTERN"] = pattern_name
        return exact_duplicates

    def _sanity_check(self, df):
        """Check settings"""
        if df.shape[0] > 1000000 and len(df.select_dtypes(include=["float64"]).columns) > 0:
            self._logger.warning("Size of DataFrame is very large, it will take some hours to process everything.")
        elif df.shape[0] > 1000000:
            self._logger.warning("Size of DataFrame is very large, and might lead to a memory error.")
        if self._max_chunk_size > 5000:
            self._logger.warning(
                "max_chunk_size is very large, this will probably lead to a MemoryError \
                 if run in a standard Celonis ML Workbench because of too little RAM."
            )

    def _process_search_patterns(self, df, search_patterns):  # noqa: C901
        """
        Instantiates record linkage comparer objects based on the provided
        search_patterns and the data columns to be checked for duplicates.
        Parameters
        ----------
        search_patterns : dict
            dict containing the search patterns,
            keys are names of the pattern, values should look like this:
            {
                "All Exact but REFERENCE different":{
                    "VENDOR_NAME": "exact",
                    "INVOICE_DATE": "exact",
                    "REFERENCE": "different",
                    "VALUE": "exact",
                    "_VENDOR_ID": "exact"
                }
            }
        """
        if search_patterns is None:
            raise ValueError("Please provide search_patterns.")

        processed_search_pattern = {}
        for pattern_name, current_pattern in search_patterns.items():
            differ_columns = []
            compare_columns = []
            # TODO: make this a property
            possible_inputs = list(self.celonis_patterns.keys()) + ["ignore", "exact", "different"]
            for column, matching_type in current_pattern.items():
                if column not in df.columns:
                    raise KeyError(
                        f"In PATTERN: {pattern_name} a comparison "
                        f"method is given for {column}, "
                        "which is missing from the dataframe df."
                    )
                elif matching_type == "different":
                    differ_columns += [column]
                elif matching_type != "ignore":
                    compare_columns += [column]
            if all(p in ["ignore", "exact", "different"] for p in list(current_pattern.values())):
                compare_type = "exact"
                indexer = blocked_columns = n_comparison_columns = comparer = None
            else:
                compare_type = "fuzzy"
                # needed inputs to build record linkage objects
                blocked_columns = []
                n_comparison_columns = 0
                indexer = recordlinkage.Index()
                comparer = recordlinkage.Compare()  # n_jobs=4
                blocks_exist = False
                for column in current_pattern:
                    if current_pattern[column] == "exact":
                        blocks_exist = True
                        blocked_columns.append(column)
                    elif current_pattern[column] in ["ignore", "different"]:
                        continue
                    else:
                        if current_pattern[column] in self.celonis_patterns.keys():
                            exec("comparer.add(" + self.celonis_patterns.get(current_pattern[column]) + ")")
                        else:
                            try:
                                if isinstance(current_pattern[column], str):
                                    exec("comparer.add(" + current_pattern[column] + ")")
                                else:
                                    comparer.add(current_pattern[column])
                            except (NameError, SyntaxError, ValueError, TypeError):
                                raise ValueError(
                                    f"The comparison method '{current_pattern[column]}' for column '{column}'"
                                    f"given in PATTERN: '{pattern_name}' is not a valid input. "
                                    f"It needs to be one of the standard types: {', '.join(possible_inputs)} "
                                    f"or one of the generic record linkage algorithms:\n"
                                    f"https://recordlinkage.readthedocs.io/en/latest/"
                                    f"ref-compare.html#module-recordlinkage.compare\n"
                                    f"where left_on, right_on, label = column. E.g.:\n"
                                    f"\"String(column, column, method='levenshtein', "
                                    f"threshold=None, missing_value=0.0, label=column)\"\n"
                                )
                        n_comparison_columns += 1
                if blocks_exist:
                    indexer.block(blocked_columns)
                else:
                    indexer.full()
            processed_search_pattern.update(
                {
                    pattern_name: {
                        "indexer": indexer,
                        "blocked_columns": blocked_columns,
                        "n_comparison_columns": n_comparison_columns,
                        "comparer": comparer,
                        "compare_columns": compare_columns,
                        "compare_type": compare_type,
                        "differ_columns": differ_columns,
                        "search_pattern": current_pattern,
                    }
                }
            )
        return processed_search_pattern

apply(self, df, search_patterns, unique_id_columns, df_reference=None, max_chunk_size=5000, return_preprocessed_df=False, allow_group_intersection=False, fast_mode=False, disable_tqdm=False)

Computes the duplicates on the table df, based on the provided search_patterns. If df_reference is given, the duplicates between df and df_reference are computed in addition.

Parameters:

- df (pd.DataFrame, required): DataFrame containing the unique_id_columns and the columns that are to be compared against each other to find duplicates.
- search_patterns (Dict, required; optional if search patterns were already set before): dict containing key-value pairs of the form pattern_name: pattern, where pattern_name is a string and pattern is a dict whose keys are columns of df and whose values are the matching logic to apply, e.g.:
  search_patterns={
      "Some Pattern Name": {
          "VENDOR_NAME": "exact",
          "INVOICE_DATE": "exact",
          "REFERENCE": "different",
          "VALUE": "exact",
          "_VENDOR_ID": "exact"},
      ...}
  The last used search patterns will always be stored in the DuplicateChecker object under .search_patterns.
- unique_id_columns (str or List[str], required): The column or list of columns to be used as a unique identifier.
- df_reference (pd.DataFrame, optional, default None): DataFrame of the same structure containing the already processed items; all items of df will be checked against each other and against all items of df_reference, but the items of df_reference are NOT checked against each other.
- max_chunk_size (int, optional, default 5000): Size of the chunks compared at a time; decrease if memory problems occur, increase for speed.

Returns:

- pd.DataFrame: DataFrame containing the duplicate rows of df plus 2 additional columns:
  * GROUP_ID: Column which uniquely identifies and maps together those rows that are duplicates of each other.
  * PATTERN: Name of the search pattern that maps the items of a group.
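
For orientation, a minimal usage sketch under stated assumptions: the import path is inferred from the module path shown below, the invoice columns mirror the docstring example, and the data as well as the pattern names are illustrative only.

import pandas as pd

# Import path is an assumption based on 'data_deduplication/duplicate_checker.py'.
from pycelonis.data_deduplication.duplicate_checker import DuplicateChecker

checker = DuplicateChecker()  # emits the DeprecationWarning described above

# Illustrative invoice data; column names follow the docstring example.
df = pd.DataFrame(
    {
        "INVOICE_ID": ["A1", "A2", "A3"],
        "VENDOR_NAME": ["ACME Corp", "ACME Corp.", "Globex"],
        "INVOICE_DATE": pd.to_datetime(["2021-01-05", "2021-01-05", "2021-02-01"]),
        "REFERENCE": ["R-100", "R-101", "R-200"],
        "VALUE": [500.0, 500.0, 120.0],
    }
)

search_patterns = {
    # Exact pattern: the compared columns must match exactly while REFERENCE must differ.
    "All Exact but REFERENCE different": {
        "VENDOR_NAME": "exact",
        "INVOICE_DATE": "exact",
        "REFERENCE": "different",
        "VALUE": "exact",
    },
    # Fuzzy pattern (hypothetical name): VENDOR_NAME uses the built-in CompanyNameFuzzy
    # comparer, INVOICE_DATE and VALUE act as exact blocking columns, REFERENCE is ignored.
    "Fuzzy vendor name": {
        "VENDOR_NAME": "CompanyNameFuzzy",
        "INVOICE_DATE": "exact",
        "REFERENCE": "ignore",
        "VALUE": "exact",
    },
}

results = checker.apply(
    df=df,
    search_patterns=search_patterns,
    unique_id_columns="INVOICE_ID",
)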

Source code in data_deduplication/duplicate_checker.py
def apply(
    self,
    df: pd.DataFrame,
    search_patterns: typing.Dict,
    unique_id_columns: typing.Union[str, typing.List[str]],
    df_reference: pd.DataFrame = None,
    max_chunk_size: int = 5000,
    return_preprocessed_df: bool = False,
    allow_group_intersection: bool = False,
    fast_mode: bool = False,
    disable_tqdm: bool = False,
) -> typing.Union[pd.DataFrame, typing.Tuple[pd.DataFrame, pd.DataFrame]]:
    """
    Computes the duplicates on the table df, based on the provided
    search_patterns.
    If df_reference is given, the duplicates between df and df_reference
    are computed in addition.

    Args:

        df : pd.DataFrame
            DataFrame containing the unique_id_columns and the columns that
            are to be compared against each other to find duplicates.
        search_patterns: dict, optional if search_patterns was already set before.
            dict containing key-value pairs of the form pattern_name: pattern,
            where pattern_name is a string and pattern is a dict where the keys
            are columns of df and the values are the matching logic to apply. E.g.:
            search_patterns={
            "Some Pattern Name":{
                "VENDOR_NAME": "exact",
                "INVOICE_DATE": "exact",
                "REFERENCE": "different",
                "VALUE": "exact",
                "_VENDOR_ID": "exact"},...}

            The last used search patterns will always be stored in the
            DuplicateChecker object under .search_patterns .
        unique_id_columns : str or List[str]
            The column or list of columns to be used as a unique identifier.
        df_reference : pd.DataFrame, optional
            DataFrame of the same structure containing the already processed items;
            all items of df will be checked against each other and against all
            items of df_reference. The items of df_reference are NOT checked
            against each other, by default None.
        max_chunk_size : int, optional
            Size of the chunks compared at a time; decrease if memory problems
            occur, increase for speed, by default 5000.

    Returns:
        pd.DataFrame:
            DataFrame containing the duplicate rows of df + 2 additional columns:
            * GROUP_ID : Column which uniquely identifies and maps together those
                rows that are duplicates of each other.
            * PATTERN : Name of the search pattern that maps the items of a group.
    """
    self.allow_group_intersection = allow_group_intersection
    self.disable_tqdm = disable_tqdm
    self._tracker.track(
        "Run duplicate checking",
        extra={
            "tracking_type": "DUPLICATE_CHECKER",
        },
    )

    self.fast_mode = fast_mode
    self._max_chunk_size = max_chunk_size
    if isinstance(unique_id_columns, str):
        unique_id_columns = [unique_id_columns]
    elif isinstance(unique_id_columns, list):
        unique_id_columns = unique_id_columns
    else:
        raise TypeError("unique_id_columns must be a string or a list of strings.")

    preprocessed_df = self._preprocess_df(df, unique_id_columns, df_reference)
    processed_search_patterns = self._process_search_patterns(preprocessed_df, search_patterns)
    results = self._compute_duplicates(preprocessed_df, processed_search_patterns)
    # cleaning the results
    if not self.allow_group_intersection:
        results = self._remove_group_intersection(results)
    results = self._remove_faulty_groups(results)
    df = df.drop_duplicates(unique_id_columns, keep="first")
    results = pd.concat(
        [results, df[~df.set_index(unique_id_columns).index.isin(results.set_index(unique_id_columns).index)]],
        ignore_index=True,
    )
    results["MATCH_FOUND"] = np.where(results["GROUP_ID"].isna(), "NO", "YES")
    results = results.drop("_CASE_KEY_GENERATED", axis=1, errors="ignore")

    if return_preprocessed_df:
        return results, preprocessed_df

    return results
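
Continuing the hedged sketch above, the returned DataFrame can be inspected via the columns set in the source (GROUP_ID, PATTERN and the derived MATCH_FOUND flag), for example:

# Rows that were matched into a duplicate group.
duplicates = results[results["MATCH_FOUND"] == "YES"]

# Each GROUP_ID collects the rows considered duplicates of each other;
# PATTERN names the search pattern that grouped them.
group_overview = (
    duplicates.groupby(["GROUP_ID", "PATTERN"])
    .size()
    .reset_index(name="N_ITEMS_IN_GROUP")
    .sort_values("N_ITEMS_IN_GROUP", ascending=False)
)
print(group_overview.head())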