Skip to content

Commit

Permalink
Add -wait-after-start for services that need more time
Browse files Browse the repository at this point in the history
  • Loading branch information
rwstauner committed Dec 30, 2017
1 parent f105f53 commit e573353
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 81 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# v0.7 (Unreleased)
# v0.7 (2017-12-30)

- Add "-wait-after-start" option for services that aren't quite ready at
the moment their port is open.

# v0.6 (2017-12-29)

Expand Down
24 changes: 14 additions & 10 deletions README.mkdn
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ automatically if a request to them is made.

One service can be configured on the command line:

-proxy # Address pairs to listen on/proxy to ("from:port to:port from2 to2")
-proxy-sep # Alternate character to separate proxy addresses
-timeout # Duration to wait before aborting connection
-stop-after # Duration after last connection to signal command to stop
-stop-signal # Signal to send to command (default it INT)
args... # Command to run
-proxy # Address pairs to listen on/proxy to ("from:port to:port from2 to2")
-proxy-sep # Alternate character to separate proxy addresses
-stop-after # Duration after last connection to signal command to stop
-stop-signal # Signal to send to command (default it INT)
-timeout # Duration to wait before aborting connection
-wait-after-start # Duration to wait after starting command before forwarding connections
args... # Command to run

Alternatively (or additionally) multiple services
can be configured by specifying a JSON configuration file:
Expand All @@ -51,14 +52,17 @@ with something like this:
":6543": "localhost:7654"
},
"Command": ["run", "some", "server"],
"Timeout": "10s",
"StopAfter": "10m",
"StopSignal": "TERM",
"Timeout": "10s",
"WaitAfterStart": "2s"
}
]
}

The command to run is optional, without it ynetd is just a port forwarder:
Everything is optional (except for "proxy").

Without a command to run, ynetd can be used as a simple port forwarder:

ynetd -proxy "localhost:5001 remote.host:8080"

Expand All @@ -67,14 +71,14 @@ or as "interface:name:port" ("interface:eth0:5001") to listen on
(all addresses of) the named interface.

This can be useful if you don't know the ip address up front
but want to "forward from public interface to loopback"
but want to forward from public interface to loopback
to avoid having to use additional port numbers:

-proxy "interface:eth0:5000 localhost:5000"

## Why?

- To reduce memory consumption for services until they are needed.
- To reduce resource utilization for services until they are needed.
- To learn a little go.

## Why the name?
Expand Down
29 changes: 17 additions & 12 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ var proxySpec string
var timeout = DefaultTimeout
var stopAfter = DefaultStopAfter
var stopSignal = "INT"
var waitAfterStart = DefaultWaitAfterStart

