Read File
Reading a file with the wrong format does not throw an error, but the data is not parsed correctly and may be loaded only partially (see the sketch after the read examples below).
empdf = spark.read.csv("file:///home/ak/datasets/emp.csv")
empdf = spark.read.option("header", "true").option("sep", ",").option("inferSchema", "true").csv("file:///home/ak/datasets/emp.csv")
empdf = spark.read.csv("file:///home/ak/datasets/emp.csv", inferSchema=True, sep=True, header=True)
# load() defaults to the Parquet format; use .format() for other file types
loadParq = spark.read.load("file:///home/ak/datasets/emp.csv")
loadParq = spark.read.format("csv").load("file:///home/ak/datasets/emp.csv")
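For example, pushing this CSV through the JSON reader raises no exception; in Spark's default PERMISSIVE mode every unparseable line just lands in a _corrupt_record column (a minimal sketch reusing the path above):
# No error is thrown, but nothing parses properly
baddf = spark.read.json("file:///home/ak/datasets/emp.csv")
baddf.printSchema() # only a _corrupt_record string column appears
baddf.show(5, truncate=False)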
Write Data to Disk
salesDF.write.save("file:///home/ak/data/sparksales") # Saves as Parquet by default
salesDF.write.mode("overwrite").save("file:///home/ak/data/sparksales") # Replace existing files
salesDF.repartition(8).write.save("file:///home/ak/data/sparksales")
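save() mirrors load(): Parquet unless told otherwise. A sketch of writing the same DataFrame out as CSV instead, using the same options as on the read side:
salesDF.write.format("csv").option("header", "true").mode("overwrite").save("file:///home/ak/data/sparksales")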
View Structure of Data
empdf.printSchema()
empdf.show(<no-of-records>, truncate=False)
empdf.count()
df.describe('<column-name>').show() # Summary statistics (count, mean, stddev, min, max) for the column
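Called without arguments, describe() summarizes every numeric and string column at once; a minimal sketch (the specific column names are hypothetical):
empdf.describe().show() # one row each for count, mean, stddev, min, max
empdf.describe('salary', 'age').show() # or just the (hypothetical) columns of interest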
View Data
salesIND.select("Region", "Country", "Profit").show()
Number of Partitions
empdf.rdd.getNumPartitions()
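repartition() returns a new DataFrame with the requested partition count, which getNumPartitions() then reflects (a sketch; the target of 4 is arbitrary):
empdf = empdf.repartition(4) # full shuffle; coalesce(n) only merges partitions down without one
empdf.rdd.getNumPartitions() # now 4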
Change Column Names
retailDF = retailDF.withColumnRenamed("_c0", "order_id").withColumnRenamed("_c1", "order_date") # assign the result; DataFrames are immutable
retaildataDF = retaildataDF.toDF(*["order_id", "order_date", "order_customer_id", "order_status"]) # rename every column at once; list length must match the column count
Replace Values in a Column
from pyspark.sql import functions as F
df.withColumn('address', F.regexp_replace('address', 'lane', 'ln')) # replaces the whole substring 'lane'
df.withColumn('address', F.translate('address', 'lane', 'ln')) # maps characters one-to-one, not substrings (see sketch below)
df.na.fill({'age': 50, 'name': 'unknown'}).show() # Replace NULL values
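The two replace functions are easy to confuse: regexp_replace swaps whole substrings, while translate maps characters one-to-one and deletes any match characters left without a counterpart. A sketch with a throwaway one-row DataFrame:
tmp = spark.createDataFrame([('12 park lane',)], ['address'])
tmp.withColumn('address', F.regexp_replace('address', 'lane', 'ln')).show() # '12 park ln'
tmp.withColumn('address', F.translate('address', 'lane', 'ln')).show() # '12 pnrk ln': 'a'->'n' everywhere, 'n' and 'e' dropped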
Change Datatype of Column
from pyspark.sql.types import DoubleType, IntegerType, StringType
cases = cases.withColumn('confirmed', F.col('confirmed').cast(IntegerType())) # or equivalently .cast('integer')
Creating New Column (Constant Value)
df = df.withColumn("new_column_name", F.lit("constant_value"))
Filter Data
& and, | or, ~ not clause can be used to chain multiple conditions together
salesIND = salesDF.filter("Country = 'India'")
salesIND.select("Region", "Country", "Profit").filter("City='Agra'").show()
salesIND = salesDF.where("Country = 'India'") # where() is an alias for filter()
from pyspark.sql.functions import col
salesIND.select("Region", "Country", "Profit").filter(col("Profit") != '0').show()
Remove Duplicate Values
df.distinct().count() # number of unique rows (all columns compared)
df.dropDuplicates().show() # same as distinct(); can also take a column subset (see sketch below)
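dropDuplicates() also accepts a list of columns, deduplicating on just that subset (which of the duplicate rows survives is not guaranteed); a sketch with a hypothetical column name:
df.dropDuplicates(['name']).show() # unique by 'name' only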
Convert RDD Data to DF
lines = sc.textFile("file:///home/ak/spark-2.4.5-bin-hadoop2.7/examples/src/main/resources/people.txt")
parts = lines.map(lambda line: line.split(","))
from pyspark.sql import Row
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
peopledf = spark.createDataFrame(people)
peopledf.show()
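An equivalent shortcut on the same parts RDD: map to tuples and name the columns directly with toDF().
peopledf = parts.map(lambda p: (p[0], int(p[1]))).toDF(["name", "age"])
peopledf.show()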