Skip to content

Commit

Permalink
Merge pull request #15 from m1k1o/hls-exit-on-error
Browse files Browse the repository at this point in the history
HLS: add error to OnStop.
  • Loading branch information
m1k1o authored Sep 28, 2021
2 parents 133e266 + 903b44c commit 50049a2
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 26 deletions.
76 changes: 51 additions & 25 deletions hls/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// how often should be cleanup called
const cleanupPeriod = 4 * time.Second

// timeot for first playlist, when it waits for new data
// timeout for first playlist, when it waits for new data
const playlistTimeout = 60 * time.Second

// minimum segments available to consider stream as active
Expand All @@ -40,7 +40,7 @@ type ManagerCtx struct {
events struct {
onStart func()
onCmdLog func(message string)
onStop func()
onStop func(err error)
}

cmd *exec.Cmd
Expand Down Expand Up @@ -92,7 +92,7 @@ func (m *ManagerCtx) Start() error {
read, write := io.Pipe()
m.cmd.Stdout = write

//create a new process group
// create a new process group
m.cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

m.active = false
Expand All @@ -104,6 +104,7 @@ func (m *ManagerCtx) Start() error {
m.playlistLoad = make(chan string)
m.shutdown = make(chan interface{})

// read playlist on stdout
go func() {
buf := make([]byte, 1024)

Expand All @@ -125,14 +126,15 @@ func (m *ManagerCtx) Start() error {
}
}

// if stdout pipe has been closed
if err != nil {
// When command fails to start, we reach this branch after a while
m.logger.Err(err).Msg("cmd read failed")
return
}
}
}()

// periodic cleanup
go func() {
ticker := time.NewTicker(cleanupPeriod)
defer ticker.Stop()
Expand All @@ -152,21 +154,54 @@ func (m *ManagerCtx) Start() error {
m.events.onStart()
}

return m.cmd.Start()
// start program
err = m.cmd.Start()

// wait for program to exit
go func() {
err = m.cmd.Wait()
if err != nil {
if exiterr, ok := err.(*exec.ExitError); ok {
// The program has exited with an exit code != 0

// This works on both Unix and Windows. Although package
// syscall is generally platform dependent, WaitStatus is
// defined for both Unix and Windows and in both cases has
// an ExitStatus() method with the same signature.
if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
m.logger.Warn().Int("exit-status", status.ExitStatus()).Msg("the program has exited with an exit code != 0")
}
} else {
m.logger.Err(err).Msg("the program has exited with an error")
}
} else {
m.logger.Info().Msg("the program has successfully exited")
}

close(m.shutdown)

if m.events.onStop != nil {
m.events.onStop(err)
}

err := os.RemoveAll(m.tempdir)
m.logger.Err(err).Msg("removing tempdir")

m.mu.Lock()
m.cmd = nil
m.mu.Unlock()
}()

return err
}

func (m *ManagerCtx) Stop() {
m.mu.Lock()
defer m.mu.Unlock()

if m.cmd == nil {
return
}

m.logger.Debug().Msg("performing stop")
close(m.shutdown)
if m.cmd != nil && m.cmd.Process != nil {
m.logger.Debug().Msg("performing stop")

if m.cmd.Process != nil {
pgid, err := syscall.Getpgid(m.cmd.Process.Pid)
if err == nil {
err := syscall.Kill(-pgid, syscall.SIGKILL)
Expand All @@ -176,15 +211,6 @@ func (m *ManagerCtx) Stop() {
err := m.cmd.Process.Kill()
m.logger.Err(err).Msg("killing proccess")
}
_ = m.cmd.Wait()
m.cmd = nil
}

err := os.RemoveAll(m.tempdir)
m.logger.Err(err).Msg("removing tempdir")

if m.events.onStop != nil {
m.events.onStop()
}
}

Expand Down Expand Up @@ -225,14 +251,14 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) {
if !m.active {
select {
case playlist = <-m.playlistLoad:
// when command exits before providing any playlist
case <-m.shutdown:
m.logger.Warn().Msg("playlist load failed because of shutdown")
// When command failed to start and timeout has been increased we reach this branch after a while
http.Error(w, "404 playlist not found", http.StatusNotFound)
http.Error(w, "500 playlist not available", http.StatusInternalServerError)
return
case <-time.After(playlistTimeout):
m.logger.Warn().Msg("playlist load channel timeouted")
http.Error(w, "500 not available", http.StatusInternalServerError)
http.Error(w, "504 playlist timeout", http.StatusGatewayTimeout)
return
}
}
Expand Down Expand Up @@ -269,6 +295,6 @@ func (m *ManagerCtx) OnCmdLog(event func(message string)) {
m.events.onCmdLog = event
}

func (m *ManagerCtx) OnStop(event func()) {
func (m *ManagerCtx) OnStop(event func(err error)) {
m.events.onStop = event
}
2 changes: 1 addition & 1 deletion hls/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ type Manager interface {

OnStart(event func())
OnCmdLog(event func(message string))
OnStop(event func())
OnStop(event func(err error))
}

0 comments on commit 50049a2

Please sign in to comment.