From 1901b56b9515b0c34f5d25a5bce982dfc543d64b Mon Sep 17 00:00:00 2001 From: Evan Anderson Date: Sun, 7 Jun 2020 23:12:01 -0700 Subject: [PATCH] Allow custom view.Meters to export metrics for other Resources (#1212) * Remove call to time.Now() on worker thread when handling record reqs (#1210) Time is already recorded on the client side and stored in the currently unused recordReq.t field. Avoiding these repeated calls to time.Now while the worker is blocked can significantly reduce worker contention. * Update Meter to track and report Resource for metric data. Co-authored-by: Ian Milligan --- stats/view/view_to_metric.go | 5 +- stats/view/view_to_metric_test.go | 4 +- stats/view/worker.go | 17 ++++- stats/view/worker_test.go | 105 ++++++++++++++++++++---------- 4 files changed, 94 insertions(+), 37 deletions(-) diff --git a/stats/view/view_to_metric.go b/stats/view/view_to_metric.go index 293c1646d..5e1656a1f 100644 --- a/stats/view/view_to_metric.go +++ b/stats/view/view_to_metric.go @@ -18,6 +18,8 @@ package view import ( "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/stats" ) @@ -125,7 +127,7 @@ func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Ti } } -func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricdata.Metric { +func viewToMetric(v *viewInternal, r *resource.Resource, now time.Time, startTime time.Time) *metricdata.Metric { if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 || v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 { startTime = time.Time{} @@ -144,6 +146,7 @@ func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricda m := &metricdata.Metric{ Descriptor: *v.metricDescriptor, TimeSeries: ts, + Resource: r, } return m } diff --git a/stats/view/view_to_metric_test.go b/stats/view/view_to_metric_test.go index 18c877117..b6df3a0f7 100644 --- a/stats/view/view_to_metric_test.go +++ b/stats/view/view_to_metric_test.go @@ -447,7 +447,7 @@ func Test_ViewToMetric(t *testing.T) { tc.vi.addSample(tag.FromContext(ctx), v, nil, now) } - gotMetric := viewToMetric(tc.vi, now, startTime) + gotMetric := viewToMetric(tc.vi, nil, now, startTime) if !cmp.Equal(gotMetric, tc.wantMetric) { // JSON format is strictly for checking the content when test fails. Do not use JSON // format to determine if the two values are same as it doesn't differentiate between @@ -509,7 +509,7 @@ func TestUnitConversionForAggCount(t *testing.T) { for _, tc := range tests { tc.vi.addSample(tag.FromContext(context.Background()), 5.0, nil, now) - gotMetric := viewToMetric(tc.vi, now, startTime) + gotMetric := viewToMetric(tc.vi, nil, now, startTime) gotUnit := gotMetric.Descriptor.Unit if !cmp.Equal(gotUnit, tc.wantUnit) { t.Errorf("Verify Unit: %s: Got:%v Want:%v", tc.name, gotUnit, tc.wantUnit) diff --git a/stats/view/worker.go b/stats/view/worker.go index 09a208fc5..51be9e278 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats" @@ -47,6 +49,7 @@ type worker struct { c chan command quit, done chan bool mu sync.RWMutex + r *resource.Resource exportersMu sync.RWMutex exporters map[Exporter]struct{} @@ -91,6 +94,10 @@ type Meter interface { RegisterExporter(Exporter) // UnregisterExporter unregisters an exporter. UnregisterExporter(Exporter) + // SetResource may be used to set the Resource associated with this registry. + // This is intended to be used in cases where a single process exports metrics + // for multiple Resources, typically in a multi-tenant situation. + SetResource(*resource.Resource) // Start causes the Meter to start processing Record calls and aggregating // statistics as well as exporting data. @@ -249,6 +256,14 @@ func NewMeter() Meter { } } +// SetResource associates all data collected by this Meter with the specified +// resource. This resource is reported when using metricexport.ReadAndExport; +// it is not provided when used with ExportView/RegisterExporter, because that +// interface does not provide a means for reporting the Resource. +func (w *worker) SetResource(r *resource.Resource) { + w.r = r +} + func (w *worker) Start() { go w.start() } @@ -371,7 +386,7 @@ func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric { startTime = w.startTimes[v] } - return viewToMetric(v, now, startTime) + return viewToMetric(v, w.r, now, startTime) } // Read reads all view data and returns them as metrics. diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index ee7f149aa..193aad0e2 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -18,10 +18,13 @@ package view import ( "context" "errors" + "sort" "sync" "testing" "time" + "go.opencensus.io/resource" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricexport" "go.opencensus.io/stats" @@ -123,8 +126,13 @@ func Test_Worker_MultiExport(t *testing.T) { // This test reports the same data for the default worker and a secondary // worker, and ensures that the stats are kept independently. + extraResource := resource.Resource{ + Type: "additional", + Labels: map[string]string{"key1": "value1", "key2": "value2"}, + } worker2 := NewMeter().(*worker) worker2.Start() + worker2.SetResource(&extraResource) m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit") key := tag.MustNewKey(("key")) @@ -162,50 +170,62 @@ func Test_Worker_MultiExport(t *testing.T) { } } - wantRows := []struct { - w Meter - view string - rows []*Row - }{{ - view: count.Name, - rows: []*Row{ + makeKey := func(r *resource.Resource, view string) string { + if r == nil { + r = &resource.Resource{} + } + return resource.EncodeLabels(r.Labels) + "/" + view + } + + // Format is Resource.Labels encoded as string, then + wantPartialData := map[string][]*Row{ + makeKey(nil, count.Name): []*Row{ {[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}}, {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, }, - }, { - view: sum.Name, - rows: []*Row{ - {nil, &SumData{Value: 7.5}}}, - }, { - w: worker2, - view: count.Name, - rows: []*Row{ + makeKey(nil, sum.Name): []*Row{ + {nil, &SumData{Value: 7.5}}, + }, + makeKey(&extraResource, count.Name): []*Row{ {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}}, }, - }} + } - for _, wantRow := range wantRows { - retrieve := RetrieveData - if wantRow.w != nil { - retrieve = wantRow.w.(*worker).RetrieveData - } - gotRows, err := retrieve(wantRow.view) - if err != nil { - t.Fatalf("RetrieveData(%v), got error %v", wantRow.view, err) + te := &testExporter{} + metricexport.NewReader().ReadAndExport(te) + for _, m := range te.metrics { + key := makeKey(m.Resource, m.Descriptor.Name) + want, ok := wantPartialData[key] + if !ok { + t.Errorf("Unexpected data for %q: %v", key, m) + continue } - for _, got := range gotRows { - if !containsRow(wantRow.rows, got) { - t.Errorf("%s: got row %#v; want none", wantRow.view, got) - break + gotTs := m.TimeSeries + sort.Sort(byLabel(gotTs)) + + for i, ts := range gotTs { + for j, label := range ts.LabelValues { + if want[i].Tags[j].Value != label.Value { + t.Errorf("Mismatched tag values (want %q, got %q) for %v in %q", want[i].Tags[j].Value, label.Value, ts, key) + } } - } - for _, want := range wantRow.rows { - if !containsRow(gotRows, want) { - t.Errorf("%s: got none, want %#v", wantRow.view, want) - break + switch wantValue := want[i].Data.(type) { + case *CountData: + got := ts.Points[0].Value.(int64) + if wantValue.Value != got { + t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue, got, ts, key) + } + case *SumData: + got := ts.Points[0].Value.(float64) + if wantValue.Value != got { + t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue, got, ts, key) + } + default: + t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key) } } } + // Verify that worker has not been computing sum: got, err := worker2.RetrieveData(sum.Name) if err == nil { @@ -577,9 +597,11 @@ func TestWorkerRace(t *testing.T) { } type testExporter struct { + metrics []*metricdata.Metric } func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + te.metrics = metrics return nil } @@ -619,3 +641,20 @@ func restart() { defaultWorker = NewMeter().(*worker) go defaultWorker.start() } + +// byTag implements sort.Interface for *metricdata.TimeSeries by Labels. +type byLabel []*metricdata.TimeSeries + +func (ts byLabel) Len() int { return len(ts) } +func (ts byLabel) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } +func (ts byLabel) Less(i, j int) bool { + if len(ts[i].LabelValues) != len(ts[j].LabelValues) { + return len(ts[i].LabelValues) < len(ts[j].LabelValues) + } + for k := range ts[i].LabelValues { + if ts[i].LabelValues[k].Value != ts[j].LabelValues[k].Value { + return ts[i].LabelValues[k].Value < ts[j].LabelValues[k].Value + } + } + return false +}