Production-tested Go library for real-time cryptocurrency market data via WebSocket. Supports Binance Futures, Bybit Linear, and Bitget Futures with a unified interface.
go get github.com/KhavrTrading/flowexRequires Go 1.22+
flowex/
binance/ — Binance Futures WebSocket manager
bybit/ — Bybit Linear WebSocket manager
bitget/ — Bitget Futures/Spot WebSocket manager
ws/ — Core engine: client, worker, manager, snapshots
models/ — Candle, Trade, Ticker types
depth/ — Order book metrics (75 fields) + time-bucketed store
candles/ — REST fetchers + timeframe aggregation
indicators/ — EMA, RSI, MACD, ATR, Bollinger, StochRSI, S/R
examples/ — Working examples
package main
import (
"fmt"
"os"
"os/signal"
"time"
"github.com/KhavrTrading/flowex/binance"
)
func main() {
mgr := binance.NewManager()
// Subscribe to candles + depth + trades for BTCUSDT
mgr.SubscribeAll("BTCUSDT", nil, nil, nil)
// Poll snapshots every 5 seconds
go func() {
for range time.Tick(5 * time.Second) {
snap := mgr.GetSnapshot("BTCUSDT")
if snap == nil {
continue
}
fmt.Printf("Candles: %d | Trades: %d | Depth points: %d\n",
len(snap.Candles), len(snap.Trades), snap.DepthStore.Size())
if len(snap.Candles) > 0 {
c := snap.Candles[len(snap.Candles)-1]
fmt.Printf(" Last: O=%.2f H=%.2f L=%.2f C=%.2f V=%.4f\n",
c.Open, c.High, c.Low, c.Close, c.Volume)
}
}
}()
// Wait for Ctrl+C
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
mgr.Shutdown()
}See examples/basic/main.go for a complete working example with worker hooks, snapshot polling, and metrics monitoring.
Each exchange has its own manager. Create one, subscribe to symbols, read snapshots.
import "github.com/KhavrTrading/flowex/binance"
mgr := binance.NewManager() // default config
mgr.SubscribeAll("BTCUSDT", nil, nil, nil) // candles + depth + trades
mgr.SubscribeAll("ETHUSDT", nil, nil, nil) // subscribe to multiple symbolsimport "github.com/KhavrTrading/flowex/bybit"
mgr := bybit.NewManager()
mgr.SubscribeAll("BTCUSDT", nil, nil, nil)import "github.com/KhavrTrading/flowex/bitget"
mgr := bitget.NewManager() // defaults to USDT-FUTURES
mgr.SubscribeAll("BTCUSDT", nil, nil, nil)
// Or for spot:
cfg := bitget.DefaultManagerConfig()
cfg.InstType = bitget.InstSpot
spotMgr := bitget.NewManagerWithConfig(cfg)binanceMgr := binance.NewManager()
bybitMgr := bybit.NewManager()
bitgetMgr := bitget.NewManager()
for _, symbol := range []string{"BTCUSDT", "ETHUSDT", "SOLUSDT"} {
binanceMgr.SubscribeAll(symbol, nil, nil, nil)
bybitMgr.SubscribeAll(symbol, nil, nil, nil)
bitgetMgr.SubscribeAll(symbol, nil, nil, nil)
}You don't have to subscribe to everything. Pick what you need:
mgr := binance.NewManager()
// Only candles
mgr.SubscribeCandle("BTCUSDT", nil)
// Only depth
mgr.SubscribeDepth("ETHUSDT", nil)
// Only trades
mgr.SubscribeTrade("SOLUSDT", nil)
// Unsubscribe one stream
mgr.Unsubscribe("BTCUSDT", ws.StreamCandle)
// Unsubscribe everything for a symbol
mgr.UnsubscribeAll("ETHUSDT")Each exchange offers different order book depth options.
import "github.com/KhavrTrading/flowex/binance"
mgr := binance.NewManager()
// Default: 20 levels, exchange default speed
mgr.SubscribeDepth("BTCUSDT", nil)
// 5 levels, 100ms updates (fastest)
mgr.SubscribeDepthWithConfig("BTCUSDT", binance.Depth5, binance.Speed100ms, nil)
// 10 levels, 500ms updates (lowest bandwidth)
mgr.SubscribeDepthWithConfig("ETHUSDT", binance.Depth10, binance.Speed500ms, nil)
// 20 levels, 100ms updates
mgr.SubscribeDepthWithConfig("SOLUSDT", binance.Depth20, binance.Speed100ms, nil)Available options:
| Levels | Constants |
|---|---|
| 5 | binance.Depth5 |
| 10 | binance.Depth10 |
| 20 | binance.Depth20 (default) |
| Speed | Constants | Notes |
|---|---|---|
| 100ms | binance.Speed100ms |
Fastest, highest bandwidth |
| 250ms | binance.Speed250ms |
|
| 500ms | binance.Speed500ms |
Lowest bandwidth |
| default | binance.SpeedDefault |
Exchange decides |
import "github.com/KhavrTrading/flowex/bybit"
mgr := bybit.NewManager()
// Default: 50 levels
mgr.SubscribeDepth("BTCUSDT", nil)
// Top-of-book only (1 level) - minimal bandwidth
mgr.SubscribeDepthWithLevel("BTCUSDT", bybit.Depth1, nil)
// 200 levels
mgr.SubscribeDepthWithLevel("BTCUSDT", bybit.Depth200, nil)
// 500 levels - full book
mgr.SubscribeDepthWithLevel("BTCUSDT", bybit.Depth500, nil)Available: bybit.Depth1, bybit.Depth50 (default), bybit.Depth200, bybit.Depth500
import "github.com/KhavrTrading/flowex/bitget"
mgr := bitget.NewManager()
// Default: full book ("books")
mgr.SubscribeDepth("BTCUSDT", nil)
// Top 5 levels only
mgr.SubscribeDepthWithChannel("BTCUSDT", bitget.DepthBooks5, nil)
// Top 15 levels
mgr.SubscribeDepthWithChannel("BTCUSDT", bitget.DepthBooks15, nil)Available: bitget.DepthFull (default), bitget.DepthBooks5, bitget.DepthBooks15
Binance offers two trade stream types:
mgr := binance.NewManager()
// Default: aggregate trades (recommended - lower bandwidth)
mgr.SubscribeTrade("BTCUSDT", nil)
// Individual trades (every single fill, higher volume)
mgr.SubscribeTradeWithMode("BTCUSDT", binance.TradeIndividual, nil)
// Or set it globally in config:
cfg := binance.DefaultManagerConfig()
cfg.TradeMode = binance.TradeIndividual
mgr = binance.NewManagerWithConfig(cfg)| Mode | Stream | Notes |
|---|---|---|
binance.TradeAggregated |
@aggTrade |
Trades at same price/time combined (default, lower bandwidth) |
binance.TradeIndividual |
@trade |
Every individual fill (higher volume, more granular) |
These exchanges have a single public trade stream each. No mode selection needed:
bybitMgr.SubscribeTrade("BTCUSDT", nil) // publicTrade stream
bitgetMgr.SubscribeTrade("BTCUSDT", nil) // trade streamAll exchanges default to 1-minute candles. You can change the interval:
mgr := binance.NewManager()
mgr.SubscribeCandle("BTCUSDT", nil) // default 1m
mgr.SubscribeCandleWithInterval("BTCUSDT", "5m", nil) // 5-minute
mgr.SubscribeCandleWithInterval("ETHUSDT", "1h", nil) // 1-hour
mgr.SubscribeCandleWithInterval("SOLUSDT", "4h", nil) // 4-hourBinance intervals: "1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "8h", "12h", "1d", "1w"
mgr := bybit.NewManager()
mgr.SubscribeCandleWithInterval("BTCUSDT", "5", nil) // 5-minute
mgr.SubscribeCandleWithInterval("BTCUSDT", "60", nil) // 1-hour
mgr.SubscribeCandleWithInterval("BTCUSDT", "D", nil) // dailyBybit intervals: "1", "3", "5", "15", "30", "60", "120", "240", "360", "720", "D", "W", "M"
mgr := bitget.NewManager()
mgr.SubscribeCandleWithInterval("BTCUSDT", "5m", nil) // 5-minute
mgr.SubscribeCandleWithInterval("BTCUSDT", "1H", nil) // 1-hour
mgr.SubscribeCandleWithInterval("BTCUSDT", "1D", nil) // dailyBitget intervals: "1m", "5m", "15m", "30m", "1H", "4H", "6H", "12H", "1D", "1W"
Instead of passing options on every call, set them once in the manager config:
// Binance: 5-level depth at 100ms, individual trades, 5m candles
cfg := binance.DefaultManagerConfig()
cfg.DepthLevel = binance.Depth5
cfg.DepthSpeed = binance.Speed100ms
cfg.TradeMode = binance.TradeIndividual
cfg.Interval = "5m"
mgr := binance.NewManagerWithConfig(cfg)
mgr.SubscribeAll("BTCUSDT", nil, nil, nil) // uses all the config above// Bybit: 200-level depth, 15m candles
cfg := bybit.DefaultManagerConfig()
cfg.DepthLevel = bybit.Depth200
cfg.Interval = "15"
mgr := bybit.NewManagerWithConfig(cfg)// Bitget: spot market, books5 depth
cfg := bitget.DefaultManagerConfig()
cfg.InstType = bitget.InstSpot
cfg.DepthChannel = bitget.DepthBooks5
cfg.Interval = "5m"
mgr := bitget.NewManagerWithConfig(cfg)Every symbol produces immutable snapshots every second (configurable). Read them lock-free from any goroutine.
// Snapshot is an immutable, point-in-time view of a symbol's state.
type Snapshot struct {
Timestamp time.Time // when the snapshot was taken
Candles []models.CandleHLCV // historical + live candle bars
DepthStore *depth.Store // order book metrics with time-bucketed storage
Trades []models.NormalizedTrade // recent trades, normalized across exchanges
}snap := mgr.GetSnapshot("BTCUSDT")
if snap == nil {
// No data yet
return
}
// Candles (OHLCV)
for _, c := range snap.Candles {
fmt.Printf("ts=%d O=%.2f H=%.2f L=%.2f C=%.2f V=%.4f\n",
c.Ts, c.Open, c.High, c.Low, c.Close, c.Volume)
}
// Trades (normalized across exchanges)
for _, t := range snap.Trades {
fmt.Printf("[%s] %s %.4f @ %.2f\n", t.Exchange, t.Side, t.SizeUSD, t.Price)
}
// Depth metrics
latest := snap.DepthStore.GetLatest()
if latest != nil {
fmt.Printf("Spread: %.2f bps | Imbalance: %.3f | Mid: %.2f\n",
latest.SpreadBps, latest.ImbalanceRatio10, latest.MidPrice)
}
// Historical depth (last 30 seconds)
recent := snap.DepthStore.GetLastNSeconds(30)type CandleHLCV struct {
Ts int64 // Unix millisecond timestamp
Open float64
High float64
Low float64
Close float64
Volume float64
}Helper methods: GetTimestamp(), HL2(), HLC3().
Lighter candle without Open/Volume — used by ATR, Bollinger, and Support/Resistance indicators.
type CandleHLC struct {
High float64
Low float64
Close float64
}Unified trade format across all exchanges.
type NormalizedTrade struct {
Timestamp int64 // Unix milliseconds
Price float64
Size float64 // base currency
SizeUSD float64
Side string // "buy" or "sell"
TradeID string
Symbol string // e.g. "BTCUSDT"
Exchange string // "binance", "bybit", "bitget"
}Workers fire callbacks on every state change. Plug your own logic:
worker := mgr.GetOrCreateWorker("BTCUSDT")
// Called after every candle update (same-minute update or new bar)
worker.SetOnCandleUpdate(func(candles []models.CandleHLCV) {
if len(candles) >= 14 {
closes := make([]float64, len(candles))
for i, c := range candles {
closes[i] = c.Close
}
rsi := indicators.CalculateRSI(closes, 14)
fmt.Printf("RSI(14): %.2f\n", rsi)
}
})
// Called after every depth update with the computed metrics
worker.SetOnDepthUpdate(func(m depth.DepthMetrics) {
fmt.Printf("Bid liq: $%.0f | Ask liq: $%.0f | Spread: %.2f bps\n",
m.BidLiquidity10, m.AskLiquidity10, m.SpreadBps)
})
// Called after every trade
worker.SetOnTradeUpdate(func(t models.NormalizedTrade) {
if t.SizeUSD > 50000 {
fmt.Printf("LARGE %s: $%.0f @ %.2f\n", t.Side, t.SizeUSD, t.Price)
}
})cfg := ws.DefaultWorkerConfig()
cfg.CandleChSize = 128 // Candle channel buffer (default 64)
cfg.DepthChSize = 4096 // Depth channel buffer (default 2048)
cfg.TradeChSize = 4096 // Trade channel buffer (default 2048)
cfg.MaxCandles = 1500 // Candle history length (default 750)
cfg.MaxNormTrades = 5000 // Normalized trade buffer (default 2000)
cfg.MaxDepthMetrics = 20000 // Depth metric storage (default 10000)
cfg.MaxDepthSeconds = 1800 // Keep 30 min of depth data (default 1000s)
cfg.RecentMetricsSize = 200 // Fast-access depth buffer (default 100)
cfg.SnapshotInterval = 500 * time.Millisecond // Snapshot frequency (default 1s)
// Pass to any exchange manager
mgr := binance.NewManagerWithConfig(binance.ManagerConfig{WorkerConfig: cfg})Fetch candles from exchange REST APIs:
import "github.com/KhavrTrading/flowex/candles"
// Binance: up to 1500 per request
data, err := candles.FetchBinanceCandles("BTCUSDT", "1m", 750)
data, err := candles.FetchBinanceCandles("ETHUSDT", "5m", 500)
// Bybit: up to 200 per request
data, err := candles.FetchBybitCandles("BTCUSDT", "1", 200)
// Bitget: up to 200 per request
data, err := candles.FetchBitgetCandles("BTCUSDT", "1m", 200)
// Also available as CandleHLC (without open/volume):
hlc, err := candles.FetchBinanceCandleHLC("BTCUSDT", "1m", 750)import "github.com/KhavrTrading/flowex/candles"
oneMin, _ := candles.FetchBinanceCandles("BTCUSDT", "1m", 750)
fiveMin := candles.Aggregate1mTo5m(oneMin) // 1m -> 5m
fifteenMin := candles.Aggregate1mTo15m(oneMin) // 1m -> 15m
// Custom duration (e.g., 3 minutes)
threeMin := candles.Aggregate(oneMin, 3*60*1000)Built-in standard indicators that work on []float64 or []models.CandleHLC:
import "github.com/KhavrTrading/flowex/indicators"
closes := []float64{100, 101, 99, 102, 103, ...}
// EMA
ema20 := indicators.CalculateEMA(closes, 20)
emaSeries := indicators.CalculateEMAList(closes, 20) // full series
// RSI
rsi := indicators.CalculateRSI(closes, 14)
// MACD (12/26/9)
macd, signal, histogram := indicators.CalculateMACD(closes)
// Stochastic RSI
stochRSI := indicators.CalculateStochRSI(closes, 14, 14)
// ATR (needs CandleHLC with High/Low/Close)
atr := indicators.CalculateATR(hlcCandles, 14)
atr, threshold, rising, err := indicators.EvaluateATR(hlcCandles, 14, 0.02)
// Bollinger Mean Deviation
score, oscSD := indicators.BMD(hlcCandles, "1m")
score, oscSD = indicators.BollingerMeanDeviation(hlcCandles, 20, 25)
// Support/Resistance (pivot-based)
supportPct, resistancePct, srScore := indicators.SupportResistance(hlcCandles, 5, 20) +-----------+
WebSocket -----> | Client | (per-symbol connection, auto-reconnect, heartbeat)
+-----+-----+
|
callbacks (non-blocking)
|
+-----v-----+
| Worker | (per-symbol actor goroutine, owns ALL state)
| |
| candles | <- channel (buf 64)
| depth | <- channel (buf 2048)
| trades | <- channel (buf 2048)
| |
| hooks | -> user callbacks (OnCandle, OnDepth, OnTrade)
| |
+-----+-----+
|
atomic.Store (every 1s)
|
+-----v-----+
| Snapshot | (immutable, lock-free reads from any goroutine)
+-----------+
- One goroutine per symbol -- no locks needed for state mutation
- Non-blocking enqueue -- if channel is full, oldest message is dropped (never blocks WS read)
- Atomic snapshots -- readers never contend with the writer
- Auto-reconnect -- connection drops trigger reconnect + resubscribe to all active streams
| Package | Description |
|---|---|
ws/ |
Core: BaseClient, SymbolWorker (actor), BaseManager (pool), PubSub[T], interfaces |
binance/ |
Binance Futures adapter (depth5/10/20, aggTrade/trade, all candle intervals) |
bybit/ |
Bybit V5 Linear adapter (depth 1/50/200/500, all candle intervals) |
bitget/ |
Bitget V2 adapter (books/books5/books15, spot/futures, all candle intervals) |
models/ |
CandleHLC, CandleHLCV, NormalizedTrade, TickerData |
depth/ |
Order book metrics (75 fields) + time-bucketed store with enrichment |
indicators/ |
EMA, RSI, ATR, MACD, StochRSI, Bollinger, Support/Resistance |
indicators/technical/ |
Batch-optimized calculator, ADX, MMI, signal types, movement tracking |
candles/ |
REST fetchers (Binance/Bybit/Bitget) + timeframe aggregator |
See DOCUMENTATION.md for the full API reference — all 75 depth metric fields, store query methods, worker monitoring, historical data seeding, and more.
Only two:
github.com/gorilla/websocketgithub.com/sirupsen/logrus
MIT