-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
144 lines (127 loc) · 3.28 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package hconx
import (
"fmt"
"net"
"sync"
)
const (
defaultTcpPort = ":8086"
)
//Server
type Server struct {
listener *net.TCPListener
handler OnConnHandle
connectMap map[int64]*Connection
locker *sync.RWMutex
stopChan chan struct{}
}
// handleConn loop handle new message from conn
func (s *Server) handleConn(conn *Connection) {
s.addConnection(conn)
defer s.removeConnection(conn)
for {
errRead := conn.readMessage()
if errRead == nil {
if s.handler != nil {
errHandler := s.handler(conn)
if errHandler != nil {
Error(fmt.Sprintf("Server.handler error ClientID:%v err:%v", conn.ConnIndex, errHandler))
}
}
} else {
//discard current request
Error(fmt.Sprintf("Server.handleConn error ClientID:%v err:%v", conn.ConnIndex, errRead))
break
}
}
}
// SetOnConnHandle set handler on new conn receive
func (s *Server) SetOnConnHandle(handler OnConnHandle) {
s.handler = handler
}
// getConnection get connection with ConnIndex
func (s *Server) getConnection(index int64) (*Connection, bool) {
conn, isExists := s.connectMap[index]
return conn, isExists
}
// addConnection add new connection
func (s *Server) addConnection(conn *Connection) {
s.locker.Lock()
defer s.locker.Unlock()
s.connectMap[conn.ConnIndex] = conn
Trace(fmt.Sprintf("Server addConnection %v", conn.RemoteAddr()))
}
// removeConnection remove connection
func (s *Server) removeConnection(conn *Connection) {
s.locker.Lock()
defer s.locker.Unlock()
conn.Close()
delete(s.connectMap, conn.ConnIndex)
Trace(fmt.Sprintf("Server removeConnection %v", conn.RemoteAddr()))
}
// GetConnCount get connection count on current Server
func (s *Server) GetConnectionCount() int {
s.locker.RLock()
defer s.locker.RUnlock()
return len(s.connectMap)
}
// GetConnMap get connection map on current Server
func (s *Server) GetConnectionMap() map[int64]*Connection {
return s.connectMap
}
// AddConnection add new connection
func (s *Server) AddConnection(conn *Connection) {
s.addConnection(conn)
}
// RemoveConnection remove connection with ConnIndex
func (s *Server) RemoveConnection(connIndex int64) {
s.locker.RLock()
conn, isExists := s.getConnection(connIndex)
s.locker.RUnlock()
if !isExists {
return
}
s.removeConnection(conn)
}
// Start start loop handler conn
func (s *Server) Start() {
for {
select {
case <-s.stopChan:
Info(fmt.Sprint("Server get stop signal, so stop server loop handle conn"))
break
default:
conn, err := s.listener.Accept()
if err != nil {
Error(fmt.Sprint("Server accept listener error ", err))
} else {
Trace(fmt.Sprintf("Server received new connection from %v", conn.RemoteAddr()))
go s.handleConn(NewConnction(conn))
}
}
}
}
// Stop send stop signal
func (s *Server) Stop() {
s.stopChan <- struct{}{}
}
// GetNewServer get new server with tcp port
func NewServer(tcpPort string, handler OnConnHandle) (*Server, error) {
if tcpPort == "" {
tcpPort = defaultTcpPort
}
var s *Server
s = &Server{
handler: handler,
locker: new(sync.RWMutex),
connectMap: make(map[int64]*Connection),
stopChan: make(chan struct{}),
}
tcpAddr, err := net.ResolveTCPAddr("tcp", tcpPort)
if err != nil {
return s, err
}
s.listener, err = net.ListenTCP("tcp", tcpAddr)
Trace(fmt.Sprintf("NewServer %v", tcpAddr))
return s, err
}