Streaming Data from Apache Kafka Topic using Apache Spark 2.4.7 and Python

Creating a CDC data pipeline: Part 2


Introduction

This is the second part of a three-part tutorial describing how to create a Microsoft SQL Server CDC (Change Data Capture) data pipeline. It can also stand alone as a guide to installing Apache Spark 2.4.7 on AWS and using it to read JSON data from a Kafka topic.

Example data pipeline from insertion to transformation

By the end of the first two parts of this tutorial, you will have a Spark job that takes in all new CDC data from the Kafka topic every two seconds. In the case of the “fruit” table, every insertion of a fruit over that two-second period will be aggregated so that the total number sold for each unique fruit is counted and displayed.

NOTE: This tutorial assumes you are only working with inserts on the given table. You may need to edit the Spark transformation to filter specific kinds of change events based on the “op” parameter in the CDC data. This is discussed near the end of the tutorial.

Step 1: Creating Security Groups and EC2 Instances (~5 min)

NOTE: this setup assumes you have created an EC2 instance with Kafka installed and running in your default VPC. Refer here for instructions on that if needed.

Create an AWS instance with the following settings. Accept defaults where details are left unspecified.

Apache Spark AWS Details:

Step 2: Installing/Configuring Spark (~5 min)

sudo apt-get update -y;
sudo apt-get install openjdk-8-jdk -y;
wget http://mirror.cc.columbia.edu/pub/software/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz;
tar -xvzf spark-2.4.7-bin-hadoop2.7.tgz;
sudo mkdir /etc/spark;
sudo chown -R ubuntu /etc/spark;
cp -r spark-2.4.7-bin-hadoop2.7/* /etc/spark/;
cp /etc/spark/conf/spark-env.sh.template /etc/spark/conf/spark-env.sh
# Add the following two lines to /etc/spark/conf/spark-env.sh so Spark runs under Python 3
PYSPARK_PYTHON=/usr/bin/python3
PYSPARK_DRIVER_PYTHON=/usr/bin/python3
cp /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties
Editing /etc/spark/conf/log4j.properties
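In that file, a common edit (assuming the stock Spark 2.4.7 template, which logs at INFO) is to lower the root logger to WARN so the streaming batch output is not buried in log messages:

# In /etc/spark/conf/log4j.properties, change this line...
log4j.rootCategory=INFO, console
# ...to this
log4j.rootCategory=WARN, console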
sudo apt-get install python3-pip -y;
sudo pip3 install findspark;
sudo pip3 install pyspark;
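
As a quick sanity check (this one-liner is not part of the original setup), you can confirm that findspark locates the /etc/spark install and that pyspark imports under Python 3:

python3 -c "import findspark; findspark.init('/etc/spark'); import pyspark; print(pyspark.__version__)"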

Step 3: Starting All Pipeline Services (~10 min)

NOTE: Remember to check any IP address configurations as they might change.

net start MSSQLSERVER
net start SQLSERVERAGENT
Starting SQL Server services
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
curl -H "Accept:application/json" localhost:8083/connectors/;
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "{Private IP Address}", "database.port": "1433", "database.user": "testuser", "database.password": "password!", "database.dbname": "testDB", "database.server.name": "testDB", "table.whitelist": "dbo.fruit", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.fulfillment" } }';
Connector not added
Connector added

NOTE: Refer to the first part of this tutorial for more detailed instructions for starting Kafka and MS SQL services.

NOTE: Make sure CDC data is appearing in the topic using a consumer, and make sure the connector is still registered, as it may be lost when Kafka Connect goes down. You may also need to check any IP address configurations.
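
One quick way to confirm CDC events are reaching the topic (assuming the Kafka install path used earlier in this tutorial series) is to run a console consumer against it:

/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning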

Step 4: Extracting CDC Row Insertion Data Using Pyspark (~15 min)

Running a Pyspark Job to Read JSON Data from a Kafka Topic

touch readkafka.py
#Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
#Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
#Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"{replace with your Kafka private address}:9092"})
counts = dks.pprint()
#Starting Spark context
ssc.start()
ssc.awaitTermination()

NOTE: THIS SECTION OF THE TUTORIAL WILL GO OVER ITERATIONS OF THE ABOVE PYTHON FILE. IF YOU WANT THE COMPLETED FILE, SCROLL TO THE BOTTOM OF THIS SECTION.

/etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3 readkafka.py
Pyspark stream timestamps
CDC JSON tuple
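
For reference, each record arrives as a (key, value) tuple whose value is a Debezium JSON string shaped roughly as below (the field values shown are illustrative, not taken from the tutorial):

# Rough shape of one change event produced by the Debezium connector:
# {
#   "schema": { ... },
#   "payload": {
#     "before": null,
#     "after": {"fruit_name": "apple", "num_sold": 5},
#     "source": { ... },
#     "op": "c",
#     "ts_ms": 1600000000000
#   }
# }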

Extracting JSON data from tuple

# To extract JSON data from the tuple, change this...
counts = dks.pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).pprint()

Separating major sections of CDC JSON data

# To separate the schema and the payload, change this...
counts = dks.map(lambda x: json.loads(x[1])).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).pprint()
Separate schema and payload

Isolating table change data

# To isolate the payload, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").pprint()
Payload only

Extracting insertion data

# To get insertion data, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).pprint()
Insertion data
Insertion of multiple rows in one transaction

Reducing by fruit name

# To get reduce by key, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).pprint()
With and without reducing by key

Step 5: Running Own Functions on Output

While printing aggregated CDC data is interesting, it is hardly useful. If you want to run your own functions (whether to store the information on the Spark node or stream it elsewhere), changes need to be made to the completed file. One way to do this is to replace the “pprint()” call with “foreachRDD()” so that a function of your choosing runs on each reduced set of fruits and totals.

# To program your own behavior, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(somefunction)

Once this is done, custom functions can be run by replacing “somefunction” above with the function name. Here is an example function that behaves the same as “pprint()” but, because of the format in which the Kafka data is read into Spark, leaves out the superfluous timestamps.

def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)
Using a custom function to leave out timestamps

Notice that there are four different aggregation events with no timestamps between them, and that nothing is printed when no insertions happen. With a little bit of editing, this function can export these values to a separate program that tracks the totals for each fruit over different spans of time. This will be covered in the final part of this tutorial.
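
As a rough illustration of that idea (a hypothetical sketch only; Part 3 will use StatsD and Graphite instead, and the file path here is made up), the foreachRDD function could append each batch’s totals to a local file:

# Hypothetical sketch: append each batch's fruit totals to a local CSV file.
def save_totals(timestamp, rdd):
    totals = rdd.collect()
    if totals:
        with open("/home/ubuntu/fruit_totals.csv", "a") as f:
            for fruit_name, num_sold in totals:
                f.write("{},{},{}\n".format(timestamp, fruit_name, num_sold))
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(save_totals)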

Step 6: Changing the Spark Job to Filter out Deletes and Updates

So far, updates and deletes have not been considered. If you need them filtered out, it will take some Python logic and some extra filtering of the JSON data, based on the “op” parameter found near the end of each JSON payload.

Operation parameter for inserting a new row
Operation parameter for updating a row
Operation parameter for deleting a row
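
As a sketch of that filtering (assuming Debezium’s convention of “c” for insert, “u” for update, and “d” for delete), an extra filter on the payload’s “op” field keeps only insertions before the “after” values are read:

# Sketch: keep only insert events ("op" == "c") so updates and deletes
# never reach the map that reads the "after" field.
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").filter(lambda items: items[1]["op"]=="c").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)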

Completed Python File

The below file, when submitted as a Spark job with /etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3 readkafka.py, takes in all new CDC data from the Kafka topic every two seconds. In the case of the “fruit” table, every insertion of a fruit over that two-second period will be aggregated so that the total number sold for each unique fruit is counted and displayed.

#Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
#Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
#Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"{replace with your Kafka private address}:9092"})
# Transforming CDC JSON data to sum fruit numbers
# based on fruit name

def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)
#Starting Spark context
ssc.start()
ssc.awaitTermination()

Addendum

In the next part of this tutorial, we will install Grafana, Graphite Carbon, and Graphite Web onto an Ubuntu 18.04 EC2 instance to stream and plot the CDC data transformed by Spark. The Spark Python job from this tutorial will also be edited to use StatsD to interface with Graphite Carbon. A link will be added HERE when Part 3 is available.
