Compact Monthly Vendor Activity Table Documentation
This project explores efficient data modeling and compression techniques for NYC Yellow Taxi trip data using PySpark on Microsoft Fabric. The goal is to reduce storage footprint and improve query performance while maintaining the ability to conduct granular day-level analysis.
Code Repo:
Overview: What the Code Does
The notebook ingests the raw yellow_tripdata table, aggregates it to the daily level, and then compresses it into a monthly fact table with one row per vendor per month. Within each row, daily values (e.g., trip counts, fares, tips) are stored in arrays ordered by a reverse-day index counted back from the last day of the month (in a 31-day month, the 31st is index 0, the 30th is index 1, and so on).
Full Code Workflow (Step-by-Step)
Step 1: Load Raw Data
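# Import only what is needed; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col
df = spark.read.table("yellow_tripdata")
df.show(50, truncate=False)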
Description: Reads the full NYC Taxi trip data table from Microsoft Fabric Lakehouse. Displays the first 50 rows for inspection.
Step 2: Create Date Columns
from pyspark.sql.functions import dayofmonth, last_day, to_date, trunc
df = df.withColumn("trip_date", to_date("tpepPickupDateTime")) \
    .withColumn("activity_month", trunc("tpepPickupDateTime", "MM")) \
    .withColumn("reverse_day_index",
                dayofmonth(last_day("tpepPickupDateTime")) - dayofmonth("tpepPickupDateTime"))
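Description: Adds helper columns to enable time-based aggregation. trip_date is the pickup date, activity_month is the first day of the pickup month, and reverse_day_index counts days back from the end of the month (0 = last day). The index later becomes each day's position in the monthly arrays.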
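The reverse-day index can be checked outside Spark. The following is a minimal plain-Python sketch of the same arithmetic as the reverse_day_index column (days in month minus day of month); the helper name is illustrative and not part of the notebook.

```python
import calendar
from datetime import date

def reverse_day_index(d: date) -> int:
    """Days between d and the last day of its month (last day -> 0)."""
    days_in_month = calendar.monthrange(d.year, d.month)[1]
    return days_in_month - d.day

print(reverse_day_index(date(2019, 9, 30)))  # 0  (last day of a 30-day month)
print(reverse_day_index(date(2019, 9, 1)))   # 29 (first day of a 30-day month)
print(reverse_day_index(date(2020, 2, 1)))   # 28 (first day of a leap-year February)
```

Note that the largest index in a month depends on its length: 30 for a 31-day month, but only 27 for a 28-day February.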
Step 3: Aggregate Daily Activity
from pyspark.sql.functions import sum as F_sum, count
daily_summary = df.groupBy("vendorID", "activity_month", "reverse_day_index").agg(
    count("*").alias("trip_count"),
    F_sum("passengerCount").alias("total_passengerCount"),
    F_sum("tripDistance").alias("total_tripDistance"),
    F_sum("fareAmount").alias("total_fareAmount"),
    F_sum("extra").alias("total_extra"),
    F_sum("mtaTax").alias("total_mtaTax"),
    F_sum("tipAmount").alias("total_tipAmount"),
    F_sum("tollsAmount").alias("total_tollsAmount"),
    F_sum("totalAmount").alias("total_totalAmount")
)
daily_summary.orderBy("vendorID", "activity_month", "reverse_day_index").show(50, truncate=False)
Description: Aggregates daily taxi activity per vendor by month and reverse day index.
Sample Output:
+--------+--------------+-----------------+----------+----------------+
|vendorID|activity_month|reverse_day_index|trip_count|total_fareAmount|
+--------+--------------+-----------------+----------+----------------+
|2       |2009-01-01    |0                |150       |3450.75         |
|2       |2009-01-01    |1                |167       |3872.60         |
...
Step 4: Ensure All Days Are Present
max_days = 31
base_days = spark.createDataFrame([(i,) for i in range(max_days)], ["reverse_day_index"])
vendor_months = daily_summary.select("vendorID", "activity_month").distinct()
full = vendor_months.crossJoin(base_days)
full_with_summary = full.join(daily_summary, ["vendorID", "activity_month", "reverse_day_index"], how="left")
full_with_summary.orderBy("vendorID", "activity_month", "reverse_day_index").show(50, truncate=False)
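Description: Builds a fixed grid of 31 reverse-day slots for each vendor and month, so days with no activity still get a row. Because the grid is always 31 slots wide, months with fewer than 31 days carry trailing slots (for example, indexes 28 to 30 in a 28-day February) that do not correspond to any calendar date.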
Sample Output: Includes NULLs for days with no trips.
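To make the padding behaviour concrete, here is a small plain-Python sketch of the same idea as the cross join plus left join. It assumes a sparse mapping of reverse_day_index to trip_count for one vendor and month; the function names are hypothetical, and None stands in for the NULLs Spark would produce.

```python
import calendar

MAX_DAYS = 31  # same fixed grid width as max_days in Step 4

def pad_month(sparse, max_days=MAX_DAYS):
    """Left-join a {reverse_day_index: trip_count} dict against the full grid."""
    return [sparse.get(i) for i in range(max_days)]

def padding_slots(year, month, max_days=MAX_DAYS):
    """Grid indexes that do not correspond to any real day in the month."""
    days_in_month = calendar.monthrange(year, month)[1]
    return list(range(days_in_month, max_days))

feb = pad_month({0: 150, 1: 167})
print(len(feb))                # 31
print(feb[:3])                 # [150, 167, None]
print(padding_slots(2019, 2))  # [28, 29, 30]
```

A fixed width keeps every array the same length, at the cost of a few unused slots in shorter months that need to be ignored when converting indexes back to dates.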
Step 5: Prepare Data for Compression
from pyspark.sql.functions import coalesce, lit, when
processed_ordered = full_with_summary \
    .withColumn("activity_flag_val", when(col("trip_count") > 0, 1).otherwise(0)) \
    .withColumn("daily_trip_count_val", coalesce(col("trip_count"), lit(0))) \
    .withColumn("daily_passenger_count_val", coalesce(col("total_passengerCount"), lit(0))) \
    .withColumn("daily_trip_distance_val", coalesce(col("total_tripDistance"), lit(0.0))) \
    .withColumn("daily_fare_amount_val", coalesce(col("total_fareAmount"), lit(0.0))) \
    .withColumn("daily_extra_amount_val", coalesce(col("total_extra"), lit(0.0))) \
    .withColumn("daily_mta_tax_val", coalesce(col("total_mtaTax"), lit(0.0))) \
    .withColumn("daily_tip_amount_val", coalesce(col("total_tipAmount"), lit(0.0))) \
    .withColumn("daily_tolls_amount_val", coalesce(col("total_tollsAmount"), lit(0.0))) \
    .withColumn("daily_total_amount_val", coalesce(col("total_totalAmount"), lit(0.0)))
Description: Replaces the NULLs introduced by the left join with zeros and adds a 0/1 activity flag, so every metric array has the same length and a uniform element type.
Step 6: Build Compact Fact Table with Arrays
from pyspark.sql.functions import collect_list, sort_array, struct
fact_vendor_activity = processed_ordered.groupBy("vendorID", "activity_month").agg(
    sort_array(
        collect_list(
            struct(
                col("reverse_day_index").alias("idx"),
                col("activity_flag_val").alias("activity_flag"),
                col("daily_trip_count_val").alias("trip_count"),
                col("daily_passenger_count_val").alias("passenger_count"),
                col("daily_trip_distance_val").alias("trip_distance"),
                col("daily_fare_amount_val").alias("fare_amount"),
                col("daily_extra_amount_val").alias("extra_amount"),
                col("daily_mta_tax_val").alias("mta_tax"),
                col("daily_tip_amount_val").alias("tip_amount"),
                col("daily_tolls_amount_val").alias("tolls_amount"),
                col("daily_total_amount_val").alias("total_amount")
            )
        )
    ).alias("collected_data")
).withColumn("activity_flags_array", col("collected_data.activity_flag")) \
    .withColumn("daily_trip_counts_array", col("collected_data.trip_count")) \
    .withColumn("daily_passenger_counts_array", col("collected_data.passenger_count")) \
    .withColumn("daily_trip_distances_array", col("collected_data.trip_distance")) \
    .withColumn("daily_fare_amounts_array", col("collected_data.fare_amount")) \
    .withColumn("daily_extra_amounts_array", col("collected_data.extra_amount")) \
    .withColumn("daily_mta_taxes_array", col("collected_data.mta_tax")) \
    .withColumn("daily_tip_amounts_array", col("collected_data.tip_amount")) \
    .withColumn("daily_tolls_amounts_array", col("collected_data.tolls_amount")) \
    .withColumn("daily_total_amounts_array", col("collected_data.total_amount")) \
    .drop("collected_data")
fact_vendor_activity.orderBy("vendorID", "activity_month").show(5, truncate=False)
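Description: Compresses daily metrics into reverse-ordered arrays. collect_list does not guarantee element order, so each day's metrics are packed into a struct whose first field is the reverse-day index; sort_array then orders the structs by that field, and each metric is projected out as its own array. As a result, position i in every array holds the value for reverse_day_index i.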
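The sort-then-project pattern can be illustrated without Spark. This plain-Python sketch uses a hypothetical DayRecord tuple in place of the struct; the first two records reuse the values from the sample output in Step 3, and the zero-activity record is made up for illustration.

```python
from typing import NamedTuple

class DayRecord(NamedTuple):
    idx: int            # reverse_day_index, deliberately the first field
    trip_count: int
    fare_amount: float

# Records arrive in arbitrary order, as they may from collect_list
collected = [
    DayRecord(2, 0, 0.0),
    DayRecord(0, 150, 3450.75),
    DayRecord(1, 167, 3872.60),
]

# Tuples compare field by field, so the leading idx decides the order
ordered = sorted(collected)

# Project each metric into its own array; positions now equal idx
trip_counts = [r.trip_count for r in ordered]
fare_amounts = [r.fare_amount for r in ordered]

print(trip_counts)   # [150, 167, 0]
print(fare_amounts)  # [3450.75, 3872.6, 0.0]
```

Sorting one combined record and projecting afterwards keeps all metric arrays aligned with each other, which sorting each metric separately would not.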
Step 7: Save Table
fact_vendor_activity.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("fact_vendor_activity")
Description: Saves the final compact fact table to the Lakehouse.
Step 8: Explode and Validate
-- Explode back to daily granularity
WITH exploded AS (
    SELECT
        vendorID,
        activity_month,
        posexplode(daily_trip_counts_array) AS (reverse_day_index, trip_count),
        daily_passenger_counts_array,
        daily_trip_distances_array,
        daily_fare_amounts_array,
        daily_extra_amounts_array,
        daily_mta_taxes_array,
        daily_tip_amounts_array,
        daily_tolls_amounts_array,
        daily_total_amounts_array
    FROM fact_vendor_activity
),
final_daily AS (
    SELECT
        vendorID,
        activity_month,
        DATE_SUB(LAST_DAY(activity_month), reverse_day_index) AS trip_date,
        trip_count,
        daily_passenger_counts_array[reverse_day_index] AS passenger_count,
        daily_trip_distances_array[reverse_day_index] AS trip_distance,
        daily_fare_amounts_array[reverse_day_index] AS fare_amount,
        daily_extra_amounts_array[reverse_day_index] AS extra,
        daily_mta_taxes_array[reverse_day_index] AS mta_tax,
        daily_tip_amounts_array[reverse_day_index] AS tip_amount,
        daily_tolls_amounts_array[reverse_day_index] AS tolls_amount,
        daily_total_amounts_array[reverse_day_index] AS total_amount
    FROM exploded
    -- Skip padding slots: every array holds 31 entries, so in shorter months the
    -- trailing indexes would otherwise map to dates in the previous month
    WHERE reverse_day_index < DAY(LAST_DAY(activity_month))
)
SELECT * FROM final_daily WHERE vendorID = 2 AND activity_month = '2019-09-01' ORDER BY trip_date;
Description: Converts the monthly arrays back into row-level records for verification, visual inspection, or further analysis.
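The index-to-date conversion used for validation can be sketched in plain Python as well. This example assumes the 31-slot arrays from Step 4 and skips any slot whose index is not a real day of the month; explode_month is a hypothetical helper, not part of the notebook.

```python
import calendar
from datetime import date, timedelta

def explode_month(activity_month, trip_counts):
    """Turn one monthly array back into (trip_date, trip_count) rows."""
    days_in_month = calendar.monthrange(activity_month.year, activity_month.month)[1]
    last_day = activity_month.replace(day=days_in_month)
    rows = []
    for idx, count in enumerate(trip_counts):
        if idx >= days_in_month:
            continue  # padding slot: no calendar date in this month
        rows.append((last_day - timedelta(days=idx), count))
    return sorted(rows)  # chronological order

rows = explode_month(date(2019, 9, 1), [0] * 31)
print(len(rows))                 # 30
print(rows[0][0], rows[-1][0])   # 2019-09-01 2019-09-30
```

Without the padding check, index 30 in September 2019 would resolve to 2019-08-31, a date that belongs to the previous month.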
Comparison with Other Tables
Print Sizes & Counts
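def print_table_size_and_count(table_name):
    # DESCRIBE DETAIL reports the Delta table's size on disk in bytes
    size_bytes = spark.sql(f"DESCRIBE DETAIL {table_name}").select("sizeInBytes").collect()[0][0]
    rows = spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
    print(f"{table_name}: rows={rows}, size={size_bytes/(1024**2):.2f} MB")

# Assumes a daily-grain table named fact_vendor_daily_activity already exists;
# the steps above do not create it
for tbl in ["yellow_tripdata", "fact_vendor_daily_activity", "fact_vendor_activity"]:
    print_table_size_and_count(tbl)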
Benefits of This Approach
- Storage Efficiency: Stores one row per vendor per month instead of one per vendor per day, roughly a 30× row reduction (28× to 31×, depending on month length), with the daily metrics held in arrays.
- Query Performance: A smaller fact table means fewer rows to scan and join.
- Flexibility: Arrays can be exploded back to day-level for anomaly detection or trend analysis.
- Reduced Shuffling: Less data moves when joining with dimension tables (e.g., dim_vendor).
Real-World Use Cases
1. NYC Taxi Vendor Activity
- Traditional Daily Model: 2 vendors × 30 days = 60 rows per month
- Compact Model: 2 rows per month
- Gain: 30× row reduction, less storage, faster dashboards.
2. IoT Device Telemetry (Hypothetical)
- Daily Records: 10 M devices × 30 days = 300 M rows/month
- Compact Model: 10 M rows/month with arrays of daily metrics
- Gain: 30× reduction, enabling scalable time-series analytics.
3. Web Analytics Summary
- Summarize daily user sessions or event counts per site/per month into arrays for quick monthly overviews and drill‑downs.
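The row counts quoted in use cases 1 and 2 follow from simple arithmetic, sketched below in plain Python. The function name is illustrative, and the figures only describe row counts; actual storage savings depend on the data and should be measured with the size comparison above.

```python
def row_counts(entities, days_in_month):
    """Rows per month in the daily model vs. the compact model."""
    daily_rows = entities * days_in_month  # one row per entity per day
    compact_rows = entities                # one row per entity per month
    return daily_rows, compact_rows, daily_rows // compact_rows

print(row_counts(2, 30))           # (60, 2, 30)
print(row_counts(10_000_000, 30))  # (300000000, 10000000, 30)
```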