Initial commit: JustAMessenger v0.1.0

Серверная часть (Go):
- WebSocket сервер с бинарным протоколом
- XChaCha20-Poly1305 шифрование
- zstd сжатие с дедупликацией (64KB чанки)
- SQLite хранилище (WAL режим)
- Управление гильдиями, каналами, ролями
- Федерация между серверами (ed25519)
- REST API + WebSocket endpoints

Клиентская часть (Flutter):
- Material Design 3 тёмная тема (Discord-like)
- WebSocket соединение с сервером
- Экраны: сплэш, логин, домашний, гильдии, чат
- Модели: пользователи, гильдии, каналы, сообщения, роли
- Сервисы: соединение, API, криптография, тема
- Виджеты: иконки гильдий, сообщения, ввод чата
- Web сборка (PWA)

Документация:
- AGENTS.md — контекст для ИИ ассистентов
- docs/protocol.md — спецификация протокола
This commit is contained in:
SashegDev
2026-06-06 22:39:14 +00:00
commit 096c4d0a2d
40 changed files with 5054 additions and 0 deletions
+454
View File
@@ -0,0 +1,454 @@
package api
import (
"encoding/json"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/justamessenger/server/internal/channel"
"github.com/justamessenger/server/internal/config"
"github.com/justamessenger/server/internal/database"
"github.com/justamessenger/server/internal/federation"
"github.com/justamessenger/server/internal/models"
"github.com/justamessenger/server/internal/role"
"github.com/justamessenger/server/internal/server"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool { return true },
}
type HTTPServer struct {
srv *server.Server
cfg *config.Config
db *database.DB
chanM *channel.Manager
roleM *role.Manager
fedM *federation.Manager
mux *http.ServeMux
httpS *http.Server
}
func New(srv *server.Server, cfg *config.Config) *HTTPServer {
h := &HTTPServer{
srv: srv,
cfg: cfg,
db: srv.GetDB(),
chanM: srv.GetChannelManager(),
roleM: srv.GetRoleManager(),
fedM: srv.GetFederationManager(),
mux: http.NewServeMux(),
}
h.registerRoutes()
return h
}
func (h *HTTPServer) registerRoutes() {
h.mux.HandleFunc("/ws", h.handleWebSocket)
h.mux.HandleFunc("/api/health", h.handleHealth)
h.mux.HandleFunc("/api/guilds", h.handleGuilds)
h.mux.HandleFunc("/api/guilds/", h.handleGuild)
h.mux.HandleFunc("/api/channels", h.handleChannels)
h.mux.HandleFunc("/api/channels/", h.handleChannel)
h.mux.HandleFunc("/api/roles/", h.handleRoles)
h.mux.HandleFunc("/api/messages/", h.handleMessages)
h.mux.HandleFunc("/api/users/", h.handleUsers)
h.mux.HandleFunc("/api/upload", h.handleUpload)
h.mux.HandleFunc("/api/files/", h.handleFile)
h.mux.HandleFunc("/federation/receive", h.handleFederationReceive)
fs := http.FileServer(http.Dir(filepath.Join(h.cfg.DataDir, "files")))
h.mux.Handle("/files/", http.StripPrefix("/files/", fs))
}
func (h *HTTPServer) Start(addr string) error {
h.httpS = &http.Server{
Addr: addr,
Handler: h.mux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 120 * time.Second,
}
log.Printf("API server listening on %s", addr)
return h.httpS.ListenAndServe()
}
func (h *HTTPServer) handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade failed: %v", err)
return
}
h.srv.HandleConnection(conn)
}
func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "ok",
"name": h.cfg.ServerName,
"version": "0.1.0",
"uptime": time.Now().Unix(),
})
}
func (h *HTTPServer) handleGuilds(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
userID := r.URL.Query().Get("user_id")
if userID == "" {
http.Error(w, "user_id required", http.StatusBadRequest)
return
}
rows, err := h.db.Query(
`SELECT g.id, g.name, g.owner_id, g.icon, g.description, g.created_at
FROM guilds g
INNER JOIN guild_members gm ON g.id = gm.guild_id
WHERE gm.user_id = ?`, userID,
)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer rows.Close()
var guilds []*models.Guild
for rows.Next() {
g := &models.Guild{}
rows.Scan(&g.ID, &g.Name, &g.OwnerID, &g.Icon, &g.Description, &g.CreatedAt)
guilds = append(guilds, g)
}
json.NewEncoder(w).Encode(guilds)
case http.MethodPost:
var input struct {
Name string `json:"name"`
OwnerID string `json:"owner_id"`
Description string `json:"description"`
}
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
http.Error(w, "invalid body", http.StatusBadRequest)
return
}
id := uuid.New().String()
_, err := h.db.Exec(
`INSERT INTO guilds (id, name, owner_id, description) VALUES (?, ?, ?, ?)`,
id, input.Name, input.OwnerID, input.Description,
)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
h.db.Exec(
`INSERT INTO guild_members (guild_id, user_id) VALUES (?, ?)`,
id, input.OwnerID,
)
h.roleM.CreateDefaultRoles(id)
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(map[string]string{"id": id})
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
func (h *HTTPServer) handleGuild(w http.ResponseWriter, r *http.Request) {
id := extractID(r.URL.Path, "/api/guilds/")
if id == "" {
http.Error(w, "guild id required", http.StatusBadRequest)
return
}
switch r.Method {
case http.MethodGet:
var g models.Guild
err := h.db.QueryRow(
`SELECT id, name, owner_id, icon, description, created_at FROM guilds WHERE id = ?`, id,
).Scan(&g.ID, &g.Name, &g.OwnerID, &g.Icon, &g.Description, &g.CreatedAt)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
channels, _ := h.chanM.ListByGuild(id)
roles, _ := h.roleM.ListByGuild(id)
categories, _ := h.chanM.ListCategories(id)
json.NewEncoder(w).Encode(map[string]interface{}{
"guild": g,
"channels": channels,
"roles": roles,
"categories": categories,
})
case http.MethodDelete:
h.db.Exec(`DELETE FROM guilds WHERE id = ?`, id)
w.WriteHeader(http.StatusNoContent)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
func (h *HTTPServer) handleChannels(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var input struct {
GuildID string `json:"guild_id"`
CategoryID string `json:"category_id"`
Name string `json:"name"`
Type string `json:"type"`
Topic string `json:"topic"`
Position int `json:"position"`
}
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
http.Error(w, "invalid body", http.StatusBadRequest)
return
}
ch, err := h.chanM.Create(input.GuildID, input.CategoryID, input.Name, input.Type, input.Topic, input.Position)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(ch)
}
func (h *HTTPServer) handleChannel(w http.ResponseWriter, r *http.Request) {
id := extractID(r.URL.Path, "/api/channels/")
if id == "" {
http.Error(w, "channel id required", http.StatusBadRequest)
return
}
switch r.Method {
case http.MethodGet:
ch, err := h.chanM.Get(id)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(ch)
case http.MethodPut:
var input struct {
Name string `json:"name"`
Topic string `json:"topic"`
Position int `json:"position"`
}
json.NewDecoder(r.Body).Decode(&input)
if err := h.chanM.Update(id, input.Name, input.Topic, input.Position); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
case http.MethodDelete:
if err := h.chanM.Delete(id); err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusNoContent)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
func (h *HTTPServer) handleRoles(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
var input struct {
GuildID string `json:"guild_id"`
Name string `json:"name"`
Color int `json:"color"`
Position int `json:"position"`
Permissions []string `json:"permissions"`
}
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
http.Error(w, "invalid", http.StatusBadRequest)
return
}
rl, err := h.roleM.Create(input.GuildID, input.Name, input.Color, input.Position, input.Permissions)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(rl)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
}
func (h *HTTPServer) handleMessages(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
channelID := r.URL.Query().Get("channel_id")
if channelID == "" {
http.Error(w, "channel_id required", http.StatusBadRequest)
return
}
limitStr := r.URL.Query().Get("limit")
limit := 50
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 200 {
limit = l
}
before := r.URL.Query().Get("before")
var query string
var args []interface{}
if before != "" {
query = `SELECT id, channel_id, author_id, content, encrypted, nonce, message_type, reply_to, pinned, edited_at, created_at
FROM messages WHERE channel_id = ? AND id < ? ORDER BY created_at DESC LIMIT ?`
args = []interface{}{channelID, before, limit}
} else {
query = `SELECT id, channel_id, author_id, content, encrypted, nonce, message_type, reply_to, pinned, edited_at, created_at
FROM messages WHERE channel_id = ? ORDER BY created_at DESC LIMIT ?`
args = []interface{}{channelID, limit}
}
rows, err := h.db.Query(query, args...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer rows.Close()
var messages []*models.Message
for rows.Next() {
m := &models.Message{}
rows.Scan(&m.ID, &m.ChannelID, &m.AuthorID, &m.Content, &m.Encrypted, &m.Nonce,
&m.MessageType, &m.ReplyTo, &m.Pinned, &m.EditedAt, &m.CreatedAt)
messages = append(messages, m)
}
json.NewEncoder(w).Encode(messages)
}
func (h *HTTPServer) handleUsers(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := extractID(r.URL.Path, "/api/users/")
if id == "" {
http.Error(w, "user id required", http.StatusBadRequest)
return
}
var u models.User
err := h.db.QueryRow(
`SELECT id, username, avatar, bio, public_key, created_at FROM users WHERE id = ?`, id,
).Scan(&u.ID, &u.Username, &u.Avatar, &u.Bio, &u.PublicKey, &u.CreatedAt)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(u)
}
func (h *HTTPServer) handleUpload(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
r.ParseMultipartForm(2 << 30)
file, header, err := r.FormFile("file")
if err != nil {
http.Error(w, "no file", http.StatusBadRequest)
return
}
defer file.Close()
uploadDir := filepath.Join(h.cfg.DataDir, "files")
os.MkdirAll(uploadDir, 0755)
id := uuid.New().String()
ext := filepath.Ext(header.Filename)
filename := id + ext
dst, err := os.Create(filepath.Join(uploadDir, filename))
if err != nil {
http.Error(w, "failed to save", http.StatusInternalServerError)
return
}
defer dst.Close()
size, _ := io.Copy(dst, file)
json.NewEncoder(w).Encode(map[string]interface{}{
"id": id,
"filename": header.Filename,
"size": size,
"url": "/files/" + filename,
})
}
func (h *HTTPServer) handleFile(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := extractID(r.URL.Path, "/api/files/")
if id == "" {
http.Error(w, "file id required", http.StatusBadRequest)
return
}
http.ServeFile(w, r, filepath.Join(h.cfg.DataDir, "files", id))
}
func (h *HTTPServer) handleFederationReceive(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
h.fedM.HandleReceive(w, r)
}
func extractID(path, prefix string) string {
if len(path) <= len(prefix) {
return ""
}
id := path[len(prefix):]
if idx := stringsIndex(id, "/"); idx >= 0 {
id = id[:idx]
}
return id
}
func stringsIndex(s, substr string) int {
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return i
}
}
return -1
}
+135
View File
@@ -0,0 +1,135 @@
package channel
import (
"errors"
"github.com/google/uuid"
"github.com/justamessenger/server/internal/database"
"github.com/justamessenger/server/internal/models"
)
type Manager struct {
db *database.DB
}
func NewManager(db *database.DB) *Manager {
return &Manager{db: db}
}
func (m *Manager) Create(guildID, categoryID, name, channelType, topic string, position int) (*models.Channel, error) {
id := uuid.New().String()
_, err := m.db.Exec(
`INSERT INTO channels (id, guild_id, category_id, name, type, topic, position)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
id, guildID, categoryID, name, channelType, topic, position,
)
if err != nil {
return nil, err
}
return m.Get(id)
}
func (m *Manager) Get(id string) (*models.Channel, error) {
row := m.db.QueryRow(
`SELECT id, guild_id, category_id, name, type, topic, position, created_at
FROM channels WHERE id = ?`, id,
)
c := &models.Channel{}
err := row.Scan(&c.ID, &c.GuildID, &c.Category, &c.Name, &c.Type, &c.Topic, &c.Position, &c.CreatedAt)
if err != nil {
return nil, err
}
return c, nil
}
func (m *Manager) ListByGuild(guildID string) ([]*models.Channel, error) {
rows, err := m.db.Query(
`SELECT id, guild_id, category_id, name, type, topic, position, created_at
FROM channels WHERE guild_id = ? ORDER BY position`, guildID,
)
if err != nil {
return nil, err
}
defer rows.Close()
var channels []*models.Channel
for rows.Next() {
c := &models.Channel{}
if err := rows.Scan(&c.ID, &c.GuildID, &c.Category, &c.Name, &c.Type, &c.Topic, &c.Position, &c.CreatedAt); err != nil {
return nil, err
}
channels = append(channels, c)
}
return channels, nil
}
func (m *Manager) Update(id, name, topic string, position int) error {
result, err := m.db.Exec(
`UPDATE channels SET name = ?, topic = ?, position = ? WHERE id = ?`,
name, topic, position, id,
)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return errors.New("channel not found")
}
return nil
}
func (m *Manager) Delete(id string) error {
result, err := m.db.Exec(`DELETE FROM channels WHERE id = ?`, id)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return errors.New("channel not found")
}
return nil
}
func (m *Manager) CreateCategory(guildID, name string, position int) (string, error) {
id := uuid.New().String()
_, err := m.db.Exec(
`INSERT INTO categories (id, guild_id, name, position) VALUES (?, ?, ?, ?)`,
id, guildID, name, position,
)
if err != nil {
return "", err
}
return id, nil
}
func (m *Manager) ListCategories(guildID string) ([]*struct {
ID string
Name string
Position int
}, error) {
rows, err := m.db.Query(
`SELECT id, name, position FROM categories WHERE guild_id = ? ORDER BY position`, guildID,
)
if err != nil {
return nil, err
}
defer rows.Close()
var cats []*struct {
ID string
Name string
Position int
}
for rows.Next() {
var c struct {
ID string
Name string
Position int
}
if err := rows.Scan(&c.ID, &c.Name, &c.Position); err != nil {
return nil, err
}
cats = append(cats, &c)
}
return cats, nil
}
+195
View File
@@ -0,0 +1,195 @@
package compression
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"io"
"sync"
"github.com/klauspost/compress/zstd"
)
var (
encOnce sync.Once
decOnce sync.Once
encoder *zstd.Encoder
decoder *zstd.Decoder
)
func getEncoder() *zstd.Encoder {
encOnce.Do(func() {
opts := []zstd.EOption{
zstd.WithEncoderLevel(zstd.SpeedBestCompression),
zstd.WithWindowSize(1 << 24),
}
enc, _ := zstd.NewWriter(nil, opts...)
encoder = enc
})
return encoder
}
func getDecoder() *zstd.Decoder {
decOnce.Do(func() {
dec, _ := zstd.NewReader(nil)
decoder = dec
})
return decoder
}
type Chunk struct {
Hash [32]byte
Data []byte
Offset int64
Size int
}
type DedupStore struct {
mu sync.RWMutex
chunks map[[32]byte][]byte
}
func NewDedupStore() *DedupStore {
return &DedupStore{chunks: make(map[[32]byte][]byte)}
}
func chunkData(data []byte, chunkSize int) []Chunk {
var chunks []Chunk
for offset := 0; offset < len(data); offset += chunkSize {
end := offset + chunkSize
if end > len(data) {
end = len(data)
}
chunk := data[offset:end]
hash := sha256.Sum256(chunk)
chunks = append(chunks, Chunk{
Hash: hash,
Data: chunk,
Offset: int64(offset),
Size: len(chunk),
})
}
return chunks
}
func (ds *DedupStore) Deduplicate(data []byte, chunkSize int) ([]Chunk, int, error) {
chunks := chunkData(data, chunkSize)
uniqueSize := 0
ds.mu.Lock()
defer ds.mu.Unlock()
for i, c := range chunks {
if existing, ok := ds.chunks[c.Hash]; ok {
chunks[i].Data = existing
chunks[i].Size = len(existing)
} else {
ds.chunks[c.Hash] = c.Data
uniqueSize += len(c.Data)
}
}
return chunks, uniqueSize, nil
}
func Compress(data []byte) ([]byte, error) {
if len(data) == 0 {
return nil, nil
}
var buf bytes.Buffer
enc := getEncoder()
enc.Reset(&buf)
if _, err := enc.Write(data); err != nil {
return nil, err
}
if err := enc.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func Decompress(data []byte) ([]byte, error) {
if len(data) == 0 {
return nil, nil
}
dec := getDecoder()
return dec.DecodeAll(data, nil)
}
type Compressor struct {
ChunkSize int
DedupStore *DedupStore
}
func New(chunkSize int) *Compressor {
if chunkSize <= 0 {
chunkSize = 65536
}
return &Compressor{
ChunkSize: chunkSize,
DedupStore: NewDedupStore(),
}
}
type CompressResult struct {
Data []byte
OriginalSize int64
CompressedSize int64
Chunks int
UniqueChunks int
}
func (c *Compressor) CompressFile(data []byte) (*CompressResult, error) {
originalSize := int64(len(data))
chunks, uniqueSize, err := c.DedupStore.Deduplicate(data, c.ChunkSize)
if err != nil {
return nil, err
}
var buf bytes.Buffer
buf.Write(binary.AppendVarint(nil, int64(len(chunks))))
for _, chunk := range chunks {
buf.Write(chunk.Hash[:])
buf.Write(binary.AppendVarint(nil, int64(chunk.Offset)))
buf.Write(binary.AppendVarint(nil, int64(chunk.Size)))
}
compressed, err := Compress(data)
if err != nil {
return nil, err
}
return &CompressResult{
Data: compressed,
OriginalSize: originalSize,
CompressedSize: int64(len(compressed)),
Chunks: len(chunks),
UniqueChunks: uniqueSize / c.ChunkSize,
}, nil
}
func (c *Compressor) DecompressFile(compressed []byte) ([]byte, error) {
return Decompress(compressed)
}
func CompressStream(r io.Reader, w io.Writer) error {
enc := getEncoder()
defer enc.Close()
enc.Reset(w)
if _, err := io.Copy(enc, r); err != nil {
return err
}
return enc.Close()
}
func DecompressStream(r io.Reader, w io.Writer) error {
dec := getDecoder()
defer dec.Close()
dec.Reset(r)
if _, err := io.Copy(w, dec); err != nil {
return err
}
return nil
}
+46
View File
@@ -0,0 +1,46 @@
package config
import (
"encoding/json"
"os"
"path/filepath"
)
type Config struct {
ServerName string `json:"server_name"`
ListenAddr string `json:"listen_addr"`
Domain string `json:"domain"`
DataDir string `json:"data_dir"`
MaxFileSize int64 `json:"max_file_size"`
Federation bool `json:"federation"`
FederatedWith []string `json:"federated_with"`
LogLevel string `json:"log_level"`
}
func Default() *Config {
return &Config{
ServerName: "JustAMessenger",
ListenAddr: ":8443",
Domain: "localhost",
DataDir: "./data",
MaxFileSize: 2 << 30,
Federation: false,
LogLevel: "info",
}
}
func Load(path string) (*Config, error) {
cfg := Default()
abs, err := filepath.Abs(path)
if err != nil {
return cfg, nil
}
data, err := os.ReadFile(abs)
if err != nil {
return cfg, nil
}
if err := json.Unmarshal(data, cfg); err != nil {
return nil, err
}
return cfg, nil
}
+92
View File
@@ -0,0 +1,92 @@
package crypto
import (
"crypto/rand"
"encoding/hex"
"errors"
"io"
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/crypto/curve25519"
"golang.org/x/crypto/argon2"
)
const (
KeySize = 32
NonceSize = chacha20poly1305.NonceSizeX
)
func GenerateKey() ([]byte, error) {
key := make([]byte, KeySize)
if _, err := rand.Read(key); err != nil {
return nil, err
}
return key, nil
}
func GenerateNonce() ([]byte, error) {
nonce := make([]byte, NonceSize)
if _, err := rand.Read(nonce); err != nil {
return nil, err
}
return nonce, nil
}
func DeriveKey(password string, salt []byte) []byte {
return argon2.IDKey([]byte(password), salt, 3, 64*1024, 4, KeySize)
}
func Encrypt(plaintext []byte, key []byte) ([]byte, []byte, error) {
aead, err := chacha20poly1305.NewX(key)
if err != nil {
return nil, nil, err
}
nonce := make([]byte, aead.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, nil, err
}
ciphertext := aead.Seal(nil, nonce, plaintext, nil)
return ciphertext, nonce, nil
}
func Decrypt(ciphertext []byte, key []byte, nonce []byte) ([]byte, error) {
aead, err := chacha20poly1305.NewX(key)
if err != nil {
return nil, err
}
if len(nonce) != aead.NonceSize() {
return nil, errors.New("invalid nonce size")
}
plaintext, err := aead.Open(nil, nonce, ciphertext, nil)
if err != nil {
return nil, err
}
return plaintext, nil
}
func GenerateKeyPair() ([]byte, []byte, error) {
priv := make([]byte, 32)
if _, err := rand.Read(priv); err != nil {
return nil, nil, err
}
priv[0] &= 248
priv[31] &= 127
priv[31] |= 64
pub, err := curve25519.X25519(priv, curve25519.Basepoint)
if err != nil {
return nil, nil, err
}
return priv, pub, nil
}
func ComputeSharedSecret(privateKey, publicKey []byte) ([]byte, error) {
return curve25519.X25519(privateKey, publicKey)
}
func KeyToString(key []byte) string {
return hex.EncodeToString(key)
}
func StringToKey(s string) ([]byte, error) {
return hex.DecodeString(s)
}
+190
View File
@@ -0,0 +1,190 @@
package database
import (
"database/sql"
"os"
"path/filepath"
"time"
_ "github.com/mattn/go-sqlite3"
)
type DB struct {
*sql.DB
}
func Open(dataDir string) (*DB, error) {
if err := os.MkdirAll(dataDir, 0755); err != nil {
return nil, err
}
dbPath := filepath.Join(dataDir, "jam.db")
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_busy_timeout=5000&_synchronous=NORMAL")
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
db.SetConnMaxLifetime(time.Hour)
d := &DB{db}
if err := d.migrate(); err != nil {
return nil, err
}
return d, nil
}
func (db *DB) migrate() error {
schema := `
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
avatar TEXT DEFAULT '',
bio TEXT DEFAULT '',
public_key BLOB,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS guilds (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
owner_id TEXT NOT NULL,
icon TEXT DEFAULT '',
description TEXT DEFAULT '',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (owner_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS guild_members (
guild_id TEXT NOT NULL,
user_id TEXT NOT NULL,
nickname TEXT DEFAULT '',
avatar TEXT DEFAULT '',
joined_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (guild_id, user_id),
FOREIGN KEY (guild_id) REFERENCES guilds(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS categories (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
name TEXT NOT NULL,
position INTEGER DEFAULT 0,
FOREIGN KEY (guild_id) REFERENCES guilds(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS channels (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
category_id TEXT DEFAULT '',
name TEXT NOT NULL,
type TEXT NOT NULL DEFAULT 'text',
topic TEXT DEFAULT '',
position INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (guild_id) REFERENCES guilds(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS roles (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
name TEXT NOT NULL,
color INTEGER DEFAULT 0,
position INTEGER DEFAULT 0,
permissions TEXT DEFAULT '[]',
is_default INTEGER DEFAULT 0,
FOREIGN KEY (guild_id) REFERENCES guilds(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS role_members (
role_id TEXT NOT NULL,
user_id TEXT NOT NULL,
PRIMARY KEY (role_id, user_id),
FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
channel_id TEXT NOT NULL,
author_id TEXT NOT NULL,
content BLOB,
encrypted INTEGER DEFAULT 0,
nonce BLOB,
message_type TEXT DEFAULT 'text',
reply_to TEXT,
pinned INTEGER DEFAULT 0,
self_destruct DATETIME,
edited_at DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (channel_id) REFERENCES channels(id) ON DELETE CASCADE,
FOREIGN KEY (author_id) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id, created_at);
CREATE TABLE IF NOT EXISTS attachments (
id TEXT PRIMARY KEY,
message_id TEXT NOT NULL,
filename TEXT NOT NULL,
file_type TEXT DEFAULT '',
size INTEGER DEFAULT 0,
compressed INTEGER DEFAULT 0,
original_size INTEGER DEFAULT 0,
url TEXT NOT NULL,
hash TEXT DEFAULT '',
FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS reactions (
message_id TEXT NOT NULL,
user_id TEXT NOT NULL,
emoji TEXT NOT NULL,
PRIMARY KEY (message_id, user_id, emoji),
FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE TABLE IF NOT EXISTS stickers (
id TEXT PRIMARY KEY,
guild_id TEXT NOT NULL,
name TEXT NOT NULL,
data BLOB,
format TEXT DEFAULT 'png',
FOREIGN KEY (guild_id) REFERENCES guilds(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS voice_states (
user_id TEXT NOT NULL,
channel_id TEXT NOT NULL,
guild_id TEXT NOT NULL,
muted INTEGER DEFAULT 0,
deafened INTEGER DEFAULT 0,
PRIMARY KEY (user_id, channel_id)
);
CREATE TABLE IF NOT EXISTS streams (
id TEXT PRIMARY KEY,
channel_id TEXT NOT NULL,
user_id TEXT NOT NULL,
title TEXT DEFAULT '',
started_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS federation_peers (
domain TEXT PRIMARY KEY,
name TEXT NOT NULL,
public_key BLOB,
last_seen DATETIME,
is_active INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS server_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
`
_, err := db.Exec(schema)
return err
}
+209
View File
@@ -0,0 +1,209 @@
package federation
import (
"bytes"
"crypto/ed25519"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
jcrypto "github.com/justamessenger/server/internal/crypto"
"github.com/justamessenger/server/internal/database"
"github.com/justamessenger/server/internal/models"
"github.com/justamessenger/server/internal/protocol"
)
type Manager struct {
db *database.DB
domain string
privateKey ed25519.PrivateKey
publicKey ed25519.PublicKey
peers map[string]*models.FederationPeer
mu sync.RWMutex
httpClient *http.Client
}
func NewManager(db *database.DB, domain string) (*Manager, error) {
pub, priv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
return nil, fmt.Errorf("failed to generate federation keypair: %w", err)
}
m := &Manager{
db: db,
domain: domain,
privateKey: priv,
publicKey: pub,
peers: make(map[string]*models.FederationPeer),
httpClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 20,
IdleConnTimeout: 90 * time.Second,
},
},
}
if err := m.loadPeers(); err != nil {
log.Printf("Warning: failed to load federation peers: %v", err)
}
return m, nil
}
func (m *Manager) PublicKey() []byte {
return m.publicKey
}
func (m *Manager) loadPeers() error {
rows, err := m.db.Query(
`SELECT domain, name, public_key, last_seen, is_active
FROM federation_peers WHERE is_active = 1`,
)
if err != nil {
return err
}
defer rows.Close()
m.mu.Lock()
defer m.mu.Unlock()
for rows.Next() {
p := &models.FederationPeer{}
if err := rows.Scan(&p.Domain, &p.Name, &p.PublicKey, &p.LastSeen, &p.IsActive); err != nil {
return err
}
m.peers[p.Domain] = p
}
return nil
}
func (m *Manager) AddPeer(domain, name string, publicKey []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
peer := &models.FederationPeer{
Domain: domain,
Name: name,
PublicKey: publicKey,
LastSeen: time.Now(),
IsActive: true,
}
m.peers[domain] = peer
_, err := m.db.Exec(
`INSERT OR REPLACE INTO federation_peers (domain, name, public_key, last_seen, is_active)
VALUES (?, ?, ?, ?, 1)`,
domain, name, publicKey, time.Now(),
)
return err
}
func (m *Manager) Sign(data []byte) []byte {
return ed25519.Sign(m.privateKey, data)
}
func (m *Manager) Verify(publicKey, data, signature []byte) bool {
return ed25519.Verify(publicKey, data, signature)
}
func (m *Manager) SendToPeer(peerDomain string, pkt *protocol.FederationData) error {
m.mu.RLock()
peer, ok := m.peers[peerDomain]
m.mu.RUnlock()
if !ok {
return fmt.Errorf("unknown peer: %s", peerDomain)
}
payload, err := json.Marshal(pkt)
if err != nil {
return err
}
url := fmt.Sprintf("https://%s/federation/receive", peer.Domain)
req, err := http.NewRequest("POST", url, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-JAM-Domain", m.domain)
req.Header.Set("X-JAM-Signature", string(m.Sign(payload)))
resp, err := m.httpClient.Do(req)
if err != nil {
return fmt.Errorf("federation request to %s failed: %w", peer.Domain, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("peer %s returned %d: %s", peer.Domain, resp.StatusCode, string(body))
}
m.mu.Lock()
peer.LastSeen = time.Now()
m.mu.Unlock()
return nil
}
func (m *Manager) BroadcastToPeers(pkt *protocol.FederationData) {
m.mu.RLock()
domains := make([]string, 0, len(m.peers))
for domain := range m.peers {
domains = append(domains, domain)
}
m.mu.RUnlock()
for _, domain := range domains {
if domain == m.domain {
continue
}
if err := m.SendToPeer(domain, pkt); err != nil {
log.Printf("Federation broadcast to %s failed: %v", domain, err)
}
}
}
func (m *Manager) HandleReceive(w http.ResponseWriter, r *http.Request) {
signature := r.Header.Get("X-JAM-Signature")
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "failed to read body", http.StatusBadRequest)
return
}
var pkt protocol.FederationData
if err := json.Unmarshal(body, &pkt); err != nil {
http.Error(w, "invalid packet", http.StatusBadRequest)
return
}
m.mu.RLock()
peer, ok := m.peers[pkt.FromDomain]
m.mu.RUnlock()
if ok && !m.Verify(peer.PublicKey, body, []byte(signature)) {
http.Error(w, "invalid signature", http.StatusUnauthorized)
return
}
log.Printf("Federation packet from %s: type=%s", pkt.FromDomain, pkt.PacketType)
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}
func (m *Manager) EncryptFederationPayload(payload []byte) ([]byte, []byte, error) {
key, err := jcrypto.GenerateKey()
if err != nil {
return nil, nil, err
}
return jcrypto.Encrypt(payload, key)
}
+108
View File
@@ -0,0 +1,108 @@
package models
import "time"
type User struct {
ID string `json:"id"`
Username string `json:"username"`
Avatar string `json:"avatar,omitempty"`
Bio string `json:"bio,omitempty"`
PublicKey []byte `json:"public_key"`
CreatedAt time.Time `json:"created_at"`
}
type Guild struct {
ID string `json:"id"`
Name string `json:"name"`
OwnerID string `json:"owner_id"`
Icon string `json:"icon,omitempty"`
Description string `json:"description,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
type Channel struct {
ID string `json:"id"`
GuildID string `json:"guild_id"`
Name string `json:"name"`
Type string `json:"type"`
Topic string `json:"topic,omitempty"`
Position int `json:"position"`
Category string `json:"category,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
type Role struct {
ID string `json:"id"`
GuildID string `json:"guild_id"`
Name string `json:"name"`
Color int `json:"color"`
Position int `json:"position"`
Permissions []string `json:"permissions"`
IsDefault bool `json:"is_default"`
}
type Message struct {
ID string `json:"id"`
ChannelID string `json:"channel_id"`
AuthorID string `json:"author_id"`
Content []byte `json:"content,omitempty"`
Encrypted bool `json:"encrypted"`
Nonce []byte `json:"nonce,omitempty"`
MessageType string `json:"message_type"`
ReplyTo string `json:"reply_to,omitempty"`
Attachments []string `json:"attachments,omitempty"`
EditedAt *time.Time `json:"edited_at,omitempty"`
Pinned bool `json:"pinned"`
SelfDestruct *time.Time `json:"self_destruct,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
type VoiceState struct {
UserID string `json:"user_id"`
ChannelID string `json:"channel_id"`
GuildID string `json:"guild_id"`
Muted bool `json:"muted"`
Deafened bool `json:"deafened"`
}
type Stream struct {
ID string `json:"id"`
ChannelID string `json:"channel_id"`
UserID string `json:"user_id"`
Title string `json:"title"`
StartedAt time.Time `json:"started_at"`
}
type Reaction struct {
MessageID string `json:"message_id"`
UserID string `json:"user_id"`
Emoji string `json:"emoji"`
}
type Attachment struct {
ID string `json:"id"`
MessageID string `json:"message_id"`
Filename string `json:"filename"`
FileType string `json:"file_type"`
Size int64 `json:"size"`
Compressed bool `json:"compressed"`
OriginalSize int64 `json:"original_size,omitempty"`
URL string `json:"url"`
Hash string `json:"hash"`
}
type Sticker struct {
ID string `json:"id"`
GuildID string `json:"guild_id"`
Name string `json:"name"`
Data []byte `json:"data"`
Format string `json:"format"`
}
type FederationPeer struct {
Domain string `json:"domain"`
Name string `json:"name"`
PublicKey []byte `json:"public_key"`
LastSeen time.Time `json:"last_seen"`
IsActive bool `json:"is_active"`
}
+128
View File
@@ -0,0 +1,128 @@
package protocol
import "encoding/json"
type OpCode int
const (
OpHeartbeat OpCode = 0
OpHello OpCode = 1
OpAuthenticate OpCode = 2
OpAuthenticated OpCode = 3
OpError OpCode = 4
OpMessageCreate OpCode = 10
OpMessageUpdate OpCode = 11
OpMessageDelete OpCode = 12
OpMessageReaction OpCode = 13
OpChannelCreate OpCode = 20
OpChannelUpdate OpCode = 21
OpChannelDelete OpCode = 22
OpGuildCreate OpCode = 30
OpGuildUpdate OpCode = 31
OpGuildDelete OpCode = 32
OpGuildMemberAdd OpCode = 33
OpGuildMemberRemove OpCode = 34
OpVoiceStateUpdate OpCode = 40
OpStreamStart OpCode = 50
OpStreamEnd OpCode = 51
OpTypingStart OpCode = 60
OpTypingStop OpCode = 61
OpUserPresence OpCode = 70
OpUserUpdate OpCode = 71
OpFederationPacket OpCode = 80
OpCallOffer OpCode = 90
OpCallAnswer OpCode = 91
OpCallICE OpCode = 92
OpCallEnd OpCode = 93
)
type Packet struct {
Op OpCode `json:"op"`
Data json.RawMessage `json:"d,omitempty"`
Seq int64 `json:"s,omitempty"`
}
type HelloData struct {
HeartbeatInterval int `json:"heartbeat_interval"`
ServerName string `json:"server_name"`
ServerVersion string `json:"server_version"`
}
type AuthData struct {
Token string `json:"token"`
Username string `json:"username,omitempty"`
}
type MessageData struct {
ID string `json:"id"`
ChannelID string `json:"channel_id"`
Content []byte `json:"content,omitempty"`
Encrypted bool `json:"encrypted"`
Nonce []byte `json:"nonce,omitempty"`
MessageType string `json:"message_type,omitempty"`
ReplyTo string `json:"reply_to,omitempty"`
}
type ReactionData struct {
MessageID string `json:"message_id"`
Emoji string `json:"emoji"`
Add bool `json:"add"`
}
type VoiceStateData struct {
GuildID string `json:"guild_id"`
ChannelID string `json:"channel_id"`
Muted bool `json:"muted"`
Deafened bool `json:"deafened"`
}
type StreamData struct {
ChannelID string `json:"channel_id"`
Title string `json:"title"`
Action string `json:"action"`
}
type FederationData struct {
FromDomain string `json:"from_domain"`
TargetID string `json:"target_id"`
PacketType string `json:"packet_type"`
Payload json.RawMessage `json:"payload"`
Signature []byte `json:"signature"`
}
type CallData struct {
ChannelID string `json:"channel_id"`
Type string `json:"type"`
PeerID string `json:"peer_id,omitempty"`
SDP json.RawMessage `json:"sdp,omitempty"`
ICE json.RawMessage `json:"ice,omitempty"`
}
type ErrorData struct {
Code int `json:"code"`
Message string `json:"message"`
}
func NewPacket(op OpCode, data interface{}) (*Packet, error) {
raw, err := json.Marshal(data)
if err != nil {
return nil, err
}
return &Packet{Op: op, Data: raw}, nil
}
func MustPacket(op OpCode, data interface{}) *Packet {
p, err := NewPacket(op, data)
if err != nil {
panic(err)
}
return p
}
+180
View File
@@ -0,0 +1,180 @@
package role
import (
"encoding/json"
"errors"
"github.com/google/uuid"
"github.com/justamessenger/server/internal/database"
"github.com/justamessenger/server/internal/models"
)
type Manager struct {
db *database.DB
}
func NewManager(db *database.DB) *Manager {
return &Manager{db: db}
}
var DefaultPermissions = []string{
"view_channels",
"send_messages",
"add_reactions",
"read_message_history",
"connect_voice",
"speak",
}
var AdminPermissions = []string{
"administrator",
"manage_guild",
"manage_channels",
"manage_roles",
"manage_messages",
"kick_members",
"ban_members",
"view_channels",
"send_messages",
"add_reactions",
"read_message_history",
"connect_voice",
"speak",
"mute_members",
"deafen_members",
"move_members",
"stream",
}
func (m *Manager) CreateDefaultRoles(guildID string) error {
_, err := m.db.Exec(
`INSERT OR IGNORE INTO roles (id, guild_id, name, color, position, permissions, is_default)
VALUES (?, ?, 'everyone', 0, 0, ?, 1)`,
uuid.New().String(), guildID, mustJSON(DefaultPermissions),
)
return err
}
func mustJSON(v interface{}) string {
b, _ := json.Marshal(v)
return string(b)
}
func (m *Manager) Create(guildID, name string, color, position int, permissions []string) (*models.Role, error) {
id := uuid.New().String()
permJSON, _ := json.Marshal(permissions)
_, err := m.db.Exec(
`INSERT INTO roles (id, guild_id, name, color, position, permissions)
VALUES (?, ?, ?, ?, ?, ?)`,
id, guildID, name, color, position, string(permJSON),
)
if err != nil {
return nil, err
}
return m.Get(id)
}
func (m *Manager) Get(id string) (*models.Role, error) {
row := m.db.QueryRow(
`SELECT id, guild_id, name, color, position, permissions, is_default
FROM roles WHERE id = ?`, id,
)
r := &models.Role{}
var permJSON string
err := row.Scan(&r.ID, &r.GuildID, &r.Name, &r.Color, &r.Position, &permJSON, &r.IsDefault)
if err != nil {
return nil, err
}
json.Unmarshal([]byte(permJSON), &r.Permissions)
return r, nil
}
func (m *Manager) ListByGuild(guildID string) ([]*models.Role, error) {
rows, err := m.db.Query(
`SELECT id, guild_id, name, color, position, permissions, is_default
FROM roles WHERE guild_id = ? ORDER BY position DESC`, guildID,
)
if err != nil {
return nil, err
}
defer rows.Close()
var roles []*models.Role
for rows.Next() {
r := &models.Role{}
var permJSON string
if err := rows.Scan(&r.ID, &r.GuildID, &r.Name, &r.Color, &r.Position, &permJSON, &r.IsDefault); err != nil {
return nil, err
}
json.Unmarshal([]byte(permJSON), &r.Permissions)
roles = append(roles, r)
}
return roles, nil
}
func (m *Manager) AssignRole(roleID, userID string) error {
_, err := m.db.Exec(
`INSERT OR IGNORE INTO role_members (role_id, user_id) VALUES (?, ?)`,
roleID, userID,
)
return err
}
func (m *Manager) RemoveRole(roleID, userID string) error {
result, err := m.db.Exec(
`DELETE FROM role_members WHERE role_id = ? AND user_id = ?`,
roleID, userID,
)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return errors.New("role assignment not found")
}
return nil
}
func (m *Manager) GetUserRoles(guildID, userID string) ([]*models.Role, error) {
rows, err := m.db.Query(
`SELECT r.id, r.guild_id, r.name, r.color, r.position, r.permissions, r.is_default
FROM roles r
INNER JOIN role_members rm ON r.id = rm.role_id
WHERE r.guild_id = ? AND rm.user_id = ?
UNION
SELECT id, guild_id, name, color, position, permissions, is_default
FROM roles WHERE guild_id = ? AND is_default = 1
ORDER BY position DESC`, guildID, userID, guildID,
)
if err != nil {
return nil, err
}
defer rows.Close()
var roles []*models.Role
for rows.Next() {
r := &models.Role{}
var permJSON string
if err := rows.Scan(&r.ID, &r.GuildID, &r.Name, &r.Color, &r.Position, &permJSON, &r.IsDefault); err != nil {
return nil, err
}
json.Unmarshal([]byte(permJSON), &r.Permissions)
roles = append(roles, r)
}
return roles, nil
}
func (m *Manager) HasPermission(guildID, userID, permission string) (bool, error) {
roles, err := m.GetUserRoles(guildID, userID)
if err != nil {
return false, err
}
for _, r := range roles {
for _, p := range r.Permissions {
if p == "administrator" || p == permission {
return true, nil
}
}
}
return false, nil
}
+473
View File
@@ -0,0 +1,473 @@
package server
import (
"encoding/json"
"fmt"
"log"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/justamessenger/server/internal/channel"
"github.com/justamessenger/server/internal/compression"
"github.com/justamessenger/server/internal/config"
jcrypto "github.com/justamessenger/server/internal/crypto"
"github.com/justamessenger/server/internal/database"
"github.com/justamessenger/server/internal/federation"
"github.com/justamessenger/server/internal/models"
"github.com/justamessenger/server/internal/protocol"
"github.com/justamessenger/server/internal/role"
)
type Client struct {
ID string
UserID string
Username string
Conn *websocket.Conn
Send chan []byte
Guilds map[string]bool
mu sync.RWMutex
}
type Server struct {
config *config.Config
db *database.DB
clients map[string]*Client
mu sync.RWMutex
register chan *Client
unregister chan *Client
channelMgr *channel.Manager
roleMgr *role.Manager
fedMgr *federation.Manager
compressor *compression.Compressor
tokens map[string]string
tokenMu sync.RWMutex
stopCh chan struct{}
}
func New(cfg *config.Config) (*Server, error) {
db, err := database.Open(cfg.DataDir)
if err != nil {
return nil, fmt.Errorf("database: %w", err)
}
fedMgr, err := federation.NewManager(db, cfg.Domain)
if err != nil {
return nil, fmt.Errorf("federation: %w", err)
}
s := &Server{
config: cfg,
db: db,
clients: make(map[string]*Client),
register: make(chan *Client, 256),
unregister: make(chan *Client, 256),
channelMgr: channel.NewManager(db),
roleMgr: role.NewManager(db),
fedMgr: fedMgr,
compressor: compression.New(65536),
tokens: make(map[string]string),
stopCh: make(chan struct{}),
}
return s, nil
}
func (s *Server) Start() {
log.Printf("JAM Server %s starting on %s", s.config.ServerName, s.config.ListenAddr)
if s.config.Federation {
log.Printf("Federation enabled, domain: %s", s.config.Domain)
}
go s.runLoop()
}
func (s *Server) Stop() {
close(s.stopCh)
}
func (s *Server) runLoop() {
for {
select {
case client := <-s.register:
s.mu.Lock()
s.clients[client.ID] = client
s.mu.Unlock()
log.Printf("Client connected: %s (%s)", client.ID, client.Username)
case client := <-s.unregister:
s.mu.Lock()
if _, ok := s.clients[client.ID]; ok {
delete(s.clients, client.ID)
close(client.Send)
}
s.mu.Unlock()
log.Printf("Client disconnected: %s", client.ID)
case <-s.stopCh:
return
}
}
}
func (s *Server) HandleConnection(conn *websocket.Conn) {
client := &Client{
ID: uuid.New().String(),
Conn: conn,
Send: make(chan []byte, 256),
Guilds: make(map[string]bool),
}
s.register <- client
hello := protocol.MustPacket(protocol.OpHello, &protocol.HelloData{
HeartbeatInterval: 30,
ServerName: s.config.ServerName,
ServerVersion: "0.1.0",
})
client.Conn.WriteJSON(hello)
go client.writePump()
go client.readPump(s)
}
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
if !ok {
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
func (c *Client) readPump(s *Server) {
defer func() {
s.unregister <- c
c.Conn.Close()
}()
c.Conn.SetReadLimit(1 << 24)
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, message, err := c.Conn.ReadMessage()
if err != nil {
break
}
var pkt protocol.Packet
if err := json.Unmarshal(message, &pkt); err != nil {
log.Printf("Invalid packet from %s: %v", c.ID, err)
continue
}
s.handlePacket(c, &pkt)
}
}
func (s *Server) handlePacket(c *Client, pkt *protocol.Packet) {
switch pkt.Op {
case protocol.OpHeartbeat:
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
pong := protocol.MustPacket(protocol.OpHeartbeat, nil)
c.Send <- mustJSON(pong)
case protocol.OpAuthenticate:
var auth protocol.AuthData
if err := json.Unmarshal(pkt.Data, &auth); err != nil {
s.sendError(c, 400, "invalid auth data")
return
}
s.handleAuth(c, &auth)
case protocol.OpMessageCreate:
var msg protocol.MessageData
if err := json.Unmarshal(pkt.Data, &msg); err != nil {
s.sendError(c, 400, "invalid message data")
return
}
s.handleMessage(c, &msg)
case protocol.OpMessageReaction:
var react protocol.ReactionData
if err := json.Unmarshal(pkt.Data, &react); err != nil {
s.sendError(c, 400, "invalid reaction data")
return
}
s.handleReaction(c, &react)
case protocol.OpVoiceStateUpdate:
var vs protocol.VoiceStateData
if err := json.Unmarshal(pkt.Data, &vs); err != nil {
s.sendError(c, 400, "invalid voice state")
return
}
s.handleVoiceState(c, &vs)
case protocol.OpStreamStart, protocol.OpStreamEnd:
var sd protocol.StreamData
if err := json.Unmarshal(pkt.Data, &sd); err != nil {
s.sendError(c, 400, "invalid stream data")
return
}
s.handleStream(c, &sd, pkt.Op)
case protocol.OpTypingStart:
s.broadcastToChannel(c, pkt)
case protocol.OpCallOffer, protocol.OpCallAnswer, protocol.OpCallICE, protocol.OpCallEnd:
s.broadcastToChannel(c, pkt)
default:
log.Printf("Unknown opcode: %d from %s", pkt.Op, c.ID)
}
}
func (s *Server) handleAuth(c *Client, auth *protocol.AuthData) {
if auth.Token != "" {
s.tokenMu.RLock()
uid, ok := s.tokens[auth.Token]
s.tokenMu.RUnlock()
if ok {
var user models.User
err := s.db.QueryRow(
`SELECT id, username, avatar, bio FROM users WHERE id = ?`, uid,
).Scan(&user.ID, &user.Username, &user.Avatar, &user.Bio)
if err == nil {
c.UserID = user.ID
c.Username = user.Username
authData := protocol.MustPacket(protocol.OpAuthenticated, map[string]interface{}{
"user_id": user.ID,
"username": user.Username,
})
c.Send <- mustJSON(authData)
return
}
}
}
if auth.Username == "" {
s.sendError(c, 401, "authentication required")
return
}
key, _ := jcrypto.GenerateKey()
id := uuid.New().String()
_, err := s.db.Exec(
`INSERT INTO users (id, username, public_key) VALUES (?, ?, ?)`,
id, auth.Username, key,
)
if err != nil {
s.sendError(c, 409, "username taken")
return
}
token := uuid.New().String()
s.tokenMu.Lock()
s.tokens[token] = id
s.tokenMu.Unlock()
c.UserID = id
c.Username = auth.Username
authData := protocol.MustPacket(protocol.OpAuthenticated, map[string]interface{}{
"user_id": id,
"username": auth.Username,
"token": token,
})
c.Send <- mustJSON(authData)
}
func (s *Server) handleMessage(c *Client, msg *protocol.MessageData) {
if c.UserID == "" {
s.sendError(c, 401, "not authenticated")
return
}
content := msg.Content
var compressed bool
if len(content) > 1024 {
compressedData, err := s.compressor.CompressFile(content)
if err == nil && compressedData.CompressedSize < int64(len(content)) {
content = compressedData.Data
compressed = true
}
}
id := uuid.New().String()
_, err := s.db.Exec(
`INSERT INTO messages (id, channel_id, author_id, content, encrypted, nonce, message_type, reply_to)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
id, msg.ChannelID, c.UserID, content, msg.Encrypted, msg.Nonce, msg.MessageType, msg.ReplyTo,
)
if err != nil {
s.sendError(c, 500, "failed to save message")
return
}
broadcastMsg := protocol.MustPacket(protocol.OpMessageCreate, map[string]interface{}{
"id": id,
"channel_id": msg.ChannelID,
"author_id": c.UserID,
"username": c.Username,
"content": content,
"encrypted": msg.Encrypted,
"nonce": msg.Nonce,
"message_type": msg.MessageType,
"reply_to": msg.ReplyTo,
"compressed": compressed,
"created_at": time.Now(),
})
s.broadcastToChannel(c, broadcastMsg)
}
func (s *Server) handleReaction(c *Client, react *protocol.ReactionData) {
if c.UserID == "" {
s.sendError(c, 401, "not authenticated")
return
}
if react.Add {
s.db.Exec(
`INSERT OR IGNORE INTO reactions (message_id, user_id, emoji) VALUES (?, ?, ?)`,
react.MessageID, c.UserID, react.Emoji,
)
} else {
s.db.Exec(
`DELETE FROM reactions WHERE message_id = ? AND user_id = ? AND emoji = ?`,
react.MessageID, c.UserID, react.Emoji,
)
}
s.broadcastToChannel(c, pktFromReaction(c, react))
}
func pktFromReaction(c *Client, react *protocol.ReactionData) *protocol.Packet {
return protocol.MustPacket(protocol.OpMessageReaction, map[string]interface{}{
"message_id": react.MessageID,
"user_id": c.UserID,
"emoji": react.Emoji,
"add": react.Add,
})
}
func (s *Server) handleVoiceState(c *Client, vs *protocol.VoiceStateData) {
if c.UserID == "" {
s.sendError(c, 401, "not authenticated")
return
}
if vs.ChannelID == "" {
s.db.Exec(`DELETE FROM voice_states WHERE user_id = ?`, c.UserID)
} else {
s.db.Exec(
`INSERT OR REPLACE INTO voice_states (user_id, channel_id, guild_id, muted, deafened)
VALUES (?, ?, ?, ?, ?)`,
c.UserID, vs.ChannelID, vs.GuildID, vs.Muted, vs.Deafened,
)
}
broadcast := protocol.MustPacket(protocol.OpVoiceStateUpdate, map[string]interface{}{
"user_id": c.UserID,
"channel_id": vs.ChannelID,
"guild_id": vs.GuildID,
"muted": vs.Muted,
"deafened": vs.Deafened,
})
s.broadcastToChannel(c, broadcast)
}
func (s *Server) handleStream(c *Client, sd *protocol.StreamData, op protocol.OpCode) {
if c.UserID == "" {
s.sendError(c, 401, "not authenticated")
return
}
broadcast := protocol.MustPacket(op, map[string]interface{}{
"user_id": c.UserID,
"channel_id": sd.ChannelID,
"title": sd.Title,
})
s.broadcastToChannel(c, broadcast)
}
func (s *Server) broadcastToChannel(sender *Client, pkt *protocol.Packet) {
data, err := json.Marshal(pkt)
if err != nil {
return
}
s.mu.RLock()
defer s.mu.RUnlock()
for _, client := range s.clients {
if client.UserID == "" {
continue
}
select {
case client.Send <- data:
default:
log.Printf("Dropping message for slow client %s", client.ID)
}
}
}
func (s *Server) sendError(c *Client, code int, message string) {
errPkt := protocol.MustPacket(protocol.OpError, &protocol.ErrorData{
Code: code,
Message: message,
})
c.Send <- mustJSON(errPkt)
}
func (s *Server) GetDB() *database.DB {
return s.db
}
func (s *Server) GetChannelManager() *channel.Manager {
return s.channelMgr
}
func (s *Server) GetRoleManager() *role.Manager {
return s.roleMgr
}
func (s *Server) GetFederationManager() *federation.Manager {
return s.fedMgr
}
func (s *Server) GetCompressor() *compression.Compressor {
return s.compressor
}
func mustJSON(v interface{}) []byte {
data, _ := json.Marshal(v)
return data
}