# --- Centralized Pipeline Configuration ---
# This configuration drives the entire ETL process: each step below reads its rules from this dictionary.
# Add new tables here to extend the pipeline without touching the processing code.
pipeline_config = {
    "customers": {
        "bronze_path": "Files/customers.csv",
        "silver_rules": {
            "primary_keys": ["customer_id"],
            "cast_cols": {"customer_zip_code_prefix": "integer"},
"trim_cols": ["customer_city", "customer_state","Customer_Name"],
"timestamp_cols": [],
"replace_cols": {}
},
"dim_keys": ["customer_id", "customer_city", "customer_state"]
},
"products": {
"bronze_path": "Files/products.csv",
"silver_rules": {
"primary_keys": ["product_id"],
"cast_cols": {
"product_name_lenght": "integer",
"product_description_lenght": "integer",
"product_photos_qty": "integer",
"product_weight_g": "integer",
"product_length_cm": "integer",
"product_height_cm": "integer",
"product_width_cm": "integer"
},
"trim_cols": ["product_category_name"],
"timestamp_cols": [],
"replace_cols": {}
},
"dim_keys": ["product_id", "product_category_name"]
},
"orders": {
"bronze_path": "Files/orders.csv",
"silver_rules": {
"primary_keys": ["order_id"],
"cast_cols": {},
"trim_cols": [],
"timestamp_cols": [
"order_purchase_timestamp",
"order_approved_at",
"order_delivered_carrier_date",
"order_delivered_customer_date",
"order_estimated_delivery_date"
],
"replace_cols": {}
},
"dim_keys": [] # Not a dimension table
},
"order_items": {
"bronze_path": "Files/order_items.csv",
"silver_rules": {
"primary_keys": ["order_id","product_id", "product_id"],
"cast_cols": {
"price": "double",
"freight_value": "double"
},
"trim_cols": [],
"timestamp_cols": ["shipping_limit_date"],
"replace_cols": {}
},
"dim_keys": [] # Not a dimension table
},
"payments": {
"bronze_path": "Files/payments.csv",
"silver_rules": {
"primary_keys": ["order_id", "payment_sequential"],
"cast_cols": {
"payment_sequential": "integer",
"payment_installments": "integer",
"payment_value": "double"
},
"trim_cols": [],
"timestamp_cols": [],
"replace_cols": {"payment_type": ("_", " ")}
},
"dim_keys": [] # Not a dimension table
}
}
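# How the "silver_rules" above might be consumed downstream: a minimal sketch,
# assuming a PySpark environment. apply_silver_rules is a hypothetical helper used
# only for illustration here; it is not a function defined elsewhere in this pipeline.
from pyspark.sql import DataFrame, functions as F

def apply_silver_rules(df: DataFrame, rules: dict) -> DataFrame:
    # Cast columns to their declared types
    for col_name, dtype in rules["cast_cols"].items():
        df = df.withColumn(col_name, F.col(col_name).cast(dtype))
    # Trim surrounding whitespace from string columns
    for col_name in rules["trim_cols"]:
        df = df.withColumn(col_name, F.trim(F.col(col_name)))
    # Parse string columns into proper timestamps
    for col_name in rules["timestamp_cols"]:
        df = df.withColumn(col_name, F.to_timestamp(F.col(col_name)))
    # Apply simple value replacements, e.g. ("_", " ") for payment_type
    for col_name, (old, new) in rules["replace_cols"].items():
        df = df.withColumn(col_name, F.regexp_replace(F.col(col_name), old, new))
    # Deduplicate on the declared primary keys
    return df.dropDuplicates(rules["primary_keys"])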
print("--- STEP 1: Loading Raw CSVs into Bronze Delta Tables ---")
for table_name, config in pipeline_config.items():
path = config["bronze_path"]
try:
print(f"Attempting to read {table_name} from {path}...")
df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
# Raising an error if the DataFrame is empty
if df.count() == 0:
raise ValueError(f"Error: The CSV file for {table_name} at {path} is empty. Cannot process an empty file.")
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"bronze_{table_name}")
print(f"Successfully loaded {table_name} into bronze_{table_name} Delta table.")
except Exception as e:
print(f"Error processing {table_name} from {path}: {e}")
print("Please check the file path, file existence, and permissions.")