rebuild v1 api

This commit is contained in:
fedorHub 2025-09-11 16:20:46 +02:00
parent 49048921e7
commit 14fdeddbe7
No known key found for this signature in database
GPG Key ID: 7FDE5B4177850E7D
4 changed files with 50 additions and 93 deletions

View File

@ -186,14 +186,17 @@ func main() {
apiV1Auth := handlersV1.NewAuthenticationHandler(userManager)
apiV1BackendUsers := backendV1.NewUserService(cfg, userManager)
apiV1BackendPeers := backendV1.NewPeerService(cfg, wireGuardManager, userManager)
// apiV1BackendPeers := backendV1.NewPeerService(cfg, wireGuardManager, userManager)
coreV1Peers := backendV1.NewPeerService(cfg, wireGuardManager, userManager)
apiV1BackendInterfaces := backendV1.NewInterfaceService(cfg, wireGuardManager)
apiV1BackendProvisioning := backendV1.NewProvisioningService(cfg, userManager, wireGuardManager, cfgFileManager)
apiV1BackendMetrics := backendV1.NewMetricsService(cfg, database, userManager, wireGuardManager)
apiV1EndpointUsers := handlersV1.NewUserEndpoint(apiV1Auth, validatorManager, apiV1BackendUsers)
// apiV1EndpointPeers := handlersV1.NewPeerEndpoint(apiV1Auth, validatorManager, apiV1BackendPeers)
apiV1BackendPeers := handlersV1.NewEventingPeerService(coreV1Peers, eventBus)
apiV1EndpointPeers := handlersV1.NewPeerEndpoint(apiV1Auth, validatorManager, apiV1BackendPeers)
apiV1EndpointPeers.SetEventBus(eventBus)
//apiV1EndpointPeers.SetEventBus(eventBus)
apiV1EndpointInterfaces := handlersV1.NewInterfaceEndpoint(apiV1Auth, validatorManager, apiV1BackendInterfaces)
apiV1EndpointProvisioning := handlersV1.NewProvisioningEndpoint(apiV1Auth, validatorManager,
apiV1BackendProvisioning)

View File

@ -59,18 +59,6 @@ func (e *PeerEndpoint) publish(topic string, args ...any) {
e.bus.Publish(topic, args...)
}
// 0-arg для штатних подій (внутрішні підписники)
func (e *PeerEndpoint) publish0(topic string) {
if e.bus == nil || topic == "" { return }
e.bus.Publish(topic) // без аргументів
}
// 1-arg для fanout (йому потрібен рівно ОДИН аргумент)
func (e *PeerEndpoint) publish1(topic string, arg any) {
if e.bus == nil || topic == "" { return }
e.bus.Publish(topic, arg)
}
func (e PeerEndpoint) RegisterRoutes(g *routegroup.Bundle) {
apiGroup := g.Mount("/peer")
apiGroup.Use(e.authenticator.LoggedIn())
@ -259,13 +247,10 @@ func (e PeerEndpoint) handleCreatePost() http.HandlerFunc {
return
}
// внутрішні
e.publish0(app.TopicPeerCreated)
e.publish0(app.TopicPeerUpdated)
// fanout
e.publish1("peer.save", newPeer)
e.publish1("peers.updated", "v1:create")
e.publish(app.TopicPeerCreated)
e.publish(app.TopicPeerUpdated)
e.publish("peer.save", newPeer)
e.publish("peers.updated", "v1:create")
respond.JSON(w, http.StatusOK, models.NewPeer(newPeer))
}
@ -314,12 +299,9 @@ func (e PeerEndpoint) handleUpdatePut() http.HandlerFunc {
return
}
// внутрішні
e.publish0(app.TopicPeerUpdated)
// fanout
e.publish1("peer.save", updatedPeer)
e.publish1("peers.updated", "v1:update")
e.publish(app.TopicPeerUpdated)
e.publish("peer.save", updatedPeer)
e.publish("peers.updated", "v1:update")
respond.JSON(w, http.StatusOK, models.NewPeer(updatedPeer))
}
@ -357,47 +339,24 @@ func (e PeerEndpoint) handleDelete() http.HandlerFunc {
}
// внутрішні
e.publish0(app.TopicPeerDeleted)
e.publish0(app.TopicPeerUpdated)
// fanout
e.publish1("peer.delete", domain.PeerIdentifier(id))
e.publish1("peers.updated", "v1:delete")
e.publish(app.TopicPeerDeleted)
e.publish(app.TopicPeerUpdated)
e.publish("peer.delete", domain.PeerIdentifier(id))
e.publish("peers.updated", "v1:delete")
respond.Status(w, http.StatusNoContent)
}
}
// func (e PeerEndpoint) handleSyncPost() http.HandlerFunc {
// return func(w http.ResponseWriter, r *http.Request) {
// count, err := e.peers.SyncAllPeersFromDB(r.Context())
// if err != nil {
// status, model := ParseServiceError(err)
// respond.JSON(w, status, model)
// return
// }
// respond.JSON(w, http.StatusOK, map[string]any{
// "synced": count,
// })
// }
// }
func (e PeerEndpoint) handleSyncPost() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Якщо fanout поставив заголовок "не ехо", забороняємо локальні публікації
if r.Header.Get("X-WGP-NoEcho") == "1" {
ctx = app.WithNoFanout(ctx)
}
count, err := e.peers.SyncAllPeersFromDB(ctx)
if err != nil {
status, model := ParseServiceError(err)
respond.JSON(w, status, model)
return
}
respond.JSON(w, http.StatusOK, map[string]any{
"synced": count,
})
}
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if r.Header.Get("X-WGP-NoEcho") == "1" {
ctx = app.WithNoFanout(ctx)
}
count, err := e.peers.SyncAllPeersFromDB(ctx)
if err != nil { /* ... */ }
respond.JSON(w, http.StatusOK, map[string]any{"synced": count})
}
}

