The ECS core implements metrics in the folder common/monitoring. The metrics stack of the ALICE
experiment is based on InfluxDB and Telegraf: Telegraf scrapes metrics
from a given HTTP endpoint and sends them into an InfluxDB database
instance. To expose an endpoint with metrics we can set the CLI parameter metricsEndpoint,
which has the format [port]/[endpoint] (default: 8088/ecsmetrics).
After running the core with this parameter we can scrape the endpoint, e.g. with curl:

```
curl http://127.0.0.1:8088/ecsmetrics
```

The result of this command looks for example like this:

```
kafka,subsystem=ECS,topic=aliecs.run send_bytes=1638400u,sent_messages=42u 1746457955000000000
```
This format is called the influx line protocol. We will use this example to introduce the InfluxDB metrics format. Every line is one metric, and every metric is composed of multiple parts separated by commas and spaces:
- measurement name - (required) string at the beginning of the line (`kafka`)
- comma
- tags - (optional) key-value list separated by commas. `subsystem=ECS,topic=aliecs.run` is an example of such a list, where `subsystem` and `topic` are the keys and `ECS` and `aliecs.run` are the values
- space - divides the measurement and tags from the fields holding the measurement data
- fields - (required) actual values in the same format as tags. We support `int64`, `uint64` and `float64` values (`sent_messages=42u`)
- space - divides the fields from the timestamp
- timestamp - (optional) int64 value of a unix timestamp in nanoseconds
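To make the parts above concrete, here is a minimal, hypothetical Go helper (not the actual ECS code; `formatLine` and `sortedKeys` are illustrative names) that assembles a line in this protocol:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatLine builds an influx line protocol string from a measurement,
// tag and field maps, and a unix-nanosecond timestamp. Keys are sorted
// so the output is deterministic.
func formatLine(measurement string, tags, fields map[string]string, tsNanos int64) string {
	var b strings.Builder
	b.WriteString(measurement)
	for _, k := range sortedKeys(tags) { // tags: ,key=value pairs
		fmt.Fprintf(&b, ",%s=%s", k, tags[k])
	}
	b.WriteByte(' ') // space divides tags from fields
	parts := make([]string, 0, len(fields))
	for _, k := range sortedKeys(fields) {
		parts = append(parts, fmt.Sprintf("%s=%s", k, fields[k]))
	}
	b.WriteString(strings.Join(parts, ","))
	fmt.Fprintf(&b, " %d", tsNanos) // space divides fields from timestamp
	return b.String()
}

func sortedKeys(m map[string]string) []string {
	ks := make([]string, 0, len(m))
	for k := range m {
		ks = append(ks, k)
	}
	sort.Strings(ks)
	return ks
}

func main() {
	fmt.Println(formatLine("kafka",
		map[string]string{"subsystem": "ECS", "topic": "aliecs.run"},
		map[string]string{"send_bytes": "1638400u", "sent_messages": "42u"},
		1746457955000000000))
	// → kafka,subsystem=ECS,topic=aliecs.run send_bytes=1638400u,sent_messages=42u 1746457955000000000
}
```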
In order to support this format we introduced the Metric structure in common/monitoring/metric.go.
The following code shows how to create a Metric with `measurement` as the measurement name,
one tag `tag1=val1` and a field `field1=42u`:

```go
m := monitoring.NewMetric("measurement", time.Now())
m.AddTag("tag1", "val1")
m.SetFieldUInt64("field1", 42)
```

However, we also need to be able to store metrics so that they can be scraped correctly.
This mechanism is implemented in common/monitoring/monitoring.go.
The metrics endpoint is started by calling Run(port, endpointName). As this method is blocking,
it is advised to call it from a goroutine. After this method is called we can send
metrics via the methods Send and SendHistogrammable. If you want to send simple metrics
(e.g. a counter of sent messages) you are advised to use the simple Send. However, if you are
interested in having percentiles reported for a metric, you should use
SendHistogrammable.
```go
go monitoring.Run(8088, "/metrics")
m := monitoring.NewMetric("measurement", time.Now())
m.AddTag("tag1", "val1")
m.SetFieldUInt64("field1", 42)
monitoring.Send(&m)
monitoring.SendHistogrammable(&m)
monitoring.Stop()
```

An example use case for SendHistogrammable is the duration of some function, e.g. measuring the sending of a batch of messages. For the best coverage we can combine both methods, to count the messages sent per batch and also to measure the duration of the send. For real code you can take a look at writer.go, where we send multiple fields per metric and use the full potential of these metrics.
The previous code example results in the following metrics being reported:

```
measurement,tag1=val1 field1=42 [timestamp]
measurement,tag1=val1 field1_mean=42,field1_median=42,field1_min=42,field1_p10=42,field1_p30=42,field1_p70=42,field1_p90=42,field1_max=42,field1_count=1,field1_poolsize=1 [timestamp]
```
In the following text we will talk about aggregating over a time interval, which is always 1 second.
The first metric is self-explanatory, but we can see that the Histogrammable metric
reports multiple percentiles as well as the mean, min and max. These values can all be
equal if only a single value is received during the 1 second interval. Moreover, it reports count
and poolsize, where count is the number of times this metric was sent
and poolsize is an internal metric, which will be described later.
We mentioned in the previous part that there are two ways to send metrics in ECS, resulting in two different outcomes. Based on these outcomes we talk about two metric types:
- Counter - sent via Send
- Histogrammable - sent via SendHistogrammable

Both metric types are created with NewMetric(measurement, timestamp),
so both use the same Metric object. The distinction is made by the send method used:
Send or SendHistogrammable.
The simple Send(*Metric) just stores the metric with the given tags and fields without
creating any additional information. This metric is only aggregated
(more about that later).
Sending a metric through SendHistogrammable(*Metric)
results in the metric being added to a collection of metrics with the same
measurement and tags. This collection is used to compute percentiles
(10, 30, 50, 70, 90), min, max and count (the number of elements added to
the collection since the last reset). These values are reported as fields
by appending suffixes to the name of the original field:
| Meaning | Appended field |
|---|---|
| 10th percentile | field_p10 |
| 30th percentile | field_p30 |
| 70th percentile | field_p70 |
| 90th percentile | field_p90 |
| median (50th percentile) | field_median |
| mean | field_mean |
| min | field_min |
| max | field_max |
| count | field_count |
To reduce network bandwidth and RAM usage, we aggregate all metrics. This is done by grouping them into one-second intervals based on:
- Timestamps rounded down to the nearest second
- Measurement name
- Tags
This aggregation method applies to all metric types.
If multiple metrics share the same measurement name and tags but have timestamps less than one second apart, they will be grouped into a single bucket, using the timestamp rounded down to the nearest second.
However, if the measurement name or tags differ even slightly, the metrics will not be grouped and will remain separate. Also, if a metric is missing a tag that is present in the others, it will not be aggregated with them.
```
notaggregated,tag1=val1 fields1=1i 1000000123
aggregated,tag1=val1 fields1=1i 1000000001
aggregated,tag1=val1 fields1=1i 1000000021
aggregated,tag1=val1 fields1=1i,fields2=1i 1000000021
aggregated,tag1=val1,tag2=val2 fields1=1i 1000030021
aggregated,tag1=val1 fields1=2i 2000000021
```

If all of these metrics are sent and thus aggregated, we end up with the following:

```
notaggregated,tag1=val1 fields1=1i 1000000000
aggregated,tag1=val1 fields1=3i,fields2=1i 1000000000
aggregated,tag1=val1,tag2=val2 fields1=1i 1000000000
aggregated,tag1=val1 fields1=2i 2000000000
```
Explanation:
- `notaggregated` is a unique measurement.
- `aggregated` with one tag ends up with a `fields1` value of 3, because three metrics fell into the same timestamp bucket; `fields2` was aggregated into the same metric because its tags and measurement were the same.
- The `aggregated` metrics with either an additional tag or a timestamp in the 2 s bucket cannot be aggregated with anything and are kept as unique values.
The same would happen with Histogrammables, except that the aggregation would not be the addition of the individual points, but the creation of a statistical report as described in the previous part.
In order to send metrics from an unlimited number of goroutines, we need a
robust, thread-safe mechanism. It is implemented in common/monitoring/monitoring.go
as an event loop (eventLoop) that reads data from two buffered channels (metricsChannel and metricsHistosChannel)
with a single goroutine. Apart from reading messages from these two channels, the event loop also handles
scraping requests from the http.Server endpoint. As the HTTP endpoint is called from a
different goroutine than the one processing the event loop, we use two more channels:
metricsRequestedChannel, which is used by the endpoint to request the current metrics,
and metricsExportedToRequest, over which the transformed metrics are sent back to the endpoint.
The methods Send and SendHistogrammable write to the corresponding channels,
which are consumed by the event loop.
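A stripped-down sketch of this single-goroutine design follows. The channel names match the text, but the internals are simplified (the real loop aggregates instead of appending, and uses buffered channels):

```go
package main

import "fmt"

// metric is a simplified stand-in for the real Metric type.
type metric struct {
	name  string
	value uint64
}

// eventLoop is the only goroutine touching `stored`, so no locks
// are needed: all interaction goes through channels.
func eventLoop(metricsChannel, metricsHistosChannel <-chan metric,
	metricsRequestedChannel <-chan struct{},
	metricsExportedToRequest chan<- []metric,
	done <-chan struct{}) {
	var stored []metric
	for {
		select {
		case m := <-metricsChannel:
			stored = append(stored, m) // real code: aggregate into buckets
		case m := <-metricsHistosChannel:
			stored = append(stored, m) // real code: add to reservoir
		case <-metricsRequestedChannel:
			metricsExportedToRequest <- stored // scrape: hand metrics to the HTTP handler
			stored = nil
		case <-done:
			return
		}
	}
}

func main() {
	mc := make(chan metric) // unbuffered here for a deterministic demo
	mhc := make(chan metric)
	req := make(chan struct{})
	exp := make(chan []metric)
	done := make(chan struct{})
	go eventLoop(mc, mhc, req, exp, done)

	mc <- metric{"sent_messages", 42}
	req <- struct{}{} // the HTTP handler would do this on scrape
	got := <-exp
	fmt.Println(len(got), got[0].name) // → 1 sent_messages
	close(done)
}
```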
In order to correctly implement the behaviour described in the part about aggregation,
we use the same mechanism in two aggregating container objects,
MetricsAggregate and MetricsReservoirSampling, implemented in
common/monitoring/metricsaggregate.go
and metricsreservoirsampling.go
in the same directory. The implementation keeps separate buckets
in a map with distinct keys (metricsBuckets). These keys need to be unique
with respect to the timestamp and tags. We use a key struct composed
of a time.Time and a maphash.Hash, so that we do not have to compare
an arbitrary number of tags, whose keys and values would have to be
compared piece by piece. When inserting a new bucket into metricsBuckets,
we create a new key by rounding the timestamp down (time.Unix(metric.timestamp.Unix(), 0))
and hashing all tags and their values. This results in unique buckets
distinguished by timestamp and tag collection.
However, there is a potential problem:
tags are stored as an unsorted slice in Metric, so it is possible to create
two metrics with the same tags in a different order. These will result
in different hashes, as maphash is order-dependent.
We compute percentiles from all metrics sent via the SendHistogrammable
method, but we compute them from streaming data with unknown
bounds, so we cannot easily build a histogram. However, there is a simple
solution called reservoir sampling, which is discussed in this wiki article.
It relies on a simple principle: every value from the streaming data must have
the same probability of staying inside a fixed-size buffer called the reservoir.
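A compact sketch of this principle (Vitter's Algorithm R) follows; the names are illustrative and the actual implementation lives in metricsreservoirsampling.go. The fixed buffer length is presumably what the `poolsize` field mentioned earlier reports.

```go
package main

import (
	"fmt"
	"math/rand"
)

// reservoir keeps a fixed-size uniform sample of a stream. After n
// values have been seen, every value remains in the buffer with the
// same probability size/n, which is the property described above.
type reservoir struct {
	buf  []float64
	seen int
	rng  *rand.Rand
}

func newReservoir(size int, seed int64) *reservoir {
	return &reservoir{
		buf: make([]float64, 0, size),
		rng: rand.New(rand.NewSource(seed)),
	}
}

func (r *reservoir) add(v float64) {
	r.seen++
	if len(r.buf) < cap(r.buf) {
		r.buf = append(r.buf, v) // reservoir not full yet: always keep
		return
	}
	// Keep the new value with probability cap/seen by overwriting a
	// uniformly chosen slot; otherwise discard it.
	if j := r.rng.Intn(r.seen); j < cap(r.buf) {
		r.buf[j] = v
	}
}

func main() {
	r := newReservoir(10, 1)
	for i := 0; i < 1000; i++ {
		r.add(float64(i))
	}
	fmt.Println(len(r.buf)) // → 10: pool size stays fixed regardless of stream length
}
```

Percentiles, min, max and mean are then computed over the reservoir contents at report time, approximating the statistics of the whole stream.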