diff --git a/examples/c10k_test/Makefile b/examples/c10k_test/Makefile new file mode 100644 index 00000000..171f8731 --- /dev/null +++ b/examples/c10k_test/Makefile @@ -0,0 +1,8 @@ +.PHONY: build clean + +build: + go build -o bin/server ./server/ + go build -o bin/client ./client/ + +clean: + rm -rf bin/ diff --git a/examples/c10k_test/bin/conf/zinx.json b/examples/c10k_test/bin/conf/zinx.json new file mode 100644 index 00000000..8e0e09b4 --- /dev/null +++ b/examples/c10k_test/bin/conf/zinx.json @@ -0,0 +1,13 @@ +{ + "Name": "Zinx-Standard-App", + "Mode": "tcp", + "Host": "0.0.0.0", + "TcpPort": 8888, + "MaxConn": 10000, + "WorkerPoolSize": 10, + "MaxWorkerTaskLen": 1024, + "MaxPacketSize": 4096, + "LogFile": "zinx.log", + "LogIsolationLevel": 3, + "LogLevel": "error" +} diff --git a/examples/c10k_test/client/client.go b/examples/c10k_test/client/client.go new file mode 100644 index 00000000..3274bc87 --- /dev/null +++ b/examples/c10k_test/client/client.go @@ -0,0 +1,250 @@ +package main + +import ( + "encoding/binary" + "fmt" + "math" + "math/rand" + "net" + "sort" + "strings" + "sync" + "time" +) + +type Packet struct { + A int32 + B int32 + C int32 +} + +type Stats struct { + success int64 + failed int64 + respTimes []float64 + respMu sync.Mutex + histBuckets []int64 + histMu sync.RWMutex +} + +const ( + histBucketCount = 20 +) + +func NewStats() *Stats { + return &Stats{ + respTimes: make([]float64, 0, 100000), + histBuckets: make([]int64, histBucketCount), + } +} + +func (s *Stats) addSuccess(n int64) { + s.respMu.Lock() + s.success += n + s.respMu.Unlock() +} + +func (s *Stats) addFailed(n int64) { + s.respMu.Lock() + s.failed += n + s.respMu.Unlock() +} + +func (s *Stats) addRespTime(ms float64) { + s.respMu.Lock() + s.respTimes = append(s.respTimes, ms) + s.respMu.Unlock() + + bucket := int(ms / 50) + if bucket >= histBucketCount { + bucket = histBucketCount - 1 + } + s.histMu.Lock() + s.histBuckets[bucket]++ + s.histMu.Unlock() +} + +func (s *Stats) getPercentile(p float64) float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + index := int(math.Ceil(float64(len(s.respTimes))*p)) - 1 + if index < 0 { + index = 0 + } + if index >= len(s.respTimes) { + index = len(s.respTimes) - 1 + } + + sorted := make([]float64, len(s.respTimes)) + copy(sorted, s.respTimes) + sort.Float64s(sorted) + return sorted[index] +} + +func (s *Stats) getAvg() float64 { + s.respMu.Lock() + defer s.respMu.Unlock() + + if len(s.respTimes) == 0 { + return 0 + } + + var sum float64 + for _, t := range s.respTimes { + sum += t + } + return sum / float64(len(s.respTimes)) +} + +func (s *Stats) printHistogram() { + s.histMu.RLock() + defer s.histMu.RUnlock() + + fmt.Println("\n响应时间分布 (ms):") + fmt.Println(strings.Repeat("-", 50)) + + var total int64 + for _, v := range s.histBuckets { + total += v + } + + maxBucket := int64(0) + for _, v := range s.histBuckets { + if v > maxBucket { + maxBucket = v + } + } + + for i := 0; i < histBucketCount; i++ { + low := i * 50 + high := (i + 1) * 50 + count := s.histBuckets[i] + percentage := float64(count) / float64(total) * 100 + + barLen := 0 + if maxBucket > 0 { + barLen = int(float64(count) / float64(maxBucket) * 40) + } + bar := strings.Repeat("█", barLen) + + fmt.Printf("%4d-%4dms: %8d (%5.2f%%) %s\n", low, high, count, percentage, bar) + } + fmt.Println(strings.Repeat("-", 50)) +} + +func sendMsg(conn net.Conn, msgID uint32, data []byte) error { + header := make([]byte, 8) + binary.BigEndian.PutUint32(header[0:4], msgID) + binary.BigEndian.PutUint32(header[4:8], uint32(len(data))) + _, err := conn.Write(append(header, data...)) + return err +} + +func recvMsg(conn net.Conn) (uint32, []byte, error) { + header := make([]byte, 8) + if _, err := conn.Read(header); err != nil { + return 0, nil, err + } + msgID := binary.BigEndian.Uint32(header[0:4]) + dataLen := binary.BigEndian.Uint32(header[4:8]) + body := make([]byte, dataLen) + if _, err := conn.Read(body); err != nil { + return 0, nil, err + } + return msgID, body, nil +} + +func runClient(clientID int, host string, port int, repeat int, sem chan struct{}, stats *Stats, wg *sync.WaitGroup) { + defer wg.Done() + + sem <- struct{}{} + defer func() { <-sem }() + + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + stats.addFailed(int64(repeat)) + return + } + defer conn.Close() + + for i := 0; i < repeat; i++ { + a := int32(rand.Intn(99999) + 1) + b := int32(rand.Intn(99999) + 1) + c := int32(0) + + var data [12]byte + binary.BigEndian.PutUint32(data[0:4], uint32(a)) + binary.BigEndian.PutUint32(data[4:8], uint32(b)) + binary.BigEndian.PutUint32(data[8:12], uint32(c)) + + sendTime := time.Now() + + conn.SetWriteDeadline(time.Now().Add(15 * time.Second)) + if err := sendMsg(conn, 1001, data[:]); err != nil { + fmt.Printf("[client %d] send failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break + } + + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + if _, _, err := recvMsg(conn); err != nil { + fmt.Printf("[client %d] recv failed at i=%d: %v\n", clientID, i, err) + stats.addFailed(int64(repeat - i)) + break + } else { + respTime := time.Since(sendTime).Seconds() * 1000 + stats.addSuccess(1) + stats.addRespTime(respTime) + } + + time.Sleep(time.Millisecond) + } +} + +func main() { + serverHost := "localhost" + serverPort := 8888 + totalConns := 10000 + repeatPerConn := 1000 + + fmt.Printf("🚀 启动压测: %d 并发连接, 每个连接请求 %d 次\n", totalConns, repeatPerConn) + startTime := time.Now() + + stats := NewStats() + sem := make(chan struct{}, 10000) + var wg sync.WaitGroup + + wg.Add(totalConns) + for i := 0; i < totalConns; i++ { + go runClient(i, serverHost, serverPort, repeatPerConn, sem, stats, &wg) + } + + wg.Wait() + duration := time.Since(startTime) + + qps := float64(stats.success) / duration.Seconds() + + fmt.Println("\n" + strings.Repeat("=", 50)) + fmt.Printf("🏁 压测报告\n") + fmt.Printf("总耗时: %.2f 秒\n", duration.Seconds()) + fmt.Printf("成功次数: %d\n", stats.success) + fmt.Printf("失败次数: %d\n", stats.failed) + fmt.Printf("有效 QPS: %.2f\n", qps) + fmt.Println(strings.Repeat("-", 50)) + fmt.Printf("响应时间 (ms):\n") + fmt.Printf(" 平均: %.2f\n", stats.getAvg()) + fmt.Printf(" 最小: %.2f\n", stats.getPercentile(0)) + fmt.Printf(" P50: %.2f\n", stats.getPercentile(0.50)) + fmt.Printf(" P90: %.2f\n", stats.getPercentile(0.90)) + fmt.Printf(" P95: %.2f\n", stats.getPercentile(0.95)) + fmt.Printf(" P99: %.2f\n", stats.getPercentile(0.99)) + fmt.Printf(" 最大: %.2f\n", stats.getPercentile(1.0)) + fmt.Println(strings.Repeat("=", 50)) + + stats.printHistogram() +} diff --git a/examples/c10k_test/server/main.go b/examples/c10k_test/server/main.go new file mode 100644 index 00000000..0ce2a824 --- /dev/null +++ b/examples/c10k_test/server/main.go @@ -0,0 +1,122 @@ +package main + +import ( + "encoding/binary" + "fmt" + "runtime" + "sync" + "time" + + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" +) + +// Packet 数据结构 +type Packet struct { + A int32 + B int32 + C int32 +} + +// Encode 将 Packet 编码为字节数组 +func (p *Packet) Encode() []byte { + buf := make([]byte, 12) // 3 * 4 bytes + binary.BigEndian.PutUint32(buf[0:4], uint32(p.A)) + binary.BigEndian.PutUint32(buf[4:8], uint32(p.B)) + binary.BigEndian.PutUint32(buf[8:12], uint32(p.C)) + return buf +} + +// Decode 将字节数组解码为 Packet +func Decode(buf []byte) *Packet { + if len(buf) < 12 { + return nil + } + return &Packet{ + A: int32(binary.BigEndian.Uint32(buf[0:4])), + B: int32(binary.BigEndian.Uint32(buf[4:8])), + C: int32(binary.BigEndian.Uint32(buf[8:12])), + } +} + +var ( + connCount int32 + mutex sync.Mutex +) + +func OnConnStart(conn ziface.IConnection) { + mutex.Lock() + connCount++ + mutex.Unlock() + //logger.Info("Client connected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +func OnConnStop(conn ziface.IConnection) { + mutex.Lock() + connCount-- + mutex.Unlock() + //logger.Info("Client disconnected", "conn_id", conn.GetConnID(), "addr", conn.RemoteAddrString(), "total", connCount) +} + +// CalculateRouter 计算路由 +type CalculateRouter struct{} + +func (c *CalculateRouter) PreHandle(request ziface.IRequest) { +} + +func (c *CalculateRouter) Handle(request ziface.IRequest) { + // 获取消息数据 + data := request.GetData() + conn := request.GetConnection() + + // 解码 Packet + packet := Decode(data) + if packet == nil { + zlog.Ins().ErrorF("Invalid packet data from %s", conn.RemoteAddr().String()) + return + } + + // 计算 C = A + B + packet.C = packet.A + packet.B + // 使用 SendBuffMsg 回复(异步方式) + err := conn.SendBuffMsg(1, packet.Encode()) + if err != nil { + zlog.Ins().ErrorF("SendBuffMsg error (first): %v, retrying...", err) + // err = conn.SendBuffMsg(1, packet.Encode()) + // if err != nil { + // zlog.Ins().ErrorF("SendBuffMsg error (retry): %v", err) + // } + } +} + +func (c *CalculateRouter) PostHandle(request ziface.IRequest) { +} + +func main() { + + // 创建服务器 + s := znet.NewServer() + + s.SetOnConnStart(OnConnStart) + s.SetOnConnStop(OnConnStop) + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + mutex.Lock() + count := connCount + mutex.Unlock() + fmt.Println("Connection stats", "clients", count, "goroutines", runtime.NumGoroutine()) + } + }() + + // 注册路由 + s.AddRouter(1001, &CalculateRouter{}) + + fmt.Printf("C10K Test Server starting on :%d\n", zconf.GlobalObject.TCPPort) + + // 启动服务 + s.Serve() +} diff --git a/znet/connection.go b/znet/connection.go index 190f6661..08d8b37b 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -196,28 +196,42 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection { // (写消息Goroutine, 用户将数据发送给客户端) func (c *Connection) StartWriter() { zlog.Ins().InfoF("Writer Goroutine is running") - ticker := time.NewTicker(10 * time.Millisecond) defer func() { zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String()) - ticker.Stop() c.Flush() }() for { select { - case <-ticker.C: - err := c.Flush() - if err != nil { - zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) + case data, ok := <-c.msgBuffChan: + if !ok { + zlog.Ins().ErrorF("msgBuffChan is Closed") return } - case data, ok := <-c.msgBuffChan: - if ok { - if err := c.SendBuf(data); err != nil { - zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) - return + + if err := c.SendBuf(data); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + // 一次性循环读出 msgBuffChan 中所有剩余数据 + drainLoop: + for { + select { + case extra, ok2 := <-c.msgBuffChan: + if !ok2 { + zlog.Ins().ErrorF("msgBuffChan is Closed") + return + } + if err := c.SendBuf(extra); err != nil { + zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err) + return + } + default: + break drainLoop } - } else { - zlog.Ins().ErrorF("msgBuffChan is Closed") + } + // 批量写入完成后一次性 flush + if err := c.Flush(); err != nil { + zlog.Ins().ErrorF("Flush Buff Data error: %v Conn Writer exit", err) return } case <-c.ctx.Done(): @@ -252,7 +266,7 @@ func (c *Connection) StartReader() { // (从conn的IO中读取数据到内存缓冲buffer中) n, err := c.conn.Read(buffer) if err != nil { - zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) + zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err) // 不需要发 Log,正常关闭 return } zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n])) @@ -409,17 +423,6 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro go c.StartWriter() } - opt := ziface.MsgSendOptionObj{ - Timeout: 5 * time.Millisecond, - } - - for _, o := range opts { - o(&opt) - } - - idleTimeout := time.NewTimer(opt.Timeout) - defer idleTimeout.Stop() - if c.isClosed() == true { return errors.New("Connection closed when send buff msg") } @@ -435,10 +438,11 @@ func (c *Connection) SendToQueue(data []byte, opts ...ziface.MsgSendOption) erro // Close all channels associated with the connection close(c.msgBuffChan) return errors.New("connection closed when send buff msg") - case <-idleTimeout.C: - return errors.New("send buff msg timeout") case c.msgBuffChan <- data: return nil + default: + zlog.Ins().ErrorF("send buff msg channel is full") + return errors.New("send buff msg channel is full") } }