Streaming Data from MySQL into Apache Kafka

Outline

Introduction

Step 1: Creating Security Groups and EC2 Instances (~15 min)

AWS services menu
Listing AWS VPCs

Installing MySQL and Configuring It to Allow Binary Log Reading

sudo yum update -y;
sudo yum upgrade -y;
sudo yum install nano vim wget curl -y;
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm;
sudo rpm -ivh mysql80-community-release-el7-3.noarch.rpm;
sudo yum install mysql-server -y;
sudo systemctl start mysqld.service;
sudo systemctl enable mysqld.service;
sudo grep 'temporary password' /var/log/mysqld.log;
Getting MySQL temp root password
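If you want just the password rather than the whole log line, you can narrow the lookup above with `awk`. This is a minimal sketch and not part of the original walkthrough. `get_temp_password` is a hypothetical helper. It assumes the MySQL 8.0 log line ends with `root@localhost: <password>`, so the password is the last whitespace-separated field.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print only the temporary root password from mysqld log
# text read on stdin. Assumes the password is the last field of the matching
# line; if the server was initialized more than once, the most recent one wins.
get_temp_password() {
  awk '/temporary password/ { pw = $NF } END { print pw }'
}

# Usage (the log is normally readable only by root/mysql):
#   sudo cat /var/log/mysqld.log | get_temp_password
```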
sudo mysql_secure_installation;
Change the password for root? No
Remove anonymous users? Yes
Disallow root login remotely? No
Remove test database and access to it? Yes
Reload privilege tables now? Yes
Running MySQL secure installation script
sudo nano testdb.sql
-- Create the test database
CREATE DATABASE testDB;
USE testDB;

-- Create some fruit
CREATE TABLE fruit (
id INTEGER ZEROFILL NOT NULL AUTO_INCREMENT,
fruit_name VARCHAR(255) NOT NULL,
num_sold INTEGER NOT NULL,
PRIMARY KEY(id)
);

-- Insert test values
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);
sudo chmod 777 testdb.sql
mysql -u root -p < testdb.sql
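Later, once the connector is running, new rows in `fruit` are what show up as live change events. As a minimal sketch, a hypothetical `fruit_inserts` helper can turn `name count` pairs into `INSERT` statements for this table. It assumes fruit names contain no whitespace or backslashes, and it doubles single quotes so the SQL string literal stays valid.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "name count" pairs from stdin and print one INSERT
# statement per pair for the fruit table. Assumes names contain no whitespace
# or backslashes; single quotes are doubled ('') per standard SQL escaping.
fruit_inserts() {
  local name count q="'"
  while read -r name count || [ -n "$name" ]; do
    [ -z "$name" ] && continue                  # skip blank lines
    case "$count" in
      ''|*[!0-9]*)                              # count must be a whole number
        echo "skipping '$name': invalid count '$count'" >&2
        continue ;;
    esac
    name=${name//$q/$q$q}                       # ' becomes '' inside the SQL literal
    printf "INSERT INTO fruit(fruit_name, num_sold) VALUES ('%s', %s);\n" "$name" "$count"
  done
}

# Usage (mysql prompts for the password on the terminal):
#   printf 'Plum 7\nMango 3\n' | fruit_inserts | mysql -u root -p testDB
```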
sudo nano /etc/my.cnf.d/mysql-server.cnf
[mysqld]
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=10
gtid_mode=ON
enforce_gtid_consistency=ON

interactive_timeout=600
wait_timeout=600
binlog_rows_query_log_events=ON
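These are server options, so MySQL only applies them when they sit under the `[mysqld]` group. The `[mysql]` group is read by the command-line client instead. It can be worth checking the file before restarting the server. What follows is a minimal sketch of such a check. It assumes a simple option file with one `key=value` per line and no `!include` directives. `check_mysqld_options` is a hypothetical helper, not a MySQL tool.

```shell
#!/usr/bin/env bash
# Hypothetical helper: check_mysqld_options FILE
# Verifies that the binlog/GTID settings used in this tutorial appear inside the
# [mysqld] group of a MySQL option file. Prints each missing or mismatched
# setting and returns 1 if there are any. Spaces around "=" are ignored and
# dashes in option names are treated as underscores; values are compared exactly.
check_mysqld_options() {
  awk '
    BEGIN {
      need["log_bin"] = "mysql-bin"; need["binlog_format"] = "ROW"
      need["binlog_row_image"] = "FULL"; need["gtid_mode"] = "ON"
      need["enforce_gtid_consistency"] = "ON"
      in_mysqld = 0; missing = 0
    }
    /^[ \t]*[;#]/ { next }                       # skip comment lines
    /^[ \t]*\[/ {                                # a new [group] header
      in_mysqld = ($0 ~ /^[ \t]*\[mysqld\][ \t]*$/)
      next
    }
    in_mysqld && index($0, "=") > 0 {
      eq = index($0, "=")
      key = substr($0, 1, eq - 1); val = substr($0, eq + 1)
      gsub(/[ \t]/, "", key); gsub(/[ \t]/, "", val); gsub(/-/, "_", key)
      found[key] = val
    }
    END {
      for (k in need)
        if (!(k in found) || found[k] != need[k]) {
          print "not set in [mysqld]: " k "=" need[k]
          missing = 1
        }
      exit missing
    }' "$1"
}

# Usage:
#   check_mysqld_options /etc/my.cnf.d/mysql-server.cnf
```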
mysql -u root -p
CREATE USER 'dbuser'@'%' IDENTIFIED BY 'Password1@';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbuser'@'%';
ALTER USER 'dbuser'@'%' IDENTIFIED WITH mysql_native_password BY 'Password1@';
FLUSH PRIVILEGES;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = WARN;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;
SET @@GLOBAL.GTID_MODE = OFF;
SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;
SHOW STATUS LIKE 'ONGOING_ANONYMOUS_TRANSACTION_COUNT';
Ongoing anonymous transaction count should be “0”
SET @@GLOBAL.GTID_MODE = ON;
exit
sudo systemctl restart mysqld
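After the restart, it is worth confirming that the server actually picked up the binary log and GTID settings. Below is a minimal sketch of that check. It assumes you pipe in the tab-separated output of `mysql -N -B -e "SHOW VARIABLES ..."`. `check_cdc_vars` is a hypothetical helper, and its expected values simply mirror the config above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads "name<TAB>value" lines, as printed by
# "mysql -N -B -e ...", and checks them against the values Debezium needs.
# Prints each problem and returns 1 if any variable is wrong or absent.
check_cdc_vars() {
  awk '
    BEGIN {
      FS = "\t"; bad = 0
      want["log_bin"] = "ON"; want["binlog_format"] = "ROW"
      want["binlog_row_image"] = "FULL"; want["gtid_mode"] = "ON"
      want["enforce_gtid_consistency"] = "ON"
    }
    ($1 in want) {
      seen[$1] = 1
      if ($2 != want[$1]) { print $1 " is " $2 ", expected " want[$1]; bad = 1 }
    }
    END {
      for (k in want) if (!(k in seen)) { print k " was not reported"; bad = 1 }
      exit bad
    }'
}

# Usage (on the MySQL instance, after the restart):
#   mysql -u root -p -N -B -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image','gtid_mode','enforce_gtid_consistency');" | check_cdc_vars
```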

Installing and Configuring Kafka and the Debezium Connector

sudo yum update -y;
sudo yum upgrade -y;
sudo yum install nano vim wget curl -y;
sudo nano script.sh
#!/bin/bash
# Installing Java JDK 8
sudo yum install java-1.8.0-openjdk-devel -y
# Downloading and extracting Kafka
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvzf kafka_2.12-2.5.0.tgz
# Downloading and extracting the Debezium MySQL connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.1.1.Final/debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
tar -xvzf debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
# Creating the /etc/kafka directory and copying the files into it
sudo mkdir /etc/kafka
sudo chown -R centos /etc/kafka
cp -r kafka_2.12-2.5.0/* /etc/kafka/
mkdir /etc/kafka/conns
cp -r debezium-connector-mysql /etc/kafka/conns/
# Adding the Debezium plugin directory to the Kafka Connect config
echo "plugin.path=/etc/kafka/conns" >> /etc/kafka/config/connect-distributed.properties
sudo chmod 777 script.sh
./script.sh
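The `echo ... >>` line in `script.sh` appends `plugin.path` again every time the script is rerun. Below is a minimal sketch of an idempotent alternative. It uses a hypothetical `ensure_line` helper that appends a line only when the file does not already contain it verbatim.

```shell
#!/usr/bin/env bash
# Hypothetical helper: ensure_line LINE FILE
# Appends LINE to FILE only if FILE does not already contain that exact line,
# so rerunning an install script does not duplicate config entries.
ensure_line() {
  local line="$1" file="$2"
  grep -qxF -- "$line" "$file" 2>/dev/null || printf '%s\n' "$line" >> "$file"
}

# In script.sh this would replace the "echo ... >>" line:
#   ensure_line "plugin.path=/etc/kafka/conns" /etc/kafka/config/connect-distributed.properties
```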
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
Starting ZooKeeper, the Kafka broker, and Kafka Connect, and getting their PIDs
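Kafka Connect can take several seconds to start listening on port 8083, so a POST sent immediately after launching it may be refused. Below is a minimal sketch of a generic `retry` helper. It is hypothetical, not a Kafka script, and it reruns a command until the command succeeds or the attempts run out.

```shell
#!/usr/bin/env bash
# Hypothetical helper: retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND up to ATTEMPTS times, sleeping DELAY_SECONDS between failures.
# Returns 0 on the first success, otherwise the last failing exit status.
retry() {
  local attempts="$1" delay="$2" n=1 status=0
  shift 2
  while true; do
    "$@" && return 0
    status=$?
    if [ "$n" -ge "$attempts" ]; then
      return "$status"
    fi
    n=$((n + 1))
    sleep "$delay"
  done
}

# Wait up to about 60 s for the Connect REST API before registering the connector:
#   retry 30 2 curl -sf -o /dev/null localhost:8083/connectors
```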
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<Private IP of MySQL instance>",
    "database.port": "3306",
    "database.user": "dbuser",
    "database.password": "Password1@",
    "database.server.id": "8115",
    "database.allowPublicKeyRetrieval": "true",
    "database.server.name": "fruitserver",
    "database.whitelist": "testDB",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.fulfillment",
    "include.schema.changes": "true"
  }
}';
Successful POST request for connector
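A successful POST only means Kafka Connect accepted the configuration. The connector task can still fail afterwards, for example if it cannot reach MySQL. Kafka Connect's REST API reports this at `GET /connectors/<name>/status`. Below is a minimal sketch of a hypothetical `connector_states` helper that pulls the `state` values out of that JSON without needing `jq`. It assumes the states are upper-case words such as `RUNNING` or `FAILED`.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a Kafka Connect status JSON document on stdin and
# print every "state" value, one per line (connector first, then its tasks).
# This is a crude grep-based parse, not a real JSON parser.
connector_states() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Usage:
#   curl -s localhost:8083/connectors/test-connector/status | connector_states
# Anything other than RUNNING (e.g. FAILED) means the "trace" field of that
# task in the full response is worth reading.
```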
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fruitserver.testDB.fruit --from-beginning
JSON data showing that the connector is working
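Each Debezium change event carries an `op` field in its payload:

- `r` for rows read during the initial snapshot
- `c` for inserts
- `u` for updates
- `d` for deletes

Below is a minimal sketch of a hypothetical `count_ops` helper that tallies these codes from the consumer output. It assumes the compact JSON written by Kafka's `JsonConverter`, with no spaces around the colons. With only the three seed rows, you would expect three `r` events. Inserting another row into `testDB.fruit` should add a `c`.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read Debezium JSON change events (one per line) on stdin
# and print how many of each operation type were seen, e.g. "r 3".
count_ops() {
  grep -o '"op":"[a-z]"' | cut -d'"' -f4 | sort | uniq -c | awk '{ print $2, $1 }'
}

# Usage: read the topic, exiting after 10 s without new messages:
#   /etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#     --topic fruitserver.testDB.fruit --from-beginning --timeout-ms 10000 | count_ops
```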
