Streaming Data from Microsoft SQL Server into Apache Kafka

Sandeep Kattepogu
8 min read · Jun 9, 2020

Creating a CDC data pipeline: Part 1

Outline

  • Introduction
  • Creating Security Groups and EC2 Instances (~15 min)
  • Configuring SQL Server for CDC (~15 min)
  • Installing/Configuring Kafka and Debezium Connector (~15 min)
  • Reading CDC Topic (~5 min)
  • Addendum 1: Important Commands Used
  • Addendum 2: Next Article in the Tutorial

Introduction

In this three-part tutorial, we will learn how to set up and configure AWS EC2 instances to take Change Data Capture row insertion data from Microsoft SQL Server 2019, collect it in Apache Kafka, aggregate periodically with Apache Spark’s streaming capability, and track the live updates using Grafana.

Part 1 will cover steps for creating an extendable data pipeline between SQL Server 2019 and Apache Kafka. This section of the full CDC-to-Grafana data pipeline will be supported by the Debezium MS SQL Server connector for Apache Kafka.

NOTE: Basic familiarity with creating and using AWS EC2 instances and basic command line operations is assumed. Familiarity with Microsoft SQL Server and Apache Kafka will be helpful for this tutorial, but is not required.

Pipeline diagram for this tutorial.

Step 1: Creating Security Groups and EC2 Instances (~15 min)

Finding correct CIDR for security rules

Before creating the instances, it is best to find the CIDR of your default VPC. You will need it later when creating security rules so that the instances can communicate with each other over the network.

  • Log in to AWS.
  • Open up the Services tab on AWS home.
AWS Services menu
  • Type in “VPC” and select the “VPC - Isolated Cloud Resources” option.
  • Select “Your VPCs” from the menu on the left of the dashboard. Record the value in the “IPv4 CIDR” column.
Listing AWS VPCs
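  • Alternatively, if you already have the AWS CLI installed and configured, the default VPC’s CIDR can be looked up from a terminal (an optional shortcut, not required for the rest of the tutorial):
aws ec2 describe-vpcs --filters "Name=isDefault,Values=true" --query "Vpcs[0].CidrBlock" --output text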

Creating Instances

Next, create two AWS EC2 instances with the following settings. Accept the defaults where details are left unspecified.

MS SQL Server 2019 AWS Instance Details:

  • Image type: Microsoft Windows Server 2019 with SQL Server 2019 Standard
  • Minimum recommended instance type: t3a.xlarge
  • Number of instances: 1
  • Inbound Security Rules: RDP from My IP; All TCP from default VPC IPv4 CIDR

Apache Kafka AWS Instance Details:

  • Image type: Ubuntu Server 18.04 LTS (HVM)
  • Minimum recommended instance type: t2.medium
  • Number of instances: 1
  • Inbound Security Rules: SSH from My IP; All TCP from default VPC CIDR
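If you prefer creating the security rules from the AWS CLI instead of the console, a sketch of the “All TCP from default VPC CIDR” rule looks like this (the group ID and CIDR are placeholders for your own values; this is an optional alternative, not part of the console steps above):

aws ec2 authorize-security-group-ingress --group-id {security group id} --protocol tcp --port 0-65535 --cidr {default VPC IPv4 CIDR}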

Step 2: Configuring SQL Server for CDC (~15 min)

  • RDP into the Microsoft Server 2019 instance.
  • Open the SQL Server Management Studio by typing “ssms” into Windows search.
Windows search for the Management Studio
  • Right-click the server in the Object Explorer and select “Properties”.
Selecting the server from the Object Explorer
  • Go to the “Security” menu and select “SQL Server and Windows Authentication mode”. This will allow the Debezium connector to connect to the server to read CDC data live.
SQL Server Security settings
  • Press “OK”. SQL Server will ask to restart. Press “OK”.
Restart notification
  • Now go to the Object Explorer. Expand “Security” and right-click “Logins”. Select “New Login…”.
Selecting login settings in the Object Explorer
  • Create a SQL Server login named “testuser” with SQL Server authentication. Uncheck “Enforce password policy” so you can use any password you wish. Do not click “OK” yet.
Creating a new user
  • Go to the “Server Roles” tab and check the box for “sysadmin”.
Setting up server roles
  • Click “OK”.
  • Basic setup is now complete. Close the Management Studio and open up an Admin Powershell window (select from the “Windows key+X” shortcut).
Opening an Admin Powershell
  • We will now create a “.sql” file that creates our test database, table, and sample data, and enables CDC on both the database and the table.
  • Type “notepad.exe” in the Powershell window. Paste in the following lines.
-- Create the test database and enable CDC (and change tracking) at the database level
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
ALTER DATABASE testDB
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
-- Create the test table, insert sample data, and enable CDC on the table
CREATE TABLE fruit (
    id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
    fruit_name VARCHAR(255) NOT NULL,
    num_sold INTEGER NOT NULL
);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Peach', 20);
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'fruit',
    @role_name = NULL,
    @supports_net_changes = 0;
GO
  • Save the file as “testdb.sql”.
  • Now restart the MS SQL Server service by typing the following commands. Make sure to enter “Y” when required.
net stop MSSQLSERVER
net start MSSQLSERVER
  • Start the SQL Server Agent by typing:
net start SQLSERVERAGENT
Powershell SQL Server service commands
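  • Optionally, confirm that the “testuser” login and SQL Server authentication are working before running the script (a quick check that is not in the original steps):
sqlcmd -U testuser -P {your password} -Q "SELECT @@VERSION"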
  • Run the “testdb.sql” script by typing:
sqlcmd -U testuser -P {your password} -i {location of testdb.sql}
  • If the script ran successfully, you will see messages that confirm three row changes and no error messages.
Creating database and table and enabling change tracking on both
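  • To double-check that CDC is enabled, you can query the SQL Server catalog views with sqlcmd (an optional check; both queries should return 1 in the second column):
sqlcmd -U testuser -P {your password} -d testDB -Q "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'testDB'; SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'fruit';"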

Step 3: Installing/Configuring Kafka and Debezium Connector (~15 min)

  • Log into the Ubuntu 18.04 instance using an SSH client of your choice.
  • Create a script file by typing:
touch script.sh
  • Give full permissions on the script by typing:
chmod 777 script.sh
Creating a script file
  • Open the file by typing:
nano script.sh
  • Copy and paste the contents below into the file. If you keep the optional commands that install the MS SQL Server tools, enter “Y” to accept the Microsoft license agreements when prompted.
#!/bin/bash
## Update the repos
sudo apt-get update -y
## Install Java JDK 8
sudo apt-get install openjdk-8-jdk -y
## Download the Kafka files and extract
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvzf kafka_2.12-2.5.0.tgz
## Download the Debezium files and extract
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.1.1.Final/debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
tar -xvzf debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
## Copy extracted files into an "/etc" directory
sudo mkdir /etc/kafka
sudo chown -R ubuntu /etc/kafka
cp -r kafka_2.12-2.5.0/* /etc/kafka/
mkdir /etc/kafka/conns
cp -r debezium-connector-sqlserver /etc/kafka/conns/
## Add a config so Kafka Connect can find the Debezium files
echo "plugin.path=/etc/kafka/conns" >> /etc/kafka/config/connect-distributed.properties
## OPTIONAL: Install MSSQL Tools for testing purposes
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/18.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list
sudo apt-get update -y
sudo apt-get install mssql-tools -y
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc
source ~/.bashrc
  • Save the file and exit.
  • Run the script by typing:
./script.sh
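  • Once the script finishes, you can optionally confirm that the Debezium plugin and the plugin path configuration are in place (a quick sanity check, not in the original steps):
ls /etc/kafka/conns/debezium-connector-sqlserver
grep plugin.path /etc/kafka/config/connect-distributed.properties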
  • Once the files are in place, Kafka can be started.
  • Start ZooKeeper by running the command below. It starts ZooKeeper with the default properties in the background and saves its output to a file.
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
  • Start the Broker by running:
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
  • Start Kafka Connect by running:
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
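  • To confirm all three processes came up, check that their default ports are listening (2181 for ZooKeeper, 9092 for the broker, 8083 for Kafka Connect); this check is optional and assumes the default ports:
ss -ltn | grep -E ':(2181|9092|8083)'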
  • Now we will send a POST request to the Kafka Connect REST API to create the MS SQL Server connector from the Debezium plugin.
  • To add the connector, enter the following request, replacing the database hostname (the private IP address of the SQL Server instance) and the password with your own values.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "{Private IP Address}",
    "database.port": "1433",
    "database.user": "testuser",
    "database.password": "{your password}",
    "database.dbname": "testDB",
    "database.server.name": "testDB",
    "table.whitelist": "dbo.fruit",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.fulfillment"
  }
}'
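  • Optionally, confirm that the connector was registered and its task is running by querying the Kafka Connect REST API (not part of the original steps):
curl -s localhost:8083/connectors/test-connector/status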
  • List all topics by typing the command below. If the “testDB.dbo.fruit” topic and the “dbhistory.fulfillment” history topic appear in the list, the connector was created successfully.
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Listing Kafka topics

Step 4: Reading CDC Topic (~5 min)

  • In order to see the CDC JSON data being sent over the network, we will need to create a Kafka consumer on the Ubuntu 18.04 instance that reads the “testDB.dbo.fruit” topic.
  • Create the consumer by typing:
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning
  • You should see a long listing of JSON data.
Consumer output
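  • The Debezium messages are verbose. To look at just the row data, you can pipe the consumer through jq and pull out the “payload.after” field (an optional step; it assumes the default JSON converter with schemas enabled, and jq must be installed first with “sudo apt-get install jq -y”):
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning | jq '.payload.after'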
  • In order to see the topic updated with new data, we will need to insert data into the fruit table.
  • RDP into the Windows Server 2019 instance.
  • Open an Admin Powershell.
  • Type “notepad.exe” and paste in the following lines.
USE testDB;
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold) VALUES ('Peach', 20);
  • Save the file as “insert.sql”.
  • Type the command below, substituting your password. You should see three “rows affected” messages in PowerShell.
sqlcmd -U testuser -P {your password} -i {location of insert.sql}
Insert script running
  • Check the Kafka consumer JSON output. You should see evidence that the rows were added.
Evidence of table change
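  • If you also want to see the captured changes on the SQL Server side, the inserts land in the change table created by sp_cdc_enable_table (with the default capture instance this is cdc.dbo_fruit_CT); querying it is an optional extra, not in the original steps:
sqlcmd -U testuser -P {your password} -d testDB -Q "SELECT * FROM cdc.dbo_fruit_CT"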

Addendum 1: Important Commands Used

Powershell

  • Start/stop the SQL Server service:
net {start/stop} MSSQLSERVER
  • Start/stop the SQL Server Agent service:
net {start/stop} SQLSERVERAGENT
  • Run “.sql” scripts:
sqlcmd -U {username} -P {password} -i {filename}

Bash

  • Start Zookeeper in background:
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
  • Start Broker in background:
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
  • Start Connect in background:
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
  • Check running connectors:
curl -H "Accept:application/json" localhost:8083/connectors/;
  • Delete a connector:
curl -X DELETE localhost:8083/connectors/{connector name};
  • Add a connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{connector data in JSON format}'
  • List topics:
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
  • Create consumer and read from beginning:
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {topic name} --from-beginning
  • Delete a topic:
/etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic {topic name}
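  • Check a single connector’s status (not used above, but handy for debugging; the status endpoint is part of the same Kafka Connect REST API):
curl -H "Accept:application/json" localhost:8083/connectors/{connector name}/status;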

Addendum 2: Next Article in the Tutorial

In the next part of this tutorial, we will install Apache Spark onto an Ubuntu 18.04 EC2 instance to stream, transform, and periodically aggregate the CDC data stored in this pipeline. A link will be added HERE when Part 2 is available.
