From c724637915c57b8d4c4b476658725f01f9e76f1d Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 11:29:16 +0800 Subject: [PATCH 1/5] =?UTF-8?q?feat(examples):=20=E6=B7=BB=E5=8A=A0C10K?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E5=8E=8B=E6=B5=8B=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/Makefile | 8 + examples/c10k_test/bin/conf/zinx.json | 12 ++ examples/c10k_test/client/client.go | 250 ++++++++++++++++++++++++++ examples/c10k_test/server/main.go | 122 +++++++++++++ 4 files changed, 392 insertions(+) create mode 100644 examples/c10k_test/Makefile create mode 100644 examples/c10k_test/bin/conf/zinx.json create mode 100644 examples/c10k_test/client/client.go create mode 100644 examples/c10k_test/server/main.go diff --git a/examples/c10k_test/Makefile b/examples/c10k_test/Makefile new file mode 100644 index 00000000..171f8731 --- /dev/null +++ b/examples/c10k_test/Makefile @@ -0,0 +1,8 @@ +.PHONY: build clean + +build: + go build -o bin/server ./server/ + go build -o bin/client ./client/ + +clean: + rm -rf bin/ diff --git a/examples/c10k_test/bin/conf/zinx.json b/examples/c10k_test/bin/conf/zinx.json new file mode 100644 index 00000000..aa40b03b --- /dev/null +++ b/examples/c10k_test/bin/conf/zinx.json @@ -0,0 +1,12 @@ +{ + "Name": "Zinx-Standard-App", + "Mode": "tcp", + "Host": "0.0.0.0", + "TcpPort": 8888, + "MaxConn": 10000, + "WorkerPoolSize": 10, + "MaxWorkerTaskLen": 1024, + "MaxPacketSize": 4096, + "LogFile": "zinx.log", + "LogLevel": "error" +} \ No newline at end of file diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go new file mode 100644 index 00000000..e348bb20 --- /dev/null +++ b/examples/c10k_test/client/client.go @@ -0,0 +1,250 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "math" + "math/rand" + "net" + "sort" + "strings" + "sync" + "time" +) + +type Packet struct { + A int32 + B int32 + C int32 +} + +type Stats struct { + success int64 + failed int64 + respTimes []float64 + respMu sync.Mutex + histBuckets []int64 + histMu sync.RWMutex +} + +const ( + histBucketCount = 20 +) + +func NewStats() *Stats { + return &Stats{ + respTimes: make([]float64, 0, 100000), + histBuckets: make([]int64, histBucketCount), + } +} + +func (s *Stats) addSuccess(n int64) { + s.respMu.Lock() + s.success += n + s.respMu.Unlock() +} + +func (s *Stats) addFailed(n int64) { + s.respMu.Lock() + s.failed += n + s.respMu.Unlock() +} + +func (s *Stats) addRespTime(ms float64) { + s.respMu.Lock() + s.respTimes = append(s.respTimes, ms) + s.respMu.Unlock() + + bucket := int(ms / 50) + if bucket >= histBucketCount { + bucket = histBucketCount - 1 + } + s.histMu.Lock() + s.histBuckets[bucket]++ + s.histMu.Unlock() +} + +func (s *Stats) getPercentile(p float64) float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + index := int(math.Ceil(float64(len(s.respTimes))*p)) - 1 + if index < 0 { + index = 0 + } + if index >= len(s.respTimes) { + index = len(s.respTimes) - 1 + } + + sorted := make([]float64, len(s.respTimes)) + copy(sorted, s.respTimes) + sort.Float64s(sorted) + return sorted[index] +} + +func (s *Stats) getAvg() float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + var sum float64 + for _, t := range s.respTimes { + sum += t + } + return sum / float64(len(s.respTimes)) +} + +func (s *Stats) printHistogram() { + s.histMu.RLock() + defer s.histMu.RUnlock() + + fmt.Println("\n响应时间分布 (ms):") + fmt.Println(strings.Repeat("-", 50)) + + var total int64 + for _, v := range s.histBuckets { + total += v + } + + maxBucket := int64(0) + for _, v := range s.histBuckets { + if v > maxBucket { + maxBucket = v + } + } + + for i := 0; i < histBucketCount; i++ { + low := i * 50 + high := (i + 1) * 50 + count := s.histBuckets[i] + percentage := float64(count) / float64(total) * 100 + + barLen := 0 + if maxBucket > 0 { + barLen = int(float64(count) / float64(maxBucket) * 40) + } + bar := strings.Repeat("█", barLen) + + fmt.Printf("%4d-%4dms: %8d (%5.2f%%) %s\n", low, high, count, percentage, bar) + } + fmt.Println(strings.Repeat("-", 50)) +} + +func sendMsg(conn net.Conn, msgID uint32, data []byte) error { + header := make([]byte, 8) + binary.BigEndian.PutUint32(header[0:4], msgID) + binary.BigEndian.PutUint32(header[4:8], uint32(len(data))) + _, err := conn.Write(append(header, data...)) + return err +} + +func recvMsg(conn net.Conn) (uint32, []byte, error) { + header := make([]byte, 8) + if _, err := conn.Read(header); err != nil { + return 0, nil, err + } + msgID := binary.BigEndian.Uint32(header[0:4]) + dataLen := binary.BigEndian.Uint32(header[4:8]) + body := make([]byte, dataLen) + if _, err := conn.Read(body); err != nil { + return 0, nil, err + } + return msgID, body, nil +} + +func runClient(clientID int, host string, port int, repeat int, sem chan struct{}, stats *Stats, wg *sync.WaitGroup) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat)) + return + } + defer conn.Close() + + for i := 0; i < repeat; i++ { + pkt := Packet{ + A: int32(rand.Intn(99999) + 1), + B: int32(rand.Intn(99999) + 1), + C: 0, + } + + var buf bytes.Buffer + if err := binary.Write(&buf, binary.BigEndian, pkt); err != nil { + stats.addFailed(1) + continue + } + data := buf.Bytes() + + sendTime := time.Now() + + if err := sendMsg(conn, 1001, data); err != nil { + stats.addFailed(1) + continue + } + + if _, _, err := recvMsg(conn); err != nil { + stats.addFailed(1) + } else { + respTime := time.Since(sendTime).Seconds() * 1000 + stats.addSuccess(1) + stats.addRespTime(respTime) + } + + //time.Sleep(time.Millisecond) + } +} + +func main() { + serverHost := "localhost" + serverPort := 8888 + totalConns := 10000 + repeatPerConn := 1000 + + fmt.Printf("🚀 启动压测: %d 并发连接, 每个连接请求 %d 次\n", totalConns, repeatPerConn) + startTime := time.Now() + + stats := NewStats() + sem := make(chan struct{}, 10000) + var wg sync.WaitGroup + + wg.Add(totalConns) + for i := 0; i < totalConns; i++ { + go runClient(i, serverHost, serverPort, repeatPerConn, sem, stats, &wg) + } + + wg.Wait() + duration := time.Since(startTime) + + qps := float64(stats.success) / duration.Seconds() + + fmt.Println("\n" + strings.Repeat("=", 50)) + fmt.Printf("🏁 压测报告\n") + fmt.Printf("总耗时: %.2f 秒\n", duration.Seconds()) + fmt.Printf("成功次数: %d\n", stats.success) + fmt.Printf("失败次数: %d\n", stats.failed) + fmt.Printf("有效 QPS: %.2f\n", qps) + fmt.Println(strings.Repeat("-", 50)) + fmt.Printf("响应时间 (ms):\n") + fmt.Printf(" 平均: %.2f\n", stats.getAvg()) + fmt.Printf(" 最小: %.2f\n", stats.getPercentile(0)) + fmt.Printf(" P50: %.2f\n", stats.getPercentile(0.50)) + fmt.Printf(" P90: %.2f\n", stats.getPercentile(0.90)) + fmt.Printf(" P95: %.2f\n", stats.getPercentile(0.95)) + fmt.Printf(" P99: %.2f\n", stats.getPercentile(0.99)) + fmt.Printf(" 最大: %.2f\n", stats.getPercentile(1.0)) + fmt.Println(strings.Repeat("=", 50)) + + stats.printHistogram() +} diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go new file mode 100644 index 00000000..b2584a4c --- /dev/null +++ b/examples/c10k_test/server/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "encoding/binary" + "fmt" + "runtime" + "sync" + "time" + + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" +) + +// Packet 数据结构 +type Packet struct { + A int32 + B int32 + C int32 +} + +// Encode 将 Packet 编码为字节数组 +func (p *Packet) Encode() []byte { + buf := make([]byte, 12) // 3 * 4 bytes + binary.BigEndian.PutUint32(buf[0:4], uint32(p.A)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.B)) + binary.BigEndian.PutUint32(buf[8:12], uint32(p.C)) + return buf +} + +// Decode 将字节数组解码为 Packet +func Decode(buf []byte) *Packet { + if len(buf) < 12 { + return nil + } + return &Packet{ + A: int32(binary.BigEndian.Uint32(buf[0:4])), + B: int32(binary.BigEndian.Uint32(buf[4:8])), + C: int32(binary.BigEndian.Uint32(buf[8:12])), + } +} + +var ( + connCount int32 + mutex sync.Mutex +) + +func OnConnStart(conn ziface.IConnection) { + mutex.Lock() + connCount++ + mutex.Unlock() + //logger.Info("Client connected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +func OnConnStop(conn ziface.IConnection) { + mutex.Lock() + connCount-- + mutex.Unlock() + //logger.Info("Client disconnected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +// CalculateRouter 计算路由 +type CalculateRouter struct{} + +func (c *CalculateRouter) PreHandle(request ziface.IRequest) { +} + +func (c *CalculateRouter) Handle(request ziface.IRequest) { + // 获取消息数据 + data := request.GetData() + conn := request.GetConnection() + + // 解码 Packet + packet := Decode(data) + if packet == nil { + zlog.Ins().ErrorF("Invalid packet data from %s", conn.RemoteAddr().String()) + return + } + + // 计算 C = A + B + packet.C = packet.A + packet.B + + zlog.Ins().DebugF("Received from %s: A=%d, B=%d, Calculated C=%d", + conn.RemoteAddr().String(), packet.A, packet.B, packet.C) + + // 使用 SendMsg 回复(异步方式) + err := conn.SendMsg(1, packet.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendMsg error: %v", err) + } +} + +func (c *CalculateRouter) PostHandle(request ziface.IRequest) { +} + +func main() { + + // 创建服务器 + s := znet.NewServer() + + s.SetOnConnStart(OnConnStart) + s.SetOnConnStop(OnConnStop) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + mutex.Lock() + count := connCount + mutex.Unlock() + fmt.Println("Connection stats", "clients", count, "goroutines", runtime.NumGoroutine()) + } + }() + + // 注册路由 + s.AddRouter(1001, &CalculateRouter{}) + + fmt.Printf("C10K Test Server starting on :%d\n", zconf.GlobalObject.TCPPort) + + // 启动服务 + s.Serve() +} From 043db5591968b6a218a7b7b3a55eaac4e9fba223 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 12:41:38 +0800 Subject: [PATCH 2/5] =?UTF-8?q?perf(c10k=5Ftest):=20=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E8=B0=83=E8=AF=95=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/server/main.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go index b2584a4c..8d8745ae 100644 --- a/examples/c10k_test/server/main.go +++ b/examples/c10k_test/server/main.go @@ -80,10 +80,6 @@ func (c *CalculateRouter) Handle(request ziface.IRequest) { // 计算 C = A + B packet.C = packet.A + packet.B - - zlog.Ins().DebugF("Received from %s: A=%d, B=%d, Calculated C=%d", - conn.RemoteAddr().String(), packet.A, packet.B, packet.C) - // 使用 SendMsg 回复(异步方式) err := conn.SendMsg(1, packet.Encode()) if err != nil { From 6f01931be3cfa28f6c5c6b2fe2d930025d4851d8 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 12:43:24 +0800 Subject: [PATCH 3/5] =?UTF-8?q?perf(c10k):=20=E4=BC=98=E5=8C=96=E5=8C=85?= =?UTF-8?q?=E5=BA=8F=E5=88=97=E5=8C=96=E5=B9=B6=E5=90=AF=E7=94=A8=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E9=97=B4=E9=9A=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/client/client.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go index e348bb20..26e5f496 100644 --- a/examples/c10k_test/client/client.go +++ b/examples/c10k_test/client/client.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "encoding/binary" "fmt" "math" @@ -174,22 +173,18 @@ func runClient(clientID int, host string, port int, repeat int, sem chan struct{ defer conn.Close() for i := 0; i < repeat; i++ { - pkt := Packet{ - A: int32(rand.Intn(99999) + 1), - B: int32(rand.Intn(99999) + 1), - C: 0, - } + a := int32(rand.Intn(99999) + 1) + b := int32(rand.Intn(99999) + 1) + c := int32(0) - var buf bytes.Buffer - if err := binary.Write(&buf, binary.BigEndian, pkt); err != nil { - stats.addFailed(1) - continue - } - data := buf.Bytes() + var data [12]byte + binary.BigEndian.PutUint32(data[0:4], uint32(a)) + binary.BigEndian.PutUint32(data[4:8], uint32(b)) + binary.BigEndian.PutUint32(data[8:12], uint32(c)) sendTime := time.Now() - if err := sendMsg(conn, 1001, data); err != nil { + if err := sendMsg(conn, 1001, data[:]); err != nil { stats.addFailed(1) continue } @@ -202,7 +197,7 @@ func runClient(clientID int, host string, port int, repeat int, sem chan struct{ stats.addRespTime(respTime) } - //time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond) } } From d9ec180b987f04019aaf0cc1577eb11c3a3c39f0 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Wed, 13 May 2026 22:48:22 +0800 Subject: [PATCH 4/5] =?UTF-8?q?perf(znet):=20=E4=BC=98=E5=8C=96=20StartWri?= =?UTF-8?q?ter=20=E6=89=B9=E9=87=8F=E5=8F=91=E9=80=81=E4=B8=8E=E9=98=9F?= =?UTF-8?q?=E5=88=97=E9=9D=9E=E9=98=BB=E5=A1=9E=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/c10k_test/bin/conf/zinx.json | 3 +- examples/c10k_test/client/client.go | 11 ++++-- examples/c10k_test/server/main.go | 10 ++++-- znet/connection.go | 49 +++++++++++++++++---------- 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/examples/c10k_test/bin/conf/zinx.json b/examples/c10k_test/bin/conf/zinx.json index aa40b03b..8e0e09b4 100644 --- a/examples/c10k_test/bin/conf/zinx.json +++ b/examples/c10k_test/bin/conf/zinx.json @@ -8,5 +8,6 @@ "MaxWorkerTaskLen": 1024, "MaxPacketSize": 4096, "LogFile": "zinx.log", + "LogIsolationLevel": 3, "LogLevel": "error" -} \ No newline at end of file +} diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go index 26e5f496..3274bc87 100644 --- a/examples/c10k_test/client/client.go +++ b/examples/c10k_test/client/client.go @@ -184,13 +184,18 @@ func runClient(clientID int, host string, port int, repeat int, sem chan struct{ sendTime := time.Now() + conn.SetWriteDeadline(time.Now().Add(15 * time.Second)) if err := sendMsg(conn, 1001, data[:]); err != nil { - stats.addFailed(1) - continue + fmt.Printf("[client %d] send failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break } + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) if _, _, err := recvMsg(conn); err != nil { - stats.addFailed(1) + fmt.Printf("[client %d] recv failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break } else { respTime := time.Since(sendTime).Seconds() * 1000 stats.addSuccess(1) diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go index 8d8745ae..0ce2a824 100644 --- a/examples/c10k_test/server/main.go +++ b/examples/c10k_test/server/main.go @@ -80,10 +80,14 @@ func (c *CalculateRouter) Handle(request ziface.IRequest) { // 计算 C = A + B packet.C = packet.A + packet.B - // 使用 SendMsg 回复(异步方式) - err := conn.SendMsg(1, packet.Encode()) + // 使用 SendBuffMsg 回复(异步方式) + err := conn.SendBuffMsg(1, packet.Encode()) if err != nil { - zlog.Ins().ErrorF("SendMsg error: %v", err) + zlog.Ins().ErrorF("SendBuffMsg error (first): %v, retrying...", err) + // err = conn.SendBuffMsg(1, packet.Encode()) + // if err != nil { + // zlog.Ins().ErrorF("SendBuffMsg error (retry): %v", err) + // } } } diff --git a/znet/connection.go b/znet/connection.go index 190f6661..b4a1e9d0 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -196,28 +196,42 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection { // (写消息Goroutine, 用户将数据发送给客户端) func (c *Connection) StartWriter() { zlog.Ins().InfoF("Writer Goroutine is running") - ticker := time.NewTicker(10 * time.Millisecond) defer func() { zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String()) - ticker.Stop() c.Flush() }() for { select { - case <-ticker.C: - err := c.Flush() - if err != nil { - zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) + case data, ok := <-c.msgBuffChan: + if !ok { + zlog.Ins().ErrorF("msgBuffChan is Closed") return } - case data, ok := <-c.msgBuffChan: - if ok { - if err := c.SendBuf(data); err != nil { - zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) - return + + if err := c.SendBuf(data); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + // 一次性循环读出 msgBuffChan 中所有剩余数据 + drainLoop: + for { + select { + case extra, ok2 := <-c.msgBuffChan: + if !ok2 { + zlog.Ins().ErrorF("msgBuffChan is Closed") + return + } + if err := c.SendBuf(extra); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + default: + break drainLoop } - } else { - zlog.Ins().ErrorF("msgBuffChan is Closed") + } + // 批量写入完成后一次性 flush + if err := c.Flush(); err != nil { + zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) return } case <-c.ctx.Done(): @@ -252,7 +266,7 @@ func (c *Connection) StartReader() { // (从conn的IO中读取数据到内存缓冲buffer中) n, err := c.conn.Read(buffer) if err != nil { - zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) + zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) // 不需要发 Log,正常关闭 return } zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n])) @@ -417,8 +431,6 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro o(&opt) } - idleTimeout := time.NewTimer(opt.Timeout) - defer idleTimeout.Stop() if c.isClosed() == true { return errors.New("Connection closed when send buff msg") @@ -435,10 +447,11 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro // Close all channels associated with the connection close(c.msgBuffChan) return errors.New("connection closed when send buff msg") - case <-idleTimeout.C: - return errors.New("send buff msg timeout") case c.msgBuffChan <- data: return nil + default: + zlog.Ins().ErrorF("send buff msg channel is full") + return errors.New("send buff msg channel is full") } } From 53b4bbfdfeecad10f1313ed1e49b9419c7611267 Mon Sep 17 00:00:00 2001 From: redfox <8564093@qq.com> Date: Thu, 14 May 2026 07:22:45 +0800 Subject: [PATCH 5/5] =?UTF-8?q?fix(znet):=20=E7=A7=BB=E9=99=A4SendToQueue?= =?UTF-8?q?=E4=B8=AD=E6=9C=AA=E4=BD=BF=E7=94=A8=E7=9A=84=E9=80=89=E9=A1=B9?= =?UTF-8?q?=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- znet/connection.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/znet/connection.go b/znet/connection.go index b4a1e9d0..08d8b37b 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -423,15 +423,6 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro go c.StartWriter() } - opt := ziface.MsgSendOptionObj{ - Timeout: 5 * time.Millisecond, - } - - for _, o := range opts { - o(&opt) - } - - if c.isClosed() == true { return errors.New("Connection closed when send buff msg") }