trimmedDF = fileStreamDF.select( fileStreamDF.borough, fileStreamDF.year, fileStreamDF.month, fileStreamDF.value)
.withColumnRenamed("value", "convictions")
# Aggregate Function
recordsPerBorough = fileStreamDF.groupBy("borough").count().orderBy("count", ascending=False)
convictionsPerBorough = fileStreamDF.groupBy("borough").agg({"value": "sum"})
.withColumnRenamed("sum(value)", "convictions").orderBy("convictions", ascending=False)
INFO
After performing an aggregate operation on data we cannot again perform an another aggregate operation in Streaming data (Multiple aggregation not supported)
Start Streaming Process
query = trimmedDF.writeStream.outputMode("append").format("console").option("truncate", "false").option("numRows", 40).start()
query = recordsPerBorough.writeStream.outputMode("complete").format("console").option("truncate", "false").option("numRows", 40).start()
query = convictionsPerBorough.writeStream.outputMode("complete").format("console").option("truncate", "false").option("numRows", 40).start()
Using SQL Queries
fileStreamDF.createOrReplaceTempView("LondonCrimeData")
categoryDF = spark.sql("SELECT major_category, value FROM LondonCrimeData WHERE year = '2016'")
convictionsPerCategory = categoryDF.groupBy("major_category").agg({"value": "sum"})
.withColumnRenamed("sum(value)", "convictions").orderBy("convictions", ascending=False)
query = convictionsPerCategory.writeStream.outputMode("complete")
.format("console").option("truncate", "false").option("numRows", 30).start()
INFO
We cannot perform operations on this table using spark.sql() after the stream processing has started