Streaming Data from MySQL into Apache Kafka

CDC-like data pipeline using MySQL binary logs

Outline

  • Introduction
  • Step 1: Creating Security Groups and EC2 Instances
  • Installing MySQL and Configuring to Allow Binary Log Reading
  • Installing/Configuring Kafka and Debezium Connector

Introduction

In my previous set of tutorials, I explained how to use the Debezium connector to stream database changes from Microsoft SQL Server. However, Debezium has connectors for many other databases. One of the more popular choices is Oracle’s MySQL, and in this tutorial we’ll go over using Debezium to stream changes from it.

NOTE: This tutorial can more or less stand in for Part 1 in my “Creating a CDC data pipeline” series. There is a difference in operating system, however: we will be using CentOS instead of Ubuntu. This means some commands/instructions from that series won’t work on these VMs, but broadly they should be simple to port between the two.

Step 1: Creating Security Groups and EC2 Instances (~15 min)

Finding correct CIDR for security rules

Before creating the instances, it is best to find out the CIDR block of your default VPC. You will need it later when creating security rules that let the instances communicate over the network.

  • Log in to AWS.
AWS services menu
  • Type in “VPC” and select the “VPC-Isolated Cloud Resources” option.
Listing AWS VPCs

Creating Instances

Next, create AWS instances with the following settings. Accept defaults where details are left unspecified.

MySQL AWS Instance Details:

  • Image type: CentOS 7 (x86_64) — with Updates HVM

Apache Kafka AWS Details:

  • Image type: CentOS 7 (x86_64) — with Updates HVM

Installing MySQL and Configuring to Allow Binary Log Reading

Installation

  • SSH into the MySQL instance and update the system packages.
sudo yum update -y;
sudo yum upgrade -y;
  • Install a text editor of your choice, wget, and curl.
sudo yum install nano vim wget curl -y;
  • Download the MySQL 8 Community Version and install.
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm;
sudo rpm -ivh mysql80-community-release-el7-3.noarch.rpm;
sudo yum install mysql-server -y;
  • Start the MySQL service and enable it to start on system startup.
sudo systemctl start mysqld.service;
sudo systemctl enable mysqld.service;
  • Get the temporary password and remember it for when we run the secure installation script.
sudo grep 'temporary password' /var/log/mysqld.log;
Getting MySQL temp root password
  • Run the secure installation script. It will ask for the temporary password and then force a password change. Something with a capital letter, a lowercase letter, a number, and a symbol, with good length, like “Password1@”, is enough.
sudo mysql_secure_installation;
  • Use the following settings.
Change the password for root? No
Remove anonymous users? Yes
Disallow root login remotely? No
Remove test database and access to it? Yes
Reload privilege tables now? Yes
Running MySQL secure installation script

Configuration

Now we will create a test database and configure MySQL so that the Debezium connector has read access to MySQL’s binary logs.

  • Create a new SQL script file.
sudo nano testdb.sql
  • Copy the following into the file.
-- Create the test database
CREATE DATABASE testDB;
USE testDB;

-- Create some fruit
CREATE TABLE fruit (
id INTEGER ZEROFILL NOT NULL AUTO_INCREMENT,
fruit_name VARCHAR(255) NOT NULL,
num_sold INTEGER NOT NULL,
PRIMARY KEY(id)
);

-- Insert test values
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);
  • Save the file and exit with: Ctrl-X + Y + Enter.
sudo chmod 777 testdb.sql
  • Run the script as MySQL root.
mysql -u root -p < testdb.sql
  • Create the mysql-server.cnf configuration file.
sudo nano /etc/my.cnf.d/mysql-server.cnf
  • Copy the following into the file.
[mysqld]
# Enable binary logging with full row images so Debezium can read changes
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
expire_logs_days=10
# Enable GTIDs for reliable replication positions
gtid_mode=ON
enforce_gtid_consistency=ON

interactive_timeout=600
wait_timeout=600
# Include the original SQL statement in row events
binlog_rows_query_log_events=ON
  • Save the file and exit with: Ctrl-X + Y + Enter.
  • Log in to the MySQL shell as root.
mysql -u root -p
  • Create a new user with username “dbuser” and password “Password1@”, grant the necessary privileges, force MySQL to use the native password plugin (instead of the default ‘caching_sha2_password’, which causes authentication issues across the network), and then flush the privileges.
CREATE USER 'dbuser'@'%' IDENTIFIED BY 'Password1@';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbuser'@'%';
ALTER USER 'dbuser'@'%' IDENTIFIED WITH mysql_native_password BY 'Password1@';
FLUSH PRIVILEGES;
  • Step the GTID consistency and GTID mode settings through their intermediate states to reach the desired configuration. GTID mode will be switched fully ON in a later step.
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = OFF;
SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;
SET @@GLOBAL.GTID_MODE = OFF;
SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;
SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;
  • Before proceeding, make sure the following status variable shows a value of “0”.
SHOW STATUS LIKE 'ONGOING_ANONYMOUS_TRANSACTION_COUNT';
Ongoing anonymous transaction count should be “0”
  • Change the GTID Mode to “ON” and then exit the MySQL shell.
SET @@GLOBAL.GTID_MODE = ON;
exit
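The one-step-at-a-time sequence above reflects a real MySQL restriction: gtid_mode may only move to an adjacent value along OFF → OFF_PERMISSIVE → ON_PERMISSIVE → ON, so jumping straight from OFF to ON is rejected. A small illustrative Python sketch (not part of the tutorial’s tooling) of the statements that rule requires:

```python
# Sketch of MySQL's rule that gtid_mode may only change one step at a
# time along this ordered sequence -- hence the several SET statements.
GTID_MODES = ["OFF", "OFF_PERMISSIVE", "ON_PERMISSIVE", "ON"]

def transition_steps(current: str, target: str) -> list:
    """Return the SET statements needed to move gtid_mode from
    `current` to `target`, one permitted step at a time."""
    i, j = GTID_MODES.index(current), GTID_MODES.index(target)
    step = 1 if j >= i else -1
    return [
        f"SET @@GLOBAL.GTID_MODE = {GTID_MODES[k]};"
        for k in range(i + step, j + step, step)
    ]

print(transition_steps("OFF", "ON"))
```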
  • Restart the MySQL server.
sudo systemctl restart mysqld

Installing/Configuring Kafka and Debezium Connector

Installation

  • SSH into the Kafka instance and update the system packages.
sudo yum update -y;
sudo yum upgrade -y;
  • Install a text editor of your choice, wget, and curl.
sudo yum install nano vim wget curl -y;
  • Create a new script file.
sudo nano script.sh
  • Copy the following into the file.
#!/bin/bash
# Installing Java JDK 8
sudo yum install java-1.8.0-openjdk-devel -y
# Downloading Kafka files and untarring
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xvzf kafka_2.12-2.5.0.tgz
# Downloading Debezium files
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.1.1.Final/debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
tar -xvzf debezium-connector-mysql-1.1.1.Final-plugin.tar.gz
# Creating "/etc" directory and moving files in
sudo mkdir /etc/kafka
sudo chown -R centos /etc/kafka
cp -r kafka_2.12-2.5.0/* /etc/kafka/
mkdir /etc/kafka/conns
cp -r debezium-connector-mysql /etc/kafka/conns/
# Adding path to Debezium files to Kafka configs
echo "plugin.path=/etc/kafka/conns" >> /etc/kafka/config/connect-distributed.properties
  • Save the file and exit with: Ctrl-X + Y + Enter.
sudo chmod 777 script.sh
  • Run the script.
./script.sh

Configuration

We will now start the Kafka services and create an instance of the Debezium connector. Then, by creating a Kafka consumer, we will see what information is streamed by Debezium from MySQL.

  • Start Kafka Zookeeper.
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
  • Start the Kafka Broker.
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
  • Start Kafka Connect.
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
  • NOTE: If any of the commands report errors, check which processes failed to start (each service prints its process ID when launched) and restart those. In some cases you may need to start all three processes over.
Starting Zookeeper, Broker, and Kafka and getting PIDs
  • Once Zookeeper, Broker, and Connect are running, create the Debezium connector with a POST request.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<Private IP of MySQL instance>",
    "database.port": "3306",
    "database.user": "dbuser",
    "database.password": "Password1@",
    "database.server.id": "8115",
    "database.allowPublicKeyRetrieval": "true",
    "database.server.name": "fruitserver",
    "database.whitelist": "testDB",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.fulfillment",
    "include.schema.changes": "true"
  }
}';
Successful POST request for connector
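If you would rather script the registration than retype the curl command, the same request can be made from Python with only the standard library. This is a sketch, not part of the tutorial’s tooling; the config values mirror the curl command above (database.whitelist is the property name used by Debezium 1.1), and the Connect REST endpoint is assumed to be on localhost:8083.

```python
# Sketch: register the Debezium connector via the Kafka Connect REST API.
import json
from urllib import request

# Same connector config as the curl command above.
CONNECTOR = {
    "name": "test-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "<Private IP of MySQL instance>",
        "database.port": "3306",
        "database.user": "dbuser",
        "database.password": "Password1@",
        "database.server.id": "8115",
        "database.allowPublicKeyRetrieval": "true",
        "database.server.name": "fruitserver",
        "database.whitelist": "testDB",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "dbhistory.fulfillment",
        "include.schema.changes": "true",
    },
}

def register_connector(base_url: str = "http://localhost:8083") -> bytes:
    """POST the connector config to the Kafka Connect REST API."""
    req = request.Request(
        f"{base_url}/connectors/",
        data=json.dumps(CONNECTOR).encode("utf-8"),
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
    )
    with request.urlopen(req) as resp:
        return resp.read()
```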
  • Open a consumer on the Change Data topic.
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fruitserver.testDB.fruit --from-beginning
  • Verify that the topic is populated with JSON data. Then insert a new row on the MySQL side and confirm the change is picked up by the topic.
JSON data showing that the connector is working
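A note on what the consumer prints: the topic name fruitserver.testDB.fruit follows Debezium’s serverName.databaseName.tableName convention, and each message carries a “payload” envelope with “before”/“after” row images, a “source” block, and an “op” code (“c” = insert, “u” = update, “d” = delete, “r” = snapshot read). The sketch below parses a hand-written sample event shaped like the fruit-table output; real events carry additional metadata fields.

```python
# Sketch: summarizing a Debezium change-event envelope. The sample event
# below is hand-written to match this tutorial's fruit table; real
# events include extra metadata (schema, ts_ms, GTID position, etc.).
import json

sample_event = """
{
  "payload": {
    "before": null,
    "after": {"id": 1, "fruit_name": "Apple", "num_sold": 5},
    "source": {"db": "testDB", "table": "fruit"},
    "op": "r"
  }
}
"""

def describe(event_json: str) -> str:
    """Summarize a Debezium change event in one line."""
    ops = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}
    p = json.loads(event_json)["payload"]
    # Deletes carry the row in "before"; everything else in "after".
    row = p["after"] or p["before"]
    return f"{ops[p['op']]} on {p['source']['db']}.{p['source']['table']}: {row}"

print(describe(sample_event))
```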
