Influx Downsample
A Python script to query raw data from InfluxDB and perform local aggregation for downsampling.
The idea here is the same as for using Kapacitor: to move downsampling resource demands off the InfluxDB instance.
Usage
The script implements downsampling; scheduling is up to you (cron is your friend).
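For example, a crontab entry along the following lines (the schedule and install path here are purely illustrative) would run the script at the top of every hour:
# Illustrative crontab entry: run the downsampling script hourly
0 * * * * /opt/downsample/app/downsample.py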
Direct
./app/downsample.py
Docker
docker run -v $PWD/config.yaml:/config.yaml bentasker12/downsample
Configuration check
./app/downsample.py lint
docker run -v $PWD/config.yaml:/config.yaml bentasker12/downsample lint
Environment Variables
CONFIG_FILE Location of the main configuration file
CONFIG_DIR Location of the directory containing additional config files
DRY_RUN If set to Y, the generated query will be printed, but not run
RUN_TIME Can be used to override the time used as the task run time (affecting the time period queried); the expected format is %Y-%m-%dT%H:%M:%S (see the example below)
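For example, to print the generated query for a specific run time without executing it (the values here are purely illustrative):
# Dry run with an overridden run time
DRY_RUN=Y RUN_TIME=2023-02-05T12:00:00 ./app/downsample.py

# The same via Docker, passing the variables with -e
docker run \
    -e DRY_RUN=Y \
    -e RUN_TIME=2023-02-05T12:00:00 \
    -v $PWD/config.yaml:/config.yaml \
    bentasker12/downsample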
Config
Configuration is achieved via YAML:
# If present and set to True, the script will only log
# when it starts a job
quiet: True

# When calculating query time bounds from the run start time
# round the time down to this many seconds
#
# i.e. if the job started at 00:00:15 it should behave as though
# it were 00:00:00
round_start_time: 60

# If a write fails, should we dump details of the data to disk?
#
# This allows for potential future replay, preventing loss of data
# if the target instance is having issues, *but* can quickly consume
# significant disk resources.
#
# Set record to True to enable
failures:
    record: False
    path: dump.d

# Define the InfluxDB instances which can be communicated with
#
# For 1.x with auth the token should be of the form
# user:password
#
influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60

# Optionally write run-time stats out to a measurement
stats:
    active: True
    influx: home1x
    bucket: telegraf/autogen
    measurement: downsample_stats
    # Key-value pairs of tags to add to any stats writes
    #
    # if used, $HOSTNAME will be replaced with the system's
    # hostname at run time
    #
    tags:
        role: "hourlydownsamples"
        host: "$HOSTNAME"

# Define downsampling jobs
downsamples:
    # Job
    downsample1:
        # Name for the task
        name: "Downsample power stats"
        # Which influx should we query against
        influx: home1x
        # What time period should we query
        period: 10
        # What time period should we aggregate over?
        # if this is omitted, it'll default to the value in
        # period (i.e. the full query)
        window: 10
        # Which bucket
        bucket: Systemstats
        # Which measurement (optional, but recommended)
        #
        # Can also be provided as a list of strings to include multiple measurements
        measurement: power_watts
        # Which fields should we retrieve (leave empty for all)
        fields: []
        # Fields which should be excluded
        # list of strings
        not_fields: []
        # Columns to use for custom grouping
        group_by: []
        # Any additional filters to inject into the query
        filters:
            - 'r.host == "power-meter"'
        # Custom flux lines to be appended to the generated query
        flux_calls: []
        # Any flux packages to import (i.e. if you've referenced one in flux_calls above)
        imports: []
        # Don't generate a flux query, use this instead (see details below)
        query: ""
        # Which aggregates to generate
        # field_suffix is optional; if not included
        # the source field's name will be used
        aggregates:
            mean:
                as: "mean"
            median:
                as:
                    consumption: "median_consump"
                    today_cost: "median_cost"
            mode:
                field_suffix: "_mode"
            stdev:
                field_suffix: "_stddev"
            copy:
            first:
                field_suffix: "_first"
            last:
                field_suffix: "_last"
            min:
                field_suffix: "_min"
            max:
                field_suffix: "_max"
            sum:
                field_suffix: "_sum"
            count:
                field_suffix: "_count"
            percentile:
                - 99
                - 20
        # Which influx instance are we writing to
        # this can be a list of objects if multiple outputs are wanted
        # (see below for more info)
        output_influx: home1x
        # Which bucket?
        output_bucket: testing_db
The attribute measurement
can be a string (the name of a single measurement) or a list of strings (so that the query uses multiple measurements):
measurement:
    - cpu
    - mem
Splitting config across files
Whilst it's possible to put all configuration into config.yaml, that can become unwieldy when there are multiple jobs to run. Config can instead be split across files by using !include within the YAML config: put YAML files within conf.d and then reference them either via filename or glob.
For example
config.yaml
# This config file makes use of include to split config out to files in the
# conf.d directory
quiet: false
# Define the InfluxDB instances which can be communicated with
#
# For 1.x with auth the token should be of the form
# user:password
#
influxes: !include influx-*.yaml
# Define downsampling jobs
downsamples: !include job-*.yaml
conf.d/influx-home1x.yaml
home1x:
    url: http://192.168.3.84:8086
    org: ""
    token: ""
    timeout: 60
conf.d/job-downsample_custom_query.yaml
downsample_custom_query:
    # Name for the task
    name: "Downsample with custom query"
    influx: home1x
    period: 10
    window: 10
    query: >
        from(bucket: "Systemstats")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn:(r) => r._measurement == "power_watts")
        |> filter(fn:(r) => r.host == "power-meter")
        |> filter(fn:(r) => r._field == "today_cost" or r._field == "consumption")
        |> window(every: v.windowPeriod)
    # Which aggregates to generate
    # field_suffix is optional; if not included
    # the source field's name will be used
    aggregates:
        mean:
            as:
                today_cost: "mean_custquery"
                consumption: "usage_mean_custquery"
    # Which influx instance are we writing to
    output_influx: home1x
    # Which bucket?
    output_bucket: testing_db
The location of the additional configuration directory can be controlled with environment variable CONFIG_DIR.
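For example, when invoking the script directly, the directory can be pointed elsewhere (the path below is purely illustrative):
# Read additional config files from a custom directory
CONFIG_DIR=/etc/downsample/conf.d ./app/downsample.py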
If running with Docker, conf.d will also need to be mounted into the container, making the incantation:
docker run \
    -v $PWD/config.yaml:/config.yaml \
    -v $PWD/conf.d:/conf.d \
    bentasker12/downsample
Multiple Outputs
It's possible to send the result of a job to multiple outputs by providing a list of objects in attribute output_influx:
output_influx:
    - influx: home1x
      bucket: testing_db/autogen2
    - influx: home1x

# Which bucket?
output_bucket: testing_db
The attribute bucket is optional; if it's absent, that output will use the bucket defined in output_bucket.
Applying and Configuring Aggregates
Aggregates are applied per job, using the aggregates
attribute. Multiple aggregates can be applied to a job.
Most aggregates accept two additional config items:
- field_suffix: a suffix to append to the field name
- as: a new name for the field, or a map of fields to new names
If as
is present, then field_suffix
will be ignored. If neither is present (and/or both are empty) the existing field name will be used (be aware that this'll mean you can only use a single aggregate).
So, in the following example:
fields:
    - usage_user
aggregates:
    mean:
        as: "mean"
    median:
        field_suffix: "_median"
    mode:
    first:
        field_suffix: "_first"
        as: "first"
- The mean will be written into a new field called mean
- The median will be written into a new field called usage_user_median
- The mode will be written into the existing field usage_user
- The first value in each group will be written into a new field called first (field_suffix will be ignored)
The attribute as can be a string or a dict. Jobs processing a single field can use a string, but those processing multiple input fields will want to use a dict:
aggregates:
    mean:
        as:
            source_fieldname: new_name
            usage_user: "mean-user-usage"
The copy Aggregate
The copy aggregate is, strictly speaking, not an aggregate at all - it doesn't perform any aggregation and simply copies points from the input into the output queue.
It doesn't perform any transforms and so doesn't accept any attributes.
It can be used whilst other aggregates are active, but it's important to ensure that other aggregates use field_suffix
or as
to define new field names - otherwise the source field name will be used for them, resulting in a mishmash of raw and aggregated values.
Because some internal conversion is necessary, the copy
aggregate cannot be used with pivoted data: it relies on the presence of columns _field
and _value
to identify the field - all other non-contextual columns will be treated as tags.
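For example, a minimal sketch of a job's aggregates section (the suffix here is illustrative) might copy the raw points through while also writing a suffixed mean, so that raw and aggregated values don't collide:
aggregates:
    # Pass the raw points through unchanged
    copy:
    # Give the aggregated values their own field name so they
    # don't overwrite the copied raw values
    mean:
        field_suffix: "_mean"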
Example: CPU
Query an hour of CPU stats from a local instance and write the mean of each 15 minute window into InfluxDB Cloud:
influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    thecloud:
        url: eu-central-1-1.aws.cloud2.influxdata.com
        org: "myorg"
        token: "mytoken"
        timeout: 60

# Define downsampling jobs
downsamples:
    downsample1:
        name: "Downsample CPU"
        influx: home1x
        period: 60
        window: 15
        bucket: telegraf
        measurement: cpu
        fields: []
        group_by: []
        aggregates:
            mean:
        output_influx: thecloud
        output_bucket: telegraf_downsampled
Example: CPU and Network
Query CPU and network stats from a local instance and generate mean, min, max and 99th percentile statistics:
influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    thecloud:
        url: eu-central-1-1.aws.cloud2.influxdata.com
        org: "myorg"
        token: "mytoken"
        timeout: 60

# Define downsampling jobs
downsamples:
    cpu:
        name: "Downsample CPU"
        influx: home1x
        period: 15
        bucket: telegraf
        measurement: cpu
        fields: []
        group_by: []
        aggregates:
            mean:
                field_suffix: "_mean"
            min:
                field_suffix: "_min"
            max:
                field_suffix: "_max"
            percentile:
                - 99
        output_influx: home1x
        output_bucket: telegraf_downsampled

    net:
        name: "Downsample net"
        influx: home1x
        period: 15
        bucket: telegraf
        measurement: net
        fields:
            - bytes_recv
            - bytes_sent
            - drop_in
            - drop_out
            - err_in
            - err_out
        group_by: []
        aggregates:
            mean:
                field_suffix: "_mean"
            min:
                field_suffix: "_min"
            max:
                field_suffix: "_max"
            percentile:
                - 99
        output_influx: home1x
        output_bucket: telegraf_downsampled
Or, the same can be achieved within a single job by using multi-measurement support:
downsamples:
    cpu_and_net:
        name: "Downsample CPU and Net"
        influx: home1x
        period: 15
        bucket: telegraf
        measurement:
            - cpu
            - net
        fields:
            - usage_user
            - usage_system
            - usage_idle
            - bytes_recv
            - bytes_sent
            - drop_in
            - drop_out
            - err_in
            - err_out
        group_by: []
        aggregates:
            mean:
                field_suffix: "_mean"
            min:
                field_suffix: "_min"
            max:
                field_suffix: "_max"
            percentile:
                - 99
        output_influx: home1x
        output_bucket: telegraf_downsampled
Example: Custom Flux Calls
It's also possible to simply provide custom Flux to be appended to the end of the query that the system generates. This can be achieved using the flux_calls attribute, which is a list of Flux statements:
downsample_custom_function:
    # Name for the task
    name: "Downsample power stats with custom calls"
    influx: home1x
    period: 10
    window: 10
    bucket: Systemstats
    measurement: power_watts
    fields:
        - consumption
    group_by: []
    filters:
        - 'r.host == "power-meter"'
    # Define some additional flux calls to be appended to the
    # generated query
    #
    # Define as a list:
    flux_calls:
        - // Convert to kWh
        - >
            |> map(fn: (r) => ({
                r with _value: float(v: r._value) / 1000.0
            }))
    aggregates:
        mean:
            as:
                consumption: "usage_mean_kWh"
    # Which influx instance are we writing to
    output_influx: home1x
    # Which bucket?
    output_bucket: testing_db
The result of the above is that the following Flux query is run:
from(bucket: "Systemstats" )
    |> range(start: 2023-02-05T12:27:32.190226Z, stop: 2023-02-05T12:37:32.190226Z)
    |> filter(fn:(r) => r._measurement == "power_watts")
    |> filter(fn:(r) => r.host == "power-meter")
    |> filter(fn:(r) => r._field == "consumption" )
    |> window(every: 10m)
    // Convert to kWh
    |> map(fn: (r) => ({
        r with _value: float(v: r._value) / 1000.0
    }))
If Flux packages are required (for example strings or date), they can be specified in the imports attribute:
imports:
    - date
    - strings
flux_calls:
    - // blah
Example: Field Exclusion
Sometimes it's desirable to exclude specific fields. This can be achieved by enumerating all the desired fields in the fields attribute, or by including excluded fields in the not_fields attribute. Where both are used, not_fields will take precedence (i.e. anything listed in both will be excluded):
influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60

# Define downsampling jobs
downsamples:
    downsample1:
        name: "Downsample CPU"
        influx: home1x
        period: 60
        window: 15
        bucket: telegraf
        measurement: cpu
        fields: []
        # Exclude the following fields
        not_fields:
            - usage_idle
            - usage_steal
        group_by: []
        aggregates:
            mean:
        output_influx: home1x
        output_bucket: telegraf_downsampled
Custom Query
By default, the script builds a Flux query to retrieve points. However, it is also possible to provide a custom query, using the query
attribute:
downsample_custom_query:
    # Name for the task
    name: "Downsample with custom query"
    influx: home1x
    period: 10
    window: 10
    query: >
        from(bucket: "Systemstats")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn:(r) => r._measurement == "power_watts")
        |> filter(fn:(r) => r.host == "power-meter")
        |> filter(fn:(r) => r._field == "today_cost" or r._field == "consumption")
        |> window(every: v.windowPeriod)
    # Which aggregates to generate
    # field_suffix is optional; if not included
    # the source field's name will be used
    aggregates:
        mean:
            as:
                today_cost: "mean_custquery"
                consumption: "usage_mean_custquery"
    # Which influx instance are we writing to
    output_influx: home1x
    # Which bucket?
    output_bucket: testing_db
The placeholders v.timeRangeStart, v.timeRangeStop and v.windowPeriod will be replaced at execution time.
Using a custom query is advanced usage, and there are a few things which need to be considered:
- There should always be a window() call to ensure that timestamps are correctly aligned
- As a rule, aggregates should not be used (as this moves resource overhead back to the InfluxDB instance)
- Avoid using relative durations in range() - these will work, but will cause issues if task start times are overridden with RUN_TIME (see the sketch below)
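To illustrate that last point, a minimal sketch (the bucket and measurement names are only illustrative) - prefer the placeholders over a relative duration:
// Preferred: the placeholders are substituted with absolute times at run time
from(bucket: "Systemstats")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn:(r) => r._measurement == "power_watts")
    |> window(every: v.windowPeriod)

// Avoid: a relative duration is evaluated against "now", so it won't
// reflect a RUN_TIME override
// |> range(start: -10m)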
Copyright
Copyright (c) 2023 B Tasker. Released under the BSD 3-Clause License.