Ingesting MQTT Into InfluxDB with Telegraf

Jon Stopple
February 21, 2024

Telegraf makes it easy to move data from an MQTT broker into an InfluxDB database, with practically no coding involved. Run the service and watch Influx populate with the proper tags, fields, and measurements as MQTT messages hit your broker. Let’s jump into it.

I’ll assume you’re familiar with MQTT and how to get data into a broker. If not, check out my other articles covering the basics and how to use Node-RED to publish data. I’m also assuming that you’re familiar with InfluxDB, a popular open-source time-series database.

Why Telegraf, anyway?

Good question. Telegraf, aptly named, moves data from one place to another. It’s a powerful open-source tool, written in Go and built around plugins, that lets you contextualize your data or perform operations on it to deliver fully formed metrics straight to your data store. Let’s cover a few examples.

For this example, I have IoT sensor data being published over MQTT, and I want to get that data into InfluxDB. Now, we could set up a client to read the topics of interest, parse each message, and write a query to push the data to Influx. Or we can add a few lines to Telegraf’s configuration and see our data in Influx.
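To put the manual route in perspective, here’s a minimal sketch of the transformation such a client would have to hand-roll: parse each JSON payload and build an InfluxDB line-protocol string before writing it. The topic layout and field names here are hypothetical, and real line protocol also needs escaping and type suffixes that are omitted for brevity.

```python
import json

def mqtt_to_line_protocol(topic: str, payload: bytes,
                          measurement: str = "mqtt_consumer") -> str:
    """Convert an MQTT message into an InfluxDB line-protocol string.

    The full topic is kept as a tag and every numeric JSON key becomes
    a field -- roughly what you'd have to hand-roll without Telegraf.
    (Real line protocol needs escaping and an 'i' suffix for integer
    fields; skipped here to keep the sketch short.)
    """
    data = json.loads(payload)
    fields = ",".join(
        f"{key}={value}" for key, value in data.items()
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    )
    return f"{measurement},topic={topic} {fields}"

# A hypothetical sensor message:
line = mqtt_to_line_protocol(
    "mycompany/chicago/2/weather/log",
    b'{"device": "sensor-01", "temperature": 21.5, "humidity": 48}',
)
print(line)
# mqtt_consumer,topic=mycompany/chicago/2/weather/log temperature=21.5,humidity=48
```

Multiply that by connection handling, reconnects, batching, and retries, and the appeal of a few lines of Telegraf configuration becomes obvious.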

Or imagine we want to track metrics from an AWS EC2 instance and push that data to multiple MQTT brokers, a file, and, for some crazy reason, InfluxDB, MongoDB, and PostgreSQL. Telegraf lets you do all of that from a single file.

The image below gives an idea of the extreme functionality of Telegraf.

Sample Telegraf plugin options (Telegraf Input Plugins | InfluxData).

Installing Telegraf

Awesome, you’re convinced you need Telegraf! I’ll go through the Windows installation quickly. Alternatively, you can download builds for other operating systems from the InfluxData website.

1. Grab files:

Open up an Admin PowerShell terminal and run the following commands.

wget `
https://dl.influxdata.com/telegraf/releases/telegraf-1.29.0_windows_amd64.zip `
-UseBasicParsing `
-OutFile telegraf-1.29.0_windows_amd64.zip

Expand-Archive .\telegraf-1.29.0_windows_amd64.zip `
-DestinationPath 'C:\Program Files\InfluxData\telegraf\'

2. Organize files:

You’ll want to move telegraf.conf and telegraf.exe out of the versioned subfolder and into the main folder, C:\Program Files\InfluxData\telegraf. It’ll look like this:

C:\Program Files\InfluxData\telegraf directory after moving the files.

3. Install as a service:

Run the command below (from your telegraf folder) and you should see Telegraf in your list of services.

.\telegraf.exe --service install --config "C:\Program Files\InfluxData\telegraf\telegraf.conf"

4. Run and test:

From your services list, you can now start the Telegraf service and set it to start automatically at boot. There’s no need to start it yet, though, until we update the telegraf.conf file. Let’s do that now.

Telegraf Configuration

Telegraf has some system-wide settings; the rest is plugin-driven, with Inputs, Outputs, Aggregators, and Processors. We’ll set up an InfluxDB output and an MQTT consumer input.

Each plugin is specified with the following format:

[[outputs.{plugin_name}]]
 # Plugin configuration

[[inputs.{plugin_name}]]
 # Plugin configuration

[[aggregators.{plugin_name}]]
 # Plugin configuration

[[processors.{plugin_name}]]
 # Plugin configuration

First, the InfluxDB output, which is quite simple: replace the placeholders with your actual token, organization, and bucket. If InfluxDB isn’t running on your local machine on port 8086, update the URL as well. If you’re on Docker, specify your container name instead of ‘127.0.0.1’.

[[outputs.influxdb_v2]]
 ## The URLs of the InfluxDB cluster nodes.
 ##
 ## Multiple URLs can be specified for a single cluster, only ONE of the
 ## urls will be written to each interval.
 ## urls exp: http://127.0.0.1:8086
 urls = ["http://127.0.0.1:8086"]

 ## Token for authentication.
 token = "{INFLUXTOKEN}"

 ## Organization is the name of the organization you wish to write to; must exist.
 organization = "{INFLUXORG}"

 ## Destination bucket to write into.
 bucket = "{INFLUXBUCKET}"

Ok, that’s the easier part. Next, setting up the MQTT Consumer. Here’s where the customization is applied relative to your MQTT message formats. Below is the full configuration before I get into the details.

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:8883"]
  name_override = "mosquitto" # Optional name tag in Influx metric, defaults to mqtt_consumer otherwise
  topics = ["+/+/+/amperage/log","+/+/+/weather/log"]

  ## Username and password to connect MQTT server.
  username = "admin" # Replace with your MQTT username
  password = "admin" # Replace with your MQTT password

  ## Data format to consume.
  data_format = "json"
  tag_keys = ["device"]
  #json_string_fields = ["mode", "relay"]
  ## Enable extracting tag values from MQTT topics
  ## _ denotes an ignored entry in the topic path
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "+/+/+/+/+"
    # topic layout: customer/site/area/{measurement}/type
    tags = "customer/site/area/_/type"
    measurement = "_/_/_/measurement/_"
    #fields = ""

servers — Same idea as the InfluxDB URL, except we use the tcp protocol and specify the host and port of the MQTT broker. You can enter multiple servers in the list to process them in bulk.

name_override — If you want the metric to show under a different name than mqtt_consumer in Influx, use this setting. It can be helpful if you have multiple brokers or want to know which Telegraf instance processed the data.

topics — A list of the topics you’d like to subscribe to and apply topic parsing to. If you have multiple message formats that need different rules, simply add another [[inputs.mqtt_consumer]] section for those topics.

data_format — JSON is typical, but MQTT payloads come in a variety of formats. Match the format of your published messages.

tag_keys — Specify keys in the JSON payload that should become tags. By default, payload keys are converted to fields, or ignored if they are strings and not listed elsewhere.

json_string_fields — String fields are ignored by default and must be listed here to store them. For example, if your payload contains the pair "fault": "device fault", you could use this parameter to keep it.
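To make the tag_keys and json_string_fields behavior concrete, here’s a rough Python simulation of how the JSON parser sorts payload keys into tags and fields. This is a sketch of the documented behavior, not Telegraf’s actual implementation, and the payload is hypothetical:

```python
import json

def split_payload(payload: bytes, tag_keys=(), json_string_fields=()):
    """Mimic the JSON data format: numeric keys become fields, keys
    listed in tag_keys become tags, and string (or boolean) values are
    dropped unless listed in json_string_fields. A sketch only."""
    data = json.loads(payload)
    tags, fields = {}, {}
    for key, value in data.items():
        if key in tag_keys:
            tags[key] = str(value)          # tags are always strings
        elif isinstance(value, (bool, str)):
            if key in json_string_fields:   # keep only if opted in
                fields[key] = value
        else:
            fields[key] = float(value)      # numbers become fields
    return tags, fields

tags, fields = split_payload(
    b'{"device": "sensor-01", "amps": 12.4, "fault": "device fault"}',
    tag_keys=("device",),
    json_string_fields=("fault",),
)
print(tags)    # {'device': 'sensor-01'}
print(fields)  # {'amps': 12.4, 'fault': 'device fault'}
```

Without the json_string_fields entry, the "fault" string would simply vanish on its way into Influx.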

Topic Parsing

Topic parsing is very powerful. By default, the topic is stored as-is and requires extra processing downstream to be useful. Topic parsing adds functionality by letting you extract tags, fields, and measurements immediately.

[[inputs.mqtt_consumer.topic_parsing]]
  topic = "+/+/+/+/+"
  # topic layout: customer/site/area/{measurement}/type
  tags = "customer/site/area/_/type"
  measurement = "_/_/_/measurement/_"
  #fields = ""

topic — Declares which topics the parsing applies to. In the example above, “+/+/+/+/+” matches topics with a length of 5 that also match the topics parameter from above, without caring about the actual segment values. I could have used “+/+/+/+/log” or a similar combination as well.

tags, measurement, and fields — Each of these must match the length of the declared topic. My topic parsing rule has a length of 5, so each tags, measurement, or fields pattern (if used) must also have a length of 5. An underscore (_) marks a segment to ignore for that setting, acting as a filler to reach the correct length. The example above splits each topic into 4 additional tags and 1 measurement. Let’s take the example topic “mycompany/chicago/2/weather/log”, since that would be a valid input to the consumer. We would end up with the following:

# Sample
tags: {
  'customer': 'mycompany',
  'site': 'chicago',
  'area': '2',
  'type': 'log'
}

measurement: {
  'measurement': 'weather'
}

fields: none specified
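The mapping above can be sketched in a few lines of Python: split the topic and the patterns on “/”, then pair segments wherever the pattern names something other than “_”. This is an illustration of the rule, not Telegraf’s internal code:

```python
def parse_topic(topic: str, tags_pattern: str, measurement_pattern: str):
    """Pair topic segments with pattern names, skipping '_' placeholders."""
    segments = topic.split("/")
    tags = {
        name: seg
        for name, seg in zip(tags_pattern.split("/"), segments)
        if name != "_"
    }
    measurement = next(
        seg for name, seg in zip(measurement_pattern.split("/"), segments)
        if name != "_"
    )
    return tags, measurement

tags, measurement = parse_topic(
    "mycompany/chicago/2/weather/log",
    "customer/site/area/_/type",
    "_/_/_/measurement/_",
)
print(tags)         # {'customer': 'mycompany', 'site': 'chicago', 'area': '2', 'type': 'log'}
print(measurement)  # weather
```

Because the patterns are matched positionally, you can see why every pattern must have exactly as many segments as the topic it parses.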

We can see how powerful this plugin truly is. Parsing a topic into usable, queryable tags, fields, and measurements provides valuable context. We took a raw topic, created 4 new tags, and assigned the entire payload to the proper measurement (a table, in SQL terms).

Don’t forget to start the service (or restart it if it’s already running), then sit back and watch Telegraf work its magic!

Topic design takes some upfront thought, but once it’s established, data can be swiftly moved from an MQTT broker into an InfluxDB instance with a Telegraf service that’s simple to configure and run. If you’re curious about more plugin potential, check out the full plugin list.