From 14fdeddbe7f6ff285f37223cadd3d135da08cc28 Mon Sep 17 00:00:00 2001 From: fedorHub Date: Thu, 11 Sep 2025 16:20:46 +0200 Subject: [PATCH] rebuild v1 api --- cmd/wg-portal-2/main.go | 7 +- internal/app/api/v1/handlers/endpoint_peer.go | 81 +++++-------------- .../api/v1/handlers/peer_eventing_service.go | 47 ++++++----- internal/app/wireguard/wireguard_sync.go | 8 +- 4 files changed, 50 insertions(+), 93 deletions(-) diff --git a/cmd/wg-portal-2/main.go b/cmd/wg-portal-2/main.go index cbd7348..12a6f7e 100644 --- a/cmd/wg-portal-2/main.go +++ b/cmd/wg-portal-2/main.go @@ -186,14 +186,17 @@ func main() { apiV1Auth := handlersV1.NewAuthenticationHandler(userManager) apiV1BackendUsers := backendV1.NewUserService(cfg, userManager) - apiV1BackendPeers := backendV1.NewPeerService(cfg, wireGuardManager, userManager) + // apiV1BackendPeers := backendV1.NewPeerService(cfg, wireGuardManager, userManager) + coreV1Peers := backendV1.NewPeerService(cfg, wireGuardManager, userManager) apiV1BackendInterfaces := backendV1.NewInterfaceService(cfg, wireGuardManager) apiV1BackendProvisioning := backendV1.NewProvisioningService(cfg, userManager, wireGuardManager, cfgFileManager) apiV1BackendMetrics := backendV1.NewMetricsService(cfg, database, userManager, wireGuardManager) apiV1EndpointUsers := handlersV1.NewUserEndpoint(apiV1Auth, validatorManager, apiV1BackendUsers) + // apiV1EndpointPeers := handlersV1.NewPeerEndpoint(apiV1Auth, validatorManager, apiV1BackendPeers) + apiV1BackendPeers := handlersV1.NewEventingPeerService(coreV1Peers, eventBus) apiV1EndpointPeers := handlersV1.NewPeerEndpoint(apiV1Auth, validatorManager, apiV1BackendPeers) - apiV1EndpointPeers.SetEventBus(eventBus) + //apiV1EndpointPeers.SetEventBus(eventBus) apiV1EndpointInterfaces := handlersV1.NewInterfaceEndpoint(apiV1Auth, validatorManager, apiV1BackendInterfaces) apiV1EndpointProvisioning := handlersV1.NewProvisioningEndpoint(apiV1Auth, validatorManager, apiV1BackendProvisioning) diff --git a/internal/app/api/v1/handlers/endpoint_peer.go b/internal/app/api/v1/handlers/endpoint_peer.go index 907f4a0..665f39a 100644 --- a/internal/app/api/v1/handlers/endpoint_peer.go +++ b/internal/app/api/v1/handlers/endpoint_peer.go @@ -59,18 +59,6 @@ func (e *PeerEndpoint) publish(topic string, args ...any) { e.bus.Publish(topic, args...) } -// 0-arg для штатних подій (внутрішні підписники) -func (e *PeerEndpoint) publish0(topic string) { - if e.bus == nil || topic == "" { return } - e.bus.Publish(topic) // без аргументів -} - -// 1-arg для fanout (йому потрібен рівно ОДИН аргумент) -func (e *PeerEndpoint) publish1(topic string, arg any) { - if e.bus == nil || topic == "" { return } - e.bus.Publish(topic, arg) -} - func (e PeerEndpoint) RegisterRoutes(g *routegroup.Bundle) { apiGroup := g.Mount("/peer") apiGroup.Use(e.authenticator.LoggedIn()) @@ -259,13 +247,10 @@ func (e PeerEndpoint) handleCreatePost() http.HandlerFunc { return } - // внутрішні - e.publish0(app.TopicPeerCreated) - e.publish0(app.TopicPeerUpdated) - - // fanout - e.publish1("peer.save", newPeer) - e.publish1("peers.updated", "v1:create") + e.publish(app.TopicPeerCreated) + e.publish(app.TopicPeerUpdated) + e.publish("peer.save", newPeer) + e.publish("peers.updated", "v1:create") respond.JSON(w, http.StatusOK, models.NewPeer(newPeer)) } @@ -314,12 +299,9 @@ func (e PeerEndpoint) handleUpdatePut() http.HandlerFunc { return } - // внутрішні - e.publish0(app.TopicPeerUpdated) - - // fanout - e.publish1("peer.save", updatedPeer) - e.publish1("peers.updated", "v1:update") + e.publish(app.TopicPeerUpdated) + e.publish("peer.save", updatedPeer) + e.publish("peers.updated", "v1:update") respond.JSON(w, http.StatusOK, models.NewPeer(updatedPeer)) } @@ -357,47 +339,24 @@ func (e PeerEndpoint) handleDelete() http.HandlerFunc { } // внутрішні - e.publish0(app.TopicPeerDeleted) - e.publish0(app.TopicPeerUpdated) - - // fanout - e.publish1("peer.delete", domain.PeerIdentifier(id)) - e.publish1("peers.updated", "v1:delete") + e.publish(app.TopicPeerDeleted) + e.publish(app.TopicPeerUpdated) + e.publish("peer.delete", domain.PeerIdentifier(id)) + e.publish("peers.updated", "v1:delete") respond.Status(w, http.StatusNoContent) } } -// func (e PeerEndpoint) handleSyncPost() http.HandlerFunc { -// return func(w http.ResponseWriter, r *http.Request) { -// count, err := e.peers.SyncAllPeersFromDB(r.Context()) -// if err != nil { -// status, model := ParseServiceError(err) -// respond.JSON(w, status, model) -// return -// } -// respond.JSON(w, http.StatusOK, map[string]any{ -// "synced": count, -// }) -// } -// } func (e PeerEndpoint) handleSyncPost() http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - // Якщо fanout поставив заголовок "не ехо", забороняємо локальні публікації - if r.Header.Get("X-WGP-NoEcho") == "1" { - ctx = app.WithNoFanout(ctx) - } - - count, err := e.peers.SyncAllPeersFromDB(ctx) - if err != nil { - status, model := ParseServiceError(err) - respond.JSON(w, status, model) - return - } - respond.JSON(w, http.StatusOK, map[string]any{ - "synced": count, - }) - } + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + if r.Header.Get("X-WGP-NoEcho") == "1" { + ctx = app.WithNoFanout(ctx) + } + count, err := e.peers.SyncAllPeersFromDB(ctx) + if err != nil { /* ... */ } + respond.JSON(w, http.StatusOK, map[string]any{"synced": count}) + } } \ No newline at end of file diff --git a/internal/app/api/v1/handlers/peer_eventing_service.go b/internal/app/api/v1/handlers/peer_eventing_service.go index ba2c0b6..ad6f17c 100644 --- a/internal/app/api/v1/handlers/peer_eventing_service.go +++ b/internal/app/api/v1/handlers/peer_eventing_service.go @@ -3,65 +3,64 @@ package handlers import ( "context" - "github.com/fedor-git/wg-portal-2/internal/app" // твої топіки і EventPublisher + "github.com/fedor-git/wg-portal-2/internal/app" "github.com/fedor-git/wg-portal-2/internal/domain" ) -// eventingPeerService декорує будь-яку реалізацію PeerService -// та надсилає івенти після Create/Update/Delete. type eventingPeerService struct { - inner PeerService - bus app.EventPublisher + inner PeerService + bus app.EventPublisher } -// Конструктор: повертає саме PeerService, тож NewPeerEndpoint прийме як є. func NewEventingPeerService(inner PeerService, bus app.EventPublisher) PeerService { - return &eventingPeerService{inner: inner, bus: bus} + return &eventingPeerService{inner: inner, bus: bus} } -// ---------- read-only як є ---------- - +// ---------- read-only ---------- func (s *eventingPeerService) GetForInterface(ctx context.Context, id domain.InterfaceIdentifier) ([]domain.Peer, error) { - return s.inner.GetForInterface(ctx, id) + return s.inner.GetForInterface(ctx, id) } func (s *eventingPeerService) GetForUser(ctx context.Context, id domain.UserIdentifier) ([]domain.Peer, error) { - return s.inner.GetForUser(ctx, id) + return s.inner.GetForUser(ctx, id) } func (s *eventingPeerService) GetById(ctx context.Context, id domain.PeerIdentifier) (*domain.Peer, error) { - return s.inner.GetById(ctx, id) + return s.inner.GetById(ctx, id) } func (s *eventingPeerService) Prepare(ctx context.Context, id domain.InterfaceIdentifier) (*domain.Peer, error) { - return s.inner.Prepare(ctx, id) + return s.inner.Prepare(ctx, id) } func (s *eventingPeerService) SyncAllPeersFromDB(ctx context.Context) (int, error) { - return s.inner.SyncAllPeersFromDB(ctx) + // вхідний sync нічого не фан-аутить сам по собі — і це ок + return s.inner.SyncAllPeersFromDB(ctx) } -// ---------- мутації + події ---------- - +// ---------- мутації + fanout ---------- func (s *eventingPeerService) Create(ctx context.Context, p *domain.Peer) (*domain.Peer, error) { out, err := s.inner.Create(ctx, p) if err != nil { return nil, err } - s.publish(app.TopicPeerCreated) - s.publish(app.TopicPeerUpdated) + s.bumpFanout(ctx, "peer.save", out) // 1 аргумент + s.bumpFanout(ctx, "peers.updated", "v1:create") return out, nil } func (s *eventingPeerService) Update(ctx context.Context, id domain.PeerIdentifier, p *domain.Peer) (*domain.Peer, error) { out, err := s.inner.Update(ctx, id, p) if err != nil { return nil, err } - s.publish(app.TopicPeerUpdated) + s.bumpFanout(ctx, "peer.save", out) // 1 аргумент + s.bumpFanout(ctx, "peers.updated", "v1:update") return out, nil } func (s *eventingPeerService) Delete(ctx context.Context, id domain.PeerIdentifier) error { if err := s.inner.Delete(ctx, id); err != nil { return err } - s.publish(app.TopicPeerDeleted) - s.publish(app.TopicPeerUpdated) + s.bumpFanout(ctx, "peer.delete", id) // 1 аргумент + s.bumpFanout(ctx, "peers.updated", "v1:delete") return nil } -func (s *eventingPeerService) publish(topic string, args ...any) { +// ---- helpers ---- +func (s *eventingPeerService) bumpFanout(ctx context.Context, topic string, arg any) { if s.bus == nil || topic == "" { return } - s.bus.Publish(topic, args...) -} \ No newline at end of file + if app.NoFanout(ctx) { return } // важливо: не ехо + s.bus.Publish(topic, arg) // рівно 1 аргумент +} diff --git a/internal/app/wireguard/wireguard_sync.go b/internal/app/wireguard/wireguard_sync.go index 0499bf3..bc0e37a 100644 --- a/internal/app/wireguard/wireguard_sync.go +++ b/internal/app/wireguard/wireguard_sync.go @@ -42,15 +42,11 @@ func (m Manager) SyncAllPeersFromDB(ctx context.Context) (int, error) { slog.ErrorContext(ctx, "peer sync: failed to load peers", "iface", in.Identifier, "err", err) continue } - // >>> ДОДАЙ ЦЕ: якщо peers немає – прибираємо всіх на інтерфейсі if len(peers) == 0 { - if err := m.clearPeers(ctx, in.Identifier); err != nil { + // або ReplacePeers=true з пустим списком, або спеціальний ClearPeers + if err := m.wg.ClearPeers(ctx, string(in.Identifier)); err != nil { slog.ErrorContext(ctx, "clear peers failed", "iface", in.Identifier, "err", err) } - // не публікуємо івенти, якщо це fanout-sync - if !app.NoFanout(ctx) { - m.bus.Publish(app.TopicPeerUpdated) - } continue } desired := make([]domain.Peer, 0, len(peers))