From b90d8ed881be8ed64d0a9fcb61f1c8d10700e75c Mon Sep 17 00:00:00 2001 From: Taj Pereira Date: Wed, 11 Oct 2023 07:49:22 +1030 Subject: [PATCH] Multi Receiver Testbed Support (#5962) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [processor/resourcedetection] Fix system detector not setting attributes (#24670) (#24684) Cherry-pick the commit from the patch release v0.82.1 Co-authored-by: Dmitrii Anoshin * feat: working * . * . * . * . * . --------- Co-authored-by: Juraci Paixão Kröhling Co-authored-by: Dmitrii Anoshin --- testbed/testbed/multi_receiver_test_case.go | 325 ++++++++++++++++++++ testbed/testbed/multi_receiver_validator.go | 43 +++ testbed/testbed/options.go | 3 + testbed/testbed/results.go | 72 +++++ 4 files changed, 443 insertions(+) create mode 100644 testbed/testbed/multi_receiver_test_case.go create mode 100644 testbed/testbed/multi_receiver_validator.go diff --git a/testbed/testbed/multi_receiver_test_case.go b/testbed/testbed/multi_receiver_test_case.go new file mode 100644 index 000000000000..b75c8d03dc40 --- /dev/null +++ b/testbed/testbed/multi_receiver_test_case.go @@ -0,0 +1,325 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testbed // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" + +import ( + "fmt" + "log" + "net" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// MultiReceiverTestCase defines a running test case. +type MultiReceiverTestCase struct { + T *testing.T + + // Directory where test case results and logs will be written. + resultDir string + + // Resource spec for agent. + resourceSpec ResourceSpec + + // Agent process. + agentProc OtelcolRunner + + Sender DataSender + Receivers []DataReceiver + + LoadGenerator *LoadGenerator + MockBackends map[DataReceiver]*MockBackend + validator *MultiReceiverTestCaseValidator + + startTime time.Time + + // errorSignal indicates an error in the test case execution, e.g. process execution + // failure or exceeding resource consumption, etc. The actual error message is already + // logged, this is only an indicator on which you can wait to be informed. + errorSignal chan struct{} + // Duration is the requested duration of the tests. Configured via TESTBED_DURATION + // env variable and defaults to 15 seconds if env variable is unspecified. + Duration time.Duration + doneSignal chan struct{} + errorCause string + ResultsSummary TestResultsSummary +} + +// NewMultiReceiverTestCase creates a new MultiReceiverTestCase. It expects agent-config.yaml in the specified directory. +func NewMultiReceiverTestCase( + t *testing.T, + dataProvider DataProvider, + sender DataSender, + receivers []DataReceiver, + agentProc OtelcolRunner, + validator *MultiReceiverTestCaseValidator, + resultsSummary TestResultsSummary, + opts ...MultiReceiverTestCaseOption, +) *MultiReceiverTestCase { + tc := MultiReceiverTestCase{ + T: t, + errorSignal: make(chan struct{}), + doneSignal: make(chan struct{}), + startTime: time.Now(), + Sender: sender, + Receivers: receivers, + agentProc: agentProc, + validator: validator, + ResultsSummary: resultsSummary, + MockBackends: make(map[DataReceiver]*MockBackend), + } + + // Get requested test case duration from env variable. + duration := os.Getenv(testcaseDurationVar) + if duration == "" { + duration = "15s" + } + var err error + tc.Duration, err = time.ParseDuration(duration) + if err != nil { + log.Fatalf("Invalid "+testcaseDurationVar+": %v. Expecting a valid duration string.", duration) + } + + // Apply all provided options. + for _, opt := range opts { + opt(&tc) + } + + // Prepare directory for results. + tc.resultDir, err = filepath.Abs(path.Join("results", t.Name())) + require.NoErrorf(t, err, "Cannot resolve %s", t.Name()) + require.NoErrorf(t, os.MkdirAll(tc.resultDir, os.ModePerm), "Cannot create directory %s", tc.resultDir) + + // Set default resource check period. + tc.resourceSpec.ResourceCheckPeriod = 3 * time.Second + if tc.Duration < tc.resourceSpec.ResourceCheckPeriod { + // Resource check period should not be longer than entire test duration. + tc.resourceSpec.ResourceCheckPeriod = tc.Duration + } + + tc.LoadGenerator, err = NewLoadGenerator(dataProvider, sender) + require.NoError(t, err, "Cannot create generator") + + for _, receiver := range receivers { + tc.MockBackends[receiver] = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver) + } + + go tc.logStats() + + return &tc +} + +func (tc *MultiReceiverTestCase) composeTestResultFileName(fileName string) string { + fileName, err := filepath.Abs(path.Join(tc.resultDir, fileName)) + require.NoError(tc.T, err, "Cannot resolve %s", fileName) + return fileName +} + +// StartAgent starts the agent and redirects its standard output and standard error +// to "agent.log" file located in the test directory. +func (tc *MultiReceiverTestCase) StartAgent(args ...string) { + logFileName := tc.composeTestResultFileName("agent.log") + + startParams := StartParams{ + Name: "Agent", + LogFilePath: logFileName, + CmdArgs: args, + resourceSpec: &tc.resourceSpec, + } + if err := tc.agentProc.Start(startParams); err != nil { + tc.indicateError(err) + return + } + + // Start watching resource consumption. + go func() { + if err := tc.agentProc.WatchResourceConsumption(); err != nil { + tc.indicateError(err) + } + }() + + endpoint := tc.LoadGenerator.sender.GetEndpoint() + if endpoint != nil { + // Wait for agent to start. We consider the agent started when we can + // connect to the port to which we intend to send load. We only do this + // if the endpoint is not-empty, i.e. the sender does use network (some senders + // like text log writers don't). + tc.WaitFor(func() bool { + conn, err := net.Dial(tc.LoadGenerator.sender.GetEndpoint().Network(), tc.LoadGenerator.sender.GetEndpoint().String()) + if err == nil && conn != nil { + conn.Close() + return true + } + return false + }, fmt.Sprintf("connection to %s:%s", tc.LoadGenerator.sender.GetEndpoint().Network(), tc.LoadGenerator.sender.GetEndpoint().String())) + } +} + +// StopAgent stops agent process. +func (tc *MultiReceiverTestCase) StopAgent() { + if _, err := tc.agentProc.Stop(); err != nil { + tc.indicateError(err) + } +} + +// StartLoad starts the load generator and redirects its standard output and standard error +// to "load-generator.log" file located in the test directory. +func (tc *MultiReceiverTestCase) StartLoad(options LoadOptions) { + tc.LoadGenerator.Start(options) +} + +// StopLoad stops load generator. +func (tc *MultiReceiverTestCase) StopLoad() { + tc.LoadGenerator.Stop() +} + +// StartBackend starts the specified backend type. +func (tc *MultiReceiverTestCase) StartBackends() { + + for _, backend := range tc.MockBackends { + require.NoError(tc.T, backend.Start(), "Cannot start backend for: "+backend.receiver.ProtocolName()) + } +} + +// StopBackend stops the backend. +func (tc *MultiReceiverTestCase) StopBackend() { + for _, backend := range tc.MockBackends { + backend.Stop() + } +} + +// EnableRecording enables recording of all data received by MockBackend. +func (tc *MultiReceiverTestCase) EnableRecording() { + for _, backend := range tc.MockBackends { + backend.EnableRecording() + } +} + +// AgentMemoryInfo returns raw memory info struct about the agent +// as returned by github.com/shirou/gopsutil/process +func (tc *MultiReceiverTestCase) AgentMemoryInfo() (uint32, uint32, error) { + stat, err := tc.agentProc.GetProcessMon().MemoryInfo() + if err != nil { + return 0, 0, err + } + return uint32(stat.RSS / mibibyte), uint32(stat.VMS / mibibyte), nil +} + +// Stop stops the load generator, the agent and the backend. +func (tc *MultiReceiverTestCase) Stop() { + // Stop monitoring the agent + close(tc.doneSignal) + + // Stop all components + tc.StopLoad() + tc.StopAgent() + tc.StopBackend() + + // Report test results + tc.validator.RecordResults(tc) +} + +// ValidateData validates data received by mock backend against what was generated and sent to the collector +// instance(s) under test by the LoadGenerator. +func (tc *MultiReceiverTestCase) ValidateData() { + select { + case <-tc.errorSignal: + // Error is already signaled and recorded. Validating data is pointless. + return + default: + } + + tc.validator.Validate(tc) +} + +// Sleep for specified duration or until error is signaled. +func (tc *MultiReceiverTestCase) Sleep(d time.Duration) { + select { + case <-time.After(d): + case <-tc.errorSignal: + } +} + +// WaitForN the specific condition for up to a specified duration. Records a test error +// if time is out and condition does not become true. If error is signaled +// while waiting the function will return false, but will not record additional +// test error (we assume that signaled error is already recorded in indicateError()). +func (tc *MultiReceiverTestCase) WaitForN(cond func() bool, duration time.Duration, errMsg interface{}) bool { + startTime := time.Now() + + // Start with 5 ms waiting interval between condition re-evaluation. + waitInterval := time.Millisecond * 5 + + for { + if cond() { + return true + } + + select { + case <-time.After(waitInterval): + case <-tc.errorSignal: + return false + } + + // Increase waiting interval exponentially up to 500 ms. + if waitInterval < time.Millisecond*500 { + waitInterval *= 2 + } + + if time.Since(startTime) > duration { + // Waited too long + tc.T.Error("Time out waiting for", errMsg) + return false + } + } +} + +// WaitFor is like WaitForN but with a fixed duration of 10 seconds +func (tc *MultiReceiverTestCase) WaitFor(cond func() bool, errMsg interface{}) bool { + return tc.WaitForN(cond, time.Second*60, errMsg) +} + +func (tc *MultiReceiverTestCase) indicateError(err error) { + // Print to log for visibility + log.Print(err.Error()) + + // Indicate error for the test + tc.T.Error(err.Error()) + + tc.errorCause = err.Error() + + // Signal the error via channel + close(tc.errorSignal) +} + +func (tc *MultiReceiverTestCase) logStats() { + t := time.NewTicker(tc.resourceSpec.ResourceCheckPeriod) + defer t.Stop() + + for { + select { + case <-t.C: + tc.logStatsOnce() + case <-tc.doneSignal: + return + } + } +} + +func (tc *MultiReceiverTestCase) logStatsOnce() { + + log.Printf("%s | %s | ", + tc.agentProc.GetResourceConsumption(), + tc.LoadGenerator.GetStats()) + + for _, backend := range tc.MockBackends { + log.Printf("%s | ", + backend.GetStats()) + } +} + diff --git a/testbed/testbed/multi_receiver_validator.go b/testbed/testbed/multi_receiver_validator.go new file mode 100644 index 000000000000..c061a1a7146f --- /dev/null +++ b/testbed/testbed/multi_receiver_validator.go @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testbed // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" + +// MultiReceiverValidator defines the interface for validating and reporting test results. +type MultiReceiverTestCaseValidator struct { + // Validate executes validation routines and test assertions. + Validate func(tc *MultiReceiverTestCase) + + dataProvider DataProvider + assertionFailures []*TraceAssertionFailure +} + + +func NewMultiReceiverTestCaseValidator( + senderName string, + provider DataProvider, + validator func(tc *MultiReceiverTestCase), +) *MultiReceiverTestCaseValidator { + return &MultiReceiverTestCaseValidator{ + dataProvider: provider, + Validate: validator, + } +} + +func (v *MultiReceiverTestCaseValidator) RecordResults(tc *MultiReceiverTestCase) { + var result string + if tc.T.Failed() { + result = "FAIL" + } else { + result = "PASS" + } + + // Remove "Test" prefix from test name. + testName := tc.T.Name()[4:] + tc.ResultsSummary.Add(tc.T.Name(), &MultiReceiverTestResult{ + testName: testName, + result: result, + traceAssertionFailureCount: uint64(len(v.assertionFailures)), + traceAssertionFailures: v.assertionFailures, + }) +} diff --git a/testbed/testbed/options.go b/testbed/testbed/options.go index 3b0e51d6f975..b936eab8f349 100644 --- a/testbed/testbed/options.go +++ b/testbed/testbed/options.go @@ -42,6 +42,9 @@ func (rs *ResourceSpec) isSpecified() bool { // TestCaseOption defines a TestCase option. type TestCaseOption func(t *TestCase) +// MultiReceiverTestCaseOption defines a MultiReceiverTestCase option. +type MultiReceiverTestCaseOption func(t *MultiReceiverTestCase) + // WithSkipResults disables writing out results file for a TestCase. func WithSkipResults() TestCaseOption { return func(tc *TestCase) { diff --git a/testbed/testbed/results.go b/testbed/testbed/results.go index 505df5ec3e14..833d72842fa3 100644 --- a/testbed/testbed/results.go +++ b/testbed/testbed/results.go @@ -266,3 +266,75 @@ func consolidateAssertionFailures(failures []*TraceAssertionFailure) map[string] } return afMap } + +type MultiReceiverTestResult struct { + testName string + result string + duration time.Duration + traceAssertionFailureCount uint64 + traceAssertionFailures []*TraceAssertionFailure +} + +// MultiReceiverTestResults implements the TestResultsSummary interface +type MultiReceiverTestResults struct { + resultsDir string + resultsFile *os.File + perTestResults []*MultiReceiverTestResult + totalAssertionFailures uint64 + totalDuration time.Duration +} + +func (r *MultiReceiverTestResults) Init(resultsDir string) { + r.resultsDir = resultsDir + r.perTestResults = []*MultiReceiverTestResult{} + + // Create resultsSummary file + if err := os.MkdirAll(resultsDir, os.FileMode(0755)); err != nil { + log.Fatal(err) + } + var err error + r.resultsFile, err = os.Create(path.Join(r.resultsDir, "MULTI_RECEIVER_RESULTS.md")) + if err != nil { + log.Fatal(err) + } + + // Write the header + _, _ = io.WriteString(r.resultsFile, + "# Test Results\n"+ + fmt.Sprintf("Started: %s\n\n", time.Now().Format(time.RFC1123Z))+ + "Test |Result|Duration|Failure Count|\n"+ + "----------------------------------------------------------------------|------|-------:|------------:|\n") +} + +func (r *MultiReceiverTestResults) Add(_ string, result interface{}) { + testResult, ok := result.(*MultiReceiverTestResult) + if !ok { + return + } + consolidated := consolidateAssertionFailures(testResult.traceAssertionFailures) + failuresStr := "" + for _, af := range consolidated { + failuresStr = fmt.Sprintf("%s%s,%#v!=%#v,count=%d; ", failuresStr, af.fieldPath, af.expectedValue, + af.actualValue, af.sumCount) + } + _, _ = io.WriteString(r.resultsFile, + fmt.Sprintf("%-70s|%-6s|%7.0fs|%13d|%s\n", + testResult.testName, + testResult.result, + testResult.duration.Seconds(), + testResult.traceAssertionFailureCount, + failuresStr, + ), + ) + r.perTestResults = append(r.perTestResults, testResult) + r.totalAssertionFailures += testResult.traceAssertionFailureCount + r.totalDuration += testResult.duration +} + +func (r *MultiReceiverTestResults) Save() { + _, _ = io.WriteString(r.resultsFile, + fmt.Sprintf("\nTotal assertion failures: %d\n", r.totalAssertionFailures)) + _, _ = io.WriteString(r.resultsFile, + fmt.Sprintf("\nTotal duration: %.0fs\n", r.totalDuration.Seconds())) + r.resultsFile.Close() +}