View File

@ -3,65 +3,64 @@ package handlers
import (
"context"
"github.com/fedor-git/wg-portal-2/internal/app" // твої топіки і EventPublisher
"github.com/fedor-git/wg-portal-2/internal/app"
"github.com/fedor-git/wg-portal-2/internal/domain"
)
// eventingPeerService декорує будь-яку реалізацію PeerService
// та надсилає івенти після Create/Update/Delete.
type eventingPeerService struct {
inner PeerService
bus app.EventPublisher
inner PeerService
bus app.EventPublisher
}
// Конструктор: повертає саме PeerService, тож NewPeerEndpoint прийме як є.
func NewEventingPeerService(inner PeerService, bus app.EventPublisher) PeerService {
return &eventingPeerService{inner: inner, bus: bus}
return &eventingPeerService{inner: inner, bus: bus}
}
// ---------- read-only як є ----------
// ---------- read-only ----------
func (s *eventingPeerService) GetForInterface(ctx context.Context, id domain.InterfaceIdentifier) ([]domain.Peer, error) {
return s.inner.GetForInterface(ctx, id)
return s.inner.GetForInterface(ctx, id)
}
func (s *eventingPeerService) GetForUser(ctx context.Context, id domain.UserIdentifier) ([]domain.Peer, error) {
return s.inner.GetForUser(ctx, id)
return s.inner.GetForUser(ctx, id)
}
func (s *eventingPeerService) GetById(ctx context.Context, id domain.PeerIdentifier) (*domain.Peer, error) {
return s.inner.GetById(ctx, id)
return s.inner.GetById(ctx, id)
}
func (s *eventingPeerService) Prepare(ctx context.Context, id domain.InterfaceIdentifier) (*domain.Peer, error) {
return s.inner.Prepare(ctx, id)
return s.inner.Prepare(ctx, id)
}
func (s *eventingPeerService) SyncAllPeersFromDB(ctx context.Context) (int, error) {
return s.inner.SyncAllPeersFromDB(ctx)
// вхідний sync нічого не фан-аутить сам по собіі це ок
return s.inner.SyncAllPeersFromDB(ctx)
}
// ---------- мутації + події ----------
// ---------- мутації + fanout ----------
func (s *eventingPeerService) Create(ctx context.Context, p *domain.Peer) (*domain.Peer, error) {
out, err := s.inner.Create(ctx, p)
if err != nil { return nil, err }
s.publish(app.TopicPeerCreated)
s.publish(app.TopicPeerUpdated)
s.bumpFanout(ctx, "peer.save", out) // 1 аргумент
s.bumpFanout(ctx, "peers.updated", "v1:create")
return out, nil
}
func (s *eventingPeerService) Update(ctx context.Context, id domain.PeerIdentifier, p *domain.Peer) (*domain.Peer, error) {
out, err := s.inner.Update(ctx, id, p)
if err != nil { return nil, err }
s.publish(app.TopicPeerUpdated)
s.bumpFanout(ctx, "peer.save", out) // 1 аргумент
s.bumpFanout(ctx, "peers.updated", "v1:update")
return out, nil
}
func (s *eventingPeerService) Delete(ctx context.Context, id domain.PeerIdentifier) error {
if err := s.inner.Delete(ctx, id); err != nil { return err }
s.publish(app.TopicPeerDeleted)
s.publish(app.TopicPeerUpdated)
s.bumpFanout(ctx, "peer.delete", id) // 1 аргумент
s.bumpFanout(ctx, "peers.updated", "v1:delete")
return nil
}
func (s *eventingPeerService) publish(topic string, args ...any) {
// ---- helpers ----
func (s *eventingPeerService) bumpFanout(ctx context.Context, topic string, arg any) {
if s.bus == nil || topic == "" { return }
s.bus.Publish(topic, args...)
}
if app.NoFanout(ctx) { return } // важливо: не ехо
s.bus.Publish(topic, arg) // рівно 1 аргумент
}

View File

@ -42,15 +42,11 @@ func (m Manager) SyncAllPeersFromDB(ctx context.Context) (int, error) {
slog.ErrorContext(ctx, "peer sync: failed to load peers", "iface", in.Identifier, "err", err)
continue
}
// >>> ДОДАЙ ЦЕ: якщо peers немає прибираємо всіх на інтерфейсі
if len(peers) == 0 {
if err := m.clearPeers(ctx, in.Identifier); err != nil {
// або ReplacePeers=true з пустим списком, або спеціальний ClearPeers
if err := m.wg.ClearPeers(ctx, string(in.Identifier)); err != nil {
slog.ErrorContext(ctx, "clear peers failed", "iface", in.Identifier, "err", err)
}
// не публікуємо івенти, якщо це fanout-sync
if !app.NoFanout(ctx) {
m.bus.Publish(app.TopicPeerUpdated)
}
continue
}
desired := make([]domain.Peer, 0, len(peers))