
Querying and Data Transformations

Table of contents

  1. Querying With Flux
    1. from()
    2. range()
      1. start and stop
      2. now()
        1. Calling range() with Relative Durations
      3. Defining Ranges with Integers
      4. Defining Ranges with Times
      5. Calculating Start and Stop Times
      6. Start and Stop Types Can Be Different
    3. filter()
      1. Filter Basics
      2. Anatomy of a Row
      3. Filtering Measurements
      4. Filtering Tags
      5. Filtering by Field
      6. Filter by Exists
      7. Filtering by Field Value
      8. Compound Filters
      9. Regular Expressions
      10. If, Then, Else
      11. Types In Comparisons
    4. Queries and the Data Model
  2. Flux Data Transformations
    1. Grouping
      1. group()
      2. group() and Type Conflicts
      3. drop()/keep()
      4. rename()
      5. Creating a Single Table or Ungrouping
    2. Windowing
      1. Windowing back in time
    3. Windowing and aggregateWindow()
    4. Real World Data Example of Grouping
      1. Default Grouping
      2. group()
      3. drop()/keep()
      4. Grouping and Type Conflicts
      5. Creating a Single Table
    5. Aggregations
      1. mean()
      2. min() and max()
      3. count()
      4. Aggregates and Selectors
    6. Yielding
      1. Returning multiple aggregations with multiple yield() functions
      2. Using variables to perform multiple aggregations
    7. Pivoting
      1. The fieldsAsCol() function
    8. Mapping
      1. In place transformation
      2. New column(s)
      3. Conditionally transform data
      4. Changing types
      5. The rows.map() function
    9. Returning values and arrays
      1. Returning records
      2. Returning columns
    10. Reducing
    11. Manipulating Time
      1. Converting timestamp formats
      2. Calculating durations
      3. Truncating or rounding timestamps
      4. Shifting time
      5. Other time manipulations
    12. Regex
      1. The Regexp Package
    13. The String Package
    14. Combining Data Streams
      1. Join
      2. Math across measurements
      3. Union
    15. Accessing External Data Sources
      1. The Flux SQL package
      2. CSV
        1. experimental csv.from()
        2. csv.from()
      3. JSON
    16. Materialized Views or Downsampling Tasks
    17. Further Reading

Querying With Flux

In the vernacular of Flux, a Flux script is called a “query.” This is despite the fact that you can write valid and useful Flux that doesn’t even query your own data at all. For our purposes now, we can think of writing a query as retrieving targeted data from the storage engine, as opposed to transforming and shaping the data, which will be discussed in detail in the following sections.

As described in the section “Just Enough Flux” in the previous chapter, a typical simple query involves 3 parts:

  1. The source bucket
  2. The range of time
  3. A set of filters

Additionally, a query may contain a yield() statement depending on circumstances.

from()

In most cases, a query starts by specifying a bucket to query from using the bucket name:

from(bucket: "bucket1")

In some cases, you may wish to use the bucket’s id instead:

from(bucketID: "497b48e409406cc7")

Typically, developers will address a bucket by its name for a few reasons. First, the bucket name is much more readable, and the role of the bucket can be encoded in the name. Additionally, there may be times when deleting a bucket and creating a new one with the same name is the most expedient way to delete data; queries that use the name will keep working. Addressing the bucket by its id has the advantage of immutability: someone can change the bucket name, and a query using the id will continue working.

There are cases that will be described below where you use a different kind of “from”, for example sql.from() or csv.from() or array.from() to bring in data from other sources.
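For example, here is a minimal sketch (with made-up values) of using array.from() to build a stream of tables in memory rather than reading from a bucket:

import "array"

// builds one table with two rows; no bucket is read
array.from(rows: [
    {_measurement: "measurement1", tag1: "tagvalue1", _field: "field1", _value: 1.0, _time: 2021-07-27T00:00:00Z},
    {_measurement: "measurement1", tag1: "tagvalue1", _field: "field1", _value: 2.0, _time: 2021-07-27T00:00:10Z}
])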

range()

The range() function is required directly after from(), and its purpose is to specify which points to include based on their timestamps. range() has only two parameters.

start and stop

An argument for start is required, whereas stop is optional. In the case where you leave out an argument for stop, Flux will substitute now(), which is the current time when execution is scheduled.

now()

now() always returns the time when a Flux script is scheduled to start execution. This has some important implications:

  1. If your script is not run as part of a task, now() will return the time at the very start of execution of the script. If there are any delays, for example due to queuing as a result of excessive load, now() will return the time when the script was scheduled to run.
  2. If your script is running as part of a task, now() will return the time that your script was scheduled to run.
  3. Every call to now() in the script will return the same time.
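For example, in the following sketch both references to now() evaluate to the same instant, so the filter cannot remove anything that the range() has already included:

from(bucket: "bucket1")
|> range(start: -1h, stop: now())
|> filter(fn: (r) => r._time < now()) // the same instant as the stop argument above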

Calling range() with Relative Durations

Possibly the most common way to use the range function is to use a start time like this:

range(start: -5m)

This says to provide all of the data that is available starting five minutes ago. This is inclusive, meaning that any data timestamped exactly five minutes ago, to the nanosecond, will be included. Any data timestamped five minutes and one nanosecond ago or earlier will not be included.

Conversely, stop is exclusive. That is to say that if you have any data that is timestamped exactly with the stop argument, it will NOT be included with the results.

So, for example, if there is data that is timestamped precisely 1 minute ago, and you have the following queries, that data will be included in the second query, but not the first.

from(bucket: "bucket1") 
|> range(start: -2m, stop: -1m)

from(bucket: "bucket1") 
|> range(start: -1m)

When a stop argument is not supplied Flux simply substitutes now(). So the following queries are equivalent:

from(bucket: "bucket1") 
|> range(start: -1m, stop: now())

from(bucket: "bucket1") 
|> range(start: -1m)

However, this is not true when the start time is in the future. This can happen if your timestamps are, for some reason, post-dated. If your start time is in the future, then now() is, logically, before the start time, so this will cause an error:

from(bucket: "bucket1") 
|> range(start: 1m)

Simply supply a stop duration that is later than the start to ensure that it works.

from(bucket: "bucket1") 
|> range(start: 1m, stop: 2m)

A duration is a type in Flux. So it is unquoted, and consists of a signed integer and unit. The following duration units are supported:

1ns // 1 nanosecond
1us // 1 microsecond
1ms // 1 millisecond
1s  // 1 second
1m  // 1 minute
1h  // 1 hour
1d  // 1 day
1w  // 1 week
1mo // 1 calendar month
1y  // 1 calendar year

So, for example, to select a week’s worth of data starting two weeks in the past, you can use relative durations like this:

|> range(start: -2w, stop: -1w)

Durations represent a span of time, not a specific time. Therefore, Flux does not understand things like:

|> range(start: now() - 5m)

That will result in an error because now() returns a specific time, whereas 5m represents a span of time. The types are not compatible. It is possible to do calculations based on times and durations, and this will be covered in detail in a later section.

Durations are not addable, either, so the following will throw an error:

|> range(start: -5m + -2m)

Defining Ranges with Integers

The start and stop parameters also accept integers. For example, you have already seen:

|> range(start: 0)

The integer represents the nanoseconds that have transpired since Thursday, January 1, 1970 12:00:00 AM, GMT, also known as “Unix Time.”

This is extremely useful, as many systems with which you may want to integrate natively use Unix Time. For example, Fri Jan 01 2021 06:00:00 GMT+0000 is represented as 1609480800000 in Unix time. However, in this case, notice that the time here is represented as milliseconds, not nanoseconds. To perform this conversion, simply multiply the milliseconds by 1,000,000, or you can define the precision when you write the data to the database.

So, for all of the data starting from Jan 1, 2021:

from(bucket: "bucket1") 
|> range(start: 1609480800000000000)
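Equivalently, as a sketch, you can keep the millisecond value readable and convert it inline, since range() accepts integer expressions:

ms = 1609480800000 // Fri Jan 01 2021 06:00:00 GMT, in milliseconds
from(bucket: "bucket1")
|> range(start: ms * 1000000) // converted to nanoseconds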

Unlike durations, integers are, of course, addable, so, to go back a year, this would work:

|> range(start: -365 * 24 * 60 * 60 * 1000000000)

As with durations, if you supply an integer in the future, you must supply a stop time that is later.

Defining Ranges with Times

The third type that is accepted by start and stop is a time. A time is expressed as an RFC3339 timestamp. For example, the following all represent the start of Unix Time:

  • 1970-01-01
  • 1970-01-01T00:00:00Z
  • 1970-01-01T00:00:00.000Z

So, to get data from the start of some day to now:

from(bucket: "bucket1") 
|> range(start: 2021-07-27)

To get data for some day in the past:

from(bucket: "bucket1")
|> range(start: 2021-07-25, stop: 2021-07-26)

By adding the “T” you can get arbitrarily fine grained resolution as well. For example, to skip the first nanosecond:

from(bucket: "bucket1") 
|> range(start: 2021-07-27T00:00:00.000000001Z)

If you only care about seconds, you can leave off the fraction:

from(bucket: "bucket1") 
|> range(start: 2021-07-27T00:00:01Z)

Calculating Start and Stop Times

It is possible to compute start and stop times for the range.

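Depending on your Flux version, the date package (or, in older versions, the experimental package) provides functions for adding and subtracting durations from times. The following is only a sketch, assuming date.sub() is available, that selects one hour of data ending 24 hours ago:

import "date"

from(bucket: "bucket1") 
|> range(start: date.sub(d: 25h, from: now()), stop: date.sub(d: 24h, from: now()))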

Start and Stop Types Can Be Different

The start and stop parameters do not require the same type to be used. The following both work fine:

from(bucket: "operating-results")
|> range(start: -3d, stop: 2021-07-26)

from(bucket: "operating-results")
|> range(start: 1627347600000000000, stop: -1h)

filter()

A filter function must either implicitly or explicitly return a boolean value. A filter function operates on each row of each table; in cases where the return value is true, the row is retained in the table. In cases where the return value is false, the row is removed from the table.

Filter Basics

A very common filter is to filter by measurement.

filter(fn: (r) => r._measurement == "measurement1")

The actual function is the argument for the fn parameter:

(r) => r._measurement == "measurement1"

“(r)” is the parameter list. A filter function always expects a single parameter, and for it to be called “r.” The function body is then a simple boolean expression that evaluates to true or false. This function will return true when the _measurement for a row is “measurement1,” and so the filter will emit a stream of tables where all of the data has that measurement.

Naturally, you can exclude a measurement in the same manner:

filter(fn: (r) => r._measurement != "measurement1")

Anatomy of a Row

When read from the storage engine and passed into the filter function, by default, before being transformed by other functions, every row has the same essential object model. Flux uses a leading underscore (“_”) to denote reserved member names. In Flux, each member of a row is called a “column,” or sometimes a “field,” depending on the context.

We can see how this is generally represented as a row in Flux.

_measurement   tag1        _field       _value   _time
measurement1   tagvalue1   fieldname1   1.0      rfc3339time1
  • r._measurement is a string that is the measurement which defines the table that row is saved into.

  • r._field is a string that is the name of the field which defines the table that the row is saved into.

  • r._value is the actual value of the field.

  • r._time is the time stamp of the row.

Additionally, each tag value is accessible by its tag name. For example, r.tag1, which in this example has a value of “tagvalue1.”

Finally, there are two additional context specific members added. These members are determined by the query, not the underlying data:

  • r._start is the start time of the range() in the query.

  • r._stop is the stop time of the range() in the query.

For example, if you query with a range of 5 minutes in the past (range(start: -5m)), you will get a _start and _stop 5 minutes apart:

_measurement   tag1        _field       _value   _start                           _stop                            _time
measurement1   tagvalue1   fieldname1   1.0      2021-08-20T20:00:00.000000000Z   2021-08-20T20:05:00.000000000Z   rfc3339time1

When you are filtering, you therefore have all of these columns to work from.
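For example, a sketch that filters on the _time column from the table above, keeping only rows in the last three minutes of the queried range:

|> filter(fn: (r) => r._time >= 2021-08-20T20:02:00Z)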


Filtering Measurements

As discussed in the data model section above, a measurement is the highest order aggregation of data inside a bucket. It is, therefore, the most common, and typically the first, filter, as it removes the most irrelevant data in a single statement.

Additionally, every table written by the storage engine has exactly one measurement, so the storage engine can quickly find the relevant tables and return them.

filter(fn: (r) => r._measurement == "measurement1")

Given the two tables below, only the first will be returned if the preceding filter is applied:

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field1   1i       rfc3339time1
measurement1   tagvalue1   field1   2i       rfc3339time2

_measurement   tag1        _field   _value   _time
measurement2   tagvalue1   field1   1.0      rfc3339time1
measurement2   tagvalue1   field1   2.0      rfc3339time2

Filtering Tags

Multiple measurements can share the same tag set. As such, filtering by tag is sometimes secondary to filtering by measurement. The storage engine keeps track of where the tables with different tags for specific measurements are, so filtering by tag is typically reasonably fast.

The following tables have different measurements, but the same tag values, so the following filter will return both tables:

|> filter(fn: (r) => r.tag1 == "tagvalue1")

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field1   1i       rfc3339time1
measurement1   tagvalue1   field1   2i       rfc3339time2

_measurement   tag1        _field   _value   _time
measurement2   tagvalue1   field1   1.0      rfc3339time1
measurement2   tagvalue1   field1   2.0      rfc3339time2

If you only want one measurement with that tag value, you can simply include both filters. The following will return only the first table:

|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tag1 == "tagvalue1")

Filtering by Field

Filtering by field is extremely common, and also very fast, as fields are part of the group key of tables. Given the following tables, if you are interested in records in field1 in measurement1, you can simply query like so, and get back only the first table:

|> filter(fn: (r) => r._field == "field1")

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field1   1i       rfc3339time1
measurement1   tagvalue1   field1   2i       rfc3339time2

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field2   1.0      rfc3339time1
measurement1   tagvalue1   field2   2.0      rfc3339time2

_measurement   tag1        _field   _value   _time
measurement2   tagvalue1   field2   3.0      rfc3339time1
measurement2   tagvalue1   field2   4.0      rfc3339time2

However, this won’t work for field2, as that field name exists in measurement2 as well. Simply include a measurement filter as well:

|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r._field == "field2")

This will return only the first table.

Filter by Exists

There may be circumstances where you wish to only operate on tables that contain a specific tag. You can use exists or not exists for this.

The following will ensure that only tables which contain the “tag1” column are returned:

|> filter(fn: (r) => exists r.tag1)

Similarly, if you want to retain only tables that do not contain the “tag1” column, use:

|> filter(fn: (r) => not exists r.tag1)

To illustrate that point, take the following two tables. Each record has a different time stamp. The second table differs only in that those points were recorded with an additional tag, “tag2”.

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field1   1i       rfc3339time3
measurement1   tagvalue1   field1   2i       rfc3339time4

_measurement   tag1        tag2        _field   _value   _time
measurement1   tagvalue1   tagvalue2   field1   3i       rfc3339time1
measurement1   tagvalue1   tagvalue2   field1   4i       rfc3339time2

The following query will return the first table:

|> filter(fn: (r) => not exists r.tag2)

If you wanted to return the second table, but without the “tag2” column, you cannot do so with not exists. Instead you must drop that column altogether. We’ll cover that in more detail in later sections.
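A minimal sketch of that approach, keeping the second table's points and then removing the tag2 column:

|> filter(fn: (r) => exists r.tag2)
|> drop(columns: ["tag2"])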

Filtering by Field Value

Filtering by measurement(s), tag(s), or field(s) removes entire tables from the response. You can also filter out individual rows in tables. The most common way to do this is to filter by value.

For example, if we take a few rows of our sample data, and first filter by field:

|> filter(fn: (r) => r._field == "field1")

We are left with these two tables.

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field1   1.0      rfc3339time1
measurement1   tagvalue1   field1   2.0      rfc3339time2

_measurement   tag1        _field   _value   _time
measurement1   tagvalue2   field1   3.0      rfc3339time1
measurement1   tagvalue2   field1   3.0      rfc3339time2

If we also add a filter for value, we can filter out individual rows. For example:

|> filter(fn: (r) => r._field == "field1")
|> filter(fn: (r) => r._value >= 2.0)

This will result in the following stream of tables being emitted. Note that the first table has dropped a single row:

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field1   2.0      rfc3339time2

_measurement   tag1        _field   _value   _time
measurement1   tagvalue2   field1   3.0      rfc3339time1
measurement1   tagvalue2   field1   3.0      rfc3339time2

The row where the field value was less than 2 was dropped.

Compound Filters

Boolean expressions in Flux can be compounded with “or” and “and.” For example, to retrieve all the tables with either field1 or field2, but no others, you use:

|> filter(fn: (r) => r._field == "field1" or r._field == "field2")

You can combine conditions with or using different members of r if needed:

|> filter(fn: (r) => r._field == "field1" or exists r.tag1)

You can use and as well:

|> filter(fn: (r) => r._field == "field1" and r.tag1 == "tagvalue1")

However, this is less commonly used because it is equivalent to simply supplying two filters. The following two filters are equivalent to the compound filter above, and arguably easier to read and modify:

|> filter(fn: (r) => r._field == "field1")
|> filter(fn: (r) => r.tag1 == "tagvalue1")

Regular Expressions

Sometimes your code will need to find substrings or even more complex pattern matching. Flux supports regular expressions for this purpose.

There are two regex operators in Flux, “=~” for matches, and “!~” for does not match. The operators expect a string on the left side and a regular expression on the right. You define a regular expression by surrounding your pattern with “/” characters. For example, to find all values of tag1 that include the string “tag,” you can use:

|> filter(fn: (r) => r.tag1 =~ /tag/)

In this case, the regex operator is “matches”, i.e. find all of the tag1 values that match the regex, and the regex itself is tag. Every table where the tag1 tag value contains the string “tag” will be returned.

To exclude all such tables, simply use the “does not match” version:

|> filter(fn: (r) => r.tag1 !~ /tag/)

Flux supports the full range of regex expressiveness. To match the pattern of 3 capital letters and 4 digits:

|> filter(fn: (r) => r._value =~ /[A-Z]{3}[0-9]{4}/)

These operators work on any column that is of type string. So you can use this to filter by measurement, field name, and even field value when the field value is a string type.
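For instance, a sketch that matches on the measurement name rather than on a tag:

|> filter(fn: (r) => r._measurement =~ /^measurement/)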

However, it is important to note that the storage engine cannot leverage the layout of tables when using regular expressions, so it must often scan every table, or even every row, to find matches. This can cause your queries to run much more slowly. Therefore, if you are regularly using regular expressions to filter your data, consider adding additional tags instead.

If, Then, Else

Flux also supports “if, then, else” statements. This can be useful if you want to express more complex conditions in a readable manner.

The following two filters are equivalent:

filter(fn: (r) => if r._value < 2.0 then true else false)
filter(fn: (r) => r._value < 2.0)

Naturally, you can instead return the result of a boolean expression rather than a literal:

filter(fn: (r) => if r.tag1 == "tagvalue1" then r._value < 2.0 else  false)

If then else can also be chained:

filter(fn: (r) => if r.tag1 == "tagvalue1" 
then r._value < 2.0 
else if r.tag1 == "tagvalue2"
	then r._value < 3.0
else false)

Types In Comparisons

As a strongly typed language, in general, Flux does not support comparisons between variables with different types.

Flux does support comparing integers to floats:

int(v: "1") == 1.0

But does not support comparing other data types. For example, this will cause an error:

"1" == 1.0
unsupported binary expression string == float

Times can be compared:

2021-07-12T19:38:00.000Z < 2021-07-12T19:39:00.000Z

But cannot be compared to, for example, Unix Time integers:

2021-07-12T19:38:00.000Z < 1627923626000000000
unsupported binary expression time < int

But this can be done with some explicit casting:

uint(v: 2021-07-12T19:38:00.000Z) < 1627923626000000000
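Alternatively, as a sketch, you can convert the integer side to a time instead:

time(v: 1627923626000000000) > 2021-07-12T19:38:00.000Z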

Details on how to convert times between different formats, so that you can calculate with and compare times and durations from different sources, are covered in detail in a future section.

Queries and the Data Model

This section focused on queries that only found and filtered data. As such, the results are a subset of the tables, and a subset of the rows of those tables, stored in the storage engine. There may be fewer tables, and some rows may be filtered out, but the tables returned are otherwise unchanged.

In other words, the pattern from() |> range() |> filter() will not transform your data as stored on disk other than perhaps filtering it. The next section will go further and delve into many of the options for transforming the shape of the data.

Flux Data Transformations

In addition to retrieving data from disk, Flux is a powerful data transformation tool. You can use Flux to shape your data however needed, as well as to apply powerful mathematical transformations to it.

Grouping

To review, when you write data to InfluxDB, the storage engine persists it in tables, where each table is defined by a “group key.” The group key used to persist the data is a measurement name, a unique set of tag values, and a field name.

Consider the following example of 12 tables with two rows each, all containing the same measurement, but there are:

  • Two tags, with a total of 5 tag values
    • tag1 has the 2 tag values:
      • tagvalue1
      • tagvalue4
    • tag2 has 3 tag values:
      • tagvalue2
      • tagvalue3
      • tagvalue5
  • Two fields
    • field1
    • field2

Where the line protocol would look like:

measurement,tag1=tagvalue1,tag2=tagvalue2 field1=0.0 unixtime1
measurement,tag1=tagvalue1,tag2=tagvalue2 field1=1.0 unixtime2

measurement,tag1=tagvalue4,tag2=tagvalue2 field1=0.0 unixtime1
measurement,tag1=tagvalue4,tag2=tagvalue2 field1=1.0 unixtime2

measurement,tag1=tagvalue1,tag2=tagvalue3 field1=0.0 unixtime1
measurement,tag1=tagvalue1,tag2=tagvalue3 field1=1.0 unixtime2

measurement,tag1=tagvalue4,tag2=tagvalue3 field1=0.0 unixtime1
measurement,tag1=tagvalue4,tag2=tagvalue3 field1=1.0 unixtime2

measurement,tag1=tagvalue1,tag2=tagvalue5 field1=0.0 unixtime1
measurement,tag1=tagvalue1,tag2=tagvalue5 field1=1.0 unixtime2

measurement,tag1=tagvalue4,tag2=tagvalue5 field1=0.0 unixtime1
measurement,tag1=tagvalue4,tag2=tagvalue5 field1=1.0 unixtime2

measurement,tag1=tagvalue1,tag2=tagvalue2 field2=0.0 unixtime1
measurement,tag1=tagvalue1,tag2=tagvalue2 field2=1.0 unixtime2

measurement,tag1=tagvalue4,tag2=tagvalue2 field2=0.0 unixtime1
measurement,tag1=tagvalue4,tag2=tagvalue2 field2=1.0 unixtime2

measurement,tag1=tagvalue1,tag2=tagvalue3 field2=0.0 unixtime1
measurement,tag1=tagvalue1,tag2=tagvalue3 field2=1.0 unixtime2

measurement,tag1=tagvalue4,tag2=tagvalue3 field2=0.0 unixtime1
measurement,tag1=tagvalue4,tag2=tagvalue3 field2=1.0 unixtime2

measurement,tag1=tagvalue1,tag2=tagvalue5 field2=0.0 unixtime1
measurement,tag1=tagvalue1,tag2=tagvalue5 field2=1.0 unixtime2

measurement,tag1=tagvalue4,tag2=tagvalue5 field2=0.0 unixtime1
measurement,tag1=tagvalue4,tag2=tagvalue5 field2=1.0 unixtime2

I encourage you to replace the metasyntactic timestamps with actual unix timestamps and try out the grouping on your own. You can use a unix timestamp converter to get two unix timestamps of your choice, or you can use the following two values:

  • 1628229600000000000 (or 2021-08-06T06:00:00.000000000Z)
  • 1628229900000000000 (or 2021-08-06T06:05:00.000000000Z)

Then write the data to InfluxDB with the CLI, API, or InfluxDB UI.

The following query will return 12 separate tables:

from(bucket: "bucket1")
|> range(start: 0)

Note that a line has been added above each table to indicate which columns are part of the group key. The _start and _stop columns have been removed from the table response for simplicity. The table column has been added to help you keep track of the number of tables. Remember, the table column is an exception: it is never part of the group key, because users can't directly change the table number, even though its value is the same across every row of a table.

(group key for each table: _measurement, _field, tag1, tag2)

table   _measurement   _field   tag1        tag2        _value   _time
0       measurement1   field1   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue2   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
1       measurement1   field1   tagvalue4   tagvalue2   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue2   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
2       measurement1   field1   tagvalue1   tagvalue3   0.0      rfc3339time1
2       measurement1   field1   tagvalue1   tagvalue3   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
3       measurement1   field1   tagvalue4   tagvalue3   0.0      rfc3339time1
3       measurement1   field1   tagvalue4   tagvalue3   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
4       measurement1   field1   tagvalue1   tagvalue5   0.0      rfc3339time1
4       measurement1   field1   tagvalue1   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
5       measurement1   field1   tagvalue4   tagvalue5   0.0      rfc3339time1
5       measurement1   field1   tagvalue4   tagvalue5   1.0      rfc3339time2


table   _measurement   _field   tag1        tag2        _value   _time
6       measurement1   field2   tagvalue1   tagvalue2   0.0      rfc3339time1
6       measurement1   field2   tagvalue1   tagvalue2   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
7       measurement1   field2   tagvalue4   tagvalue2   0.0      rfc3339time1
7       measurement1   field2   tagvalue4   tagvalue2   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
8       measurement1   field2   tagvalue1   tagvalue3   0.0      rfc3339time1
8       measurement1   field2   tagvalue1   tagvalue3   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
9       measurement1   field2   tagvalue4   tagvalue3   0.0      rfc3339time1
9       measurement1   field2   tagvalue4   tagvalue3   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
10      measurement1   field2   tagvalue1   tagvalue5   0.0      rfc3339time1
10      measurement1   field2   tagvalue1   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
11      measurement1   field2   tagvalue4   tagvalue5   0.0      rfc3339time1
11      measurement1   field2   tagvalue4   tagvalue5   1.0      rfc3339time2

By default each field name will be in a separate table, and then there are 6 unique combinations of tag values grouped with each field:

  • tagvalue1 and tagvalue2
  • tagvalue4 and tagvalue2
  • tagvalue1 and tagvalue3
  • tagvalue4 and tagvalue3
  • tagvalue1 and tagvalue5
  • tagvalue4 and tagvalue5

group()

The group() function can be used to redefine the group key, which will then result in regrouping the tables. We can begin by examining how narrowing the group key to a single column affects the tables.

from(bucket: "bucket1")
|> range(start: 0)
|> group(columns: ["tag1"])

We know that there are 2 tag values for tag1 (tagvalue1 and tagvalue4), so we can predict that there will be two tables after the grouping:

(group key: tag1)

table   _measurement   _field   tag1        tag2        _value   _time
0       measurement1   field1   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue2   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue3   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue3   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue5   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue5   1.0      rfc3339time2
0       measurement1   field2   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field2   tagvalue1   tagvalue2   1.0      rfc3339time2
0       measurement1   field2   tagvalue1   tagvalue3   0.0      rfc3339time1
0       measurement1   field2   tagvalue1   tagvalue3   1.0      rfc3339time2
0       measurement1   field2   tagvalue1   tagvalue5   0.0      rfc3339time1
0       measurement1   field2   tagvalue1   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
1       measurement1   field1   tagvalue4   tagvalue2   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue2   1.0      rfc3339time2
1       measurement1   field1   tagvalue4   tagvalue3   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue3   1.0      rfc3339time2
1       measurement1   field1   tagvalue4   tagvalue5   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue5   1.0      rfc3339time2
1       measurement1   field2   tagvalue4   tagvalue2   0.0      rfc3339time1
1       measurement1   field2   tagvalue4   tagvalue2   1.0      rfc3339time2
1       measurement1   field2   tagvalue4   tagvalue3   0.0      rfc3339time1
1       measurement1   field2   tagvalue4   tagvalue3   1.0      rfc3339time2
1       measurement1   field2   tagvalue4   tagvalue5   0.0      rfc3339time1
1       measurement1   field2   tagvalue4   tagvalue5   1.0      rfc3339time2

If we group by both tags, then we can predict that there will be 6 tables because, as described above, there are six unique combinations of tag values:

from(bucket: "bucket1")
|> range(start: 0)
|> group(columns: ["tag1", "tag2"])

(group key: tag1, tag2)

table   _measurement   _field   tag1        tag2        _value   _time
0       measurement1   field1   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue2   1.0      rfc3339time2
0       measurement1   field2   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field2   tagvalue1   tagvalue2   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
1       measurement1   field1   tagvalue4   tagvalue2   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue2   1.0      rfc3339time2
1       measurement1   field2   tagvalue4   tagvalue2   0.0      rfc3339time1
1       measurement1   field2   tagvalue4   tagvalue2   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
2       measurement1   field1   tagvalue1   tagvalue3   0.0      rfc3339time1
2       measurement1   field1   tagvalue1   tagvalue3   1.0      rfc3339time2
2       measurement1   field2   tagvalue1   tagvalue3   0.0      rfc3339time1
2       measurement1   field2   tagvalue1   tagvalue3   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
3       measurement1   field1   tagvalue4   tagvalue3   0.0      rfc3339time1
3       measurement1   field1   tagvalue4   tagvalue3   1.0      rfc3339time2
3       measurement1   field2   tagvalue4   tagvalue3   0.0      rfc3339time1
3       measurement1   field2   tagvalue4   tagvalue3   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
4       measurement1   field1   tagvalue1   tagvalue5   0.0      rfc3339time1
4       measurement1   field1   tagvalue1   tagvalue5   1.0      rfc3339time2
4       measurement1   field2   tagvalue1   tagvalue5   0.0      rfc3339time1
4       measurement1   field2   tagvalue1   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
5       measurement1   field1   tagvalue4   tagvalue5   0.0      rfc3339time1
5       measurement1   field1   tagvalue4   tagvalue5   1.0      rfc3339time2
5       measurement1   field2   tagvalue4   tagvalue5   0.0      rfc3339time1
5       measurement1   field2   tagvalue4   tagvalue5   1.0      rfc3339time2

Grouping by field alone, we can predict that we will see a total of 2 tables, because the data set has only 2 field names.

from(bucket: "bucket1")
|> range(start: 0)
|> group(columns: ["_field"])

(group key: _field)

table   _measurement   _field   tag1        tag2        _value   _time
0       measurement1   field1   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue2   1.0      rfc3339time2
0       measurement1   field1   tagvalue4   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue4   tagvalue2   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue3   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue3   1.0      rfc3339time2
0       measurement1   field1   tagvalue4   tagvalue3   0.0      rfc3339time1
0       measurement1   field1   tagvalue4   tagvalue3   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue5   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue5   1.0      rfc3339time2
0       measurement1   field1   tagvalue4   tagvalue5   0.0      rfc3339time1
0       measurement1   field1   tagvalue4   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
1       measurement1   field2   tagvalue1   tagvalue2   0.0      rfc3339time1
1       measurement1   field2   tagvalue1   tagvalue2   1.0      rfc3339time2
1       measurement1   field2   tagvalue4   tagvalue2   0.0      rfc3339time1
1       measurement1   field2   tagvalue4   tagvalue2   1.0      rfc3339time2
1       measurement1   field2   tagvalue1   tagvalue3   0.0      rfc3339time1
1       measurement1   field2   tagvalue1   tagvalue3   1.0      rfc3339time2
1       measurement1   field2   tagvalue4   tagvalue3   0.0      rfc3339time1
1       measurement1   field2   tagvalue4   tagvalue3   1.0      rfc3339time2
1       measurement1   field2   tagvalue1   tagvalue5   0.0      rfc3339time1
1       measurement1   field2   tagvalue1   tagvalue5   1.0      rfc3339time2
1       measurement1   field2   tagvalue4   tagvalue5   0.0      rfc3339time1
1       measurement1   field2   tagvalue4   tagvalue5   1.0      rfc3339time2

Any combination of columns can be used for grouping depending on your purposes. For example, we can ask for tables for each field with each value for tag1. We can predict that there will be 4 such tables, because there are two fields and two tag values for tag1:

from(bucket: "bucket1")
|> range(start: 0)
|> group(columns: ["_field", "tag1"])

(group key: _field, tag1)

table   _measurement   _field   tag1        tag2        _value   _time
0       measurement1   field1   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue2   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue3   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue3   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue5   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
1       measurement1   field1   tagvalue4   tagvalue2   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue2   1.0      rfc3339time2
1       measurement1   field1   tagvalue4   tagvalue3   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue3   1.0      rfc3339time2
1       measurement1   field1   tagvalue4   tagvalue5   0.0      rfc3339time1
1       measurement1   field1   tagvalue4   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
2       measurement1   field2   tagvalue1   tagvalue2   0.0      rfc3339time1
2       measurement1   field2   tagvalue1   tagvalue2   1.0      rfc3339time2
2       measurement1   field2   tagvalue1   tagvalue3   0.0      rfc3339time1
2       measurement1   field2   tagvalue1   tagvalue3   1.0      rfc3339time2
2       measurement1   field2   tagvalue1   tagvalue5   0.0      rfc3339time1
2       measurement1   field2   tagvalue1   tagvalue5   1.0      rfc3339time2

table   _measurement   _field   tag1        tag2        _value   _time
3       measurement1   field2   tagvalue4   tagvalue2   0.0      rfc3339time1
3       measurement1   field2   tagvalue4   tagvalue2   1.0      rfc3339time2
3       measurement1   field2   tagvalue4   tagvalue3   0.0      rfc3339time1
3       measurement1   field2   tagvalue4   tagvalue3   1.0      rfc3339time2
3       measurement1   field2   tagvalue4   tagvalue5   0.0      rfc3339time1
3       measurement1   field2   tagvalue4   tagvalue5   1.0      rfc3339time2

group() and Type Conflicts

In the above example, the values for field1 and field2 were always floats. Therefore, when grouping both of those fields into the same tables, it worked. However, recall that a column in InfluxDB can only have one type.

Given the following two tables which differ only in that they have a different field name, and their field values have a different type:

table   _measurement   _field   _value   _time
0       measurement1   field1   1i       rfc3339time2

table   _measurement   _field   _value   _time
1       measurement1   field2   1.0      rfc3339time2

An attempt to group these two tables into the same table will result in the following error:

from(bucket: "bucket1")
|> range(start: 0)
|> group()
schema collision: cannot group integer and float types together

The simplest way to address this is to convert all of the values to floats using the toFloat() function. This function simply converts the value field for each record into a float if possible.

from(bucket: "bucket1")
|> range(start: 0)
|> toFloat()
|> group()

table   _measurement   _field   _value   _time
0       measurement1   field1   1.0      rfc3339time2
0       measurement1   field2   1.0      rfc3339time2

drop()/keep()

Another way to affect the group key is to simply remove columns that are in the group key using the drop() or keep() functions. These two functions operate in the same manner; the difference is just whether you supply the list of columns to eliminate or the list of columns to preserve.

Note that the following are equivalent:

from(bucket: "bucket1")
|> range(start: 0)
|> drop(columns: ["tag1","tag2"])

from(bucket: "bucket1")
|> range(start: 0)
|> keep(columns: ["_measurement","_field","_value","_time"])

In both cases the effect is to remove both of the tag columns from the table. Because tags are always in the group key by default, this change will leave only _measurement and _field in the group key. Because there is only one measurement, this will result in grouping solely by _field.

(group key: _measurement, _field)

table   _measurement   _field   _value   _time
0       measurement1   field1   0.0      2021-08-17T21:22:00.000000000Z
0       measurement1   field1   1.0      2021-08-17T21:22:01.000000000Z
0       measurement1   field1   0.0      2021-08-17T21:22:02.000000000Z
0       measurement1   field1   1.0      2021-08-17T21:22:03.000000000Z
0       measurement1   field1   0.0      2021-08-17T21:22:04.000000000Z
0       measurement1   field1   1.0      2021-08-17T21:22:05.000000000Z
0       measurement1   field1   0.0      2021-08-17T21:22:00.000000000Z
0       measurement1   field1   1.0      2021-08-17T21:22:01.000000000Z
0       measurement1   field1   0.0      2021-08-17T21:22:02.000000000Z
0       measurement1   field1   1.0      2021-08-17T21:22:03.000000000Z
0       measurement1   field1   0.0      2021-08-17T21:22:04.000000000Z
0       measurement1   field1   1.0      2021-08-17T21:22:05.000000000Z

table   _measurement   _field   _value   _time
1       measurement1   field2   0.0      2021-08-17T21:22:00.000000000Z
1       measurement1   field2   1.0      2021-08-17T21:22:01.000000000Z
1       measurement1   field2   0.0      2021-08-17T21:22:02.000000000Z
1       measurement1   field2   1.0      2021-08-17T21:22:03.000000000Z
1       measurement1   field2   0.0      2021-08-17T21:22:04.000000000Z
1       measurement1   field2   1.0      2021-08-17T21:22:05.000000000Z
1       measurement1   field2   0.0      2021-08-17T21:22:00.000000000Z
1       measurement1   field2   1.0      2021-08-17T21:22:01.000000000Z
1       measurement1   field2   0.0      2021-08-17T21:22:02.000000000Z
1       measurement1   field2   1.0      2021-08-17T21:22:03.000000000Z
1       measurement1   field2   0.0      2021-08-17T21:22:04.000000000Z
1       measurement1   field2   1.0      2021-08-17T21:22:05.000000000Z

Note that drop() and keep() are both susceptible to the same type conflicts that can cause errors with group().
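As with group(), converting the values to a single type first avoids the collision; a sketch:

from(bucket: "bucket1")
|> range(start: 0)
|> toFloat()
|> drop(columns: ["tag1","tag2"])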

rename()

The rename() function does not change the group key, but simply changes the names of the columns. It works by providing the function with a mapping of old column names to new column names.

Given the following very simple table:

_measurement   tag1        _field   _value   _time
measurement1   tagvalue1   field1   1i       rfc3339time1

The following code simply renames the tag1 column:

from(bucket: "bucket1")
|> range(start: 0)
|> rename(columns: {"tag1":"tag2"})

_measurement   tag2        _field   _value   _time
measurement1   tagvalue1   field1   1i       rfc3339time1

You can rename any column which can be very useful for things like formatting data to present to users. However, renaming can have some unintended consequences. For example, if you rename the _value column, certain functions will have surprising results or fail because they operate on _value.

from(bucket: "bucket1")
|> range(start: 0)
|> rename(columns: {"_value":"astring"})

_measurement   tag1        _field   astring   _time
measurement1   tagvalue1   field1   1i        rfc3339time1

from(bucket: "bucket1")
|> range(start: 0)
|> rename(columns: {"_value":"astring"})
|> toFloat()

_measurement   tag1        _field   _value   astring   _time
measurement1   tagvalue1   field1            1i        rfc3339time1

In this case, because there was no _value column to convert from, the toFloat() function had no data to place in the _value column that it creates.

Creating a Single Table or Ungrouping

Finally, it is possible to put all of the data into a single table, assuming that you avoid type conflicts. This is achieved by calling the group() function with no arguments, which makes the group key empty, so all of the data gets grouped into a single table. This is effectively the same as ungrouping.

from(bucket: "bucket1")
  |> range(start: 0)
  |> group()

(group key: none)

table   _measurement   _field   tag1        tag2        _value   _time
0       measurement1   field1   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue2   1.0      rfc3339time2
0       measurement1   field1   tagvalue4   tagvalue2   0.0      rfc3339time1
0       measurement1   field1   tagvalue4   tagvalue2   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue3   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue3   1.0      rfc3339time2
0       measurement1   field1   tagvalue4   tagvalue3   0.0      rfc3339time1
0       measurement1   field1   tagvalue4   tagvalue3   1.0      rfc3339time2
0       measurement1   field1   tagvalue1   tagvalue5   0.0      rfc3339time1
0       measurement1   field1   tagvalue1   tagvalue5   1.0      rfc3339time2
0       measurement1   field1   tagvalue4   tagvalue5   0.0      rfc3339time1
0       measurement1   field1   tagvalue4   tagvalue5   1.0      rfc3339time2
0       measurement1   field2   tagvalue1   tagvalue2   0.0      rfc3339time1
0       measurement1   field2   tagvalue1   tagvalue2   1.0      rfc3339time2
0       measurement1   field2   tagvalue4   tagvalue2   0.0      rfc3339time1
0       measurement1   field2   tagvalue4   tagvalue2   1.0      rfc3339time2
0       measurement1   field2   tagvalue1   tagvalue3   0.0      rfc3339time1
0       measurement1   field2   tagvalue1   tagvalue3   1.0      rfc3339time2
0       measurement1   field2   tagvalue4   tagvalue3   0.0      rfc3339time1
0       measurement1   field2   tagvalue4   tagvalue3   1.0      rfc3339time2
0       measurement1   field2   tagvalue1   tagvalue5   0.0      rfc3339time1
0       measurement1   field2   tagvalue1   tagvalue5   1.0      rfc3339time2
0       measurement1   field2   tagvalue4   tagvalue5   0.0      rfc3339time1
0       measurement1   field2   tagvalue4   tagvalue5   1.0      rfc3339time2

Windowing

Windowing is when you group data by the start and stop times with the window() function. In previous sections the _start and _stop columns have been omitted for simplicity because they usually represent the start and stop times defined in the range function and their values remain unchanged. However, the window() function affects the values of the _start and _stop columns so we’ll include these columns in this section. The window() function doesn’t affect the _time column, so we’ll exclude this column to simplify the examples in this section. To illustrate how the window() function works let’s filter our data for tagvalue1 and tagvalue4 and include the _start and _stop columns:

from(bucket: "bucket1")
  |> range(start: 2021-08-17T00:00:00Z, stop: 2021-08-17T03:00:00Z)
  |> filter(fn: (r) => r["tag1"] == "tagvalue1" or r["tag1"] == "tagvalue4")
  |> filter(fn: (r) => r["_field"] == "field1")

(group key: _start, _stop, _measurement, _field, tag1, tag2)

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
0       2021-08-17T00:00:00   2021-08-17T03:00:00   measurement1   field1   tagvalue1   tagvalue2   0.0      2021-08-17T01:00:00
0       2021-08-17T00:00:00   2021-08-17T03:00:00   measurement1   field1   tagvalue1   tagvalue2   1.0      2021-08-17T02:00:00

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
1       2021-08-17T00:00:00   2021-08-17T03:00:00   measurement1   field1   tagvalue4   tagvalue2   0.0      2021-08-17T01:00:00
1       2021-08-17T00:00:00   2021-08-17T03:00:00   measurement1   field1   tagvalue4   tagvalue2   1.0      2021-08-17T02:00:00

If you apply the window() function the values in the _start and _stop column will change to reflect the defined window period:

from(bucket: "bucket1")
  |> range(start: 2021-08-17T00:00:00Z, stop: 2021-08-17T03:00:00Z)
  |> filter(fn: (r) => r["tag1"] == "tagvalue1" or r["tag1"] == "tagvalue4")
  |> filter(fn: (r) => r["tag2"] == "tagvalue2")
  |> window(period: 90m)
  // |> window(every: 90m) is synonymous with the line above

(group key: _start, _stop, _measurement, _field, tag1, tag2)

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   tagvalue1   tagvalue2   0.0      2021-08-17T01:00:00

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
1       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   tagvalue4   tagvalue2   0.0      2021-08-17T01:00:00

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
2       2021-08-17T01:30:00   2021-08-17T03:00:00   measurement1   field1   tagvalue1   tagvalue2   1.0      2021-08-17T02:00:00

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
3       2021-08-17T01:30:00   2021-08-17T03:00:00   measurement1   field1   tagvalue4   tagvalue2   1.0      2021-08-17T02:00:00

The boundary for the window period, as defined by either the period or every parameter, is not based on execution time or the timestamps of any points returned by the range function. Instead, windowing occurs at the top of the second, minute, hour, month, or year. Additionally, the window groupings won't extend past the timestamps of the data you return from the range function. For example, imagine the following scenario: you query for data with |> range(start: 2022-01-20T20:18:00.000Z, stop: 2022-01-20T21:19:25.000Z) and the last record in that range has a timestamp of 2022-01-20T20:19:20.000Z. Your every/period duration is 30s. Then your last window group will have a _start value of 2022-01-20T20:19:00.000Z and a _stop value of 2022-01-20T20:19:30.000Z. Notice how the window grouping does not extend all the way to the stop value specified by the range function. Instead, the grouping stops once it includes the final point.

Windowing is performed for two main reasons:

  1. To aggregate data across fields or tags with timestamps in the same period.
  2. To transform high precision series into a lower resolution aggregation.

To aggregate data across fields or tags with similar timestamps, you can first apply the window() function like above, then you can group your data by the _start times. Now data that’s in the same window will be in the same table, so you can apply an aggregation after:

from(bucket: "bucket1")
  |> range(start: 2021-08-17T00:00:00Z, stop: 2021-08-17T03:00:00Z)
  |> filter(fn: (r) => r["tag1"] == "tagvalue1" or r["tag1"] == "tagvalue4")
  |> window(period: 90m)
  |> group(columns: ["_start"], mode:"by")
  |> yield(name:"after group")
  |> sum()
  |> yield(name:"after sum")

The result after the first yield, “after group” looks like:

(group key: _start)

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   tagvalue1   tagvalue2   0.0      2021-08-17T01:00:00
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   tagvalue4   tagvalue2   0.0      2021-08-17T01:00:00

table   _start                _stop                 _measurement   _field   tag1        tag2        _value   _time
1       2021-08-17T01:30:00   2021-08-17T03:00:00   measurement1   field1   tagvalue1   tagvalue2   1.0      2021-08-17T02:00:00
1       2021-08-17T01:30:00   2021-08-17T03:00:00   measurement1   field1   tagvalue4   tagvalue2   1.0      2021-08-17T02:00:00

The result after the second yield, “after sum”, looks like:

(group key: _start)

table   _start                _stop                 _measurement   _field   tag1        tag2        _value
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   tagvalue1   tagvalue2   0.0

table   _start                _stop                 _measurement   _field   tag1        tag2        _value
1       2021-08-17T01:30:00   2021-08-17T03:00:00   measurement1   field1   tagvalue1   tagvalue2   2.0

The sum() function is an aggregator so the _time column is removed because there isn’t a timestamp associated with the sum of two values. Keep in mind that in this example the timestamps in the _time column in the “after group” output happen to be the same, but this aggregation across fields within time windows would work even if the timestamps were different. The _time column isn’t a part of the group key.

Windowing back in time

When you apply the window() function, you group data forward in time, creating window bounds that align with the start of your data. You can't window back in time, but you can produce the same effect by using the offset parameter. This parameter specifies the duration to shift the window boundaries by. You can use the offset parameter to make the windows align with the current time, which effectively groups the data backwards in time.

option offset = duration(v: int(v: now()))

data = from(bucket: "bucket1")
  |> range(start: -1m)
  |> filter(fn: (r) => r["_measurement"] == "measurement1")
  |> yield(name: "data")

data 
  |> aggregateWindow(every: 30s, fn: mean, createEmpty: false, offset: offset)
  |> yield(name: "offset effectively windowing backward in time")

Alternatively, you could calculate the duration difference between your actual start time and the required start time so that your windows align with stop time instead. Then you could add that duration to the start with the offset parameter. The previous approach is the recommended approach, but examining multiple approaches lends us an appreciation for the power and flexibility that Flux provides.

import "math"

data = from(bucket: "bucket1")
  |> range(start: 2021-08-19T19:23:37.000Z, stop: 2021-08-19T19:24:18.000Z )
  |> filter(fn: (r) => r["_measurement"] == "measurement1")

lastpoint = data |> last() |> findRecord(fn: (key) => true , idx:0 )
// the last point is 2021-08-19T19:24:15.000Z
firstpoint = data |> first() |> findRecord(fn: (key) => true , idx:0 )
// the first point is 2021-08-19T19:23:40.000Z

time1 = uint(v: firstpoint._time)
// 1629401020000000000
time2 = uint(v: lastpoint._time)
// 1629401055000000000

mywindow = uint(v: 15s) 

remainder = math.remainder(x: float(v:time2) - float(v:time1), y: float(v:mywindow))
// remainder of (1629401055000000000 - 1629401020000000000)/15000000000 = 5000000000

myduration = duration(v: uint(v: remainder)) 
//5s

data
  |> aggregateWindow(every: 15s, fn: mean, offset: myduration) 

Windowing and aggregateWindow()

The most common reason for using the window() function is to transform high precision data into lower resolution aggregations. Simply applying a sum() after a window would calculate the sum of the data for each series within each window period. To better illustrate the window() function, let's look at the following simplified input data:

(group key: _start, _stop, _measurement, _field)

table   _start                _stop                 _measurement   _field   _value   _time
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   0.0      2021-08-17T00:30:00
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   1.0      2021-08-17T01:00:00
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   2.0      2021-08-17T01:30:00
0       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   3.0      2021-08-17T02:00:00

The window() function doesn't affect the values of the _time column; it only affects the values of the _start and _stop columns. It calculates windows of time based on the duration specified with the period or every parameter and groups the records based on the bounds of that window period. The following query would return one table with the sum of the points in the series within a 90 min window:

from(bucket: "bucket1")
  |> range(start: 2021-08-17T00:00:00Z, stop: 2021-08-17T03:00:00Z)
  |> filter(fn: (r) => r["_field"] == "field1")
  |> window(period: 90m)
  |> sum()

(group key: _start, _stop, _measurement, _field)

table   _start                _stop                 _measurement   _field   _value
1       2021-08-17T00:00:00   2021-08-17T01:30:00   measurement1   field1   5.0

By following the window() function with an aggregation function, we've reduced the number of points in our series by half. We've transformed a higher resolution data set into a lower resolution sum over 90 min windows. This combination of functions introduces another, similar function: the aggregateWindow() function.

The aggregateWindow() function windows data and applies an aggregate function or selector function to the data. You can think of the aggregateWindow() function as being a combination of the window() function followed by an aggregate or selector function. The difference between the window() function and the aggregateWindow() function is that the aggregateWindow() function applies a group to your data at the end so that your lower resolution aggregations aren’t separated into different tables by their window period. Instead all lower resolution aggregations are grouped together. In other words, these two queries are equivalent:

query1 = from(bucket: "bucket1")
  |> range(start: 2021-08-17T00:00:00Z, stop: 2021-08-17T03:00:00Z)
  |> filter(fn: (r) => r["_field"] == "field1" or r["_field"] == "field2")
  |> window(period: 90m)
  |> sum()
  |> group(columns: ["_field"], mode:"by")

query2 = from(bucket: "bucket1")
  |> range(start: 2021-08-17T00:00:00Z, stop: 2021-08-17T03:00:00Z)
  |> filter(fn: (r) => r["_field"] == "field1" or r["_field"] == "field2")
  |> aggregateWindow(every: 90m, fn: sum)
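As in the offset example earlier, aggregateWindow() also accepts the createEmpty parameter. A sketch that sets it to false, so that windows containing no points are dropped rather than returned as empty aggregations:

from(bucket: "bucket1")
  |> range(start: 2021-08-17T00:00:00Z, stop: 2021-08-17T03:00:00Z)
  |> filter(fn: (r) => r["_field"] == "field1")
  |> aggregateWindow(every: 90m, fn: sum, createEmpty: false)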

Real World Data Example of Grouping

Having reviewed grouping and aggregation using clean instructive data, it is worthwhile to review the concepts again, but looking at real world data. The NOAA buoy data is a good sample set to look at due to its complexity.

For example, to take a closer look at wind speeds, the following query will simply return all of the tables with the field “wind_speed_mps.”

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r["_field"] == "wind_speed_mps")

This query reads back data for a total of around 628 tables (this value will be more or less depending on the time range queried). Due to the combination of tag values and the restricted time range, most of the tables returned have only a single row. Here are the first few rows as an example.

Default Grouping

(group key: _start, _stop, _field, _measurement, station_id, station_name, station_owner, station_pgm, station_type; each row below is its own single-row table)
(_start = 2021-08-03T03:35:12.486582468Z and _stop = 2021-08-03T15:35:12.486582468Z for every row)

_time                  _value   _field           _measurement   station_id   station_name                             station_owner                                             station_pgm                 station_type
2021-08-03T05:00:00Z   9        wind_speed_mps   ndbc           46303        Southern Georgia Strait                  Environment and Climate Change Canada                     International Partners      buoy
2021-08-03T05:00:00Z   7.7      wind_speed_mps   ndbc           FWYF1        Fowey Rock, FL                           NDBC                                                      NDBC Meteorological/Ocean   fixed
2021-08-03T04:30:00Z   2.1      wind_speed_mps   ndbc           GELO1        Geneva on the Lake Light, OH             NWS Eastern Region                                        IOOS Partners               fixed
2021-08-03T05:18:00Z   4.6      wind_speed_mps   ndbc           FMOA1        8734673 - Fort Morgan, AL                NOAA NOS PORTS                                            NOS/CO-OPS                  fixed
2021-08-03T05:18:00Z   6.7      wind_speed_mps   ndbc           TXPT2        8770822 - Texas Point, Sabine Pass, TX   TCOON                                                     IOOS Partners               fixed
2021-08-03T05:30:00Z   2        wind_speed_mps   ndbc           45170        Michigan City Buoy, IN                   Illinois-Indiana Sea Grant and Purdue Civil Engineering   IOOS Partners               buoy

group()

The group() function can be used to redefine the group key, which will then result in entirely different tables. For example:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r["_field"] == "wind_speed_mps")
  |> group(columns: ["station_type"])

This call to group() tells Flux to make station_type the only column in the group key. station_type has four possible values ("buoy", "fixed", "oilrig", and "other"), so we know that the results will contain exactly 4 tables. Here are excerpts from those tables:

Group key: _start, _stop, station_type

station_type = "buoy" (excerpt):

| _start | _stop | _time | _value | _field | _measurement | station_id | station_name | station_owner | station_pgm | station_type |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 3 | wind_speed_mps | ndbc | 22102 |  | Korean Meteorological Administration | International Partners | buoy |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 4 | wind_speed_mps | ndbc | 22103 |  | Korean Meteorological Administration | International Partners | buoy |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 2 | wind_speed_mps | ndbc | 22104 |  | Korean Meteorological Administration | International Partners | buoy |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 6 | wind_speed_mps | ndbc | 22105 |  | Korean Meteorological Administration | International Partners | buoy |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 5 | wind_speed_mps | ndbc | 22106 |  | Korean Meteorological Administration | International Partners | buoy |

...

station_type = "fixed" (excerpt):

| _start | _stop | _time | _value | _field | _measurement | station_id | station_name | station_owner | station_pgm | station_type |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T04:30:00Z | 5 | wind_speed_mps | ndbc | 32ST0 | Stratus | Woods Hole Oceanographic Institution | IOOS Partners | fixed |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 4.1 | wind_speed_mps | ndbc | 62103 | Channel Lightship | UK Met Office | International Partners | fixed |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T04:45:00Z | 0 | wind_speed_mps | ndbc | ACXS1 | Bennetts Point, ACE Basin Reserve, SC | National Estuarine Research Reserve System | NERRS | fixed |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:30:00Z | 5.7 | wind_speed_mps | ndbc | AMAA2 | East Amatuli Island Light, AK | NDBC | NDBC Meteorological/Ocean | fixed |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T04:30:00Z | 0 | wind_speed_mps | ndbc | ANMN6 | Field Station, Hudson River Reserve, NY | National Estuarine Research Reserve System | NERRS | fixed |

...

station_type = "oilrig" (excerpt):

| _start | _stop | _time | _value | _field | _measurement | station_id | station_name | station_owner | station_pgm | station_type |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 2.6 | wind_speed_mps | ndbc | 62114 | Tartan "A" AWS | Private Industry Oil Platform | International Partners | oilrig |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 2.1 | wind_speed_mps | ndbc | 62121 | Carrack AWS | Private Industry Oil Platform | International Partners | oilrig |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 1.5 | wind_speed_mps | ndbc | 62144 | Clipper AWS | Private Industry Oil Platform | International Partners | oilrig |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 2.6 | wind_speed_mps | ndbc | 62145 | North Sea | Private Industry Oil Platform | International Partners | oilrig |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:00:00Z | 1.5 | wind_speed_mps | ndbc | 62146 | Lomond AWS | Private Industry Oil Platform | International Partners | oilrig |

...

station_type = "other" (excerpt):

| _start | _stop | _time | _value | _field | _measurement | station_id | station_name | station_owner | station_pgm | station_type |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:20:00Z | 9 | wind_speed_mps | ndbc | 41002 | SOUTH HATTERAS - 225 NM South of Cape Hatteras | NDBC | NDBC Meteorological/Ocean | other |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:20:00Z | 6 | wind_speed_mps | ndbc | 41009 | CANAVERAL 20 NM East of Cape Canaveral, FL | NDBC | NDBC Meteorological/Ocean | other |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:20:00Z | 12 | wind_speed_mps | ndbc | 41010 | CANAVERAL EAST - 120NM East of Cape Canaveral | NDBC | NDBC Meteorological/Ocean | other |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:20:00Z | 2 | wind_speed_mps | ndbc | 41013 | Frying Pan Shoals, NC | NDBC | NDBC Meteorological/Ocean | other |
| 2021-08-03T03:50:09.78158678Z | 2021-08-03T15:50:09.78158678Z | 2021-08-03T05:20:00Z | 8 | wind_speed_mps | ndbc | 41040 | NORTH EQUATORIAL ONE- 470 NM East of Martinique | NDBC | NDBC Meteorological/Ocean | other |

...

You may note that _start and _stop are also in the group key. Remember that these columns are added by Flux to specify the time range of the data being returned. For data from a single query, these values will always be the same for all rows, and thus will not change the number of tables.

You can further group by including multiple columns. For example, one can add station_pgm (the name of the partner organization providing the data) to the group key as well:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r["_field"] == "wind_speed_mps")
  |> group(columns: ["station_type", "station_pgm"])

Now we can see in the returned tables that, because station_type and station_pgm are in the group key, each unique combination of those values is in a separate table. For example, IOOS Partners have both buoy stations and fixed stations, so those station types are grouped into separate tables.

Group key: _start, _stop, station_pgm, station_type

station_pgm = "IOOS Partners", station_type = "buoy" (excerpt):

| _start | _stop | _time | _value | _field | _measurement | station_id | station_name | station_owner | station_pgm | station_type |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T05:08:00Z | 2 | wind_speed_mps | ndbc | 41033 | Fripp Nearshore, SC (FRP2) | CORMP | IOOS Partners | buoy |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T05:08:00Z | 2 | wind_speed_mps | ndbc | 41037 | Wrightsville Beach Offshore, NC (ILM3) | CORMP | IOOS Partners | buoy |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T05:00:00Z | 7 | wind_speed_mps | ndbc | 41052 | South of St. John, VI | Caribbean Integrated Coastal Ocean Observing System (CarICoos) | IOOS Partners | buoy |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T05:00:00Z | 4 | wind_speed_mps | ndbc | 41053 | San Juan, PR | Caribbean Integrated Coastal Ocean Observing System (CarICoos) | IOOS Partners | buoy |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T05:00:00Z | 6 | wind_speed_mps | ndbc | 41056 | Vieques Island, PR | Caribbean Integrated Coastal Ocean Observing System (CarICoos) | IOOS Partners | buoy |

...

station_pgm = "IOOS Partners", station_type = "fixed" (excerpt):

| _start | _stop | _time | _value | _field | _measurement | station_id | station_name | station_owner | station_pgm | station_type |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T04:30:00Z | 5 | wind_speed_mps | ndbc | 32ST0 | Stratus | Woods Hole Oceanographic Institution | IOOS Partners | fixed |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T04:30:00Z | 7 | wind_speed_mps | ndbc | 41NT0 | NTAS - Northwest Tropical Atlantic | Woods Hole Oceanographic Institution | IOOS Partners | fixed |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T05:18:00Z | 3.1 | wind_speed_mps | ndbc | ANPT2 | 8775241 - Aransas, Aransas Pass, TX | TCOON | IOOS Partners | fixed |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T05:30:00Z | 4.1 | wind_speed_mps | ndbc | APNM4 | Alpena Harbor Light, Alpena, MI | GLERL | IOOS Partners | fixed |
| 2021-08-03T04:11:46.849771273Z | 2021-08-03T16:11:46.849771273Z | 2021-08-03T04:40:00Z | 1.5 | wind_speed_mps | ndbc | BHRI3 | Burns Harbor, IN | NWS Central Region | IOOS Partners | fixed |

...

drop()/keep()

Another way to affect the group key is simply to remove columns from it using the drop() or keep() functions. The two functions operate in the same manner; it’s just a matter of supplying a list of columns to eliminate versus a list of columns to preserve.

The following are equivalent:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> drop(columns: ["_measurement","_start","_stop","station_name","station_owner"])

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> keep(columns: ["_field","_value","_time","station_id","station_pgm", "station_type"])

The first thing to notice is that applying keep() cleans up the data dramatically, making it much easier to read and work with. A common use of drop() and keep(), therefore, is simply to make the data more readable.

However, if you drop a column that is in the group key, this will impact the tables. For example, the query above results in 559 tables, because it leaves station_id, station_pgm, and station_type in the group key, and there are 559 unique combinations of those tag values.

If we also drop station_id, we remove the tag with the most unique values:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> drop(columns: ["_measurement","_start","_stop","station_name","station_owner","station_id"])

This results in a total of only 12 tables: there are 6 station_pgm values, each with roughly 2 station_type values, so only about 12 combinations in all.
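
For reference, the keep()-based equivalent of that drop() query would look roughly like this (a sketch that mirrors the earlier keep() column list with station_id removed):

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> keep(columns: ["_field","_value","_time","station_pgm","station_type"])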

Grouping and Type Conflicts

It is possible, using group(), drop(), keep(), or other functions, to remove _field from the group key. Remember that Flux is strongly typed, so a column cannot contain values of multiple types. As a result, it is possible to create errors when grouping. Because tag values are always strings, and _start, _stop, and _time are always times, this problem almost always arises from fields.

So, if we rerun one of the queries above without filtering on the field:

from(bucket: "noaa")
  |> range(start: -24h)
  |> group(columns: ["station_type", "station_pgm"])

Part of the output is an error message:

schema collision: cannot group float and string types together

This happens because some of the fields (station_met, station_currents, station_waterquality, station_dart) are strings, so they cannot be grouped into tables with the fields that are floats.

One way to solve this is to keep the _field column in the group key:

from(bucket: "noaa")
  |> range(start: -24h)
  |> group(columns: ["station_type", "station_pgm", "_field"])

Though of course this will result in creating many more tables.
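
Another option, sketched below, is to narrow the query to a single field (or to fields that share a type) before regrouping, so the _value column only ever holds one type:

from(bucket: "noaa")
  |> range(start: -24h)
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type", "station_pgm"])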

Creating a Single Table

Finally, it is possible to put all of the data into a single table, assuming that you avoid type conflicts. This is achieved by calling the group() function with no arguments, which makes the group key empty so that all of the data is grouped into a single table.

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group()
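
Because ungrouping merges rows from many different series, the resulting single table is not necessarily in time order. A common follow-up, sketched here, is to sort it by the _time column:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group()
  |> sort(columns: ["_time"])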

Aggregations

In the Flux vernacular, an "aggregation" is a summary of a table. Indeed, one of the key reasons to regroup data is to summarize it as desired.

As an example, to answer the question "do the different station types have different average wind speeds?" the overall approach would be to:

  1. Query the time range of interest
  2. Filter to just the wind_speed_mps field
  3. Group the results into one table for each station type
  4. Calculate the mean for each table
  5. Put all the results into a single table

The Flux looks like this:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])
  |> mean()
  |> group()
| _start | _stop | station_type | _value |
| --- | --- | --- | --- |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | buoy | 4.997794117647059 |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | fixed | 3.1083950617283946 |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | oilrig | 6.883999999999998 |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | other | 5.675675675675675 |

That second-last step of collapsing all the data for each table into a mean is what Flux calls “aggregation.”

The following sections cover some of the most common Flux aggregations.

mean()

This function is typically used as demonstrated above:

|> mean()

When called without arguments, the mean() function uses the _value column. However, pivoted data may have more than one column that could be aggregated, and it is also possible that _value has been renamed. In such cases, calling mean() without specifying a column produces an error, as demonstrated here:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])
  |> rename(columns: {"_value":"wind_speed"})
  |> mean()
  |> group()
runtime error @7:6-7:12: mean: column "_value" does not exist

This can be fixed easily by specifying the column in the mean() function:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])
  |> rename(columns: {"_value":"wind_speed"})
  |> mean(column: "wind_speed")
  |> group()
| _start | _stop | station_type | wind_speed |
| --- | --- | --- | --- |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | buoy | 4.997794117647059 |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | fixed | 3.1083950617283946 |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | oilrig | 6.883999999999998 |
| 2021-08-05T01:28:12.024108193Z | 2021-08-05T13:28:12.024108193Z | other | 5.675675675675675 |

Note that because mean() aggregates data from all rows in a table, most columns get dropped. Only the columns in the group key and the column that was subject to the mean() function are preserved.

min() and max()

These will always return exactly one row per table, containing the lowest or highest value in the _value column. As with the mean() function, you can specify the column you want to use, but the _value column is used by default.

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])
  |> min()
  |> group()
| _start | _stop | _time | _value | _field | _measurement | station_id | station_name | station_owner | station_pgm | station_type |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 2021-08-05T02:27:53.856929358Z | 2021-08-05T14:27:53.856929358Z | 2021-08-05T05:00:00Z | 1 | wind_speed_mps | ndbc | 45152 | Lake Nipissing | Environment and Climate Change Canada | International Partners | buoy |
| 2021-08-05T02:27:53.856929358Z | 2021-08-05T14:27:53.856929358Z | 2021-08-05T04:30:00Z | 0 | wind_speed_mps | ndbc | ANMN6 | Field Station, Hudson River Reserve, NY | National Estuarine Research Reserve System | NERRS | fixed |
| 2021-08-05T02:27:53.856929358Z | 2021-08-05T14:27:53.856929358Z | 2021-08-05T05:30:00Z | 0 | wind_speed_mps | ndbc | KGRY | Green Canyon 338 / Front Runner | Federal Aviation Administration | Marine METAR | oilrig |
| 2021-08-05T02:27:53.856929358Z | 2021-08-05T14:27:53.856929358Z | 2021-08-05T05:30:00Z | 0 | wind_speed_mps | ndbc | 46025 | Santa Monica Basin - 33NM WSW of Santa Monica, CA | NDBC | NDBC Meteorological/Ocean | other |

In this case, all of the columns are retained. This is because min() and max() return an existing row from each table, effectively selecting one row and filtering out the rest. These functions do not, therefore, need to combine values from different rows, so all of the columns are retained. Note that this can cause a subsequent group() to fail if there are type conflicts between columns, as covered earlier in the section on type conflicts.

Of course, the data can be cleaned up by dropping unwanted columns:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])
  |> min()
  |> group()
  |> keep(columns: ["_time", "_value", "station_type"])
| _time | _value | station_type |
| --- | --- | --- |
| 2021-08-05T05:00:00Z | 1 | buoy |
| 2021-08-05T04:30:00Z | 0 | fixed |
| 2021-08-05T05:30:00Z | 0 | oilrig |
| 2021-08-05T05:30:00Z | 0 | other |

count()

The count() function returns the number of rows in a table. This can be particularly useful for counting events. Here, it is used to count the number of stations of each type reporting in:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])
  |> count()
  |> group()
| _start | _stop | _value | station_type |
| --- | --- | --- | --- |
| 2021-08-05T04:00:59.334352876Z | 2021-08-05T16:00:59.334352876Z | 107 | buoy |
| 2021-08-05T04:00:59.334352876Z | 2021-08-05T16:00:59.334352876Z | 395 | fixed |
| 2021-08-05T04:00:59.334352876Z | 2021-08-05T16:00:59.334352876Z | 25 | oilrig |
| 2021-08-05T04:00:59.334352876Z | 2021-08-05T16:00:59.334352876Z | 73 | other |

As in the case of mean(), because count() combines values from different rows, only columns in the group key and the _value column are retained.

As expected, for cases where the _value column does not exist in the tables to be counted, you can specify a different column to count:

from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._measurement == "ndbc")
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])
  |> rename(columns: {"_value":"windspeed"})
  |> count(column: "windspeed")
  |> group()

Aggregates and Selectors

While all transformations that summarize your data are typically referred to as "aggregations" in the Flux vernacular, there are actually two types of these functions:

  1. Aggregates: These functions return a single, computed row for every input table. The output has the same group key as the input table(s), and the _time column is usually dropped. Aggregates include, but are not limited to, mean(), sum(), and count().
  2. Selectors: These functions return one or more existing rows for every input table. The output rows are unmodified records, so the _time column is typically included. Selectors include, but are not limited to, min(), max(), first(), and last(). (A short sketch contrasting the two follows.)
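
As a rough sketch of the difference, using the NOAA wind speed data from earlier in this section:

wind = from(bucket: "noaa")
  |> range(start: -12h)
  |> filter(fn: (r) => r._field == "wind_speed_mps")
  |> group(columns: ["station_type"])

// aggregate: one computed row per table; the _time column is dropped
wind
  |> mean()
  |> yield(name: "aggregate")

// selector: one existing row per table; the _time column is kept
wind
  |> last()
  |> yield(name: "selector")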

Yielding

The yield() function determines which table streams are returned by a Flux script. The yield() function also assigns a name to the output of a Flux query. The name is stored in the default annotation.

For example if we query the following table:

| _measurement | tag1 | _field | _value | _time |
| --- | --- | --- | --- | --- |
| Measurement1 | tagvalue1 | field1 | 1i | 2021-09-17T21:22:52.00Z |

Without the yield function:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.00Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

The following Annotated CSV output is returned. Notice that the default annotation is set to _results.

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,_results,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

Now if we add the yield() function:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name: "myFluxQuery") 

The following Annotated CSV output is returned. Notice the default annotation has been changed to myFluxQuery.

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,myFluxQuery,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

The yield() function is important because invoking multiple yield() functions allows you to return multiple table streams from a single Flux script simultaneously.

Returning multiple aggregations with multiple yield() functions

Imagine that you want to return the min(), max(), and mean() values of a single table:

| _measurement | _field | _value | _time |
| --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | rfc3339time1 |
| measurement1 | field1 | 2.0 | rfc3339time2 |
| measurement1 | field1 | 4.0 | rfc3339time3 |
| measurement1 | field1 | 5.0 | rfc3339time4 |

We’ll use this metasyntactic example a lot. If you want to try the solutions out for yourself, include the following Flux at the top of your script to produce the table above:

import "array"
import "experimental"

rfc3339time1 = experimental.subDuration(d: -1m, from: now())
rfc3339time2 = experimental.subDuration(d: -2m, from: now())
rfc3339time3 = experimental.subDuration(d: -3m, from: now())
rfc3339time4 = experimental.subDuration(d: -4m, from: now())

data = array.from(rows: [
{_time: rfc3339time1, _value: 1.0, _field: "field1", _measurement: "measurement1"},
{_time: rfc3339time2, _value: 2.0, _field: "field1", _measurement: "measurement1"},
{_time: rfc3339time3, _value: 4.0, _field: "field1", _measurement: "measurement1"},
{_time: rfc3339time4, _value: 5.0, _field: "field1", _measurement: "measurement1"}])
|> yield(name: "metasyntacticExample")

New Flux users, especially those from a SQL or InfluxQL background, are inclined to run the following Flux query:

data
|> filter(fn: (r) => r["_measurement"] == "measurement1" and r["_field"] == "field1" )
|> min()
|> max()
|> mean()

This is because they’re accustomed to being able to perform SELECT min("field1"), max("field1"), mean("field1"). However, the Flux query above would actually just return the min value. Flux is pipe-forwarded, so you must use multiple yield() functions to return the min, max, and mean together:

data
|> filter(fn: (r) => r["_measurement"] == "measurement1" and r["_field"] == "field1" )
|> min()
|> yield(name: "min") 

data
|> filter(fn: (r) => r["_measurement"] == "measurement1" and r["_field"] == "field1" )
|> max()
|> yield(name: "max") 

data
|> filter(fn: (r) => r["_measurement"] == "measurement1" and r["_field"] == "field1" )
|> mean()
|> yield(name: "mean")

The above script would result in three tables:

Result: min

| _measurement | _field | _value | _time |
| --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | rfc3339time1 |

Result: max

| _measurement | _field | _value | _time |
| --- | --- | --- | --- |
| measurement1 | field1 | 5.0 | rfc3339time4 |

Result: mean

| _measurement | _field | _value |
| --- | --- | --- |
| measurement1 | field1 | 3.0 |

An Aside: Remember that the mean() function doesn’t return a timestamp column because it’s an aggregator. There isn’t a timestamp associated with the mean value.

Using variables to perform multiple aggregations

While the Flux query above will yield all three transformations, it’s not an efficient query because it reads the entire dataset multiple times. Instead, store the base query in a variable and reference it like so:

data = from(bucket: "bucket1")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

data_min = data
|> min()
|> yield(name: "min") 

data_max = data
|> max()
|> yield(name: "max") 

data_mean = data
|> mean()
|> yield(name: "mean")

Important Note: Make sure not to name your variables the same as function names to avoid naming conflicts.

Pivoting

The pivot() function rotates column values into rows in a table. The most common use case for pivoting data is when users want to perform math across fields at the same timestamp.

The pivot() function has 3 input parameters:

  1. rowKey: the list of columns that determines the row output
  2. columnKey: the list of columns that determines the column output
  3. valueColumn: the column whose values populate the cells of the pivoted table

Given the following input data:

Group key: _measurement, _field

| table | _measurement | _field | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement1 | field1 | 1.0 | rfc3339time1 |
| 0 | measurement1 | field1 | 2.0 | rfc3339time2 |

| table | _measurement | _field | _value | _time |
| --- | --- | --- | --- | --- |
| 1 | measurement1 | field2 | 3.0 | rfc3339time1 |
| 1 | measurement1 | field2 | 4.0 | rfc3339time2 |

We perform the following pivot:

data
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
Group key: _measurement

| table | _measurement | field2 | field1 | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement1 | 3.0 | 1.0 | rfc3339time1 |
| 0 | measurement1 | 4.0 | 2.0 | rfc3339time2 |

Oftentimes users also want to pivot on tags to compare a single field across multiple tag values. For instance, if a user wanted to calculate the difference between the co values from two sensors in the Air Sensor sample dataset, they could use the following query:

from(bucket: "Air sensor sample dataset")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["_field"] == "co")
|> filter(fn: (r) => r["sensor_id"] == "TLM0100" or r["sensor_id"] == "TLM0101")
// the limit function is used to return the first two records in each table stream
|> limit(n:2)
|> yield(name: "before pivot") 
|> pivot(rowKey:["_time"], columnKey: ["sensor_id"], valueColumn: "_value")
|> yield(name: "after pivot")

Where the first yield returns the “before pivot” result:

Group key: _field, sensor_id

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.4901148636678805 | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0.4850389571399865 | TLM0100 | rfc3339time2 |

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 1 | airSensors | co | 0.48242588117742446 | TLM0101 | rfc3339time1 |
| 1 | airSensors | co | 0.47503934770988365 | TLM0101 | rfc3339time2 |

Where the second yield() returns the “after pivot” result:

Group key: _field

| table | _measurement | _field | TLM0101 | TLM0100 | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.48242588117742446 | 0.4901148636678805 | rfc3339time1 |
| 0 | airSensors | co | 0.47503934770988365 | 0.4850389571399865 | rfc3339time2 |

You can also pivot on multiple columns. This allows you to include values across fields and tags within the same record in a table. Let’s take the previous example but this time we filter for two fields instead of one and pivot on both the sensor_id and field:

from(bucket: "Air sensor sample dataset")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["_field"] == "co" or r["_field"] == "temperature")
|> filter(fn: (r) => r["sensor_id"] == "TLM0100" or r["sensor_id"] == "TLM0101")
|> yield(name: "before pivot on two fields and sensors") 
|> pivot(rowKey:["_time"], columnKey: ["sensor_id","_field"], valueColumn: "_value")
|> yield(name: "after pivot before pivot on two fields and sensors")

Where the first yield returns the “before pivot on two fields and sensors” result:

Group key: _field, sensor_id

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.4901148636678805 | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0.4850389571399865 | TLM0100 | rfc3339time2 |

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 1 | airSensors | co | 0.48242588117742446 | TLM0101 | rfc3339time1 |
| 1 | airSensors | co | 0.47503934770988365 | TLM0101 | rfc3339time2 |

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 2 | airSensors | temperature | 71.21039164125095 | TLM0100 | rfc3339time1 |
| 2 | airSensors | temperature | 71.24535411172452 | TLM0100 | rfc3339time2 |

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 3 | airSensors | temperature | 71.83744572272158 | TLM0101 | rfc3339time1 |
| 3 | airSensors | temperature | 71.85395748942119 | TLM0101 | rfc3339time2 |

Where the second yield() returns the “after pivot before pivot on two fields and sensors” result:

| table | _measurement | TLM0100_co | TLM0101_co | TLM0100_temperature | TLM0101_temperature | _time |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | airSensors | 0.4901148636678805 | 0.48242588117742446 | 71.21039164125095 | 71.83744572272158 | rfc3339time1 |
| 0 | airSensors | 0.4850389571399865 | 0.47503934770988365 | 71.24535411172452 | 71.85395748942119 | rfc3339time2 |

The fieldsAsCol() function

Pivoting fields on the timestamp column, as described in the first pivoting example, is the most common type of pivoting. Users frequently expect their data to be presented that way, with the field key as the column name and the field values in that column. This application of the pivot() function is so common that the schema.fieldsAsCols() function was created. It works identically to:

|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
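
A minimal usage sketch, reusing the Air Sensor sample dataset that appears in the examples below:

import "influxdata/influxdb/schema"

from(bucket: "Air sensor sample dataset")
  |> range(start: 0)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> schema.fieldsAsCols()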

Mapping

The map() function is an extremely powerful tool. It applies a function to each record in the table. Use the map() function to:

  • Perform a transformation on values in a column and replace the original values.
  • Add new columns to store the transformations or new data.
  • Conditionally transform records with conditional query logic within the map function.
  • Change the types of values in a column.

For this section we’ll use the map() function to transform the following data from the Air Sensor sample dataset:

data = from(bucket: "Air sensor sample dataset")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["_field"] == "co")
|> filter(fn: (r) => r["sensor_id"] == "TLM0100")
|> yield(name:"map")
Group key: _field, sensor_id

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.4901148636678805 | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0.4850389571399865 | TLM0100 | rfc3339time2 |

In place transformation

The map() function requires a single input parameter:

  • fn: The function to apply to each record in the table stream.

To perform an in-place transformation, make sure to reuse a column name in the function. For example, imagine that our TLM0100 sensor is faulty and consistently off by 0.02 ppm. We can add 0.02 to every record in the _value column in our data with the map() function:

data
|> map(fn: (r) => ({ r with _value: r._value + 0.02}))

Which yields the following result:

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.5101148636678805 | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0.5050389571399865 | TLM0100 | rfc3339time2 |

In other words, the with operator updates a column if that column already exists.

New column(s)

You can use the map() function to add new columns to your data. For example, we could add a new column with the adjustment value and then calculate the true value with the map() function:

data
|> map(fn: (r) => ({ r with adjustment: 0.02 }))
|> map(fn: (r) => ({ r with trueValue: r._value + r.adjustment }))

Which yields the following result:

| table | _measurement | _field | adjustment | _value | trueValue | sensor_id | _time |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.02 | 0.4901148636678805 | 0.5101148636678805 | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0.02 | 0.4850389571399865 | 0.5050389571399865 | TLM0100 | rfc3339time2 |

In other words, the with operator creates a new column if one doesn’t already exist. You can also add new columns with the map() function without the with operator. However, when you use the map() function in this way you drop all of the columns that aren’t explicitly mapped. For example, the following query:

data
|> map(fn: (r) => ({adjustment: 0.02, _time:r._time})) 

Yields the following result:

| table | adjustment | _time |
| --- | --- | --- |
| 0 | 0.02 | rfc3339time1 |
| 0 | 0.02 | rfc3339time2 |

Note: You can also use the set() function to create a new column with a string value.
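
For example, a small sketch that adds a hypothetical source column holding a constant string value:

data
|> set(key: "source", value: "airSensors")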

Conditionally transform data

Conditionally transforming data with the map() function is an especially useful feature. This combination unlocks another level of sophisticated transformation work. A common use for conditional mapping is to assign conditions or state to numeric values, which is especially common for users who want to create custom checks. Suppose that any co value greater than or equal to 0.49 is concerning and any value below that is normal. We can write the following query to summarize that behaviour in a new column with conditional mapping:

data
|> map(fn: (r) => ({r with level:
      if r._value >= 0.49 then "warning"
      else "normal"
    })
  )

The query above yields the following result:

| table | _measurement | _field | _value | level | sensor_id | _time |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.4901148636678805 | warning | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0.4850389571399865 | normal | TLM0100 | rfc3339time2 |

Changing types

Changing data types is useful for a variety of situations including:

  • Performing math across fields with different data types with the map() function
  • To address some of the challenges around grouping data with different datatypes
  • Preparing data for further transformation work both with Flux and outside of InfluxDB

If we wanted to change our data from a float to an integer, we would perform the following query:

data 
|> map(fn: (r) => ({ r with _value: int(v: r._value)})) 
| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0 | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0 | TLM0100 | rfc3339time2 |

Note: you can also use the toFloat(), toInt(), and toString() functions to convert values in the _value column to a float, integer, or string, respectively. However, the map() function allows you to convert any column you like. You might also want to use the map() function to conditionally convert types when querying for multiple fields.
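
As a sketch of that last point (assuming a stream that contains multiple fields), the following converts only the co values to integers, writing them into a hypothetical co_int column; records for other fields simply get 0:

data
|> map(fn: (r) => ({ r with co_int: if r._field == "co" then int(v: r._value) else 0 }))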

The rows.map() function

The rows.map() function is a simplified version of the map() function. It is more efficient but also more limited than the map() function. Remember that the map() function can modify group keys; the rows.map() function cannot. Attempts to modify columns in the group key are ignored. For example, if we tried to change the measurement name with the rows.map() function it would be unsuccessful, but we could still adjust the field value as before:

import "contrib/jsternberg/rows"

data
|> rows.map(fn: (r) => ({r with _measurement: "in group key so it's ignored"}))
|> rows.map(fn: (r) => ({r with _value: r._value + 0.02}))

Which yields the following result:

| table | _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | airSensors | co | 0.5101148636678805 | TLM0100 | rfc3339time1 |
| 0 | airSensors | co | 0.5050389571399865 | TLM0100 | rfc3339time2 |

Returning values and arrays

Sometimes users need to be able to query their data, obtain a value or array of values, and then incorporate those values into subsequent transformation work. The findRecord() and findColumn() functions allow you to return individual records and columns, respectively.

Returning records

The findRecord() function requires two input parameters:

  • fn: The predicate function for returning the table with matching keys, provided by the user.
  • idx: The index of the record you want to extract.

The easiest way to use the findRecord() function is to query your data so that the output contains only one row holding the scalar value you want to extract. That way you can simply have the fn predicate return true and set idx to 0.

data = from(bucket: "bucket1")
	|> range(start: 0)
	|> filter(fn: (r) => r._measurement == "measurement1" and r._field == "field1")
	|> yield(name: "data")
	
meanRecord = data
|> mean() 
|> findRecord(fn: (key) => true,
      	      idx: 0)

data |> map(fn: (r) => ({ r with value_mult_by_mean: r._value * meanRecord._value }))
     |> yield(name: "final result")

Given that the first yield() function returns “data”:

| _measurement | _field | _value | _time |
| --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | rfc3339time1 |
| measurement1 | field1 | 2.0 | rfc3339time2 |
| measurement1 | field1 | 4.0 | rfc3339time3 |
| measurement1 | field1 | 5.0 | rfc3339time4 |

Then meanRecord._value = 3.0. Therefore, the second yield() function returns “final result”:

| _measurement | _field | _value | value_mult_by_mean | _time |
| --- | --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | 3.0 | rfc3339time1 |
| measurement1 | field1 | 2.0 | 6.0 | rfc3339time2 |
| measurement1 | field1 | 4.0 | 12.0 | rfc3339time3 |
| measurement1 | field1 | 5.0 | 15.0 | rfc3339time4 |

To illustrate how to use findRecord(), let’s use the Air Sensor sample dataset to calculate the water vapour pressure from one sensor using the mean temperature. The equation for the water vapour pressure is:

water vapour pressure = humidity * ( gas constant * temperature/ molecular weight of water).

For this example, we’ll incorporate the following hypothetical assumption: we want to use the mean temperature instead of the actual temperature because our temperature sensors are faulty. Let’s also assume that the temperature and humidity values are in the correct units for simplicity.

Therefore, we can calculate the water vapour pressure with the following Flux:

data = from(bucket: "Air sensor sample dataset")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["sensor_id"] == "TLM0100")
|> limit(n:5) 

meanRecord = data
|> filter(fn: (r) => r["_field"] == "temperature")
|> yield(name:"raw temperature")
|> mean()
|> findRecord(fn: (key) => true, idx: 0)

data
|> filter(fn: (r) => r["_field"] == "humidity")
|>  map(fn: (r) => ({ r with mean_record: meanRecord._value}))
|> map(fn: (r) => ({ r with water_vapor_pressure: r._value * (8.31 * meanRecord._value / 18.02)}))
|> yield(name:"final result")

Where the output of the first yield() function returns the “raw temperature”:

| _measurement | _field | _value | sensor_id | _time |
| --- | --- | --- | --- | --- |
| airSensors | temperature | 71.18548279203421 | TLM0100 | rfc3339time1 |
| airSensors | temperature | 71.22676508109254 | TLM0100 | rfc3339time2 |
| airSensors | temperature | 71.27370100659799 | TLM0100 | rfc3339time3 |
| airSensors | temperature | 71.28825526616907 | TLM0100 | rfc3339time4 |
| airSensors | temperature | 71.25024765248021 | TLM0100 | rfc3339time5 |

And the output of the second yield() function returns the “final result”:

| _measurement | _field | _value | sensor_id | mean_record | water_vapor_pressure | _time |
| --- | --- | --- | --- | --- | --- | --- |
| airSensors | temperature | 71.18548279203421 | TLM0100 | 71.24489035967481 | 153.9546087866322 | rfc3339time1 |
| airSensors | temperature | 71.22676508109254 | TLM0100 | 71.24489035967481 | 153.9546087866322 | rfc3339time2 |
| airSensors | temperature | 71.27370100659799 | TLM0100 | 71.24489035967481 | 153.9546087866322 | rfc3339time3 |
| airSensors | temperature | 71.28825526616907 | TLM0100 | 71.24489035967481 | 153.9546087866322 | rfc3339time4 |
| airSensors | temperature | 71.25024765248021 | TLM0100 | 71.24489035967481 | 153.9546087866322 | rfc3339time5 |

Another common use for the findRecord() function is extracting a timestamp at the time of an event (or when some of your data meets a certain condition) and then using that timestamp to query for other data at the time of the event. For example, we can query for humidity from one sensor in the Air Sensor sample dataset after the first time the temperature exceeded 72.2 degrees.

data = from(bucket: "Air sensor sample dataset")
  |> range(start: 0)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")

tempTime = data 
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> filter(fn: (r) => r["_value"] >= 72.2)
  |> findRecord(fn: (key) => true, idx: 0)

data 
|> range(start: tempTime._time) 
|> filter(fn: (r) => r["_field"] == "humidity") 

This example brings up two other interesting points about the range() and filter() functions:

  1. You can use the range() function multiple times within the same query to further reduce the output of your query.
  2. You can also limit the response to a specific time range with the filter() function instead of using range() twice. In other words, we could have replaced the last three lines with:

data 
|> filter(fn: (r) => r["_time"] >= tempTime._time) 
|> filter(fn: (r) => r["_field"] == "humidity") 

Returning columns

You can also return entire arrays containing the values from a single column with the findColumn() function. The findColumn() function is similar to the findRecord() function and requires the following two input parameters:

  • fn: The predicate function for returning the table with matching keys, provided by the user.
  • column: The column of the records you want to extract in an array.

Let’s replace the findRecord() function from the last example in the previous section, Returning records, with findColumn().

data = from(bucket: "Air sensor sample dataset")
  |> range(start: 0)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")

tempTime = data 
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> filter(fn: (r) => r["_value"] >= 72.2)
  |> findColumn(fn: (key) => true, column: "_time")

data 
|> range(start: tempTime[0]) 
|> filter(fn: (r) => r["_field"] == "humidity") 

Reducing

The reduce() function is used to perform custom aggregations. The reduce() function takes two parameters:

  1. fn: the reducer function, where you define the operation to apply to each record in the table together with the accumulator.
  2. identity: where you define the initial values of the accumulator used by the reducer function.

For this section we’ll use the following data:

Group key: _measurement, _field

| _measurement | _field | _value | _time |
| --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | rfc3339time1 |
| measurement1 | field1 | 2.0 | rfc3339time2 |
| measurement1 | field1 | 4.0 | rfc3339time3 |
| measurement1 | field1 | 5.0 | rfc3339time4 |

Here is a simple example of how to use the reduce() function to calculate the sum of the values:

data = from(bucket: "bucket1")
|> range(start:0)
|> filter(fn: (r) => r["_measurement"] == "measurement1" and r["_field"] == "field1" )

data
|> reduce(
        fn: (r, accumulator) => ({
          sum: r._value + accumulator.sum
        }),
        identity: {sum: 0.0}
    )
|> yield(name: "sum_reduce") 

The sum identity is initialized at 0.0. The reducer function adds accumulator.sum to the field value of each record, and its output becomes the new accumulator.sum for the next record.

The Flux above yields the following result:

| _measurement | _field | sum |
| --- | --- | --- |
| measurement1 | field1 | 12.0 |

Only columns that are part of the group key are included in the output of the reduce() function.

To further understand the reduce() function, let’s calculate the min(), max(), and mean() simultaneously with the reduce() function.

data
|> reduce(
      identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
      fn: (r, accumulator) => ({
        count: accumulator.count + 1.0,
        sum: r._value + accumulator.sum,
        min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
        max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
        mean: (r._value + accumulator.sum) / (accumulator.count + 1.0)
      })
    )
|> yield(name: "min_max_mean_reduce") 

The Flux above yields the following result:

| _measurement | _field | count | sum | min | max | mean |
| --- | --- | --- | --- | --- | --- | --- |
| measurement1 | field1 | 4.0 | 12.0 | 1.0 | 5.0 | 3.0 |

Generally, the reduce() function isn’t more performant than built-in aggregators and selectors. Therefore, you shouldn’t use the query above to calculate the min, max, and mean. Instead, store your data in a variable and apply the min(), max(), and mean() functions separately with corresponding yield() functions to simultaneously deliver the results, as described previously in the Yielding section.

The reduce() function is intended for custom aggregations. For example, the following uses the reduce() function to accumulate the quantities needed to calculate the slope and y-intercept for a linear regression:

 |> reduce(
            fn: (r, accumulator) => ({
                sx: r.x + accumulator.sx,
                sy: r.y + accumulator.sy,
                N: accumulator.N + 1.0,
                sxy: r.x * r.y + accumulator.sxy,
                sxx: r.x * r.x + accumulator.sxx,
            }),
            identity: {
                sxy: 0.0,
                sx: 0.0,
                sy: 0.0,
                sxx: 0.0,
                N: 0.0,
            },
        )

Where…

  • sx is the sum of the independent variable (the index).
  • sy is the sum of the dependent variable.
  • N is the number of records.
  • sxy is the sum of the products of the independent and dependent variables.
  • sxx is the sum of the squares of the independent variable.
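
From those accumulated values, a follow-up map() could compute the slope and y-intercept using the ordinary least-squares formulas (a sketch, not part of the original example):

  |> map(fn: (r) => ({ r with slope: (r.N * r.sxy - r.sx * r.sy) / (r.N * r.sxx - r.sx * r.sx) }))
  |> map(fn: (r) => ({ r with intercept: (r.sy - r.slope * r.sx) / r.N }))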

Important Note: the reduce() function excludes any columns that aren’t in the group key in the output.

Manipulating Time

Manipulating timestamps is critical for any time series analysis tool. Timestamp manipulation in Flux includes:

  • Converting timestamp formats
  • Calculating durations
  • Truncating or rounding timestamps
  • Shifting times
  • Other time manipulations

Converting timestamp formats

So far timestamps have been represented as the following formats:

  • Unix: 1567029600
  • RFC3339: 2019-08-28T22:00:00Z
  • Relative Duration: -1h
  • Duration: 1h

The range() function accepts all of those timestamp formats. However, the Annotated CSV output of a Flux query returns timestamps in RFC3339 by default. Users often need to return the data in another timestamp format to avoid parsing strings when developing applications on top of InfluxDB.

Convert your timestamp from RFC3339 to Unix by using the uint() or int() function. Use the map() function to convert every record in your _time column to a Unix timestamp.

data
  |> map(fn: (r) => ({ r with _time: int(v: r._time)}))

Or

data
  |> map(fn: (r) => ({ r with _time: uint(v: r._time)}))

Convert your timestamp from Unix to RFC3339 by using the time() function.

data
  |> map(fn: (r) => ({ r with _time: time(v: r._time)}))

Using the Air Sensor sample dataset, we can convert the _time column from RFC3339 to Unix and back into RFC3339 again, storing the results in separate columns:

from(bucket: "Air sensor sample dataset")
  |> range(start:0)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["_field"] == "co")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0100")
  |> map(fn: (r) => ({ r with unix_time: int(v: r._time)}))
  |> map(fn: (r) => ({ r with rfc3339_time: time(v: r.unix_time)}))

Important Note: the time() function requires that the unix timestamp be in nanosecond precision.
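
For example, if a hypothetical unix_seconds column held second-precision integer timestamps, you would scale it up to nanoseconds before converting (a sketch):

data
  |> map(fn: (r) => ({ r with _time: time(v: r.unix_seconds * 1000000000) }))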

Calculating durations

Converting time from RFC3339 to Unix is especially useful when you want to find the duration between two points. To calculate the duration between two data points:

  1. Convert the time to Unix timestamp
  2. Subtract the two Unix timestamps from each other
  3. Use the duration() function to convert the Unix time difference into a duration

Let’s calculate the duration between the current time and a few points from the Air Sensor sample dataset:

unix_now = uint(v:now())

from(bucket: "Air sensor sample dataset")
  |> range(start:0)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["_field"] == "co")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0100")
  |> limit(n:5)
  |> map(fn: (r) => ({r with duration_from_now: string(v: duration(v: unix_now - uint(v: r._time)))}))

Important Note: Flux tables don’t support the duration time format. You must use the string() function to convert the duration to a string.

It’s common for users who gather data from IoT devices at the edge to collect data for a while before pushing some of it to InfluxDB Cloud. They frequently want to include both the timestamp at which the device recorded a metric and the timestamp at which the data was actually written to InfluxDB Cloud. In this instance, users should store the timestamp of the metric reading as a field, as a string. Then they might want to find the duration between the time the sensor recorded the metric and the time the data was written to InfluxDB. Given the following data:

| _measurement | _field | _value | _time | _device_time |
| --- | --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | 2021-09-10T07:15:12.000Z | 1631812512000 |

You can use a combination of the int(), uint(), duration(), and string() functions to:

  • Convert the _device_time from a string to an integer
  • Convert the millisecond-precision Unix timestamp into nanosecond precision by multiplying by 1,000,000
  • Convert the RFC3339 timestamp of the _time column to a Unix timestamp
  • Calculate the duration and convert it to a string
data
  |> map(fn: (r) => ({ r with _device_time: int(v:r._device_time) * 1000000 }))
  |> map(fn: (r) => ({ r with duration: string(v: duration(v:uint(v:r._device_time) - uint(v: r._time)))}))
| _measurement | _field | _value | _time | _device_time | duration |
| --- | --- | --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | 2021-09-10T07:15:12.000Z | 1631812512000000000 | 6d10h |

Truncating or rounding timestamps

Frequently users have data that’s irregular or recorded at different intervals. The most common reason for rounding timestamps is to either:

  1. Transform an irregular time series into a regular one. An irregular time series is data that isn’t collected at a regular interval. Event data is an example of irregular time series.
  2. Align different time series collected at different intervals so that the user can perform subsequent data transformations on top of the aligned data.

Given the following input data:

| _measurement | _field | _value | _time |
| --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | 2021-07-17T12:05:21 |
| measurement1 | field1 | 2.0 | 2021-07-17T12:05:24 |
| measurement1 | field1 | 4.0 | 2021-07-17T12:05:27 |
| measurement1 | field1 | 5.0 | 2021-07-17T12:05:28 |

Use the truncateTimeColumn() function to convert an irregular time series into a regular one:

data 
|> truncateTimeColumn(unit: 5s)
| _measurement | _field | _value | _time |
| --- | --- | --- | --- |
| measurement1 | field1 | 1.0 | 2021-07-17T12:05:20 |
| measurement1 | field1 | 2.0 | 2021-07-17T12:05:20 |
| measurement1 | field1 | 4.0 | 2021-07-17T12:05:25 |
| measurement1 | field1 | 5.0 | 2021-07-17T12:05:25 |

Truncating timestamps is similar to windowing, described in the Windowing section. The window() function groups data by start and stop times, which allows you to perform aggregations across different fields or tags that have different timestamps. Similarly, you can aggregate across fields by truncating timestamps to align series collected at different intervals. Given the following data:

| table | _measurement | _field | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement1 | field1 | 1.0 | 2021-07-17T12:05:50 |
| 0 | measurement1 | field1 | 2.0 | 2021-07-17T12:05:20 |

| table | _measurement | _field | _value | _time |
| --- | --- | --- | --- | --- |
| 1 | measurement1 | field2 | 4.0 | 2021-07-17T12:05:27 |
| 1 | measurement1 | field2 | 5.0 | 2021-07-17T12:05:45 |

data 
|> truncateTimeColumn(unit: 30s)
|> group(columns:["_time"])
|> sum() 
| table | _measurement | _value | _time |
| --- | --- | --- | --- |
| 0 | measurement1 | 3.0 | 2021-07-17T12:05:00 |

| table | _measurement | _value | _time |
| --- | --- | --- | --- |
| 1 | measurement1 | 9.0 | 2021-07-17T12:05:30 |

Shifting time

Users frequently need to shift their timestamps to convert their data to a different timezone. Given the following data:

| table | _measurement | _field | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement1 | field1 | 1.0 | 2021-07-17T08:00:00 |
| 0 | measurement1 | field1 | 2.0 | 2021-07-17T09:00:00 |

Use the timeShift() function to shift the data 2 hours ahead:

data 
|> timeShift(duration: 2h)
| table | _measurement | _field | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement1 | field1 | 1.0 | 2021-07-17T10:00:00 |
| 0 | measurement1 | field1 | 2.0 | 2021-07-17T11:00:00 |

Note: By default the timeShift() function shifts the timestamps in the _start, _stop, and _time columns.

Other time manipulations

There are several other timestamp manipulation functions to be aware of in Flux, such as hourSelection(), the functions in the date package (for example date.truncate()), and experimental.addDuration() and experimental.subDuration(). Although we won’t go into detail about how to use them all, it’s worth knowing they exist.

Regex

Using regular expressions (regex) in Flux is a very powerful way to filter for subsets of data by matching patterns. Regex is most commonly used in conjunction with functions like filter(), map(), keep(), or drop(). Let’s use the Air Sensor sample dataset to highlight how to use regex. Remember, we have the following tag and tag values:

  • 1 tag: sensor_id
    • 8 sensor_id tag values:
      • TLM0100
      • TLM0101
      • TLM0102
      • TLM0103
      • TLM0200
      • TLM0201
      • TLM0202
      • TLM0203

If we wanted to filter for all of the sensors in the 100 range, we could use the following query:

from(bucket: "Air sensor sample dataset")
|> range(start:0)
|> filter(fn: (r) => r["sensor_id"] =~ /TLM0[1][0][0-3]$/)

Flux uses Go’s regexp package. When constructing a regex it’s a good idea to use a regex tester to make sure that your regex is returning the correct data. You can find a wide selection of regex testers online. I enjoy regex101. To increase the performance of your Flux query it’s a good idea to make your regex as specific as possible. For example, we could use the following query with bad regex instead:

from(bucket: "Air sensor sample dataset")
|> range(start:0)
|> filter(fn: (r) => r["sensor_id"] =~ /10/)

While it will work and only return data for the TLM0100, TLM0101, TLM0102, and TLM0103 sensors, it’s far less specific and efficient than our original regex. You can also use regex to filter columns, like so:

from(bucket: "Air sensor sample dataset")
|> range(start:0)
|> filter(fn: (r) => r["sensor_id"] == "TLM0100")
|> filter(fn: (r) => r["_field"] == "co")
|> drop(fn: (column) => column !~ /^_.*/)

This query drops all columns that don’t start with an underscore. Since our dataset only has one tag, “sensor_id”, that’s the column that will be dropped.
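
The keep() function accepts the same column predicate, so an equivalent sketch keeps only the columns that do start with an underscore:

from(bucket: "Air sensor sample dataset")
|> range(start:0)
|> filter(fn: (r) => r["sensor_id"] == "TLM0100")
|> filter(fn: (r) => r["_field"] == "co")
|> keep(fn: (column) => column =~ /^_.*/)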

The Regexp Package

Flux also has a regexp package. This package has a variety of functions that make it easy to work with regex. You can store regex patterns as strings in InfluxDB and use the regexp.compile() function to compile those strings into regex to filter with. This is especially useful if you're using a map() function with conditional mapping. Compiling a string into a regex outside of the map() is more efficient than compiling it inside of the map(). In the example below we're evaluating whether the URL field values are https or http URLs.

import "regexp"

url = regexp.compile(v: "^https")
data
|> map(fn: (r) => ({
    r with
    isEncrypted:
      if r._value =~ url then "yes"
      else "no"
    })
  )
| table | _measurement (group key) | _field (group key) | _value | isEncrypted | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | measurement1 | URL | https://foo | yes | rfc3339time1 |
| 0 | measurement1 | URL | http://bar | no | rfc3339time2 |
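
The regexp package also includes functions like regexp.replaceAllString() and regexp.findString(). For instance, here is a quick sketch (the pattern and the host column are illustrative) that strips the scheme from the URL values:

import "regexp"

data
|> map(fn: (r) => ({r with host: regexp.replaceAllString(r: /^https?:\/\//, v: r._value, t: "")}))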

The String Package

The Flux string package has a large selection of functions that allow you to manipulate string values. With the Flux string package you can do things like:

  • Compare two strings to see if they match
  • See if one string contains characters in another string or contains a specified substring
  • Check whether a string contains uppercase letters, lowercase letters, or digits
  • Replace, split, or join strings
  • And much more

For example we could replace the query in The Regexp Package section with:

import "strings"

data
|> map(fn: (r) => ({
    r with
    isEncrypted: strings.containsStr(v: r._value, substr: "https")
    })
  )

Thereby returning a similar output:

| table | _measurement (group key) | _field (group key) | _value | isEncrypted | _time |
| --- | --- | --- | --- | --- | --- |
| 0 | measurement1 | URL | https://foo | true | rfc3339time1 |
| 0 | measurement1 | URL | http://bar | false | rfc3339time2 |
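
The strings package can also rewrite values directly. For example, here is a small sketch (reusing the same hypothetical URL values) that upgrades http URLs to https with strings.replaceAll():

import "strings"

data
|> map(fn: (r) => ({r with _value: strings.replaceAll(v: r._value, t: "http://", u: "https://")}))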

Combining Data Streams

A data stream is the output of a single yield() function, and a single table stream contains one or more tables. There are two primary ways that users can combine data streams together:

  1. Joining allows you to perform an inner join on two data streams. Performing a join expands the width of the data.
  2. Unioning allows you to concatenate two or more streams into a single output stream. Performing a union expands the height of the data.

Join

Joining merges two input streams into a single output stream based on columns with equal values. There are two Flux functions for joining data:

  1. join(): The join() function takes the two data streams as input parameters and returns a joined table stream.
  2. experimental.join(): The experimental.join() function is a more performant version of the join() function.

Joining your data results in a table stream output with an increased width.
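
As a rough sketch (the left and right streams here are placeholders), experimental.join() joins two streams on their group key and _time column and lets you construct the output rows with a function:

import "experimental"

// left and right are two table streams that share group keys and _time values
experimental.join(
  left: left,
  right: right,
  fn: (left, right) => ({left with right_value: right._value})
)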

Math across measurements

The most common reason for joining data is to perform math across measurements. To illustrate how to perform math across measurements, imagine the following scenario:

You are an operator at a chemical plant, and you need to monitor the temperatures of a counter-current heat exchanger. You collect temperatures of the cold (TC) and hot (TH) streams from four different temperature sensors. There are two inlet (Tc2, Th1) sensors and two outlet (Tc1, Th2) sensors at positions x1 and x2 respectively.

[Figure: counter-current heat exchanger with inlet and outlet temperature sensors]

After making some assumptions, you can calculate the efficiency of heat transfer with this formula:

ɳ = (Tc2 − Tc1) / (Th1 − Th2) × 100

Where…

  • ɳ is the efficiency of the heat transfer
  • Tc2 is the temperature of the cold stream at position x2.
  • Tc1 is the temperature of the cold stream at position x1.
  • Th1 is the temperature of the hot stream at position x1.
  • Th2 is the temperature of the hot stream at position x2.

You collect a temperature reading from each sensor at two different times, for a total of 8 points with the following schema:

  • 1 bucket: sensors
  • 4 measurements: Tc1, Tc2, Th1, Th2
  • 1 Field: temperature

Since the temperature readings are stored in different measurements, you need to join the data in order to calculate the efficiency.

First, I want to gather the temperature readings for each sensor, starting with Th1, and prepare the data. I drop the "_start" and "_stop" columns because I'm not performing any group bys or windowing. Dropping these columns is by no means necessary; it just simplifies the example. I will be performing math across values with identical timestamps, so I keep the "_time" column.

Th1 = from(bucket: "sensors")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
  |> drop(columns: ["_start", "_stop"])
  |> yield(name: "Th1")
| table | _measurement (group key) | _field (group key) | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | Th1 | temperature | 80.90 | rfc3339time1 |
| 0 | Th1 | temperature | 81.00 | rfc3339time2 |

Th2 = from(bucket: "sensors")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "Th2" and r._field == "temperature")
  |> drop(columns: ["_start", "_stop"])
  |> yield(name: "Th2")
| table | _measurement (group key) | _field (group key) | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | Th2 | temperature | 70.2 | rfc3339time1 |
| 0 | Th2 | temperature | 71.6 | rfc3339time2 |

Next, join the two tables.

TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time","_field"])
| table | _measurement_Th1 (group key) | _measurement_Th2 (group key) | _field (group key) | _value_Th1 | _value_Th2 | _time |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | Th1 | Th2 | temperature | 80.90 | 70.2 | rfc3339time1 |
| 0 | Th1 | Th2 | temperature | 81.00 | 71.6 | rfc3339time2 |

The join() function takes a record of key-table pairs as the tables parameter and a list of column names as the on parameter. The join() function only performs inner joins, joining rows on columns with equal values. Here the _time and _field columns have equal values, whereas the _value and _measurement columns do not. For columns that differ, the table key is appended to the column name so you can trace like-named columns back to their input table. Any columns that aren't included in the on parameter won't be joined on.

Next, apply this logic to the cold stream as well:

TC = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time", "_field"])
| table | _measurement_Tc1 (group key) | _measurement_Tc2 (group key) | _field (group key) | _value_Tc1 | _value_Tc2 | _time |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | Tc1 | Tc2 | temperature | 50.50 | 60.3 | rfc3339time1 |
| 0 | Tc1 | Tc2 | temperature | 51.00 | 59.3 | rfc3339time2 |

Next, join TC with TH. For the sake of simplicity, we'll also drop the measurement columns.

THTC = join(tables: {TH: TH, TC: TC}, on: ["_time", "_field"])
|> drop( columns: ["_measurement_Th1","_measurement_Th2","_measurement_Tc1","_measurement_Tc2"])
|> yield(name: "TCTH")
| table | _field (group key) | _value_Th1 | _value_Th2 | _value_Tc1 | _value_Tc2 | _time |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | temperature | 80.90 | 70.2 | 50.50 | 60.3 | rfc3339time1 |
| 0 | temperature | 81.00 | 71.6 | 51.00 | 59.3 | rfc3339time2 |

Finally, I can use map() to calculate the efficiency across all of the measurements (a consolidated version of the full pipeline is sketched after the output below):

THTC
|> map(fn: (r) => ({r with efficiency: (r._value_Tc2 - r._value_Tc1) / (r._value_Th1 - r._value_Th2) * 100.0}))
|> yield(name: "efficiency")

I can see that the heat transfer efficiency has decreased over time.

| table | _field (group key) | _value_Th1 | _value_Th2 | _value_Tc1 | _value_Tc2 | efficiency | _time |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | temperature | 80.90 | 70.2 | 50.50 | 60.3 | 92 | rfc3339time1 |
| 0 | temperature | 81.00 | 71.6 | 51.00 | 59.3 | 88 | rfc3339time2 |
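
For reference, here is a consolidated sketch of the whole pipeline (it assumes the Tc1 and Tc2 streams are filtered the same way as Th1 and Th2):

Th1 = from(bucket: "sensors")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "Th1" and r._field == "temperature")
  |> drop(columns: ["_start", "_stop"])

Th2 = from(bucket: "sensors")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "Th2" and r._field == "temperature")
  |> drop(columns: ["_start", "_stop"])

Tc1 = from(bucket: "sensors")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "Tc1" and r._field == "temperature")
  |> drop(columns: ["_start", "_stop"])

Tc2 = from(bucket: "sensors")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "Tc2" and r._field == "temperature")
  |> drop(columns: ["_start", "_stop"])

// Join the hot and cold streams, then join the results and compute efficiency
TH = join(tables: {Th1: Th1, Th2: Th2}, on: ["_time", "_field"])
TC = join(tables: {Tc1: Tc1, Tc2: Tc2}, on: ["_time", "_field"])

join(tables: {TH: TH, TC: TC}, on: ["_time", "_field"])
  |> drop(columns: ["_measurement_Th1", "_measurement_Th2", "_measurement_Tc1", "_measurement_Tc2"])
  |> map(fn: (r) => ({r with efficiency: (r._value_Tc2 - r._value_Tc1) / (r._value_Th1 - r._value_Th2) * 100.0}))
  |> yield(name: "efficiency")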

Union

The union() function allows you to combine two or more table streams, which results in a table stream output with increased length (more rows). Union is frequently used to:

  • Merge data across measurements or tags.
  • Merge transformed data with the original data.
  • Merge data with different time ranges to make data continuous.

For example imagine we had the following data:

| table | _measurement (group key) | _field (group key) | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement1 | field1 | 1.0 | rfc3339time1 |
| 0 | measurement1 | field1 | 2.0 | rfc3339time2 |

| table | _measurement (group key) | _field (group key) | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement2 | field2 | 4.0 | rfc3339time1 |
| 0 | measurement2 | field2 | 5.0 | rfc3339time2 |

| table | _measurement (group key) | _field (group key) | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement3 | field3 | 3.0 | rfc3339time1 |
| 0 | measurement3 | field3 | 7.0 | rfc3339time2 |

For example, we could use array.from() to construct that example data:
import "experimental"

import "array"

rfc3339time1 = experimental.subDuration(d: -1m, from: now())
rfc3339time2 = experimental.subDuration(d: -2m, from: now())

data1 = array.from(rows: [
{_time: rfc3339time1, _value: 1.0, _field: "field1", _measurement: "measurement1"},
{_time: rfc3339time2, _value: 2.0, _field: "field1", _measurement: "measurement1"}])

data2 = array.from(rows: [{_time: rfc3339time1, _value: 4.0, _field: "field2", _measurement: "measurement2"},
{_time: rfc3339time2, _value: 5.0, _field: "field2", _measurement: "measurement2"}])

data3 = array.from(rows: [{_time: rfc3339time1, _value: 4.0, _field: "field3", _measurement: "measurement3"},
{_time: rfc3339time2, _value: 5.0, _field: "field3", _measurement: "measurement3"}])

Now we might use union() to combine the three table streams together and pivot on the field and measurement:

union(tables: [data1, data2, data3])
|> yield(name:"after union")
|> pivot(rowKey:["_time"], columnKey: ["_field", "_measurement"], valueColumn: "_value")
|> yield(name:"after pivot")

Where the first yield() function returns “after union”:

| table | _measurement (group key) | _field (group key) | _value | _time |
| --- | --- | --- | --- | --- |
| 0 | measurement1 | field1 | 1.0 | rfc3339time1 |
| 0 | measurement1 | field1 | 2.0 | rfc3339time2 |
| 0 | measurement2 | field2 | 4.0 | rfc3339time1 |
| 0 | measurement2 | field2 | 5.0 | rfc3339time2 |
| 0 | measurement3 | field3 | 3.0 | rfc3339time1 |
| 0 | measurement3 | field3 | 7.0 | rfc3339time2 |

The second yield() function returns "after pivot":

| table | field1_measurement1 | field2_measurement2 | field3_measurement3 | _time |
| --- | --- | --- | --- | --- |
| 0 | 1.0 | 4.0 | 3.0 | rfc3339time1 |
| 0 | 2.0 | 5.0 | 7.0 | rfc3339time2 |

Using union() and pivot() in this way allows you to achieve a result similar to using the join() function. However, unlike the join() function, the union() function allows you to combine more than two table streams.

Accessing External Data Sources

You can use Flux to bring in data from a variety of other sources including SQL databases, other InfluxDB Cloud Accounts, Annotated CSV from a URL, and JSON.

The Flux SQL package

You can use the Flux SQL package to query and write to a variety of SQL data sources, including:

  • Amazon RDS
  • Athena
  • Google BigQuery
  • CockroachDB
  • MariaDB
  • MySQL
  • Percona
  • PostgreSQL
  • SAP HANA
  • Snowflake
  • Microsoft SQL Server
  • SQLite

Use the sql.from() function to query a SQL source. For example, to query a local Postgres instance use the following Flux query:

import "sql"

sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://user:password@localhost",
  query: "SELECT * FROM TestTable"
)

Use the sql.to() function to write data to a SQL database. For example, to write data to a local MySQL instance use the following Flux query:

import "sql"
data
|> sql.to(
  driverName: "mysql",
  dataSourceName: "username:password@tcp(localhost:3306)/dbname?param=value",
  table: "example_table",
  batchSize: 10000
)

Keep the following data requirements in mind when using the sql.to() function:

  • Data in the stream must have the same column names as your SQL database. Use a combination of drop(), keep(), map(), and rename() to prepare your data before using the sql.to() function (see the sketch below).
  • Remember your SQL schema rules. Any data that doesn't conform to your SQL schema rules will be dropped. Use the map() function to conform data to your SQL schema rules.
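
For example, here is a minimal sketch (the column names and the SQL table are hypothetical) that renames and prunes columns so the stream matches a SQL table with sensor_id, temperature, and time columns:

import "sql"

data
|> rename(columns: {_value: "temperature", _time: "time"})
|> keep(columns: ["sensor_id", "temperature", "time"])
|> sql.to(
  driverName: "postgres",
  dataSourceName: "postgresql://user:password@localhost",
  table: "sensor_readings"
)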

CSV

You can use Flux to import a Raw CSV or Annotated CSV from a URL (or from a local file) with the csv.from() functions. There are two csv.from() functions:

  1. csv.from() from the Flux experimental CSV package which supports Annotated CSV
  2. csv.from() from stdlib which supports Annotated or Raw CSV

experimental csv.from()

Use the csv.from() function from the Flux experimental CSV package to retrieve an Annotated CSV from a URL. For example, the NOAA water sample dataset pulls data from an Annotated CSV:

import "experimental/csv"

csv.from(url: "https://influx-testdata.s3.amazonaws.com/noaa.csv")

Note: You can also read Annotated CSV from a local file with the stdlib csv.from() function in the Flux REPL. You need to build the Flux REPL from source and use it to access your local file system. This version of csv.from() also returns a stream of tables from Annotated CSV stored in a Flux variable.

csv.from()

Use the csv.from() function from stdlib to retrieve a Raw CSV from a URL. For example, you can use the csv.from() function to parse CSV data from an API and write it to InfluxDB in a task. A great example of this can be found in the Earthquake Feed Ingestion task from the Earthquake Command Center Community Template. Here is the relevant Flux from that task:

import "csv"
import "date"
import "dict"
import "strings"
import "experimental"
import "experimental/http"

onedayago = strings.trimSuffix(v: string(v: date.truncate(t: experimental.subDuration(d: 1d, from: now()), unit: 1m)), suffix: ".000000000Z")
csv_data_url = "https://earthquake.usgs.gov/fdsnws/event/1/query?format=csv&starttime=" + onedayago + "&includedeleted=true&orderby=time-asc"
csv_data = string(v: http.get(url: csv_data_url).body)
states = ["Alaska", "California", "CA", "Hawaii", "Idaho", "Kansas", "New Mexico", "Nevada", "North Carolina", "Oklahoma", "Oregon", "Washington", "Utah"]
countries_dictionary = dict.fromList(pairs: [{key: "MX", value: "Mexico"}])

csv.from(csv: csv_data, mode: "raw")

First the user builds their URL. Since this is a task (a Flux script that's executed on a schedule), the user wants to build the URL with a dynamic starttime value. They use the experimental.subDuration() function to get the timestamp from one day ago. Then they truncate that timestamp with date.truncate() to round it down to the nearest minute. The string() function converts the timestamp into a string, and the strings.trimSuffix() function removes the ".000000000Z" subsecond suffix to format the starttime as required by the USGS Earthquake API. Next they use the http.get() function to submit an HTTP GET request to the USGS Earthquake API. Finally they use the csv.from() function to parse the CSV.

To learn about how to install a Community Template, please look at the InfluxDB Community Templates documentation.

JSON

Use the json.parse() function from the Flux experimental JSON package to return values from a JSON response. Like the example above, you can also use json.parse() with http.get() to parse an HTTP GET JSON response and convert it to a Flux table:

import "array"
import "experimental/json"
import "experimental/http"
resp = http.get(url: "https://api.openweathermap.org/data/2.5/weather?q=London,uk&APPID=0xx2")
jsonData = json.parse(data: resp.body)
array.from(rows: [{_time: now(), _value: float(v:jsonData.main.temp)}])
|> yield()

Which produces the following table:

| table | _value | _time |
| --- | --- | --- |
| 0 | 285.33 | rfc3339time1 |

Where the OpenWeatherMap current weather data API yields the following HTTP GET JSON response:

{"coord":{"lon":-0.1257,"lat":51.5085},"weather":[{"id":801,"main":"Clouds","description":"few clouds","icon":"02n"}],"base":"stations","main":{"temp":285.33,"feels_like":284.67,"temp_min":282.94,"temp_max":287.35,"pressure":1024,"humidity":79},"visibility":10000,"wind":{"speed":2.11,"deg":254,"gust":4.63},"clouds":{"all":21},"dt":1633546918,"sys":{"type":2,"id":2019646,"country":"GB","sunrise":1633500560,"sunset":1633541256},"timezone":3600,"id":2643743,"name":"London","cod":200}

Materialized Views or Downsampling Tasks

Materialized views or downsampling is the process of converting high resolution data to lower resolution aggregates. Downsampling is an important practice in time series database management because it allows users to preserve disk space while retaining low precision trends of their data over long periods of time. Users typically apply an aggregate or selector function to their high resolution data to create a materialized view of a lower resolution summary:

To downsample the temperature data from the Air Sensor sample dataset, you might perform the following query:

from(bucket: "airsensor")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> aggregateWindow(every:1d, fn: mean, createEmpty: false)
  |> to(bucket: "airSensors_materializedView")

Use the to() function to write the data to a destination bucket. Destination buckets usually have a longer retention policy than the source bucket; because the downsampled data is much smaller, it can be retained longer without consuming much disk space. Running this query writes the materialized view to the "airSensors_materializedView" bucket once. However, users typically perform downsampling on a schedule, in a task. Using tasks to create materialized views will be covered in detail in Part 3.
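
As a preview, here is a rough sketch of what this query looks like wrapped in a task (the task name and schedule are placeholders):

option task = {name: "airSensors-downsample", every: 1d}

from(bucket: "airsensor")
  |> range(start: -task.every)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
  |> to(bucket: "airSensors_materializedView")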


Further Reading

  1. TL;DR InfluxDB Tech Tips: Multiple Aggregations with yield() in Flux
  2. TL;DR InfluxDB Tech Tips – Aggregating across Tags or Fields and Ungrouping
  3. TL;DR InfluxDB Tech Tips: Parameterized Flux Queries with InfluxDB
  4. https://www.influxdata.com/blog/top-5-hurdles-for-intermediate-flux-users-and-resources-for-optimizing-flux/
  5. Top 5 Hurdles for Flux Beginners and Resources for Learning to Use Flux
  6. TL;DR InfluxDB Tech Tips – From Subqueries to Flux!
  7. TL;DR InfluxDB Tech Tips – How to Extract Values, Visualize Scalars, and Perform Custom Aggregations with Flux and InfluxDB
  8. TL;DR Tech Tips – How to Construct a Table with Flux
  9. Anomaly Detection with Median Absolute Deviation