Streaming Data from Microsoft SQL Server into Apache Kafka

Creating a CDC data pipeline: Part 1

Outline

  • Introduction
  • Step 1: Creating Security Groups and EC2 Instances
  • Step 2: Configuring SQL Server for CDC
  • Step 3: Installing/Configuring Kafka and Debezium Connector
  • Step 4: Reading CDC Topic
  • Addendum 1: Important Commands Used
  • Addendum 2: Next Article in the Tutorial

Introduction

In this three-part tutorial, we will learn how to set up and configure AWS EC2 instances to capture Change Data Capture (CDC) row-insertion data from Microsoft SQL Server 2019, collect it in Apache Kafka, aggregate it periodically with Apache Spark’s streaming capability, and track the live updates using Grafana.

Part 1 will cover steps for creating an extendable data pipeline between SQL Server 2019 and Apache Kafka. This section of the full CDC-to-Grafana data pipeline will be supported by the Debezium MS SQL Server connector for Apache Kafka.

NOTE: Basic familiarity with creating and using AWS EC2 instances and basic command line operations is assumed. Familiarity with Microsoft SQL Server and Apache Kafka will be helpful for this tutorial, but is not required.

Pipeline diagram for this tutorial.

Step 1: Creating Security Groups and EC2 Instances (~15 min)

Finding correct CIDR for security rules

Before creating the instances, it is best to find the CIDR block of your default VPC. You will need it later when creating security rules so that the instances can communicate with each other over the network.

  • Log in to AWS.
AWS Services menu
  • Type in “VPC” and select the “VPC-Isolated Cloud Resources” option.
Listing AWS VPCs
  • Find your default VPC in the list and note its IPv4 CIDR; it will be used in the security rules below.
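Alternatively, if the AWS CLI is installed and configured on your machine, the default VPC’s CIDR can be printed directly. This is just a hedged shortcut; the console steps above work just as well.

## Print the CIDR block of the default VPC
aws ec2 describe-vpcs --filters Name=isDefault,Values=true --query "Vpcs[0].CidrBlock" --output text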

Creating Instances

Next, create two AWS EC2 instances with the following settings. Accept the defaults where details are left unspecified.

MS SQL Server 2019 AWS Instance Details:

  • Image type: Microsoft Windows Server 2019 with SQL Server 2019 Standard

Apache Kafka AWS Details:

  • Image type: Ubuntu Server 18.04 LTS (HVM)
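If you prefer to script the security rules, the sketch below opens the SQL Server port so the Kafka instance can reach the database over the VPC network. The security group ID and the 172.31.0.0/16 CIDR are placeholders for illustration; substitute your own group ID and the default-VPC CIDR you noted earlier. The same inbound rule can be added through the console’s Security Groups page instead.

## Allow inbound SQL Server traffic (port 1433) from inside the default VPC
## sg-0123456789abcdef0 and 172.31.0.0/16 are placeholders; use your own values
aws ec2 authorize-security-group-ingress \
    --group-id sg-0123456789abcdef0 \
    --protocol tcp --port 1433 \
    --cidr 172.31.0.0/16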

Step 2: Configuring SQL Server for CDC (~15 min)

  • RDP into the Microsoft Windows Server 2019 instance and open SQL Server Management Studio from the Windows search bar.
Windows search for the Management Studio
  • Right-click the server in the Object Explorer and select “Properties”.
Selecting the server from the Object Explorer
  • Go to the “Security” menu and select “SQL Server and Windows Authentication mode”. This will allow the Debezium connector to connect to the server to read CDC data live.
SQL Server Security settings
  • Press “OK”. SQL Server will ask to restart. Press “OK”.
Restart notification
  • Now go to the Object Explorer. Expand “Security” and right-click “Logins”. Select “New Login…”.
Selecting login settings in the Object Explorer
  • Create a SQL Server login called “testuser”. Uncheck “Enforce password policy” so you can use any password you wish. Do not click “OK” yet.
Creating a new user
  • Go to the “Server Roles” tab and check the box for “sysadmin”.
Setting up server roles
  • Click “OK”.
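  • (Optional) The same login can be created from an administrator PowerShell window instead of the SSMS dialogs. This is a hedged equivalent: -E uses Windows authentication, and CHECK_POLICY = OFF corresponds to unchecking “Enforce password policy”.
sqlcmd -E -Q "CREATE LOGIN testuser WITH PASSWORD = '{your password}', CHECK_POLICY = OFF; ALTER SERVER ROLE sysadmin ADD MEMBER testuser;"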
Opening an Admin Powershell
  • We will now create a “.sql” file that creates our test database, table, and data and enables CDC. Create a new file with the following contents:
-- Create the test database and enable CDC and change tracking
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
ALTER DATABASE testDB
    SET CHANGE_TRACKING = ON
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
-- Create the test table and data and enable change tracking on the table
CREATE TABLE fruit (
    id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
    fruit_name VARCHAR(255) NOT NULL,
    num_sold INTEGER NOT NULL
);
INSERT INTO fruit(fruit_name, num_sold)
    VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
    VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
    VALUES ('Peach', 20);
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'fruit',
    @role_name = NULL,
    @supports_net_changes = 0;
GO
  • Save the file as “testdb.sql”.
  • Open an administrator PowerShell window and restart the SQL Server service (so the new authentication mode takes effect) by typing:
net stop MSSQLSERVER
net start MSSQLSERVER
  • Start the SQL Server Agent by typing:
net start SQLSERVERAGENT
Powershell SQL Server service commands
  • Run the “testdb.sql” script by typing:
sqlcmd -U testuser -P {your password} -i {location of testdb.sql}
  • If the script ran successfully, you will see three “(1 rows affected)” messages and no errors.
Creating database and table and enabling change tracking on both
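  • (Optional) As a quick sanity check from the same PowerShell window, you can confirm that CDC is enabled on the database and table and that the seed rows exist. This is a hedged verification query, not a required step:
sqlcmd -U testuser -P {your password} -Q "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'testDB'; SELECT name, is_tracked_by_cdc FROM testDB.sys.tables WHERE name = 'fruit'; SELECT * FROM testDB.dbo.fruit;"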

Step 3: Installing/Configuring Kafka and Debezium Connector (~15 min)

  • Log into the Ubuntu 18.04 instance using an SSH client of your choice.
  • Create an empty setup script by typing:
touch script.sh
  • Give the script full permissions by typing:
chmod 777 script.sh
Creating a script file
  • Open the file by typing:
nano script.sh
  • Copy and paste the contents below into the file. If you keep the optional MS SQL Server tools commands at the end, enter “Y” when prompted to accept the Microsoft license agreements.
#!/bin/bash
## Update the repos
sudo apt-get update -y
## Install Java JDK 8
sudo apt-get install openjdk-8-jdk -y
## Download the Kafka files and extract
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvzf kafka_2.12-2.5.0.tgz
## Download the Debezium files and extract
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.1.1.Final/debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
tar -xvzf debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
## Copy extracted files into an "/etc" directory
sudo mkdir /etc/kafka
sudo chown -R ubuntu /etc/kafka
cp -r kafka_2.12-2.5.0/* /etc/kafka/
mkdir /etc/kafka/conns
cp -r debezium-connector-sqlserver /etc/kafka/conns/
## Add a config so Kafka Connect can find the Debezium files
echo "plugin.path=/etc/kafka/conns" >> /etc/kafka/config/connect-distributed.properties
## OPTIONAL: Install MSSQL Tools for testing purposes
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/18.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list
sudo apt-get update -y
sudo apt-get install mssql-tools -y
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc
source ~/.bashrc
  • Save the file and exit.
  • Run the script by typing:
./script.sh
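  • (Optional) Before starting Kafka, a quick spot check that the script put everything in place; the paths below come from the script above:
## Java 8 should be installed and the Kafka and Debezium files should be under /etc/kafka
java -version
ls /etc/kafka/bin | head -n 5
ls /etc/kafka/conns/debezium-connector-sqlserver | head -n 5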
  • Once the files are in place, Kafka can be started. Start Zookeeper in the background by running:
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
  • Start the Broker by running:
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
  • Start Kafka Connect by running:
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
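  • (Optional) Before registering the connector, you can confirm that the Connect REST API is up and that it discovered the Debezium plugin; the second command should list io.debezium.connector.sqlserver.SqlServerConnector. Give Connect a few seconds to finish starting first.
curl -s localhost:8083/
curl -s localhost:8083/connector-plugins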
  • Now we will send a POST request to Kafka Connect to register the MS SQL Server connector using the Debezium plugin. Replace {Private IP Address} with the private IP address of the SQL Server 2019 instance and “password!” with the password you set for “testuser”.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "{Private IP Address}",
    "database.port": "1433",
    "database.user": "testuser",
    "database.password": "password!",
    "database.dbname": "testDB",
    "database.server.name": "testDB",
    "table.whitelist": "dbo.fruit",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.fulfillment"
  }
}';
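  • (Optional) You can ask Kafka Connect for the connector’s status via its standard REST API; the connector and its single task should both report RUNNING:
curl -s localhost:8083/connectors/test-connector/status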
  • List all topics by typing the command below. If the “testDB.dbo.fruit” and “dbhistory.fulfillment” topics appear in the list, the connector was created successfully.
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Listing Kafka topics

Step 4: Reading CDC Topic (~5 min)

  • To see the CDC JSON data being sent over the network, create a Kafka console consumer on the Ubuntu 18.04 instance that reads the “testDB.dbo.fruit” topic:
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning
  • You should see a long listing of JSON data.
Consumer output
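  • (Optional) The raw output is dense. If you install jq (sudo apt-get install jq -y), each Debezium change event can be reduced to just the new row values, which sit under payload.after when the default JSON converter is used. This is a hedged convenience and not required for the pipeline:
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning | jq '.payload.after'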
  • To see the topic update with new data, we will insert more rows into the fruit table. Back on the Windows instance, create a new “.sql” file with the following contents:
USE testDB;
INSERT INTO fruit(fruit_name, num_sold)
    VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
    VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
    VALUES ('Peach', 20);
  • Save the file as “insert.sql”.
  • Run the script by typing:
sqlcmd -U testuser -P password! -i {location of insert.sql}
Insert script running
  • Check the Kafka consumer JSON output. You should see three new change events, one for each inserted row.
Evidence of table change
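  • (Optional) Further inserts can also be issued straight from PowerShell without a script file; each new row should appear in the consumer within a few seconds. The 'Plum' row below is just an illustrative example:
sqlcmd -U testuser -P {your password} -Q "INSERT INTO testDB.dbo.fruit(fruit_name, num_sold) VALUES ('Plum', 7);"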

Addendum 1: Important Commands Used

Powershell

  • Start/stop the SQL Server service:
net {start/stop} MSSQLSERVER
  • Start/stop the SQL Server Agent service:
net {start/stop} SQLSERVERAGENT
  • Run “.sql” scripts:
sqlcmd -U {username} -P {password} -i {filename}

Bash

  • Start Zookeeper in background:
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
  • Start Broker in background:
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
  • Start Connect in background:
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
  • Check running connectors:
curl -H "Accept:application/json" localhost:8083/connectors/;
  • Delete a connector:
curl -X DELETE localhost:8083/connectors/{connector name};
  • Add a connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{connector data in JSON format}'
  • List topics:
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
  • Create consumer and read from beginning:
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {topic name} --from-beginning
  • Delete a topic:
/etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic {topic name}
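  • Stop the Broker / Zookeeper (these stop scripts ship alongside the start scripts in Kafka 2.5; Kafka Connect can be stopped by killing its background job):
/etc/kafka/bin/kafka-server-stop.sh
/etc/kafka/bin/zookeeper-server-stop.sh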

Addendum 2: Next Article in the Tutorial

In the next part of this tutorial, we will install Apache Spark onto an Ubuntu 18.04 EC2 instance to stream, transform, and periodically aggregate the CDC data stored in this pipeline. A link will be added HERE when Part 2 is available.
