Project Documentation: Understanding the Customer Order Lifecycle

Welcome to the documentation for this project, which focuses on tracking the complete lifecycle of customer orders, from placement to final delivery. Our objective is to provide comprehensive insights into this process using a data warehousing technique known as “Accumulating Snapshot Fact Modelling.” This document will explain the core concepts and practical applications of this methodology.

Project Overview

This project addresses the critical need for detailed analytics within an online retail environment. Beyond simply recording what was purchased, businesses require a clear understanding of the operational efficiency and duration of key order fulfillment stages. Specifically, this solution provides insights into:
  • The time elapsed between order placement, shipment, and customer delivery.
  • The financial aspects associated with each order.
  • Detailed customer and product information linked to each transaction.
By transforming raw operational data into a structured and analytical format, this project empowers stakeholders with actionable insights, much like converting disparate raw materials into a refined, valuable asset.

Understanding Accumulating Snapshot Fact Modelling

Accumulating Snapshot Fact Modelling is an effective approach for tracking processes that progress through a series of discrete, sequential steps over time. Consider the journey of a typical customer order:
  1. Order Placed: The initial event, marking the beginning of the order’s lifecycle.
  2. Order Confirmed: The system acknowledges the order, indicating the next stage.
  3. Order Prepared: The product is being readied for dispatch.
  4. Order Shipped: The product leaves the warehouse via a carrier.
  5. Order Delivered: The product reaches the customer, signifying completion.
An Accumulating Snapshot Fact Table serves as a central repository that captures and updates key milestones for each ongoing process (in this case, each customer order). Rather than creating new records for every incremental change, it maintains a single, evolving record for each process instance. This record includes:
  • Key Event Timestamps: Such as order_purchase_timestamp, order_delivered_carrier_date, and order_delivered_customer_date.
  • Calculated Durations: Metrics like days_to_ship (time from purchase to hand-off to the carrier) and days_to_deliver (time from carrier hand-off to delivery at the customer).
  • Associated Attributes: Relevant details such as total_price, payment_type, and foreign keys linking to customer and product dimensions.
The “Accumulating” aspect refers to the practice of updating a single fact record as each new event or milestone occurs within the process. This design allows for straightforward calculation of elapsed times between stages and provides a holistic view of the entire process journey on a single row, enhancing analytical efficiency.
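To make the “accumulating” behaviour concrete, the sketch below shows one way such an in-place update could be expressed with PySpark and Delta Lake. It assumes a hypothetical incremental feed, order_events_df, carrying the latest known milestone timestamps per order_id; it illustrates the pattern only and is not part of the project pipeline described later.
# Minimal sketch of the accumulating update pattern (illustrative only).
# Assumes a hypothetical DataFrame `order_events_df` with order_id plus the
# latest known milestone timestamps, and an existing fact_order_lifecycle table.
from delta.tables import DeltaTable

fact = DeltaTable.forName(spark, "fact_order_lifecycle")

(fact.alias("f")
     .merge(order_events_df.alias("e"), "f.order_id = e.order_id")
     .whenMatchedUpdate(set={
         # coalesce keeps milestones that were already captured on earlier runs
         "order_delivered_carrier_date":
             "coalesce(f.order_delivered_carrier_date, e.order_delivered_carrier_date)",
         "order_delivered_customer_date":
             "coalesce(f.order_delivered_customer_date, e.order_delivered_customer_date)",
         # elapsed-time measures are refreshed as new milestones arrive
         # (days_to_deliver would follow the same pattern)
         "days_to_ship":
             "datediff(coalesce(f.order_delivered_carrier_date, e.order_delivered_carrier_date), f.order_purchase_timestamp)"
     })
     .execute())
Each order therefore keeps a single row whose empty milestone columns fill in over time, which is what makes elapsed-time calculations a simple column-to-column difference.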

Business Scenarios for Accumulating Snapshot Fact Modelling

This modelling technique is highly versatile and applicable to any business process characterized by a defined sequence of steps. Here are several additional scenarios where it can provide significant value:
  1. Customer Onboarding Process:
    • Milestones: Application submission, document verification, account approval, first service activation, initial transaction.
    • Insights: Average onboarding duration, identification of bottlenecks in the customer journey, and conversion rates at each stage.
  2. Loan Application Lifecycle:
    • Milestones: Application receipt, credit assessment, approval, fund disbursement, first repayment.
    • Insights: Time-to-decision, time-to-fund disbursement, and analysis of application drop-off points.
  3. Manufacturing Production Workflow:
    • Milestones: Raw material intake, assembly start, quality control completion, packaging, final shipment.
    • Insights: Production cycle times, identification of delays in specific manufacturing phases, and overall process efficiency.
  4. Healthcare Patient Journey:
    • Milestones: Appointment scheduling, patient check-in, consultation, diagnosis, treatment initiation, discharge.
    • Insights: Patient wait times, duration of various treatment phases, and optimization of patient flow.
  5. Software Development Lifecycle (SDLC):
    • Milestones: Feature request, development commencement, testing phase, bug resolution, production deployment.
    • Insights: Lead time for feature delivery, identification of high-effort or delayed phases, and team productivity analysis.
In these contexts, the Accumulating Snapshot Fact table serves as a robust analytical framework for evaluating the duration, efficiency, and overall performance of multi-step business processes, facilitating data-driven decision-making.

Project Architecture: A Layered Approach

This project employs a structured data architecture, commonly referred to as a “Medallion Architecture” (Bronze, Silver, Gold layers), to systematically build and refine the analytical fact table. Below, we’ll walk through the code snippets that bring each layer to life. The data and code notebook live in the “Accumulating Snapshot Fact Modelling” folder of the Sarb1236/Data-Warehousing-Types-of-Facts-design repository.

1. Bronze Layer: Raw Data Ingestion

The Bronze layer is our initial landing zone for raw, untransformed data. It preserves the original state of the source data, directly loading CSV files into Delta tables. Code Snippet:
# --- Centralized Pipeline Configuration ---
# This configuration drives the entire ETL process, making it highly automated.
# Add new tables here to extend the pipeline.
pipeline_config = {
    "customers": {
        "bronze_path": "Files/customers.csv",
        "silver_rules": {
            "primary_keys": ["customer_id"],
            "cast_cols": {"customer_zip_code_prefix": "integer"},
            "trim_cols": ["customer_city", "customer_state","Customer_Name"],
            "timestamp_cols": [],
            "replace_cols": {}
        },
        "dim_keys": ["customer_id", "customer_city", "customer_state"]
    },
    "products": {
        "bronze_path": "Files/products.csv",
        "silver_rules": {
            "primary_keys": ["product_id"],
            "cast_cols": {
                "product_name_lenght": "integer",
                "product_description_lenght": "integer",
                "product_photos_qty": "integer",
                "product_weight_g": "integer",
                "product_length_cm": "integer",
                "product_height_cm": "integer",
                "product_width_cm": "integer"
            },
            "trim_cols": ["product_category_name"],
            "timestamp_cols": [],
            "replace_cols": {}
        },
        "dim_keys": ["product_id", "product_category_name"]
    },
    "orders": {
        "bronze_path": "Files/orders.csv",
        "silver_rules": {
            "primary_keys": ["order_id"],
            "cast_cols": {},
            "trim_cols": [],
            "timestamp_cols": [
                "order_purchase_timestamp",
                "order_approved_at",
                "order_delivered_carrier_date",
                "order_delivered_customer_date",
                "order_estimated_delivery_date"
            ],
            "replace_cols": {}
        },
        "dim_keys": [] # Not a dimension table
    },
    "order_items": {
        "bronze_path": "Files/order_items.csv",
        "silver_rules": {
            "primary_keys": ["order_id","product_id", "product_id"],
            "cast_cols": {
                "price": "double",
                "freight_value": "double"
            },
            "trim_cols": [],
            "timestamp_cols": ["shipping_limit_date"],
            "replace_cols": {}
        },
        "dim_keys": [] # Not a dimension table
    },
    "payments": {
        "bronze_path": "Files/payments.csv",
        "silver_rules": {
            "primary_keys": ["order_id", "payment_sequential"],
            "cast_cols": {
                "payment_sequential": "integer",
                "payment_installments": "integer",
                "payment_value": "double"
            },
            "trim_cols": [],
            "timestamp_cols": [],
            "replace_cols": {"payment_type": ("_", " ")}
        },
        "dim_keys": [] # Not a dimension table
    }
}

print("--- STEP 1: Loading Raw CSVs into Bronze Delta Tables ---")
for table_name, config in pipeline_config.items():
    path = config["bronze_path"]
    try:
        print(f"Attempting to read {table_name} from {path}...")
        df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)

        # Raising an error if the DataFrame is empty
        if df.count() == 0:
            raise ValueError(f"Error: The CSV file for {table_name} at {path} is empty. Cannot process an empty file.")

        df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"bronze_{table_name}")
        print(f"Successfully loaded {table_name} into bronze_{table_name} Delta table.")

    except Exception as e:
        print(f"Error processing {table_name} from {path}: {e}")
        print("Please check the file path, file existence, and permissions.")
Explanation: This Python code snippet orchestrates the initial data loading. It uses a pipeline_config dictionary to centralize all configurations for various tables (customers, products, orders, etc.). For each table, it specifies the path to the raw CSV file. The code then iteratively reads these CSVs into Spark DataFrames, infers their schema, and writes them as bronze_{table_name} Delta tables. This ensures that our raw data is stored in a robust, versioned format, ready for the next stage.
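Because the loader is driven entirely by pipeline_config, onboarding another source is a one-entry change. As an illustration, a hypothetical sellers.csv feed could be registered as follows; the file name, columns, and rules here are assumptions for the example, not part of the current pipeline:
# Hypothetical example: registering an additional raw feed.
# The file name, columns, and rules below are illustrative assumptions.
pipeline_config["sellers"] = {
    "bronze_path": "Files/sellers.csv",
    "silver_rules": {
        "primary_keys": ["seller_id"],
        "cast_cols": {"seller_zip_code_prefix": "integer"},
        "trim_cols": ["seller_city", "seller_state"],
        "timestamp_cols": [],
        "replace_cols": {}
    },
    "dim_keys": ["seller_id", "seller_city", "seller_state"]
}
# Re-running the Bronze and Silver loops would then produce bronze_sellers
# and silver_sellers (and, because dim_keys is set, a dim_sellers dimension)
# without any further code changes.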

2. Silver Layer: Data Cleansing and Standardization

In the Silver layer, data from the Bronze layer undergoes initial cleansing, deduplication, type casting, and standardization. This prepares the data for more advanced analytical modelling. Code Snippet:
print("\n--- STEP 2: Creating Silver Layer (Automated Cleaning and Deduplication) ---")
from pyspark.sql.functions import col, to_timestamp, trim, regexp_replace

def process_silver_table(bronze_table_name, silver_table_name, rules):
    """
    Reads data from a bronze table, applies cleaning rules, and writes to a silver table.
    """
    try:
        print(f"Processing {bronze_table_name} for {silver_table_name}")
        df = spark.table(bronze_table_name)

        # Deduplication based on primary keys defined in config
        if rules.get("primary_keys"):
            df = df.dropDuplicates(rules["primary_keys"])

        # Type Casting for specified columns
        for col_name, data_type in rules.get("cast_cols", {}).items():
            if col_name in df.columns:
                df = df.withColumn(col_name, col(col_name).cast(data_type))

        # Trimming whitespace from string columns
        for col_name in rules.get("trim_cols", []):
            if col_name in df.columns:
                df = df.withColumn(col_name, trim(col(col_name)))

        # Timestamp Conversion for date/time columns
        for col_name in rules.get("timestamp_cols", []):
            if col_name in df.columns:
                df = df.withColumn(col_name, to_timestamp(col(col_name)))

        # Regex Replacement for specific patterns (e.g., replacing underscores with spaces)
        for col_name, (pattern, replacement) in rules.get("replace_cols", {}).items():
            if col_name in df.columns:
                df = df.withColumn(col_name, regexp_replace(col(col_name), pattern, replacement))

        df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(silver_table_name)
        print(f"Successfully created {silver_table_name}.")
    except Exception as e:
        print(f"Error processing {bronze_table_name} -> {silver_table_name}: {e}")

# Iterate through the configuration and process each table for the Silver layer
for table_name, config in pipeline_config.items():
    bronze_name = f"bronze_{table_name}"
    silver_name = f"silver_{table_name}"
    process_silver_table(bronze_name, silver_name, config["silver_rules"])
Explanation: This Python function process_silver_table takes a Bronze table, applies a set of cleaning rules defined in the pipeline_config, and writes the cleaned data to a silver_{table_name} Delta table. The rules include deduplication based on primary keys, type casting for numerical and timestamp columns, trimming whitespace from string columns, and performing regular expression replacements. This systematic approach ensures data quality and consistency before it’s used for analytical purposes.
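A quick way to sanity-check the Silver layer is to compare a cleaned column against its Bronze counterpart. For example, the replace_cols rule for payments (underscores to spaces) and the deduplication step can be spot-checked as follows; this is an ad-hoc verification, not part of the pipeline itself:
# Spot check: the replace_cols rule should have turned values such as
# "credit_card" in bronze_payments into "credit card" in silver_payments.
spark.table("bronze_payments").select("payment_type").distinct().show()
spark.table("silver_payments").select("payment_type").distinct().show()

# Deduplication check: the silver row count should never exceed the bronze count.
print(spark.table("bronze_payments").count(), spark.table("silver_payments").count())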

3. Dimension Tables (SCD2): Tracking Historical Changes

We construct dimension tables for key entities such as customers and products. The “SCD2” (Slowly Changing Dimension Type 2) implementation ensures that a complete historical record of changes to these attributes is maintained, providing accurate historical context for analysis. We also create a dim_date table for time-based analysis. Code Snippet (dim_date):
# ---------------------------------------------
# STEP 3: Generate dim_date Table
# ---------------------------------------------
from pyspark.sql.functions import col, year, month, dayofmonth, dayofweek, weekofyear, quarter, dayofyear, date_format

# Generate a sequence of dates from 2016 to 2025
date_range = spark.sql("SELECT sequence(to_date('2016-01-01'), to_date('2025-12-31'), interval 1 day) as dates")

# Explode the dates and extract various date components
dim_date = date_range.selectExpr("explode(dates) as full_date") \
                     .withColumn("year", year("full_date")) \
                     .withColumn("month", month("full_date")) \
                     .withColumn("day_of_month", dayofmonth("full_date")) \
                     .withColumn("day_of_week", dayofweek("full_date")) \
                     .withColumn("week_of_year", weekofyear("full_date")) \
                     .withColumn("quarter", quarter("full_date")) \
                     .withColumn("day_of_year", dayofyear("full_date")) \
                     .withColumn("month_name", date_format(col("full_date"), "MMMM")) \
                     .withColumn("day_name", date_format(col("full_date"), "EEEE"))

dim_date.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("dim_date")
print("Successfully created dim_date.")
Explanation (dim_date): This Python code generates a dim_date table, which is crucial for time-based analysis. It creates a sequence of dates for a defined range (2016-2025) and then extracts various time attributes like year, month, day of month, day of week, week of year, quarter, day of year, month name, and day name. This comprehensive date dimension allows for flexible aggregation and filtering of data by different time granularities. Code Snippet (SCD2 Merge):
print("\n--- STEP 4: Applying SCD2 Merge Logic for Dimensions (from Silver) ---")
from pyspark.sql.functions import current_timestamp, lit, md5, concat_ws, col

def scd2_merge(table_name, business_keys):
    # Read from the silver table
    df = spark.table(f"silver_{table_name}")

    # The business-key hash (sk) identifies the entity; the attribute hash
    # (row_hash) detects whether anything about that entity has changed.
    attribute_cols = [c for c in df.columns if c not in business_keys]
    df = df.withColumn("sk", md5(concat_ws("||", *business_keys))) \
           .withColumn("row_hash", md5(concat_ws("||", *[col(c).cast("string") for c in attribute_cols]))) \
           .withColumn("effective_start_date", current_timestamp()) \
           .withColumn("effective_end_date", lit(None).cast("timestamp")) \
           .withColumn("is_current", lit(True))

    target_table = f"dim_{table_name}"

    # Define columns to exclude from the dynamic schema generation (these are added explicitly)
    excluded_cols = ["sk", "row_hash", "effective_start_date", "effective_end_date", "is_current"]

    # Filter out excluded columns and format for SQL DDL
    base_columns_for_ddl = [f"`{c}` {df.schema[c].dataType.simpleString().upper()}" for c in df.columns if c not in excluded_cols]

    # Create the dimension table if it doesn't exist
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {target_table} (
        sk STRING,
        {', '.join(base_columns_for_ddl)},
        row_hash STRING,
        effective_start_date TIMESTAMP,
        effective_end_date TIMESTAMP,
        is_current BOOLEAN
    ) USING DELTA
    """)

    df.createOrReplaceTempView("staging")

    # SCD Type 2, step 1: expire the current version of any entity whose attributes changed
    spark.sql(f"""
    MERGE INTO {target_table} tgt
    USING staging src
    ON tgt.sk = src.sk AND tgt.is_current = TRUE
    WHEN MATCHED AND tgt.row_hash <> src.row_hash THEN
      UPDATE SET tgt.is_current = FALSE, tgt.effective_end_date = current_timestamp()
    """)

    # SCD Type 2, step 2: insert brand-new entities and the new version of changed
    # entities (neither has a current row in the target after step 1)
    spark.sql(f"""
    MERGE INTO {target_table} tgt
    USING staging src
    ON tgt.sk = src.sk AND tgt.is_current = TRUE
    WHEN NOT MATCHED THEN
      INSERT *
    """)
    print(f"Successfully applied SCD2 merge to {target_table}.")

# Iterate through the configuration and apply SCD2 to dimension tables
for table_name, config in pipeline_config.items():
    if config.get("dim_keys"): # Check if dim_keys are defined for this table
        scd2_merge(table_name, config["dim_keys"])
    else:
        print(f"Skipping SCD2 merge for {table_name} as it's not configured as a dimension table.")
Explanation (SCD2 Merge): This Python code implements the SCD Type 2 logic for dimension tables like dim_customers and dim_products. The scd2_merge function takes a silver table and its business keys, then adds sk (a hash of the business keys), row_hash (a hash of the remaining attributes, used for change detection), effective_start_date, effective_end_date, and is_current columns. A first MERGE INTO statement expires the current version of any entity whose attributes have changed (setting is_current to FALSE and populating effective_end_date), and a second MERGE inserts brand-new entities along with the new version of changed ones. This ensures that historical changes to customer or product attributes are preserved, allowing for accurate historical reporting.
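Once a dimension has accumulated a few loads, its history can be inspected directly. For example, all versions of a single customer can be listed as follows (an illustrative ad-hoc query; the customer_id value is a placeholder):
# List every version of one customer: expired versions carry
# is_current = False and a populated effective_end_date.
(spark.table("dim_customers")
      .filter("customer_id = 'some_customer_id'")   # placeholder business key
      .select("customer_id", "customer_city", "customer_state",
              "effective_start_date", "effective_end_date", "is_current")
      .orderBy("effective_start_date")
      .show(truncate=False))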

4. Accumulating Snapshot Fact Table (Gold Layer): The Core of Our Insights

This is the central analytical table, fact_order_lifecycle. It integrates cleaned data from the Silver layer with surrogate keys from the dimension tables. This table is designed to track the critical timestamps and derived durations of the order lifecycle. Code Snippet:
print("\n--- STEP 5: Building Accumulating Snapshot Fact with Surrogate Keys ---")
from pyspark.sql import functions as F

# Read from silver tables
orders = spark.table("silver_orders")
items = spark.table("silver_order_items")
payments = spark.table("silver_payments")

# Read from gold dimensions (which are now populated from silver)
dim_customers = spark.table("dim_customers").filter("is_current = TRUE")
dim_products = spark.table("dim_products").filter("is_current = TRUE")

# Join and derive fact metrics
# Use common column names directly after joins.
# For columns with potential ambiguity or from specific tables, use aliases like 'items.price'
fact_df = (
    orders.alias("orders")
    .join(items.alias("items"), "order_id", "left")
    .join(payments.alias("payments"), "order_id", "left")
    .join(dim_customers.select("customer_id", "sk").withColumnRenamed("sk", "customer_sk"),
          "customer_id", "left")
    .join(dim_products.select("product_id", "sk").withColumnRenamed("sk", "product_sk"),
          "product_id", "left")
    .selectExpr(
        "order_id",  # unambiguous: it is the join key shared by all inputs
        "customer_sk",
        "product_sk",
        "order_purchase_timestamp",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "payment_type",
        "items.price + items.freight_value AS total_price",  # table alias specified for clarity
        "payments.payment_value",  # table alias specified for clarity
        "datediff(order_delivered_carrier_date, order_purchase_timestamp) AS days_to_ship",
        "datediff(order_delivered_customer_date, order_delivered_carrier_date) AS days_to_deliver"
    )
)

fact_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("fact_order_lifecycle")
print("Successfully created fact_order_lifecycle.")
Explanation: This Python code constructs the fact_order_lifecycle table, which is the heart of our analytical solution. It joins the silver_orders, silver_order_items, and silver_payments tables. Crucially, it also joins with the dim_customers and dim_products tables (filtered to is_current = TRUE to get the latest dimension attributes) to bring in the surrogate keys (customer_sk, product_sk). Finally, it selects relevant columns and calculates key metrics like total_price (combining item price and freight value) and the durations (days_to_ship, days_to_deliver) using date difference functions. This table provides a single, comprehensive view of each order’s journey and its associated details.
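With the fact table in place, lifecycle questions reduce to simple aggregations. For example, average shipping and delivery durations per purchase month could be computed like this (an illustrative query, not part of the pipeline):
from pyspark.sql import functions as F

# Average fulfilment durations by purchase month, computed directly
# from the accumulating snapshot fact table.
(spark.table("fact_order_lifecycle")
      .withColumn("purchase_month", F.date_format("order_purchase_timestamp", "yyyy-MM"))
      .groupBy("purchase_month")
      .agg(F.avg("days_to_ship").alias("avg_days_to_ship"),
           F.avg("days_to_deliver").alias("avg_days_to_deliver"))
      .orderBy("purchase_month")
      .show())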

5. Gold View for Reporting: Simplified Analytics

A final, user-friendly view named gold_order_summary is created. This view simplifies data access for business intelligence tools and reporting, abstracting away the underlying complexity of joins and calculations. Code Snippet:
-- ---------------------------------------------
-- STEP 6: Gold View for Reporting (Power BI or DuckDB)
-- ---------------------------------------------
CREATE OR REPLACE VIEW gold_order_summary AS
SELECT
    f.order_id,
    d.Customer_Name,
    d.customer_city,
    p.product_category_name,
    f.order_purchase_timestamp AS order_date,
    f.total_price,
    f.days_to_ship,
    f.days_to_deliver
FROM fact_order_lifecycle f
LEFT JOIN dim_customers d ON f.customer_sk = d.sk AND d.is_current = TRUE
LEFT JOIN dim_products p ON f.product_sk = p.sk AND p.is_current = TRUE
WHERE f.days_to_deliver IS NOT NULL
Explanation: This SQL code creates a VIEW called gold_order_summary. This view simplifies data consumption for reporting tools by pre-joining the fact_order_lifecycle table with the dim_customers and dim_products tables (again, only considering the current dimension records). It selects key business-friendly columns like Customer_Name, product_category_name, order_date, total_price, and the calculated days_to_ship and days_to_deliver. By providing a view, business users can easily query this consolidated data without needing to understand the underlying table structures or join logic. This layered architectural approach ensures data quality, traceability, and ease of consumption for diverse analytical requirements.
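As an example of how a reporting tool or downstream query engine might consume the view, the following query (run here via spark.sql, purely illustrative) ranks product categories by average delivery duration:
# Illustrative reporting query on top of the gold view: average delivery
# duration and order volume per product category.
spark.sql("""
    SELECT
        product_category_name,
        ROUND(AVG(days_to_deliver), 1) AS avg_days_to_deliver,
        COUNT(DISTINCT order_id)       AS orders
    FROM gold_order_summary
    GROUP BY product_category_name
    ORDER BY avg_days_to_deliver DESC
""").show()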