PySpark with Kafka
Start ZooKeeper and the Kafka broker, create a topic, and start a console producer. Then launch PySpark with the Kafka connector package (the spark-sql-kafka artifact version should match your Spark and Scala build):
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
Read from a Kafka Topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "covid_data").load()
# Read only the value field from Kafka topic
lines = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "covid_data")
    .load()
    .selectExpr("CAST(value AS STRING)"))
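Before the cast, the Kafka source exposes a fixed set of columns (key, value, topic, partition, offset, timestamp, timestampType), with key and value as binary. A quick way to inspect them, assuming the same covid_data stream as above:

# Sketch: inspect the raw columns the Kafka source provides before any casting
raw = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "covid_data")
    .load())
raw.printSchema()  # key, value, topic, partition, offset, timestamp, timestampType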
Process Data
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()
Output Sink
query = wordCounts.writeStream.outputMode("complete").format("console").start()
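start() returns immediately and the query runs in the background; in a standalone script (as opposed to the pyspark shell) you typically block the driver until the query is stopped. A minimal follow-up, assuming the query object from above:

# Block the driver until the streaming query is stopped or fails
query.awaitTermination()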
Specify offset for Reading Data (Batch Processing)
df = (spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1,topic2")
    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
    .load())
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Syntax
{"topic": {"partition": offset}} (a JSON string; partition numbers are given as strings)
Offset -2: Earliest Message
Offset -1: Latest Message
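As a sketch of the offset syntax above, a bounded batch read of the covid_data topic can build the JSON strings with json.dumps; the partition number and ending offset here are hypothetical:

import json

# Hypothetical: read partition 0 of covid_data from the earliest message (-2) up to offset 100
starting = json.dumps({"covid_data": {"0": -2}})
ending = json.dumps({"covid_data": {"0": 100}})

batch_df = (spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "covid_data")
    .option("startingOffsets", starting)
    .option("endingOffsets", ending)
    .load()
    .selectExpr("CAST(value AS STRING)"))
batch_df.show(truncate=False)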
Structured Streaming + Kafka Integration Guide - Spark 3.5.0 Documentation