Yellow Rabbit

Influxdb как механизм хранения показаний датчиков

Хранение температуры в базе данных

База данных никогда не помешает, попробуем хранить данные об измерениях температуры. Посмотрим получится ли использовать influxdb на Raspberry Pi.

Установка

На момент написания пакет dev-lang/go имел версию 1.10.1, которая не работала на ARM, так что пришлось вносить правки в конфигурационные файлы1 и устанавливать go следующим образом:


emerge -1 '=dev-lang/go-9999'
emerge dev-db/influxdb

Проверяем: Проверка установки Influxdb

Есть соблазн быстренько набросать простейшую базу, заняться интеграцией её с Kotlin, а потом с какой-либо системой для построения красивых графиков… Стоп. Мы имеем дело с базой данных, пусть для учебных целей, но сразу тренируемся защищаться от несанкционированного доступа.

Создаём суперпользователя (имя пользователя и пароль изменены):


> create user spade with password 'super-password' with all privileges;

В файле /etc/influxdb/influxd.conf правим строчку auth-enabled = true и перезапускаем демон. После этого проверяем работоспособность авторизации: Проверка авторизации

Https

Для моих целей достаточно самоподписанного сертификата (не забываем указать CN как localhost):


ROOT@pi64 ~# openssl req -x509 -nodes -newkey rsa:2048 \
  -keyout /etc/ssl/influxdb-selfsigned.key \
  -out /etc/ssl/influxdb-selfsigned.crt -days 360

Добавляем этот сертификат в хранилище Java (пароль changeit или установленный вами):


cd /etc/ssl
openssl x509 -in influxdb-selfsigned.crt -outform der -out influxdb-selfsigned.der
cd /usr/lib64/icedtea8/bin
./keytool -import -alias mykeyroot -keystore /usr/lib64/icedtea8/jre/lib/security/cacerts -file /etc/ssl/influxdb-selfsigned.der                      

Правим /etc/influxdb/influxdb.conf:


  https-enabled = true
  https-certificate = "/etc/ssl/influxdb-selfsigned.crt"
  https-private-key = "/etc/ssl/influxdb-selfsigned.key"

После перезапуска демона проверяем из-под обычного пользователя: Проверка https

Связь с Kotlin

Нужно добавить influx в зависимости в build.gradle:


dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
    compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.5'
    compile 'org.influxdb:influxdb-java:2.9'
}

Для начала крохотная программка для проверки соединения с базой данных:


import org.influxdb.InfluxDBFactory
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    println("*** Raspberry Pi Influxdb ***")

    val influxDB = InfluxDBFactory.connect(DB_SERVER, DB_USER, DB_PASS)
    influxDB.run {
        println("Connected to db.")
        close()
    }

    exitProcess(0)
}

const val DB_SERVER = "https://localhost:8086"
const val DB_USER = "spade"
const val DB_PASS = "*******"

Результат: Kotlin и influx

Возвращаемся к консоли и создаём нашу базу данных и специального пользователя, который может только писать данные в базу:


rabbit@pi64 ~/src/yrabbit-java/thermdb % influx -ssl -unsafeSsl -host localhost
Connected to https://localhost:8086 version unknown
InfluxDB shell version: unknown
> auth
username: spade
password: 
> create database thermdb
> create user sensor with password '********'
> grant write on thermdb to sensor
> create user grafana with password '********'
> grant read on thermdb to grafana
> show users
user   admin
----   -----
spade  true
sensor false
grafana false
> exit

Структура базы

Измерение temps имеет очень простую структуру:

Поле/тэг Тип
sensor_id string
temp float

Устанавливаем такую политику по умолчанию, что сырые данные хранятся 2 часа, упаковываются в 15-ти минутные интервалы, хранятся месяц, упаковываются в часовые интервалы и через 4 года удаляются совсем:


> create retention policy two_hours on thermdb duration 2h replication 1 default
> create retention policy one_month on thermdb duration 4w replication 1
> create retention policy four_years on thermdb duration 208w replication 1
> create continuous query cq_15m on thermdb begin select mean(temp) as mean_temp into one_month.downsampled_temps from temps group by time(15m),* end
> create continuous query cq_4w on thermdb begin select mean(mean_temp) as mean_temp into four_years.year_temps from one_month.downsampled_temps group by time(1h),* end

в итоге имеем измерения:

Измерение Интервал данных Сколько хранятся
temps сырые данные 2 часа
downsampled_temps 15 минут месяц
year_temps 1 час четыре года

Пожалуй приведу запросы, которые упаковывают данные в более читабельном виде:


