Files
2026-05-26 22:48:52 +03:00

225 lines
5.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"log"
"net"
"sync"
"sync/atomic"
"time"
)
var pktPool = sync.Pool{
New: func() interface{} {
return make([]byte, 2048)
},
}
func getPktBuf(size int) []byte {
b := pktPool.Get().([]byte)
if cap(b) < size {
b = make([]byte, size)
}
return b[:size]
}
func putPktBuf(b []byte) {
if cap(b) < 2048 {
return
}
pktPool.Put(b[:cap(b)])
}
const (
returnChBuf = 384
// chunkSize — количество последовательных пакетов, отправляемых в один worker
// перед переключением на следующий.
//
// Зачем: при round-robin (chunk=1) каждый пакет летит через разный TURN relay
// с разным latency, что приводит к reorder на сервере. TCP внутри WireGuard
// интерпретирует reorder как потери → cwnd collapse → скорость single-flow
// падает до ~8 KB/s.
//
// С chunk=8: пакеты в пределах одного TCP congestion window (~10 пакетов при
// initial cwnd) уходят через один TURN relay → прилетают по порядку.
// Reorder возможен только между chunk-границами, что покрывается WG replay
// window (2048 пакетов).
//
// Агрегатная пропускная способность не меняется — все workers загружены
// равномерно по-прежнему (каждый получает 1/N от общего трафика за время).
chunkSize = 8
)
type WorkerSlot struct {
ID int
SendCh chan []byte
}
type Dispatcher struct {
localConn net.PacketConn
clientAddr atomic.Pointer[net.Addr]
mu sync.Mutex
workers []*WorkerSlot
rrIndex int
rrCount int // сколько пакетов отправлено в текущий worker (0..chunkSize-1)
ReturnCh chan []byte
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
stats *Stats
}
func NewDispatcher(ctx context.Context, localConn net.PacketConn, stats *Stats) *Dispatcher {
dctx, dcancel := context.WithCancel(ctx)
d := &Dispatcher{
localConn: localConn,
ReturnCh: make(chan []byte, returnChBuf),
ctx: dctx,
cancel: dcancel,
stats: stats,
}
d.wg.Add(2)
go d.readLoop()
go d.writeLoop()
return d
}
func (d *Dispatcher) Shutdown() {
d.cancel()
d.wg.Wait()
}
func (d *Dispatcher) Register(w *WorkerSlot) {
d.mu.Lock()
d.workers = append(d.workers, w)
count := len(d.workers)
d.mu.Unlock()
log.Printf("[ДИСП] Воркер #%d зарегистрирован (всего: %d)", w.ID, count)
}
func (d *Dispatcher) Unregister(slot *WorkerSlot) {
d.mu.Lock()
for i, w := range d.workers {
if w == slot {
d.workers = append(d.workers[:i], d.workers[i+1:]...)
break
}
}
remaining := len(d.workers)
// Подстраховка: если текущий rrIndex вылез за границу после удаления
if d.rrIndex >= remaining && remaining > 0 {
d.rrIndex = d.rrIndex % remaining
}
d.rrCount = 0
d.mu.Unlock()
log.Printf("[ДИСП] Воркер #%d отключён (осталось: %d)", slot.ID, remaining)
}
// readLoop читает WireGuard-пакеты и распределяет по workers chunk'ами.
//
// Логика: отправляем chunkSize подряд пакетов в один worker, потом переходим
// к следующему. Если текущий worker перегружен (канал полный) — немедленно
// ищем свободный worker и начинаем новый chunk на нём. Это гарантирует:
// - В рамках chunk пакеты идут через один TURN relay → in-order delivery
// - Между chunks — разные relay → максимальная агрегатная скорость
// - Нет блокировки, нет буферизации, нет дополнительного latency
func (d *Dispatcher) readLoop() {
defer d.wg.Done()
buf := make([]byte, readBufSize)
for {
if err := d.ctx.Err(); err != nil {
return
}
n, addr, err := d.localConn.ReadFrom(buf)
if err != nil {
if d.ctx.Err() != nil {
return
}
time.Sleep(10 * time.Millisecond)
continue
}
d.clientAddr.Store(&addr)
atomic.AddInt64(&d.stats.TotalBytesUp, int64(n))
pkt := getPktBuf(n)
copy(pkt, buf[:n])
d.mu.Lock()
nw := len(d.workers)
if nw == 0 {
d.mu.Unlock()
putPktBuf(pkt)
continue
}
sent := false
idx := d.rrIndex % nw
// Пробуем текущий worker (chunk affinity)
w := d.workers[idx]
select {
case w.SendCh <- pkt:
sent = true
d.rrCount++
if d.rrCount >= chunkSize {
d.rrIndex = (idx + 1) % nw
d.rrCount = 0
}
default:
// Текущий worker перегружен — ищем свободный, начинаем новый chunk
for i := 1; i < nw; i++ {
altIdx := (idx + i) % nw
select {
case d.workers[altIdx].SendCh <- pkt:
sent = true
d.rrIndex = altIdx
d.rrCount = 1 // первый пакет нового chunk'а уже отправлен
default:
}
if sent {
break
}
}
}
if !sent {
// Все workers перегружены — сдвигаем указатель, пакет дропается
d.rrIndex = (idx + 1) % nw
d.rrCount = 0
putPktBuf(pkt)
}
d.mu.Unlock()
}
}
func (d *Dispatcher) writeLoop() {
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():
return
case pkt := <-d.ReturnCh:
addrPtr := d.clientAddr.Load()
if addrPtr == nil {
putPktBuf(pkt)
continue
}
addr := *addrPtr
if _, err := d.localConn.WriteTo(pkt, addr); err != nil {
if d.ctx.Err() != nil {
putPktBuf(pkt)
return
}
}
atomic.AddInt64(&d.stats.TotalBytesDown, int64(len(pkt)))
putPktBuf(pkt)
}
}
}