Streaming Data from Microsoft SQL Server into Apache Kafka

Creating a CDC data pipeline: Part 1


Introduction

In this three-part tutorial, we will set up and configure AWS EC2 instances to capture Change Data Capture (CDC) row-insertion data from Microsoft SQL Server 2019, collect it in Apache Kafka, aggregate it periodically with Apache Spark's streaming capability, and track the live updates using Grafana.

Part 1 covers the steps for creating an extensible data pipeline between SQL Server 2019 and Apache Kafka. This section of the full CDC-to-Grafana data pipeline is supported by the Debezium SQL Server connector for Apache Kafka.

NOTE: Basic familiarity with creating and using AWS EC2 instances and basic command line operations is assumed. Familiarity with Microsoft SQL Server and Apache Kafka will be helpful for this tutorial, but is not required.

Pipeline diagram for this tutorial.

Step 1: Creating Security Groups and EC2 Instances (~15 min)

Finding correct CIDR for security rules

Before creating the instances, look up the CIDR block of your default VPC. You will need it later when writing security group rules that let the two instances communicate over the network.

From the AWS Services menu, open the VPC service and view your list of VPCs. Note the IPv4 CIDR of the default VPC (for default VPCs this is typically 172.31.0.0/16).
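
Equivalently, if you have the AWS CLI configured, you can fetch the default VPC's CIDR from a terminal:

aws ec2 describe-vpcs --filters Name=isDefault,Values=true --query "Vpcs[0].CidrBlock" --output text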

Creating Instances

Next, create two EC2 instances with the following settings, accepting the defaults where details are left unspecified.

MS SQL Server 2019 AWS Instance Details (a Windows Server AMI with SQL Server 2019; at minimum, allow inbound RDP from your IP and inbound TCP 1433 from the VPC CIDR found above):

Apache Kafka AWS Details (an Ubuntu AMI, since the setup script in Step 3 uses apt; allow inbound SSH from your IP):
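
The most important rule is that Kafka Connect must be able to reach SQL Server on port 1433, so the SQL Server instance's security group needs an inbound rule for that port from the VPC CIDR. As a sketch via the AWS CLI, with {sg-id} and {vpc-cidr} as placeholders for your own values:

aws ec2 authorize-security-group-ingress --group-id {sg-id} --protocol tcp --port 1433 --cidr {vpc-cidr}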

Step 2: Configuring SQL Server for CDC (~15 min)

In SQL Server Management Studio, enable mixed-mode authentication and create a login for the connector:

1. Search Windows for Microsoft SQL Server Management Studio, open it, and connect to the local server from the Object Explorer.
2. Right-click the server, open Properties > Security, and select "SQL Server and Windows Authentication mode". A notification will remind you that the server must be restarted for the change to take effect.
3. In the Object Explorer, expand Security, right-click Logins, and create a new login (testuser) with SQL Server authentication and a password.
4. On the login's Server Roles page, add the sysadmin role. This is the simplest choice for a test setup, since enabling CDC at the database level requires sysadmin; a production setup would grant narrower permissions.
5. Open an Administrator PowerShell window for the restart and script steps below.

Save the following as testdb.sql. It creates the test database and table and enables CDC on both:
-- testdb.sql: create the test database and enable CDC on it
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
-- Also turn on change tracking (a separate SQL Server feature from CDC)
ALTER DATABASE testDB
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
GO
-- Create the test table, seed it with data, and enable CDC on the table
CREATE TABLE fruit (
    id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
    fruit_name VARCHAR(255) NOT NULL,
    num_sold INTEGER NOT NULL
);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Peach', 20);
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'fruit',
    @role_name = NULL,
    @supports_net_changes = 0;
GO
net stop MSSQLSERVER
net start MSSQLSERVER
net start SQLSERVERAGENT
PowerShell commands to restart SQL Server and start the SQL Server Agent (CDC capture jobs run through the Agent)
sqlcmd -U testuser -P {your password} -i {location of testdb.sql}
Running testdb.sql to create the database and table and enable CDC on both
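
Before moving on, it is worth confirming that CDC is actually enabled. As an optional sanity check from the same PowerShell session, query the CDC flags in SQL Server's system catalog:

sqlcmd -U testuser -P {your password} -Q "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'testDB'"
sqlcmd -U testuser -P {your password} -Q "USE testDB; SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'fruit'"

Both queries should return 1 in the flag column.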

Step 3: Installing/Configuring Kafka and Debezium Connector (~15 min)

touch script.sh
chmod +x script.sh
Creating a script file
nano script.sh
#!/bin/bash
## Update the repos
sudo apt-get update -y
## Install Java JDK 8
sudo apt-get install openjdk-8-jdk -y
## Download the Kafka files and extract
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvzf kafka_2.12-2.5.0.tgz
## Download the Debezium files and extract
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.1.1.Final/debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
tar -xvzf debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
## Copy extracted files into an "/etc" directory
sudo mkdir /etc/kafka
sudo chown -R ubuntu /etc/kafka
cp -r kafka_2.12-2.5.0/* /etc/kafka/
mkdir /etc/kafka/conns
cp -r debezium-connector-sqlserver /etc/kafka/conns/
## Add a config so Kafka Connect can find the Debezium files
echo "plugin.path=/etc/kafka/conns" >> /etc/kafka/config/connect-distributed.properties
## OPTIONAL: Install MSSQL Tools for testing purposes
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list
sudo apt-get update -y
sudo ACCEPT_EULA=Y apt-get install mssql-tools -y
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc
source ~/.bashrc
./script.sh
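
With the optional mssql-tools installed, you can verify that the Kafka instance can actually reach SQL Server before wiring up the connector ({Private IP Address} is the SQL Server instance's private IP, and the credentials are the ones created in Step 2):

sqlcmd -S {Private IP Address} -U testuser -P {your password} -Q "SELECT @@VERSION"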
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
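
Each command starts its service in the background and writes output to a log file. Give the services a few seconds, then confirm all three are up; jps ships with the JDK installed by the script:

jps -l
curl -s localhost:8083/

jps should list ZooKeeper, Kafka, and ConnectDistributed as running Java processes, and the curl call should return the Connect worker's version information.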
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "{Private IP Address}",
    "database.port": "1433",
    "database.user": "testuser",
    "database.password": "password!",
    "database.dbname": "testDB",
    "database.server.name": "testDB",
    "table.whitelist": "dbo.fruit",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.fulfillment"
  }
}'
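
If registration succeeds, the POST returns HTTP 201. You can also ask the Connect REST API for the connector's status at any time:

curl -s localhost:8083/connectors/test-connector/status

Both the connector and its single task should report state RUNNING.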
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Listing Kafka topics
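
The exact list varies, but alongside Connect's internal topics (connect-configs, connect-offsets, and connect-status by default in connect-distributed.properties) you should see the Debezium history topic dbhistory.fulfillment and the table topic testDB.dbo.fruit.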

Step 4: Reading CDC Topic (~5 min)

Debezium names each table's topic {database.server.name}.{schema}.{table}, which is why the fruit table's changes appear under testDB.dbo.fruit:

/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning
Consumer output
Back on the SQL Server instance, save the following as insert.sql and run it to generate fresh changes:

USE testDB;
GO
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Peach', 20);
sqlcmd -U testuser -P password! -i {location of insert.sql}
Insert script running
Evidence of table change
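
For reference, each record the consumer prints is a Debezium change-event envelope in JSON. Heavily abridged (the real message also carries a schema section and fuller source metadata), an insert event for one of the new rows looks roughly like this; "op": "c" marks a create/insert, and the id depends on how many rows the identity column has issued:

{
  "payload": {
    "before": null,
    "after": { "id": 1004, "fruit_name": "Apple", "num_sold": 5 },
    "source": { "schema": "dbo", "table": "fruit" },
    "op": "c"
  }
}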

Addendum 1: Important Commands Used

Powershell

net {start/stop} MSSQLSERVER
net {start/stop} SQLSERVERAGENT
sqlcmd -U {username} -P {password} -i {filename}

Bash

/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
curl -H "Accept:application/json" localhost:8083/connectors/;
curl -X DELETE localhost:8083/connectors/{connector name};
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{connector data in JSON format}'
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {topic name} --from-beginning
/etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic {topic name}

Addendum 2: Next Article in the Tutorial

In the next part of this tutorial, we will install Apache Spark onto an Ubuntu 18.04 EC2 instance to stream, transform, and periodically aggregate the CDC data stored in this pipeline. A link will be added HERE when Part 2 is available.
