Version notes

Based on the Istio 1.21.0 source code. In this release, ambient mode received a major overhaul: traffic redirection moved from the host network namespace into the pod's own network namespace. This fixes the problem in the old model where the network configuration installed by the Istio CNI extension could interfere with the host-level network configuration of mainstream CNI implementations, causing traffic outages and network policies that failed to take effect. In theory the new model supports any CNI; in practice it has been verified to work with CNI plugins such as Flannel and Cilium. For details, see the post on inpod traffic redirection in ambient mode.

This post verifies that ambient mode works correctly with the Cilium CNI plugin, and then, guided by the logs collected during testing, walks through the implementation details of ambient mode in the source code of the cni and ztunnel modules.

Deployment

The tests run on a k3s cluster. k3s ships with flannel as its default CNI plugin, with config under /var/lib/rancher/k3s/agent/etc/cni/net.d, which differs from Istio's default CNI plugin path, so we install the Cilium CNI plugin instead:

rm -rf /etc/cni/net.d/*
curl -sfL https://rancher-mirror.rancher.cn/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn \
  INSTALL_K3S_EXEC='--flannel-backend=none' sh -s - \
  --disable-network-policy \
  --disable "servicelb" \
  --disable "traefik" \
  --disable "metrics-server"

cp /etc/rancher/k3s/k3s.yaml ~/.kube/config

wget https://github.com/cilium/cilium-cli/releases/download/v0.15.23/cilium-linux-amd64.tar.gz
tar -xzvf cilium-linux-amd64.tar.gz
mv cilium /usr/local/bin/
# Set cni.exclusive to false to stop cilium from continuously rewriting /etc/cni/net.d/05-cilium.conflist
# See: https://github.com/istio/istio/issues/46764 — here we set it at install time instead
# The available values can be inspected with helm: helm show values cilium/cilium
cilium install --version 1.15.1 \
--set "routingMode=native,debug.enabled=true,cni.exclusive=false,ipv4NativeRoutingCIDR=10.0.0.0/8"

Once the pods are up, use the following commands to deploy Istio and the test application:

wget https://github.com/istio/istio/releases/download/1.21.0/istio-1.21.0-linux-amd64.tar.gz
# After extracting, add istioctl to PATH in ~/.bash_profile: PATH=$PATH:/root/found/istio-1.21.0/bin
# Install with the ambient profile, enable the ingressGateway, and set each component's log level to debug
istioctl install --set profile=ambient --set "components.ingressGateways[0].enabled=true" \
--set "components.ingressGateways[0].name=istio-ingressgateway" \
--set "values.global.logging.level=default:debug" \
--set "components.cni.k8s.env[0].name=LOG_LEVEL" \
--set "components.cni.k8s.env[0].value=debug" \
--set "components.ztunnel.k8s.env[0].name=RUST_LOG" \
--set "components.ztunnel.k8s.env[0].value=debug" \
--skip-confirmation

# Required for deploying a waypoint via the Kubernetes Gateway API
kubectl get crd gateways.gateway.networking.k8s.io &> /dev/null || \
  { kubectl kustomize "github.com/kubernetes-sigs/gateway-api/config/crd/experimental?ref=v1.0.0" | kubectl apply -f -; }

# To observe traffic in the Kiali dashboard, also deploy kiali and prometheus.yaml
wget https://raw.githubusercontent.com/istio/istio/release-1.21/samples/addons/kiali.yaml
# After downloading kiali.yaml, change the kiali Service type to NodePort and set the http nodePort to 30001
#  type: NodePort
#  ports:
#  - name: http
#    appProtocol: http
#    protocol: TCP
#    nodePort: 30001
#    port: 20001
kubectl apply -f kiali.yaml
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.21/samples/addons/prometheus.yaml

Once all the mesh services' pods are running, deploy the test application and verify that requests succeed both before and after switching the namespace to ambient mode:

# Deploy the test application
kubectl create namespace sample
kubectl apply  -f samples/helloworld/helloworld.yaml \
    -l service=helloworld -n sample
kubectl apply -f samples/helloworld/helloworld.yaml \
    -l version=v1 -n sample
kubectl apply -f samples/helloworld/helloworld.yaml \
    -l version=v2 -n sample
kubectl apply -f samples/sleep/sleep.yaml -n sample

# Before joining ambient, the request returns normally
kubectl exec -n sample -c sleep "$(kubectl get pod -n sample -l \
    app=sleep -o jsonpath='{.items[0].metadata.name}')" \
    -- curl helloworld.sample:5000/hello

# Join the ambient mesh
kubectl label namespace sample istio.io/dataplane-mode=ambient

# Test again: the request still returns normally
kubectl exec -n sample -c sleep "$(kubectl get pod -n sample -l \
    app=sleep -o jsonpath='{.items[0].metadata.name}')" \
    -- curl helloworld.sample:5000/hello

cni source code

Generating iptables rules for pods

The istio-cni-node pod on each node generates iptables rules for pods in ambient mesh mode. The rules look like this:

# Exec into the helloworld pod and install iptables
[root@localhost ~]# kubectl -nsample exec -it helloworld-v1-559bccf6d7-jwx42 -- /bin/bash
root@helloworld-v1-559bccf6d7-jwx42:/opt/microservices# apt-get update
root@helloworld-v1-559bccf6d7-jwx42:/opt/microservices# apt-get install iptables
# nat table
root@helloworld-v1-559bccf6d7-jwx42:/opt/microservices# iptables-legacy -t nat -nL --line-number
Chain PREROUTING (policy ACCEPT)
num  target     prot opt source               destination

Chain INPUT (policy ACCEPT)
num  target     prot opt source               destination

Chain OUTPUT (policy ACCEPT)
num  target     prot opt source               destination
1    ISTIO_OUTPUT  0    --  0.0.0.0/0            0.0.0.0/0

Chain POSTROUTING (policy ACCEPT)
num  target     prot opt source               destination

Chain ISTIO_OUTPUT (1 references)
num  target     prot opt source               destination
# Accept TCP traffic destined for 169.254.7.127
1    ACCEPT     6    --  0.0.0.0/0            169.254.7.127        tcp
# Accept TCP packets whose mark matches 0x111
2    ACCEPT     6    --  0.0.0.0/0            0.0.0.0/0            mark match 0x111/0xfff
# Accept traffic whose destination is not 127.0.0.1 (this rule also matches on the loopback output interface, which -nL does not display)
3    ACCEPT     0    --  0.0.0.0/0           !127.0.0.1
# TCP traffic whose destination is not 127.0.0.1 and whose mark is not 0x539 is redirected to local port 15001
4    REDIRECT   6    --  0.0.0.0/0           !127.0.0.1            mark match ! 0x539/0xfff redir ports 15001

# mangle table, used to modify packet fields and set marks
root@helloworld-v1-559bccf6d7-jwx42:/opt/microservices# iptables-legacy -t mangle -nL --line-number
Chain PREROUTING (policy ACCEPT)
num  target     prot opt source               destination
1    ISTIO_PRERT  0    --  0.0.0.0/0            0.0.0.0/0

Chain INPUT (policy ACCEPT)
num  target     prot opt source               destination

Chain FORWARD (policy ACCEPT)
num  target     prot opt source               destination

Chain OUTPUT (policy ACCEPT)
num  target     prot opt source               destination
1    ISTIO_OUTPUT  0    --  0.0.0.0/0            0.0.0.0/0

Chain POSTROUTING (policy ACCEPT)
num  target     prot opt source               destination

Chain ISTIO_OUTPUT (1 references)
num  target     prot opt source               destination
# For connections whose connmark matches 0x111, restore the connection mark onto the packet
1    CONNMARK   0    --  0.0.0.0/0            0.0.0.0/0            connmark match  0x111/0xfff CONNMARK restore

Chain ISTIO_PRERT (1 references)
num  target     prot opt source               destination
# For packets marked 0x539, set the connection mark to 0x111
1    CONNMARK   0    --  0.0.0.0/0            0.0.0.0/0            mark match 0x539/0xfff CONNMARK xset 0x111/0xfff
# Accept TCP traffic originating from 169.254.7.127
2    ACCEPT     6    --  169.254.7.127        0.0.0.0/0            tcp
# Accept all local (loopback) TCP traffic not destined for 127.0.0.1
3    ACCEPT     6    --  0.0.0.0/0           !127.0.0.1
# TPROXY TCP traffic with destination port 15008 whose mark is not 0x539 to local port 15008, setting mark 0x111
# This handles encrypted traffic, which arrives over the HBONE (HTTP-Based Overlay Network Environment) socket on 15008
4    TPROXY     6    --  0.0.0.0/0            0.0.0.0/0            tcp dpt:15008 mark match ! 0x539/0xfff TPROXY redirect 0.0.0.0:15008 mark 0x111/0xfff
# Accept TCP connections whose conntrack state is RELATED or ESTABLISHED
5    ACCEPT     6    --  0.0.0.0/0            0.0.0.0/0            ctstate RELATED,ESTABLISHED
# Plaintext sockets are redirected to 15006:
# TPROXY TCP traffic not marked 0x539 and not destined for 127.0.0.1 to local port 15006, setting mark 0x111
6    TPROXY     6    --  0.0.0.0/0           !127.0.0.1            mark match ! 0x539/0xfff TPROXY redirect 0.0.0.0:15006 mark 0x111/0xfff

Here 169.254.7.127 is a fixed link-local address, used to reliably identify kubelet health probes from inside the pod. Two diagrams from the Istio website are helpful for understanding these iptables rules: tls_flow raw_buffer_flow

Source code for enabling ambient mode

When pods are added in a namespace carrying the istio.io/dataplane-mode=ambient label, the following code is invoked:

// cni/pkg/nodeagent/net.go
// 1. Get the pod's network namespace
// 2. Add the pod's IPs to an ipset in the host netns, used for node probe checks
// 3. Create iptables rules inside the pod's network namespace
// 4. Notify ztunnel over gRPC to create a proxy
func (s *NetServer) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs []netip.Addr, netNs string) error {
	log.Infof("in pod mode - adding pod %s/%s to ztunnel ", pod.Namespace, pod.Name)
	s.currentPodSnapshot.Ensure(string(pod.UID))
	openNetns, err := s.getOrOpenNetns(&pod.ObjectMeta, netNs)
	if err != nil {
		return err
	}

	err = addPodToHostNSIpset(pod, podIPs, &s.hostsideProbeIPSet)
	if err != nil {
		log.Errorf("failed to add pod to ipset: %s/%s %v", pod.Namespace, pod.Name, err)
		return err
	}

	log.Debug("calling CreateInpodRules")
	// switch into the pod's netns and run the closure
	if err := s.netnsRunner(openNetns, func() error {
		// this creates the iptables rules shown above
		return s.iptablesConfigurator.CreateInpodRules(&HostProbeSNATIP)
	}); err != nil {
		log.Errorf("failed to update POD inpod: %s/%s %v", pod.Namespace, pod.Name, err)
		return err
	}

	log.Debug("notifying subscribed node proxies")
	if err := s.sendPodToZtunnelAndWaitForAck(ctx, &pod.ObjectMeta, openNetns); err != nil {
		return NewErrPartialAdd(err)
	}
	return nil
}

The netnsRunner function runs CreateInpodRules by first switching the current thread's network namespace to the target pod's namespace and then executing the iptables commands. Once that completes, it calls ztunnel's gRPC interface to tell ztunnel to create a proxy.
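
The switch-and-run step follows a pattern common in Go CNI code. Below is a minimal sketch of it, assuming the ns helper from github.com/containernetworking/plugins (the istio-cni node agent uses its own wrapper; the netns path here is hypothetical):

package main

import (
	"fmt"
	"os/exec"

	// helper widely used by CNI plugins to run a function inside another netns
	"github.com/containernetworking/plugins/pkg/ns"
)

// runInPodNetns moves the calling OS thread into the network namespace at
// nsPath, runs f, then restores the original namespace.
func runInPodNetns(nsPath string, f func() error) error {
	return ns.WithNetNSPath(nsPath, func(_ ns.NetNS) error {
		return f()
	})
}

func main() {
	// Hypothetical pod netns path, for illustration only.
	err := runInPodNetns("/proc/12345/ns/net", func() error {
		// Inside the pod's netns: dump its nat table.
		out, err := exec.Command("iptables", "-t", "nat", "-S").CombinedOutput()
		fmt.Print(string(out))
		return err
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}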

Searching the code shows that AddPodToMesh is called from two places. The first is InformerHandlers, which uses the Kubernetes informer mechanism to watch pod and namespace events and calls AddPodToMesh on pod add and update events to set up the iptables rules:

// cni/pkg/nodeagent/informers.go
func setupHandlers(ctx context.Context, kubeClient kube.Client, dataplane MeshDataplane, systemNamespace string) *InformerHandlers {
	s := &InformerHandlers{ctx: ctx, dataplane: dataplane, systemNamespace: systemNamespace}
	s.queue = controllers.NewQueue("ambient",
		controllers.WithGenericReconciler(s.reconcile),
		controllers.WithMaxAttempts(5),
	)
	// watch pod events on the current node and enqueue them
	s.pods = kclient.NewFiltered[*corev1.Pod](kubeClient, kclient.Filter{FieldSelector: "spec.nodeName=" + NodeName})
	s.pods.AddEventHandler(controllers.FromEventHandler(func(o controllers.Event) {
		s.queue.Add(o)
	}))
	// watch all namespace events and enqueue them
	s.namespaces = kclient.New[*corev1.Namespace](kubeClient)
	s.namespaces.AddEventHandler(controllers.FromEventHandler(func(o controllers.Event) {
		s.queue.Add(o)
	}))

	return s
}

func (s *InformerHandlers) reconcilePod(input any) error {
	event := input.(controllers.Event)
	pod := event.Latest().(*corev1.Pod)

	defer EventTotals.With(eventTypeTag.Value(event.Event.String())).Increment()

	switch event.Event {
	case controllers.EventAdd:
	case controllers.EventUpdate:
		newPod := event.New.(*corev1.Pod)
		oldPod := event.Old.(*corev1.Pod)
		ns := s.namespaces.Get(newPod.Namespace, "")
		if ns == nil {
			return fmt.Errorf("failed to find namespace %v", ns)
		}
		wasAnnotated := oldPod.Annotations != nil && oldPod.Annotations[constants.AmbientRedirection] == constants.AmbientRedirectionEnabled
		isAnnotated := newPod.Annotations != nil && newPod.Annotations[constants.AmbientRedirection] == constants.AmbientRedirectionEnabled
		shouldBeEnabled := util.PodRedirectionEnabled(ns, newPod)

		changeNeeded := wasAnnotated != shouldBeEnabled
		log.Debugf("Pod %s events: %+v", pod.Name, pod)
		if !changeNeeded {
			log.Debugf("Pod %s update event skipped, no change needed", pod.Name)
			return nil
		}

		if !shouldBeEnabled {
			log.Debugf("Pod %s no longer matches, removing from mesh", newPod.Name)
			err := s.dataplane.RemovePodFromMesh(s.ctx, pod)
			log.Debugf("RemovePodFromMesh(%s) returned %v", newPod.Name, err)
		} else {
			wasReady := kube.CheckPodReadyOrComplete(oldPod)
			isReady := kube.CheckPodReadyOrComplete(newPod)
			if wasReady != nil && isReady != nil && isAnnotated {
				log.Infof("Pod %s update event skipped, added/labeled by CNI plugin", pod.Name)
				return nil
			}

			log.Debugf("Pod %s now matches, adding to mesh", newPod.Name)
			podIPs := util.GetPodIPsIfPresent(pod)
			if len(podIPs) == 0 {
				log.Warnf("pod %s does not appear to have any assigned IPs, not capturing", pod.Name)
				return nil
			}

			err := s.dataplane.AddPodToMesh(s.ctx, pod, podIPs, "")
			log.Debugf("AddPodToMesh(%s) returned %v", newPod.Name, err)
		}
	case controllers.EventDelete:
		err := s.dataplane.DelPodFromMesh(s.ctx, pod)
		log.Debugf("DelPodFromMesh(%s) returned %v", pod.Name, err)
	}
	return nil
}
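
The decision above hinges on util.PodRedirectionEnabled. The following is a simplified sketch of the kind of check it performs, not the actual implementation (the real function also excludes, for example, host-network pods and pods that already run a sidecar):

package main

import (
	corev1 "k8s.io/api/core/v1"
)

// podRedirectionEnabled approximates util.PodRedirectionEnabled: the
// namespace must be labeled for ambient, and the pod must not have
// opted out of the ambient data plane.
func podRedirectionEnabled(ns *corev1.Namespace, pod *corev1.Pod) bool {
	if ns.Labels["istio.io/dataplane-mode"] != "ambient" {
		return false
	}
	// A pod can explicitly opt out of ambient capture.
	return pod.Labels["istio.io/dataplane-mode"] != "none"
}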

The second is CniPluginServer, which listens for events on the CNI plugin socket "/var/run/istio-cni/pluginevent.sock"; when an event arrives, it calls AddPodToMesh to set up the iptables rules:

// cni/pkg/nodeagent/cni-watcher.go
func startCniPluginServer(ctx context.Context, pluginSocket string,
	handlers K8sHandlers,
	dataplane MeshDataplane,
) *CniPluginServer {
	ctx, cancel := context.WithCancel(ctx)
	mux := http.NewServeMux()
	s := &CniPluginServer{
		handlers:  handlers,
		dataplane: dataplane,
		cniListenServer: &http.Server{
			Handler: mux,
		},
		cniListenServerCancel: cancel,
		sockAddress:           pluginSocket,
		ctx:                   ctx,
	}
	// create an HTTP server and register a handler on the /cmdadd path
	mux.HandleFunc(pconstants.CNIAddEventPath, s.handleAddEvent)
	return s
}

func (s *CniPluginServer) handleAddEvent(w http.ResponseWriter, req *http.Request) {
	if req.Body == nil {
		log.Error("empty request body")
		http.Error(w, "empty request body", http.StatusBadRequest)
		return
	}
	defer req.Body.Close()
	data, err := io.ReadAll(req.Body)
	if err != nil {
		log.Errorf("Failed to read event report from cni plugin: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	msg, err := processAddEvent(data)
	if err != nil {
		log.Errorf("Failed to process CNI event payload: %v", err)
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if err := s.ReconcileCNIAddEvent(req.Context(), msg); err != nil {
		log.Errorf("Failed to handle add event: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}

// look up the pod and add it to the mesh
func (s *CniPluginServer) ReconcileCNIAddEvent(ctx context.Context, addCmd CNIPluginAddEvent) error {
	log := log.WithLabels("cni-event", addCmd)

	log.Debugf("netns: %s", addCmd.Netns)

	maxStaleRetries := 10
	msInterval := 10
	retries := 0
	var ambientPod *corev1.Pod
	var err error

	log.Debugf("Checking pod: %s in ns: %s is enabled for ambient", addCmd.PodName, addCmd.PodNamespace)
	for ambientPod, err = s.handlers.GetPodIfAmbient(addCmd.PodName, addCmd.PodNamespace); (ambientPod == nil) && (retries < maxStaleRetries); retries++ {
		if err != nil {
			return err
		}
		log.Warnf("got an event for pod %s in namespace %s not found in current pod cache, retry %d of %d",
			addCmd.PodName, addCmd.PodNamespace, retries, maxStaleRetries)
		time.Sleep(time.Duration(msInterval) * time.Millisecond)
	}

	if ambientPod == nil {
		return fmt.Errorf("got event for pod %s in namespace %s but could not find in pod cache after retries", addCmd.PodName, addCmd.PodNamespace)
	}
	log.Debugf("Pod: %s in ns: %s is enabled for ambient, adding to mesh.", addCmd.PodName, addCmd.PodNamespace)

	var podIps []netip.Addr
	for _, configuredPodIPs := range addCmd.IPs {
		ip, _ := netip.AddrFromSlice(configuredPodIPs.Address.IP)
		podIps = append(podIps, ip)
	}
	err = s.dataplane.AddPodToMesh(ctx, ambientPod, podIps, addCmd.Netns)
	if err != nil {
		return err
	}

	return nil
}
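
On the other side of that socket sits the istio-cni plugin binary, which reports pod add events to the node agent over HTTP. Below is a minimal sketch of such a client, with the payload fields reduced for illustration (the real event type is the CNIPluginAddEvent handled above):

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
)

// addEvent is a simplified stand-in for the plugin's CNIPluginAddEvent payload.
type addEvent struct {
	PodName      string `json:"PodName"`
	PodNamespace string `json:"PodNamespace"`
	Netns        string `json:"Netns"`
}

func main() {
	// An HTTP client whose transport dials the node agent's unix socket
	// instead of a TCP address.
	client := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", "/var/run/istio-cni/pluginevent.sock")
			},
		},
	}

	body, _ := json.Marshal(addEvent{
		PodName:      "helloworld-v1-559bccf6d7-jwx42",
		PodNamespace: "sample",
		Netns:        "/var/run/netns/cni-0000", // hypothetical netns path
	})
	// The host in the URL is a placeholder; routing happens via the socket.
	resp, err := client.Post("http://localhost/cmdadd", "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("post failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}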

ztunnel source code

When ztunnel receives the create-proxy message sent by the cni node agent, its processing logs look like this:

2024-02-20T03:07:24.246171Z DEBUG ztunnel::inpod::protocol: FD 20 type is netns
2024-02-20T03:07:24.246197Z DEBUG ztunnel::inpod::protocol: FD 20 looks like a netns
2024-02-20T03:07:24.246200Z DEBUG ztunnel::inpod::protocol: Validating netns FD: Ok(())
2024-02-20T03:07:24.246220Z DEBUG ztunnel::inpod::workloadmanager: received message: AddWorkload(WorkloadData { netns: OwnedFd { fd: 20 }, info: WorkloadInfo { workload_uid: WorkloadUid("5e28c62f-51c5-4179-a651-76dca59d65ab") } })
2024-02-20T03:07:24.246255Z  INFO ztunnel::inpod::statemanager: pod WorkloadUid("5e28c62f-51c5-4179-a651-76dca59d65ab") received netns, starting proxy
2024-02-20T03:07:24.246328Z DEBUG ztunnel::inpod::statemanager: starting proxy workload=WorkloadInfo { workload_uid: WorkloadUid("5e28c62f-51c5-4179-a651-76dca59d65ab") } inode=4026532923
2024-02-20T03:07:24.246425Z  INFO ztunnel::proxy::inbound: listener established address=[::]:15008 component="inbound" transparent=true
2024-02-20T03:07:24.246472Z  INFO ztunnel::proxy::inbound_passthrough: listener established address=[::]:15006 component="inbound plaintext" transparent=true
2024-02-20T03:07:24.246494Z  INFO ztunnel::proxy::outbound: listener established address=[::]:15001 component="outbound" transparent=true
2024-02-20T03:07:24.246522Z  INFO ztunnel::proxy::socks5: listener established address=127.0.0.1:15080 component="socks5"

ztunnel creates four listeners, on ports 15008, 15006, 15001, and 15080: 15008 and 15006 handle inbound traffic (HBONE and plaintext respectively), 15001 handles outbound traffic, and 15080 serves as a socks5 proxy. The code path that creates these proxies, starting from the message handler, is as follows:

// ztunnel/src/inpod/workloadmanager.rs
    pub async fn process_msg(&mut self, msg: WorkloadMessage) -> Result<(), Error> {
        match msg {
            WorkloadMessage::AddWorkload(poddata) => {
                info!(
                    "pod {:?} received netns, starting proxy",
                    poddata.info.workload_uid
                );
                if !self.snapshot_received {
                    self.snapshot_names
                        .insert(poddata.info.workload_uid.clone());
                }
                let netns = InpodNetns::new(self.inpod_config.cur_netns(), poddata.netns)
                    .map_err(|e| Error::ProxyError(crate::proxy::Error::Io(e)))?;

                self.add_workload(poddata.info, netns)
                    .await
                    .map_err(Error::ProxyError)
            }
            // ...
        }
    }
    // add the workload, i.e. set up its port proxies
    async fn add_workload(
        &mut self,
        workload_info: WorkloadInfo,
        netns: InpodNetns,
    ) -> Result<(), crate::proxy::Error> {
        match self.add_workload_inner(&workload_info, netns.clone()).await {
			// ...
        }
    }
    async fn add_workload_inner(
        &mut self,
        workload_info: &WorkloadInfo,
        netns: InpodNetns,
    ) -> Result<(), crate::proxy::Error> {
        // build the port proxies
        let proxies = self
            .proxy_gen
            .new_proxies_from_factory(
                Some(drain_rx),
                Arc::from(self.inpod_config.socket_factory(netns)),
            )
            .await?;

        // ... run each proxy in turn
        if let Some(proxy) = proxies.proxy {
            tokio::spawn(
                async move {
                    proxy.run().await;
                    debug!("proxy for workload {:?} exited", uid);
                    metrics.proxies_stopped.get_or_create(&()).inc();
                    admin_handler.proxy_down(&uid);
                }
                .instrument(
                    tracing::info_span!("proxy", uid=%workload_info.workload_uid.clone().into_string()),
                ),
            );
        }
        // ...
    }

The logic of new_proxies_from_factory is shown below; it mainly calls Proxy::from_inputs to build the proxies:

// src/proxyfactory.rs
    pub async fn new_proxies_from_factory(
        &self,
        proxy_drain: Option<Watch>,
        socket_factory: Arc<dyn crate::proxy::SocketFactory + Send + Sync>,
    ) -> Result<ProxyResult, Error> {
        let mut result: ProxyResult = Default::default();
        let drain = proxy_drain.unwrap_or_else(|| self.drain.clone());

        // Optionally create the HBONE proxy.
        if self.config.proxy {
            let pi = crate::proxy::ProxyInputs::new(
                self.config.clone(),
                self.cert_manager.clone(),
                self.state.clone(),
                self.proxy_metrics.clone().unwrap(),
                socket_factory.clone(),
            );
            result.proxy = Some(Proxy::from_inputs(pi, drain.clone()).await?);
        }

        // Optionally create the DNS proxy.
        if self.config.dns_proxy {
            result.dns_proxy = Some(
                dns::Server::new(
                    self.config.cluster_domain.clone(),
                    self.config.dns_proxy_addr,
                    self.config.network.clone(),
                    self.state.clone(),
                    dns::forwarder_for_mode(self.config.proxy_mode)?,
                    self.dns_metrics.clone().unwrap(),
                    drain,
                    socket_factory.as_ref(),
                )
                .await?,
            );
        }
        Ok(result)
    }

Let's look at the logic of one of these proxies, the inbound proxy on port 15008. Its initialization code is as follows:

impl Inbound {
    pub(super) async fn new(mut pi: ProxyInputs, drain: Watch) -> Result<Inbound, Error> {
        let listener: TcpListener = pi
            .socket_factory
            // this calls tcp_bind on InPodSocketFactory
            .tcp_bind(pi.cfg.inbound_addr)
            .map_err(|e| Error::Bind(pi.cfg.inbound_addr, e))?;
        let transparent = super::maybe_set_transparent(&pi, &listener)?;
        pi.cfg.enable_original_source = Some(transparent);
        Ok(Inbound {
            cfg: pi.cfg,
            state: pi.state,
            listener,
            cert_manager: pi.cert_manager,
            metrics: pi.metrics,
            drain,
            socket_factory: pi.socket_factory.clone(),
        })
    }

    pub(super) async fn run(self)//...
    pub(super) async fn handle_inbound//...
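
The maybe_set_transparent call above is what produces transparent=true in the startup logs: it enables IP_TRANSPARENT on the listener, which the TPROXY rules shown earlier require. A minimal Go sketch of creating such a listener (the port is chosen to match the plaintext inbound listener; this needs CAP_NET_ADMIN):

package main

import (
	"context"
	"fmt"
	"net"
	"syscall"

	"golang.org/x/sys/unix"
)

func main() {
	// Set IP_TRANSPARENT on the socket before bind, so the listener can
	// accept connections for addresses it does not own (TPROXY semantics).
	lc := net.ListenConfig{
		Control: func(network, address string, c syscall.RawConn) error {
			var serr error
			err := c.Control(func(fd uintptr) {
				serr = unix.SetsockoptInt(int(fd), unix.SOL_IP, unix.IP_TRANSPARENT, 1)
			})
			if err != nil {
				return err
			}
			return serr
		},
	}
	ln, err := lc.Listen(context.Background(), "tcp", "0.0.0.0:15006")
	if err != nil {
		fmt.Println("listen:", err)
		return
	}
	defer ln.Close()
	fmt.Println("transparent listener on", ln.Addr())
}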

Here is the implementation of the tcp_bind function used above:

impl crate::proxy::SocketFactory for InPodSocketFactory {
    fn new_tcp_v4(&self) -> std::io::Result<tokio::net::TcpSocket> {
        self.configure(tokio::net::TcpSocket::new_v4)
    }

    fn new_tcp_v6(&self) -> std::io::Result<tokio::net::TcpSocket> {
        self.configure(tokio::net::TcpSocket::new_v6)
    }

    fn tcp_bind(&self, addr: std::net::SocketAddr) -> std::io::Result<tokio::net::TcpListener> {
        // configure switches the current thread's network namespace to the target pod's netns, then runs the bind
        let std_sock = self.configure(|| std::net::TcpListener::bind(addr))?;
        std_sock.set_nonblocking(true)?;
        tokio::net::TcpListener::from_std(std_sock)
    }

    fn udp_bind(&self, addr: std::net::SocketAddr) -> std::io::Result<tokio::net::UdpSocket> {
        let std_sock = self.configure(|| std::net::UdpSocket::bind(addr))?;
        std_sock.set_nonblocking(true)?;
        tokio::net::UdpSocket::from_std(std_sock)
    }
}

// definition of configure
impl InPodSocketFactory {
    fn from_cfg(inpod_config: &InPodConfig, netns: InpodNetns) -> Self {
        Self::new(netns, inpod_config.mark())
    }
    fn new(netns: InpodNetns, mark: Option<std::num::NonZeroU32>) -> Self {
        Self { netns, mark }
    }

    fn configure<S: std::os::unix::io::AsFd, F: FnOnce() -> std::io::Result<S>>(
        &self,
        f: F,
    ) -> std::io::Result<S> {
        let socket = self.netns.run(f)??;

        if let Some(mark) = self.mark {
            crate::socket::set_mark(&socket, mark.into())?;
        }
        Ok(socket)
    }
}

Execution finally reaches netns.run(f), whose code is below. It likewise switches the current network namespace to the target pod's namespace and runs f, which creates the listening socket, before switching back:

// src/inpod/netns.rs
impl InpodNetns {
    // ...
    pub fn new(cur_netns: Arc<OwnedFd>, workload_netns: OwnedFd) -> std::io::Result<Self> {
        let res = nix::sys::stat::fstat(workload_netns.as_raw_fd())
            .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
        let netns_inode = res.st_ino;
        Ok(InpodNetns {
            inner: Arc::new(NetnsInner {
                cur_netns,
                netns: workload_netns,
                netns_inode,
            }),
        })
    }

    pub fn run<F, T>(&self, f: F) -> std::io::Result<T>
    where
        F: FnOnce() -> T,
    {
        setns(self.inner.netns.as_raw_fd(), CloneFlags::CLONE_NEWNET)
            .map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;
        let ret = f();
        setns(self.inner.cur_netns.as_raw_fd(), CloneFlags::CLONE_NEWNET)
            .expect("this must never fail");
        Ok(ret)
    }
}
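
Note that setns(2) changes the namespace only for the calling thread, which is why this enter-run-restore sequence must stay pinned to one thread. Below is a minimal Go sketch of the same pattern (the Rust code above does the equivalent with OwnedFd; here the fds come from opening /proc paths, and entering another pod's netns would require appropriate privileges):

package main

import (
	"fmt"
	"runtime"

	"golang.org/x/sys/unix"
)

// withNetns switches the calling OS thread into the network namespace
// referred to by podNsFd, runs f, then restores origNsFd, mirroring
// ztunnel's InpodNetns::run.
func withNetns(podNsFd, origNsFd int, f func() error) error {
	// setns affects only the calling OS thread, so pin the goroutine to it.
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	if err := unix.Setns(podNsFd, unix.CLONE_NEWNET); err != nil {
		return fmt.Errorf("enter pod netns: %w", err)
	}
	err := f()
	if restoreErr := unix.Setns(origNsFd, unix.CLONE_NEWNET); restoreErr != nil {
		// Like the expect() in the Rust code: failing to restore the
		// original namespace is unrecoverable.
		panic(restoreErr)
	}
	return err
}

func main() {
	// Demonstrate with the current netns as both source and target (a
	// no-op switch); a real caller would open the pod's netns fd instead.
	self, err := unix.Open("/proc/self/ns/net", unix.O_RDONLY, 0)
	if err != nil {
		fmt.Println("open netns:", err)
		return
	}
	defer unix.Close(self)
	if err := withNetns(self, self, func() error {
		fmt.Println("running inside the netns")
		return nil
	}); err != nil {
		fmt.Println("error:", err)
	}
}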

References