414 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			414 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
// Copyright 2017, OpenCensus Authors
 | 
						|
//
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
//
 | 
						|
 | 
						|
package view
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"go.opencensus.io/resource"
 | 
						|
 | 
						|
	"go.opencensus.io/metric/metricdata"
 | 
						|
	"go.opencensus.io/metric/metricproducer"
 | 
						|
	"go.opencensus.io/stats"
 | 
						|
	"go.opencensus.io/stats/internal"
 | 
						|
	"go.opencensus.io/tag"
 | 
						|
)
 | 
						|
 | 
						|
func init() {
 | 
						|
	defaultWorker = NewMeter().(*worker)
 | 
						|
	go defaultWorker.start()
 | 
						|
	internal.DefaultRecorder = record
 | 
						|
}
 | 
						|
 | 
						|
type measureRef struct {
 | 
						|
	measure string
 | 
						|
	views   map[*viewInternal]struct{}
 | 
						|
}
 | 
						|
 | 
						|
type worker struct {
 | 
						|
	measures   map[string]*measureRef
 | 
						|
	views      map[string]*viewInternal
 | 
						|
	startTimes map[*viewInternal]time.Time
 | 
						|
 | 
						|
	timer      *time.Ticker
 | 
						|
	c          chan command
 | 
						|
	quit, done chan bool
 | 
						|
	mu         sync.RWMutex
 | 
						|
	r          *resource.Resource
 | 
						|
 | 
						|
	exportersMu sync.RWMutex
 | 
						|
	exporters   map[Exporter]struct{}
 | 
						|
}
 | 
						|
 | 
						|
// Meter defines an interface which allows a single process to maintain
 | 
						|
// multiple sets of metrics exports (intended for the advanced case where a
 | 
						|
// single process wants to report metrics about multiple objects, such as
 | 
						|
// multiple databases or HTTP services).
 | 
						|
//
 | 
						|
// Note that this is an advanced use case, and the static functions in this
 | 
						|
// module should cover the common use cases.
 | 
						|
type Meter interface {
 | 
						|
	stats.Recorder
 | 
						|
	// Find returns a registered view associated with this name.
 | 
						|
	// If no registered view is found, nil is returned.
 | 
						|
	Find(name string) *View
 | 
						|
	// Register begins collecting data for the given views.
 | 
						|
	// Once a view is registered, it reports data to the registered exporters.
 | 
						|
	Register(views ...*View) error
 | 
						|
	// Unregister the given views. Data will not longer be exported for these views
 | 
						|
	// after Unregister returns.
 | 
						|
	// It is not necessary to unregister from views you expect to collect for the
 | 
						|
	// duration of your program execution.
 | 
						|
	Unregister(views ...*View)
 | 
						|
	// SetReportingPeriod sets the interval between reporting aggregated views in
 | 
						|
	// the program. If duration is less than or equal to zero, it enables the
 | 
						|
	// default behavior.
 | 
						|
	//
 | 
						|
	// Note: each exporter makes different promises about what the lowest supported
 | 
						|
	// duration is. For example, the Stackdriver exporter recommends a value no
 | 
						|
	// lower than 1 minute. Consult each exporter per your needs.
 | 
						|
	SetReportingPeriod(time.Duration)
 | 
						|
 | 
						|
	// RegisterExporter registers an exporter.
 | 
						|
	// Collected data will be reported via all the
 | 
						|
	// registered exporters. Once you no longer
 | 
						|
	// want data to be exported, invoke UnregisterExporter
 | 
						|
	// with the previously registered exporter.
 | 
						|
	//
 | 
						|
	// Binaries can register exporters, libraries shouldn't register exporters.
 | 
						|
	RegisterExporter(Exporter)
 | 
						|
	// UnregisterExporter unregisters an exporter.
 | 
						|
	UnregisterExporter(Exporter)
 | 
						|
	// SetResource may be used to set the Resource associated with this registry.
 | 
						|
	// This is intended to be used in cases where a single process exports metrics
 | 
						|
	// for multiple Resources, typically in a multi-tenant situation.
 | 
						|
	SetResource(*resource.Resource)
 | 
						|
 | 
						|
	// Start causes the Meter to start processing Record calls and aggregating
 | 
						|
	// statistics as well as exporting data.
 | 
						|
	Start()
 | 
						|
	// Stop causes the Meter to stop processing calls and terminate data export.
 | 
						|
	Stop()
 | 
						|
 | 
						|
	// RetrieveData gets a snapshot of the data collected for the the view registered
 | 
						|
	// with the given name. It is intended for testing only.
 | 
						|
	RetrieveData(viewName string) ([]*Row, error)
 | 
						|
}
 | 
						|
 | 
						|
