Streaming Data from Apache Kafka Topic using Apache Spark 2.4.7 and Python

Outline

Introduction

Example data pipeline from insertion to transformation

Step 1: Creating Security Groups and EC2 Instances (~5 min)

Step 2: Installing/Configuring Spark (~5 min)

sudo apt-get update -y;
sudo apt-get install openjdk-8-jdk -y;
wget http://mirror.cc.columbia.edu/pub/software/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz;
tar -xvzf spark-2.4.7-bin-hadoop2.7.tgz;
sudo mkdir /etc/spark;
sudo chown -R ubuntu /etc/spark;
cp -r spark-2.4.7-bin-hadoop2.7/* /etc/spark/;
cp /etc/spark/conf/spark-env.sh.template /etc/spark/conf/spark-env.sh
# Add these two lines to /etc/spark/conf/spark-env.sh so Spark uses Python 3:
PYSPARK_PYTHON=/usr/bin/python3
PYSPARK_DRIVER_PYTHON=/usr/bin/python3
cp /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties
Editing /etc/spark/conf/log4j.properties
sudo apt-get install python3-pip -y;
sudo pip3 install findspark;
sudo pip3 install pyspark;
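Before moving on, it can be worth confirming that the install is wired up correctly. Here is a minimal sanity check (a sketch, assuming the /etc/spark path used above): start a throwaway SparkContext from Python 3 and print its versions.
# check_install.py -- hypothetical helper script: verify that findspark
# locates the Spark install and that the context runs on Python 3.
import findspark
findspark.init('/etc/spark')
from pyspark import SparkContext
sc = SparkContext(appName="InstallCheck")
print(sc.version)    # expect 2.4.7
print(sc.pythonVer)  # expect 3.x once PYSPARK_PYTHON points at python3
sc.stop()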

Step 3: Starting All Pipeline Services (~10 min)

net start MSSQLSERVER
net start SQLSERVERAGENT
Starting SQL Server services
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
curl -H "Accept:application/json" localhost:8083/connectors/;curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "{Private IP Address}", "database.port": "1433", "database.user": "testuser", "database.password": "password!", "database.dbname": "testDB", "database.server.name": "testDB", "table.whitelist": "dbo.fruit", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.fulfillment" } }';
Connector not added
Connector added
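If you prefer to script this check instead of eyeballing the curl output, here is a small sketch using only the Python standard library (the connector name test-connector matches the POST above) that asks the Kafka Connect REST API for the connector's status.
# connector_status.py -- optional check (a sketch): confirm test-connector
# registered with Kafka Connect and that its task is RUNNING.
import json
from urllib.request import urlopen
with urlopen("http://localhost:8083/connectors/test-connector/status") as resp:
    status = json.loads(resp.read().decode("utf-8"))
print(status["connector"]["state"])            # expect "RUNNING"
print([t["state"] for t in status["tasks"]])   # expect ["RUNNING"]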

Step 4: Extracting CDC Row Insertion Data Using Pyspark (~15 min)

touch readkafka.py
#Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
#Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc, 2)  # 2-second micro-batch interval
#Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"{replace with your Kafka private address}:9092"})
counts = dks.pprint()
#Starting Spark context
ssc.start()
ssc.awaitTermination()
/etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3 readkafka.py
Pyspark stream timestamps
CDC JSON tuple
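Each element of the direct stream is a (key, value) tuple of decoded strings, which is why the snippets below work on x[1]. As a rough guide, here is a simplified, hypothetical Debezium insert record (real messages carry more fields) and how json.loads splits it into its two top-level keys.
# x[0] is the message key (often None or a JSON string of the primary key);
# x[1] is the JSON value string, which parses into "schema" and "payload".
import json
x = (None, '{"schema": {}, "payload": {"before": null, "after": {"id": 1, "fruit_name": "apple", "num_sold": 5}, "op": "c"}}')
record = json.loads(x[1])
print(list(record.keys()))         # ['schema', 'payload']
print(record["payload"]["after"])  # {'id': 1, 'fruit_name': 'apple', 'num_sold': 5}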
# To extract JSON data from the tuple, change this...
counts = dks.pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).pprint()
# To separate the schema and the payload, change this...
counts = dks.map(lambda x: json.loads(x[1])).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).pprint()
Separate schema and payload
# To isolate the payload, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").pprint()
Payload only
# To get insertion data, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).pprint()
Insertion data
Insertion of multiple rows in one transaction
# To get reduce by key, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).pprint()
With and without reducing by key
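If you want to check the lambda chain without a live Kafka topic, here is a minimal local sketch (the sample record below is hypothetical) that applies the same transformations to a plain RDD and confirms reduceByKey sums num_sold per fruit_name.
# local_check.py -- a sketch: run the Step 4 transformations on two fake
# (key, value) pairs and print the summed result.
import json
import findspark
findspark.init('/etc/spark')
from pyspark import SparkContext
sample_value = json.dumps({
    "schema": {},
    "payload": {"before": None,
                "after": {"id": 1, "fruit_name": "apple", "num_sold": 5},
                "op": "c"}
})
records = [(None, sample_value), (None, sample_value)]
sc = SparkContext(appName="LocalTransformCheck")
result = (sc.parallelize(records)
            .map(lambda x: json.loads(x[1]))
            .flatMap(lambda d: d.items())
            .filter(lambda items: items[0] == "payload")
            .map(lambda t: (t[1]["after"]["fruit_name"], t[1]["after"]["num_sold"]))
            .reduceByKey(lambda a, b: a + b)
            .collect())
print(result)  # [('apple', 10)]
sc.stop()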

Step 5: Running Own Functions on Output

# To program your own behavior, change this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).pprint()
# To this...
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(somefunction)
# foreachRDD passes each micro-batch as (batch time, RDD); collect the RDD
# and print its elements without the timestamp.
def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)
Using a custom function to leave out timestamps
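The same hook can do more than print. As a hypothetical example (the output path and CSV-style format are assumptions), a function like the one below appends each batch's totals to a local file when passed to foreachRDD in place of printy.
# save_batch -- a hypothetical foreachRDD handler: collect each micro-batch's
# (fruit_name, total) pairs and append them to a local file.
def save_batch(timestamp, rdd):
    rows = rdd.collect()
    if not rows:
        return  # skip empty batches
    with open("/tmp/fruit_totals.txt", "a") as f:
        for fruit_name, num_sold in rows:
            f.write("%s,%s\n" % (fruit_name, num_sold))
Keep in mind that collect() pulls the whole batch to the driver, which is fine for these small CDC batches but not something to rely on at larger scale.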

Step 6: Changing the Spark Job to Filter out Deletes and Updates

Operation parameter for inserting a new row
Operation parameter for updating a row
Operation parameter for deleting a row
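The screenshots above show Debezium's op field, which is what the filter keys on: "c" for an insert, "u" for an update, "d" for a delete. One way to keep only inserts (a sketch, reusing printy from Step 5; the extra None check guards against Debezium tombstone messages, whose value is empty) is to add two filters before extracting the "after" image, which is null for deletes:
counts = dks.filter(lambda x: x[1] is not None) \
            .map(lambda x: json.loads(x[1])) \
            .flatMap(lambda d: d.items()) \
            .filter(lambda items: items[0] == "payload") \
            .filter(lambda items: items[1]["op"] == "c") \
            .map(lambda t: (t[1]["after"]["fruit_name"], t[1]["after"]["num_sold"])) \
            .reduceByKey(lambda a, b: a + b) \
            .foreachRDD(printy)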

Completed Python File

#Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
#Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc, 2)  # 2-second micro-batch interval
#Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"{replace with your Kafka private address}:9092"})
# Transforming CDC JSON data to sum fruit numbers
# based on fruit name

def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)
counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)
#Starting Spark context
ssc.start()
ssc.awaitTermination()

Addendum
