kubernetes-operator/test/e2e/port_forward_test.go

121 lines
3.0 KiB
Go

package e2e
import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
type portForwardToPodRequest struct {
// config is the kubernetes config
config *rest.Config
// pod is the selected pod for this port forwarding
pod v1.Pod
// localPort is the local port that will be selected to expose the podPort
localPort int
// podPort is the target port for the pod
podPort int
// Streams configures where to write or read input from
streams genericclioptions.IOStreams
// stopCh is the channel used to manage the port forward lifecycle
stopCh <-chan struct{}
// readyCh communicates when the tunnel is ready to receive traffic
readyCh chan struct{}
}
func getFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", ":0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
_ = l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}
func setupPortForwardToPod(namespace, podName string, podPort int) (port int, cleanUpFunc func(), waitFunc func(), portForwardFunc func(), err error) {
port, err = getFreePort()
Expect(err).NotTo(HaveOccurred())
stream := genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
}
// stopCh control the port forwarding lifecycle. When it gets closed the
// port forward will terminate
stopCh := make(chan struct{}, 1)
// readyCh communicate when the port forward is ready to get traffic
readyCh := make(chan struct{})
req := portForwardToPodRequest{
config: Cfg,
pod: v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
},
},
localPort: port,
podPort: podPort,
streams: stream,
stopCh: stopCh,
readyCh: readyCh,
}
waitFunc = func() {
_, _ = fmt.Fprintf(GinkgoWriter, "Waiting for the port-forward.\n")
<-readyCh
_, _ = fmt.Fprintf(GinkgoWriter, "The port-forward is established.\n")
}
portForwardFunc = func() {
err := portForwardToPod(req)
if err != nil {
panic(err)
}
}
cleanUpFunc = func() {
_, _ = fmt.Fprintf(GinkgoWriter, "Closing port-forward\n")
close(stopCh)
}
return
}
func portForwardToPod(req portForwardToPodRequest) error {
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward",
req.pod.Namespace, req.pod.Name)
hostIP := strings.TrimLeft(req.config.Host, "htps:/")
transport, upgrader, err := spdy.RoundTripperFor(req.config)
if err != nil {
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, &url.URL{Scheme: "https", Path: path, Host: hostIP})
fw, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", req.localPort, req.podPort)}, req.stopCh, req.readyCh, req.streams.Out, req.streams.ErrOut)
if err != nil {
return err
}
return fw.ForwardPorts()
}