Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Correctly handle lines that cross buffer boundaries. #902

Merged
merged 9 commits into from
Jul 17, 2024
4 changes: 0 additions & 4 deletions internal/tailer/logstream/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@
package logstream

import (
"time"

"github.com/google/mtail/internal/logline"
)

type streamBase struct {
sourcename string // human readable name of the logstream source

lines chan *logline.LogLine // outbound channel for lines

staleTimer *time.Timer // Expire the stream if no read in 24h.
}

// Lines returns the output log line channel for this stream. The stream is
Expand Down
68 changes: 0 additions & 68 deletions internal/tailer/logstream/decode.go

This file was deleted.

35 changes: 14 additions & 21 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
package logstream

import (
"bytes"
"context"
"fmt"
"net"
"sync"
"time"

"github.com/golang/glog"
"github.com/google/mtail/internal/logline"
Expand Down Expand Up @@ -55,8 +53,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
return err
}
glog.V(2).Infof("stream(%s): opened new datagram socket %v", ds.sourcename, c)
b := make([]byte, datagramReadBufferSize)
partial := bytes.NewBufferString("")
lr := NewLineReader(ds.sourcename, ds.lines, &dgramConn{c}, datagramReadBufferSize, ds.cancel)
var total int
wg.Add(1)
go func() {
Expand All @@ -70,6 +67,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
glog.Info(err)
}
logCloses.Add(ds.address, 1)
lr.Finish(ctx)
close(ds.lines)
ds.cancel()
}()
Expand All @@ -78,40 +76,27 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
SetReadDeadlineOnDone(ctx, c)

for {
n, _, err := c.ReadFrom(b)
n, err := lr.ReadAndSend(ctx)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ds.sourcename, n, err)

if ds.staleTimer != nil {
ds.staleTimer.Stop()
}

// This is a test-only trick that says if we've already put this
// logstream in graceful shutdown, then a zero-byte read is
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s): exiting because zero byte read and one shot", ds.sourcename)
if partial.Len() > 0 {
ds.sendLine(ctx, partial)
}
return
}
select {
case <-ctx.Done():
glog.V(2).Infof("stream(%s): exiting because zero byte read after cancellation", ds.sourcename)
if partial.Len() > 0 {
ds.sendLine(ctx, partial)
}
return
default:
}
}

if n > 0 {
total += n
//nolint:contextcheck
ds.decodeAndSend(ctx, n, b[:n], partial)
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
Expand All @@ -120,9 +105,6 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
}

if IsExitableError(err) {
if partial.Len() > 0 {
ds.sendLine(ctx, partial)
}
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ds.sourcename, err)
return
}
Expand All @@ -146,3 +128,14 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
}()
return nil
}

// dgramConn wraps a PacketConn to add a Read method.
type dgramConn struct {
net.PacketConn
}

// Read satisfies io.Reader
func (d *dgramConn) Read(p []byte) (count int, err error) {
count, _, err = d.ReadFrom(p)
return
}
26 changes: 7 additions & 19 deletions internal/tailer/logstream/fifostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
package logstream

import (
"bytes"
"context"
"errors"
"io"
"os"
"sync"
"syscall"
"time"

"github.com/golang/glog"
"github.com/google/mtail/internal/logline"
Expand Down Expand Up @@ -77,8 +75,7 @@ func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if err != nil {
return err
}
b := make([]byte, defaultFifoReadBufferSize)
partial := bytes.NewBufferString("")
lr := NewLineReader(ps.sourcename, ps.lines, fd, defaultFifoReadBufferSize, ps.cancel)
var total int
wg.Add(1)
go func() {
Expand All @@ -92,30 +89,17 @@ func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.Info(err)
}
logCloses.Add(ps.pathname, 1)
if partial.Len() > 0 {
ps.sendLine(ctx, partial)
}
lr.Finish(ctx)
close(ps.lines)
ps.cancel()
}()
SetReadDeadlineOnDone(ctx, fd)

for {
// Because we've opened in nonblocking mode, this Read can return
// straight away. If there are no writers, it'll return EOF (per
// `pipe(7)` and `read(2)`.) This is expected when `mtail` is
// starting at system init as the writer may not be ready yet.
n, err := fd.Read(b)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ps.sourcename, n, err)

if ps.staleTimer != nil {
ps.staleTimer.Stop()
}
n, err := lr.ReadAndSend(ctx)

if n > 0 {
total += n
ps.decodeAndSend(ctx, n, b[:n], partial)
ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
Expand All @@ -134,6 +118,10 @@ func (ps *fifoStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake

// Test to see if we should exit.
if IsExitableError(err) {
// Because we've opened in nonblocking mode, this Read can return
// straight away. If there are no writers, it'll return EOF (per
// `pipe(7)` and `read(2)`.) This is expected when `mtail` is
// starting at system init as the writer may not be ready yet.
if !(errors.Is(err, io.EOF) && total == 0) {
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ps.sourcename, err)
return
Expand Down
8 changes: 4 additions & 4 deletions internal/tailer/logstream/fifostream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"golang.org/x/sys/unix"
)

func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) {
func TestFifoStreamReadCompletedBecauseClosed(t *testing.T) {
testutil.TimeoutTest(1*time.Second, func(t *testing.T) { //nolint:thelper
var wg sync.WaitGroup

Expand Down Expand Up @@ -63,7 +63,7 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) {
})(t)
}

func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
func TestFifoStreamReadCompletedBecauseCancel(t *testing.T) {
testutil.TimeoutTest(1*time.Second, func(t *testing.T) { // nolint:thelper
var wg sync.WaitGroup

Expand Down Expand Up @@ -98,7 +98,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
})(t)
}

func TestPipeStreamReadURL(t *testing.T) {
func TestFifoStreamReadURL(t *testing.T) {
var wg sync.WaitGroup

tmpDir := testutil.TestTempDir(t)
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestPipeStreamReadURL(t *testing.T) {
cancel() // no-op for pipes
}

func TestPipeStreamReadStdin(t *testing.T) {
func TestFifoStreamReadStdin(t *testing.T) {
var wg sync.WaitGroup

tmpDir := testutil.TestTempDir(t)
Expand Down
Loading
Loading