create continuous query cq_15m on thermdb
  begin 
    select mean(temp) as mean_temp 
    into one_month.downsampled_temps 
    from temps 
    group by time(15m),* 
  end

create continuous query cq_4w on thermdb 
  begin 
    select mean(mean_temp) as mean_temp
    into four_years.year_temps
    from one-month.downsampled_temps
    group by time(1h),*
  end

Пробная запись данных

Возвращаемся в Kotlin и пробуем добавить пару записей в базу данных под новым пользователем.


package io.github.yrabbit.kotlin

import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import org.influxdb.BatchOptions
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import java.util.concurrent.TimeUnit
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    println("*** Raspberry Pi Influxdb ***")

    val influxDB = InfluxDBFactory.connect(DB_SERVER, DB_USER, DB_PASS)
    influxDB.run {
        println("Connected to db")
        setDatabase(DB_NAME)
        setRetentionPolicy(DEFAULT_RETENTION)
        enableBatch(BatchOptions.DEFAULTS.flushDuration(FLUSH_INTERVAL))

        runBlocking {
            influxDB.write(Point.measurement(RAW_TEMP_MEASUREMENT)
                    .tag("sensor_id", "test sensor")
                    .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                    .addField("temp", 0.123)
                    .build())
            delay(1000)
            influxDB.write(Point.measurement(RAW_TEMP_MEASUREMENT)
                    .tag("sensor_id", "test sensor")
                    .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                    .addField("temp", 0.723)
                    .build())
        }
        close()
    }

    exitProcess(0)
}

const val DB_SERVER = "https://localhost:8086"
const val DB_USER = "sensor"
const val DB_PASS = "************"
const val DB_NAME = "thermdb"
const val RAW_TEMP_MEASUREMENT = "temps"
const val DEFAULT_RETENTION = "two_hours"

const val FLUSH_INTERVAL = 10 * 60 * 1000 // 10m

Сделав несколько запусков и оставив Raspberry Pi поработать некоторое время получаем:


rabbit@pi64 ~ % influx -precision rfc3339 -ssl -unsafeSsl -host localhost                       
Connected to https://localhost:8086 version unknown
InfluxDB shell version: unknown
> auth
username: spade
password: 
> use thermdb
Using database thermdb
> select * from temps
name: temps
time                     sensor_id   temp
----                     ---------   ----
2018-04-10T09:11:19.217Z test sensor 0.123
2018-04-10T09:11:20.265Z test sensor 0.723
2018-04-10T09:11:31.814Z test sensor 0.123
2018-04-10T09:11:32.856Z test sensor 0.723
2018-04-10T09:11:38.395Z test sensor 0.123
2018-04-10T09:11:39.439Z test sensor 0.723
2018-04-10T09:11:54.984Z test sensor 0.123
2018-04-10T09:11:56.026Z test sensor 0.723
...
2018-04-10T10:59:37.558Z test sensor 0.123
2018-04-10T10:59:38.609Z test sensor 0.723
2018-04-10T11:00:14.761Z test sensor 0.123
2018-04-10T11:00:15.804Z test sensor 0.723
> select * from one_month.downsampled_temps
name: downsampled_temps
time                 mean_temp           sensor_id
----                 ---------           ---------
2018-04-10T09:00:00Z 0.42299999999999993 test sensor
2018-04-10T09:15:00Z 0.42300000000000004 test sensor
2018-04-10T09:30:00Z 0.42300000000000004 test sensor
2018-04-10T09:45:00Z 0.423               test sensor
2018-04-10T10:00:00Z 0.42300000000000004 test sensor
2018-04-10T10:15:00Z 0.42299999999999993 test sensor
2018-04-10T10:30:00Z 0.42300000000000004 test sensor
2018-04-10T10:45:00Z 0.423               test sensor
> select * from four_years.year_temps
name: year_temps
time                 mean_temp           sensor_id
----                 ---------           ---------
2018-04-10T10:00:00Z 0.42300000000000004 test sensor
>

Отлично! Просматриваются непериодические сырые данные, первое и второе упорядочивания. Далее приведены данные на следующее утро: сырые данные уже уничтожены, сформированы 15-и минутные и часовые показания.


