Skip to content

Commit

Permalink
fix: Named pipe streams can exit after a zero byte read.
Browse files Browse the repository at this point in the history
According to `pipe(7)` and `read(2)` the read will return 0 when all writer FDs are closed,
with no `errno`, signalling EOF.

In order to use the `io.ReadFrom` interface we won't be able to rely on the
return of `io.EOF` anymore.
  • Loading branch information
jaqx0r committed Jul 10, 2024
1 parent a39087b commit b53a574
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
13 changes: 10 additions & 3 deletions internal/tailer/logstream/pipestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.Info(err)
}
logCloses.Add(ps.pathname, 1)
if partial.Len() > 0 {
ps.sendLine(ctx, partial)
}
close(ps.lines)
ps.cancel()
}()
Expand All @@ -107,13 +110,17 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if err == nil && ctx.Err() == nil {
continue
}
} else if n == 0 {
// `pipe(7)` tells us "If all file descriptors referring to the
// write end of a pipe have been closed, then an attempt to
// read(2) from the pipe will see end-of-file (read(2) will
// return 0)."
glog.V(2).Infof("stream(%s): exiting, 0 bytes read", ps.sourcename)
return
}

// Test to see if we should exit.
if IsExitableError(err) {
if partial.Len() > 0 {
ps.sendLine(ctx, partial)
}
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err)
return
}
Expand Down
13 changes: 11 additions & 2 deletions internal/tailer/logstream/pipestream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,19 @@ func TestPipeStreamReadURL(t *testing.T) {

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: name, Line: "1"},
{Context: context.TODO(), Filename: name, Line: "2"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines())

f, err := os.OpenFile(name, os.O_WRONLY, os.ModeNamedPipe)
testutil.FatalIfErr(t, err)
testutil.WriteString(t, f, "1\n")

// Give the stream a chance to wake and read
time.Sleep(10 * time.Millisecond)

testutil.WriteString(t, f, "2\n")

// Pipes need to be closed to signal to the pipeStream to finish up.
testutil.FatalIfErr(t, f.Close())

Expand All @@ -147,7 +153,7 @@ func TestPipeStreamReadStdin(t *testing.T) {
f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe)
testutil.FatalIfErr(t, err)
testutil.OverrideStdin(t, f)
testutil.WriteString(t, f, "content\n")
testutil.WriteString(t, f, "1\n")

ctx, cancel := context.WithCancel(context.Background())
// The stream is not shut down by cancel in this test.
Expand All @@ -158,10 +164,13 @@ func TestPipeStreamReadStdin(t *testing.T) {
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: "-", Line: "content"},
{Context: context.TODO(), Filename: "-", Line: "1"},
{Context: context.TODO(), Filename: "-", Line: "2"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines())

testutil.WriteString(t, f, "2\n")

// Give the stream a chance to wake and read
time.Sleep(10 * time.Millisecond)

Expand Down

0 comments on commit b53a574

Please sign in to comment.