Files
Zern-BlackOut/go_client/session.go
T
2026-05-26 22:48:52 +03:00

428 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/cbeuw/connutil"
"github.com/pion/dtls/v3"
"github.com/pion/dtls/v3/pkg/crypto/selfsign"
"github.com/pion/logging"
"github.com/pion/turn/v5"
)
// Tunables used by RunSession.
const (
// Capacity of the per-worker SendCh channel created for each WorkerSlot.
workerSendBuf = 128
// Deadline applied before each DTLS read and before each dispatcher-driven
// DTLS write in RunSession.
sessionReadTimeout = 30 * time.Minute // Increased from 60s to 30min
// Size of the plaintext packet buffers used by the relay <-> pipe proxy
// (the relay-side read buffer is this plus extra room for wrapping overhead).
readBufSize = 1600
// Requested kernel read/write buffer size for the UDP socket to the TURN server.
socketBufSize = 625 * 1024
// Single-byte payload sent as a keepalive over DTLS; a received 1-byte packet
// with this value is dropped by the reader instead of being forwarded.
keepaliveByte = 0xFF // DTLS-level keepalive marker
// Period between DTLS keepalive writes.
keepaliveInterval = 15 * time.Second
)
// handshakeSem is a buffered channel used as a counting semaphore: it limits
// the process to 3 concurrent DTLS handshakes. RunSession sends into it before
// creating the DTLS client and receives from it as soon as the handshake
// returns (or the client cannot be created).
var handshakeSem = make(chan struct{}, 3)
// NullLoggerFactory suppresses pion logging: every logger it hands out is a
// NullLogger, whose methods do nothing.
type NullLoggerFactory struct{}

// NewLogger ignores the requested scope and returns a fresh NullLogger.
func (*NullLoggerFactory) NewLogger(string) logging.LeveledLogger {
	return new(NullLogger)
}
// NullLogger is a logger whose methods all have empty bodies, so every
// message passed to it is discarded regardless of level.
type NullLogger struct{}

// Plain-message variants: the message is dropped.

func (*NullLogger) Trace(string) {}
func (*NullLogger) Debug(string) {}
func (*NullLogger) Info(string)  {}
func (*NullLogger) Warn(string)  {}
func (*NullLogger) Error(string) {}

// Formatted variants: the format string and its arguments are dropped
// without being formatted.

func (*NullLogger) Tracef(string, ...any) {}
func (*NullLogger) Debugf(string, ...any) {}
func (*NullLogger) Infof(string, ...any)  {}
func (*NullLogger) Warnf(string, ...any)  {}
func (*NullLogger) Errorf(string, ...any) {}
// connectedUDPConn adapts a connected UDP socket to the net.PacketConn shape:
// everything is inherited from the embedded *net.UDPConn except WriteTo.
type connectedUDPConn struct {
	*net.UDPConn
}

