trimmedDF = fileStreamDF.select(fileStreamDF.borough, fileStreamDF.year, fileStreamDF.month, fileStreamDF.value) \
    .withColumnRenamed("value", "convictions")
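Here fileStreamDF is assumed to be a streaming DataFrame read from CSV files. A minimal sketch of how it might be set up (the schema fields and the input directory below are assumptions, not taken from the original):
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("LondonCrimeStreaming").getOrCreate()

# Streaming file sources require an explicit schema; these columns are assumed from the London crime dataset
schema = StructType([
    StructField("lsoa_code", StringType(), True),
    StructField("borough", StringType(), True),
    StructField("major_category", StringType(), True),
    StructField("minor_category", StringType(), True),
    StructField("value", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True)
])

# Watch a (hypothetical) input directory and treat each new CSV file as streaming input
fileStreamDF = spark.readStream \
    .option("header", "true") \
    .schema(schema) \
    .csv("datasets/london_crime/")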
# Aggregate Function
recordsPerBorough = fileStreamDF.groupBy("borough").count().orderBy("count", ascending=False)
convictionsPerBorough = fileStreamDF.groupBy("borough").agg({"value": "sum"}) \
    .withColumnRenamed("sum(value)", "convictions") \
    .orderBy("convictions", ascending=False)
Once an aggregate operation has been applied to a streaming DataFrame, we cannot apply another aggregation on top of the result (multiple streaming aggregations are not supported).
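For example, stacking a second aggregation on convictionsPerBorough is a sketch of what is not allowed; starting a sink on such a DataFrame raises an AnalysisException (the exact message depends on the Spark version):
# NOT supported on a streaming DataFrame: an aggregation over an already-aggregated stream
totalConvictions = convictionsPerBorough.agg({"convictions": "sum"})
# totalConvictions.writeStream...start() would fail with an error along the lines of
# "Multiple streaming aggregations are not supported"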
Start Streaming Process
query = trimmedDF.writeStream.outputMode("append").format("console").option("truncate", "false").option("numRows", 40).start()
query = recordsPerBorough.writeStream.outputMode("complete").format("console").option("truncate", "false").option("numRows", 40).start()
query = convictionsPerBorough.writeStream.outputMode("complete").format("console").option("truncate", "false").option("numRows", 40).start()
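When this runs as a standalone script rather than an interactive shell, the driver must be kept alive for the micro-batches to keep executing; a common pattern, using the query handles above, is:
# Block until this streaming query is stopped or fails
query.awaitTermination()

# If several queries were started, wait on all of them at once instead:
# spark.streams.awaitAnyTermination()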
Using SQL Queries
fileStreamDF.createOrReplaceTempView("LondonCrimeData")
categoryDF = spark.sql("SELECT major_category, value FROM LondonCrimeData WHERE year = '2016'")
convictionsPerCategory = categoryDF.groupBy("major_category").agg({"value": "sum"}) \
    .withColumnRenamed("sum(value)", "convictions") \
    .orderBy("convictions", ascending=False)
query = convictionsPerCategory.writeStream.outputMode("complete") \
    .format("console").option("truncate", "false").option("numRows", 30).start()
We cannot run spark.sql() operations against this table after the stream processing has started; SQL-derived DataFrames must be defined before the query is started.
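Once a streaming query is running, it can only be monitored or stopped; any new transformation (including one written with spark.sql()) has to be defined as part of a new query before its own start() call. A small sketch using the StreamingQuery handle returned by start():
# Inspect and stop the running query; this does not add new transformations to it
print(query.status)        # whether the query is active and what it is currently doing
print(query.lastProgress)  # metrics from the most recent micro-batch
query.stop()               # stop the stream when finished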