Example Spark 3.0.1 Data Transformations in Python

Step-by-step transformations on arbitrary text input

Outline

  • Introduction (including software versions used)
  • Creating AWS Instance
  • Installation (Spark, Kafka)
  • Spark job 1: Output raw data to console
  • Spark job 2: Run custom functions on input and output as new column
  • Spark job 3: Parse JSON and output specific fields
  • Spark job 4: Run SQL functions on streaming dataframe
  • Spark job 5: Using Kafka Topic as sink for Apache Spark stream
  • Conclusion

Introduction

When I started out learning how to use Spark to transform data from Kafka streams, I had some difficulty figuring out what I needed to do. Even getting the example Python Spark files proved somewhat difficult!

Going into Spark without training was difficult for me!

While the Spark streaming documentation is good and many Spark tutorials exist, I wish I had an article that showed me how to read from a Kafka topic and run SQL-like aggregations on the data in a step-by-step manner back then. That is what I hope to do with this tutorial. The initial Spark job we will run is very simple, but by the end, you will have learned how to edit it to do the following:

  1. Read raw data from an arbitrary Kafka topic into a streaming dataframe.
  2. Use user-defined functions to run custom calculations on any dataframe column.
  3. Parse out information from JSON format and outputting to a column.
  4. Run aggregation functions (like max(), min(), or sum()) on the dataframe.
  5. Use Kafka as a data sink instead of the console so that calculations persist after the Spark job ends.

Software Versions

Keep in mind the software versions we will be using for the following tutorial as instructions/techniques can change based on these.

Creating AWS Instance

Create an AWS instance with the following settings. Accept defaults where details are left unspecified.

  • Image type: Ubuntu Server 20.04 LTS (HVM), SSD Volume Type
  • Minimum recommended instance type: t2.large
  • Number of instances: 1
  • Inbound Security Rules: SSH from My IP

Installation

Installing Spark

  • Log in to the Ubuntu 20.04 instance using an SSH client of your choice.
  • Update the Apt repos and install Java JDK 8.
sudo apt-get update -y;
sudo apt-get install openjdk-8-jdk -y;
  • Download the Spark 3.0.1 with Hadoop package and extract the files.
wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz;
tar -xvzf spark-3.0.1-bin-hadoop2.7.tgz;

NOTE: Scala 2.12 is pre-built with Spark 3.0.1 and will not need separate installation.

  • Make an “/etc” directory for Spark, change the ownership to the “ubuntu” user, and copy the Spark files in.
