Sending StatsD Metrics and Visualizing in Grafana

Creating a CDC data pipeline: Part 3

Outline

Introduction

This is the third part in a three-part tutorial describing how to create a Microsoft SQL Server CDC (Change Data Capture) data pipeline. However, this part can also stand alone as a tutorial for installing and using Grafana to visualize metrics.

Example data pipeline from insertion to visualization

By the end of this part of the tutorial, you will have a live Grafana dashboard that plots points for statistics (in this case, fruit sale numbers) aggregated every ten seconds (StatsD's default flush interval). These statistics will be sent from the Apache Spark instance using StatsD and collected on the Grafana instance by Graphite Carbon.

Step 1: Creating Security Groups and EC2 Instances (~5 min)

NOTE: Refer here for instructions on finding out the CIDR for your default VPC if needed.

Create an AWS instance with the following settings. Accept defaults where details are left unspecified.

Apache Spark AWS Details:
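If you prefer the command line over the AWS console, the same inbound rules can be added with the AWS CLI. The commands below are only a sketch: the security group ID and CIDR values are placeholders, and the port list (SSH on 22, Graphite web on 80, Grafana on 3000, and StatsD on UDP 8125 from inside the VPC) is an assumption based on the services installed in this part, so adjust it to match the details listed above.

aws ec2 authorize-security-group-ingress --group-id {your security group ID} --protocol tcp --port 22 --cidr {your IP}/32
aws ec2 authorize-security-group-ingress --group-id {your security group ID} --protocol tcp --port 80 --cidr {your IP}/32
aws ec2 authorize-security-group-ingress --group-id {your security group ID} --protocol tcp --port 3000 --cidr {your IP}/32
aws ec2 authorize-security-group-ingress --group-id {your security group ID} --protocol udp --port 8125 --cidr {default VPC CIDR}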

Step 2: Installing Graphite Carbon, Graphite Web, and StatsD (~15 minutes)

This tutorial is adapted from “How To Install and Use Graphite on an Ubuntu 14.04 Server” by Justin Ellingwood. Refer to that tutorial for more background on the following steps if needed; some steps here are necessarily different.

sudo apt-get update -y
sudo apt-get upgrade -y
sudo apt-get -y install graphite-web graphite-carbon
Graphite Carbon whisper database prompt
sudo apt-get install -y postgresql libpq-dev python-psycopg2
nano ./setup.sql
CREATE USER graphite WITH PASSWORD 'password';
CREATE DATABASE graphite WITH OWNER graphite;
sudo -u postgres psql -f setup.sql
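Optionally, you can confirm that the role and database were created before moving on. This is just a quick sanity check using psql's built-in meta-commands (\du lists roles, \l lists databases):

sudo -u postgres psql -c "\du"
sudo -u postgres psql -c "\l"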
sudo nano /etc/graphite/local_settings.py
sudo graphite-manage migrate
sudo graphite-manage migrate --run-syncdb
Running Django migrations
sudo graphite-manage createsuperuser
Creating a superuser
sudo chmod -R 777 /var/log/graphite
sudo nano /etc/default/graphite-carbon
# Change this line:
CARBON_CACHE_ENABLED=false
# to:
CARBON_CACHE_ENABLED=true
Carbon cache configuration
sudo nano /etc/carbon/carbon.conf
# Change this line:
ENABLE_LOGROTATION = False
# to:
ENABLE_LOGROTATION = True
Carbon configuration
sudo nano /etc/carbon/storage-schemas.conf
[statsd]
pattern = ^stats.*
# 10-second points kept for 1 day, 1-minute points for 7 days, 10-minute points for 1 year
retentions = 10s:1d,1m:7d,10m:1y
Storage schemas
sudo apt-get install apache2 libapache2-mod-wsgi -y
sudo a2dissite 000-default
sudo cp /usr/share/graphite-web/apache2-graphite.conf /etc/apache2/sites-available
sudo a2ensite apache2-graphite
sudo service apache2 reload
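At this point Graphite web should be served by Apache on port 80. As a quick check (run on the Graphite instance itself), the following should return an HTTP response rather than a connection error:

curl -I http://localhost:80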
sudo apt-get install -y git devscripts debhelper dh-systemd
curl -sL https://deb.nodesource.com/setup_14.x | sudo -E bash -
sudo apt-get install -y nodejs
mkdir ~/build
cd ~/build
git clone https://github.com/etsy/statsd.git
cd statsd
dpkg-buildpackage
cd ..
sudo dpkg -i statsd_0.8.6-1_all.deb
cd ~

Step 3: Installing Grafana (~ 5 min)

sudo apt-get install -y apt-transport-https
sudo apt-get install -y software-properties-common wget
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add -
sudo add-apt-repository "deb https://packages.grafana.com/oss/deb stable main"
Adding Grafana repo
sudo apt-get update -y
sudo apt-get install grafana -y
sudo service grafana-server start
sudo service carbon-cache stop

sudo service carbon-cache start
sudo service statsd restart
sudo service apache2 reload
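If you want to confirm the services came back up, checking the listening ports is a reasonable sanity check. By default Carbon listens on TCP 2003 and StatsD on UDP 8125; the commands below are only a sketch and assume netstat is available on your image:

sudo netstat -plnt | grep 2003
sudo netstat -plnu | grep 8125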
Graphite web
Grafana web interface

Step 4: Configuring StatsD (~5 min)

NOTE: If you are following this series of tutorials, this section applies to your Apache Spark instance and the Python file created on it in the previous part of this tutorial.

sudo pip3 install statsd
# Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
# Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
# Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"|replace with your Kafka private address|:9092"})
# Transforming CDC JSON data to sum fruit numbers
# based on fruit name

def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)

counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)

# Starting Spark context
ssc.start()
ssc.awaitTermination()
import statsd
c = statsd.StatsClient('{private IP for Grafana instance}', 8125)
# The existing printy function from the previous part, which only printed each record:
def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)

# Replace it with a version that sends a StatsD counter for each fruit instead:
def printy(a, b):
    listy = b.collect()
    for l in listy:
        c.incr("{0}.sold".format(l[0]), l[1])
Apple.sold:10|c
metric_name:metric_value|metric_type
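For reference, StatsD metrics are plain UDP datagrams in this name:value|type format, so you can reproduce what the client sends with nothing but a socket. This is only an illustrative sketch, not part of the pipeline; the host placeholder is yours to fill in:

# Minimal sketch: send the same counter datagram the statsd client's incr() produces.
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # StatsD listens on UDP 8125
sock.sendto(b"Apple.sold:10|c", ("{private IP for Grafana instance}", 8125))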

Step 5: Starting All Pipeline Services (~10 min)

NOTE: Remember to check any IP address configurations as they might change.

net start MSSQLSERVER
net start SQLSERVERAGENT
Starting services
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
curl -H "Accept:application/json" localhost:8083/connectors/;curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "{Private IP Address}", "database.port": "1433", "database.user": "testuser", "database.password": "password!", "database.dbname": "testDB", "database.server.name": "testDB", "table.whitelist": "dbo.fruit", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.fulfillment" } }';
Connector not added
Connector added
/etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3 readkafka.py

NOTE: Refer to the first and second parts of this tutorial for more detailed instructions for starting Kafka and MS SQL services.

NOTE: Make sure CDC data is appearing in the topic using a consumer, and make sure the connector is still registered, as it may be removed when Kafka Connect goes down. You may also need to re-check any IP address configurations.
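As a quick check on the Kafka instance, something like the following should list the registered connectors and replay any CDC messages already in the topic. The paths and topic name match the earlier steps; depending on your Kafka version, the consumer may need --zookeeper localhost:2181 instead of --bootstrap-server:

curl -H "Accept:application/json" localhost:8083/connectors/
/etc/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testDB.dbo.fruit --from-beginning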

Step 6: Configuring Grafana and Creating a Dashboard (~10 min)

Grafana web interface
Changing the admin password
Grafana welcome page
Adding a new data source
http://127.0.0.1:80
Saving the new data source
Creating a new dashboard
Adding a new panel
New live graph
Changing data source for query
Display settings
Time range
Refresh rate
use testDB;
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Watermelon', 25);
sqlcmd -U testuser -P {your password} -i {location of testdb.sql}
Running an insert

NOTE: These metric options will only appear if Graphite Carbon and StatsD were running correctly, and if the Spark job was pointed at the correct Grafana-instance private IP for its StatsD client when the table insert happened. If the options are not available, check these services and their configurations.
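One way to narrow down where the problem is: send a hand-crafted test counter from the Spark instance to the Grafana instance's StatsD port. This is only a troubleshooting sketch (the metric name test.sold is made up); if the metric shows up in Graphite/Grafana after the next StatsD flush (about ten seconds), the network path and StatsD are working and the issue lies in the Spark job:

echo "test.sold:1|c" | nc -u -w1 {private IP for Grafana instance} 8125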

Selecting metrics
Adding a new series
Second series
Third and fourth series
Grafana live graph

Completed Python File

# Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import statsd
c = statsd.StatsClient('{private IP for Grafana instance}', 8125)
# Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
# Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"|replace with your Kafka private address|:9092"})
# Transforming CDC JSON data to sum fruit numbers
# based on fruit name

def printy(a, b):
    listy = b.collect()
    for l in listy:
        c.incr("{0}.sold".format(l[0]), l[1])

counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)

# Starting Spark context
ssc.start()
ssc.awaitTermination()

Conclusion

By following parts one, two, and three of this tutorial, you should have a CDC data pipeline that stores all CDC data, streams and transforms it, and graphs that information live for immediate visual review. Keep in mind that this tutorial assumes you are only working with inserts on the given table; you may need to edit the Spark transformation to filter specific kinds of CDC events based on the "op" parameter in the CDC data (discussed in part 2), as sketched below.
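As a sketch of what that filter might look like (not part of the original file): Debezium sets "op" to "c" for inserts, "u" for updates, and "d" for deletes, so keeping only inserts can be done with one extra filter on the payload before the existing map:

counts = dks.map(lambda x: json.loads(x[1]))\
    .flatMap(lambda dict: dict.items())\
    .filter(lambda items: items[0]=="payload")\
    .filter(lambda items: items[1]["op"]=="c")\
    .map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"]))\
    .reduceByKey(lambda a, b: a+b)\
    .foreachRDD(printy)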

Further, you may wish to combine certain services on the same instance such as Kafka and Spark. This is possible, but remember to increase your instance size accordingly.

If you have any questions about any part of this tutorial, please feel free to post a comment on this post or connect with me on LinkedIn.
