Sending StatsD Metrics and Visualizing in Grafana

Creating a CDC data pipeline: Part 3

Outline

  • Introduction
  • Step 1: Creating Security Groups and EC2 Instances
  • Step 2: Installing Graphite Carbon, Graphite Web, and StatsD
  • Step 3: Installing Grafana
  • Step 4: Configuring StatsD
  • Step 5: Starting All Pipeline Services
  • Step 6: Configuring Grafana and Creating a Dashboard
  • Completed Python File
  • Conclusion

Introduction

This is the third part in a three-part tutorial describing how to create a Microsoft SQL Server CDC (Change Data Capture) data pipeline. However, this part can also stand alone as a tutorial for installing and using Grafana to visualize metrics.

Example data pipeline from insertion to visualization

By the end of this part of the tutorial, you will have a live Grafana dashboard that plots statistics (in this case, fruit sale numbers) aggregated every ten seconds. These statistics are sent from the Apache Spark instance using StatsD and collected on the Grafana instance by Graphite Carbon.

Step 1: Creating Security Groups and EC2 Instances (~5 min)

NOTE: Refer here for instructions on finding out the CIDR for your default VPC if needed.

Create an AWS instance with the following settings. Accept defaults where details are left unspecified.

Apache Spark AWS Details:

  • Image type: Ubuntu Server 18.04 LTS (HVM)

Step 2: Installing Graphite Carbon, Graphite Web, and StatsD (~15 minutes)

This tutorial is adapted from “How To Install and Use Graphite on an Ubuntu 14.04 Server” by Justin Ellingwood. Please give that tutorial a look for more information on the following, if needed. Some steps are necessarily different.

  • Log into the Ubuntu 18.04 instance using an SSH client of your choice.
  • Update the system and install Graphite Web and Graphite Carbon.
sudo apt-get update -y
sudo apt-get upgrade -y
sudo apt-get -y install graphite-web graphite-carbon
Graphite Carbon whisper database prompt
  • Run the following to install Postgresql and some helper packages.
sudo apt-get install -y postgresql libpq-dev python-psycopg2
  • Open a new file called “setup.sql” with a text editor of your choice.
nano ./setup.sql
  • Write the following lines in the file. This will create a user for Graphite with the password “password” when executed.
CREATE USER graphite WITH PASSWORD 'password';
CREATE DATABASE graphite WITH OWNER graphite;
  • Save and exit. Then execute the file as the “postgres” user.
sudo -u postgres psql -f setup.sql
  • Open “/etc/graphite/local_settings.py” using a text editor of your choice.
sudo nano /etc/graphite/local_settings.py
  • Find the following lines and edit them as shown. Remember to delete the old lines.
  • Save and exit.
sudo graphite-manage migrate
sudo graphite-manage migrate --run-syncdb
Running Django migrations
  • Create a superuser for Graphite with the username root and any password you wish.
sudo graphite-manage createsuperuser
Creating a superuser
  • Run the following to allow logging to run without error (you may need to change the permissions to suit your security policies).
sudo chmod -R 777 /var/log/graphite
  • Open “/etc/default/graphite-carbon” using a text editor of your choice.
sudo nano /etc/default/graphite-carbon
  • Change the following line as shown.
CARBON_CACHE_ENABLED=false
CARBON_CACHE_ENABLED=true
Carbon cache configuration
  • Save and exit. Then open “/etc/carbon/carbon.conf” using a text editor of your choice.
sudo nano /etc/carbon/carbon.conf
  • Change the following line as shown.
ENABLE_LOGROTATION = False
ENABLE_LOGROTATION = True
Carbon configuration
  • Save and exit. Then open “/etc/carbon/storage-schemas.conf” using a text editor of your choice.
sudo nano /etc/carbon/storage-schemas.conf
  • Add the following setting near the top of the file.
[statsd]
pattern = ^stats.*
retentions = 10s:1d,1m:7d,10m:1y
Storage schemas
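For reference, each Whisper retention archive stores (retention period ÷ point precision) data points. A quick sketch of the arithmetic for the policy above:

```python
def parse_duration(s):
    """Convert a Graphite duration like '10s', '1m', '1d', '1y' to seconds."""
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400, "y": 31536000}
    return int(s[:-1]) * units[s[-1]]

def points_per_archive(retentions):
    """Number of data points each archive in a retentions string holds."""
    result = []
    for archive in retentions.split(","):
        precision, period = archive.split(":")
        result.append(parse_duration(period) // parse_duration(precision))
    return result

# 10-second points for a day, 1-minute points for a week,
# 10-minute points for a year
print(points_per_archive("10s:1d,1m:7d,10m:1y"))  # [8640, 10080, 52560]
```

The 10-second precision of the first archive is what lets the Grafana dashboard later in this tutorial show points every ten seconds.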
  • Save and exit. Then install Apache and the WSGI module to serve Graphite Web.
sudo apt-get install apache2 libapache2-mod-wsgi -y
  • Disable the default Apache site.
sudo a2dissite 000-default
  • Copy the Graphite Web Apache configuration file to the available sites directory.
sudo cp /usr/share/graphite-web/apache2-graphite.conf /etc/apache2/sites-available
  • Enable the site and reload.
sudo a2ensite apache2-graphite
sudo service apache2 reload
  • Now that Graphite Carbon and Graphite Web are installed, StatsD will be installed as a front-end to Graphite. Install the build prerequisites.
sudo apt-get install -y git devscripts debhelper dh-systemd
  • NodeJS is also needed as a preliminary package. Use the following to install it.
curl -sL https://deb.nodesource.com/setup_14.x | sudo -E bash -
sudo apt-get install -y nodejs
  • Create a build directory and move into it.
mkdir ~/build
cd ~/build
  • Clone the StatsD repo from GitHub and move into it.
git clone https://github.com/etsy/statsd.git
cd statsd
  • Build the .deb package, move up a directory, and install the package.
dpkg-buildpackage
cd ..
sudo dpkg -i statsd_0.8.6-1_all.deb
  • Move back to the home directory.
cd ~

Step 3: Installing Grafana (~ 5 min)

  • Install the following preliminary packages.
sudo apt-get install -y apt-transport-https
sudo apt-get install -y software-properties-common wget
  • Download and add the GPG key for Grafana (the wget and apt-key commands go on one line), then add the Grafana repo.
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add -
sudo add-apt-repository "deb https://packages.grafana.com/oss/deb stable main"
Adding Grafana repo
  • Update Apt and install Grafana.
sudo apt-get update -y
sudo apt-get install grafana -y
  • Start the Grafana server and restart Carbon Cache and Graphite Web. Remember to wait a few seconds between stopping and starting Carbon Cache. Running a service “restart” is not advised.
sudo service grafana-server start
sudo service carbon-cache stop

sudo service carbon-cache start
sudo service statsd restart
sudo service apache2 reload
  • Make sure Graphite Web is working by visiting the instance’s public IP on port 80 in a browser.
Graphite web
  • Make sure the Grafana server is working by visiting the instance’s public IP on port 3000 in a browser.
Grafana web interface

Step 4: Configuring StatsD (~5 min)

NOTE: If you are following this series of tutorials, this section applies to your Apache Spark instance and the Python file created on it in the previous part of this tutorial.

  • Before configuring Grafana and creating a dashboard, we will start sending metrics to Graphite Carbon using StatsD on the Apache Spark instance. Install the StatsD Python client library.
sudo pip3 install statsd
  • Open the “readkafka.py” file created in the last tutorial.
# Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
# Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
# Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"|replace with your Kafka private address|:9092"})
# Transforming CDC JSON data to sum fruit numbers
# based on fruit name

def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)

counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)

# Starting Spark context
ssc.start()
ssc.awaitTermination()
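The chained transformation in the job above can be traced with plain Python. A minimal sketch, using two made-up Debezium-style messages shaped like the payloads the job reads:

```python
import json

# Two fabricated messages with only the fields the job actually reads.
messages = [
    json.dumps({"schema": {}, "payload": {"after": {"fruit_name": "Apple", "num_sold": 5}}}),
    json.dumps({"schema": {}, "payload": {"after": {"fruit_name": "Apple", "num_sold": 10}}}),
]

# map: parse JSON; flatMap: explode dict items; filter: keep "payload";
# map: extract (fruit_name, num_sold) pairs
pairs = []
for m in messages:
    for key, value in json.loads(m).items():
        if key == "payload":
            pairs.append((value["after"]["fruit_name"], value["after"]["num_sold"]))

# reduceByKey: sum the counts per fruit name
totals = {}
for name, sold in pairs:
    totals[name] = totals.get(name, 0) + sold

print(totals)  # {'Apple': 15}
```

Each micro-batch therefore yields one (fruit, total sold) pair per fruit seen in that batch, which is exactly what gets handed to the printy function.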
  • Add the following lines at the top of the file to use the StatsD library and create a StatsD client.
import statsd
c = statsd.StatsClient('{private IP for Grafana instance}', 8125)
  • Edit the printy function to send StatsD statistics from each RDD. Change it from this:
def printy(a, b):
    listy = b.collect()
    for l in listy:
        print(l)
  • To this:
def printy(a, b):
    listy = b.collect()
    for l in listy:
        c.incr("{0}.sold".format(l[0]), l[1])
  • The above “incr” function uses the StatsD client to send a StatsD “count” type metric. What is being sent is a UDP packet with a data payload similar to the following if you are sending sale numbers for fruits.
Apple.sold:10|c
  • The above metric says that 10 apples were sold. The general structure for a StatsD metric is as follows.
metric_name:metric_value|metric_type
  • Click here to learn more about StatsD from their documentation.
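For illustration, this is roughly what the client does under the hood (a sketch, not the statsd library’s actual source): format a “name:value|type” line and fire it at the collector over UDP. The “c” type marks a counter (what incr sends); a gauge would use “g”.

```python
import socket

def send_counter(name, value, host="127.0.0.1", port=8125):
    # Build the "metric_name:metric_value|metric_type" payload.
    packet = "{0}:{1}|c".format(name, value)
    # StatsD is fire-and-forget: one UDP datagram, no reply expected.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(packet.encode("ascii"), (host, port))
    sock.close()
    return packet  # returned only so the wire format is easy to inspect

print(send_counter("Apple.sold", 10))  # Apple.sold:10|c
```

Because the transport is UDP, a missing or restarted collector never blocks or crashes the Spark job; the packet is simply lost.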

Step 5: Starting All Pipeline Services (~10 min)

NOTE: Remember to check any IP address configurations as they might change.

  • RDP into the Windows Server instance. Open an admin command prompt and start the MS SQL services.
net start MSSQLSERVER
net start SQLSERVERAGENT
Starting services
  • SSH into the Apache Kafka Ubuntu instance. Start Zookeeper, the Kafka broker, and Kafka Connect in the background.
/etc/kafka/bin/zookeeper-server-start.sh /etc/kafka/config/zookeeper.properties &> zookeeper_log &
/etc/kafka/bin/kafka-server-start.sh /etc/kafka/config/server.properties &> broker_log &
/etc/kafka/bin/connect-distributed.sh /etc/kafka/config/connect-distributed.properties &> connect_log &
  • Make sure the Debezium connector is added with the first command. If it isn’t, use the second command to add it again.
curl -H "Accept:application/json" localhost:8083/connectors/
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "{Private IP Address}", "database.port": "1433", "database.user": "testuser", "database.password": "password!", "database.dbname": "testDB", "database.server.name": "testDB", "table.whitelist": "dbo.fruit", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "dbhistory.fulfillment" } }'
Connector not added
Connector added
  • SSH into the Apache Spark instance and submit the streaming job.
/etc/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.3 readkafka.py

NOTE: Refer to the first and second parts of this tutorial for more detailed instructions for starting Kafka and MS SQL services.

NOTE: Make sure CDC data is appearing in the topic using a consumer and make sure the connector is installed as it may be deleted when Kafka Connector goes down. You may need to check any IP address configurations.

Step 6: Configuring Grafana and Creating a Dashboard (~10 min)

  • Navigate to the Grafana web interface on port 3000.
Grafana web interface
  • Input “admin” as the username and “admin” as the password. Then change the password as required.
Changing the admin password
Grafana welcome page
  • Click the “Add your first data source” panel.
Adding a new data source
  • Input the following for the URL.
http://127.0.0.1:80
  • Click the “Save and Test” button and make sure the “Data source is working” notification appears.
Saving the new data source
  • Press the “Back” button.
Creating a new dashboard
  • Click the “Add new panel” button.
Adding a new panel
New live graph
  • Select “Graphite” as your data source.
Changing data source for query
  • Set the following panel display settings.
Display settings
  • Set the following time settings.
Time range
Refresh rate
  • Now RDP back to the Microsoft SQL Server instance.
use testDB;INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Apple', 5);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Pear', 10);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Peach', 20);
INSERT INTO fruit(fruit_name, num_sold)
VALUES ('Watermelon', 25);
  • Open an admin Powershell and execute the file with “sqlcmd”.
sqlcmd -U testuser -P {your password} -i {location of testdb.sql}
Running an insert
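As a sanity check, those four inserts should produce one counter increment per fruit in the next micro-batch. A sketch of the expected metrics, assuming all four rows are captured in the same batch:

```python
# The four inserted rows from testdb.sql.
rows = [("Apple", 5), ("Pear", 10), ("Peach", 20), ("Watermelon", 25)]

# Each row becomes one StatsD counter packet from the Spark job.
metrics = ["{0}.sold:{1}|c".format(name, sold) for name, sold in rows]
print(metrics)
# ['Apple.sold:5|c', 'Pear.sold:10|c', 'Peach.sold:20|c', 'Watermelon.sold:25|c']

# Sorted by value: Watermelon stacks on top, Apple at the bottom.
stacked = [name for name, sold in sorted(rows, key=lambda r: r[1], reverse=True)]
print(stacked)  # ['Watermelon', 'Peach', 'Pear', 'Apple']
```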
  • Go back to the Grafana browser interface.

NOTE: These options should be available by clicking if Graphite and StatsD were running correctly and if the Spark job has the correct Grafana private IP address for its StatsD client when the table insert happened. Check these services and their configurations if these options are not available.

Selecting metrics
Adding a new series
  • Add another query by clicking the “+ Query” button.
Second series
  • Add two more series in a similar fashion for the other two fruits, Peach and Watermelon.
Third and fourth series
  • Observe the graph and see that the data points have been stacked accurately by value with Watermelon at the top and Apple at the bottom.
Grafana live graph
  • Run more inserts to see the graph change.

Completed Python File

# Imports and running findspark
import findspark
findspark.init('/etc/spark')
import pyspark
from pyspark import RDD
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import statsd
c = statsd.StatsClient('{private IP for Grafana instance}', 8125)
# Spark context details
sc = SparkContext(appName="PythonSparkStreamingKafka")
ssc = StreamingContext(sc,2)
# Creating Kafka direct stream
dks = KafkaUtils.createDirectStream(ssc, ["testDB.dbo.fruit"], {"metadata.broker.list":"|replace with your Kafka private address|:9092"})
# Transforming CDC JSON data to sum fruit numbers
# based on fruit name

def printy(a, b):
    listy = b.collect()
    for l in listy:
        c.incr("{0}.sold".format(l[0]), l[1])

counts = dks.map(lambda x: json.loads(x[1])).flatMap(lambda dict: dict.items()).filter(lambda items: items[0]=="payload").map(lambda tupler: (tupler[1]["after"]["fruit_name"], tupler[1]["after"]["num_sold"])).reduceByKey(lambda a, b: a+b).foreachRDD(printy)

# Starting Spark context
ssc.start()
ssc.awaitTermination()

Conclusion

By following parts one, two, and three of this tutorial, you should have a CDC data pipeline that stores all CDC data, streams and transforms it, and graphs that information live for immediate visual review. Note that this tutorial assumes you are only working with inserts on the given table; you may need to edit the Spark transformation to filter specific kinds of CDC data based on the “op” field in each CDC record (discussed in part 2).
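Such a filter could be sketched in plain Python as follows (in the Spark job it would be an extra .filter step on the payload before the final map). Debezium marks each event with an “op” code: “c” for create/insert, “u” for update, “d” for delete, and “r” for snapshot reads.

```python
# Fabricated events illustrating the three main "op" codes.
events = [
    {"op": "c", "after": {"fruit_name": "Apple", "num_sold": 5}},
    {"op": "u", "after": {"fruit_name": "Apple", "num_sold": 7}},
    {"op": "d", "after": None},
]

# Keep only inserts so updates and deletes don't inflate the counters.
inserts = [e for e in events if e["op"] == "c"]
print([(e["after"]["fruit_name"], e["after"]["num_sold"]) for e in inserts])
# [('Apple', 5)]
```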

Further, you may wish to combine certain services on the same instance such as Kafka and Spark. This is possible, but remember to increase your instance size accordingly.

If you have any questions about any part of this tutorial, please feel free to post a comment on this post or connect with me on LinkedIn.
