Example Spark 3.0.1 Data Transformations in Python

Outline

Introduction
Creating an AWS Instance
Installation
Spark job 1: Output raw data to console
Spark job 2: Run custom functions on the input and output the result as a new column
Spark job 3: Parse JSON and output specific fields
Spark job 4: Run SQL functions on a streaming dataframe
Spark job 5: Use a Kafka topic as the sink for a Spark stream
Conclusion

Introduction

Going into Spark without training was difficult for me!

Creating an AWS Instance

Installation

sudo apt-get update -y;
sudo apt-get install openjdk-8-jdk -y;
wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz;
tar -xvzf spark-3.0.1-bin-hadoop2.7.tgz;
sudo mkdir /etc/spark;
sudo chown -R ubuntu /etc/spark;
cp -r spark-3.0.1-bin-hadoop2.7/* /etc/spark/;
cp /etc/spark/conf/spark-env.sh.template /etc/spark/conf/spark-env.sh;
nano /etc/spark/conf/spark-env.sh;
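# Add the following two lines to spark-env.sh so PySpark and its driver use Python 3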
PYSPARK_PYTHON=/usr/bin/python3
PYSPARK_DRIVER_PYTHON=/usr/bin/python3
cp /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties;
nano /etc/spark/conf/log4j.properties;
Editing /etc/spark/conf/log4j.properties
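The template leaves the root logger at INFO, which makes the console very noisy once streaming jobs start. The exact change isn't shown above, but a typical (assumed) edit is to drop the root level to WARN:

# Assumed edit near the top of /etc/spark/conf/log4j.properties
log4j.rootCategory=WARN, console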
wget https://mirrors.ocf.berkeley.edu/apache/kafka/2.7.0/kafka_2.12-2.7.0.tgz;
tar -xvzf kafka_2.12-2.7.0.tgz;
sudo mkdir /etc/kafka;
sudo chown -R ubuntu /etc/kafka;
cp -r kafka_2.12-2.7.0/* /etc/kafka/;
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
sleep 20;
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
sleep 20;
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
/etc/kafka/bin/kafka-topics.sh --create --topic testtopic --bootstrap-server localhost:9092
/etc/kafka/bin/kafka-console-producer.sh --topic testtopic --bootstrap-server localhost:9092
Creating a Kafka topic and producer

Spark job 1: Output raw data to console

nano sparkjob.py
import sys
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

if __name__ == "__main__":
    # Checking validity of the spark-submit command: it expects the
    # bootstrap servers, subscribe type, and topic list as arguments
    if len(sys.argv) != 4:
        print("Wrong number of args.", file=sys.stderr)
        sys.exit(-1)

    # Initializing the Spark session
    spark = SparkSession\
        .builder\
        .appName("MySparkSession")\
        .getOrCreate()

    # Setting parameters for the Spark session to read from Kafka
    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    # Streaming data from the Kafka topic as a dataframe
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()

    # Expression that reads the raw data from the dataframe as a string
    # and names the column "json"
    lines = lines\
        .selectExpr("CAST(value AS STRING) as json")

    # Writing the dataframe to the console in append mode
    query = lines\
        .writeStream\
        .outputMode("append")\
        .format("console")\
        .start()

    # Block until the streaming query terminates
    query.awaitTermination()
/etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 sparkjob.py localhost:9092 subscribe testtopic
# === Dependencies that get loaded at job startup ===
# Package 1: org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1
# Package 2: org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
{"name":"test data", "data":"1,2,3,4"}
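By default the console sink processes each micro-batch as soon as new data arrives. If a fixed cadence is preferred, a trigger can be added to the write chain; a minimal sketch, where the 10-second interval is just an assumed example and not part of the original job:

# Optional variation: emit output in fixed 10-second micro-batches
query = lines\
    .writeStream\
    .outputMode("append")\
    .format("console")\
    .trigger(processingTime="10 seconds")\
    .start()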

Spark job 2: Run custom functions on the input and output the result as a new column

# User-defined function used to find and return the length of a
# string from a given dataframe column
@udf(returnType=StringType())
def retstringlen(column):
    # str() so the returned value matches the declared StringType return type
    return str(len(column))

# Expression that reads the raw data from the dataframe as a string,
# calculates the length, and creates a new column to contain the
# length values
lines = lines\
    .selectExpr("CAST(value AS STRING) as json")\
    .withColumn("LEN", retstringlen(col("json")))

# Writing the dataframe to the console in append mode
query = lines\
    .writeStream\
    .outputMode("append")\
    .format("console")\
    .start()
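A Python UDF works here, but every row has to round-trip through the Python worker. Spark's built-in length function computes the same value on the JVM side; a minimal sketch of that alternative, assuming the same lines dataframe from job 1:

from pyspark.sql.functions import length

# Same result as retstringlen, using the built-in length() and casting to string for parity
lines = lines\
    .selectExpr("CAST(value AS STRING) as json")\
    .withColumn("LEN", length(col("json")).cast("string"))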

Spark job 3: Parse JSON and output specific fields

# User-defined function that returns values from a JSON string by first
# converting it to a dictionary. The dictionary value returned is based
# on the "name" argument.
@udf(returnType=StringType())
def jsonparse(column, name):
    if name == "name":
        return str(json.loads(column)["NAME"])
    if name == "num":
        return str(json.loads(column)["NUM"])

# Expression that reads the raw data from the dataframe as a string,
# parses the JSON to find the "NAME" and "NUM" values, and outputs
# the values into their own dataframe columns
lines = lines\
    .selectExpr("CAST(value AS STRING) as json")\
    .withColumn("NAME", jsonparse(col("json"), lit("name")))\
    .withColumn("NUM", jsonparse(col("json"), lit("num")))\
    .select("NAME", "NUM")

# Writing the dataframe to the console in append mode
query = lines\
    .writeStream\
    .outputMode("append")\
    .format("console")\
    .start()
{"NAME":"Apple", "NUM":"178", "DESC":"red"}
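The UDF approach is easy to follow, but Spark can also parse the JSON natively with from_json given a schema, which avoids the per-row Python call. A minimal alternative sketch, assuming records shaped like the example above (from_json, StructType, StructField, and StringType are already available through the wildcard imports in sparkjob.py):

# Schema describing the incoming JSON records (all fields sent as strings)
schema = StructType([
    StructField("NAME", StringType()),
    StructField("NUM", StringType()),
    StructField("DESC", StringType()),
])

# Built-in JSON parsing instead of the jsonparse UDF
lines = lines\
    .selectExpr("CAST(value AS STRING) as json")\
    .withColumn("parsed", from_json(col("json"), schema))\
    .select(col("parsed.NAME").alias("NAME"), col("parsed.NUM").alias("NUM"))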

Spark job 4: Run SQL functions on a streaming dataframe

# User-defined function that returns values from a JSON string by first
# converting it to a dictionary. The dictionary value returned is based
# on the "name" argument.
@udf(returnType=StringType())
def jsonparse(column, name):
    if name == "name":
        return str(json.loads(column)["NAME"])
    if name == "num":
        return str(json.loads(column)["NUM"])

# Expression that reads the raw data from the dataframe as a string,
# parses the JSON to find the "NAME" and "NUM" values, outputs the
# values into their own dataframe columns, and then sums the "NUM"
# column by "NAME" before concatenating the columns
lines = lines\
    .selectExpr("CAST(value AS STRING) as json")\
    .withColumn("NAME", jsonparse(col("json"), lit("name")))\
    .withColumn("NUM", jsonparse(col("json"), lit("num")))\
    .select("NAME", "NUM")\
    .withColumn("NUM", col("NUM").cast("double"))\
    .groupBy("NAME").sum()\
    .withColumnRenamed("sum(NUM)", "NUM")\
    .withColumn("value", concat(col("NAME"), lit(":"), col("NUM")))\
    .select("value")

# Writing the dataframe to the console in complete mode
query = lines\
    .writeStream\
    .outputMode("complete")\
    .format("console")\
    .start()
{"NAME":"Pear", "NUM":"2", "DESC":"green"}
{"NAME":"Pear", "NUM":"5", "DESC":"green"}
{"NAME":"Apple", "NUM":"10", "DESC":"red"}

Spark job 5: Use a Kafka topic as the sink for a Spark stream

# User-defined function that returns values from a JSON string by first
# converting it to a dictionary. The dictionary value returned is based
# on the "name" argument.
@udf(returnType=StringType())
def jsonparse(column, name):
    if name == "name":
        return str(json.loads(column)["NAME"])
    if name == "num":
        return str(json.loads(column)["NUM"])

# Expression that reads the raw data from the dataframe as a string,
# parses the JSON to find the "NAME" and "NUM" values, outputs the
# values into their own dataframe columns, and then sums the "NUM"
# column by "NAME" before concatenating the columns
lines = lines\
    .selectExpr("CAST(value AS STRING) as json")\
    .withColumn("NAME", jsonparse(col("json"), lit("name")))\
    .withColumn("NUM", jsonparse(col("json"), lit("num")))\
    .select("NAME", "NUM")\
    .withColumn("NUM", col("NUM").cast("double"))\
    .groupBy("NAME").sum()\
    .withColumnRenamed("sum(NUM)", "NUM")\
    .withColumn("value", concat(col("NAME"), lit(":"), col("NUM")))\
    .select("value")

# Writing the dataframe to the Kafka topic "trns_spk_data" in update mode
# and storing checkpoints in "/home/ubuntu/spark_checkpoints"
query = lines\
    .writeStream\
    .outputMode("update")\
    .format("kafka")\
    .option("checkpointLocation", "/home/ubuntu/spark_checkpoints")\
    .option("kafka.bootstrap.servers", bootstrapServers)\
    .option("topic", "trns_spk_data")\
    .start()
{"NAME":"Apple", "NUM":"17", "DESC":"red"}
{"NAME":"Apple", "NUM":"23", "DESC":"red"}
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic trns_spk_data --from-beginning
{"NAME":"Apple", "NUM":"40", "DESC":"red"}
rm -rf spark_checkpoints; 
/etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic trns_spk_data;

Conclusion