> select * from temps
> select * from one_month.downsampled_temps
name: downsampled_temps
time                 mean_temp           sensor_id
----                 ---------           ---------
2018-04-10T09:00:00Z 0.42299999999999993 test sensor
2018-04-10T09:15:00Z 0.42300000000000004 test sensor
2018-04-10T09:30:00Z 0.42300000000000004 test sensor
2018-04-10T09:45:00Z 0.423               test sensor
2018-04-10T10:00:00Z 0.42300000000000004 test sensor
2018-04-10T10:15:00Z 0.42299999999999993 test sensor
2018-04-10T10:30:00Z 0.42300000000000004 test sensor
2018-04-10T10:45:00Z 0.423               test sensor
2018-04-10T11:00:00Z 0.423               test sensor
2018-04-10T11:15:00Z 0.423               test sensor
2018-04-10T12:00:00Z 0.423               test sensor
2018-04-10T12:15:00Z 0.423               test sensor
> select * from four_years.year_temps
name: year_temps
time                 mean_temp           sensor_id
----                 ---------           ---------
2018-04-10T10:00:00Z 0.42300000000000004 test sensor
2018-04-10T11:00:00Z 0.423               test sensor
2018-04-10T12:00:00Z 0.423               test sensor
> 

Добавим в программу опрос датчиков, корректное завершение работы с базой данных и оставим поработать до завтра:smiley:


package io.github.yrabbit.kotlin

import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import org.influxdb.BatchOptions
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import java.io.File
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    println("*** Raspberry Pi Influxdb ***")

    val influxDB = InfluxDBFactory.connect(DB_SERVER, DB_USER, DB_PASS)
    influxDB.run {
        println("Connected to db")
        // exit correctly
        Runtime.getRuntime().addShutdownHook(Thread {
            run {
                println("Finish.")
                influxDB.close()
            }
        })

        setDatabase(DB_NAME)
        setRetentionPolicy(DEFAULT_RETENTION)
        enableBatch(BatchOptions.DEFAULTS.flushDuration(FLUSH_INTERVAL))

        runBlocking {
            while (true) {
                val sensors = findSensors()
                sensors.forEach { sensor_id ->
                    val (therm, status) = readSensor(sensor_id)
                    if (status) {
                        influxDB.write(Point.measurement(RAW_TEMP_MEASUREMENT)
                                .tag("sensor_id", sensor_id)
                                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                                .addField("temp", therm)
                                .build())
                    }
                }
                delay(SENSOR_READ_INTERVAL)
            }
        }
    }

    exitProcess(0)
}

/*
 * Find all sensors in /sys/bus/w1/devices/
 */
fun findSensors(): List<String> {
    val dir = File(SENSORS_PATH)
    val fileNames = dir.list().filter { name -> name.startsWith("28-")}
    return(fileNames)
}

/*
 * Read sensor value
 * status = true -> data Ok
 * status = false -> error
 */
data class thermResult(val therm: Double, val status: Boolean)
fun readSensor(path: String): thermResult {
    var status = false
    var therm = 0.0
    try {
        val sensorData = File("$SENSORS_PATH/$path/w1_slave").readLines()
        if (sensorData.size == 2) {
            if (sensorData[0].endsWith("YES")) {
                therm =sensorData[1].takeLast(5).toDouble() * 0.001
                status = true
            }
        }
    } catch(e: Exception) {

    }

    return(thermResult(therm, status))
}

const val DB_SERVER = "https://localhost.ssl:8086"
const val DB_USER = "sensor"
const val DB_PASS = "look"
const val DB_NAME = "thermdb"
const val RAW_TEMP_MEASUREMENT = "temps"
const val DEFAULT_RETENTION = "two_hours"

const val FLUSH_INTERVAL = 10 * 60 * 1000 // 10m
const val SENSOR_READ_INTERVAL = 10 * 1000 // 10 seconds

const val SENSORS_PATH = "/sys/bus/w1/devices"

Grafana

Займемся визуализацией данных. Самый простой способ - использовать такую штуку как Grafana


~# emerge grafana-bin 

И далее следует обычная чёрная магия: установив этот пакет мы на самом деле не используем сервер из него, поскольку он всё равно не работает и разбираться почему так мне лень. Так что мы соберём свой сервер из исходников и заменим исполняемый файл. Заметим, что вся сборка идёт под обычным пользователем:


~% go get github.com/grafana/grafana
~% ~/go/src/github.com/grafana/grafana
~% go run build.go setup
~% go run build.go build
~% cp ~/go/bin/grafana-server /tmp 
~% sudo mv /tmp/grafana-server /usr/bin/
~% sudo rc-service grafana start

В браузере открываем страницу http://localhost:3000. Имя пользователя admin/admin (или то, что указано в /etc/grafana/grafana.ini2). Начальный экран

Добавляем источник данных: Добавление источника данных

Добавление источника данных

Далее добавляем панель, самую простую: Панель Панель Панель Панель

А как вам такая красота?:smiley: Панель

  1. Элементарные изменения в файлах /etc/portage/package.accept_keywords т.д. (ну старейшины Gentoo в курсе:wink:

  2. Хорошая идея изменить имя суперпользователя и пароль.