var _ Meter = (*worker)(nil)
 | 
						|
 | 
						|
var defaultWorker *worker
 | 
						|
 | 
						|
var defaultReportingDuration = 10 * time.Second
 | 
						|
 | 
						|
// Find returns a registered view associated with this name.
 | 
						|
// If no registered view is found, nil is returned.
 | 
						|
func Find(name string) (v *View) {
 | 
						|
	return defaultWorker.Find(name)
 | 
						|
}
 | 
						|
 | 
						|
// Find returns a registered view associated with this name.
 | 
						|
// If no registered view is found, nil is returned.
 | 
						|
func (w *worker) Find(name string) (v *View) {
 | 
						|
	req := &getViewByNameReq{
 | 
						|
		name: name,
 | 
						|
		c:    make(chan *getViewByNameResp),
 | 
						|
	}
 | 
						|
	w.c <- req
 | 
						|
	resp := <-req.c
 | 
						|
	return resp.v
 | 
						|
}
 | 
						|
 | 
						|
// Register begins collecting data for the given views.
 | 
						|
// Once a view is registered, it reports data to the registered exporters.
 | 
						|
func Register(views ...*View) error {
 | 
						|
	return defaultWorker.Register(views...)
 | 
						|
}
 | 
						|
 | 
						|
// Register begins collecting data for the given views.
 | 
						|
// Once a view is registered, it reports data to the registered exporters.
 | 
						|
func (w *worker) Register(views ...*View) error {
 | 
						|
	req := ®isterViewReq{
 | 
						|
		views: views,
 | 
						|
		err:   make(chan error),
 | 
						|
	}
 | 
						|
	w.c <- req
 | 
						|
	return <-req.err
 | 
						|
}
 | 
						|
 | 
						|
// Unregister the given views. Data will not longer be exported for these views
 | 
						|
// after Unregister returns.
 | 
						|
// It is not necessary to unregister from views you expect to collect for the
 | 
						|
// duration of your program execution.
 | 
						|
func Unregister(views ...*View) {
 | 
						|
	defaultWorker.Unregister(views...)
 | 
						|
}
 | 
						|
 | 
						|
// Unregister the given views. Data will not longer be exported for these views
 | 
						|
// after Unregister returns.
 | 
						|
// It is not necessary to unregister from views you expect to collect for the
 | 
						|
// duration of your program execution.
 | 
						|
func (w *worker) Unregister(views ...*View) {
 | 
						|
	names := make([]string, len(views))
 | 
						|
	for i := range views {
 | 
						|
		names[i] = views[i].Name
 | 
						|
	}
 | 
						|
	req := &unregisterFromViewReq{
 | 
						|
		views: names,
 | 
						|
		done:  make(chan struct{}),
 | 
						|
	}
 | 
						|
	w.c <- req
 | 
						|
	<-req.done
 | 
						|
}
 | 
						|
 | 
						|
// RetrieveData gets a snapshot of the data collected for the the view registered
 | 
						|
// with the given name. It is intended for testing only.
 | 
						|
func RetrieveData(viewName string) ([]*Row, error) {
 | 
						|
	return defaultWorker.RetrieveData(viewName)
 | 
						|
}
 | 
						|
 | 
						|
// RetrieveData gets a snapshot of the data collected for the the view registered
 | 
						|
// with the given name. It is intended for testing only.
 | 
						|
func (w *worker) RetrieveData(viewName string) ([]*Row, error) {
 | 
						|
	req := &retrieveDataReq{
 | 
						|
		now: time.Now(),
 | 
						|
		v:   viewName,
 | 
						|
		c:   make(chan *retrieveDataResp),
 | 
						|
	}
 | 
						|
	w.c <- req
 | 
						|
	resp := <-req.c
 | 
						|
	return resp.rows, resp.err
 | 
						|
}
 | 
						|
 | 
						|
