Deprecations
Deprecation Warnings
To keep PyCelonis up-to-date and guarantee support, some outdated modules are marked as
deprecated and will be removed from PyCelonis in version 2.0:
- Data Deduplication: please contact Service Desk to migrate to the official Data Deduplication product.
DuplicateChecker
Class to check for data deduplication.
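A minimal usage sketch follows. It is hypothetical: the import path pycelonis.data_deduplication.duplicate_checker and the column names are assumptions, not taken from this page; instantiating the class emits the DeprecationWarning shown in the source below.
import pandas as pd
from pycelonis.data_deduplication.duplicate_checker import DuplicateChecker  # assumed import path

# Illustrative data: two near-identical invoices and one unrelated row.
df = pd.DataFrame(
    {
        "INVOICE_ID": ["A1", "A2", "A3"],
        "VENDOR_NAME": ["ACME Corp", "ACME Corp.", "Other GmbH"],
        "REFERENCE": ["R-1", "R-2", "R-3"],
        "VALUE": [100.0, 100.0, 50.0],
    }
)

checker = DuplicateChecker()  # emits a DeprecationWarning, see __init__ below
results = checker.apply(
    df,
    search_patterns={
        "Fuzzy vendor, same value, different reference": {
            "VENDOR_NAME": "CompanyNameFuzzy",  # one of checker.celonis_patterns
            "VALUE": "exact",                   # acts as a blocking column for fuzzy patterns
            "REFERENCE": "different",
        }
    },
    unique_id_columns="INVOICE_ID",
)
# results carries the original rows plus GROUP_ID, PATTERN and MATCH_FOUND columns.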
Source code in data_deduplication/duplicate_checker.py
class DuplicateChecker:
"""Class to check for data deduplication."""
def __init__(self, **kwargs) -> None:
self._tracker = PyCelonisTracker()
self.n_jobs = 1
self._max_chunk_size = 5000
self.celonis_patterns = {
"CompanyNameFuzzy": "comparers.CompanyNameComparer(column, column,label=column,"
"threshold=0.85,method='jarowinkler')",
"DateFuzzy": "comparers.DateComparer(column, column, label=column)",
"InvoiceReferenceFuzzy": "comparers.ReferenceComparer(column, column)",
"InvoiceValueFuzzy": "comparers.InvoiceAmountComparer(column, column, "
"method=\"linear\", offset=0.0, scale=40, label=column)",
"MaterialFuzzy": "comparers.MaterialComparer(column, column)",
"StringFuzzy": "recordlinkage.compare.String(column, column, "
"method='jarowinkler', threshold=0.85, missing_value=0.0, "
"label=column)",
"ZipCodeComparer": "comparers.CompareZipCodes(column,column,label=column)",
}
# TODO: this will allow switching between catalog matching and incremental matching.
self._include_df_internal_matches = True
self.fast_mode = False
self.disable_tqdm = False
warnings.warn(
"Deprecation: The module 'data_deduplication' is deprecated and will be removed in PyCelonis version 2.0.\n"
"Please contact Service Desk to migrate to the official Data Deduplication product.",
DeprecationWarning,
stacklevel=2,
)
def apply(
self,
df: pd.DataFrame,
search_patterns: typing.Dict,
unique_id_columns: typing.Union[str, typing.List[str]],
df_reference: pd.DataFrame = None,
max_chunk_size: int = 5000,
return_preprocessed_df: bool = False,
allow_group_intersection: bool = False,
fast_mode: bool = False,
disable_tqdm: bool = False,
) -> typing.Union[pd.DataFrame, typing.Tuple[pd.DataFrame, pd.DataFrame]]:
"""
Computes the duplicates on the table df, based on the search patterns
specified in the init of the DuplicateChecker Object.
If df_reference is given, in addition the duplicates between df and
df_reference are computed.
Args:
df : pd.DataFrame
DataFrame containing the unique_id_columns and the columns that
are to be compared against each other to find duplicates.
search_patterns: dict, optional if search_patterns already set before.
dict containing key value pairs of the form pattern_name: pattern
where pattern_name is a string and pattern is a dict where the keys
are
columns of df and the values are the matching logic to apply. E.g.:
search_patterns={
"Some Patter Name":{
"VENDOR_NAME": "exact",
"INVOICE_DATE": "exact",
"REFERENCE": "different",
"VALUE": "exact",
"_VENDOR_ID": "exact"},...}
The last used search patterns will always be stored in the
DuplicateChecker object under .search_patterns .
unique_id_columns : str or List[str]
The column or list of columns to be used a unique identifier.
df_reference : pd.DataFrame, optional
DataFrame of same structure containing the already processed items,
all items of df will be checked against each other and against all
of df_reference, NOT checked will the items of df_reference against
each other, by default None.
max_chunk_size : int, optional
Size of the chunks compared at a time, decrease if memory problems occur,
increase for speed. takes
Returns:
pd.DataFrame:
DataFrame containing duplicates rows of df + 2 Additional columns:
* GROUP_ID : Column which uniquely identifies and maps together those
rows that are duplciates of each other.
* PATTERN : Name of the seach pattern that maps the items of a group.
"""
self.allow_group_intersection = allow_group_intersection
self.disable_tqdm = disable_tqdm
self._tracker.track(
"Run duplicate checking",
extra={
"tracking_type": "DUPLICATE_CHECKER",
},
)
self.fast_mode = fast_mode
self._max_chunk_size = max_chunk_size
if isinstance(unique_id_columns, str):
unique_id_columns = [unique_id_columns]
elif isinstance(unique_id_columns, list):
unique_id_columns = unique_id_columns
else:
raise TypeError("unique_id_columns must be a string or a list of strings.")
preprocessed_df = self._preprocess_df(df, unique_id_columns, df_reference)
processed_search_patterns = self._process_search_patterns(preprocessed_df, search_patterns)
results = self._compute_duplicates(preprocessed_df, processed_search_patterns)
# cleaning the results
if not self.allow_group_intersection:
results = self._remove_group_intersection(results)
results = self._remove_faulty_groups(results)
df = df.drop_duplicates(unique_id_columns, keep="first")
results = pd.concat(
[results, df[~df.set_index(unique_id_columns).index.isin(results.set_index(unique_id_columns).index)]],
ignore_index=True,
)
results["MATCH_FOUND"] = np.where(results["GROUP_ID"].isna(), "NO", "YES")
results = results.drop("_CASE_KEY_GENERATED", axis=1, errors="ignore")
if return_preprocessed_df:
return results, preprocessed_df
return results
def _remove_group_intersection(self, results):
"""Drops intersecting groups if one invoice in more than one group,
drop all but the biggest one (containing most items)."""
if not results.empty:
# find case_key that are duplicated + corresponding group ids
intersect_groups = results[results["_CASE_KEY_GENERATED"].duplicated(keep=False)].GROUP_ID.drop_duplicates()
group_sizes = (
results[results.GROUP_ID.isin(intersect_groups)]
.groupby("GROUP_ID")
.count()["_CASE_KEY_GENERATED"]
.reset_index()
)
group_sizes.rename(columns={"_CASE_KEY_GENERATED": "N_ITEMS_IN_GROUP"}, inplace=True)
# of the found groups filter these containing most cases
groups_to_keep = results[results["_CASE_KEY_GENERATED"].duplicated(keep=False)]
groups_to_keep = groups_to_keep.merge(group_sizes, on="GROUP_ID", how="left")
groups_to_keep = (
groups_to_keep.sort_values(["_CASE_KEY_GENERATED", "N_ITEMS_IN_GROUP"], ascending=False)
.drop_duplicates(['_CASE_KEY_GENERATED'], keep="first")
.GROUP_ID
)
groups_to_drop = intersect_groups[~intersect_groups.isin(groups_to_keep)]
results = results[~results.GROUP_ID.isin(groups_to_drop)]
results = results.drop_duplicates(["_CASE_KEY_GENERATED", "GROUP_ID"], keep="first")
results = results.drop_duplicates(["_CASE_KEY_GENERATED"], keep="first")
return results
def _remove_faulty_groups(self, results):
# drop groups that contain fewer than 2 items
# TODO: this is a bug but unclear where and how it happens
if not results.empty:
df_help = results.groupby("GROUP_ID").count()
results = results[~results.GROUP_ID.isin(df_help[df_help["_CASE_KEY_GENERATED"] < 2].index)]
return results
def _merge_df_reference_df(self, df, df_reference, unique_id_columns):
"""Merges df and df_reference and checks if are compatible"""
if not all(col in df.columns for col in unique_id_columns):
raise KeyError("Not all unique_id_columns contained in df.")
if not all(col in df_reference.columns for col in unique_id_columns):
raise KeyError("Not all unique_id_columns contained in df_reference.")
for col in set(df.columns.tolist() + df_reference.columns.tolist()):
if col not in df.columns:
self._logger.warning(f"Column {col} contained in df_reference but not in df")
if col not in df_reference.columns:
self._logger.warning(f"Column {col} contained in df but not in df_reference")
df["ONLY_IN_DF"] = True
df_reference["ONLY_IN_DF"] = False
df = pd.concat([df, df_reference], ignore_index=True)
return df
def _drop_na_with_warning(self, df, subset):
"""For record linkage NAs need to be dropped, this function makes it verbose"""
nas = df[subset].isna()
for col in nas:
number_nas = nas[col].sum()
if number_nas > 0:
self._logger.warning(f"{number_nas} rows of column {col} are 'NA'. They will be dropped")
df.dropna(subset=subset, inplace=True)
return df
def _preprocess_df(self, df, unique_id_columns, df_reference=None):
"""Drop duplicate and separate into case and data columns"""
self._logger.info("Preprocessing DataFrame")
if df_reference is not None:
df = self._merge_df_reference_df(df, df_reference, unique_id_columns)
df = self._drop_na_with_warning(df, subset=unique_id_columns)
# access case key
for col in unique_id_columns:
if col not in df.columns:
raise KeyError(
f"df does not contain the column {col}. Please add a unique identifier column with this name."
)
if len(unique_id_columns) == 1:
df.loc[:, "_CASE_KEY_GENERATED"] = df[unique_id_columns[0]]
else:
df.loc[:, "_CASE_KEY_GENERATED"] = df[unique_id_columns].astype(str).agg("-".join, axis=1)
# drop rows with same key
if df.duplicated(subset=unique_id_columns).sum() > 0:
self._logger.warning("The specified unique_id_columns are not unique, duplicated ids will be dropped.")
df = df.drop_duplicates(unique_id_columns)
def _remove_time_zone_info(df):
"""
Timezone information is removed from datetime columns,
because recordlinkage doesn't accept
datetime columns with timezones.
"""
for column in df.select_dtypes(include=["datetimetz"]):
df[column] = df[column].dt.tz_convert(None)
return df
df = _remove_time_zone_info(df)
# self._sanity_check(df)
return df
def _compute_duplicates(self, df, processed_search_patterns):
"""Runs the duplicate checker with the current settings for fuzzy + exact"""
results = pd.DataFrame()
if df.empty:
raise ValueError("The dataframe df is empty. No cases to be processed")
results_exact = self._compute_exact_matches(df, processed_search_patterns)
results_fuzzy = self._compute_fuzzy_matches(df, processed_search_patterns)
# Implied hierarchy: fuzzy matches are more important than exact matches
if not self.allow_group_intersection and not results_exact.empty and not results_fuzzy.empty:
results_fuzzy = results_fuzzy[~results_fuzzy["GROUP_ID"].isin(results_exact["GROUP_ID"])]
results_exact = results_exact[
~results_exact["_CASE_KEY_GENERATED"].isin(results_fuzzy["_CASE_KEY_GENERATED"])
]
results = pd.concat([results_fuzzy, results_exact], ignore_index=True)
if not results.empty:
results = results.sort_values(by="GROUP_ID")
if 'ONLY_IN_DF' in results.columns:
results["ONLY_IN_DF"] = np.where(results["ONLY_IN_DF"], 1, 0)
return results
def _chunk_df_fuzzy_matching(self, df, patterns):
"""If df is too big to process it is cut into smaller chunks"""
chunks = []
for pattern_name, pattern in patterns.items():
if pattern.get("compare_type") == "fuzzy":
compare_columns = pattern.get("compare_columns")
link_df = df.copy()
link_df = self._drop_na_with_warning(link_df, compare_columns)
blocked_columns = pattern.get("blocked_columns", False)
if len(compare_columns) == 1 and not blocked_columns and self.fast_mode:
chunks += self._chunk_via_starting_character(link_df, compare_columns, pattern_name, pattern)
elif "ONLY_IN_DF" in df.columns and blocked_columns:
# keep only those rows who have duplicates on blocked (exact matching) columns
link_df = link_df[link_df[compare_columns].duplicated(subset=blocked_columns, keep=False)]
contains_df = link_df[blocked_columns + ["ONLY_IN_DF"]].groupby(blocked_columns).max()
contains_df = contains_df[contains_df].reset_index()
contains_df.rename(columns={"ONLY_IN_DF": "contains_df_entry"}, inplace=True)
link_df = link_df.merge(contains_df, on=blocked_columns, how="inner").drop(
"contains_df_entry", axis=1
)
df1 = link_df[link_df.ONLY_IN_DF]
if self._include_df_internal_matches:
df2 = link_df
else:
df2 = link_df[~link_df.ONLY_IN_DF]
chunks += [
{
"df1": df1,
"df2": df2,
"pattern_name": pattern_name,
"compare_columns": compare_columns,
"pattern": pattern,
}
]
else:
if blocked_columns:
link_df = link_df[df[compare_columns].duplicated(subset=blocked_columns, keep=False)]
n = link_df.shape[0]
if n < 2:
self._logger.info(f"No duplicates found for {pattern_name}")
continue
if n <= self._max_chunk_size:
chunks += [
{
"df1": link_df,
"df2": None,
"pattern_name": pattern_name,
"compare_columns": compare_columns,
"pattern": pattern,
}
]
remaining_df = pd.DataFrame()
elif blocked_columns:
self._logger.info(f"{n} rows of df need to be compared for {pattern_name}")
link_df["CATEGORY"] = link_df[blocked_columns].astype(str).agg("-".join, axis=1)
group_counts = link_df["CATEGORY"].value_counts().copy()
remaining_df = link_df
while group_counts.shape[0] > 0 and group_counts.min() < self._max_chunk_size:
x = group_counts.sort_values()[
group_counts.sort_values().cumsum() < self._max_chunk_size
].index
chunks += [
{
"df1": link_df[link_df["CATEGORY"].isin(x.tolist())].drop("CATEGORY", axis=1),
"df2": None,
"compare_columns": compare_columns,
"pattern": pattern,
"pattern_name": pattern_name,
}
]
remaining_df = remaining_df[~remaining_df["CATEGORY"].isin(x.tolist())]
group_counts = group_counts.drop(x)
if group_counts.shape[0] > 0:
remaining_df = remaining_df.drop("CATEGORY", axis=1)
else:
remaining_df = link_df
if not remaining_df.empty:
df_chunked = np.array_split(
remaining_df, self._calculate_optimal_chunk_size(remaining_df.shape[0])
)
processed_cases = []
for df_chunk in df_chunked:
chunks += [
{
"df1": df_chunk,
"df2": remaining_df[~remaining_df["_CASE_KEY_GENERATED"].isin(processed_cases)],
"pattern": pattern,
"pattern_name": pattern_name,
"compare_columns": compare_columns,
}
]
processed_cases += df_chunk["_CASE_KEY_GENERATED"].tolist()
return chunks
def _chunk_via_starting_character(self, link_df, compare_columns, pattern_name, pattern):
df = link_df.copy()
df["clean_string"] = df[compare_columns[0]].astype(str).str.lower().replace(r"[^a-zA-ZА-я\d]", "", regex=True)
df_chunks = self._chunk_by_char_recursive(df, 0, [])
chunks = []
for df1 in df_chunks:
chunks += [
{
"df1": df1,
"df2": None,
"pattern_name": pattern_name,
"compare_columns": compare_columns,
"pattern": pattern,
}
]
return chunks
def _chunk_by_char_recursive(self, df, i, chunks):
"""Recursively chunks the data by the i-th character from the beginning
relays on the assumption that the characters in the front are more important
"""
if df.shape[0] <= self._max_chunk_size:
chunks += [df.loc[:, df.columns != "clean_string"]]
else:
char_list = df["clean_string"].str[i].unique()
for char in char_list:
chunks = self._chunk_by_char_recursive(df[df["clean_string"].str[i] == char], i + 1, chunks)
return chunks
def _calculate_optimal_chunk_size(self, dfsize):
# TODO: how to find the optimal chunk size ?
return dfsize / self._max_chunk_size
def _compute_exact_matches(self, df, patterns):
"""Computes duplicates for patterns only involving exact matches"""
possible_duplicates = pd.DataFrame()
for pattern_name, pattern in patterns.items():
if pattern.get("compare_type") == "exact":
compare_columns = pattern.get("compare_columns")
differ_columns = pattern.get("differ_columns")
exact_duplicates = df[["_CASE_KEY_GENERATED"] + compare_columns + differ_columns].copy()
exact_duplicates = self._drop_na_with_warning(exact_duplicates, compare_columns)
exact_duplicates = exact_duplicates[exact_duplicates.duplicated(compare_columns, keep=False)]
if differ_columns:
for col in differ_columns:
exact_duplicates = exact_duplicates[
~exact_duplicates.duplicated(compare_columns + [col], keep=False)
]
exact_duplicates = exact_duplicates.drop(differ_columns, axis=1)
exact_duplicates = exact_duplicates[exact_duplicates.duplicated(compare_columns, keep=False)]
exact_duplicates = self._create_exact_group_ids(df, exact_duplicates, compare_columns, pattern_name)
if "ONLY_IN_DF" in df.columns:
# filter on only pairs that contains 'new' entries
contains_df_entry = exact_duplicates[["GROUP_ID", "ONLY_IN_DF"]].groupby("GROUP_ID").max()
contains_df_entry = contains_df_entry[contains_df_entry["ONLY_IN_DF"]].index.tolist()
exact_duplicates = exact_duplicates[exact_duplicates.GROUP_ID.isin(contains_df_entry)]
possible_duplicates = possible_duplicates.append(exact_duplicates)
return possible_duplicates
def _compute_fuzzy_matches(self, df, patterns):
"""Creates chunks and runs record linakge on the chunks"""
self._logger.info("Searching for fuzzy matches ...")
possible_duplicates = pd.DataFrame()
chunks = self._chunk_df_fuzzy_matching(df, patterns)
for chunk in tqdm.tqdm(chunks, disable=self.disable_tqdm):
try:
new_results = self._run_fuzzy_matching_on_chunk(chunk)
except ZeroDivisionError:
continue
possible_duplicates = self._add_new_duplicates(new_results, possible_duplicates)
possible_duplicates.drop("id_list", axis=1, inplace=True, errors="ignore")
return possible_duplicates
def _add_new_duplicates(self, new_results, all_results):
"""Needed for chunking to integrate subgroups into the bigger groups found in another chunk"""
if not new_results.empty and not all_results.empty:
intersection = new_results["_CASE_KEY_GENERATED"].isin(all_results["_CASE_KEY_GENERATED"])
if not self.allow_group_intersection and any(intersection):
keys = new_results[intersection]["_CASE_KEY_GENERATED"]
id_list_new = new_results.drop_duplicates("GROUP_ID")[["id_list", "GROUP_ID"]]
id_list_old = all_results[all_results["_CASE_KEY_GENERATED"].isin(keys)].drop_duplicates("GROUP_ID")[
["id_list", "GROUP_ID"]
]
for _, id_new in id_list_new.iterrows():
for _, id_old in id_list_old.iterrows():
if set(id_new.id_list) <= set(id_old.id_list):
new_results = new_results[new_results["GROUP_ID"] != id_new.GROUP_ID]
elif set(id_old.id_list) <= set(id_new.id_list):
all_results = all_results[all_results["GROUP_ID"] != id_old.GROUP_ID]
all_results = all_results.append(new_results)
return all_results
def _run_fuzzy_matching_on_chunk(self, chunk):
"""Given a chunk callucates the fuzzy matches for this chunk"""
pattern = chunk.get("pattern", None)
pattern_name = chunk.get("pattern_name", None)
if pattern is None or pattern_name is None:
raise ValueError("the dict chunk needs to contain the pattern and pattern_name")
compare_columns = chunk.get("compare_columns")
differ_columns = pattern.get("differ_columns")
df1 = chunk.get("df1", None)
df2 = chunk.get("df2", None)
if df1 is None:
raise ValueError("No input dataframe found.")
elif chunk.get("df2", None) is not None:
df = pd.concat([df1, df2[~df2["_CASE_KEY_GENERATED"].isin(df1["_CASE_KEY_GENERATED"])]]).copy()
# self._logger.info(
# f"running {df1.shape[0]} vs. {df2.shape[0]} for pattern: {pattern_name}"
# )
data_links = pattern.get("indexer").index(df1[compare_columns], df2[compare_columns])
results = pattern.get("comparer").compute(data_links, df1[compare_columns], df2[compare_columns])
else:
df = df1
data_links = pattern.get("indexer").index(df1)
results = pattern.get("comparer").compute(data_links, df1[compare_columns])
matches = results[results.sum(axis=1) > pattern.get("n_comparison_columns") - 1]
groups = self._create_fuzzy_group_ids(df, matches)
groups = groups.merge(df, on="_CASE_KEY_GENERATED")
groups["PATTERN"] = pattern_name
if differ_columns:
differ_counts = (
groups[differ_columns + ["GROUP_ID"]]
.groupby("GROUP_ID")
.nunique()
.drop(columns="GROUP_ID", errors="ignore")
.min(axis=1)
)
not_differing_groups = differ_counts[differ_counts == 1].index.tolist()
groups = groups[~groups.GROUP_ID.isin(not_differing_groups)]
id_list_df = groups.groupby('GROUP_ID')["_CASE_KEY_GENERATED"].apply(list).reset_index(name='id_list')
groups = groups.merge(id_list_df, on="GROUP_ID", how="left")
return groups
def _create_fuzzy_group_ids(self, df, matches):
"""Matches from recordlinkage are turned into groups with a unique ID."""
cases = df[["_CASE_KEY_GENERATED"]].copy()
temp_df = pd.DataFrame(index=cases.index)
temp_df["GROUP_ID"] = ""
m_pivot = matches.reset_index()
m_pivot.rename(columns={"level_0": "index_1", "level_1": "index_2"}, inplace=True)
m_pivot["group_n"] = m_pivot.groupby("index_1").cumcount()
m_pivot = m_pivot.pivot(index="index_1", columns="group_n", values="index_2")
for i, r in m_pivot.drop_duplicates().iterrows():
g = list(set(m_pivot[m_pivot.values == r.values].index.dropna().tolist() + r.dropna().tolist()))
s = cases.loc[g, :].agg("".join, axis=1).tolist()
s = ",".join(sorted(set(s)))
group_name = "IDs:" + f"({s})"
temp_df.loc[g, "GROUP_ID"] = group_name
temp_df = temp_df[temp_df["GROUP_ID"] != ""]
groups = cases.merge(temp_df, how="inner", left_index=True, right_index=True)
groups.drop_duplicates(inplace=True)
return groups
def _create_exact_group_ids(self, df, exact_duplicates, compare_columns, pattern_name):
"""Matches from exact matching are turned into groups with a unique ID."""
def _sort_join(x):
return ",".join(sorted(x))
groups = exact_duplicates.groupby(compare_columns, as_index=False).agg({"_CASE_KEY_GENERATED": _sort_join})
groups.rename(columns={"_CASE_KEY_GENERATED": "GROUP_ID"}, inplace=True)
groups["GROUP_ID"] = "IDs:(" + groups["GROUP_ID"] + ")"
exact_duplicates = exact_duplicates.merge(groups, on=compare_columns)
exact_duplicates = df.merge(
exact_duplicates[["_CASE_KEY_GENERATED", "GROUP_ID"]], on="_CASE_KEY_GENERATED", how="inner"
)
exact_duplicates["PATTERN"] = pattern_name
return exact_duplicates
def _sanity_check(self, df):
"""Check settings"""
if df.shape[0] > 1000000 and len(df.select_dtypes(include=["float64"]).columns) > 0:
self._logger.warning("Size of DataFrame is very large, it will take some hours to process everything.")
elif df.shape[0] > 1000000:
self._logger.warning("Size of DataFrame is very large, and might lead to a memory error.")
if self._max_chunk_size > 5000:
self._logger.warning(
"max_chunk_size is very large, this will probably lead to a MemoryError \
if run in a standard Celonis ML Workbench because of too little RAM."
)
def _process_search_patterns(self, df, search_patterns): # noqa: C901
"""
Instantiates record linkage indexer and comparer objects for the data columns
to be checked for duplicates, based on the provided search_patterns.
Parameters
----------
search_patterns : dict
dict containing the search patterns,
keys are names of the pattern, values should look like this:
{
"All Exact but REFERENCE different":{
"VENDOR_NAME": "exact",
"INVOICE_DATE": "exact",
"REFERENCE": "different",
"VALUE": "exact",
"_VENDOR_ID": "exact"
}
}
"""
if search_patterns is None:
raise ValueError("Please provide search_patterns.")
processed_search_pattern = {}
for pattern_name, current_pattern in search_patterns.items():
differ_columns = []
compare_columns = []
# TODO: make this a property
possible_inputs = list(self.celonis_patterns.keys()) + ["ignore", "exact", "different"]
for column, matching_type in current_pattern.items():
if column not in df.columns:
raise KeyError(
f"In PATTERN: {pattern_name} a comparison "
f"method is given for {column}, "
"which is missing from the dataframe df."
)
elif matching_type == "different":
differ_columns += [column]
elif matching_type != "ignore":
compare_columns += [column]
if all(p in ["ignore", "exact", "different"] for p in list(current_pattern.values())):
compare_type = "exact"
indexer = blocked_columns = n_comparison_columns = comparer = None
else:
compare_type = "fuzzy"
# needed inputs to build record linkage objects
blocked_columns = []
n_comparison_columns = 0
indexer = recordlinkage.Index()
comparer = recordlinkage.Compare() # n_jobs=4
blocks_exist = False
for column in current_pattern:
if current_pattern[column] == "exact":
blocks_exist = True
blocked_columns.append(column)
elif current_pattern[column] in ["ignore", "different"]:
continue
else:
if current_pattern[column] in self.celonis_patterns.keys():
exec("comparer.add(" + self.celonis_patterns.get(current_pattern[column]) + ")")
else:
try:
if isinstance(current_pattern[column], str):
exec("comparer.add(" + current_pattern[column] + ")")
else:
comparer.add(current_pattern[column])
except (NameError, SyntaxError, ValueError, TypeError):
raise ValueError(
f"The comparison method '{current_pattern[column]}' for column '{column}'"
f"given in PATTERN: '{pattern_name}' is not a valid input. "
f"It needs to be one of the standard types: {', '.join(possible_inputs)} "
f"or one of the generic record linkage algorithms:\n"
f"https://recordlinkage.readthedocs.io/en/latest/"
f"ref-compare.html#module-recordlinkage.compare\n"
f"where left_on, right_on, label = column. E.g.:\n"
f"\"String(column, column, method='levenshtein', "
f"threshold=None, missing_value=0.0, label=column)\"\n"
)
n_comparison_columns += 1
if blocks_exist:
indexer.block(blocked_columns)
else:
indexer.full()
processed_search_pattern.update(
{
pattern_name: {
"indexer": indexer,
"blocked_columns": blocked_columns,
"n_comparison_columns": n_comparison_columns,
"comparer": comparer,
"compare_columns": compare_columns,
"compare_type": compare_type,
"differ_columns": differ_columns,
"search_pattern": current_pattern,
}
}
)
return processed_search_pattern
apply(self, df, search_patterns, unique_id_columns, df_reference=None, max_chunk_size=5000, return_preprocessed_df=False, allow_group_intersection=False, fast_mode=False, disable_tqdm=False)
Computes the duplicates in the table df based on the provided search patterns. If df_reference is given, the duplicates between df and df_reference are computed as well.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | pd.DataFrame | DataFrame containing the unique_id_columns and the columns that are to be compared against each other to find duplicates. | required
search_patterns | Dict | Optional if search_patterns were already set before. Dict containing key-value pairs of the form pattern_name: pattern, where pattern_name is a string and pattern is a dict whose keys are columns of df and whose values are the matching logic to apply, e.g. search_patterns={"Some Pattern Name": {"VENDOR_NAME": "exact", "INVOICE_DATE": "exact", "REFERENCE": "different", "VALUE": "exact", "_VENDOR_ID": "exact"}, ...}. The last used search patterns are always stored in the DuplicateChecker object under .search_patterns . | required
unique_id_columns | str or List[str] | The column or list of columns to be used as a unique identifier. | required
df_reference | pd.DataFrame, optional | DataFrame with the same structure containing the already processed items. All items of df are checked against each other and against all items of df_reference; the items of df_reference are NOT checked against each other. | None
max_chunk_size | int, optional | Size of the chunks compared at a time; decrease if memory problems occur, increase for speed. | 5000

Returns:

Type | Description
---|---
pd.DataFrame | DataFrame containing the duplicate rows of df plus 2 additional columns: GROUP_ID, which uniquely identifies and maps together those rows that are duplicates of each other, and PATTERN, the name of the search pattern that maps the items of a group.
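As a usage illustration, the following sketch calls apply with an exact-match pattern and a df_reference of already processed items. It is hypothetical: the import path, the DataFrames and the column names are assumptions, not taken from this page.
import pandas as pd
from pycelonis.data_deduplication.duplicate_checker import DuplicateChecker  # assumed import path

# Illustrative data: new invoices plus a reference set of already processed ones.
df_new = pd.DataFrame(
    {
        "INVOICE_ID": ["N1", "N2"],
        "VENDOR_NAME": ["ACME Corp", "Other GmbH"],
        "REFERENCE": ["R-10", "R-11"],
        "VALUE": [100.0, 50.0],
    }
)
df_already_processed = df_new.assign(INVOICE_ID=["P1", "P2"], REFERENCE=["R-1", "R-2"])

checker = DuplicateChecker()
results, preprocessed_df = checker.apply(
    df_new,
    search_patterns={
        "All exact but REFERENCE different": {
            "VENDOR_NAME": "exact",
            "VALUE": "exact",
            "REFERENCE": "different",
        }
    },
    unique_id_columns="INVOICE_ID",
    df_reference=df_already_processed,  # checked against df_new, but not against itself
    max_chunk_size=2000,                # smaller chunks reduce peak memory usage
    return_preprocessed_df=True,        # returns (results, preprocessed_df) instead of only results
)
# Rows with MATCH_FOUND == "YES" share a GROUP_ID with their duplicates.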
Source code in data_deduplication/duplicate_checker.py
def apply(
self,
df: pd.DataFrame,
search_patterns: typing.Dict,
unique_id_columns: typing.Union[str, typing.List[str]],
df_reference: pd.DataFrame = None,
max_chunk_size: int = 5000,
return_preprocessed_df: bool = False,
allow_group_intersection: bool = False,
fast_mode: bool = False,
disable_tqdm: bool = False,
) -> typing.Union[pd.DataFrame, typing.Tuple[pd.DataFrame, pd.DataFrame]]:
"""
Computes the duplicates on the table df, based on the search patterns
specified in the init of the DuplicateChecker Object.
If df_reference is given, in addition the duplicates between df and
df_reference are computed.
Args:
df : pd.DataFrame
DataFrame containing the unique_id_columns and the columns that
are to be compared against each other to find duplicates.
search_patterns: dict, optional if search_patterns already set before.
dict containing key value pairs of the form pattern_name: pattern
where pattern_name is a string and pattern is a dict where the keys
are
columns of df and the values are the matching logic to apply. E.g.:
search_patterns={
"Some Patter Name":{
"VENDOR_NAME": "exact",
"INVOICE_DATE": "exact",
"REFERENCE": "different",
"VALUE": "exact",
"_VENDOR_ID": "exact"},...}
The last used search patterns will always be stored in the
DuplicateChecker object under .search_patterns .
unique_id_columns : str or List[str]
The column or list of columns to be used a unique identifier.
df_reference : pd.DataFrame, optional
DataFrame of same structure containing the already processed items,
all items of df will be checked against each other and against all
of df_reference, NOT checked will the items of df_reference against
each other, by default None.
max_chunk_size : int, optional
Size of the chunks compared at a time, decrease if memory problems occur,
increase for speed. takes
Returns:
pd.DataFrame:
DataFrame containing duplicates rows of df + 2 Additional columns:
* GROUP_ID : Column which uniquely identifies and maps together those
rows that are duplciates of each other.
* PATTERN : Name of the seach pattern that maps the items of a group.
"""
self.allow_group_intersection = allow_group_intersection
self.disable_tqdm = disable_tqdm
self._tracker.track(
"Run duplicate checking",
extra={
"tracking_type": "DUPLICATE_CHECKER",
},
)
self.fast_mode = fast_mode
self._max_chunk_size = max_chunk_size
if isinstance(unique_id_columns, str):
unique_id_columns = [unique_id_columns]
elif isinstance(unique_id_columns, list):
unique_id_columns = unique_id_columns
else:
raise TypeError("unique_id_columns must be a string or a list of strings.")
preprocessed_df = self._preprocess_df(df, unique_id_columns, df_reference)
processed_search_patterns = self._process_search_patterns(preprocessed_df, search_patterns)
results = self._compute_duplicates(preprocessed_df, processed_search_patterns)
# cleaning the results
if not self.allow_group_intersection:
results = self._remove_group_intersection(results)
results = self._remove_faulty_groups(results)
df = df.drop_duplicates(unique_id_columns, keep="first")
results = pd.concat(
[results, df[~df.set_index(unique_id_columns).index.isin(results.set_index(unique_id_columns).index)]],
ignore_index=True,
)
results["MATCH_FOUND"] = np.where(results["GROUP_ID"].isna(), "NO", "YES")
results = results.drop("_CASE_KEY_GENERATED", axis=1, errors="ignore")
if return_preprocessed_df:
return results, preprocessed_df
return results