func init() {
const (
configUsage = "Path to configuration file"
listenUsage = "Address to listen on (deprecated)"
proxySepUsage = "Separator character for -proxy"
proxyUsage = "Addresses to proxy, separated by spaces (\"fromhost:port tohost:port from to\")"
timeoutUsage = "Duration of time to allow command to start up"
stopAfterUsage = "Duration of time after the last client connection to stop the command"
stopSignalUsage = "Signal to send to stop"
configUsage = "Path to configuration file"
listenUsage = "Address to listen on (deprecated)"
proxySepUsage = "Separator character for -proxy"
proxyUsage = "Addresses to proxy, separated by spaces (\"fromhost:port tohost:port from to\")"
timeoutUsage = "Duration of time to allow connections to attempt to forward"
stopAfterUsage = "Duration of time after the last client connection to stop the command"
stopSignalUsage = "Signal to send to stop"
waitAfterStartUsage = "Duration of time to wait while command starts before forwarding"
)

flag.StringVar(&configfile, "config", "", configUsage)
Expand All @@ -43,6 +45,8 @@ func init() {

flag.DurationVar(&timeout, "timeout", timeout, timeoutUsage)
flag.DurationVar(&timeout, "t", timeout, timeoutUsage+" (shorthand)")

flag.DurationVar(&waitAfterStart, "wait-after-start", waitAfterStart, waitAfterStartUsage)
}

// Load config from cli arguments.
Expand Down Expand Up @@ -85,11 +89,12 @@ func Load(args []string) (cfg Config, err error) {

if len(proxy) > 0 {
cfg.Services = append(cfg.Services, Service{
Proxy: proxy,
Command: args,
Timeout: timeout.String(),
StopAfter: stopAfter.String(),
StopSignal: stopSignal,
Proxy: proxy,
Command: args,
Timeout: timeout.String(),
StopAfter: stopAfter.String(),
StopSignal: stopSignal,
WaitAfterStart: waitAfterStart.String(),
})
}

Expand Down
10 changes: 9 additions & 1 deletion config/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestLoadArgs(t *testing.T) {
listenAddress = ""
proxySpec = ":5000 localhost:5001 some:6001 some:7001"
timeout = 2 * time.Second
waitAfterStart = 500 * time.Millisecond
cfg, err := Load([]string{"foo", "bar"})

if err != nil {
Expand All @@ -47,6 +48,9 @@ func TestLoadArgs(t *testing.T) {
if svc.Timeout != "2s" {
t.Errorf("Timeout incorrect: %s", svc.Timeout)
}
if svc.WaitAfterStart != "500ms" {
t.Errorf("WaitAfterStart incorrect: %s", svc.WaitAfterStart)
}
}

func TestLoadProxySep(t *testing.T) {
Expand Down Expand Up @@ -147,7 +151,8 @@ func TestLoadConfigFile(t *testing.T) {
{
"Proxy": {":5000": "localhost:5001"},
"Command": ["3", "4"],
"Timeout": "15ms"
"Timeout": "15ms",
"WaitAfterStart": "25ms"
}
]
}`))
Expand Down Expand Up @@ -175,6 +180,9 @@ func TestLoadConfigFile(t *testing.T) {
if svc.Timeout != "15ms" {
t.Errorf("Timeout incorrect: %s", svc.Timeout)
}
if svc.WaitAfterStart != "25ms" {
t.Errorf("WaitAfterStart incorrect: %s", svc.WaitAfterStart)
}
}

func TestLoadConfigFileError(t *testing.T) {
Expand Down
25 changes: 21 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,28 @@ type Config struct {
Services []Service
}

// DefaultTimeout is the default timeout duration for new connections
// to proxy to the service.
// DefaultTimeout is the default duration to allow new connections
// to attempt to forward to the service.
var DefaultTimeout = 5 * time.Minute

// DefaultStopAfter is the default duration of inactivity after which
// a command will be stopped.
// The default is zero (the command will not be stopped).
// a command will be stopped (zero means the command will not be stopped).
var DefaultStopAfter = time.Duration(0)

// DefaultWaitAfterStart is the default duration to wait after starting a
// command before forwarding connections (zero means as soon as the port is open).
var DefaultWaitAfterStart = time.Duration(0)

// ParseDuration is a wrapper around time.ParseDuration.
// It returns the provided default if the string is blank
// and panics if there is an error in parsing.
func ParseDuration(str string, defaultVal time.Duration) time.Duration {
if str == "" {
return defaultVal
}
duration, err := time.ParseDuration(str)
if err != nil {
panic(err)
}
return duration
}
20 changes: 11 additions & 9 deletions config/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ func TestParseConfigFile(t *testing.T) {
"Command": ["sleep", "1"],
"StopAfter": "10m",
"StopSignal": "INT",
"Timeout": "150ms"
"Timeout": "150ms",
"WaitAfterStart": "250ms"
},
{
"Proxy": {":3001": "localhost:4001"},
"Command": ["sleep", "2"],
"StopAfter": "11m",
"StopSignal": "TERM",
"Timeout": "151ms"
"Timeout": "151ms",
"WaitAfterStart": "251ms"
}
]
}`))
Expand All @@ -45,24 +47,24 @@ func TestParseConfigFile(t *testing.T) {
}

for i, svc := range cfg.Services {
if len(svc.Proxy) != 1 {
t.Errorf("unexpected proxy: %v", svc.Proxy)
}
if svc.Proxy[fmt.Sprintf(":%d", i+3000)] != fmt.Sprintf("localhost:%d", i+4000) {
t.Errorf("unexpected proxy: %v", svc.Proxy)
if len(svc.Proxy) != 1 || svc.Proxy[fmt.Sprintf(":%d", i+3000)] != fmt.Sprintf("localhost:%d", i+4000) {
t.Errorf("unexpected Proxy: %v", svc.Proxy)
}
if fmt.Sprintf("%s", svc.Command) != fmt.Sprintf("[sleep %d]", i+1) {
t.Errorf("unexpected command: %s", svc.Command)
t.Errorf("unexpected Command: %s", svc.Command)
}
if svc.Timeout != fmt.Sprintf("%dms", i+150) {
t.Errorf("unexpected timeout: %s", svc.Timeout)
t.Errorf("unexpected Timeout: %s", svc.Timeout)
}
if svc.StopAfter != fmt.Sprintf("%dm", i+10) {
t.Errorf("unexpected StopAfter: %s", svc.StopAfter)
}
if svc.StopSignal != []string{"INT", "TERM"}[i] {
t.Errorf("unexpected StopSignal: %s", svc.StopSignal)
}
if svc.WaitAfterStart != fmt.Sprintf("%dms", i+250) {
t.Errorf("unexpected WaitAfterStart: %s", svc.WaitAfterStart)
}
}
}

Expand Down
11 changes: 6 additions & 5 deletions config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package config

// Service holds string representations of Service attributes.
type Service struct {
Proxy map[string]string
Command []string
Timeout string
StopAfter string
StopSignal string
Proxy map[string]string
Command []string
StopAfter string
StopSignal string
Timeout string
WaitAfterStart string
}
24 changes: 18 additions & 6 deletions procman/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"os/exec"
"syscall"
"time"

"github.com/hashicorp/go-reap"
"github.com/rwstauner/ynetd/config"
Expand All @@ -12,15 +13,15 @@ import (
// ProcessManager manages launching and reaping of processes.
type ProcessManager struct {
procs map[int]*Process // pid -> proc
launcher chan *Process
launcher chan *launchRequest
stopper chan *Process
signals chan os.Signal
}

// New returns a new ProcessManager.
func New() *ProcessManager {
return &ProcessManager{
launcher: make(chan *Process),
launcher: make(chan *launchRequest),
stopper: make(chan *Process),
procs: make(map[int]*Process),
signals: make(chan os.Signal),
Expand All @@ -34,9 +35,10 @@ func (m *ProcessManager) Process(cfg config.Service) *Process {
return nil
}
return &Process{
argv: cfg.Command,
manager: m,
stopSignal: getSignal(cfg.StopSignal, syscall.SIGINT),
argv: cfg.Command,
manager: m,
stopSignal: getSignal(cfg.StopSignal, syscall.SIGINT),
waitAfterStart: config.ParseDuration(cfg.WaitAfterStart, config.DefaultWaitAfterStart),
}
}

Expand Down Expand Up @@ -95,11 +97,21 @@ func (m *ProcessManager) Manage() {
}
return

case proc := <-m.launcher:
case req := <-m.launcher:
proc := req.process
if proc.cmd == nil {
proc.cmd = m.launch(proc)
proc.started = time.Now()
if proc.waitAfterStart > 0 {
proc.waitUntil = proc.started.Add(proc.waitAfterStart)
}
m.procs[proc.cmd.Process.Pid] = proc
}
if proc.waitAfterStart > 0 && proc.waitUntil.After(time.Now()) {
time.AfterFunc(proc.waitUntil.Sub(time.Now()), func() { req.ready <- true })
} else {
req.ready <- true
}

case proc := <-m.stopper:
if proc.cmd != nil {
Expand Down
24 changes: 17 additions & 7 deletions procman/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,31 @@ import (
"fmt"
"os"
"os/exec"
"sync"
"time"
)

// Process represents a command managed by a ProcessManager.
type Process struct {
argv []string
cmd *exec.Cmd
manager *ProcessManager
mutex *sync.Mutex
stopSignal os.Signal
argv []string
cmd *exec.Cmd
manager *ProcessManager
started time.Time
stopSignal os.Signal
waitAfterStart time.Duration
waitUntil time.Time
}

type launchRequest struct {
process *Process
ready chan bool
}

// LaunchOnce launches the process if it isn't already running.
func (p *Process) LaunchOnce() {
p.manager.launcher <- p
ready := make(chan bool)
req := &launchRequest{process: p, ready: ready}
p.manager.launcher <- req
<-ready
}

// Stop sends the configured signal to the process.
Expand Down
17 changes: 2 additions & 15 deletions service/config.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,10 @@
package service

import (
"time"

"github.com/rwstauner/ynetd/config"
"github.com/rwstauner/ynetd/procman"
)

func parseDuration(str string, defaultVal time.Duration) time.Duration {
if str == "" {
return defaultVal
}
duration, err := time.ParseDuration(str)
if err != nil {
panic(err)
}
return duration
}

// New returns the address to a new Service based on the provided Config.
func New(c config.Service, pm *procman.ProcessManager) (svc *Service, err error) {
defer func() {
Expand All @@ -28,8 +15,8 @@ func New(c config.Service, pm *procman.ProcessManager) (svc *Service, err error)
svc = &Service{
Proxy: c.Proxy,
Command: pm.Process(c),
Timeout: parseDuration(c.Timeout, config.DefaultTimeout),
StopAfter: parseDuration(c.StopAfter, config.DefaultStopAfter),
Timeout: config.ParseDuration(c.Timeout, config.DefaultTimeout),
StopAfter: config.ParseDuration(c.StopAfter, config.DefaultStopAfter),
}
return
}
Loading

0 comments on commit e573353

Please sign in to comment.