// WriteTo ignores the destination address and sends payload with Write on the
// embedded connection.
func (conn *connectedUDPConn) WriteTo(payload []byte, _ net.Addr) (int, error) {
	return conn.UDPConn.Write(payload)
}
// RunSession runs one tunnel session from start to finish and blocks until the
// session ends.
//
// The steps, in order:
//  1. Pick a TURN URL from creds.TurnURLs (indexed by sessionID modulo the
//     list length), apply the tp.Host / tp.Port overrides when set, and dial
//     the server over UDP.
//  2. Create a TURN client, allocate a relay and start a periodic TURN binding
//     request.
//  3. Bridge the relay and an in-memory packet pipe in both directions. When
//     len(tp.WrapKey) == wrapKeyLen, packets are passed through the obfs
//     wrap/unwrap helpers on the way.
//  4. Perform a DTLS client handshake over the other end of the pipe, bounded
//     by handshakeSem and a 20 second timeout.
//  5. If getConfig is true and configCh is non-nil, call RequestConfig and
//     offer a non-empty result on configCh without blocking.
//  6. Register a WorkerSlot with d and move packets between the slot's SendCh /
//     d.ReturnCh and the DTLS connection until the session context ends or an
//     I/O error occurs.
//
// Return values: (false, err) when any setup step fails; otherwise
// (configDelivered, nil) after the session has ended, where configDelivered is
// true if a non-empty config was obtained (whether or not configCh accepted it).
func RunSession(
	ctx context.Context,
	tp *TurnParams,
	peer *net.UDPAddr,
	d *Dispatcher,
	localPort string,
	getConfig bool,
	configCh chan<- string,
	sessionID int,
	creds *Credentials,
	deviceID, password string,
	stats *Stats,
) (bool, error) {
	configDelivered := false

	if len(creds.TurnURLs) == 0 {
		return false, fmt.Errorf("нет TURN URL в учетных данных")
	}

	selectedURL := creds.TurnURLs[sessionID%len(creds.TurnURLs)]
	urlhost, urlport, err := net.SplitHostPort(selectedURL)
	if err != nil {
		return false, fmt.Errorf("разбор TURN URL %q: %w", selectedURL, err)
	}
	// Explicit host/port in the TURN parameters override the URL's values.
	if tp.Host != "" {
		urlhost = tp.Host
	}
	if tp.Port != "" {
		urlport = tp.Port
	}
	turnAddr := net.JoinHostPort(urlhost, urlport)

	// Transport: always UDP.
	resolved, err := net.ResolveUDPAddr("udp", turnAddr)
	if err != nil {
		return false, fmt.Errorf("резолв TURN: %w", err)
	}
	c, err := net.DialUDP("udp", nil, resolved)
	if err != nil {
		return false, fmt.Errorf("подключение TURN UDP: %w", err)
	}
	defer c.Close()
	// Best effort: a failure to resize the socket buffers is not fatal.
	_ = c.SetReadBuffer(socketBufSize)
	_ = c.SetWriteBuffer(socketBufSize)
	var turnConn net.PacketConn = &connectedUDPConn{c}
	log.Printf("[СЕССИЯ #%d] TURN UDP (%s)", sessionID, turnAddr)

	// Request a relay address of the same IP family as the peer.
	var addrFamily turn.RequestedAddressFamily
	if peer.IP.To4() != nil {
		addrFamily = turn.RequestedAddressFamilyIPv4
	} else {
		addrFamily = turn.RequestedAddressFamilyIPv6
	}

	// TURN client (pion/turn/v5).
	tc, err := turn.NewClient(&turn.ClientConfig{
		STUNServerAddr:         turnAddr,
		TURNServerAddr:         turnAddr,
		Conn:                   turnConn,
		Username:               creds.User,
		Password:               creds.Pass,
		RequestedAddressFamily: addrFamily,
		LoggerFactory:          &NullLoggerFactory{},
	})
	if err != nil {
		return false, fmt.Errorf("TURN клиент: %w", err)
	}
	defer tc.Close()

	if err = tc.Listen(); err != nil {
		return false, fmt.Errorf("TURN Listen: %w", err)
	}

	relay, err := tc.Allocate()
	if err != nil {
		if isAuthError(err) {
			handleAuthError(creds.CacheStreamID)
		}
		// Quota errors get their own message so callers can tell them apart.
		errStr := err.Error()
		if strings.Contains(errStr, "Quota") || strings.Contains(errStr, "486") {
			return false, fmt.Errorf("TURN квота: %w", err)
		}
		return false, fmt.Errorf("TURN Allocate: %w", err)
	}
	defer relay.Close()

	// Reset the error count on successful allocation.
	getStreamCache(creds.CacheStreamID).errorCount.Store(0)
	log.Printf("[СЕССИЯ #%d] Relay: %s", sessionID, relay.LocalAddr())

	// In-memory pipe between DTLS (pipeB) and the TURN relay proxy (pipeA).
	pipeA, pipeB := connutil.AsyncPacketPipe()

	sessCtx, sessCancel := context.WithCancel(ctx)
	defer sessCancel()

	// TURN keepalive goroutine: a binding request every 10 seconds.
	var sessionWg sync.WaitGroup
	sessionWg.Add(1)
	go func() {
		defer sessionWg.Done()
		t := time.NewTicker(10 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-sessCtx.Done():
				return
			case <-t.C:
				tc.SendBindingRequest()
			}
		}
	}()

	// Relay <-> pipe proxy (with RTP obfuscation when a wrap key is configured).
	var relayWg sync.WaitGroup
	relayWg.Add(2)

	useWrap := len(tp.WrapKey) == wrapKeyLen

	// Obfuscation config/state are created per session.
	var obfsCfg *ObfsConfig
	var obfsWriteState *ObfsState
	if useWrap {
		obfsCfg = NewObfsConfig()
		obfsWriteState = NewObfsState()
	}

	// unblockRelay expires the deadlines on relay and pipeA so that both proxy
	// goroutines below fall out of their blocking calls. The Once makes it safe
	// to trigger from both the AfterFunc and the deferred cleanup.
	var unblockOnce sync.Once
	unblockRelay := func() {
		unblockOnce.Do(func() {
			_ = relay.SetDeadline(time.Now())
			_ = pipeA.SetDeadline(time.Now())
		})
	}
	// Fires when the session context ends mid-session (parent cancellation or
	// a proxy goroutine calling sessCancel).
	context.AfterFunc(sessCtx, unblockRelay)
	// Deferred calls run last-in-first-out, so deferring the AfterFunc's stop
	// function here would detach the callback *before* the deferred sessCancel
	// above runs. On early returns (certificate, DTLS client, handshake or
	// FATAL_AUTH failures) the deadlines would then never be expired and the
	// pipeA reader goroutine could block forever. Cancel first, then unblock
	// explicitly.
	defer func() {
		sessCancel()
		unblockRelay()
	}()

	// relay -> pipeA (UNWRAP: strip RTP header + decrypt).
	go func() {
		defer relayWg.Done()
		defer sessCancel()
		// Max incoming: RTP header (12) + AEAD tag (16) + padding.
		readBufLen := readBufSize + 80
		buf := make([]byte, readBufLen)
		plain := make([]byte, readBufSize)
		for {
			n, _, readErr := relay.ReadFrom(buf)
			if readErr != nil {
				return
			}
			payload := buf[:n]
			if useWrap {
				// Packets that are not RTP-shaped or fail to unwrap are
				// logged and dropped; the session keeps running.
				if !obfsIsRTPPacket(payload) {
					log.Printf("[СЕССИЯ #%d] OBFS unwrap: unexpected packet (n=%d)", sessionID, n)
					continue
				}
				m, wrapErr := obfsUnwrapPacket(tp.WrapKey, payload, plain)
				if wrapErr != nil {
					log.Printf("[СЕССИЯ #%d] OBFS unwrap: %v (n=%d)", sessionID, wrapErr, n)
					continue
				}
				payload = plain[:m]
			}
			if _, writeErr := pipeA.WriteTo(payload, peer); writeErr != nil {
				return
			}
		}
	}()

	// pipeA -> relay (WRAP: add RTP header + encrypt).
	go func() {
		defer relayWg.Done()
		defer sessCancel()
		b := make([]byte, readBufSize)
		for {
			n, _, readErr := pipeA.ReadFrom(b)
			if readErr != nil {
				return
			}
			out := b[:n]
			if useWrap {
				if obfsCfg != nil && obfsWriteState != nil {
					wrapped, wrapErr := obfsWrapPacket(tp.WrapKey, out, obfsCfg, obfsWriteState)
					if wrapErr != nil {
						// Unlike unwrap failures, a wrap failure ends the session.
						log.Printf("[СЕССИЯ #%d] OBFS wrap: %v", sessionID, wrapErr)
						return
					}
					out = wrapped
				}
			}
			if _, writeErr := relay.WriteTo(out, peer); writeErr != nil {
				return
			}
		}
	}()

	// DTLS with Connection ID support (no SNI).
	cert, err := selfsign.GenerateSelfSigned()
	if err != nil {
		return false, fmt.Errorf("генерация сертификата: %w", err)
	}

	// Acquire the handshake semaphore, giving up if the session is cancelled.
	select {
	case handshakeSem <- struct{}{}:
	case <-sessCtx.Done():
		return false, sessCtx.Err()
	}

	dtlsCfg := &dtls.Config{
		Certificates:          []tls.Certificate{cert},
		InsecureSkipVerify:    true,
		ExtendedMasterSecret:  dtls.RequireExtendedMasterSecret,
		CipherSuites:          []dtls.CipherSuiteID{dtls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256},
		ConnectionIDGenerator: dtls.OnlySendCIDGenerator(),
		// No ServerName (SNI) — less detectable by DPI
	}

	dtlsConn, err := dtls.Client(pipeB, peer, dtlsCfg)
	if err != nil {
		<-handshakeSem
		return false, fmt.Errorf("DTLS клиент: %w", err)
	}
	defer dtlsConn.Close()

	hctx, hcancel := context.WithTimeout(sessCtx, 20*time.Second)
	log.Printf("[ВОРКЕР #%d] [DTLS] Рукопожатие (Handshake)...", sessionID)
	err = dtlsConn.HandshakeContext(hctx)
	hcancel()
	<-handshakeSem // Release the semaphore immediately after the handshake.
	if err != nil {
		if useWrap {
			// With wrapping enabled, a handshake timeout is reported with a
			// dedicated WRAP_AUTH_TIMEOUT marker in the error text.
			errStr := strings.ToLower(err.Error())
			if strings.Contains(errStr, "deadline") || strings.Contains(errStr, "timeout") {
				return false, fmt.Errorf("WRAP_AUTH_TIMEOUT: DTLS timeout, пароль/WRAP не подтверждён")
			}
		}
		return false, fmt.Errorf("DTLS хендшейк: %w", err)
	}
	log.Printf("[ВОРКЕР #%d] [DTLS] Соединение установлено ✓", sessionID)

	atomic.AddInt32(&stats.ActiveConnections, 1)
	defer atomic.AddInt32(&stats.ActiveConnections, -1)

	// Config request.
	if getConfig && configCh != nil {
		conf, confErr := RequestConfig(dtlsConn, localPort, deviceID, password)
		if confErr != nil {
			// Only errors carrying FATAL_AUTH abort the session; anything
			// else is logged and the tunnel is still brought up.
			errStr := confErr.Error()
			if strings.Contains(errStr, "FATAL_AUTH") {
				return false, confErr
			}
			log.Printf("[ВОРКЕР #%d] Ошибка конфига: %v", sessionID, confErr)
		} else if conf != "" {
			// Non-blocking send: if the channel cannot take the value right
			// now the config is dropped, but still counted as delivered.
			select {
			case configCh <- conf:
				configDelivered = true
				log.Printf("[ВОРКЕР #%d] Конфиг получен", sessionID)
			default:
				configDelivered = true
				log.Printf("[ВОРКЕР #%d] Конфиг уже был доставлен другим воркером", sessionID)
			}
		} else {
			log.Printf("[ВОРКЕР #%d] Сервер ещё не выдал WireGuard-конфиг, повторим позже", sessionID)
		}
	}

	log.Printf("[ВОРКЕР #%d] [READY] Туннель готов к работе ✓", sessionID)

	// Register with the dispatcher.
	slot := &WorkerSlot{
		ID:     sessionID,
		SendCh: make(chan []byte, workerSendBuf),
	}
	d.Register(slot)
	defer d.Unregister(slot)

	// Proxy DTLS <-> dispatcher.
	var proxyWg sync.WaitGroup
	proxyWg.Add(3) // writer + reader + keepalive goroutine

	// Expire the DTLS deadlines when the session ends so blocked reads and
	// writes return. By the time the deferred stop runs, proxyWg.Wait below has
	// already observed all three goroutines exiting.
	stopDTLS := context.AfterFunc(sessCtx, func() {
		_ = dtlsConn.SetDeadline(time.Now())
	})
	defer stopDTLS()

	// DTLS keepalive: prevents TURN allocation timeout and DTLS idle disconnect.
	go func() {
		defer proxyWg.Done()
		t := time.NewTicker(keepaliveInterval)
		defer t.Stop()
		ping := []byte{keepaliveByte}
		for {
			select {
			case <-sessCtx.Done():
				return
			case <-t.C:
				_ = dtlsConn.SetWriteDeadline(time.Now().Add(5 * time.Second))
				if _, writeErr := dtlsConn.Write(ping); writeErr != nil {
					return
				}
			}
		}
	}()

	// Writer: dispatcher -> DTLS.
	go func() {
		defer proxyWg.Done()
		defer sessCancel()
		for {
			select {
			case <-sessCtx.Done():
				return
			case pkt, ok := <-slot.SendCh:
				if !ok {
					return
				}
				_ = dtlsConn.SetWriteDeadline(time.Now().Add(sessionReadTimeout))
				_, writeErr := dtlsConn.Write(pkt)
				// The buffer goes back to the pool whether or not the write succeeded.
				putPktBuf(pkt)
				if writeErr != nil {
					log.Printf("[ВОРКЕР #%d] Ошибка Writer: %v", sessionID, writeErr)
					return
				}
			}
		}
	}()

	// Reader: DTLS -> dispatcher.
	go func() {
		defer proxyWg.Done()
		defer sessCancel()
		b := make([]byte, 2000)
		for {
			_ = dtlsConn.SetReadDeadline(time.Now().Add(sessionReadTimeout))
			n, readErr := dtlsConn.Read(b)
			if readErr != nil {
				// Errors after cancellation are expected (deadline was forced).
				if sessCtx.Err() != nil {
					return
				}
				// An idle timeout on a live session just re-arms the deadline.
				if ne, ok := readErr.(net.Error); ok && ne.Timeout() {
					continue
				}
				log.Printf("[ВОРКЕР #%d] Ошибка Reader: %v", sessionID, readErr)
				return
			}
			// Skip keepalive pong from server.
			if n == 1 && b[0] == keepaliveByte {
				continue
			}
			// Copy into a pooled buffer because b is reused by the next Read.
			pkt := getPktBuf(n)
			copy(pkt, b[:n])
			select {
			case d.ReturnCh <- pkt:
			case <-sessCtx.Done():
				putPktBuf(pkt)
				return
			}
		}
	}()

	// Orderly shutdown: wait for the DTLS side, cancel the session, then wait
	// for the relay proxy and the TURN keepalive before closing the pipe.
	proxyWg.Wait()
	sessCancel()
	relayWg.Wait()
	sessionWg.Wait()
	_ = pipeA.Close()
	_ = pipeB.Close()

	log.Printf("[СЕССИЯ #%d] Завершена", sessionID)
	return configDelivered, nil
}