Enterprise approach to Spark and SCD Type 1

Ravi Pandey
3 min read · Oct 12, 2022

Hi Geeks!!!

If you are a data enthusiast or have ever heard about data warehousing, you must have encountered the term SCD. If not, let me give you a traditional definition:

A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. It is considered and implemented as one of the most critical ETL tasks in tracking the history of dimension records.

So now I assume that you know what SCD is. I have decided to write a series of blogs on the SCD types, where we explore each type in detail. We start here with the first one, SCD Type 1.

In an enterprise, data sources are heterogeneous and come from many places. A source might be structured or semi-structured, or it might be audio, video, or images. The data can arrive in various formats such as database tables, CSV, TSV, JSON, or XML, and as text files coming from forms, a CRM, a website, and so on.

We assume the data from these different sources lands in object storage, say S3. This produces many object files in S3, so the first thing we should do is clean and validate the data according to our business logic. We should also define a schema for each dataset so we can differentiate useful records from corrupt ones, and select only the keys we actually need. Doing this early saves a lot of resources downstream.
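As a rough illustration of this step, here is a minimal sketch; the bucket path, column names, and schema below are only assumptions for the example:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName('customer-data-validation').getOrCreate()

# explicit schema: rows that do not fit land in the _corrupt_record column
customer_schema = StructType([
    StructField('customerNumber', IntegerType(), True),
    StructField('customerName', StringType(), True),
    StructField('country', StringType(), True),
    StructField('creditLimit', DoubleType(), True),
    StructField('_corrupt_record', StringType(), True),
])

raw_df = (spark.read
    .option('header', 'true')
    .option('mode', 'PERMISSIVE')
    .option('columnNameOfCorruptRecord', '_corrupt_record')
    .schema(customer_schema)
    .csv('s3a://my-landing-bucket/raw/customers/'))

raw_df.cache()  # cache first so the corrupt-record column can be inspected reliably

# keep only the keys we actually need, and separate useful rows from corrupt ones
clean_df = (raw_df
    .filter(raw_df['_corrupt_record'].isNull())
    .select('customerNumber', 'customerName', 'country', 'creditLimit'))
corrupt_df = raw_df.filter(raw_df['_corrupt_record'].isNotNull())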

We will also need customized, more complex validation, which can be achieved with a UDF. Here is a dummy example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('udf-validation-example').getOrCreate()

# codes that are valid according to our business logic
codes = ['ab', 'cd', 'ef', 'gh']

def exist(code):
    # return the code if it is known, otherwise None
    return code if code in codes else None

# register the UDF so it can also be used in SQL queries
spark.udf.register("check_code_exists", exist, StringType())

data = [{'amount': 100, 'code': 'ab'}, {'amount': 500, 'code': 'vb'}]
df = spark.createDataFrame(data)

exist_udf = udf(exist, StringType())
df.select('code', exist_udf('code')).show()
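Because we also registered the function for SQL, a quick follow-up could separate the valid records from the invalid ones directly in a SQL query; the view name StagingTbl here is just an assumption:

df.createOrReplaceTempView("StagingTbl")

valid_df = spark.sql("""
    SELECT amount, code
    FROM StagingTbl
    WHERE check_code_exists(code) IS NOT NULL
""")
valid_df.show()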

We will automate the data cleaning and data validation using Spark and other orchestration tools, and save the newly validated and cleaned data into a new folder in S3.
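A minimal sketch of that save step, assuming the clean_df from the earlier sketch and a hypothetical cleaned/ prefix in the same bucket, might look like this:

# an orchestrator (Airflow, cron, etc.) would trigger this job on a schedule
(clean_df.write
    .mode('overwrite')
    .option('header', 'true')
    .csv('s3a://my-landing-bucket/cleaned/customers/'))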

At this point we still have many cleaned and validated data files from different sources, and it is not convenient to work with multiple datasets. Hence, we need to join the datasets together. Let me give you an example:

If there are two datasets, say Customers and Customer Order Details, they can be joined together via a common field such as customerNumber.
This phase involves intensive operations since the amount of data can be very large, so automation should be considered here as well, so that these steps run without any human intervention.

Okay, let me give you some dummy code to explain it.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('How to Apply aggregation using PySpark SQL 1').getOrCreate()

# read the cleaned customer and order files
customers_df = spark.read.csv('/path/of/customer/data/customers-data.csv', header=True, inferSchema=True)
orders_df = spark.read.csv('/path/of/order/data/orders.csv', header=True, inferSchema=True)

# expose both DataFrames as temporary views so we can join them with SQL
customers_df.createOrReplaceTempView("CustomersTbl")
orders_df.createOrReplaceTempView("OrdersTbl")

innerjoinquery = spark.sql("""
    SELECT *
    FROM CustomersTbl ct
    JOIN OrdersTbl ot
      ON ct.customerNumber = ot.customerNumber
""")

innerjoinquery.show(5)
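As a side note, the same inner join can also be expressed with the DataFrame API; joining on the column name has the nice property of not carrying a duplicated customerNumber column:

joined_df = customers_df.join(orders_df, on='customerNumber', how='inner')
joined_df.show(5)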

Initially our data sources were heterogeneous and distributed; after the two operations above we now have a single source dataset that is unified, much cleaner, and gives a broader picture.

Now that we have a single source dataset, we will apply various other transformations. Depending on the use case, the data produced by these transformations can be stored as intermediate results, and more than one target dataset is possible.
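For example, more than one target could be derived from the joined data. Here is a rough sketch assuming the joined_df from above, illustrative amount and country columns, and a hypothetical intermediate/ prefix for the intermediate results:

from pyspark.sql import functions as F

# two different targets derived from the same unified dataset
orders_per_customer = (joined_df
    .groupBy('customerNumber')
    .agg(F.count('*').alias('order_count'), F.sum('amount').alias('total_amount')))

orders_per_country = (joined_df
    .groupBy('country')
    .agg(F.sum('amount').alias('country_total')))

# persist intermediate results so downstream jobs can pick them up
orders_per_customer.write.mode('overwrite').parquet('s3a://pysparkcsvs3/pysparks3/intermediate/orders_per_customer/')
orders_per_country.write.mode('overwrite').parquet('s3a://pysparkcsvs3/pysparks3/intermediate/orders_per_country/')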

After reading the target file and applying the updates on the Spark DataFrame (for example with withColumn/replace operations or SQL-style update logic), we write back to the same location in overwrite mode. Let us suppose our target data source is stored at s3a://pysparkcsvs3/pysparks3/csv/sales.csv

innerjoinquery.write.format('csv').option('header', 'true').save('s3a://pysparkcsvs3/pysparks3/csv/sales.csv', mode='overwrite')
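To make the SCD Type 1 step itself concrete, here is a minimal sketch of the read-update-overwrite cycle described above. It is only an illustration: it assumes customerNumber is the business key, that the target and the incoming DataFrames share the same columns, and it reuses joined_df from earlier as the incoming data.

# read the current target dimension back from S3
target_df = spark.read.csv('s3a://pysparkcsvs3/pysparks3/csv/sales.csv', header=True, inferSchema=True)
incoming_df = joined_df  # latest records produced by the pipeline above

# SCD Type 1: take the incoming row whenever the business key already exists,
# keep the old row otherwise -- the previous values are overwritten, not kept
unchanged_df = target_df.join(incoming_df, on='customerNumber', how='left_anti')
updated_df = incoming_df.unionByName(unchanged_df)

# materialise before overwriting the same path, because Spark reads lazily
updated_df.cache().count()

(updated_df.write
    .format('csv')
    .option('header', 'true')
    .mode('overwrite')
    .save('s3a://pysparkcsvs3/pysparks3/csv/sales.csv'))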

Using this approach, existing records are simply overwritten with the latest values and no history is kept, which is the classic behaviour of SCD Type 1.

What if we need to record the historical details of updates and deletes? We will discuss those approaches in the upcoming blogs of this series.

Please feel free to comment and share your valuable thoughts.

Thank You

https://www.linkedin.com/in/ravi-pandey-0006807183
