fo6168
New Contributor

Matching script using Python in Fabric - running using pandas vs Spark data frame

Hello, I am trying to compare two datasets to determine whether they contain customers that are likely matches. We use two methodologies to decide whether there is a match: we combine the customer name and customer address into a single string and compare that between the data frames, and we also combine the customer name and customer postal code into a single string and compare that.

 

I've run this script successfully in the past, but we recently changed Fabric capacities and now I am having some issues. I've tested a few different things but have not been successful, and I am hoping to get some guidance.

 

Some context:

- df1_clean is a Spark DataFrame with ~52k rows

- df2_clean is a pandas DataFrame with ~3k rows

 

(scripts referenced are pasted below)

1) The first script uses RapidFuzz and pandas to compare the two dataframes. It does technically run, but it would take about 20 hours to finish, which seems really long for such a small dataset.

2) The second script uses Spark and RapidFuzz to compare the two dataframes, and it runs quickly and successfully. However, the resulting dataframe can't be written to a table, exported to a CSV, or viewed with display() or show(). I get an error saying "SparkSession should only be created and accessed on the driver." because Fabric restricts access to the SparkSession from worker nodes, which is what happens when the UDF tries to use fuzz.ratio(...) inside the distributed Spark job.

 

I would welcome workarounds for the Spark issue or a suggestion for a different approach to achieve the desired result.

 

SCRIPT 1: 

import pandas as pd
from rapidfuzz import fuzz
from tqdm import tqdm

# note: categorize_match(...) used below is assumed to be defined elsewhere in the notebook

def find_fuzzy_matches_with_combined_fields(df1_clean, df2_clean, customer_number_column, customer_name_column, threshold=70):
    matches = []
    processed = set()

    # Wrap the outer loop with tqdm for progress tracking
    for i, row_i in tqdm(df1_clean.iterrows(), total=len(df1_clean), desc="Matching"):
        if row_i['Combined_Field_Main'] in processed or row_i['Combined_Field_Delivery'] in processed:
            continue

        for j, row_j in df2_clean.iterrows():
            if row_i[customer_number_column] == row_j[customer_number_column]:
                continue

            score_main = fuzz.ratio(row_i['Combined_Field_Main'], row_j['Combined_Field_Main'])
            score_delivery = fuzz.ratio(row_i['Combined_Field_Delivery'], row_j['Combined_Field_Delivery'])
            name_zip_score_main = fuzz.ratio(row_i['Name_Zip_Field_Main'], row_j['Name_Zip_Field_Main'])
            name_zip_score_delivery = fuzz.ratio(row_i['Name_Zip_Field_Delivery'], row_j['Name_Zip_Field_Delivery'])

            matched_main = score_main >= threshold or name_zip_score_main >= threshold
            matched_delivery = score_delivery >= threshold or name_zip_score_delivery >= threshold

            if matched_main or matched_delivery:
                if matched_main:
                    key_address_number = row_i.get('Address_number', 'N/A')
                    duplicate_address_number = row_j.get('Address_number', 'N/A')
                    key_customer_address = [row_i[f'Main_Address_{x}'] for x in range(1, 5)]
                    duplicate_customer_address = [row_j[f'Main_Address_{x}'] for x in range(1, 5)]
                elif matched_delivery:
                    key_address_number = row_i.get('Delivery_Address_Num', 'N/A')
                    duplicate_address_number = row_j.get('Delivery_Address_Num', 'N/A')
                    key_customer_address = [row_i[f'Delivery_Address_{x}'] for x in range(1, 5)]
                    duplicate_customer_address = [row_j[f'Delivery_Address_{x}'] for x in range(1, 5)]

                match_category = categorize_match(max(score_main, score_delivery))
                name_zip_category = categorize_match(max(name_zip_score_main, name_zip_score_delivery))

                matches.append({
                    'Key Customer Number': row_i[customer_number_column],
                    'Duplicate Customer Number': row_j[customer_number_column],
                    'Key Customer Name': row_i[customer_name_column],
                    'Duplicate Customer Name': row_j[customer_name_column],
                    'Key Cust Main Addr Match Value': row_i['Combined_Field_Main'],
                    'Key Cust Delivery Addr Match Value': row_i['Combined_Field_Delivery'],
                    'Duplicate Cust Main Addr Match Value': row_j['Combined_Field_Main'],
                    'Duplicate Cust Delivery Addr Match Value': row_j['Combined_Field_Delivery'],
                    'Score Main': score_main,
                    'Score Delivery': score_delivery,
                    'AddressMatch_Category': match_category,
                    'Key Cust Main Zip Match Value': row_i['Name_Zip_Field_Main'],
                    'Key Cust Delivery Zip Match Value': row_i['Name_Zip_Field_Delivery'],
                    'Duplicate Cust Main Zip Match Value': row_j['Name_Zip_Field_Main'],
                    'Duplicate Cust Delivery Zip Match Value': row_j['Name_Zip_Field_Delivery'],
                    'Name_Zip Score Main': name_zip_score_main,
                    'Name_Zip Score Delivery': name_zip_score_delivery,
                    'ZipMatch_Category': name_zip_category,
                    'Key Customer Address Number': key_address_number,
                    'Duplicate Customer Address Number': duplicate_address_number,
                    'Key Cust Match Address Line 1': key_customer_address[0],
                    'Key Cust Match Address Line 2': key_customer_address[1],
                    'Key Cust Match Address Line 3': key_customer_address[2],
                    'Key Cust Match Address Line 4': key_customer_address[3],
                    'Key Cust Main Postal Code': row_i['Main_Postal_Code'],
                    'Key Cust Delivery Postal Code': row_i['Delivery_Postal_Code'],
                    'Duplicate Cust Match Address Line 1': duplicate_customer_address[0],
                    'Duplicate Cust Match Address Line 2': duplicate_customer_address[1],
                    'Duplicate Cust Match Address Line 3': str(duplicate_customer_address[2]),
                    'Duplicate Cust Match Address Line 4': duplicate_customer_address[3],
                    'Duplicate Cust Main Postal Code': str(row_j['Main_Postal_Code']),
                    'Duplicate Cust Delivery Postal Code': str(row_j['Delivery_Postal_Code']),
                    'Duplicate?': '',
                    'Comment': ''
                })

                if score_main == 100 or score_delivery == 100:
                    processed.add(row_i['Combined_Field_Main'])
                    processed.add(row_i['Combined_Field_Delivery'])

    unique_matches = {frozenset(d.items()): d for d in matches}.values()
    return list(unique_matches)


# Check the DataFrame types and convert to pandas if necessary
from pyspark.sql import DataFrame as SparkDataFrame

if isinstance(df1_clean, SparkDataFrame):
    df1_pandas = df1_clean.toPandas()
else:
    df1_pandas = df1_clean

if isinstance(df2_clean, SparkDataFrame):
    df2_pandas = df2_clean.toPandas()
else:
    df2_pandas = df2_clean


# Run the fuzzy matching process for customers
matches = find_fuzzy_matches_with_combined_fields(df1_pandas, df2_pandas, 'Customer_number', 'Customer_name')

# Convert the results to a Pandas DataFrame
df_matches = pd.DataFrame(matches)
 
SCRIPT 2:
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType
from rapidfuzz import fuzz

# Broadcast df2_clean
df2_broadcast = spark.sparkContext.broadcast(df2_clean.to_dict(orient='records'))

# Define UDF to compare each row in df1_clean to all rows in df2_clean
def find_matches_udf(row_combined_main, row_combined_delivery, row_name_zip_main, row_name_zip_delivery, row_cust_num, threshold=70):
    matches = []
    for row_j in df2_broadcast.value:
        if row_cust_num == row_j['Customer_number']:
            continue

        score_main = fuzz.ratio(row_combined_main, row_j['Combined_Field_Main'])
        score_delivery = fuzz.ratio(row_combined_delivery, row_j['Combined_Field_Delivery'])
        name_zip_score_main = fuzz.ratio(row_name_zip_main, row_j['Name_Zip_Field_Main'])
        name_zip_score_delivery = fuzz.ratio(row_name_zip_delivery, row_j['Name_Zip_Field_Delivery'])

        if max(score_main, score_delivery, name_zip_score_main, name_zip_score_delivery) >= threshold:
            matches.append({
                'Duplicate Customer Number': row_j['Customer_number'],
                'Score Main': score_main,
                'Score Delivery': score_delivery,
                'Name_Zip Score Main': name_zip_score_main,
                'Name_Zip Score Delivery': name_zip_score_delivery
            })
    return matches

# Register UDF
schema = ArrayType(StructType([
    StructField("Duplicate Customer Number", StringType()),
    # fuzz.ratio returns a float, so declare the score fields as DoubleType
    StructField("Score Main", DoubleType()),
    StructField("Score Delivery", DoubleType()),
    StructField("Name_Zip Score Main", DoubleType()),
    StructField("Name_Zip Score Delivery", DoubleType())
]))

match_udf = udf(find_matches_udf, schema)

# Apply UDF to df1_clean
df_matches = df1_clean.withColumn(
    "Matches",
    match_udf(
        col("Combined_Field_Main"),
        col("Combined_Field_Delivery"),
        col("Name_Zip_Field_Main"),
        col("Name_Zip_Field_Delivery"),
        col("Customer_number")
    )
).filter(col("Matches").isNotNull())
1 ACCEPTED SOLUTION
HarishKM
Valued Contributor

Hey @fo6168,
Here are the steps I would follow to troubleshoot the issue.

For Script 1:

1) Consider using multiprocessing to parallelize the computation and leverage multiple cores on the driver (see the first sketch below).

Alternatively, use the Dask library for parallelized DataFrame operations.

2) Simplify the inner-loop comparison by using vectorized operations where possible with NumPy, pandas, or RapidFuzz's batch scorers, though this may require significant changes to how the comparisons are performed.

3) Use similarity heuristics or initial filters (blocking) to reduce the number of candidates to compare before performing detailed fuzzy matching (see the second sketch below).
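
To illustrate point 1), here is a rough sketch (not a drop-in solution) that splits df1 into chunks and runs your existing matcher in parallel worker processes. It assumes fork-based multiprocessing (the default on Linux), so the child processes can see df2_pandas and the helper function:

from multiprocessing import Pool, cpu_count
import numpy as np
import pandas as pd

def match_chunk(chunk):
    # Hypothetical wrapper: run the existing matcher on one slice of df1_pandas
    return find_fuzzy_matches_with_combined_fields(chunk, df2_pandas,
                                                   'Customer_number', 'Customer_name')

chunks = np.array_split(df1_pandas, cpu_count())
with Pool(cpu_count()) as pool:
    matches = [m for part in pool.map(match_chunk, chunks) for m in part]

df_matches = pd.DataFrame(matches)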
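
And for points 2) and 3), a rough sketch that blocks both frames on a hypothetical key (the first three characters of Main_Postal_Code) and then scores each block in a single vectorized rapidfuzz.process.cdist call instead of a Python double loop; adapt the key and output columns to your data:

import pandas as pd
from rapidfuzz import fuzz, process

def blocked_fuzzy_scores(df1_pandas, df2_pandas, field="Combined_Field_Main",
                         block_key="Main_Postal_Code", threshold=70):
    results = []
    # Block on the first 3 characters of the postal code (a hypothetical blocking key)
    df1 = df1_pandas.assign(_block=df1_pandas[block_key].astype(str).str[:3])
    df2 = df2_pandas.assign(_block=df2_pandas[block_key].astype(str).str[:3])
    for block, g1 in df1.groupby("_block"):
        g2 = df2[df2["_block"] == block]
        if g2.empty:
            continue
        # One vectorized call scores every pair in the block; workers=-1 uses all cores
        scores = process.cdist(g1[field].tolist(), g2[field].tolist(),
                               scorer=fuzz.ratio, workers=-1)
        for i, key_num in enumerate(g1["Customer_number"].tolist()):
            for j, dup_num in enumerate(g2["Customer_number"].tolist()):
                if scores[i][j] >= threshold:
                    results.append({"Key Customer Number": key_num,
                                    "Duplicate Customer Number": dup_num,
                                    "Score": float(scores[i][j])})
    return pd.DataFrame(results)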

 

Script 2: Spark with RapidFuzz

Suggestions:

1) Instead of fuzzy matching every record against every other record, first perform a join on comparable columns and then apply fuzzy matching only to the resulting candidate pairs, for example:

 

df_candidate_pairs = df1_clean.join(df2_clean, df1_clean['some_column'] == df2_clean['some_column'], 'inner')
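
Expanding on the line above, a rough sketch assuming Main_Postal_Code can serve as the exact blocking key (swap in whatever column fits your data); note df2_clean must be a Spark DataFrame for the join, and only pairs that share the key get the expensive fuzzy comparison:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from rapidfuzz import fuzz

# Simple scoring UDF; guard against nulls before calling fuzz.ratio
ratio_udf = udf(lambda a, b: float(fuzz.ratio(a or "", b or "")), "double")

# df2_clean is pandas in the original post, so convert it to Spark first
df2_spark = spark.createDataFrame(df2_clean)

df_candidate_pairs = (
    df1_clean.alias("a")
    .join(df2_spark.alias("b"),
          F.col("a.Main_Postal_Code") == F.col("b.Main_Postal_Code"), "inner")
    .withColumn("score_main",
                ratio_udf(F.col("a.Combined_Field_Main"), F.col("b.Combined_Field_Main")))
    .filter(F.col("score_main") >= 70)
)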

 

2) UDFs can slow Spark down because they often run more slowly than built-in functions. Consider using the available Spark SQL functions or a pandas_udf instead.

 

3) Convert the resulting Spark DataFrame to pandas for the final operations that need driver/session access, bypassing the worker restriction (a short sketch follows at the end of this reply).

 

4) I would use a pandas_udf: Pandas UDFs operate on batches of data as pandas objects and run in the Python worker, with Apache Arrow handling the data exchange with the JVM.

 

import pandas as pd
from pyspark.sql.functions import pandas_udf, col

@pandas_udf(schema)  # reuse the ArrayType(StructType(...)) schema defined above
def fuzzy_match_udf(combined_main: pd.Series, combined_delivery: pd.Series,
                    name_zip_main: pd.Series, name_zip_delivery: pd.Series,
                    cust_num: pd.Series) -> pd.Series:
    # Apply the same per-row logic as find_matches_udf, one Arrow batch at a time
    return pd.Series([
        find_matches_udf(m, d, zm, zd, n)
        for m, d, zm, zd, n in zip(combined_main, combined_delivery,
                                   name_zip_main, name_zip_delivery, cust_num)
    ])

df_matches = df1_clean.withColumn(
    "Matches",
    fuzzy_match_udf(col("Combined_Field_Main"), col("Combined_Field_Delivery"),
                    col("Name_Zip_Field_Main"), col("Name_Zip_Field_Delivery"),
                    col("Customer_number")))

 

Also ensure df2_clean stays as small as possible and is optimized for broadcasting, to minimize the memory footprint.
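
Regarding point 3) above, a minimal sketch of collecting the matched rows to the driver and writing them out from there; the output path assumes a default lakehouse is attached to the notebook:

# Collect only the columns you need to the driver as pandas
matches_pdf = df_matches.select("Customer_number", "Matches").toPandas()

# Write from the driver; the path/file name here is an assumption
matches_pdf.to_csv("/lakehouse/default/Files/customer_matches.csv", index=False)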

 

Thanks

Harish M

Kindly accept it as solution if it solved your problem. Kindly give kudos.


6 REPLIES
Thomaslleblanc
Contributor

pandas DataFrames have additional options that are not available in Spark DataFrames. The performance differences should be minimal.

BhaveshPatel
Honored Contributor

Hi @fo6168 

 

This is how it works:

 

First, Python (pandas DataFrame) --> Apache Spark (Data Lake): sdf = spark.createDataFrame(df) --> Delta Lake (Data Lakehouse): sdf.write.mode("overwrite").format("delta").saveAsTable("DimTable")

 

Python --> Parquet Table --> Bronze Tables --> Silver Tables --> Gold (In-Memory Engine).

Thanks & Regards,
Bhavesh

Love the Self Service BI.
Please use the 'Mark as answer' link to mark a post that answers your question. If you find a reply helpful, please remember to give Kudos.

v-pgoloju
Honored Contributor

Hi @fo6168,

 

Thank you for reaching out to the Microsoft Fabric Forum Community, and special thanks to @HarishKM , @BhaveshPatel  and @Thomaslleblanc  for prompt and helpful responses.

 

Just following up to see if the responses provided by community members were helpful in addressing the issue.

If one of the responses helped resolve your query, please consider marking it as the Accepted Solution. Feel free to reach out if you need any further clarification or assistance.

 

Best regards,
Prasanna Kumar

v-pgoloju
Honored Contributor

Hi @fo6168,

 

Just following up to see if the Response provided was helpful in resolving your issue. Please feel free to let us know if you need any further assistance.

 

Best regards,

Prasanna Kumar

