from pyspark.sql.functions import col, collect_list, sort_array, struct
# Build one row per (vendorID, activity_month) holding the month's per-day
# metrics as parallel arrays, all ordered consistently by reverse_day_index.
#
# Strategy: pack each day's metrics into a struct whose FIRST field is the
# day index ("idx"), collect the structs per group, sort the array once
# (sort_array on an array of structs orders by the first struct field, so the
# single sort aligns every metric array), then unzip the struct fields into
# separate array columns.
#
# Mapping: (source column on processed_ordered, struct field, output column).
# Adding a new daily metric only requires one new tuple here.
_DAILY_METRICS = [
    ("activity_flag_val", "activity_flag", "activity_flags_array"),
    ("daily_trip_count_val", "trip_count", "daily_trip_counts_array"),
    ("daily_passenger_count_val", "passenger_count", "daily_passenger_counts_array"),
    ("daily_trip_distance_val", "trip_distance", "daily_trip_distances_array"),
    ("daily_fare_amount_val", "fare_amount", "daily_fare_amounts_array"),
    ("daily_extra_amount_val", "extra_amount", "daily_extra_amounts_array"),
    ("daily_mta_tax_val", "mta_tax", "daily_mta_taxes_array"),
    ("daily_tip_amount_val", "tip_amount", "daily_tip_amounts_array"),
    ("daily_tolls_amount_val", "tolls_amount", "daily_tolls_amounts_array"),
    ("daily_total_amount_val", "total_amount", "daily_total_amounts_array"),
]

# "idx" must stay first in the struct: it is the sort key for sort_array.
_day_struct = struct(
    col("reverse_day_index").alias("idx"),
    *[col(src).alias(field) for src, field, _ in _DAILY_METRICS],
)

fact_vendor_activity = (
    processed_ordered.groupBy("vendorID", "activity_month")
    .agg(sort_array(collect_list(_day_struct)).alias("collected_data"))
    # One select instead of ten chained withColumn calls: each withColumn
    # adds its own projection to the plan; a single select is the
    # Spark-recommended form and yields the same final schema
    # (collected_data is implicitly dropped by not selecting it).
    .select(
        "vendorID",
        "activity_month",
        *[col(f"collected_data.{field}").alias(out) for _, field, out in _DAILY_METRICS],
    )
)

fact_vendor_activity.orderBy("vendorID", "activity_month").show(5, truncate=False)