diff --git a/cmd/wg-portal-2/main.go b/cmd/wg-portal-2/main.go index 0b2c490..cbd7348 100644 --- a/cmd/wg-portal-2/main.go +++ b/cmd/wg-portal-2/main.go @@ -22,13 +22,14 @@ import ( "github.com/fedor-git/wg-portal-2/internal/app/audit" "github.com/fedor-git/wg-portal-2/internal/app/auth" "github.com/fedor-git/wg-portal-2/internal/app/configfile" + "github.com/fedor-git/wg-portal-2/internal/app/fanout" "github.com/fedor-git/wg-portal-2/internal/app/mail" "github.com/fedor-git/wg-portal-2/internal/app/route" "github.com/fedor-git/wg-portal-2/internal/app/users" "github.com/fedor-git/wg-portal-2/internal/app/webhooks" "github.com/fedor-git/wg-portal-2/internal/app/wireguard" "github.com/fedor-git/wg-portal-2/internal/config" - "github.com/fedor-git/wg-portal-2/internal/sync" + "github.com/fedor-git/wg-portal-2/internal/domain" ) // main entry point for WireGuard Portal @@ -77,6 +78,17 @@ func main() { queueSize := 100 eventBus := evbus.New(queueSize) + fanout.Start(ctx, eventBus, cfg.Core.Fanout) + // fanout.Start(ctx, eventBus, fanout.Config{ + // Enabled: cfg.Core.Fanout.Enabled, + // Peers: cfg.Core.Fanout.Peers, + // AuthHeader: cfg.Core.Fanout.AuthHeader, + // AuthValue: cfg.Core.Fanout.AuthValue, + // Timeout: cfg.Core.Fanout.Timeout, + // Debounce: cfg.Core.Fanout.Debounce, + // SelfURL: cfg.Core.Fanout.SelfURL, + // }) + auditManager := audit.NewManager(database) auditRecorder, err := audit.NewAuditRecorder(cfg, eventBus, database) @@ -104,6 +116,7 @@ func main() { cfgFileManager, err := configfile.NewConfigFileManager(cfg, eventBus, database, database, cfgFileSystem) internal.AssertNoError(err) + cfgFileManager.StartBackgroundJobs(ctx) mailManager, err := mail.NewMailManager(cfg, mailer, cfgFileManager, database, database) internal.AssertNoError(err) @@ -119,6 +132,18 @@ func main() { err = app.Initialize(cfg, wireGuardManager, userManager) internal.AssertNoError(err) + if cfg.Core.SyncOnStartup { + syncCtx := domain.SetUserInfo(ctx, domain.SystemAdminContextUserInfo()) + if err := wireGuardManager.RestoreInterfaceState(syncCtx, true /*updateDbOnError*/); err != nil { + slog.Error("initial interface restore failed", "err", err) + } + if n, err := wireGuardManager.SyncAllPeersFromDB(syncCtx); err != nil { + slog.Error("initial peer sync failed", "err", err) + } else { + slog.Info("initial peer sync done", "applied", n) + } + } + validatorManager := validator.New() // region API v0 (SPA frontend) @@ -129,13 +154,19 @@ func main() { apiV0BackendUsers := backendV0.NewUserService(cfg, userManager, wireGuardManager) apiV0BackendInterfaces := backendV0.NewInterfaceService(cfg, wireGuardManager, cfgFileManager) apiV0BackendPeers := backendV0.NewPeerService(cfg, wireGuardManager, cfgFileManager, mailManager) + // apiV0BackendPeersWithEvents := handlersV0.NewEventingPeerService(apiV0BackendPeers, eventBus) + // apiV0BackendPeersWithEvents := handlersV0.NewEventingPeerService(apiV0BackendPeers, eventBus) apiV0EndpointAuth := handlersV0.NewAuthEndpoint(cfg, apiV0Auth, apiV0Session, validatorManager, authenticator, webAuthn) apiV0EndpointAudit := handlersV0.NewAuditEndpoint(cfg, apiV0Auth, auditManager) apiV0EndpointUsers := handlersV0.NewUserEndpoint(cfg, apiV0Auth, validatorManager, apiV0BackendUsers) apiV0EndpointInterfaces := handlersV0.NewInterfaceEndpoint(cfg, apiV0Auth, validatorManager, apiV0BackendInterfaces) + apiV0EndpointPeers := handlersV0.NewPeerEndpoint(cfg, apiV0Auth, validatorManager, apiV0BackendPeers) + // apiV0EndpointPeers := handlersV0.NewPeerEndpoint(cfg, apiV0Auth, validatorManager, apiV0BackendPeersWithEvents) + apiV0EndpointPeers.SetEventBus(eventBus) + apiV0EndpointConfig := handlersV0.NewConfigEndpoint(cfg, apiV0Auth, wireGuard) apiV0EndpointTest := handlersV0.NewTestEndpoint(apiV0Auth) @@ -162,6 +193,7 @@ func main() { apiV1EndpointUsers := handlersV1.NewUserEndpoint(apiV1Auth, validatorManager, apiV1BackendUsers) apiV1EndpointPeers := handlersV1.NewPeerEndpoint(apiV1Auth, validatorManager, apiV1BackendPeers) + apiV1EndpointPeers.SetEventBus(eventBus) apiV1EndpointInterfaces := handlersV1.NewInterfaceEndpoint(apiV1Auth, validatorManager, apiV1BackendInterfaces) apiV1EndpointProvisioning := handlersV1.NewProvisioningEndpoint(apiV1Auth, validatorManager, apiV1BackendProvisioning) @@ -183,11 +215,6 @@ func main() { go metricsServer.Run(ctx) go webSrv.Run(ctx, cfg.Web.ListeningAddress) - // sync WireGuard state periodically in the background - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go sync.StartPeriodicSync(ctx, wgManager, 30*time.Second) - slog.Info("Application startup complete") // wait until context gets cancelled diff --git a/internal/adapters/database.go b/internal/adapters/database.go index 6bb7865..98a6254 100644 --- a/internal/adapters/database.go +++ b/internal/adapters/database.go @@ -1085,3 +1085,16 @@ func (r *SqlRepo) GetAllAuditEntries(ctx context.Context) ([]domain.AuditEntry, } // endregion audit + +func (r *SqlRepo) GetAllPeers(ctx context.Context) ([]domain.Peer, error) { + var peers []domain.Peer + err := r.db.WithContext(ctx). + Preload("Addresses"). + Preload("Interface"). + Find(&peers).Error + if err != nil { + return nil, err + } + + return peers, nil +} \ No newline at end of file diff --git a/internal/adapters/wgcontroller/local.go b/internal/adapters/wgcontroller/local.go index c6393e3..8891fad 100644 --- a/internal/adapters/wgcontroller/local.go +++ b/internal/adapters/wgcontroller/local.go @@ -861,4 +861,4 @@ func (c LocalController) PingAddresses( }, nil } -// endregion statistics-related +// endregion statistics-related \ No newline at end of file diff --git a/internal/app/api/v0/handlers/endpoint_peers.go b/internal/app/api/v0/handlers/endpoint_peers.go index 0d0c388..783db60 100644 --- a/internal/app/api/v0/handlers/endpoint_peers.go +++ b/internal/app/api/v0/handlers/endpoint_peers.go @@ -3,10 +3,12 @@ package handlers import ( "context" "io" + "log/slog" "net/http" "github.com/go-pkgz/routegroup" + "github.com/fedor-git/wg-portal-2/internal/app" "github.com/fedor-git/wg-portal-2/internal/app/api/core/request" "github.com/fedor-git/wg-portal-2/internal/app/api/core/respond" "github.com/fedor-git/wg-portal-2/internal/app/api/v0/model" @@ -48,6 +50,7 @@ type PeerEndpoint struct { peerService PeerService authenticator Authenticator validator Validator + bus app.EventBus } func NewPeerEndpoint( @@ -68,6 +71,15 @@ func (e PeerEndpoint) GetName() string { return "PeerEndpoint" } +func (e *PeerEndpoint) SetEventBus(bus app.EventBus) { e.bus = bus } + +// ВАЖЛИВО: fanout підписаний на 1 аргумент, тож примусово даємо рівно 1 +func (e *PeerEndpoint) publish(topic string, arg any) { + if e.bus == nil || topic == "" { return } + slog.Debug("[V0] publish", "topic", topic) + e.bus.Publish(topic, arg) +} + func (e PeerEndpoint) RegisterRoutes(g *routegroup.Bundle) { apiGroup := g.Mount("/peer") apiGroup.Use(e.authenticator.LoggedIn()) @@ -223,6 +235,9 @@ func (e PeerEndpoint) handleCreatePost() http.HandlerFunc { return } + e.publish(app.TopicFanPeerSave, newPeer) + e.publish(app.TopicFanPeersUpdated, "v0:create") + respond.JSON(w, http.StatusOK, model.NewPeer(newPeer)) } } @@ -266,6 +281,9 @@ func (e PeerEndpoint) handleCreateMultiplePost() http.HandlerFunc { return } + e.publish(app.TopicFanPeerSave, newPeers) + e.publish(app.TopicFanPeersUpdated, "v0:multi") + respond.JSON(w, http.StatusOK, model.NewPeers(newPeers)) } } @@ -314,6 +332,9 @@ func (e PeerEndpoint) handleUpdatePut() http.HandlerFunc { return } + e.publish(app.TopicFanPeerSave, updatedPeer) + e.publish(app.TopicFanPeersUpdated, "v0:update") + respond.JSON(w, http.StatusOK, model.NewPeer(updatedPeer)) } } @@ -344,6 +365,9 @@ func (e PeerEndpoint) handleDelete() http.HandlerFunc { return } + e.publish(app.TopicFanPeerDelete, domain.PeerIdentifier(id)) + e.publish(app.TopicFanPeersUpdated, "v0:delete") + respond.Status(w, http.StatusNoContent) } } diff --git a/internal/app/api/v0/handlers/peer_eventing_service.go b/internal/app/api/v0/handlers/peer_eventing_service.go new file mode 100644 index 0000000..6c63103 --- /dev/null +++ b/internal/app/api/v0/handlers/peer_eventing_service.go @@ -0,0 +1,101 @@ +package handlers + +import ( + "context" + "io" + + "github.com/fedor-git/wg-portal-2/internal/app" + "github.com/fedor-git/wg-portal-2/internal/domain" +) + +type eventingPeerService struct { + inner PeerService + bus app.EventPublisher +} + +func NewEventingPeerService(inner PeerService, bus app.EventPublisher) PeerService { + return &eventingPeerService{inner: inner, bus: bus} +} + +// ---------- read-only делегування ---------- + +func (s *eventingPeerService) GetInterfaceAndPeers(ctx context.Context, id domain.InterfaceIdentifier) (*domain.Interface, []domain.Peer, error) { + return s.inner.GetInterfaceAndPeers(ctx, id) +} + +func (s *eventingPeerService) PreparePeer(ctx context.Context, id domain.InterfaceIdentifier) (*domain.Peer, error) { + return s.inner.PreparePeer(ctx, id) +} + +func (s *eventingPeerService) GetPeer(ctx context.Context, id domain.PeerIdentifier) (*domain.Peer, error) { + return s.inner.GetPeer(ctx, id) +} + +func (s *eventingPeerService) GetPeerConfig(ctx context.Context, id domain.PeerIdentifier, style string) (io.Reader, error) { + return s.inner.GetPeerConfig(ctx, id, style) +} + +func (s *eventingPeerService) GetPeerConfigQrCode(ctx context.Context, id domain.PeerIdentifier, style string) (io.Reader, error) { + return s.inner.GetPeerConfigQrCode(ctx, id, style) +} + +func (s *eventingPeerService) SendPeerEmail(ctx context.Context, linkOnly bool, style string, peers ...domain.PeerIdentifier) error { + return s.inner.SendPeerEmail(ctx, linkOnly, style, peers...) +} + +func (s *eventingPeerService) GetPeerStats(ctx context.Context, id domain.InterfaceIdentifier) ([]domain.PeerStatus, error) { + return s.inner.GetPeerStats(ctx, id) +} + +// ---------- мутації + події ---------- + +func (s *eventingPeerService) CreatePeer(ctx context.Context, p *domain.Peer) (*domain.Peer, error) { + out, err := s.inner.CreatePeer(ctx, p) + if err != nil { return nil, err } + s.publish(app.TopicPeerCreated) + s.publish(app.TopicPeerUpdated) + return out, nil +} + +func (s *eventingPeerService) CreateMultiplePeers(ctx context.Context, ifaceID domain.InterfaceIdentifier, r *domain.PeerCreationRequest) ([]domain.Peer, error) { + out, err := s.inner.CreateMultiplePeers(ctx, ifaceID, r) + if err != nil { return nil, err } + + // внутрішні + s.publish(app.TopicPeerUpdated, out) + + // fanout + s.publish("peer.save", out) + s.publish("peers.updated", struct{}{}) + + return out, nil +} + +func (s *eventingPeerService) UpdatePeer(ctx context.Context, p *domain.Peer) (*domain.Peer, error) { + out, err := s.inner.UpdatePeer(ctx, p) + if err != nil { return nil, err } + s.publish(app.TopicPeerUpdated) + return out, nil +} + +func (s *eventingPeerService) DeletePeer(ctx context.Context, id domain.PeerIdentifier) error { + if err := s.inner.DeletePeer(ctx, id); err != nil { return err } + s.publish(app.TopicPeerDeleted) + s.publish(app.TopicPeerUpdated) + return nil +} + +func (s *eventingPeerService) publish(topic string, args ...any) { + if s.bus == nil || topic == "" { return } + // страхуємося: fanout-підписник очікує рівно 1 аргумент + if len(args) == 0 { + s.bus.Publish(topic, struct{}{}) + return + } + // якщо передали багато — обгорнемо їх у один контейнер + if len(args) > 1 { + s.bus.Publish(topic, args) + return + } + s.bus.Publish(topic, args[0]) +} diff --git a/internal/app/api/v1/backend/peer_service.go b/internal/app/api/v1/backend/peer_service.go index 89c1331..9fb9f92 100644 --- a/internal/app/api/v1/backend/peer_service.go +++ b/internal/app/api/v1/backend/peer_service.go @@ -155,3 +155,24 @@ func (s PeerService) Delete(ctx context.Context, id domain.PeerIdentifier) error return nil } + +func (s *PeerService) SyncAllPeersFromDB(ctx context.Context) (int, error) { + type syncer interface { + SyncAllPeersFromDB(context.Context) (int, error) + } + if v, ok := any(s.peers).(syncer); ok { + return v.SyncAllPeersFromDB(ctx) + } + + type syncerErrOnly interface { + SyncAllPeersFromDB(context.Context) error + } + if v, ok := any(s.peers).(syncerErrOnly); ok { + if err := v.SyncAllPeersFromDB(ctx); err != nil { + return 0, err + } + return 0, nil + } + + return 0, fmt.Errorf("sync not supported by current peers backend") +} \ No newline at end of file diff --git a/internal/app/api/v1/handlers/endpoint_peer.go b/internal/app/api/v1/handlers/endpoint_peer.go index 8dd965c..907f4a0 100644 --- a/internal/app/api/v1/handlers/endpoint_peer.go +++ b/internal/app/api/v1/handlers/endpoint_peer.go @@ -2,10 +2,12 @@ package handlers import ( "context" + "log/slog" "net/http" "github.com/go-pkgz/routegroup" + "github.com/fedor-git/wg-portal-2/internal/app" "github.com/fedor-git/wg-portal-2/internal/app/api/core/request" "github.com/fedor-git/wg-portal-2/internal/app/api/core/respond" "github.com/fedor-git/wg-portal-2/internal/app/api/v1/models" @@ -20,12 +22,14 @@ type PeerService interface { Create(context.Context, *domain.Peer) (*domain.Peer, error) Update(context.Context, domain.PeerIdentifier, *domain.Peer) (*domain.Peer, error) Delete(context.Context, domain.PeerIdentifier) error + SyncAllPeersFromDB(ctx context.Context) (int, error) } type PeerEndpoint struct { peers PeerService authenticator Authenticator validator Validator + bus app.EventBus } func NewPeerEndpoint( @@ -43,6 +47,30 @@ func (e PeerEndpoint) GetName() string { return "PeerEndpoint" } +func (e *PeerEndpoint) SetEventBus(bus app.EventBus) { + e.bus = bus +} + +func (e *PeerEndpoint) publish(topic string, args ...any) { + if e.bus == nil || topic == "" { + return + } + slog.Debug("[V1] publish", "topic", topic) + e.bus.Publish(topic, args...) +} + +// 0-arg для штатних подій (внутрішні підписники) +func (e *PeerEndpoint) publish0(topic string) { + if e.bus == nil || topic == "" { return } + e.bus.Publish(topic) // без аргументів +} + +// 1-arg для fanout (йому потрібен рівно ОДИН аргумент) +func (e *PeerEndpoint) publish1(topic string, arg any) { + if e.bus == nil || topic == "" { return } + e.bus.Publish(topic, arg) +} + func (e PeerEndpoint) RegisterRoutes(g *routegroup.Bundle) { apiGroup := g.Mount("/peer") apiGroup.Use(e.authenticator.LoggedIn()) @@ -56,6 +84,8 @@ func (e PeerEndpoint) RegisterRoutes(g *routegroup.Bundle) { apiGroup.With(e.authenticator.LoggedIn(ScopeAdmin)).HandleFunc("POST /new", e.handleCreatePost()) apiGroup.With(e.authenticator.LoggedIn(ScopeAdmin)).HandleFunc("PUT /by-id/{id}", e.handleUpdatePut()) apiGroup.With(e.authenticator.LoggedIn(ScopeAdmin)).HandleFunc("DELETE /by-id/{id}", e.handleDelete()) + apiGroup.With(e.authenticator.LoggedIn(ScopeAdmin)).HandleFunc("POST /sync", e.handleSyncPost()) + } // handleAllForInterfaceGet returns a gorm Handler function. @@ -229,6 +259,14 @@ func (e PeerEndpoint) handleCreatePost() http.HandlerFunc { return } + // внутрішні + e.publish0(app.TopicPeerCreated) + e.publish0(app.TopicPeerUpdated) + + // fanout + e.publish1("peer.save", newPeer) + e.publish1("peers.updated", "v1:create") + respond.JSON(w, http.StatusOK, models.NewPeer(newPeer)) } } @@ -276,6 +314,13 @@ func (e PeerEndpoint) handleUpdatePut() http.HandlerFunc { return } + // внутрішні + e.publish0(app.TopicPeerUpdated) + + // fanout + e.publish1("peer.save", updatedPeer) + e.publish1("peers.updated", "v1:update") + respond.JSON(w, http.StatusOK, models.NewPeer(updatedPeer)) } } @@ -311,6 +356,48 @@ func (e PeerEndpoint) handleDelete() http.HandlerFunc { return } + // внутрішні + e.publish0(app.TopicPeerDeleted) + e.publish0(app.TopicPeerUpdated) + + // fanout + e.publish1("peer.delete", domain.PeerIdentifier(id)) + e.publish1("peers.updated", "v1:delete") + respond.Status(w, http.StatusNoContent) } } + +// func (e PeerEndpoint) handleSyncPost() http.HandlerFunc { +// return func(w http.ResponseWriter, r *http.Request) { +// count, err := e.peers.SyncAllPeersFromDB(r.Context()) +// if err != nil { +// status, model := ParseServiceError(err) +// respond.JSON(w, status, model) +// return +// } +// respond.JSON(w, http.StatusOK, map[string]any{ +// "synced": count, +// }) +// } +// } + +func (e PeerEndpoint) handleSyncPost() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + // Якщо fanout поставив заголовок "не ехо", забороняємо локальні публікації + if r.Header.Get("X-WGP-NoEcho") == "1" { + ctx = app.WithNoFanout(ctx) + } + + count, err := e.peers.SyncAllPeersFromDB(ctx) + if err != nil { + status, model := ParseServiceError(err) + respond.JSON(w, status, model) + return + } + respond.JSON(w, http.StatusOK, map[string]any{ + "synced": count, + }) + } +} \ No newline at end of file diff --git a/internal/app/api/v1/handlers/peer_eventing_service.go b/internal/app/api/v1/handlers/peer_eventing_service.go new file mode 100644 index 0000000..ba2c0b6 --- /dev/null +++ b/internal/app/api/v1/handlers/peer_eventing_service.go @@ -0,0 +1,67 @@ +package handlers + +import ( + "context" + + "github.com/fedor-git/wg-portal-2/internal/app" // твої топіки і EventPublisher + "github.com/fedor-git/wg-portal-2/internal/domain" +) + +// eventingPeerService декорує будь-яку реалізацію PeerService +// та надсилає івенти після Create/Update/Delete. +type eventingPeerService struct { + inner PeerService + bus app.EventPublisher +} + +// Конструктор: повертає саме PeerService, тож NewPeerEndpoint прийме як є. +func NewEventingPeerService(inner PeerService, bus app.EventPublisher) PeerService { + return &eventingPeerService{inner: inner, bus: bus} +} + +// ---------- read-only як є ---------- + +func (s *eventingPeerService) GetForInterface(ctx context.Context, id domain.InterfaceIdentifier) ([]domain.Peer, error) { + return s.inner.GetForInterface(ctx, id) +} +func (s *eventingPeerService) GetForUser(ctx context.Context, id domain.UserIdentifier) ([]domain.Peer, error) { + return s.inner.GetForUser(ctx, id) +} +func (s *eventingPeerService) GetById(ctx context.Context, id domain.PeerIdentifier) (*domain.Peer, error) { + return s.inner.GetById(ctx, id) +} +func (s *eventingPeerService) Prepare(ctx context.Context, id domain.InterfaceIdentifier) (*domain.Peer, error) { + return s.inner.Prepare(ctx, id) +} +func (s *eventingPeerService) SyncAllPeersFromDB(ctx context.Context) (int, error) { + return s.inner.SyncAllPeersFromDB(ctx) +} + +// ---------- мутації + події ---------- + +func (s *eventingPeerService) Create(ctx context.Context, p *domain.Peer) (*domain.Peer, error) { + out, err := s.inner.Create(ctx, p) + if err != nil { return nil, err } + s.publish(app.TopicPeerCreated) + s.publish(app.TopicPeerUpdated) + return out, nil +} + +func (s *eventingPeerService) Update(ctx context.Context, id domain.PeerIdentifier, p *domain.Peer) (*domain.Peer, error) { + out, err := s.inner.Update(ctx, id, p) + if err != nil { return nil, err } + s.publish(app.TopicPeerUpdated) + return out, nil +} + +func (s *eventingPeerService) Delete(ctx context.Context, id domain.PeerIdentifier) error { + if err := s.inner.Delete(ctx, id); err != nil { return err } + s.publish(app.TopicPeerDeleted) + s.publish(app.TopicPeerUpdated) + return nil +} + +func (s *eventingPeerService) publish(topic string, args ...any) { + if s.bus == nil || topic == "" { return } + s.bus.Publish(topic, args...) +} \ No newline at end of file diff --git a/internal/app/configfile/manager.go b/internal/app/configfile/manager.go index 449180c..b61e227 100644 --- a/internal/app/configfile/manager.go +++ b/internal/app/configfile/manager.go @@ -284,3 +284,7 @@ type nopCloser struct { // Close is a no-op for the nopCloser. func (nopCloser) Close() error { return nil } + +func (m *Manager) StartBackgroundJobs(ctx context.Context) { + m.connectToMessageBus() +} \ No newline at end of file diff --git a/internal/app/eventbus.go b/internal/app/eventbus.go index d411aa6..21c1bb6 100644 --- a/internal/app/eventbus.go +++ b/internal/app/eventbus.go @@ -48,4 +48,25 @@ const TopicAuditLoginFailed = "audit:login:failed" const TopicAuditInterfaceChanged = "audit:interface:changed" const TopicAuditPeerChanged = "audit:peer:changed" + +const TopicFanPeersUpdated = "peers.updated" +const TopicFanPeerSave = "peer.save" +const TopicFanPeerDelete = "peer.delete" +const TopicFanInterfaceSave = "interface.save" +const TopicFanInterfaceUpdated = "interface.updated" + // endregion audit-events + +type EventPublisher interface { + Publish(topic string, args ...any) +} + +type EventSubscriber interface { + Subscribe(topic string, fn interface{}) error +} + +// Зручно мати комбінований інтерфейс: +type EventBus interface { + EventPublisher + EventSubscriber +} \ No newline at end of file diff --git a/internal/app/fanout/fanout.go b/internal/app/fanout/fanout.go new file mode 100644 index 0000000..f0db6e4 --- /dev/null +++ b/internal/app/fanout/fanout.go @@ -0,0 +1,311 @@ +package fanout + +import ( + "context" + "log/slog" + "net" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/google/uuid" + + cfgpkg "github.com/fedor-git/wg-portal-2/internal/config" +) + +// Мінімальний інтерфейс під ваш eventbus (eventbus.go) +type EventBus interface { + Subscribe(topic string, fn interface{}) error +} + +// Службові заголовки +const ( + hdrOrigin = "X-WGP-Origin" + hdrNoEcho = "X-WGP-NoEcho" + hdrCorrID = "X-WGP-Correlation-ID" +) + +// внутрішні налаштування (копія потрібних полів із cfg.Core.Fanout) +type settings struct { + Enabled bool + Peers []string + AuthHeader string + AuthValue string + Timeout time.Duration + Debounce time.Duration + SelfURL string + Origin string + KickOnStart bool + Topics []string +} + +// Публічний запуск fanout (бере все з cfg.Core.Fanout) +func Start(ctx context.Context, bus EventBus, fc cfgpkg.FanoutConfig) { + s := settings{ + Enabled: fc.Enabled, + Peers: append([]string(nil), fc.Peers...), + AuthHeader: fc.AuthHeader, + AuthValue: fc.AuthValue, + Timeout: fc.Timeout, + Debounce: fc.Debounce, + SelfURL: fc.SelfURL, + Origin: fc.Origin, + KickOnStart: fc.KickOnStart, + Topics: append([]string(nil), fc.Topics...), + } + + if !s.Enabled { + slog.Info("[FANOUT] disabled, skip init") + return + } + if s.Timeout <= 0 { + s.Timeout = 5 * time.Second + } + if s.Debounce <= 0 { + s.Debounce = 250 * time.Millisecond + } + + // best-effort origin + if s.Origin == "" { + if host := getHost(s.SelfURL); host != "" { + if hn, _ := net.LookupAddr(host); len(hn) > 0 { + s.Origin = strings.TrimSuffix(hn[0], ".") + } + if s.Origin == "" { + s.Origin = host + } + } + if s.Origin == "" { + s.Origin = "unknown-node" + } + } + + // дефолтні теми + if len(s.Topics) == 0 { + s.Topics = []string{ + "peer:created", "peer:updated", "peer:deleted", + "interface:created", "interface:updated", "interface:deleted", + } + } + + f := &fanout{ + cfg: s, + client: &http.Client{ + Timeout: s.Timeout, + }, + debounce: newDebouncer(s.Debounce), + } + + // Підписки на конкретні топіки (НЕ wildcard) + if bus != nil { + for _, topic := range s.Topics { + t := topic + if err := bus.Subscribe(t, func() { + slog.Debug("[FANOUT] bump", "reason", "bus:"+t) + f.bump("bus:" + t) + }); err != nil { + slog.Warn("[FANOUT] subscribe failed", "topic", t, "err", err) + } else { + slog.Debug("[FANOUT] subscribed", "topic", t) + } + } + } else { + slog.Debug("[FANOUT] no bus provided, event-driven bumps disabled") + } + + // Головний цикл + go f.loop(ctx) + + // Перший «пинок» — уже після підписок, щоб одразу роздати sync, якщо треба + if s.KickOnStart { + f.bump("startup") + } + + slog.Info("[FANOUT] initialized", + "peers", len(s.Peers), + "topics", s.Topics, + "self", s.SelfURL, + "origin", s.Origin, + "debounce", s.Debounce.String(), + "timeout", s.Timeout.String(), + ) +} + +// ------------------ реалізація ------------------ + +type fanout struct { + cfg settings + client *http.Client + debounce *debouncer +} + +func (f *fanout) loop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-f.debounce.C: + f.fire(ctx) + } + } +} + +func (f *fanout) bump(reason string) { + slog.Debug("[FANOUT] bump", "reason", reason) + f.debounce.Bump() +} + +func (f *fanout) fire(ctx context.Context) { + var wg sync.WaitGroup + + for _, base := range f.cfg.Peers { + base = strings.TrimSpace(base) + if base == "" { + continue + } + if isSelf(base, f.cfg.SelfURL) { + continue + } + endpoint := strings.TrimRight(base, "/") + "/api/v1/peer/sync" + + wg.Add(1) + go func(u string) { + defer wg.Done() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, nil) + if err != nil { + slog.Warn("[FANOUT] build request failed", "url", u, "err", err) + return + } + + // Auth (за потреби) + if f.cfg.AuthHeader != "" && f.cfg.AuthValue != "" { + req.Header.Set(f.cfg.AuthHeader, f.cfg.AuthValue) + } + + // Службові заголовки + req.Header.Set(hdrOrigin, f.cfg.Origin) + req.Header.Set(hdrNoEcho, "1") + req.Header.Set(hdrCorrID, uuid.NewString()) + + resp, err := f.client.Do(req) + if err != nil { + slog.Warn("[FANOUT] sync failed", "url", u, "err", err) + return + } + _ = resp.Body.Close() + + if resp.StatusCode >= 300 { + slog.Warn("[FANOUT] sync non-2xx", "url", u, "status", resp.Status) + return + } + slog.Debug("[FANOUT] sync ok", "url", u) + }(endpoint) + } + + wg.Wait() +} + +// ------------------ утиліти ------------------ + +type debouncer struct { + d time.Duration + tmu sync.Mutex + t *time.Timer + C chan struct{} +} + +func newDebouncer(d time.Duration) *debouncer { + return &debouncer{ + d: d, + C: make(chan struct{}, 1), + } +} + +func (d *debouncer) Bump() { + d.tmu.Lock() + defer d.tmu.Unlock() + + if d.t == nil { + d.t = time.AfterFunc(d.d, func() { + select { + case d.C <- struct{}{}: + default: + } + d.reset() + }) + return + } + + // перезапуск + if !d.t.Stop() { + // якщо таймер уже майже «вистрілив», знімаємо імпульс з каналу + select { + case <-d.C: + default: + } + } + d.t.Reset(d.d) +} + +func (d *debouncer) reset() { + d.tmu.Lock() + defer d.tmu.Unlock() + d.t = nil +} + +func isSelf(targetBase, selfBase string) bool { + if selfBase == "" || targetBase == "" { + return false + } + + tu, terr := url.Parse(strings.TrimSpace(targetBase)) + su, serr := url.Parse(strings.TrimSpace(selfBase)) + if terr != nil || serr != nil { + return strings.TrimRight(targetBase, "/") == strings.TrimRight(selfBase, "/") + } + + ts := strings.ToLower(tu.Scheme) + ss := strings.ToLower(su.Scheme) + if ts == "" { + ts = "http" + } + if ss == "" { + ss = "http" + } + + th := normalizeHostPort(tu.Host, ts) + sh := normalizeHostPort(su.Host, ss) + + return ts == ss && th == sh +} + +func normalizeHostPort(host, scheme string) string { + h := strings.ToLower(strings.TrimSpace(host)) + if h == "" { + return "" + } + if !strings.Contains(h, ":") { + switch scheme { + case "https": + h += ":443" + default: + h += ":80" + } + } + return h +} + +func getHost(base string) string { + u, err := url.Parse(strings.TrimSpace(base)) + if err != nil { + return "" + } + h := u.Host + if i := strings.IndexByte(h, ':'); i >= 0 { + h = h[:i] + } + return h +} diff --git a/internal/app/nofanout.go b/internal/app/nofanout.go new file mode 100644 index 0000000..fa10fe3 --- /dev/null +++ b/internal/app/nofanout.go @@ -0,0 +1,13 @@ +package app + +import "context" + +type noFanoutKey struct{} + +func WithNoFanout(ctx context.Context) context.Context { + return context.WithValue(ctx, noFanoutKey{}, true) +} +func NoFanout(ctx context.Context) bool { + v, _ := ctx.Value(noFanoutKey{}).(bool) + return v +} \ No newline at end of file diff --git a/internal/app/wireguard/capabilities.go b/internal/app/wireguard/capabilities.go new file mode 100644 index 0000000..4d6fb15 --- /dev/null +++ b/internal/app/wireguard/capabilities.go @@ -0,0 +1,8 @@ +package wireguard + +import "context" + +// Опціональна можливість для бекендів, які підтримують повне очищення списку peer'ів. +type SupportsClearPeers interface { + ClearPeers(ctx context.Context, iface string) error +} \ No newline at end of file diff --git a/internal/app/wireguard/controller_manager_clear.go b/internal/app/wireguard/controller_manager_clear.go new file mode 100644 index 0000000..966d39c --- /dev/null +++ b/internal/app/wireguard/controller_manager_clear.go @@ -0,0 +1,20 @@ +package wireguard + +import ( + "context" + + "golang.zx2c4.com/wireguard/wgctrl" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" +) + +// ClearPeers повністю очищає peers на інтерфейсі (ReplacePeers=true з порожнім списком). +func (m *ControllerManager) ClearPeers(_ context.Context, iface string) error { + c, err := wgctrl.New() + if err != nil { return err } + defer c.Close() + + return c.ConfigureDevice(iface, wgtypes.Config{ + ReplacePeers: true, + Peers: []wgtypes.PeerConfig{}, + }) +} \ No newline at end of file diff --git a/internal/app/wireguard/wireguard.go b/internal/app/wireguard/wireguard.go index 738d944..9609071 100644 --- a/internal/app/wireguard/wireguard.go +++ b/internal/app/wireguard/wireguard.go @@ -321,3 +321,7 @@ func (m Manager) checkExpiredPeers(ctx context.Context, peers []domain.Peer) { } } } + +func (m Manager) ClearPeers(ctx context.Context, iface domain.InterfaceIdentifier) error { + return m.clearPeers(ctx, iface) +} \ No newline at end of file diff --git a/internal/app/wireguard/wireguard_sync.go b/internal/app/wireguard/wireguard_sync.go new file mode 100644 index 0000000..0499bf3 --- /dev/null +++ b/internal/app/wireguard/wireguard_sync.go @@ -0,0 +1,165 @@ +package wireguard + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "strings" + + "github.com/fedor-git/wg-portal-2/internal/app" + "github.com/fedor-git/wg-portal-2/internal/domain" +) + +type peerLister interface { + GetAllPeers(ctx context.Context) ([]domain.Peer, error) +} + +func (m Manager) SyncAllPeersFromDB(ctx context.Context) (int, error) { + if err := domain.ValidateAdminAccessRights(ctx); err != nil { + return 0, err + } + if m.db == nil { return 0, fmt.Errorf("db repo is nil") } + if m.wg == nil { return 0, fmt.Errorf("wg controller is nil") } + + ifaces, err := m.db.GetAllInterfaces(ctx) + if err != nil { + return 0, fmt.Errorf("list interfaces: %w", err) + } + + applied := 0 + for _, in := range ifaces { + // 1) за потреби відновили/привели інтерфейс у консистентний стан + if err := m.RestoreInterfaceState(ctx, true, in.Identifier); err != nil { + slog.ErrorContext(ctx, "restore interface state failed", "iface", in.Identifier, "err", err) + continue + } + + // 2) дістали бажаний список пірів з БД (фільтруємо disabled) + peers, err := m.db.GetInterfacePeers(ctx, in.Identifier) + if err != nil { + slog.ErrorContext(ctx, "peer sync: failed to load peers", "iface", in.Identifier, "err", err) + continue + } + // >>> ДОДАЙ ЦЕ: якщо peers немає – прибираємо всіх на інтерфейсі + if len(peers) == 0 { + if err := m.clearPeers(ctx, in.Identifier); err != nil { + slog.ErrorContext(ctx, "clear peers failed", "iface", in.Identifier, "err", err) + } + // не публікуємо івенти, якщо це fanout-sync + if !app.NoFanout(ctx) { + m.bus.Publish(app.TopicPeerUpdated) + } + continue + } + desired := make([]domain.Peer, 0, len(peers)) + for i := range peers { + if !peers[i].IsDisabled() { + desired = append(desired, peers[i]) + } + } + + // 3) ЗАСТОСОВУЄМО ПОВНУ ЗАМІНУ (ключове!) + if err := m.replacePeers(ctx, in.Identifier, desired); err != nil { + // якщо інтерфейсу не існує/файл відсутній – пробуємо ще раз після restore + if isNoSuchFile(err) { + slog.WarnContext(ctx, "replacePeers failed (no iface/file), restoring and retrying", + "iface", in.Identifier, "err", err) + if rErr := m.RestoreInterfaceState(ctx, true, in.Identifier); rErr != nil { + slog.ErrorContext(ctx, "retry restore interface failed", "iface", in.Identifier, "err", rErr) + continue + } + if r2 := m.replacePeers(ctx, in.Identifier, desired); r2 != nil { + slog.ErrorContext(ctx, "replacePeers retry failed", "iface", in.Identifier, "err", r2) + continue + } + } else { + slog.ErrorContext(ctx, "replacePeers failed", "iface", in.Identifier, "err", err) + continue + } + } + + applied += len(desired) + } + + return applied, nil +} + +// replacePeers робить повну заміну складу peer-ів на інтерфейсі. +// Усередині має викликати бекенд з ReplacePeers=true. +// Реалізацію підженете під ваш controller (wgctrl, локальний тощо). +func (m Manager) replacePeers(ctx context.Context, iface domain.InterfaceIdentifier, peers []domain.Peer) error { + // ВАРІАНТ A: якщо контролер уміє "Replace" напряму: + // return m.wg.ReplacePeers(ctx, string(iface), peers) + + // ВАРІАНТ B: якщо є низькорівневий доступ до wgctrl: + // - зібрати []wgtypes.PeerConfig з domain.Peer + // - викликати ConfigureDevice(..., wgtypes.Config{ReplacePeers: true, Peers: pcs}) + // + // ВАРІАНТ C (fallback, якщо немає Replace API): + // - спочатку "очистити" пірів (ReplacePeers: true, Peers: nil) + // - потім додати кожного з desired через існуючий m.savePeers(ctx, &p) + + // Нижче – універсальний fallback «очистити і додати»: + if err := m.clearPeers(ctx, iface); err != nil { + return err + } + for i := range peers { + if err := m.savePeers(ctx, &peers[i]); err != nil { + return fmt.Errorf("add peer %s on %s: %w", peers[i].Identifier, iface, err) + } + // ВАЖЛИВО: під час sync не публікуємо події, аби не ловити шторм fanout + // (перенесіть publish із savePeers в той шар, де є user-driven зміни). + } + return nil +} + +func (m Manager) clearPeers(ctx context.Context, iface domain.InterfaceIdentifier) error { + return m.wg.ClearPeers(ctx, string(iface)) +} + +// func (m Manager) applyPeers(ctx context.Context, peers []domain.Peer) error { +// var firstErr error +// for i := range peers { +// p := &peers[i] +// if p.IsDisabled() { +// continue +// } +// if err := m.savePeers(ctx, p); err != nil { +// if firstErr == nil { +// firstErr = fmt.Errorf("apply peer %s (iface %s): %w", +// p.Identifier, p.InterfaceIdentifier, err) +// } +// continue +// } +// m.bus.Publish(app.TopicPeerUpdated, *p) +// } +// return firstErr +// } + +func (m Manager) applyPeers(ctx context.Context, peers []domain.Peer) error { + var firstErr error + for i := range peers { + p := &peers[i] + if p.IsDisabled() { continue } + if err := m.savePeers(ctx, p); err != nil { + if firstErr == nil { + firstErr = fmt.Errorf("apply peer %s (iface %s): %w", p.Identifier, p.InterfaceIdentifier, err) + } + continue + } + // <-- тут головне + if !app.NoFanout(ctx) { + m.bus.Publish(app.TopicPeerUpdated) + } + } + return firstErr +} + +func isNoSuchFile(err error) bool { + if err == nil { + return false + } + return errors.Is(err, os.ErrNotExist) || strings.Contains(err.Error(), "file does not exist") +} \ No newline at end of file diff --git a/internal/app/wireguard/wireguard_sync_.old b/internal/app/wireguard/wireguard_sync_.old new file mode 100644 index 0000000..ca38075 --- /dev/null +++ b/internal/app/wireguard/wireguard_sync_.old @@ -0,0 +1,108 @@ +package wireguard + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "strings" + + "github.com/fedor-git/wg-portal-2/internal/app" + "github.com/fedor-git/wg-portal-2/internal/domain" +) + +type peerLister interface { + GetAllPeers(ctx context.Context) ([]domain.Peer, error) +} + +func (m Manager) SyncAllPeersFromDB(ctx context.Context) (int, error) { + if err := domain.ValidateAdminAccessRights(ctx); err != nil { + return 0, err + } + if m.db == nil { + return 0, fmt.Errorf("db repo is nil") + } + if m.wg == nil { + return 0, fmt.Errorf("wg controller is nil") + } + + ifaces, err := m.db.GetAllInterfaces(ctx) + if err != nil { + return 0, fmt.Errorf("list interfaces: %w", err) + } + + applied := 0 + for _, in := range ifaces { + if err := m.RestoreInterfaceState(ctx, true, in.Identifier); err != nil { + slog.ErrorContext(ctx, "restore interface state failed", "iface", in.Identifier, "err", err) + continue + } + + peers, err := m.db.GetInterfacePeers(ctx, in.Identifier) + if err != nil { + slog.ErrorContext(ctx, "peer sync: failed to load peers", "iface", in.Identifier, "err", err) + continue + } + + for i := range peers { + p := &peers[i] + + err = m.applyPeers(ctx, []domain.Peer{*p}) + if err == nil { + applied++ + continue + } + + if isNoSuchFile(err) { + slog.WarnContext(ctx, "peer apply failed (no iface/file), restoring interface and retrying", + "peer", p.Identifier, "iface", p.InterfaceIdentifier, "err", err) + + if rErr := m.RestoreInterfaceState(ctx, true, in.Identifier); rErr != nil { + slog.ErrorContext(ctx, "retry restore interface failed", "iface", in.Identifier, "err", rErr) + continue + } + + if r2 := m.applyPeers(ctx, []domain.Peer{*p}); r2 != nil { + slog.ErrorContext(ctx, "peer apply retry failed", + "peer", p.Identifier, "iface", p.InterfaceIdentifier, "err", r2) + continue + } + + applied++ + continue + } + + slog.ErrorContext(ctx, "peer sync failed", + "peer", p.Identifier, "iface", p.InterfaceIdentifier, "err", err) + } + } + + return applied, nil +} + +func (m Manager) applyPeers(ctx context.Context, peers []domain.Peer) error { + var firstErr error + for i := range peers { + p := &peers[i] + if p.IsDisabled() { + continue + } + if err := m.savePeers(ctx, p); err != nil { + if firstErr == nil { + firstErr = fmt.Errorf("apply peer %s (iface %s): %w", + p.Identifier, p.InterfaceIdentifier, err) + } + continue + } + m.bus.Publish(app.TopicPeerUpdated, *p) + } + return firstErr +} + +func isNoSuchFile(err error) bool { + if err == nil { + return false + } + return errors.Is(err, os.ErrNotExist) || strings.Contains(err.Error(), "file does not exist") +} \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index 0574133..9582481 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,6 +10,19 @@ import ( "gopkg.in/yaml.v3" ) +type FanoutConfig struct { + Enabled bool `yaml:"enabled"` + Peers []string `yaml:"peers"` + AuthHeader string `yaml:"auth_header"` + AuthValue string `yaml:"auth_value"` + Timeout time.Duration `yaml:"timeout"` + Debounce time.Duration `yaml:"debounce"` + SelfURL string `yaml:"self_url"` + Origin string `yaml:"origin" mapstructure:"origin"` + KickOnStart bool `yaml:"kick_on_start" mapstructure:"kick_on_start"` + Topics []string `yaml:"topics" mapstructure:"topics"` +} + // Config is the main configuration struct. type Config struct { Core struct { @@ -26,6 +39,9 @@ type Config struct { SelfProvisioningAllowed bool `yaml:"self_provisioning_allowed"` ImportExisting bool `yaml:"import_existing"` RestoreState bool `yaml:"restore_state"` + SyncOnStartup bool `mapstructure:"sync_on_startup" yaml:"sync_on_startup" env:"WG_SYNC_ON_STARTUP"` + + Fanout FanoutConfig `yaml:"fanout"` } `yaml:"core"` Advanced struct { diff --git a/internal/sync/periodic.go b/internal/sync/periodic.go deleted file mode 100644 index fb1e2ec..0000000 --- a/internal/sync/periodic.go +++ /dev/null @@ -1,35 +0,0 @@ -package sync - -import ( - "context" - "log" - "time" -) - -func startPeriodicSync(ctx context.Context, wgManager WireguardSynchronizer, interval time.Duration) { - log.Printf("✅ Starting periodic WireGuard sync every %s", interval) - ticker := time.NewTicker(interval) - defer ticker.Stop() - - log.Println("Running initial sync on startup...") - if err := wgManager.SyncDevice(); err != nil { - log.Printf("ERROR during initial sync: %v", err) - } - - for { - select { - case <-ticker.C: - log.Println("⚙️ Ticker fired: running periodic sync...") - if err := wgManager.SyncDevice(); err != nil { - log.Printf("ERROR during periodic sync: %v", err) - } - case <-ctx.Done(): - log.Println("Stopping periodic sync.") - return - } - } -} - -type WireguardSynchronizer interface { - SyncDevice() error -} \ No newline at end of file