func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
 | 
						|
	defaultWorker.Record(tags, ms, attachments)
 | 
						|
}
 | 
						|
 | 
						|
// Record records a set of measurements ms associated with the given tags and attachments.
 | 
						|
func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
 | 
						|
	req := &recordReq{
 | 
						|
		tm:          tags,
 | 
						|
		ms:          ms.([]stats.Measurement),
 | 
						|
		attachments: attachments,
 | 
						|
		t:           time.Now(),
 | 
						|
	}
 | 
						|
	w.c <- req
 | 
						|
}
 | 
						|
 | 
						|
// SetReportingPeriod sets the interval between reporting aggregated views in
 | 
						|
// the program. If duration is less than or equal to zero, it enables the
 | 
						|
// default behavior.
 | 
						|
//
 | 
						|
// Note: each exporter makes different promises about what the lowest supported
 | 
						|
// duration is. For example, the Stackdriver exporter recommends a value no
 | 
						|
// lower than 1 minute. Consult each exporter per your needs.
 | 
						|
func SetReportingPeriod(d time.Duration) {
 | 
						|
	defaultWorker.SetReportingPeriod(d)
 | 
						|
}
 | 
						|
 | 
						|
// SetReportingPeriod sets the interval between reporting aggregated views in
 | 
						|
// the program. If duration is less than or equal to zero, it enables the
 | 
						|
// default behavior.
 | 
						|
//
 | 
						|
// Note: each exporter makes different promises about what the lowest supported
 | 
						|
// duration is. For example, the Stackdriver exporter recommends a value no
 | 
						|
// lower than 1 minute. Consult each exporter per your needs.
 | 
						|
func (w *worker) SetReportingPeriod(d time.Duration) {
 | 
						|
	// TODO(acetechnologist): ensure that the duration d is more than a certain
 | 
						|
	// value. e.g. 1s
 | 
						|
	req := &setReportingPeriodReq{
 | 
						|
		d: d,
 | 
						|
		c: make(chan bool),
 | 
						|
	}
 | 
						|
	w.c <- req
 | 
						|
	<-req.c // don't return until the timer is set to the new duration.
 | 
						|
}
 | 
						|
 | 
						|
// NewMeter constructs a Meter instance. You should only need to use this if
 | 
						|
// you need to separate out Measurement recordings and View aggregations within
 | 
						|
// a single process.
 | 
						|
