Streaming Data from Microsoft SQL Server into Apache Kafka

Outline

Introduction

Pipeline diagram for this tutorial.

Step 1: Creating Security Groups and EC2 Instances (~15 min)

AWS Services menu
Listing AWS VPCs
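The same setup can also be scripted. The sketch below uses the AWS CLI to create a security group, open SQL Server's default port 1433 to the Kafka instance, and launch an instance; every bracketed value, plus the group name, AMI, instance type, and key pair, is a placeholder rather than something this tutorial prescribes.
## Sketch only: a security group that lets the Kafka instance reach SQL Server on 1433
aws ec2 create-security-group --group-name sqlserver-cdc-sg --description "SQL Server CDC demo" --vpc-id {your VPC ID}
aws ec2 authorize-security-group-ingress --group-id {security group ID} --protocol tcp --port 1433 --cidr {Kafka instance subnet CIDR}
## Launch the SQL Server instance into that group
aws ec2 run-instances --image-id {Windows SQL Server AMI ID} --instance-type t3.large --key-name {your key pair} --security-group-ids {security group ID}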

Step 2: Configuring SQL Server for CDC (~15 min)

Windows search for the Management Studio
Selecting the server from the Object Explorer
SQL Server Security settings
Restart notification
Selecting login settings in the Object Explorer
Creating a new user
Setting up server roles
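If you would rather script this step than click through the Object Explorer dialogs, a rough T-SQL equivalent run from an administrator prompt on the SQL Server instance is shown below. The login name testuser matches the rest of this tutorial, but the password placeholder and the sysadmin role grant are assumptions; substitute whatever role your environment actually requires.
sqlcmd -Q "CREATE LOGIN testuser WITH PASSWORD = '{your password}';"
sqlcmd -Q "ALTER SERVER ROLE sysadmin ADD MEMBER testuser;"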
Opening an Admin PowerShell
-- Create the test database and enable change tracking
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
ALTER DATABASE testDB
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
-- Create test table and data and enable change tracking
CREATE TABLE fruit (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
fruit_name VARCHAR(255) NOT NULL,
num_sold INTEGER NOT NULL
);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'fruit',
    @role_name = NULL,
    @supports_net_changes = 0;
GO
net stop MSSQLSERVER
net start MSSQLSERVER
net start SQLSERVERAGENT
PowerShell SQL Server service commands
sqlcmd -U testuser -P {your password} -i {location of testdb.sql}
Creating database and table and enabling change tracking on both
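Before moving on, it is worth confirming that CDC really is enabled; the two queries below use standard SQL Server catalog views, so if is_cdc_enabled comes back as 1 and cdc.change_tables lists a capture instance for fruit, the database side is ready.
sqlcmd -U testuser -P {your password} -Q "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'testDB';"
sqlcmd -U testuser -P {your password} -d testDB -Q "SELECT capture_instance FROM cdc.change_tables;"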

Step 3: Installing/Configuring Kafka and Debezium Connector (~15 min)

touch script.sh
chmod 777 script.sh
Creating a script file
nano script.sh
## Update the repos
sudo apt-get update -y
## Install Java JDK 8
sudo apt-get install openjdk-8-jdk -y
## Download the Kafka files and extract
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvzf kafka_2.12-2.5.0.tgz
## Download the Debezium files and extract
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.1.1.Final/debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
tar -xvzf debezium-connector-sqlserver-1.1.1.Final-plugin.tar.gz
## Copy extracted files into an "/etc" directory
sudo mkdir /etc/kafka
sudo chown -R ubuntu /etc/kafka
cp -r kafka_2.12-2.5.0/* /etc/kafka/
mkdir /etc/kafka/conns
cp -r debezium-connector-sqlserver /etc/kafka/conns/
## Add a config so Kafka Connect can find the Debezium files
echo "plugin.path=/etc/kafka/conns" >> /etc/kafka/config/connect-distributed.properties
## OPTIONAL: Install MSSQL Tools for testing purposes
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list
sudo apt-get update -y
sudo apt-get install mssql-tools -y
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc
source ~/.bashrc
./script.sh
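If you installed the optional MSSQL tools, a quick connectivity check from the Kafka instance can save debugging later. Assuming the same private IP and credentials used in the connector configuration below, something like this should list the databases, including testDB:
sqlcmd -S {Private IP Address},1433 -U testuser -P {your password} -Q "SELECT name FROM sys.databases;"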
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "{Private IP Address}",
    "database.port": "1433",
    "database.user": "testuser",
    "database.password": "password!",
    "database.dbname": "testDB",
    "database.server.name": "testDB",
    "table.whitelist": "dbo.fruit",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.fulfillment"
  }
}'
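Before listing topics, you can confirm the connector actually registered and started; Kafka Connect's REST API exposes a status endpoint for this, and both the connector and its single task should report RUNNING:
curl -H "Accept:application/json" localhost:8083/connectors/test-connector/status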
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Listing Kafka topics

Step 4: Reading CDC Topic (~5 min)

/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning
Consumer output
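Each message on the topic is a Debezium change event. Heavily abridged (the exact fields depend on the converter settings in connect-distributed.properties), the payload for one of the snapshotted rows looks roughly like this, with "op" set to "r" for a snapshot read, "c" for an insert, "u" for an update, and "d" for a delete:
{ "payload": { "before": null,
               "after": { "id": 1001, "fruit_name": "Apple", "num_sold": 5 },
               "source": { "db": "testDB", "schema": "dbo", "table": "fruit" },
               "op": "r" } }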
USE testDB;
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);
sqlcmd -U testuser -P password! -i {location of insert.sql}
Insert script running
Evidence of table change
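Updates and deletes flow through the same topic. As a quick experiment, an update like the one below should produce another event on testDB.dbo.fruit, this time with "op" set to "u" and both "before" and "after" populated:
sqlcmd -U testuser -P password! -Q "USE testDB; UPDATE fruit SET num_sold = 6 WHERE fruit_name = 'Apple';"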

Addendum 1: Important Commands Used

net {start/stop} MSSQLSERVER
net {start/stop} SQLSERVERAGENT
sqlcmd -U {username} -P {password} -i {filename}
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
curl -H "Accept:application/json" localhost:8083/connectors/;
curl -X DELETE localhost:8083/connectors/{connector name};
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{connector data in JSON format}'
/etc/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {topic name} --from-beginning
/etc/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic {topic name}
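Since ZooKeeper, the broker, and Kafka Connect were all started in the background with their output redirected, the quickest way to diagnose a misbehaving connector is to tail the corresponding log file created earlier, for example:
tail -f connect_log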

Addendum 2: Next Article in the Tutorial
