Skip to content

Commit

Permalink
fix: LogStreams create and close their own lines channels.
Browse files Browse the repository at this point in the history
This makes it easier to observe logstream state externally because we can read
the lines channel until it closes.

The tests are changed slightly to asynchronously drain the output channel at
the start of the stream.

Fix some naming issues and log formatting.
  • Loading branch information
jaqx0r committed Jun 24, 2024
1 parent 8d2101c commit 1d5c5e2
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 224 deletions.
86 changes: 46 additions & 40 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type dgramStream struct {
cancel context.CancelFunc

lines chan<- *logline.LogLine
lines chan *logline.LogLine

scheme string // Datagram scheme, either "unixgram" or "udp".
address string // Given name for the underlying socket path on the filesystem or hostport.
Expand All @@ -28,78 +28,79 @@ type dgramStream struct {
lastReadTime time.Time // Last time a log line was read from this named pipe
}

func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
if address == "" {
return nil, ErrEmptySocketAddress
}
ctx, cancel := context.WithCancel(ctx)
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)}
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
return nil, err
}
return ss, nil
}

func (ss *dgramStream) LastReadTime() time.Time {
ss.mu.RLock()
defer ss.mu.RUnlock()
return ss.lastReadTime
func (ds *dgramStream) LastReadTime() time.Time {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.lastReadTime
}

// The read buffer size for datagrams.
const datagramReadBufferSize = 131072

func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error {
c, err := net.ListenPacket(ss.scheme, ss.address)
func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error {
c, err := net.ListenPacket(ds.scheme, ds.address)
if err != nil {
logErrors.Add(ss.address, 1)
logErrors.Add(ds.address, 1)
return err
}
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ss.scheme, ss.address, c)
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ds.scheme, ds.address, c)
b := make([]byte, datagramReadBufferSize)
partial := bytes.NewBufferString("")
var total int
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ss.scheme, ss.address, total)
glog.V(2).Infof("stream(%s:%s): closing connection", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ds.scheme, ds.address, total)
glog.V(2).Infof("stream(%s:%s): closing connection", ds.scheme, ds.address)
err := c.Close()
if err != nil {
logErrors.Add(ss.address, 1)
logErrors.Add(ds.address, 1)
glog.Info(err)
}
logCloses.Add(ss.address, 1)
ss.mu.Lock()
ss.completed = true
ss.mu.Unlock()
ss.Stop()
logCloses.Add(ds.address, 1)
ds.mu.Lock()
ds.completed = true
close(ds.lines)
ds.mu.Unlock()
ds.Stop()
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
SetReadDeadlineOnDone(ctx, c)

for {
n, _, err := c.ReadFrom(b)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err)

// This is a test-only trick that says if we've already put this
// logstream in graceful shutdown, then a zero-byte read is
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address)
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
sendLine(ctx, ds.address, partial, ds.lines)
}
return
}
select {
case <-ctx.Done():
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address)
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
sendLine(ctx, ds.address, partial, ds.lines)
}
return
default:
Expand All @@ -109,46 +110,51 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
if n > 0 {
total += n
//nolint:contextcheck
decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial)
ss.mu.Lock()
ss.lastReadTime = time.Now()
ss.mu.Unlock()
decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial)
ds.mu.Lock()
ds.lastReadTime = time.Now()
ds.mu.Unlock()
}

if err != nil && IsEndOrCancel(err) {
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
sendLine(ctx, ds.address, partial, ds.lines)
}
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ss.scheme, ss.address, err)
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err)
return
}

// Yield and wait
glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address)
select {
case <-ctx.Done():
// We may have started waiting here when the stop signal
// arrives, but since that wait the file may have been
// written to. The file is not technically yet at EOF so
// we need to go back and try one more read. We'll exit
// the stream in the zero byte handler above.
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ds.scheme, ds.address)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address)
}
}
}()
return nil
}

func (ss *dgramStream) IsComplete() bool {
ss.mu.RLock()
defer ss.mu.RUnlock()
return ss.completed
func (ds *dgramStream) IsComplete() bool {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.completed
}

func (ss *dgramStream) Stop() {
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address)
ss.cancel()
func (ds *dgramStream) Stop() {
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ds.scheme, ds.address)
ds.cancel()
}

// Lines implements the LogStream interface, returning the output lines channel.
func (ds *dgramStream) Lines() <-chan *logline.LogLine {
return ds.lines
}
36 changes: 18 additions & 18 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,19 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
default:
t.Fatalf("bad scheme %s", scheme)
}
lines := make(chan *logline.LogLine, 1)

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled)
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

s, err := net.Dial(scheme, addr)
testutil.FatalIfErr(t, err)

Expand All @@ -59,18 +64,13 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
testutil.FatalIfErr(t, err)

wg.Wait()
close(lines)

received := testutil.LinesReceived(lines)
expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
checkLineDiff()

cancel()
wg.Wait()

if !ss.IsComplete() {
if !ds.IsComplete() {
t.Errorf("expecting dgramstream to be complete because socket closed")
}
}))
Expand All @@ -93,14 +93,19 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
default:
t.Fatalf("bad scheme %s", scheme)
}
lines := make(chan *logline.LogLine, 1)

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled)
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

s, err := net.Dial(scheme, addr)
testutil.FatalIfErr(t, err)

Expand All @@ -112,15 +117,10 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
cancel() // This cancellation should cause the stream to shut down.

wg.Wait()
close(lines)

received := testutil.LinesReceived(lines)
expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
checkLineDiff()

if !ss.IsComplete() {
if !ds.IsComplete() {
t.Errorf("expecting dgramstream to be complete because cancel")
}
}))
Expand Down
14 changes: 11 additions & 3 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var fileTruncates = expvar.NewMap("file_truncates_total")
type fileStream struct {
cancel context.CancelFunc

lines chan<- *logline.LogLine
lines chan *logline.LogLine

pathname string // Given name for the underlying file on the filesystem

Expand All @@ -46,9 +46,9 @@ type fileStream struct {
}

// newFileStream creates a new log stream from a regular file.
func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, oneShot OneShotMode) (LogStream, error) {
ctx, cancel := context.WithCancel(ctx)
fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: lines}
fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)}
// Stream from the start of the file when in one shot mode.
streamFromStart := oneShot == OneShotEnabled
if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil {
Expand Down Expand Up @@ -163,6 +163,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
}
Expand Down Expand Up @@ -227,6 +228,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
}
Expand All @@ -238,6 +240,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
default:
Expand Down Expand Up @@ -280,3 +283,8 @@ func (fs *fileStream) IsComplete() bool {
func (fs *fileStream) Stop() {
fs.cancel()
}

// Lines implements the LogStream interface, returning the output lines channel.
func (fs *fileStream) Lines() <-chan *logline.LogLine {
return fs.lines
}
Loading

0 comments on commit 1d5c5e2

Please sign in to comment.