func NewMeter() Meter {
 | 
						|
	return &worker{
 | 
						|
		measures:   make(map[string]*measureRef),
 | 
						|
		views:      make(map[string]*viewInternal),
 | 
						|
		startTimes: make(map[*viewInternal]time.Time),
 | 
						|
		timer:      time.NewTicker(defaultReportingDuration),
 | 
						|
		c:          make(chan command, 1024),
 | 
						|
		quit:       make(chan bool),
 | 
						|
		done:       make(chan bool),
 | 
						|
 | 
						|
		exporters: make(map[Exporter]struct{}),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// SetResource associates all data collected by this Meter with the specified
 | 
						|
// resource. This resource is reported when using metricexport.ReadAndExport;
 | 
						|
// it is not provided when used with ExportView/RegisterExporter, because that
 | 
						|
// interface does not provide a means for reporting the Resource.
 | 
						|
func (w *worker) SetResource(r *resource.Resource) {
 | 
						|
	w.r = r
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) Start() {
 | 
						|
	go w.start()
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) start() {
 | 
						|
	prodMgr := metricproducer.GlobalManager()
 | 
						|
	prodMgr.AddProducer(w)
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case cmd := <-w.c:
 | 
						|
			cmd.handleCommand(w)
 | 
						|
		case <-w.timer.C:
 | 
						|
			w.reportUsage()
 | 
						|
		case <-w.quit:
 | 
						|
			w.timer.Stop()
 | 
						|
			close(w.c)
 | 
						|
			w.done <- true
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) Stop() {
 | 
						|
	prodMgr := metricproducer.GlobalManager()
 | 
						|
	prodMgr.DeleteProducer(w)
 | 
						|
 | 
						|
	w.quit <- true
 | 
						|
	<-w.done
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) getMeasureRef(name string) *measureRef {
 | 
						|
	if mr, ok := w.measures[name]; ok {
 | 
						|
		return mr
 | 
						|
	}
 | 
						|
	mr := &measureRef{
 | 
						|
		measure: name,
 | 
						|
		views:   make(map[*viewInternal]struct{}),
 | 
						|
	}
 | 
						|
	w.measures[name] = mr
 | 
						|
	return mr
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
 | 
						|
	w.mu.Lock()
 | 
						|
	defer w.mu.Unlock()
 | 
						|
	vi, err := newViewInternal(v)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if x, ok := w.views[vi.view.Name]; ok {
 | 
						|
		if !x.view.same(vi.view) {
 | 
						|
			return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
 | 
						|
		}
 | 
						|
 | 
						|
		// the view is already registered so there is nothing to do and the
 | 
						|
		// command is considered successful.
 | 
						|
		return x, nil
 | 
						|
	}
 | 
						|
	w.views[vi.view.Name] = vi
 | 
						|
	w.startTimes[vi] = time.Now()
 | 
						|
	ref := w.getMeasureRef(vi.view.Measure.Name())
 | 
						|
	ref.views[vi] = struct{}{}
 | 
						|
	return vi, nil
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) unregisterView(v *viewInternal) {
 | 
						|
	w.mu.Lock()
 | 
						|
	defer w.mu.Unlock()
 | 
						|
	delete(w.views, v.view.Name)
 | 
						|
	delete(w.startTimes, v)
 | 
						|
	if measure := w.measures[v.view.Measure.Name()]; measure != nil {
 | 
						|
		delete(measure.views, v)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) reportView(v *viewInternal) {
 | 
						|
	if !v.isSubscribed() {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	rows := v.collectedRows()
 | 
						|
	viewData := &Data{
 | 
						|
		View:  v.view,
 | 
						|
		Start: w.startTimes[v],
 | 
						|
		End:   time.Now(),
 | 
						|
		Rows:  rows,
 | 
						|
	}
 | 
						|
	w.exportersMu.Lock()
 | 
						|
	defer w.exportersMu.Unlock()
 | 
						|
	for e := range w.exporters {
 | 
						|
		e.ExportView(viewData)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) reportUsage() {
 | 
						|
	w.mu.Lock()
 | 
						|
	defer w.mu.Unlock()
 | 
						|
	for _, v := range w.views {
 | 
						|
		w.reportView(v)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
 | 
						|
	if !v.isSubscribed() {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	var startTime time.Time
 | 
						|
	if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
 | 
						|
		v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
 | 
						|
		startTime = time.Time{}
 | 
						|
	} else {
 | 
						|
		startTime = w.startTimes[v]
 | 
						|
	}
 | 
						|
 | 
						|
	return viewToMetric(v, w.r, now, startTime)
 | 
						|
}
 | 
						|
 | 
						|
// Read reads all view data and returns them as metrics.
 | 
						|
// It is typically invoked by metric reader to export stats in metric format.
 | 
						|
func (w *worker) Read() []*metricdata.Metric {
 | 
						|
	w.mu.Lock()
 | 
						|
	defer w.mu.Unlock()
 | 
						|
	now := time.Now()
 | 
						|
	metrics := make([]*metricdata.Metric, 0, len(w.views))
 | 
						|
	for _, v := range w.views {
 | 
						|
		metric := w.toMetric(v, now)
 | 
						|
		if metric != nil {
 | 
						|
			metrics = append(metrics, metric)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return metrics
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) RegisterExporter(e Exporter) {
 | 
						|
	w.exportersMu.Lock()
 | 
						|
	defer w.exportersMu.Unlock()
 | 
						|
 | 
						|
	w.exporters[e] = struct{}{}
 | 
						|
}
 | 
						|
 | 
						|
func (w *worker) UnregisterExporter(e Exporter) {
 | 
						|
	w.exportersMu.Lock()
 | 
						|
	defer w.exportersMu.Unlock()
 | 
						|
 | 
						|
	delete(w.exporters, e)
 | 
						|
}
 |