diff --git a/.gitignore b/.gitignore index 0f80e41c..8ce409d2 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,7 @@ log /examples/zinx_websocket/minicode/miniprogram_npm rebase +examples/c10k_test/bin/server +examples/c10k_test/bin/client + +examples/c10k_test/bin/client_v2 diff --git a/examples/c10k_test/Makefile b/examples/c10k_test/Makefile new file mode 100644 index 00000000..afc6462b --- /dev/null +++ b/examples/c10k_test/Makefile @@ -0,0 +1,9 @@ +.PHONY: build clean + +build: + go build -o bin/server ./server/ + go build -o bin/client ./client/ + go build -o bin/client_v2 ./client_v2/ + +clean: + rm -rf bin/ diff --git a/examples/c10k_test/bin/conf/zinx.json b/examples/c10k_test/bin/conf/zinx.json new file mode 100644 index 00000000..8e0e09b4 --- /dev/null +++ b/examples/c10k_test/bin/conf/zinx.json @@ -0,0 +1,13 @@ +{ + "Name": "Zinx-Standard-App", + "Mode": "tcp", + "Host": "0.0.0.0", + "TcpPort": 8888, + "MaxConn": 10000, + "WorkerPoolSize": 10, + "MaxWorkerTaskLen": 1024, + "MaxPacketSize": 4096, + "LogFile": "zinx.log", + "LogIsolationLevel": 3, + "LogLevel": "error" +} diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go new file mode 100644 index 00000000..3274bc87 --- /dev/null +++ b/examples/c10k_test/client/client.go @@ -0,0 +1,250 @@ +package main + +import ( + "encoding/binary" + "fmt" + "math" + "math/rand" + "net" + "sort" + "strings" + "sync" + "time" +) + +type Packet struct { + A int32 + B int32 + C int32 +} + +type Stats struct { + success int64 + failed int64 + respTimes []float64 + respMu sync.Mutex + histBuckets []int64 + histMu sync.RWMutex +} + +const ( + histBucketCount = 20 +) + +func NewStats() *Stats { + return &Stats{ + respTimes: make([]float64, 0, 100000), + histBuckets: make([]int64, histBucketCount), + } +} + +func (s *Stats) addSuccess(n int64) { + s.respMu.Lock() + s.success += n + s.respMu.Unlock() +} + +func (s *Stats) addFailed(n int64) { + s.respMu.Lock() + s.failed += n + s.respMu.Unlock() +} + +func (s *Stats) addRespTime(ms float64) { + s.respMu.Lock() + s.respTimes = append(s.respTimes, ms) + s.respMu.Unlock() + + bucket := int(ms / 50) + if bucket >= histBucketCount { + bucket = histBucketCount - 1 + } + s.histMu.Lock() + s.histBuckets[bucket]++ + s.histMu.Unlock() +} + +func (s *Stats) getPercentile(p float64) float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + index := int(math.Ceil(float64(len(s.respTimes))*p)) - 1 + if index < 0 { + index = 0 + } + if index >= len(s.respTimes) { + index = len(s.respTimes) - 1 + } + + sorted := make([]float64, len(s.respTimes)) + copy(sorted, s.respTimes) + sort.Float64s(sorted) + return sorted[index] +} + +func (s *Stats) getAvg() float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + var sum float64 + for _, t := range s.respTimes { + sum += t + } + return sum / float64(len(s.respTimes)) +} + +func (s *Stats) printHistogram() { + s.histMu.RLock() + defer s.histMu.RUnlock() + + fmt.Println("\n响应时间分布 (ms):") + fmt.Println(strings.Repeat("-", 50)) + + var total int64 + for _, v := range s.histBuckets { + total += v + } + + maxBucket := int64(0) + for _, v := range s.histBuckets { + if v > maxBucket { + maxBucket = v + } + } + + for i := 0; i < histBucketCount; i++ { + low := i * 50 + high := (i + 1) * 50 + count := s.histBuckets[i] + percentage := float64(count) / float64(total) * 100 + + barLen := 0 + if maxBucket > 0 { + barLen = int(float64(count) / float64(maxBucket) * 40) + } + bar := strings.Repeat("█", barLen) + + fmt.Printf("%4d-%4dms: %8d (%5.2f%%) %s\n", low, high, count, percentage, bar) + } + fmt.Println(strings.Repeat("-", 50)) +} + +func sendMsg(conn net.Conn, msgID uint32, data []byte) error { + header := make([]byte, 8) + binary.BigEndian.PutUint32(header[0:4], msgID) + binary.BigEndian.PutUint32(header[4:8], uint32(len(data))) + _, err := conn.Write(append(header, data...)) + return err +} + +func recvMsg(conn net.Conn) (uint32, []byte, error) { + header := make([]byte, 8) + if _, err := conn.Read(header); err != nil { + return 0, nil, err + } + msgID := binary.BigEndian.Uint32(header[0:4]) + dataLen := binary.BigEndian.Uint32(header[4:8]) + body := make([]byte, dataLen) + if _, err := conn.Read(body); err != nil { + return 0, nil, err + } + return msgID, body, nil +} + +func runClient(clientID int, host string, port int, repeat int, sem chan struct{}, stats *Stats, wg *sync.WaitGroup) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat)) + return + } + defer conn.Close() + + for i := 0; i < repeat; i++ { + a := int32(rand.Intn(99999) + 1) + b := int32(rand.Intn(99999) + 1) + c := int32(0) + + var data [12]byte + binary.BigEndian.PutUint32(data[0:4], uint32(a)) + binary.BigEndian.PutUint32(data[4:8], uint32(b)) + binary.BigEndian.PutUint32(data[8:12], uint32(c)) + + sendTime := time.Now() + + conn.SetWriteDeadline(time.Now().Add(15 * time.Second)) + if err := sendMsg(conn, 1001, data[:]); err != nil { + fmt.Printf("[client %d] send failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break + } + + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + if _, _, err := recvMsg(conn); err != nil { + fmt.Printf("[client %d] recv failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break + } else { + respTime := time.Since(sendTime).Seconds() * 1000 + stats.addSuccess(1) + stats.addRespTime(respTime) + } + + time.Sleep(time.Millisecond) + } +} + +func main() { + serverHost := "localhost" + serverPort := 8888 + totalConns := 10000 + repeatPerConn := 1000 + + fmt.Printf("🚀 启动压测: %d 并发连接, 每个连接请求 %d 次\n", totalConns, repeatPerConn) + startTime := time.Now() + + stats := NewStats() + sem := make(chan struct{}, 10000) + var wg sync.WaitGroup + + wg.Add(totalConns) + for i := 0; i < totalConns; i++ { + go runClient(i, serverHost, serverPort, repeatPerConn, sem, stats, &wg) + } + + wg.Wait() + duration := time.Since(startTime) + + qps := float64(stats.success) / duration.Seconds() + + fmt.Println("\n" + strings.Repeat("=", 50)) + fmt.Printf("🏁 压测报告\n") + fmt.Printf("总耗时: %.2f 秒\n", duration.Seconds()) + fmt.Printf("成功次数: %d\n", stats.success) + fmt.Printf("失败次数: %d\n", stats.failed) + fmt.Printf("有效 QPS: %.2f\n", qps) + fmt.Println(strings.Repeat("-", 50)) + fmt.Printf("响应时间 (ms):\n") + fmt.Printf(" 平均: %.2f\n", stats.getAvg()) + fmt.Printf(" 最小: %.2f\n", stats.getPercentile(0)) + fmt.Printf(" P50: %.2f\n", stats.getPercentile(0.50)) + fmt.Printf(" P90: %.2f\n", stats.getPercentile(0.90)) + fmt.Printf(" P95: %.2f\n", stats.getPercentile(0.95)) + fmt.Printf(" P99: %.2f\n", stats.getPercentile(0.99)) + fmt.Printf(" 最大: %.2f\n", stats.getPercentile(1.0)) + fmt.Println(strings.Repeat("=", 50)) + + stats.printHistogram() +} diff --git a/examples/c10k_test/client_v2/client.go b/examples/c10k_test/client_v2/client.go new file mode 100644 index 00000000..2eca6296 --- /dev/null +++ b/examples/c10k_test/client_v2/client.go @@ -0,0 +1,372 @@ +package main + +import ( + "encoding/binary" + "fmt" + "math" + "math/rand" + "net" + "sort" + "strings" + "sync" + "time" +) + +const ( + CmdCalculate = 1001 + CmdSmall = 2001 + CmdMedium = 3001 + CmdLarge = 4001 +) + +type Stats struct { + success int64 + failed int64 + respTimes []float64 + respMu sync.Mutex + histBuckets []int64 + histMu sync.RWMutex +} + +const ( + histBucketCount = 20 +) + +func NewStats() *Stats { + return &Stats{ + respTimes: make([]float64, 0, 100000), + histBuckets: make([]int64, histBucketCount), + } +} + +func (s *Stats) addSuccess(n int64) { + s.respMu.Lock() + s.success += n + s.respMu.Unlock() +} + +func (s *Stats) addFailed(n int64) { + s.respMu.Lock() + s.failed += n + s.respMu.Unlock() +} + +func (s *Stats) addRespTime(ms float64) { + s.respMu.Lock() + s.respTimes = append(s.respTimes, ms) + s.respMu.Unlock() + + bucket := int(ms / 50) + if bucket >= histBucketCount { + bucket = histBucketCount - 1 + } + s.histMu.Lock() + s.histBuckets[bucket]++ + s.histMu.Unlock() +} + +func (s *Stats) getPercentile(p float64) float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + index := int(math.Ceil(float64(len(s.respTimes))*p)) - 1 + if index < 0 { + index = 0 + } + if index >= len(s.respTimes) { + index = len(s.respTimes) - 1 + } + + sorted := make([]float64, len(s.respTimes)) + copy(sorted, s.respTimes) + sort.Float64s(sorted) + return sorted[index] +} + +func (s *Stats) getAvg() float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + var sum float64 + for _, t := range s.respTimes { + sum += t + } + return sum / float64(len(s.respTimes)) +} + +func (s *Stats) printHistogram() { + s.histMu.RLock() + defer s.histMu.RUnlock() + + fmt.Println("\n响应时间分布 (ms):") + fmt.Println(strings.Repeat("-", 50)) + + var total int64 + for _, v := range s.histBuckets { + total += v + } + + maxBucket := int64(0) + for _, v := range s.histBuckets { + if v > maxBucket { + maxBucket = v + } + } + + for i := 0; i < histBucketCount; i++ { + low := i * 50 + high := (i + 1) * 50 + count := s.histBuckets[i] + percentage := float64(count) / float64(total) * 100 + + barLen := 0 + if maxBucket > 0 { + barLen = int(float64(count) / float64(maxBucket) * 40) + } + bar := strings.Repeat("█", barLen) + + fmt.Printf("%4d-%4dms: %8d (%5.2f%%) %s\n", low, high, count, percentage, bar) + } + fmt.Println(strings.Repeat("-", 50)) +} + +func sendMsg(conn net.Conn, msgID uint32, data []byte) error { + header := make([]byte, 8) + binary.BigEndian.PutUint32(header[0:4], msgID) + binary.BigEndian.PutUint32(header[4:8], uint32(len(data))) + _, err := conn.Write(append(header, data...)) + return err +} + +func recvMsg(conn net.Conn) (uint32, []byte, error) { + header := make([]byte, 8) + if _, err := conn.Read(header); err != nil { + return 0, nil, err + } + msgID := binary.BigEndian.Uint32(header[0:4]) + dataLen := binary.BigEndian.Uint32(header[4:8]) + body := make([]byte, dataLen) + if _, err := conn.Read(body); err != nil { + return 0, nil, err + } + return msgID, body, nil +} + +func genCalculatePacket() []byte { + a := int32(rand.Intn(99999) + 1) + b := int32(rand.Intn(99999) + 1) + c := int32(0) + var data [12]byte + binary.BigEndian.PutUint32(data[0:4], uint32(a)) + binary.BigEndian.PutUint32(data[4:8], uint32(b)) + binary.BigEndian.PutUint32(data[8:12], uint32(c)) + return data[:] +} + +func genSmallPacket() []byte { + data := make([]byte, 16) + binary.BigEndian.PutUint32(data[0:4], uint32(rand.Intn(10000))) + for i := 4; i < 16; i++ { + data[i] = byte(rand.Intn(256)) + } + return data +} + +func genMediumPacket() []byte { + data := make([]byte, 64) + binary.BigEndian.PutUint32(data[0:4], uint32(rand.Intn(10000))) + binary.BigEndian.PutUint32(data[4:8], uint32(rand.Intn(100000))) + for i := 8; i < 64; i++ { + data[i] = byte(rand.Intn(256)) + } + return data +} + +func genLargePacket() []byte { + data := make([]byte, 256) + binary.BigEndian.PutUint32(data[0:4], uint32(rand.Intn(10000))) + binary.BigEndian.PutUint32(data[4:8], uint32(rand.Intn(100000))) + binary.BigEndian.PutUint32(data[8:12], uint32(rand.Intn(1000000))) + for i := 12; i < 256; i++ { + data[i] = byte(rand.Intn(256)) + } + return data +} + +func getRandomCmd() uint32 { + switch rand.Intn(4) { + case 0: + return CmdCalculate + case 1: + return CmdSmall + case 2: + return CmdMedium + case 3: + return CmdLarge + default: + return CmdCalculate + } +} + +func getPacketByCmd(cmd uint32) []byte { + switch cmd { + case CmdCalculate: + return genCalculatePacket() + case CmdSmall: + return genSmallPacket() + case CmdMedium: + return genMediumPacket() + case CmdLarge: + return genLargePacket() + default: + return genCalculatePacket() + } +} + +func runClient(clientID int, host string, port int, repeat int, sem chan struct{}, stats *Stats, wg *sync.WaitGroup) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat)) + return + } + + reconnectCount := 0 + + for i := 0; i < repeat; { + // 随机断开重连 (5% 概率) + if i > 0 && rand.Intn(20) == 0 { + conn.Close() + reconnectCount++ + // 模拟重连延迟 (10-100ms) + time.Sleep(time.Duration(rand.Intn(90)+10) * time.Millisecond) + + conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat - i)) + return + } + } + + // 随机决定是单发还是多发 + sendCount := 1 + if rand.Intn(10) < 3 { // 30% 概率发两个包 + sendCount = 2 + } + if i+sendCount > repeat { + sendCount = repeat - i + } + + sendTime := time.Now() + + // 发送多个包 + successSend := 0 + for j := 0; j < sendCount; j++ { + cmd := getRandomCmd() + data := getPacketByCmd(cmd) + conn.SetWriteDeadline(time.Now().Add(15 * time.Second)) + if err := sendMsg(conn, cmd, data); err != nil { + conn.Close() + reconnectCount++ + // 尝试重连 + time.Sleep(time.Duration(rand.Intn(100)+50) * time.Millisecond) + conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat - i - j)) + return + } + continue + } + successSend++ + } + + // 接收对应数量的回包 + successRecv := 0 + for j := 0; j < sendCount; j++ { + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + if _, _, err := recvMsg(conn); err != nil { + conn.Close() + reconnectCount++ + time.Sleep(time.Duration(rand.Intn(100)+50) * time.Millisecond) + conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat - i - j)) + return + } + j-- + continue + } + successRecv++ + } + + respTime := time.Since(sendTime).Seconds() * 1000 + stats.addSuccess(int64(successRecv)) + if successRecv > 0 { + stats.addRespTime(respTime / float64(successRecv)) + } + + i += successSend + + time.Sleep(time.Millisecond) + } + + conn.Close() +} + +func main() { + serverHost := "localhost" + serverPort := 8888 + totalConns := 10000 + repeatPerConn := 1000 + + fmt.Printf("🚀 启动压测 v2: %d 并发连接, 每个连接请求 %d 次\n", totalConns, repeatPerConn) + fmt.Printf(" - 命令号: 1001(12字节), 2001(16字节), 3001(64字节), 4001(256字节)\n") + fmt.Printf(" - 模式: 随机单发或多发(30%%概率发2包)\n") + fmt.Printf(" - 网络模拟: 随机断开重连(5%%概率), 重连延迟10-150ms\n") + startTime := time.Now() + + stats := NewStats() + sem := make(chan struct{}, 10000) + var wg sync.WaitGroup + + wg.Add(totalConns) + for i := 0; i < totalConns; i++ { + go runClient(i, serverHost, serverPort, repeatPerConn, sem, stats, &wg) + } + + wg.Wait() + duration := time.Since(startTime) + + qps := float64(stats.success) / duration.Seconds() + + fmt.Println("\n" + strings.Repeat("=", 50)) + fmt.Printf("🏁 压测报告 v2\n") + fmt.Printf("总耗时: %.2f 秒\n", duration.Seconds()) + fmt.Printf("成功次数: %d\n", stats.success) + fmt.Printf("失败次数: %d\n", stats.failed) + fmt.Printf("有效 QPS: %.2f\n", qps) + fmt.Println(strings.Repeat("-", 50)) + fmt.Printf("响应时间 (ms):\n") + fmt.Printf(" 平均: %.2f\n", stats.getAvg()) + fmt.Printf(" 最小: %.2f\n", stats.getPercentile(0)) + fmt.Printf(" P50: %.2f\n", stats.getPercentile(0.50)) + fmt.Printf(" P90: %.2f\n", stats.getPercentile(0.90)) + fmt.Printf(" P95: %.2f\n", stats.getPercentile(0.95)) + fmt.Printf(" P99: %.2f\n", stats.getPercentile(0.99)) + fmt.Printf(" 最大: %.2f\n", stats.getPercentile(1.0)) + fmt.Println(strings.Repeat("=", 50)) + + stats.printHistogram() +} diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go new file mode 100644 index 00000000..0d62302b --- /dev/null +++ b/examples/c10k_test/server/main.go @@ -0,0 +1,247 @@ +package main + +import ( + "encoding/binary" + "fmt" + "runtime" + "sync" + "time" + + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" +) + +// Packet 数据结构 +type Packet struct { + A int32 + B int32 + C int32 +} + +// Encode 将 Packet 编码为字节数组 +func (p *Packet) Encode() []byte { + buf := make([]byte, 12) // 3 * 4 bytes + binary.BigEndian.PutUint32(buf[0:4], uint32(p.A)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.B)) + binary.BigEndian.PutUint32(buf[8:12], uint32(p.C)) + return buf +} + +// Decode 将字节数组解码为 Packet +func Decode(buf []byte) *Packet { + if len(buf) < 12 { + return nil + } + return &Packet{ + A: int32(binary.BigEndian.Uint32(buf[0:4])), + B: int32(binary.BigEndian.Uint32(buf[4:8])), + C: int32(binary.BigEndian.Uint32(buf[8:12])), + } +} + +// SmallPacket 小数据包 (16字节) +type SmallPacket struct { + ID int32 + Data [12]byte +} + +func (p *SmallPacket) Encode() []byte { + buf := make([]byte, 16) + binary.BigEndian.PutUint32(buf[0:4], uint32(p.ID)) + copy(buf[4:16], p.Data[:]) + return buf +} + +// MediumPacket 中等数据包 (64字节) +type MediumPacket struct { + ID int32 + Seq int32 + Data [56]byte +} + +func (p *MediumPacket) Encode() []byte { + buf := make([]byte, 64) + binary.BigEndian.PutUint32(buf[0:4], uint32(p.ID)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.Seq)) + copy(buf[8:64], p.Data[:]) + return buf +} + +// LargePacket 大数据包 (256字节) +type LargePacket struct { + ID int32 + Seq int32 + Counter int32 + Data [244]byte +} + +func (p *LargePacket) Encode() []byte { + buf := make([]byte, 256) + binary.BigEndian.PutUint32(buf[0:4], uint32(p.ID)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.Seq)) + binary.BigEndian.PutUint32(buf[8:12], uint32(p.Counter)) + copy(buf[12:256], p.Data[:]) + return buf +} + +var ( + connCount int32 + mutex sync.Mutex +) + +func OnConnStart(conn ziface.IConnection) { + mutex.Lock() + connCount++ + mutex.Unlock() + //logger.Info("Client connected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +func OnConnStop(conn ziface.IConnection) { + mutex.Lock() + connCount-- + mutex.Unlock() + //logger.Info("Client disconnected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +// CalculateRouter 计算路由 (命令号 1001) +type CalculateRouter struct{} + +func (c *CalculateRouter) PreHandle(request ziface.IRequest) { +} + +func (c *CalculateRouter) Handle(request ziface.IRequest) { + data := request.GetData() + conn := request.GetConnection() + packet := Decode(data) + if packet == nil { + zlog.Ins().ErrorF("Invalid packet data from %s", conn.RemoteAddr().String()) + return + } + packet.C = packet.A + packet.B + err := conn.SendBuffMsg(1001, packet.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) + } +} + +func (c *CalculateRouter) PostHandle(request ziface.IRequest) { +} + +// SmallRouter 小数据路由 (命令号 2001) +type SmallRouter struct{} + +func (s *SmallRouter) PreHandle(request ziface.IRequest) { +} + +func (s *SmallRouter) Handle(request ziface.IRequest) { + data := request.GetData() + conn := request.GetConnection() + if len(data) < 16 { + zlog.Ins().ErrorF("Invalid small packet from %s", conn.RemoteAddr().String()) + return + } + pkt := &SmallPacket{ + ID: int32(binary.BigEndian.Uint32(data[0:4])), + } + copy(pkt.Data[:], data[4:16]) + pkt.ID++ + err := conn.SendBuffMsg(2001, pkt.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) + } +} + +func (s *SmallRouter) PostHandle(request ziface.IRequest) { +} + +// MediumRouter 中等数据路由 (命令号 3001) +type MediumRouter struct{} + +func (m *MediumRouter) PreHandle(request ziface.IRequest) { +} + +func (m *MediumRouter) Handle(request ziface.IRequest) { + data := request.GetData() + conn := request.GetConnection() + if len(data) < 64 { + zlog.Ins().ErrorF("Invalid medium packet from %s", conn.RemoteAddr().String()) + return + } + pkt := &MediumPacket{ + ID: int32(binary.BigEndian.Uint32(data[0:4])), + Seq: int32(binary.BigEndian.Uint32(data[4:8])), + } + copy(pkt.Data[:], data[8:64]) + pkt.Seq++ + err := conn.SendBuffMsg(3001, pkt.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) + } +} + +func (m *MediumRouter) PostHandle(request ziface.IRequest) { +} + +// LargeRouter 大数据路由 (命令号 4001) +type LargeRouter struct{} + +func (l *LargeRouter) PreHandle(request ziface.IRequest) { +} + +func (l *LargeRouter) Handle(request ziface.IRequest) { + data := request.GetData() + conn := request.GetConnection() + if len(data) < 256 { + zlog.Ins().ErrorF("Invalid large packet from %s", conn.RemoteAddr().String()) + return + } + pkt := &LargePacket{ + ID: int32(binary.BigEndian.Uint32(data[0:4])), + Seq: int32(binary.BigEndian.Uint32(data[4:8])), + Counter: int32(binary.BigEndian.Uint32(data[8:12])), + } + copy(pkt.Data[:], data[12:256]) + pkt.Counter++ + err := conn.SendBuffMsg(4001, pkt.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error: %v", err) + } +} + +func (l *LargeRouter) PostHandle(request ziface.IRequest) { +} + +func main() { + + // 创建服务器 + s := znet.NewServer() + + s.SetOnConnStart(OnConnStart) + s.SetOnConnStop(OnConnStop) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + mutex.Lock() + count := connCount + mutex.Unlock() + + var m runtime.MemStats + runtime.ReadMemStats(&m) + fmt.Println("Connection stats", "time", time.Now().Format("2006-01-02 15:04:05"), "clients", count, "goroutines", runtime.NumGoroutine(), "MEM:", m.Alloc/1024/1024, "MB") + } + }() + + // 注册路由 + s.AddRouter(1001, &CalculateRouter{}) + s.AddRouter(2001, &SmallRouter{}) + s.AddRouter(3001, &MediumRouter{}) + s.AddRouter(4001, &LargeRouter{}) + + fmt.Printf("C10K Test Server starting on :%d\n", zconf.GlobalObject.TCPPort) + + // 启动服务 + s.Serve() +} diff --git a/znet/connection.go b/znet/connection.go index 190f6661..4d675736 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -196,28 +196,42 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection { // (写消息Goroutine, 用户将数据发送给客户端) func (c *Connection) StartWriter() { zlog.Ins().InfoF("Writer Goroutine is running") - ticker := time.NewTicker(10 * time.Millisecond) defer func() { zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String()) - ticker.Stop() c.Flush() }() for { select { - case <-ticker.C: - err := c.Flush() - if err != nil { - zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) + case data, ok := <-c.msgBuffChan: + if !ok { + zlog.Ins().ErrorF("msgBuffChan is Closed") return } - case data, ok := <-c.msgBuffChan: - if ok { - if err := c.SendBuf(data); err != nil { - zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) - return + + if err := c.SendBuf(data); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + // 一次性循环读出 msgBuffChan 中所有剩余数据 + drainLoop: + for { + select { + case extra, ok2 := <-c.msgBuffChan: + if !ok2 { + zlog.Ins().ErrorF("msgBuffChan is Closed") + return + } + if err := c.SendBuf(extra); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + default: + break drainLoop } - } else { - zlog.Ins().ErrorF("msgBuffChan is Closed") + } + // 批量写入完成后一次性 flush + if err := c.Flush(); err != nil { + zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) return } case <-c.ctx.Done(): @@ -231,7 +245,10 @@ func (c *Connection) StartWriter() { func (c *Connection) StartReader() { zlog.Ins().InfoF("[Reader Goroutine is running]") defer zlog.Ins().InfoF("%s [conn Reader exit!]", c.RemoteAddr().String()) - defer c.Stop() + defer func() { + c.Stop() + c.doClose() + }() defer func() { if err := recover(); err != nil { zlog.Ins().ErrorF("connID=%d, panic err=%v", c.GetConnID(), err) @@ -252,7 +269,7 @@ func (c *Connection) StartReader() { // (从conn的IO中读取数据到内存缓冲buffer中) n, err := c.conn.Read(buffer) if err != nil { - zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) + zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) // 不需要发 Log,正常关闭 return } zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n])) @@ -318,14 +335,8 @@ func (c *Connection) Start() { // (开启用户从客户端读取数据流程的Goroutine) go c.StartReader() - select { - case <-c.ctx.Done(): - c.finalizer() + // 直接退出,让 StartReader协程 来处理关闭逻辑 - // 归还workerid - freeWorker(c) - return - } } // Stop stops the connection and ends the current connection state. @@ -409,17 +420,6 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro go c.StartWriter() } - opt := ziface.MsgSendOptionObj{ - Timeout: 5 * time.Millisecond, - } - - for _, o := range opts { - o(&opt) - } - - idleTimeout := time.NewTimer(opt.Timeout) - defer idleTimeout.Stop() - if c.isClosed() == true { return errors.New("Connection closed when send buff msg") } @@ -435,10 +435,11 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro // Close all channels associated with the connection close(c.msgBuffChan) return errors.New("connection closed when send buff msg") - case <-idleTimeout.C: - return errors.New("send buff msg timeout") case c.msgBuffChan <- data: return nil + default: + zlog.Ins().ErrorF("send buff msg channel is full") + return errors.New("send buff msg channel is full") } } @@ -617,3 +618,9 @@ func (s *Connection) InvokeCloseCallbacks() { defer s.closeCallbackMutex.RUnlock() s.closeCallback.Invoke() } + +func (s *Connection) doClose() { + s.finalizer() + // 归还workerid + freeWorker(s) +}