Streaming Data from Apache Kafka Topic using Apache Spark 2.4.7 and Python

Sandeep Kattepogu
10 min read · Jun 12, 2020

Creating a CDC data pipeline: Part 2

Outline

  • Introduction
  • Creating Security Groups and EC2 Instances (~5 min)
  • Installing/Configuring Spark (~5 min)
  • Starting All Pipeline Services (~10 min)
  • Extracting CDC Row Insertion Data Using Pyspark (~15 min)
  • Running Your Own Functions on Output
  • Changing the Spark Job to Filter out Deletes and Updates
  • Completed Python File
  • Addendum

Introduction

This is the second part of a three-part tutorial describing how to create a Microsoft SQL Server CDC (Change Data Capture) data pipeline. However, it can also work as a standalone tutorial for installing Apache Spark 2.4.7 on AWS and using it to read JSON data from a Kafka topic.

Example data pipeline from insertion to transformation

By the end of the first two parts of this tutorial, you will have a Spark job that takes in all new CDC data from the Kafka topic every two seconds. In the case of the “fruit” table, every insertion over that two-second period is aggregated so that the total quantity for each unique fruit is summed and displayed.

NOTE: This tutorial assumes you are only working with inserts on the given table. You may need to edit the Spark transformation to filter specific kinds of CDC events based on the “op” parameter. This is discussed near the end of the tutorial.

Step 1: Creating Security Groups and EC2 Instances (~5 min)

NOTE: this setup assumes you have created an EC2 instance with Kafka installed and running in your default VPC. Refer here for instructions on that if needed.

Create an AWS instance with the following settings. Accept defaults where details are left unspecified.

Apache Spark AWS Details:

  • Image type: Ubuntu Server 18.04 LTS (HVM)
  • Minimum recommended instance type: t2.medium
  • Number of instances: 1
  • Inbound Security Rules: SSH from My IP; All TCP from default VPC CIDR

Step 2: Installing/Configuring Spark (~5 min)

  • Log into the Ubuntu 18.04 instance using an SSH client of your choice.
  • Update the Apt repos and install Java JDK 8.
sudo apt-get update -y;
sudo apt-get install openjdk-8-jdk -y;
  • Download the Spark 2.4.7 package with Hadoop and extract the files.
wget http://mirror.cc.columbia.edu/pub/software/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz;
tar -xvzf spark-2.4.7-bin-hadoop2.7.tgz;
  • Make an “/etc” directory for Spark, change the ownership to the “ubuntu” user, and copy the Spark files in.
sudo mkdir /etc/spark;
sudo chown -R ubuntu /etc/spark;
cp -r spark-2.4.7-bin-hadoop2.7/* /etc/spark/;
  • Copy the Spark environment template file.
cp /etc/spark/conf/spark-env.sh.template /etc/spark/conf/spark-env.sh
  • Add the following lines to the end of the copied file so Spark uses Python 3 for Pyspark jobs.
PYSPARK_PYTHON=/usr/bin/python3
PYSPARK_DRIVER_PYTHON=/usr/bin/python3
  • (Optional) Copy the log properties template file and change any instances of “INFO” to “WARN”. This will reduce screen clutter when viewing live Spark streams.
cp /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties
Editing /etc/spark/conf/log4j.properties
  • Install Pip (the Python package installer) for Python 3, then install the “findspark” and “pyspark” packages. A quick sanity check follows this list.
sudo apt-get install python3-pip -y;
sudo pip3 install findspark;
sudo pip3 install pyspark;
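If you want to confirm the installation before moving on, an optional check (assuming Spark was copied to /etc/spark as above) is to run the following in a Python 3 shell:

#Optional check: findspark should locate the Spark copied to /etc/spark
import findspark
findspark.init('/etc/spark')
import pyspark
print(pyspark.__version__)   #should report 2.4.7 if the /etc/spark install is picked up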

Step 3: Starting All Pipeline Services (~10 min)

NOTE: Remember to check any IP address configurations as they might change.

  • RDP into the Windows Server instance.
  • Open an admin Powershell.
  • Make sure the MS SQL Server and SQL Server Agent services are running.
net start MSSQLSERVER
net start SQLSERVERAGENT
Starting SQL Server services
  • SSH into the Apache Kafka Ubuntu instance.
  • Start the Kafka Zookeeper, Broker, and Connect programs.
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
  • Use the first command to check whether the Debezium connector is registered. If it isn’t, edit the second command (replacing the private IP placeholder) and use it to add the connector again.
curl -H "Accept:application/json" localhost:8083/connectors/;curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "{Private IP Address}", "database.port": "1433", "database.user": "testuser", "database.password": "password!", "database.dbname": "testDB", "database.server.name": "testDB", "table.whitelist": "dbo.fruit", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.fulfillment" } }';
Connector not added
Connector added

NOTE: Refer to the first part of this tutorial for more detailed instructions for starting Kafka and MS SQL services.

NOTE: Make sure CDC data is appearing in the topic using a consumer, and make sure the connector is still registered, as the registration may be lost when Kafka Connect goes down. You may also need to re-check any IP address configurations.

Step 4: Extracting CDC Row Insertion Data Using Pyspark (~15 min)

Running a Pyspark Job to Read JSON Data from a Kafka Topic

  • Create a file called “readkafka.py”.
touch readkafka.py
  • Open the file with your favorite text editor.
  • Copy the following into the file.
#Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
#Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
#Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"|replace with your Kafka private address|:9092"})
counts = dks.pprint()
#Starting Spark context
ssc.start()
ssc.awaitTermination()

NOTE: THIS SECTION OF THE TUTORIAL WILL GO OVER ITERATIONS OF THE ABOVE PYTHON FILE. IF YOU WANT THE COMPLETED FILE, SCROLL TO THE BOTTOM OF THIS SECTION.

  • Save the file and close it.
  • Run a Pyspark job with the command below. If it works, you should start seeing timestamps separated by two seconds.
/etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3 readkafka.py
Pyspark stream timestamps
  • Start adding data to the Kafka topic by running an insert on the “fruit” table on the MS SQL Server instance. You will see the entire JSON output in the Spark window.
CDC JSON tuple
  • Notice that the JSON data is packaged inside a Python tuple. We will have to edit the Python program to extract the JSON from the tuple (a short sketch of this follows the list).
  • Press “CTRL + C” to end the Spark context.
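To see why the edit in the next step pulls out x[1], note that each record from the direct stream arrives as a (key, value) pair, and the Debezium JSON document is the value. A minimal sketch with a made-up record:

#Illustration only: the CDC JSON is the second element of each (key, value) record
import json
record = (None, '{"schema": {"name": "..."}, "payload": {"op": "c"}}')   #hypothetical record
parsed = json.loads(record[1])
print(parsed["payload"]["op"])   #prints: c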

Extracting JSON data from tuple

  • Change the following line.
# To extract JSON data from the tuple, change this...
counts = dks.pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).pprint()
  • Save the file and run the Pyspark job again.
  • Insert another row into the fruit table.
  • Observe that the Spark window now shows the JSON data extracted from the tuple.
  • Notice that there are two top-level entries in this JSON object: “schema” and “payload”. In this example, we will need to extract the “payload”. We will begin by isolating the “schema” and the “payload” into separate tuples (see the sketch after this list).
  • Press “CTRL + C” to end the Spark context.
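Before making that change, here is a plain-Python illustration (with made-up contents) of the idea behind it: calling items() on the parsed document yields one tuple per top-level key, which is what flatMap spreads across the stream in the next step.

#Illustration only: items() splits the document into ("schema", ...) and ("payload", ...) tuples
doc = {"schema": {"type": "struct"}, "payload": {"op": "c"}}
print(list(doc.items()))
#[('schema', {'type': 'struct'}), ('payload', {'op': 'c'})]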

Separating major sections of CDC JSON data

  • Change the following line in “readkafka.py”.
# To separate the schema and the payload, change this...
counts = dks.map(lambda x: json.loads(x[1])).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).pprint()
  • Save the file and run the Pyspark job again.
  • Insert another row into the fruit table.
  • Observe that the schema and payload sections are separated into different tuples.
Separate schema and payload
  • Now we will need to further transform the data by isolating the payload (a small preview of this follows the list).
  • Press “CTRL + C” to end the Spark context.
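As a small plain-Python preview (made-up contents) of what the filter in the next step keeps:

#Illustration only: keep only the ("payload", ...) tuples
items = [("schema", {"type": "struct"}), ("payload", {"op": "c"})]
print([t for t in items if t[0] == "payload"])
#[('payload', {'op': 'c'})]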

Isolating table change data

  • Change the following line in “readkafka.py”.
# To isolate the payload, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").pprint()
  • Save the file and run the Pyspark job again.
  • Insert another row into the fruit table.
  • Observe that now only the payload is visible.
Payload only
  • We will need to extract the data from the inserted row. This is found in the “after” section of the payload (see the sketch after this list).
  • Press “CTRL + C” to end the Spark context.
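For reference, this is roughly the shape of a Debezium insert payload (abridged, with hypothetical column values) and how the fruit name and quantity will be pulled out of its “after” section in the next step:

#Illustration only: abridged insert payload with hypothetical values
payload = {"before": None, "after": {"id": 1, "fruit_name": "Apple", "num_sold": 5}, "op": "c"}
print((payload["after"]["fruit_name"], payload["after"]["num_sold"]))   #('Apple', 5)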

Extracting insertion data

  • Change the following line in “readkafka.py”.
# To get insertion data, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).pprint()
  • Save the file and run the Pyspark job again.
  • Insert another row into the fruit table.
  • Observe that insertion data is returned in a tuple (this row inserted a fruit called “Apple” with a number value of 5).
Insertion data
  • Observe what happens if multiple insertions occur within two seconds of each other.
Insertion of multiple rows in one transaction
  • The final step is to sum the number values by key (the fruit name). In Spark this is called reducing (a small illustration follows this list).
  • Press “CTRL + C” to end the Spark context.
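Before wiring this into the stream, here is a plain-Python illustration (sample values only) of what summing by key over one two-second batch produces:

#Illustration only: what reduceByKey(lambda a, b: a+b) computes for one batch
pairs = [("Apple", 5), ("Apple", 3), ("Pear", 2)]
totals = {}
for fruit, num in pairs:
    totals[fruit] = totals.get(fruit, 0) + num
print(sorted(totals.items()))   #[('Apple', 8), ('Pear', 2)]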

Reducing by fruit name

  • Change the following line in “readkafka.py”.
# To reduce by key, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).pprint()
  • Save the file and run the Pyspark job again.
  • Run at least two insertions within two seconds of each other with the same fruit name. You should see the numbers added together.
With and without reducing by key
  • Press “CTRL + C” to end the Spark context.

Step 5: Running Your Own Functions on Output

While printing aggregated CDC data is interesting, it is hardly useful on its own. If you want to run your own functions (whether to store the information on the Spark node or stream it elsewhere), changes need to be made to the completed file. One way to do this is to replace the “pprint()” call with “foreachRDD” so that a function of your choosing runs on each reduced set of fruits and totals.

# To program your own behavior, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(somefunction)

Once this is done, a custom function can be run by replacing “somefunction” above with the function name. Here is an example function that behaves like “pprint()” but, because of the format in which the Kafka data is read into Spark, leaves out the superfluous timestamps.

def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)
Using a custom function to leave out timestamps

Notice that there are four different aggregation events with no timestamps between them, and that nothing is printed when no insertions happen. With a little bit of editing, this function can export these values to a separate program that can track the totals for each fruit over different spans of time. This will be covered in the final part of this tutorial.
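As one illustration of that idea (a hypothetical stand-in, not the Graphite/StatsD integration covered in Part 3), the function passed to “foreachRDD” could append each batch’s totals to a local file instead of printing them. The file path and function name below are made up for the example.

#Hypothetical example: append each batch's totals to a local file
def save_totals(timestamp, rdd):
    totals = rdd.collect()
    if totals:
        with open("/tmp/fruit_totals.log", "a") as f:
            for fruit, total in totals:
                f.write("{} {} {}\n".format(timestamp, fruit, total))
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(save_totals)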

Step 6: Changing the Spark Job to Filter out Deletes and Updates

As written, the Spark job treats every CDC event the same way; updates and deletes are not handled separately. If you need them filtered out, it will take a little extra Python logic and some additional filtering of the JSON data, based on the “op” parameter found near the end of each CDC record (a sketch is shown below).

Operation parameter for inserting a new row
Operation parameter for updating a row
Operation parameter for deleting a row
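As a starting point, here is a minimal sketch (not part of the original job) of how the existing chain in “readkafka.py” could be extended with one more filter so that only inserts, where “op” is “c”, reach the aggregation; update (“u”) and delete (“d”) events would simply be dropped.

#Sketch: keep only insert events ("op" == "c") before aggregating
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").filter(lambda items: items[1]["op"]=="c").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).pprint()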

Completed Python File

The file below, when submitted as a Spark job with /etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3 readkafka.py, takes in all new CDC data from the Kafka topic every two seconds. In the case of the “fruit” table, every insertion over that two-second period is aggregated so that the total quantity for each unique fruit is summed and displayed.

#Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
#Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
#Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"{replace with your Kafka private address}:9092"})
# Transforming CDC JSON data to sum fruit numbers
# based on fruit name

def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)
#Starting Spark context
ssc.start()
ssc.awaitTermination()

Addendum

In the next part of this tutorial, we will install Grafana, Graphite Carbon, and Graphite Web onto an Ubuntu 18.04 EC2 instance to stream and plot the CDC data transformed by Spark. The Spark Python job from this tutorial will also be edited to use StatsD to interface with Graphite Carbon. A link will be added HERE when Part 3 is available.
