Streaming Data from MySQL into Apache Kafka

Sandeep Kattepogu
6 min read · Nov 27, 2020

CDC-like data pipeline using MySQL binary logs

Outline

  • Introduction
  • Creating Security Groups and EC2 Instances (~15 min)
  • Installing MySQL and Configuring to Allow Binary Log Reading (~15 min)
  • Installing/Configuring Kafka and Debezium Connector (~15 min)

Introduction

In my previous set of tutorials, I explained how to use the Debezium connector to stream database changes from Microsoft SQL Server. Debezium, however, has connectors for many other databases. One of the more popular choices is Oracle’s MySQL, and in this tutorial we will use Debezium to stream changes from it.

NOTE: This tutorial can more or less stand in for Part 1 of my “Creating a CDC data pipeline” series. There is a difference in operating system, however, as we will be using CentOS instead of Ubuntu, so some commands/instructions from that series won’t work on these VMs. Broadly, though, the steps should be easy to port.

Step 1: Creating Security Groups and EC2 Instances (~15 min)

Finding correct CIDR for security rules

Before creating the instances, it is best to find out the CIDR for your default VPC. This will be important to create security rules later so that the instances can communicate over the network.

  • Log in to AWS.
  • Open up the Services tab on AWS home.
AWS services menu
  • Type in “VPC” and select the “VPC-Isolated Cloud Resources” option.
  • Select “Your VPCs” from the menu on the left of the dashboard. Record the number in the “IPv4 CIDR” column.
Listing AWS VPCs
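
If you prefer the command line, the same information can be pulled with the AWS CLI. This is just an optional sketch and assumes the CLI is installed and configured with credentials for your account.

# Optional: look up the default VPC's IPv4 CIDR with the AWS CLI
aws ec2 describe-vpcs \
  --filters Name=isDefault,Values=true \
  --query 'Vpcs[0].CidrBlock' \
  --output text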

Creating Instances

Next, create AWS instances with the following settings. Accept the defaults where details are left unspecified.

MySQL AWS Instance Details:

  • Image type: CentOS 7 (x86_64) — with Updates HVM
  • Minimum recommended instance type: t2.medium
  • Number of instances: 1
  • Inbound Security Rules: SSH from My IP; All TCP from default VPC IPv4 CIDR

Apache Kafka AWS Details:

  • Image type: CentOS 7 (x86_64) — with Updates HVM
  • Minimum recommended instance type: t2.medium
  • Number of instances: 1
  • Inbound Security Rules: SSH from My IP; All TCP from default VPC CIDR
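
If you would rather script the instance creation, something along these lines should work with the AWS CLI. The AMI ID, key pair name, and security group ID below are placeholders you will need to fill in for your own region and account; this is a sketch, not the exact commands used for this tutorial.

# Optional sketch: launch one CentOS 7 instance from the CLI (repeat for the Kafka instance)
aws ec2 run-instances \
  --image-id <CentOS 7 AMI ID for your region> \
  --instance-type t2.medium \
  --count 1 \
  --key-name <your key pair name> \
  --security-group-ids <ID of the security group created above>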

Step 2: Installing MySQL and Configuring to Allow Binary Log Reading (~15 min)

Installation

  • SSH into the MySQL instance.
  • Update and upgrade the available software.
sudo yum update -y;
sudo yum upgrade -y;
  • Install a text editor of your choice, wget, and curl.
sudo yum install nano vim wget curl -y;
  • Download the MySQL 8 Community Version and install.
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm;
sudo rpm -ivh mysql80-community-release-el7-3.noarch.rpm;
sudo yum install mysql-server -y;
  • Start the MySQL service and enable it to start on system startup.
sudo systemctl start mysqld.service;
sudo systemctl enable mysqld.service;
  • Get the temporary password and remember it for when we run the secure installation script.
sudo grep 'temporary password' /var/log/mysqld.log;
Getting MySQL temp root password
  • Run the secure installation script. It will ask for the temporary password and then force a password change. Something of decent length with a capital letter, a lowercase letter, a number, and a symbol, like “Password1@”, is enough.
sudo mysql_secure_installation;
  • Use the following settings.
Change the password for root? No
Remove anonymous users? Yes
Disallow root login remotely? No
Remove test database and access to it? Yes
Reload privilege tables now? Yes
Running MySQL secure installation script
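
As an optional sanity check, confirm that the MySQL service is running and that your new root password is accepted before moving on.

# Confirm MySQL is up and the new root password works
sudo systemctl status mysqld --no-pager
mysql -u root -p -e "SELECT VERSION();"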

Configuration

Now we will create a test database and apply some MySQL configuration that allows the Debezium connector to read MySQL’s binary logs.
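
MySQL 8 enables binary logging by default, so before changing anything you can optionally check the starting state of the settings we are about to rely on.

# Optional: check the current binlog/GTID settings before making changes
mysql -u root -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','gtid_mode','enforce_gtid_consistency');"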

  • Create a new SQL script file.
sudo nano testdb.sql
  • Copy the following into the file.
-- Create the test database
CREATE DATABASE testDB;
USE testDB;

-- Create some fruit
CREATE TABLE fruit (
id INTEGER ZEROFILL NOT NULL AUTO_INCREMENT,
fruit_name VARCHAR(255) NOT NULL,
num_sold INTEGER NOT NULL,
PRIMARY KEY(id)
);

-- Insert test values
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);
  • Save the file and exit with: Ctrl-X + Y + Enter.
  • Change permissions on the file so that it can be read and executed.
sudo chmod 777 testdb.sql
  • Run the script as MySQL root.
mysql -u root -p < testdb.sql
  • Create the mysql-server.cnf configuration file.
sudo nano /etc/my.cnf.d/mysql-server.cnf
  • Copy the following into the file.
[mysqld]
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=10
gtid_mode=ON
enforce_gtid_consistency=ON

interactive_timeout=600
wait_timeout=600
binlog_rows_query_log_events=ON
  • Save the file and exit with: Ctrl-X + Y + Enter.
  • Log into the MySQL shell as root.
mysql -u root -p
  • Create a new user with username “dbuser” and password “Password1@”, grant the necessary privileges, force MySQL to use the native password plugin (instead of the default ‘caching_sha2_password’, which can cause authentication issues over the network), and then flush the privileges.
CREATE USER 'dbuser'@'%' IDENTIFIED BY 'Password1@';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbuser'@'%';
ALTER USER 'dbuser'@'%' IDENTIFIED WITH mysql_native_password BY 'Password1@';
FLUSH PRIVILEGES;
  • Set the GTID consistency and GTID mode by stepping them through to the desired state (GTID mode can only be changed one step at a time). GTID mode will be changed once more to “ON” in the next step.
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;
SET @@GLOBAL.GTID_MODE = OFF;
SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;
  • Before proceeding, make sure the following status variable shows a value of “0”.
SHOW STATUS LIKE 'ONGOING_ANONYMOUS_TRANSACTION_COUNT';
Ongoing anonymous transaction count should be “0”
  • Change the GTID Mode to “ON” and then exit the MySQL shell.
SET @@GLOBAL.GTID_MODE = ON;
exit
  • Restart the MySQL server.
sudo systemctl restart mysqld
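
To double-check that the configuration survived the restart, you can optionally log in as the new replication user and confirm that GTIDs are on and the binary log is active (the password is the example one created above).

# Verify GTID mode, GTID consistency, and the binary log position as the new user
mysql -u dbuser -p -e "SELECT @@GLOBAL.gtid_mode, @@GLOBAL.enforce_gtid_consistency; SHOW MASTER STATUS;"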

Step 3: Installing/Configuring Kafka and Debezium Connector (~15 min)

Installation

  • SSH into the Kafka Instance.
  • Update and upgrade the available software.
sudo yum update -y;
sudo yum upgrade -y;
  • Install a text editor of your choice, wget, and curl.
sudo yum install nano vim wget curl -y;
  • Create a new script file.
sudo nano script.sh
  • Copy the following into the file.
#!/bin/bash
# Installing Java JDK 8
sudo yum install java-1.8.0-openjdk-devel -y
# Downloading Kafka files and untarring
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvzf kafka_2.12-2.5.0.tgz
# Downloading Debezium files
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.1.1.Final/debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
tar -xvzf debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
# Creating "/etc" directory and moving files in
sudo mkdir /etc/kafka
sudo chown -R centos /etc/kafka
cp -r kafka_2.12-2.5.0/* /etc/kafka/
mkdir /etc/kafka/conns
cp -r debezium-connector-mysql /etc/kafka/conns/
# Adding path to Debezium files to Kafka configs
echo "plugin.path=/etc/kafka/conns" >> /etc/kafka/config/connect-distributed.properties
  • Save the file and exit with: Ctrl-X + Y + Enter.
  • Make the script runnable.
sudo chmod 777 script.sh
  • Run the script.
./script.sh
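
Before starting anything, it does not hurt to confirm that the script laid everything out as expected; the paths below are the ones the script creates.

# Check that Kafka and the Debezium plugin landed where Connect expects them
ls /etc/kafka/bin | head -n 5
ls /etc/kafka/conns
grep plugin.path /etc/kafka/config/connect-distributed.properties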

Configuration

We will now start the Kafka services and create an instance of the Debezium connector. Then, by creating a Kafka consumer, we will see what information is streamed by Debezium from MySQL.

  • Start Kafka Zookeeper.
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
  • Start the Kafka Broker.
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
  • Start Kafka Connect.
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
  • NOTE: If you receive any errors on the command line, you may need to restart the processes that failed. Use the displayed process IDs to figure out which ones need to be killed and started again; you may even need to start all the processes over. Once everything is up, you can confirm that Connect loaded the Debezium plugin (see the check after the screenshot below).
Starting Zookeeper, Broker, and Kafka and getting PIDs
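A simple way to confirm all three services are healthy is to query the Connect REST API and make sure the Debezium MySQL connector class appears in the plugin list (this assumes Connect is listening on its default port, 8083).
# Connect should respond on port 8083 and list io.debezium.connector.mysql.MySqlConnector
curl -s localhost:8083/connector-plugins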
  • Once Zookeeper, Broker, and Connect are running, create the Debezium connector with a POST request.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "<Private IP of MySQL instance>", "database.port": "3306", "database.user": "dbuser", "database.password": "Password1@", "database.server.id": "8115", "database.allowPublicKeyRetrieval":"true", "database.server.name": "fruitserver", "database.whitelist": "testDB", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.fulfillment", "include.schema.changes": "true" } }';
Successful POST request for connector
  • Open a consumer on the Change Data topic.
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fruitserver.testDB.fruit --from-beginning
  • See if the topic is populated with JSON data. Then test whether an insert on the MySQL database is picked up by the topic (an example insert is sketched below).
JSON data showing that the connector is working
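
For example, from the MySQL instance you can insert one more row and watch a new change event appear in the consumer output; the row values here are just illustrative. The connector’s state can also be checked over the Connect REST API from the Kafka instance.

# On the MySQL instance: insert an example row that Debezium should pick up
mysql -u root -p -e "INSERT INTO testDB.fruit(fruit_name, num_sold) VALUES ('Mango', 15);"
# On the Kafka instance: confirm the connector and its task are RUNNING
curl -s localhost:8083/connectors/test-connector/status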
