From 61aa23779790aa2b41e7caed7d9ba553331ad004 Mon Sep 17 00:00:00 2001 From: Scrin Date: Wed, 10 Jan 2024 18:38:35 +0200 Subject: [PATCH] Process measurements in parallel for InfluxDB sinks --- data_sinks/influxdb.go | 68 +++++++++++++++++++++-------------------- data_sinks/influxdb3.go | 68 +++++++++++++++++++++-------------------- 2 files changed, 70 insertions(+), 66 deletions(-) diff --git a/data_sinks/influxdb.go b/data_sinks/influxdb.go index 1a7b50e..17ee262 100644 --- a/data_sinks/influxdb.go +++ b/data_sinks/influxdb.go @@ -45,39 +45,41 @@ func InfluxDB(conf config.InfluxDBPublisher) chan<- parser.Measurement { log.WithFields(log.Fields{"mac": measurement.Mac}).Trace("Skipping InfluxDB publish due to interval limit") continue } - p := influxdb.NewPointWithMeasurement(measurementName). - AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)). - AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", "")) - if measurement.Name != nil { - p.AddTag("name", *measurement.Name) - } - for tag, value := range conf.AdditionalTags { - p.AddTag(tag, value) - } - addFloat(p, "temperature", measurement.Temperature) - addFloat(p, "humidity", measurement.Humidity) - addFloat(p, "pressure", measurement.Pressure) - addFloat(p, "accelerationX", measurement.AccelerationX) - addFloat(p, "accelerationY", measurement.AccelerationY) - addFloat(p, "accelerationZ", measurement.AccelerationZ) - addFloat(p, "batteryVoltage", measurement.BatteryVoltage) - addInt(p, "txPower", measurement.TxPower) - addInt(p, "rssi", measurement.Rssi) - addInt(p, "movementCounter", measurement.MovementCounter) - addInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber) - addFloat(p, "accelerationTotal", measurement.AccelerationTotal) - addFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity) - addFloat(p, "dewPoint", measurement.DewPoint) - addFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure) - addFloat(p, "airDensity", measurement.AirDensity) - addFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX) - addFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY) - addFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ) - p.SetTime(time.Now()) - err := writeAPI.WritePoint(context.Background(), p) - if err != nil { - log.WithError(err).Error("Failed to send data to InfluxDB") - } + go func(measurement parser.Measurement) { + p := influxdb.NewPointWithMeasurement(measurementName). + AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)). + AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", "")) + if measurement.Name != nil { + p.AddTag("name", *measurement.Name) + } + for tag, value := range conf.AdditionalTags { + p.AddTag(tag, value) + } + addFloat(p, "temperature", measurement.Temperature) + addFloat(p, "humidity", measurement.Humidity) + addFloat(p, "pressure", measurement.Pressure) + addFloat(p, "accelerationX", measurement.AccelerationX) + addFloat(p, "accelerationY", measurement.AccelerationY) + addFloat(p, "accelerationZ", measurement.AccelerationZ) + addFloat(p, "batteryVoltage", measurement.BatteryVoltage) + addInt(p, "txPower", measurement.TxPower) + addInt(p, "rssi", measurement.Rssi) + addInt(p, "movementCounter", measurement.MovementCounter) + addInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber) + addFloat(p, "accelerationTotal", measurement.AccelerationTotal) + addFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity) + addFloat(p, "dewPoint", measurement.DewPoint) + addFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure) + addFloat(p, "airDensity", measurement.AirDensity) + addFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX) + addFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY) + addFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ) + p.SetTime(time.Now()) + err := writeAPI.WritePoint(context.Background(), p) + if err != nil { + log.WithError(err).Error("Failed to send data to InfluxDB") + } + }(measurement) } client.Close() }() diff --git a/data_sinks/influxdb3.go b/data_sinks/influxdb3.go index b964404..9fbb5c3 100644 --- a/data_sinks/influxdb3.go +++ b/data_sinks/influxdb3.go @@ -46,39 +46,41 @@ func InfluxDB3(conf config.InfluxDB3Publisher) chan<- parser.Measurement { log.WithFields(log.Fields{"mac": measurement.Mac}).Trace("Skipping InfluxDB3 publish due to interval limit") continue } - p := influxdb3.NewPointWithMeasurement(measurementName). - AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)). - AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", "")) - if measurement.Name != nil { - p.AddTag("name", *measurement.Name) - } - for tag, value := range conf.AdditionalTags { - p.AddTag(tag, value) - } - influx3AddFloat(p, "temperature", measurement.Temperature) - influx3AddFloat(p, "humidity", measurement.Humidity) - influx3AddFloat(p, "pressure", measurement.Pressure) - influx3AddFloat(p, "accelerationX", measurement.AccelerationX) - influx3AddFloat(p, "accelerationY", measurement.AccelerationY) - influx3AddFloat(p, "accelerationZ", measurement.AccelerationZ) - influx3AddFloat(p, "batteryVoltage", measurement.BatteryVoltage) - influx3AddInt(p, "txPower", measurement.TxPower) - influx3AddInt(p, "rssi", measurement.Rssi) - influx3AddInt(p, "movementCounter", measurement.MovementCounter) - influx3AddInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber) - influx3AddFloat(p, "accelerationTotal", measurement.AccelerationTotal) - influx3AddFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity) - influx3AddFloat(p, "dewPoint", measurement.DewPoint) - influx3AddFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure) - influx3AddFloat(p, "airDensity", measurement.AirDensity) - influx3AddFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX) - influx3AddFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY) - influx3AddFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ) - p.SetTimestamp(time.Now()) - err := client.WritePoints(context.Background(), p) - if err != nil { - log.WithError(err).Error("Failed to send data to InfluxDB3") - } + go func(measurement parser.Measurement) { + p := influxdb3.NewPointWithMeasurement(measurementName). + AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)). + AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", "")) + if measurement.Name != nil { + p.AddTag("name", *measurement.Name) + } + for tag, value := range conf.AdditionalTags { + p.AddTag(tag, value) + } + influx3AddFloat(p, "temperature", measurement.Temperature) + influx3AddFloat(p, "humidity", measurement.Humidity) + influx3AddFloat(p, "pressure", measurement.Pressure) + influx3AddFloat(p, "accelerationX", measurement.AccelerationX) + influx3AddFloat(p, "accelerationY", measurement.AccelerationY) + influx3AddFloat(p, "accelerationZ", measurement.AccelerationZ) + influx3AddFloat(p, "batteryVoltage", measurement.BatteryVoltage) + influx3AddInt(p, "txPower", measurement.TxPower) + influx3AddInt(p, "rssi", measurement.Rssi) + influx3AddInt(p, "movementCounter", measurement.MovementCounter) + influx3AddInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber) + influx3AddFloat(p, "accelerationTotal", measurement.AccelerationTotal) + influx3AddFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity) + influx3AddFloat(p, "dewPoint", measurement.DewPoint) + influx3AddFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure) + influx3AddFloat(p, "airDensity", measurement.AirDensity) + influx3AddFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX) + influx3AddFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY) + influx3AddFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ) + p.SetTimestamp(time.Now()) + err := client.WritePoints(context.Background(), p) + if err != nil { + log.WithError(err).Error("Failed to send data to InfluxDB3") + } + }(measurement) } client.Close() }()