package main import ( "context" "log" "net" "sync" "sync/atomic" "time" ) var pktPool = sync.Pool{ New: func() interface{} { return make([]byte, 2048) }, } func getPktBuf(size int) []byte { b := pktPool.Get().([]byte) if cap(b) < size { b = make([]byte, size) } return b[:size] } func putPktBuf(b []byte) { if cap(b) < 2048 { return } pktPool.Put(b[:cap(b)]) } const ( returnChBuf = 384 // chunkSize — количество последовательных пакетов, отправляемых в один worker // перед переключением на следующий. // // Зачем: при round-robin (chunk=1) каждый пакет летит через разный TURN relay // с разным latency, что приводит к reorder на сервере. TCP внутри WireGuard // интерпретирует reorder как потери → cwnd collapse → скорость single-flow // падает до ~8 KB/s. // // С chunk=8: пакеты в пределах одного TCP congestion window (~10 пакетов при // initial cwnd) уходят через один TURN relay → прилетают по порядку. // Reorder возможен только между chunk-границами, что покрывается WG replay // window (2048 пакетов). // // Агрегатная пропускная способность не меняется — все workers загружены // равномерно по-прежнему (каждый получает 1/N от общего трафика за время). chunkSize = 8 ) type WorkerSlot struct { ID int SendCh chan []byte } type Dispatcher struct { localConn net.PacketConn clientAddr atomic.Pointer[net.Addr] mu sync.Mutex workers []*WorkerSlot rrIndex int rrCount int // сколько пакетов отправлено в текущий worker (0..chunkSize-1) ReturnCh chan []byte ctx context.Context cancel context.CancelFunc wg sync.WaitGroup stats *Stats } func NewDispatcher(ctx context.Context, localConn net.PacketConn, stats *Stats) *Dispatcher { dctx, dcancel := context.WithCancel(ctx) d := &Dispatcher{ localConn: localConn, ReturnCh: make(chan []byte, returnChBuf), ctx: dctx, cancel: dcancel, stats: stats, } d.wg.Add(2) go d.readLoop() go d.writeLoop() return d } func (d *Dispatcher) Shutdown() { d.cancel() d.wg.Wait() } func (d *Dispatcher) Register(w *WorkerSlot) { d.mu.Lock() d.workers = append(d.workers, w) count := len(d.workers) d.mu.Unlock() log.Printf("[ДИСП] Воркер #%d зарегистрирован (всего: %d)", w.ID, count) } func (d *Dispatcher) Unregister(slot *WorkerSlot) { d.mu.Lock() for i, w := range d.workers { if w == slot { d.workers = append(d.workers[:i], d.workers[i+1:]...) break } } remaining := len(d.workers) // Подстраховка: если текущий rrIndex вылез за границу после удаления if d.rrIndex >= remaining && remaining > 0 { d.rrIndex = d.rrIndex % remaining } d.rrCount = 0 d.mu.Unlock() log.Printf("[ДИСП] Воркер #%d отключён (осталось: %d)", slot.ID, remaining) } // readLoop читает WireGuard-пакеты и распределяет по workers chunk'ами. // // Логика: отправляем chunkSize подряд пакетов в один worker, потом переходим // к следующему. Если текущий worker перегружен (канал полный) — немедленно // ищем свободный worker и начинаем новый chunk на нём. Это гарантирует: // - В рамках chunk пакеты идут через один TURN relay → in-order delivery // - Между chunks — разные relay → максимальная агрегатная скорость // - Нет блокировки, нет буферизации, нет дополнительного latency func (d *Dispatcher) readLoop() { defer d.wg.Done() buf := make([]byte, readBufSize) for { if err := d.ctx.Err(); err != nil { return } n, addr, err := d.localConn.ReadFrom(buf) if err != nil { if d.ctx.Err() != nil { return } time.Sleep(10 * time.Millisecond) continue } d.clientAddr.Store(&addr) atomic.AddInt64(&d.stats.TotalBytesUp, int64(n)) pkt := getPktBuf(n) copy(pkt, buf[:n]) d.mu.Lock() nw := len(d.workers) if nw == 0 { d.mu.Unlock() putPktBuf(pkt) continue } sent := false idx := d.rrIndex % nw // Пробуем текущий worker (chunk affinity) w := d.workers[idx] select { case w.SendCh <- pkt: sent = true d.rrCount++ if d.rrCount >= chunkSize { d.rrIndex = (idx + 1) % nw d.rrCount = 0 } default: // Текущий worker перегружен — ищем свободный, начинаем новый chunk for i := 1; i < nw; i++ { altIdx := (idx + i) % nw select { case d.workers[altIdx].SendCh <- pkt: sent = true d.rrIndex = altIdx d.rrCount = 1 // первый пакет нового chunk'а уже отправлен default: } if sent { break } } } if !sent { // Все workers перегружены — сдвигаем указатель, пакет дропается d.rrIndex = (idx + 1) % nw d.rrCount = 0 putPktBuf(pkt) } d.mu.Unlock() } } func (d *Dispatcher) writeLoop() { defer d.wg.Done() for { select { case <-d.ctx.Done(): return case pkt := <-d.ReturnCh: addrPtr := d.clientAddr.Load() if addrPtr == nil { putPktBuf(pkt) continue } addr := *addrPtr if _, err := d.localConn.WriteTo(pkt, addr); err != nil { if d.ctx.Err() != nil { putPktBuf(pkt) return } } atomic.AddInt64(&d.stats.TotalBytesDown, int64(len(pkt))) putPktBuf(pkt) } } }