Influx Downsample

A Python script to query raw data from InfluxDB and perform local aggregation for downsampling.

The idea here is the same as for using Kapacitor: to move downsampling resource demands off the InfluxDB instance.

Usage

The script implements downsampling; scheduling is up to you (cron is your friend).
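
For example, a crontab entry to run the script at the top of every hour might look like the following (a sketch - the path is illustrative):

0 * * * * /path/to/app/downsample.py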

Direct

./app/downsample.py

Docker

docker run -v $PWD/config.yaml:/config.yaml bentasker12/downsample

Configuration check

./app/downsample.py lint
docker run -v $PWD/config.yaml:/config.yaml bentasker12/downsample lint

Environment Variables

CONFIG_FILE     Location of the main configuration file
CONFIG_DIR      Location of the directory containing additional config files
DRY_RUN         If set to Y, the generated query will be printed, but not run
RUN_TIME        Can be used to override the time used as the task run time (affecting the time period queried)
                expected format is %Y-%m-%dT%H:%M:%S
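
For example, to preview the query that a run starting at midnight would have generated, without executing it (a sketch using the variables documented above):

DRY_RUN=Y RUN_TIME=2023-02-05T00:00:00 ./app/downsample.py

When running under Docker, the same variables can be passed with -e:

docker run -e DRY_RUN=Y -v $PWD/config.yaml:/config.yaml bentasker12/downsample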

Config

Configuration is achieved via YAML

# If present and set to True, the script will only log
# when it starts a job
quiet: True

# When calculating query time bounds from the run start time
# round the time down to this many seconds
#
# i.e. if the job started at 00:00:15 it should behave as though
# it were 00:00:00
round_start_time: 60

# If a write fails, should we dump details of the data to disk?
#
# This allows for potential future replay, preventing loss of data
# if the target instance is having issues, *but* can quickly consume
# significant disk resources.
#
# Set record to True to enable
failures:
    record: False
    path: dump.d

# Define the InfluxDB instances which can be communicated with
#
# For 1.x with auth the token should be of the form
# user:password
#
influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60

# Optionally write run-time stats out to a measurement
stats:
    active: True
    influx: home1x
    bucket: telegraf/autogen
    measurement: downsample_stats
    # Key-value pairs of tags to add to any stats writes
    #
    # if used, $HOSTNAME will be replaced with the system's 
    # hostname at run time
    #
    tags:
        role: "hourlydownsamples"
        host: "$HOSTNAME"

# Define downsampling jobs
downsamples:
    # Job
    downsample1:
        # Name for the task
        name: "Downsample power stats"
        # Which influx should we query against
        influx: home1x

        # What time period should we query (in minutes)
        period: 10

        # What time period should we aggregate over (in minutes)?
        # If this is omitted, it'll default to the value in
        # period (i.e. the full query)
        window: 10

        # Which bucket
        bucket: Systemstats

        # Which measurement (optional, but recommended)
        #
        # Can also be provided as a list of strings to include multiple measurements
        measurement: power_watts

        # Which fields should we retrieve (leave empty for all)
        fields: []

        # Fields which should be excluded
        # list of strings
        not_fields: []

        # Columns to use for custom grouping
        group_by: []

        # Any additional filters to inject into the query
        filters:
            - 'r.host == "power-meter"'

        # Custom flux lines to be appended to the generated query
        flux_calls: []

        # Any flux packages to import (i.e. if you've referenced one in flux_calls above)
        imports: []

        # Don't generate a flux query, use this instead (see details below)
        query: ""

        # Which aggregates to generate
        # field_suffix is optional; if not included,
        # the source field's name will be used
        aggregates: 
            mean:
              as: "mean"
            median:
              as:
                consumption: "median_consump"
                today_cost: "median_cost"
            mode:
              field_suffix: "_mode"
            stdev:
              field_suffix: "_stddev"
            copy:
            first:
              field_suffix: "_first"
            last:
              field_suffix: "_last"
            min:
              field_suffix: "_min"
            max:
              field_suffix: "_max"
            sum:
              field_suffix: "_sum"
            count:
              field_suffix: "_count"
            percentile:
               - 99
               - 20
        # Which influx instance are we writing to
        # this can be a list of objects if multiple outputs are wanted
        # (see below for more info)
        output_influx: home1x
        # Which bucket?
        output_bucket: testing_db

The attribute measurement can be a string (the name of a single measurement) or a list of strings (so that the query uses multiple measurements):

measurement:
    - cpu
    - mem

Splitting config across files

Whilst it's possible to put all configuration into config.yaml, that can become unwieldy when there are multiple jobs to run.

Instead, config can be split across files by using !include within the YAML: put additional YAML files within conf.d and then reference them either by filename or by glob.

For example

config.yaml

# This config file makes use of include to split config out to files in the
# conf.d directory
quiet: false

# Define the InfluxDB instances which can be communicated with
#
# For 1.x with auth the token should be of the form
# user:password
#
influxes: !include influx-*.yaml

# Define downsampling jobs
downsamples: !include job-*.yaml

conf.d/influx-home1x.yaml

home1x:
    url: http://192.168.3.84:8086
    org: ""
    token: ""
    timeout: 60

conf.d/job-downsample_custom_query.yaml

downsample_custom_query:
    # Name for the task
    name: "Downsample with custom query"
    influx: home1x
    period: 10
    window: 10
    query: >
        from(bucket: "Systemstats")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn:(r) => r._measurement == "power_watts")
        |> filter(fn:(r) => r.host == "power-meter")
        |> filter(fn:(r) => r._field == "today_cost" or r._field == "consumption")
        |> window(every: v.windowPeriod)

    # Which aggregates to generate
    # field_suffix is optional; if not included,
    # the source field's name will be used
    aggregates: 
        mean:
            as: 
                today_cost: "mean_custquery"
                consumption: "usage_mean_custquery"
    # Which influx instance are we writing to
    output_influx: home1x
    # Which bucket?
    output_bucket: testing_db

The location of the additional configuration directory can be controlled with environment variable CONFIG_DIR.

If running with docker, conf.d will need to be mounted into the container, making the incantation

docker run \
-v $PWD/config.yaml:/config.yaml \
-v $PWD/conf.d:/conf.d \
bentasker12/downsample

Multiple Outputs

It's possible to send the result of a job to multiple outputs by providing a list of objects in attribute output_influx:

        output_influx: 
            - influx: home1x
              bucket: testing_db/autogen2
            - influx: home1x
        # Which bucket?
        output_bucket: testing_db

The attribute bucket is optional; if it's absent, that output will use the bucket defined in output_bucket.


Applying and Configuring Aggregates

Aggregates are applied per job, using the aggregates attribute. Multiple aggregates can be applied to a job.

Most aggregates accept two additional config items: as (a new output field name, provided as a string or as a dict mapping source field names to new names) and field_suffix (a suffix appended to the source field's name).

If as is present, then field_suffix will be ignored. If neither is present (and/or both are empty), the existing field name will be used (be aware that this'll mean you can only use a single aggregate).

So, in the following example:

        fields:
            - usage_user
        aggregates: 
            mean:
              as: "mean"
            median:
              field_suffix: "_median"
            mode:
            first:
              field_suffix: "_first"
              as: "first"

The attribute as can be a string or a dict. Jobs processing a single field can use a string, but those processing multiple input fields will want to use a dict:

aggregates:
    mean:
        as:
            source_fieldname: new_name
            usage_user: "mean-user-usage"

The copy Aggregate

The copy aggregate is, strictly speaking, not an aggregate at all - it doesn't perform any aggregation and simply copies points from the input into the output queue.

It doesn't perform any transforms and so doesn't accept any attributes.

It can be used whilst other aggregates are active, but it's important to ensure that other aggregates use field_suffix or as to define new field names - otherwise the source field name will be used for them, resulting in a mishmash of raw and aggregated values.
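
For example, the following (a minimal sketch) would keep the raw points while also writing a suffixed mean:

aggregates:
    copy:
    mean:
        field_suffix: "_mean"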

Because some internal conversion is necessary, the copy aggregate cannot be used with pivoted data: it relies on the presence of columns _field and _value to identify the field - all other non-contextual columns will be treated as tags.


Example: CPU

Query an hour of CPU stats from a local instance and write the mean values for each 15 minute window into InfluxDB Cloud:

influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    thecloud:
        url: https://eu-central-1-1.aws.cloud2.influxdata.com
        org: "myorg"
        token: "mytoken"
        timeout: 60

# Define downsampling jobs
downsamples:
    downsample1:
        name: "Downsample CPU"
        influx: home1x
        period: 60
        window: 15
        bucket: telegraf
        measurement: cpu
        fields: []
        group_by: []
        aggregates: 
            mean:
        output_influx: thecloud
        output_bucket: telegraf_downsampled
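
With this config, the generated Flux should look something along these lines (the timestamps are illustrative - compare the generated query shown under Custom Flux Calls below):

from(bucket: "telegraf" )
|> range(start: 2023-02-05T11:00:00Z, stop: 2023-02-05T12:00:00Z)
|> filter(fn:(r) => r._measurement == "cpu")
|> window(every: 15m)

The mean for each 15 minute window is then calculated locally before being written to thecloud.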

Example: CPU and Network

Query CPU and network stats from a local instance and generate mean, min, max and 99th percentile statistics:

influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60
    thecloud:
        url: https://eu-central-1-1.aws.cloud2.influxdata.com
        org: "myorg"
        token: "mytoken"
        timeout: 60

# Define downsampling jobs
downsamples:
    cpu:
        name: "Downsample CPU"
        influx: home1x
        period: 15
        bucket: telegraf
        measurement: cpu
        fields: []
        group_by: []
        aggregates: 
            mean:
                field_suffix: "_mean"
            min:
                field_suffix: "_min"
            max:
                field_suffix: "_max"
            percentile:
                - 99
        output_influx: home1x
        output_bucket: telegraf_downsampled

    net:
        name: "Downsample net"
        influx: home1x
        period: 15
        bucket: telegraf
        measurement: net
        fields: 
            - bytes_recv
            - bytes_sent
            - drop_in
            - drop_out
            - err_in
            - err_out
        group_by: []
        aggregates: 
            mean:
                field_suffix: "_mean"
            min:
                field_suffix: "_min"
            max:
                field_suffix: "_max"
            percentile:
                - 99
        output_influx: home1x
        output_bucket: telegraf_downsampled

Or, the same can be achieved within a single job by using multi-measurement support:

downsamples:
    cpu_and_net:
        name: "Downsample CPU and Net"
        influx: home1x
        period: 15
        bucket: telegraf
        measurement:
            - cpu
            - net
        fields:
            - usage_user
            - usage_system
            - usage_idle
            - bytes_recv
            - bytes_sent
            - drop_in
            - drop_out
            - err_in
            - err_out
        group_by: []
        aggregates: 
            mean:
                field_suffix: "_mean"
            min:
                field_suffix: "_min"
            max:
                field_suffix: "_max"
            percentile:
                - 99
        output_influx: home1x
        output_bucket: telegraf_downsampled

Example: Custom Flux Calls

It's also possible to provide custom Flux to be appended to the end of the query that the system generates.

This can be achieved using the flux_calls attribute, which is a list of Flux snippets:

    downsample_custom_function:
        # Name for the task
        name: "Downsample power stats with custom calls"
        influx: home1x
        period: 10
        window: 10
        bucket: Systemstats
        measurement: power_watts
        fields: 
            - consumption
        group_by: []
        filters:
            - 'r.host == "power-meter"'

        # Define some additional flux calls to be appended to the
        # generated query
        # 
        # Define as a list:
        flux_calls:
            - // Convert to kWh
            - >
                |> map(fn: (r) => ({
                    r with _value: float(v: r._value) / 1000.0
                }))
        aggregates: 
            mean:
              as: 
                consumption: "usage_mean_kWh"
        # Which influx instance are we writing to
        output_influx: home1x
        # Which bucket?
        output_bucket: testing_db

The result of the above is that the following Flux query is run:

from(bucket: "Systemstats" )
|> range(start: 2023-02-05T12:27:32.190226Z, stop: 2023-02-05T12:37:32.190226Z)
|> filter(fn:(r) => r._measurement == "power_watts")
|> filter(fn:(r) => r.host == "power-meter")
|> filter(fn:(r) =>  r._field == "consumption" )
|> window(every: 10m)
// Convert to kWh
|> map(fn: (r) => ({
r with _value: float(v: r._value) / 1000.0
}))

If Flux packages are required (for example strings or date), they can be specified in the imports attribute:

        imports:
            - date
            - strings
        flux_calls:
            - // blah
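
The generated query should then be prefixed with the corresponding Flux import statements, along the lines of:

import "date"
import "strings"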

Example: Field Exclusion

Sometimes it's desirable to exclude specific fields. This can be achieved by enumerating all the desired fields in the fields attribute, or by listing the fields to exclude in the not_fields attribute. Where both are used, not_fields will take precedence (i.e. anything listed in both will be excluded).

influxes:
    home1x:
        url: http://192.168.3.84:8086
        org: ""
        token: ""
        timeout: 60

# Define downsampling jobs
downsamples:
    downsample1:
        name: "Downsample CPU"
        influx: home1x
        period: 60
        window: 15
        bucket: telegraf
        measurement: cpu
        fields: []
        # Exclude the following fields
        not_fields:
            - usage_idle
            - usage_steal
        group_by: []
        aggregates: 
            mean:
        output_influx: home1x
        output_bucket: telegraf_downsampled

Custom Query

By default, the script builds a Flux query to retrieve points. However, it is also possible to provide a custom query, using the query attribute:

    downsample_custom_query:
        # Name for the task
        name: "Downsample with custom query"
        influx: home1x
        period: 10
        window: 10
        query: >
            from(bucket: "Systemstats")
            |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
            |> filter(fn:(r) => r._measurement == "power_watts")
            |> filter(fn:(r) => r.host == "power-meter")
            |> filter(fn:(r) => r._field == "today_cost" or r._field == "consumption")
            |> window(every: v.windowPeriod)

        # Which aggregates to generate
        # field_suffix is optional; if not included,
        # the source field's name will be used
        aggregates: 
            mean:
              as: 
                today_cost: "mean_custquery"
                consumption: "usage_mean_custquery"
        # Which influx instance are we writing to
        output_influx: home1x
        # Which bucket?
        output_bucket: testing_db

The placeholders v.timeRangeStart, v.timeRangeStop and v.windowPeriod will be replaced at execution time.
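
For example, with period: 10 and window: 10, a run starting at 2023-02-05T12:37:32 would see the placeholders resolve roughly as follows (mirroring the generated query shown earlier):

|> range(start: 2023-02-05T12:27:32Z, stop: 2023-02-05T12:37:32Z)
|> window(every: 10m)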

Using a custom query is advanced usage, and there are a few things which need to be considered - in particular, the query should make use of the placeholders above so that time bounds and windowing are applied as configured.


Copyright

Copyright (c) 2023 B Tasker. Released under the BSD 3-Clause License.