diff --git a/internal/tailer/logstream/pipestream_unix_test.go b/internal/tailer/logstream/pipestream_unix_test.go index 7fa924d6d..03d33bf20 100644 --- a/internal/tailer/logstream/pipestream_unix_test.go +++ b/internal/tailer/logstream/pipestream_unix_test.go @@ -20,6 +20,14 @@ import ( "golang.org/x/sys/unix" ) +// In pipestream tests we must create the write end of the pipe before opening +// the logstream. If the logstream is created first, then there is an +// opportuity for a race if the Read gets scheduled before the write end has +// opened, because then https://github.com/golang/go/issues/44176 occurs -- the +// call returns EOF signalling the end of input. This might actually be +// desirable behaviour because `pipe(7)` says that if all writers are closed, a +// read returns EOF. + func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) { testutil.TimeoutTest(1*time.Second, func(t *testing.T) { //nolint:thelper var wg sync.WaitGroup @@ -32,10 +40,6 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) waker := waker.NewTestAlways() - // In this and the following test, open RDWR so as to not block this thread - // from proceeding. If we open the logstream first, there is a race before - // the write end opens that can sometimes lead to the logstream reading an - // EOF (because the write end is not yet open) and the test fails. f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) testutil.FatalIfErr(t, err) @@ -49,6 +53,9 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) { testutil.WriteString(t, f, "1\n") + // Park this thread and hope the logstream gets a chance to read it. + time.Sleep(10 * time.Millisecond) + // Pipes need to be closed to signal to the pipeStream to finish up. testutil.FatalIfErr(t, f.Close()) @@ -111,6 +118,9 @@ func TestPipeStreamReadURL(t *testing.T) { defer cancel() waker := waker.NewTestAlways() + f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) + testutil.FatalIfErr(t, err) + ps, err := logstream.New(ctx, &wg, waker, "file://"+name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) @@ -120,15 +130,13 @@ func TestPipeStreamReadURL(t *testing.T) { } checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) - f, err := os.OpenFile(name, os.O_WRONLY, os.ModeNamedPipe) - testutil.FatalIfErr(t, err) testutil.WriteString(t, f, "1\n") + testutil.WriteString(t, f, "2\n") + // Give the stream a chance to wake and read time.Sleep(10 * time.Millisecond) - testutil.WriteString(t, f, "2\n") - // Pipes need to be closed to signal to the pipeStream to finish up. testutil.FatalIfErr(t, f.Close())