Skip to content

Commit

Permalink
Merge pull request #15 from pawelgaczynski/dev
Browse files Browse the repository at this point in the history
unused io_uring methods removed, tests coverage improved
  • Loading branch information
pawelgaczynski authored May 18, 2023
2 parents 2247bea + ec6c58f commit 49bacf0
Show file tree
Hide file tree
Showing 37 changed files with 1,607 additions and 727 deletions.
4 changes: 0 additions & 4 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"unsafe"

"github.com/pawelgaczynski/gain/iouring"
"github.com/pawelgaczynski/gain/pkg/errors"
"github.com/pawelgaczynski/gain/pkg/socket"
)

Expand Down Expand Up @@ -53,9 +52,6 @@ func (a *acceptor) addAcceptConnRequest() error {
}

conn := a.connectionManager.getFd(a.fd)
if conn == nil {
return errors.ErrConnectionIsMissing
}
conn.state = connAccept

return nil
Expand Down
18 changes: 10 additions & 8 deletions common_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ func (t *connServerTester) waitForWrites() {
t.writeWG.Wait()
}

func (t *connServerTester) onReadCallback(conn gain.Conn, n int) {
func (t *connServerTester) onReadCallback(conn gain.Conn, n int, _ string) {
buf, _ := conn.Next(n)
_, _ = conn.Write(buf)
}

func (t *connServerTester) onWriteCallback(_ gain.Conn, _ int) {
func (t *connServerTester) onWriteCallback(_ gain.Conn, _ int, _ string) {
if t.writeWG != nil {
t.mutex.Lock()

Expand All @@ -64,7 +64,7 @@ func (t *connServerTester) onWriteCallback(_ gain.Conn, _ int) {
}
}

func newConnServerTester(writeCount int, removeWGAfterMinWrites bool) *connServerTester {
func newConnServerTester(network string, writeCount int, removeWGAfterMinWrites bool) *connServerTester {
connServerTester := &connServerTester{}

if writeCount > 0 {
Expand All @@ -76,16 +76,18 @@ func newConnServerTester(writeCount int, removeWGAfterMinWrites bool) *connServe
connServerTester.removeWGAfterMinWrites = removeWGAfterMinWrites
}

testConnHandler := newTestServerHandler(connServerTester.onReadCallback)
testConnHandler := newTestServerHandler(connServerTester.onReadCallback, network)

testConnHandler.onWriteCallback = connServerTester.onWriteCallback
connServerTester.testServerHandler = testConnHandler

return connServerTester
}

func newEventHandlerTester(callbacks callbacksHolder) *testServerHandler {
testHandler := &testServerHandler{}
func newEventHandlerTester(callbacks callbacksHolder, network string) *testServerHandler {
testHandler := &testServerHandler{
network: network,
}

var (
startedWg sync.WaitGroup
Expand Down Expand Up @@ -289,7 +291,7 @@ func testConnAddress(

wg.Add(numberOfClients)

onReadCallback := func(conn gain.Conn, n int) {
onReadCallback := func(conn gain.Conn, n int, _ string) {
buf, _ := conn.Next(n)
_, _ = conn.Write(buf)

Expand All @@ -298,7 +300,7 @@ func testConnAddress(
wg.Done()
}

testHandler := newTestServerHandler(onReadCallback)
testHandler := newTestServerHandler(onReadCallback, network)

server := gain.NewServer(testHandler, config)
testPort := getTestPort()
Expand Down
51 changes: 30 additions & 21 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ type testServerConfig struct {
waitForDialAllClients bool
afterDial afterDialCallback
writesCount int
configOptions []gain.ConfigOption

readHandler onReadCallback
}

var defaultTestOnReadCallback = func(c gain.Conn, n int) {
var defaultTestOnReadCallback = func(c gain.Conn, n int, network string) {
buffer := make([]byte, 128)

_, err := c.Read(buffer)
Expand Down Expand Up @@ -93,14 +94,16 @@ type testServerHandler struct {
onCloseWg *sync.WaitGroup

finished atomic.Bool

network string
}

func (h *testServerHandler) OnStart(server gain.Server) {
if !h.finished.Load() {
h.startedWg.Done()

if h.onStartCallback != nil {
h.onStartCallback(server)
h.onStartCallback(server, h.network)
}

h.onStartCount.Add(1)
Expand All @@ -110,7 +113,7 @@ func (h *testServerHandler) OnStart(server gain.Server) {
func (h *testServerHandler) OnAccept(c gain.Conn) {
if !h.finished.Load() {
if h.onAcceptCallback != nil {
h.onAcceptCallback(c)
h.onAcceptCallback(c, h.network)
}

h.onAcceptCount.Add(1)
Expand All @@ -124,7 +127,7 @@ func (h *testServerHandler) OnAccept(c gain.Conn) {
func (h *testServerHandler) OnClose(c gain.Conn, err error) {
if !h.finished.Load() {
if h.onCloseCallback != nil {
h.onCloseCallback(c, err)
h.onCloseCallback(c, err, h.network)
}

h.onCloseCount.Add(1)
Expand All @@ -138,7 +141,7 @@ func (h *testServerHandler) OnClose(c gain.Conn, err error) {
func (h *testServerHandler) OnRead(conn gain.Conn, n int) {
if !h.finished.Load() {
if h.onReadCallback != nil {
h.onReadCallback(conn, n)
h.onReadCallback(conn, n, h.network)
}

h.onReadCount.Add(1)
Expand All @@ -152,7 +155,7 @@ func (h *testServerHandler) OnRead(conn gain.Conn, n int) {
func (h *testServerHandler) OnWrite(c gain.Conn, n int) {
if !h.finished.Load() {
if h.onWriteCallback != nil {
h.onWriteCallback(c, n)
h.onWriteCallback(c, n, h.network)
}

h.onWriteCount.Add(1)
Expand Down Expand Up @@ -208,8 +211,10 @@ func dialClientRW(t *testing.T, protocol string, port int,
clientConnChan <- conn
}

func newTestServerHandler(onReadCallback onReadCallback) *testServerHandler {
testHandler := &testServerHandler{}
func newTestServerHandler(onReadCallback onReadCallback, network string) *testServerHandler {
testHandler := &testServerHandler{
network: network,
}

var startedWg sync.WaitGroup

Expand Down Expand Up @@ -241,9 +246,13 @@ func testServer(t *testing.T, testConfig testServerConfig, architecture gain.Ser
gain.WithArchitecture(architecture),
}

if testConfig.configOptions != nil {
opts = append(opts, testConfig.configOptions...)
}

config := gain.NewConfig(opts...)

testHandler := newTestServerHandler(testConfig.readHandler)
testHandler := newTestServerHandler(testConfig.readHandler, testConfig.protocol)

server := gain.NewServer(testHandler, config)

Expand All @@ -266,7 +275,7 @@ func testServer(t *testing.T, testConfig testServerConfig, architecture gain.Ser
if testConfig.waitForDialAllClients {
clientConnectWG := new(sync.WaitGroup)
clientConnectWG.Add(testConfig.numberOfClients)
testHandler.onAcceptCallback = func(c gain.Conn) {
testHandler.onAcceptCallback = func(c gain.Conn, _ string) {
clientConnectWG.Done()
}

Expand Down Expand Up @@ -297,11 +306,11 @@ func testServer(t *testing.T, testConfig testServerConfig, architecture gain.Ser
}
clientRWWG.Add(testConfig.numberOfClients * testConfig.writesCount)
if testConfig.protocol == gainNet.TCP {
testHandler.onAcceptCallback = func(c gain.Conn) {
testHandler.onAcceptCallback = func(c gain.Conn, _ string) {
clientConnectWG.Done()
}
}
testHandler.onWriteCallback = func(c gain.Conn, n int) {
testHandler.onWriteCallback = func(c gain.Conn, n int, network string) {
clientRWWG.Done()
}
afterDial := deafultAfterDial
Expand Down Expand Up @@ -335,7 +344,7 @@ type RingBufferTestDataHandler struct {
testFinished atomic.Bool
}

func (r *RingBufferTestDataHandler) OnRead(conn gain.Conn, _ int) {
func (r *RingBufferTestDataHandler) OnRead(conn gain.Conn, _ int, _ string) {
buffer := make([]byte, 128)
bytesRead, readErr := conn.Read(buffer)

Expand Down Expand Up @@ -390,7 +399,7 @@ func testRingBuffer(t *testing.T, protocol string, architecture gain.ServerArchi

func testCloseServer(t *testing.T, network string, architecture gain.ServerArchitecture, doubleShutdown bool) {
t.Helper()
testHandler := newConnServerTester(10, false)
testHandler := newConnServerTester(network, 10, false)
server, port := newTestConnServer(t, network, false, architecture, testHandler.testServerHandler)
clientsGroup := newTestConnClientGroup(t, network, port, 10)
clientsGroup.Dial()
Expand Down Expand Up @@ -419,7 +428,7 @@ func testCloseServer(t *testing.T, network string, architecture gain.ServerArchi

func testCloseServerWithConnectedClients(t *testing.T, architecture gain.ServerArchitecture) {
t.Helper()
testHandler := newConnServerTester(10, false)
testHandler := newConnServerTester(gainNet.TCP, 10, false)
server, port := newTestConnServer(t, gainNet.TCP, false, architecture, testHandler.testServerHandler)

clientsGroup := newTestConnClientGroup(t, gainNet.TCP, port, 10)
Expand All @@ -438,7 +447,7 @@ func testCloseServerWithConnectedClients(t *testing.T, architecture gain.ServerA

func testCloseConn(t *testing.T, async bool, architecture gain.ServerArchitecture, justClose bool) {
t.Helper()
testHandler := newTestServerHandler(func(conn gain.Conn, n int) {
testHandler := newTestServerHandler(func(conn gain.Conn, n int, network string) {
if !justClose {
buf, err := conn.Next(n)
if err != nil {
Expand All @@ -455,7 +464,7 @@ func testCloseConn(t *testing.T, async bool, architecture gain.ServerArchitectur
if err != nil {
log.Panic(err)
}
})
}, gainNet.TCP)

server, port := newTestConnServer(t, gainNet.TCP, async, architecture, testHandler)

Expand Down Expand Up @@ -511,7 +520,7 @@ func testLargeRead(t *testing.T, network string, architecture gain.ServerArchite
var doneWg sync.WaitGroup

doneWg.Add(1)
onReadCallback := func(c gain.Conn, _ int) {
onReadCallback := func(c gain.Conn, _ int, _ string) {
readBuffer := make([]byte, doublePageSize)

n, cErr := c.Read(readBuffer)
Expand All @@ -530,7 +539,7 @@ func testLargeRead(t *testing.T, network string, architecture gain.ServerArchite
Equal(t, doublePageSize, n)
}

testConnHandler := newTestServerHandler(onReadCallback)
testConnHandler := newTestServerHandler(onReadCallback, network)
server, port := newTestConnServer(t, network, false, architecture, testConnHandler)

clientsGroup := newTestConnClientGroup(t, network, port, 1)
Expand Down Expand Up @@ -562,7 +571,7 @@ func testMultipleReads(t *testing.T, network string, asyncHandler bool, architec
)

doneWg.Add(int(expectedReads))
onReadCallback := func(c gain.Conn, _ int) {
onReadCallback := func(c gain.Conn, _ int, _ string) {
readBuffer := make([]byte, pageSize)

n, cErr := c.Read(readBuffer)
Expand All @@ -575,7 +584,7 @@ func testMultipleReads(t *testing.T, network string, asyncHandler bool, architec
Equal(t, pageSize, n)
}

testConnHandler := newTestServerHandler(onReadCallback)
testConnHandler := newTestServerHandler(onReadCallback, network)
server, port := newTestConnServer(t, network, asyncHandler, architecture, testConnHandler)

clientsGroup := newTestConnClientGroup(t, network, port, 1)
Expand Down
57 changes: 57 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2023 Paweł Gaczyński
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gain

import (
"testing"
"time"

"github.com/rs/zerolog"
. "github.com/stretchr/testify/require"
)

func TestConfig(t *testing.T) {
opts := []ConfigOption{
WithArchitecture(SocketSharding),
WithAsyncHandler(true),
WithGoroutinePool(true),
WithCPUAffinity(true),
WithProcessPriority(true),
WithWorkers(128),
WithCBPF(true),
WithLoadBalancing(LeastConnections),
WithSocketRecvBufferSize(4096),
WithSocketSendBufferSize(4096),
WithTCPKeepAlive(time.Second),
WithLoggerLevel(zerolog.DebugLevel),
WithPrettyLogger(true),
}

config := NewConfig(opts...)

Equal(t, SocketSharding, config.Architecture)
Equal(t, true, config.AsyncHandler)
Equal(t, true, config.GoroutinePool)
Equal(t, true, config.CPUAffinity)
Equal(t, true, config.ProcessPriority)
Equal(t, 128, config.Workers)
Equal(t, true, config.CBPFilter)
Equal(t, LeastConnections, config.LoadBalancing)
Equal(t, 4096, config.SocketRecvBufferSize)
Equal(t, 4096, config.SocketSendBufferSize)
Equal(t, time.Second, config.TCPKeepAlive)
Equal(t, zerolog.DebugLevel, config.LoggerLevel)
Equal(t, true, config.PrettyLogger)
}
Loading

0 comments on commit 49bacf0

Please sign in to comment.