sudo mkdir /etc/spark;
sudo chown -R ubuntu /etc/spark;
cp -r spark-3.0.1-bin-hadoop2.7/* /etc/spark/;
  • Copy the Spark environment template file and open it with your favorite text editor.
cp /etc/spark/conf/spark-env.sh.template /etc/spark/conf/spark-env.sh;
nano /etc/spark/conf/spark-env.sh;
  • Copy the following lines to the end of the copied file so Spark uses Python 3 for Pyspark jobs.
PYSPARK_PYTHON=/usr/bin/python3
PYSPARK_DRIVER_PYTHON=/usr/bin/python3
  • (Optional) Copy the log properties template file and change any instances of “INFO” to “WARN”. This will reduce screen clutter when viewing live Spark streams.
cp /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties;
nano /etc/spark/conf/log4j.properties;
Editing /etc/spark/conf/log4j.properties

Installing Kafka

NOTE: Installing Kafka is optional if you want to use a different streaming source (see here for more options). However, Kafka is ideal because the producer/consumer architecture allows to you easily give arbitrary input to the Spark stream to quickly test your Spark transformations.

  • Download the Kafka 2.7.0 for Scala 2.12 package and extract the files.
wget https://mirrors.ocf.berkeley.edu/apache/kafka/2.7.0/kafka_2.12-2.7.0.tgz;
tar -xvzf kafka_2.12-2.7.0.tgz;
  • Make an “/etc” directory for Kafka, change the ownership to the “ubuntu” user, and copy the Kafka files in.
sudo mkdir /etc/kafka;
sudo chown -R ubuntu /etc/kafka;
cp -r kafka_2.12-2.7.0/* /etc/kafka/;

Starting Kafka

  • Start the Zookeeper, Broker, and Connect with the following commands. The sleep commands ensure that each component has sufficient time to start before the next one begins.
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &sleep 20; /etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &sleep 20; /etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
  • Create a new topic called testtopic.
/etc/kafka/bin/kafka-topics.sh --create --topic testtopic --bootstrap-server localhost:9092
  • Open a producer on the topic that we will use after setting up Spark. It will display a carat and you will be able to type data in. Each time you press <Enter>, information will be posted on the topic. Spark will read this data into a stream later.
/etc/kafka/bin/kafka-console-producer.sh --topic testtopic --bootstrap-server localhost:9092
Creating Kafka topic and producer

Spark job 1: Output raw data to console

NOTE: If you are using Kafka, you will need to open a separate SSH connection for Spark so that you can keep the producer open.

  • Log in to the Ubuntu 20.04 instance in a different terminal.
  • Create and open a file called sparkjob.py.
nano sparkjob.py
  • Copy and paste the following lines into the file. The bold section is the only section that will change throughout the tutorial.
import sys
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
if __name__ == "__main__": # Checking validity of Spark submission command
if len(sys.argv) != 4:
print("Wrong number of args.", file=sys.stderr)
sys.exit(-1)
# Initializing Spark session
spark = SparkSession\
.builder\
.appName("MySparkSession")\
.getOrCreate()
# Setting parameters for the Spark session to read from Kafka
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
# Streaming data from Kafka topic as a dataframe
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()
# Expression that reads in raw data from dataframe as a string
# and names the column "json"
lines = lines\
.selectExpr("CAST(value AS STRING) as json")
# Writing dataframe to console in append mode
query = lines\
.writeStream\
.outputMode("append")\
.format("console")\
.start()
# Terminates the stream on abort
query.awaitTermination()
  • Start the Spark job.
/etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 sparkjob.py localhost:9092 subscribe testtopic#===Here are the dependencies that get loaded at job startup==
# Package 1: org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1
# Package 2: org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

— This command may look complicated, but most of the length is just the “packages” option. In this case, we are using Maven coordinates to get dependencies. Read more about Maven coordinates here.

For now, just know that the 3.0.1 bit should match our Spark version and that the 2.12 bit should match our Scala version. See more of Apache Spark’s Maven repos here.

In case you are curious, the 0–10 bit in our Maven coordinates would ideally match our Kafka version, but these dependencies will still work despite our using Kafka 2.7.0. The only thing I could dig up related to this is Kafka’s bidirectional client compatibility. Read here to learn more about the Kafka compatibility policy.

— localhost:9092 is the Kafka bootstrap server we are going to connect through. In many cases, “localhost” will be replaced with the IP of a remote server.

subscribe is the “subscribe type”. Other values exist, but we won’t be using them.

— testtopic is the name of the topic we will be reading from.

  • You will see the following table if everything has gone right.
  • Now go back to the producer and type in the following and the press “Enter”.
{name:”test data”, data:”1,2,3,4"}
  • Check the Spark job. You should see that your data is under the “json” column.
  • You can continue to input more data (does not have to be JSON data) and see the stream update for each piece of information.

Spark job 2: Run custom functions on input and output as new column

  • Stop the Spark job by typing <Ctrl-C>.
  • Open “sparkjob.py” and replace the bold section from Spark Job 1 with this.
# User-defined function used to find and return the length of a 
# string from a given dataframe column

@udf(returnType=StringType())
def retstringlen(column):
return(len(column))
# Expression that reads in raw data from dataframe as a string,
# cacluates the length, and creates a new column to contain the
# length values

lines = lines\
.selectExpr("CAST(value AS STRING) as json")\
.withColumn("LEN", retstringlen(col("json")))
# Writing dataframe to console in append mode
query = lines\
.writeStream\
.outputMode("append")\
.format("console")\
.start()

— What we have done here is create a User-Defined Function that will run on every input and put its result to a new column,”LEN”.

— If we were to simply run the len() function on the column (i.e. len(col(“json”)), the stream would fail because “len” can’t use the Python Spark column type. Using the UDF takes the string from the column and allows us to run calculations on it before returning another Spark datatype, in this case “StringType()”.

— Read more about UDF’s here: https://www.bmc.com/blogs/how-to-write-spark-udf-python/, https://docs.databricks.com/spark/latest/spark-sql/udf-python.html .

  • Start the Spark job.
  • Go back to the producer and and input any string.
  • Go back to the Spark job and notice that the length has been recorded correctly.

Spark job 3: Parse JSON and output specific fields

  • Stop the Spark job by typing <Ctrl-C>.
  • Open “sparkjob.py” and replace the bold section from Spark Job 1 with this.
# User defined function to return values from a JSON string by first 
# converting to a dictionary. The dictionary value returned is based
# on the "name" argument.

@udf(returnType=StringType())
def jsonparse(column, name):
if name == "name":
return str(json.loads(column)["NAME"])
if name == "num":
return str(json.loads(column)["NUM"])
# Expression that reads in raw data from dataframe as a string,
# parses the JSON to find the "NAME" and "NUM" values, and outputs
# the values into their own dataframe columns

lines = lines\
.selectExpr("CAST(value AS STRING) as json")\
.withColumn("NAME", jsonparse(col("json"), lit("name")))\
.withColumn("NUM", jsonparse(col("json"), lit("num")))\
.select("NAME", "NUM")
# Writing dataframe to console in append mode
query = lines\
.writeStream\
.outputMode("append")\
.format("console")\
.start()

— In this iteration, we create a new UDF that takes a piece of information and parses out and returns JSON values based on a parameter (we pass this using the “lit()” method). Then the stream uses those values to create two new columns (adds up to three with the “json” column) and selects only the new columns to display.

— From here on, the data passed to the Kafka topic must be JSON data with at least two fields: NAME and NUMBER.

  • Start the Spark job.
  • Go to the producer and and enter this string:
{“NAME”:”Apple”, “NUM”:”178", “DESC”:”red”}

NOTE: You can replace the values to inlcude any new name, number, or description and the job should not crash. You can even add new fields. However, even missing a quotation mark will cause this Spark job to crash. To prevent bad inputs from crashing the job, you would have to add some kind of error handling in the UDF.

  • Go back to the Spark job and notice that the the “NAME” and “NUM” fields have been parsed out recorded correctly.

Spark job 4: Run SQL functions on streaming dataframe

  • Stop the Spark job by typing <Ctrl-C>.
  • Open “sparkjob.py” and replace the bold section from Spark Job 1 with this.
# User defined function to return values from a JSON string by first 
# converting to a dictionary. The dictionary value returned is based
# on the "name" argument.

@udf(returnType=StringType())
def jsonparse(column, name):
if name == "name":
return str(json.loads(column)["NAME"])
if name == "num":
return str(json.loads(column)["NUM"])
# Expression that reads in raw data from dataframe as a string,
# parses the JSON to find the "NAME" and "NUM" values, outputs the
# values into their own dataframe columns, and then sums the "NUM"
# column by "NAME" before concatenating the columns

lines = lines\
.selectExpr("CAST(value AS STRING) as json")\
.withColumn("NAME", jsonparse(col("json"), lit("name")))\
.withColumn("NUM", jsonparse(col("json"), lit("num")))\
.select("NAME", "NUM")\
.withColumn("NUM", col("NUM").cast("double"))\
.groupBy("NAME").sum()\
.withColumnRenamed("sum(NUM)", "NUM")\
.withColumn("value", concat(col("NAME"), lit(":"), col("NUM")))\
.select("value")
# Writing dataframe to console in complete mode
query = lines\
.writeStream\
.outputMode("complete")\
.format("console")\
.start()

— In this iteration, we cast the “NUM” column as a double type so that when we run the groupBy().sum() function, the numbers in that column get summed together. Then we simply rename the result column, create a new column concatenating the “NAME” and “NUM” values with a colon between them, and select the concatenated value column. The sums get reset every time the Spark job is restarted.

— Another change is the outputMode from append to complete. This is because groupBy functions require either complete or update outputModes to aggregate correctly. Spark would error out if you kept append mode.

— Read more here: https://sparkbyexamples.com/spark/spark-streaming-outputmode/

  • Start the Spark job.
  • Go to the producer and and enter these strings one after another:
{"NAME":"Pear", "NUM":"2", "DESC":"green"}
{"NAME":"Pear", "NUM":"5", "DESC":"green"}
{"NAME":"Apple", "NUM":"10", "DESC":"red"}
  • Go back to the Spark job and notice that the length has been recorded correctly with the “Pear” values being summed to “7”.

Spark job 5: Using Kafka Topic as sink for Apache Spark stream

  • Stop the Spark job by typing <Ctrl-C>.
  • Open “sparkjob.py” and replace the bold section from Spark Job 1 with this.
# User defined function to return values from a JSON string by first 
# converting to a dictionary. The dictionary value returned is based
# on the "name" argument.

@udf(returnType=StringType())
def jsonparse(column, name):
if name == "name":
return str(json.loads(column)["NAME"])
if name == "num":
return str(json.loads(column)["NUM"])
# Expression that reads in raw data from dataframe as a string,
# parses the JSON to find the "NAME" and "NUM" values, outputs the
# values into their own dataframe columns, and then sums the "NUM"
# column by "NAME" before concatenating the columns

lines = lines\
.selectExpr("CAST(value AS STRING) as json")\
.withColumn("NAME", jsonparse(col("json"), lit("name")))\
.withColumn("NUM", jsonparse(col("json"), lit("num")))\
.select("NAME", "NUM")\
.withColumn("NUM", col("NUM").cast("double"))\
.groupBy("NAME").sum()\
.withColumnRenamed("sum(NUM)", "NUM")\
.withColumn("value", concat(col("NAME"), lit(":"), col("NUM")))\
.select("value")
# Writing dataframe to Kafka topic in update mode and storing
# checkpoints in "/home/ubuntu/spark_checkpoints"

query = lines\
.writeStream\
.outputMode("update")\
.format("kafka")\
.option("checkpointLocation", "/home/ubuntu/spark_checkpoints")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option("topic", "trns_spk_data")\
.start()

— In this iteration, we didn’t change the processing of our data. We instead changed where Spark will write our results to: a Kafka topic called “trns_spk_data”. This topic will be created for us.

— Running a Spark job this way allows for computations to be made using all data that has made it into the topic. In other words, additions to the input topic “testtopic” while the Spark job isn’t running will be processed/aggregated when the Spark job is started later and values persist: any aggregates will not be reset. This powerful behavior necessitates checkpointing by Spark and this is done based on a directory path you give to the “checkpointLocation” option.

— We changed the outputMode to “update” so that writes to the sink topic “trns_spk_data” only occur when the sum for a specific “NAME” value changes. This is instead of “complete” where the entire sums table is output whenever there is a change.

  • Start the Spark job.
  • Go to the producer and and enter this string:
{"NAME":"Apple", "NUM":"17", "DESC":"red"}
  • You should not see any tables in the Spark output because all of that is being stored in Kafka.
  • Go back to the producer and and enter this string:
{"NAME":"Apple", "NUM":"23", "DESC":"red"}
  • Now read the “trns_spk_data” topic (you may need to open a new SSH connection or stop your producer temporarily). Notice that the “Apple” value got accumulated to “40”.
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic trns_spk_data --from-beginning
  • Now, to show that changes made to the input topic while the Spark job is stopped get computed, quit the Spark job and enter this string in the producer:
{"NAME":"Apple", "NUM":"40", "DESC":"red"}
  • Restart the Spark job. Don’t worry if you see many “WARN” messages. They are benign.
  • Read the “trns_spk_data” topic again and notice that the new value got accumulated to what we added before.

NOTE: If you need to reset your results topic so you can aggregate new data sets, run these commands to remove the topic and the checkpoints.

rm -rf spark_checkpoints; 
/etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic trns_spk_data;

Conclusion

In this tutorial, we have gone over how to program and run Spark jobs using Python. We went over streaming input from a Kafka topic, the different ways we can process the input stream, and the different ways to view output.

Further steps from this tutorial might include getting a real data source to input to the Kafka topic (for example, database change data from a MySQL server using the Debezium connector) and/or creating a new Spark job to read every new aggregation result from the Kafka topic and do something with it such as sending out StatsD metrics to create a constantly updating graph.

A man with a passion for information technology.