pilot-agent 源码

整体流程

整体逻辑如下图,简单来说分为以下几步:

  • pilot-agent 启动后,生成私钥和 CSR(证书签名请求),用 CSR 请求 istiod 获取工作负载证书,ROOTCA 根证书是挂载在本地目录的
  • pilot-agent 创建 envoy 启动配置文件,使用 cmd 方式启动 envoy,启动配置文件中包含 istiod 的地址,pilot-agent SDS(secret discovery service)服务的 UDS(unix domain socket)地址
  • envoy 请求 SDS,获取私钥、工作负载证书、ROOTCA 根证书,并请求 istiod 获取 XDS 动态配置(经过 pilot-agent 转发和 istiod 建立的长连接)
  • 如果服务器证书有更新,pilot-agent 收到推送的更新,会经过 UDS 发给 envoy

agent_arch_overview

pilot-agent 命令

pilot-agent 是用来启动 envoy 的,可以用在两种场景:

  • 应用容器的边车代理中:启动命令为“pilot-agent proxy sidecar xx”
  • Gateway pods 中转发流量:启动命令为“pilot-agent proxy router xx”

pilot-agent 在启动时会从 istiod 中获取 envoy 部分配置来启动 envoy,等 envoy 启动后,envoy 和 istiod 连接动态获取 XDS 配置。pilot-agent 使用 cobra 来启动服务的,这里只关注 proxy 子命令逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// pilot/cmd/pilot-agent/app/cmd.go
func newProxyCommand() *cobra.Command {
  return &cobra.Command{
    Use:   "proxy",
    RunE: func(c *cobra.Command, args []string) error {
      // 根据启动参数创建和初始化 Proxy,Proxy 包含代理实例的信息(envoy sizecar,gateway)
      proxy, err := initProxy(args)
      if err != nil {
        return err
      }
      // 构建代理配置,自动注入时 istio-proxy 容器设置了 PROXY_CONFIG 环境变量,指定了 meshId
      proxyConfig, err := config.ConstructProxyConfig(proxyArgs.MeshConfigFile, proxyArgs.ServiceCluster, options.ProxyConfigEnv, proxyArgs.Concurrency, proxy)
      if err != nil {
        return fmt.Errorf("failed to get proxy config: %v", err)
      }
      if out, err := protomarshal.ToYAML(proxyConfig); err != nil {
        log.Infof("Failed to serialize to YAML: %v", err)
      } else {
        log.Infof("Effective config: %s", out)
      }
      // Options 保存用于 secret 服务发现的所有配置参数和 CA 配置
      secOpts, err := options.NewSecurityOptions(proxyConfig, proxyArgs.StsPort, proxyArgs.TokenManagerPlugin)
      if err != nil {
        return err
      }

      envoyOptions := envoy.ProxyConfig{
        LogLevel:          proxyArgs.ProxyLogLevel,
        ComponentLogLevel: proxyArgs.ProxyComponentLogLevel,
        LogAsJSON:         loggingOptions.JSONEncoding,
        NodeIPs:           proxy.IPAddresses,
        Sidecar:           proxy.Type == model.SidecarProxy,
        OutlierLogPath:    proxyArgs.OutlierLogPath,
      }
      agentOptions := options.NewAgentOptions(proxy, proxyConfig)
      // 创建 Agent
      agent := istio_agent.NewAgent(proxyConfig, agentOptions, secOpts, envoyOptions)
      ctx, cancel := context.WithCancel(context.Background())
      defer cancel()

      // 优雅退出
      go cmd.WaitSignalFunc(cancel)

      // 启动 SDS 服务发现、dns 服务、xds 代理和 Envoy
      wait, err := agent.Run(ctx)
      if err != nil {
        return err
      }
      wait()
      return nil
    },
  }
}

istio-agent 下的 Agent

Agent 承载了本地 SDS 和 XDS 服务发现的功能,其定义和运行逻辑如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// pkg/istio-agent/agent.go
type Agent struct {
  proxyConfig *mesh.ProxyConfig

  cfg       *AgentOptions
  secOpts   *security.Options
  envoyOpts envoy.ProxyConfig

  envoyAgent             *envoy.Agent
  dynamicBootstrapWaitCh chan error

  sdsServer   *sds.Server
  secretCache *cache.SecretManagerClient

  // Used when proxying envoy xds via istio-agent is enabled.
  xdsProxy    *XdsProxy
  fileWatcher filewatcher.FileWatcher

  // local DNS Server that processes DNS requests locally and forwards to upstream DNS if needed.
  localDNSServer *dnsClient.LocalDNSServer

  // Signals true completion (e.g. with delayed graceful termination of Envoy)
  wg sync.WaitGroup
}

func NewAgent(proxyConfig *mesh.ProxyConfig, agentOpts *AgentOptions, sopts *security.Options, eopts envoy.ProxyConfig) *Agent {
  return &Agent{
    proxyConfig: proxyConfig,
    cfg:         agentOpts,
    secOpts:     sopts,
    envoyOpts:   eopts,
    fileWatcher: filewatcher.NewWatcher(),
  }
}

// 非阻塞调用
func (a *Agent) Run(ctx context.Context) (func(), error) {
  var err error
  // 初始化 dns 服务(gateway 不需要 dns 服务)
  if err = a.initLocalDNSServer(); err != nil {
    return nil, fmt.Errorf("failed to start local DNS server: %v", err)
  }
  // 检查用于 SDS 发现的 unix domain socket 是否存在
  socketExists, err := checkSocket(ctx, security.WorkloadIdentitySocketPath)
  if err != nil {
    return nil, fmt.Errorf("failed to check SDS socket: %v", err)
  }

  if socketExists {
    log.Info("Workload SDS socket found. Istio SDS Server won't be started")
  } else {
    // 使用 istio 的 SDS 服务
    log.Info("Workload SDS socket not found. Starting Istio SDS Server")
    err = a.initSdsServer()
    if err != nil {
      return nil, fmt.Errorf("failed to start SDS server: %v", err)
    }
  }
  a.xdsProxy, err = initXdsProxy(a)
  if err != nil {
    return nil, fmt.Errorf("failed to start xds proxy: %v", err)
  }
  // 定义当代理连接到控制面是如何认证的
  if a.proxyConfig.ControlPlaneAuthPolicy != mesh.AuthenticationPolicy_NONE {
    // 决定在 bootstrap 文件中配置的根 CA,可能与证书服务器的不同
    rootCAForXDS, err := a.FindRootCAForXDS()
    if err != nil {
      return nil, fmt.Errorf("failed to find root XDS CA: %v", err)
    }
    go a.startFileWatcher(ctx, rootCAForXDS, func() {
      if err := a.xdsProxy.initIstiodDialOptions(a); err != nil {
        log.Warnf("Failed to init xds proxy dial options")
      }
    })
  }
  // 开启了 envoy 代理
  if !a.EnvoyDisabled() {
    // 初始化 envoy agent 和 envoy proxy,用于创建 envoy 执行的 cmd
    err = a.initializeEnvoyAgent(ctx)
    if err != nil {
      return nil, fmt.Errorf("failed to initialize envoy agent: %v", err)
    }

    a.wg.Add(1)
    go func() {
      defer a.wg.Done()
      // 启动 envoy 命令
      a.envoyAgent.Run(ctx)
    }()
  } else if a.WaitForSigterm() {
    // ...
  }
  return a.wg.Wait, nil
}

func (a *Agent) initializeEnvoyAgent(ctx context.Context) error {
  // 节点信息
  node, err := a.generateNodeMetadata()
  if err != nil {
    return fmt.Errorf("failed to generate bootstrap metadata: %v", err)
  }
  // 日志示例:Pilot SAN: [istiod.istio-system.svc]
  log.Infof("Pilot SAN: %v", node.Metadata.PilotSubjectAltName)

  // 自定义代理配置文件路径
  if len(a.proxyConfig.CustomConfigFile) > 0 {
    // there is a custom configuration. Don't write our own config - but keep watching the certs.
    a.envoyOpts.ConfigPath = a.proxyConfig.CustomConfigFile
    a.envoyOpts.ConfigCleanup = false
  } else {
    out, err := bootstrap.New(bootstrap.Config{
      Node: node,
    }).CreateFile()
    if err != nil {
      return fmt.Errorf("failed to generate bootstrap config: %v", err)
    }
    a.envoyOpts.ConfigPath = out
    a.envoyOpts.ConfigCleanup = true
  }

  // 用 proxy 配置回填 envoy 配置
  a.envoyOpts.BinaryPath = a.proxyConfig.BinaryPath
  a.envoyOpts.AdminPort = a.proxyConfig.ProxyAdminPort
  a.envoyOpts.DrainDuration = a.proxyConfig.DrainDuration
  a.envoyOpts.ParentShutdownDuration = a.proxyConfig.ParentShutdownDuration
  a.envoyOpts.Concurrency = a.proxyConfig.Concurrency.GetValue()

  a.envoyOpts.AgentIsRoot = os.Getuid() == 0 && strings.HasSuffix(a.cfg.DNSAddr, ":53")
  // 创建代理控制命令的实例
  envoyProxy := envoy.NewProxy(a.envoyOpts)

  drainDuration := a.proxyConfig.TerminationDrainDuration.AsDuration()
  localHostAddr := localHostIPv4
  if a.cfg.IsIPv6 {
    localHostAddr = localHostIPv6
  }
  // 创建新的代理 agent,其持有 envoyProxy 并管理 envoyProxy 的启动
  a.envoyAgent = envoy.NewAgent(envoyProxy, drainDuration, a.cfg.MinimumDrainDuration, localHostAddr,
    int(a.proxyConfig.ProxyAdminPort), a.cfg.EnvoyStatusPort, a.cfg.EnvoyPrometheusPort, a.cfg.ExitOnZeroActiveConnections)
  // 动态生成启动配置
  if a.cfg.EnableDynamicBootstrap {
    a.dynamicBootstrapWaitCh = make(chan error, 1)
    // 模拟一个启动的 xDS 请求
    b := backoff.NewExponentialBackOff(backoff.DefaultOption())
    for {
      // handleStream 在退出后处理请求,因此创建一个新的
      bsStream := &bootstrapDiscoveryStream{
        node:        node,
        errCh:       a.dynamicBootstrapWaitCh,
        // 配置有更新,调用 envoy.envoy 的 UpdateConfig 将配置写入到 etc/istio/proxy/envoy-rev.json
        envoyUpdate: envoyProxy.UpdateConfig,
      }
      _ = a.xdsProxy.handleStream(bsStream)
      delay := b.NextBackOff()
      select {
      case err, ok := <-a.dynamicBootstrapWaitCh:
        if !ok {
          log.Infof("successfully updated bootstrap config")
          return nil
        }
        // received invalid config, could not happen in normal case.
        log.Warn(err)
        return err
      case <-ctx.Done():
        return nil
      case <-time.After(delay):
        log.Infof("retrying bootstrap discovery request with backoff: %v", delay)
      }
    }
  }
  return nil
}

initSdsServer

initSdsServer 是初始化 SDS 服务发现,在 newSecretManager 函数中,获取了集群的根证书、本地服务器证书、私钥等。这里逻辑是,生成私钥和 CSR,向 istiod 发送 CSR,istiod 根据请求的服务身份签发证书。查看 sidecar 的日志中,对于 tlsOpts.RootCert 有打印“Using CA istiod.istio-system.svc:15012 cert with certs: var/run/secrets/istio/root-cert.pem”,结合 sidecar 的 pod 描述看,CA 根证书实际是从 istio-system 空间的 istio-ca-root-cert cm 中获取的,而 cm 这个值从手动创建的 cacerts secret 中获取的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// pkg/istio-agent/agent.go
func (a *Agent) initSdsServer() error {
  var err error
  // 检查工作负载证书文件是否挂载在指定的路径
  if security.CheckWorkloadCertificate(security.WorkloadIdentityCertChainPath, security.WorkloadIdentityKeyPath, security.WorkloadIdentityRootCertPath) {
    log.Info("workload certificate files detected, creating secret manager without caClient")
    a.secOpts.RootCertFilePath = security.WorkloadIdentityRootCertPath
    a.secOpts.CertChainFilePath = security.WorkloadIdentityCertChainPath
    a.secOpts.KeyFilePath = security.WorkloadIdentityKeyPath
    a.secOpts.FileMountedCerts = true
  }

  a.secretCache, err = a.newSecretManager()
  if err != nil {
    return fmt.Errorf("failed to start workload secret manager %v", err)
  }
  // 关闭 envoy 代理功能
  if a.cfg.DisableEnvoy {
    // 对于无代理模式,不需要 SDS 服务器,但仍然需要密钥并且需要定期刷新。
    go func() {
      st := a.secretCache
      st.RegisterSecretHandler(func(resourceName string) {
        // 缓存无效时,在 secret 需要更新时调用此 handler
        _, _ = a.getWorkloadCerts(st)
      })
      // 尝试获取证书,直到工作负载证书和根证书都获取了才返回
      _, _ = a.getWorkloadCerts(st)
    }()
  } else {
    // 私钥配置
    pkpConf := a.proxyConfig.GetPrivateKeyProvider()
    // 创建和开启 SDS 的 grpc 服务器
    a.sdsServer = sds.NewServer(a.secOpts, a.secretCache, pkpConf)
    // secretCache 注册当 secret 更新时调用的 handler
    a.secretCache.RegisterSecretHandler(a.sdsServer.OnSecretUpdate)
  }

  return nil
}

### SecretManager
// 为负载 secret 创建 SecretManager
func (a *Agent) newSecretManager() (*cache.SecretManagerClient, error) {
  // 使用文件挂载的证书
  if a.secOpts.FileMountedCerts {
    log.Info("Workload is using file mounted certificates. Skipping connecting to CA")
    return cache.NewSecretManagerClient(nil, a.secOpts)
  }
  // 打印示例:“CA Endpoint istiod.istio-system.svc:15012, provider Citadel”,即通过 istiod 的 15012 端口的 Citadel 模块获取
  log.Infof("CA Endpoint %s, provider %s", a.secOpts.CAEndpoint, a.secOpts.CAProviderName)

  var tlsOpts *citadel.TLSOptions
  var err error
  if strings.HasSuffix(a.secOpts.CAEndpoint, ":15010") {
    log.Warn("Debug mode or IP-secure network")
  } else {
    tlsOpts = &citadel.TLSOptions{}
    // 获取根证书
    tlsOpts.RootCert, err = a.FindRootCAForCA()
    if err != nil {
      return nil, fmt.Errorf("failed to find root CA cert for CA: %v", err)
    }

    if tlsOpts.RootCert == "" {
      log.Infof("Using CA %s cert with system certs", a.secOpts.CAEndpoint)
    } else if !fileExists(tlsOpts.RootCert) {
      log.Fatalf("invalid config - %s missing a root certificate %s", a.secOpts.CAEndpoint, tlsOpts.RootCert)
    } else {
      // 获取了有效的 CA 根证书,打印示例:“Using CA istiod.istio-system.svc:15012 cert with certs: var/run/secrets/istio/root-cert.pem”
      log.Infof("Using CA %s cert with certs: %s", a.secOpts.CAEndpoint, tlsOpts.RootCert)
    }
    // secOpts.ProvCert 为空,这两个也为空
    tlsOpts.Key, tlsOpts.Cert = a.getKeyCertsForCA()
  }

  // 创建 CitadelClient,即用于 Citadel 的 CA 客户端
  caClient, err := citadel.NewCitadelClient(a.secOpts, tlsOpts)
  if err != nil {
    return nil, err
  }
  // 创建 SecretManagerClient,使用提供的 client 签发证书签名请求文件
  return cache.NewSecretManagerClient(caClient, a.secOpts)
}

这里会创建 CitadelClient 用来请求 Citadel 模块,SecretManagerClient 里面会调用 CitadelClient 的 CSRSign 方法,这里关注下初始化和 CSRSign 方法逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
type CitadelClient struct {
  // 不为空表示开启 tls 连接
  tlsOpts   *TLSOptions
  client    pb.IstioCertificateServiceClient
  conn      *grpc.ClientConn
  provider  *caclient.TokenProvider
  opts      *security.Options
  usingMtls *atomic.Bool
}

func NewCitadelClient(opts *security.Options, tlsOpts *TLSOptions) (*CitadelClient, error) {
  c := &CitadelClient{
    tlsOpts:   tlsOpts,
    opts:      opts,
    provider:  caclient.NewCATokenProvider(opts),
    usingMtls: atomic.NewBool(false),
  }
  // 建立 grpc 连接
  conn, err := c.buildConnection()
  if err != nil {
    return nil, fmt.Errorf("failed to connect to endpoint %s", opts.CAEndpoint)
  }
  c.conn = conn
  // IstioCertificateServiceClient 是 IstioCertificateService 服务的客户端 API
  // 里面的 CreateCertificate 方法,使用提供的证书签名请求,来获取签发的证书
  c.client = pb.NewIstioCertificateServiceClient(conn)
  return c, nil
}

// 使用 CSR 请求 Citadel 来签发证书
func (c *CitadelClient) CSRSign(csrPEM []byte, certValidTTLInSec int64) ([]string, error) {
  crMetaStruct := &structpb.Struct{
    Fields: map[string]*structpb.Value{
      security.CertSigner: {
        Kind: &structpb.Value_StringValue{StringValue: c.opts.CertSigner},
      },
    },
  }
  req := &pb.IstioCertificateRequest{
    Csr:              string(csrPEM),
    ValidityDuration: certValidTTLInSec,
    Metadata:         crMetaStruct,
  }

  if err := c.reconnectIfNeeded(); err != nil {
    return nil, err
  }

  ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("ClusterID", c.opts.ClusterID))
  resp, err := c.client.CreateCertificate(ctx, req)
  if err != nil {
    return nil, fmt.Errorf("create certificate: %v", err)
  }

  if len(resp.CertChain) <= 1 {
    return nil, errors.New("invalid empty CertChain")
  }

  return resp.CertChain, nil
}

istiod 服务端 CreateCertificate

在上面的 client 中可以看到请求的 istiod 接口是“istio.v1.auth.IstioCertificateService/CreateCertificate”,找到对应的服务端处理逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func _IstioCertificateService_CreateCertificate_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
  in := new(IstioCertificateRequest)
  if err := dec(in); err != nil {
    return nil, err
  }
  if interceptor == nil {
    return srv.(IstioCertificateServiceServer).CreateCertificate(ctx, in)
  }
  info := &grpc.UnaryServerInfo{
    Server:     srv,
    FullMethod: "/istio.v1.auth.IstioCertificateService/CreateCertificate",
  }
  handler := func(ctx context.Context, req interface{}) (interface{}, error) {
    return srv.(IstioCertificateServiceServer).CreateCertificate(ctx, req.(*IstioCertificateRequest))
  }
  return interceptor(ctx, in, info, handler)
}

上面最终会调用到下面的 server.CreateCertificate 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// security/pkg/server/ca/server.go
func (s *Server) Register(grpcServer *grpc.Server) {
  pb.RegisterIstioCertificateServiceServer(grpcServer, s)
}

// s.Ca 是通过 createIstioCA 函数创建的,详细参考 pilot-discovery 源码解析
func (s *Server) CreateCertificate(ctx context.Context, request *pb.IstioCertificateRequest) (
  *pb.IstioCertificateResponse, error,
) {
  s.monitoring.CSR.Increment()
  am := security.AuthenticationManager{Authenticators: s.Authenticators}
  caller := am.Authenticate(ctx)
  if caller == nil {
    s.monitoring.AuthnError.Increment()
    return nil, status.Error(codes.Unauthenticated, "request authenticate failure")
  }
  // By default, we will use the callers identity for the certificate
  sans := caller.Identities
  crMetadata := request.Metadata.GetFields()
  impersonatedIdentity := crMetadata[security.ImpersonatedIdentity].GetStringValue()
  if impersonatedIdentity != "" {
    ...
  }
  // 获取请求 metadata 中的字段“CertSigner”
  certSigner := crMetadata[security.CertSigner].GetStringValue()
  // 打印示例:generating a certificate for [spiffe://cluster.local/ns/sample/sa/sleep], requested ttl: 24h0m0s
  serverCaLog.Infof("generating a certificate for %v, requested ttl: %s",
    sans, time.Duration(request.ValidityDuration*int64(time.Second)))
  _, _, certChainBytes, rootCertBytes := s.ca.GetCAKeyCertBundle().GetAll()
  certOpts := ca.CertOpts{
    SubjectIDs: sans,
    TTL:        time.Duration(request.ValidityDuration) * time.Second,
    ForCA:      false,
    CertSigner: certSigner,
  }
  var signErr error
  var cert []byte
  var respCertChain []string
  if certSigner == "" {
    // 为空直接签发证书
    cert, signErr = s.ca.Sign([]byte(request.Csr), certOpts)
  } else {
    // 不为空,签发证书后,会返回此证书的证书链
    respCertChain, signErr = s.ca.SignWithCertChain([]byte(request.Csr), certOpts)
  }
  if certSigner == "" {
    respCertChain = []string{string(cert)}
    if len(certChainBytes) != 0 {
      respCertChain = append(respCertChain, string(certChainBytes))
    }
  }
  // 在证书链最后加上根证书,会导致 pods 证书链中有两个一样的根证书
  // 因为 certChainBytes 中包含了集群中间证书和根证书
  if len(rootCertBytes) != 0 {
    respCertChain = append(respCertChain, string(rootCertBytes))
  }
  response := &pb.IstioCertificateResponse{
    CertChain: respCertChain,
  }
  s.monitoring.Success.Increment()
  serverCaLog.Debug("CSR successfully signed.")
  return response, nil
}

这里会调用到:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// security/pkg/pki/ca/ca.go
func (ca *IstioCA) Sign(csrPEM []byte, certOpts CertOpts) ([]byte, error,) {
  return ca.sign(csrPEM, certOpts.SubjectIDs, certOpts.TTL, true, certOpts.ForCA)
}

func (ca *IstioCA) GetCAKeyCertBundle() *util.KeyCertBundle {
  return ca.keyCertBundle
}

func (ca *IstioCA) sign(csrPEM []byte, subjectIDs []string, requestedLifetime time.Duration, checkLifetime, forCA bool) ([]byte, error) {
  // 当前集群证书和私钥
  signingCert, signingKey, _, _ := ca.keyCertBundle.GetAll()
  if signingCert == nil {
    return nil, caerror.NewError(caerror.CANotReady, fmt.Errorf("Istio CA is not ready")) // nolint
  }
  // 解析 csr 请求并检查签名
  csr, err := util.ParsePemEncodedCSR(csrPEM)
  if err != nil {
    return nil, caerror.NewError(caerror.CSRError, err)
  }

  if err := csr.CheckSignature(); err != nil {
    return nil, caerror.NewError(caerror.CSRError, err)
  }
  // 生成证书
  certBytes, err := util.GenCertFromCSR(csr, signingCert, csr.PublicKey, *signingKey, subjectIDs, lifetime, forCA)
  if err != nil {
    return nil, caerror.NewError(caerror.CertGenError, err)
  }

  block := &pem.Block{
    Type:  "CERTIFICATE",
    Bytes: certBytes,
  }
  cert := pem.EncodeToMemory(block)

  return cert, nil
}

// 跟 Sign 类似,但返回叶子证书和整个证书链
func (ca *IstioCA) SignWithCertChain(csrPEM []byte, certOpts CertOpts) (
  []string, error,
) {
  cert, err := ca.signWithCertChain(csrPEM, certOpts.SubjectIDs, certOpts.TTL, true, certOpts.ForCA)
  if err != nil {
    return nil, err
  }
  return []string{string(cert)}, nil
}

func (ca *IstioCA) signWithCertChain(csrPEM []byte, subjectIDs []string, requestedLifetime time.Duration, lifetimeCheck,
  forCA bool,
) ([]byte, error) {
  cert, err := ca.sign(csrPEM, subjectIDs, requestedLifetime, lifetimeCheck, forCA)
  if err != nil {
    return nil, err
  }

  chainPem := ca.GetCAKeyCertBundle().GetCertChainPem()
  if len(chainPem) > 0 {
    cert = append(cert, chainPem...)
  }
  return cert, nil
}

其中 keyCertBundle.GetAll() 逻辑如下,其中 ca.KeyCertBundle 由 NewVerifiedKeyCertBundleFromFile 函数创建:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// security/pkg/pki/util/keycertbundle.go
func (b *KeyCertBundle) GetAll() (cert *x509.Certificate, privKey *crypto.PrivateKey, certChainBytes,
  rootCertBytes []byte,
) {
  b.mutex.RLock()
  cert = b.cert
  privKey = b.privKey
  certChainBytes = copyBytes(b.certChainBytes)
  rootCertBytes = copyBytes(b.rootCertBytes)
  b.mutex.RUnlock()
  return
}

// 传入的四个参数,分别多集群初始化时创建的 secret cacerts 中的四个文件:ca-cert.pem、ca-key.pem、cert-chain.pem、root-cert.pem
// 在 security/pkg/pki/ca/ca.go 的 NewPluggedCertIstioCAOptions 函数中调用
func NewVerifiedKeyCertBundleFromFile(certFile string, privKeyFile string, certChainFiles []string, rootCertFile string) (
  *KeyCertBundle, error,
) {
  certBytes, err := os.ReadFile(certFile)
  if err != nil {
    return nil, err
  }
  privKeyBytes, err := os.ReadFile(privKeyFile)
  if err != nil {
    return nil, err
  }
  var certChainBytes []byte
  if len(certChainFiles) > 0 {
    for _, f := range certChainFiles {
      var b []byte

      if b, err = os.ReadFile(f); err != nil {
        return nil, err
      }

      certChainBytes = append(certChainBytes, b...)
    }
  }
  rootCertBytes, err := os.ReadFile(rootCertFile)
  if err != nil {
    return nil, err
  }
  return NewVerifiedKeyCertBundleFromPem(certBytes, privKeyBytes, certChainBytes, rootCertBytes)
}

SecretManagerClient

用来生成私钥和 CSR,并请求 pilot 的证书中心 Citadel 获取证书,会获取两个特定名称的证书:

  • default:表示工作负载的 spiffe 证书,即本地 envoy 使用的证书
  • ROOTCA:集群根证书,用来校验其他服务证书
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// security/pkg/nodeagent/cache/secretcache.go
type SecretManagerClient struct {
  // 用来请求 CA 的客户端
  caClient security.Client

  configOptions *security.Options
  // 当 secret 改变时的回调函数
  secretHandler func(resourceName string)

  // 工作证书和根证书缓存
  cache secretCache

  generateMutex sync.Mutex
  // 存在证书链的位置,如果存在,则用这些文件指定的证书、私钥、根证书
  existingCertificateFile security.SdsCertificateConfig
  // 观测证书的变化,并触发通知给代理
  certWatcher *fsnotify.Watcher
  // ...
}

// 处理 secret 更新,secretHandler 实际是 sdsServer.OnSecretUpdate 函数
func (sc *SecretManagerClient) OnSecretUpdate(resourceName string) {
  sc.certMutex.RLock()
  defer sc.certMutex.RUnlock()
  if sc.secretHandler != nil {
    sc.secretHandler(resourceName)
  }
}

func NewSecretManagerClient(caClient security.Client, options *security.Options) (*SecretManagerClient, error) {
  watcher, err := fsnotify.NewWatcher()
  if err != nil {
    return nil, err
  }

  ret := &SecretManagerClient{
    queue:         queue.NewDelayed(queue.DelayQueueBuffer(0)),
    caClient:      caClient,
    configOptions: options,
    existingCertificateFile: security.SdsCertificateConfig{
      CertificatePath:   options.CertChainFilePath,
      PrivateKeyPath:    options.KeyFilePath,
      CaCertificatePath: options.RootCertFilePath,
    },
    certWatcher: watcher,
    fileCerts:   make(map[FileCert]struct{}),
    stop:        make(chan struct{}),
    caRootPath:  options.CARootPath,
  }

  go ret.queue.Run(ret.stop)
  go ret.handleFileWatch()
  return ret, nil
}

func (sc *SecretManagerClient) GenerateSecret(resourceName string) (secret *security.SecretItem, err error) {
  cacheLog.Debugf("generate secret %q", resourceName)
  defer func() {
    if secret == nil || err != nil {
      return
    }
    sc.outputMutex.Lock()
    // 存储获取的证书和私钥到文件中
    if resourceName == security.RootCertReqResourceName || resourceName == security.WorkloadKeyCertResourceName {
      if err := nodeagentutil.OutputKeyCertToDir(sc.configOptions.OutputKeyCertToDir, secret.PrivateKey,
        secret.CertificateChain, secret.RootCert); err != nil {
        cacheLog.Errorf("error when output the resource: %v", err)
      } else {
        resourceLog(resourceName).Debugf("output the resource to %v", sc.configOptions.OutputKeyCertToDir)
      }
    }
    sc.outputMutex.Unlock()
  }()

  // 从文件生成密钥
  if sdsFromFile, ns, err := sc.generateFileSecret(resourceName); sdsFromFile {
    if err != nil {
      return nil, err
    }
    return ns, nil
  }
  // 获取缓存的证书
  ns := sc.getCachedSecret(resourceName)
  if ns != nil {
    return ns, nil
  }

  t0 := time.Now()
  sc.generateMutex.Lock()
  defer sc.generateMutex.Unlock()

  // 获取锁后,再查询一次缓存的证书
  ns = sc.getCachedSecret(resourceName)
  if ns != nil {
    return ns, nil
  }

  if ts := time.Since(t0); ts > time.Second {
    cacheLog.Warnf("slow generate secret lock: %v", ts)
  }

  // 生成 CSR 和私钥,发送请求到 CA 获取工作负载的证书
  ns, err = sc.generateNewSecret(resourceName)
  if err != nil {
    return nil, fmt.Errorf("failed to generate workload certificate: %v", err)
  }

  // 存储 secret 到缓存中,触发工作负载证书的自动更新
  sc.registerSecret(*ns)

  if resourceName == security.RootCertReqResourceName {
    ns.RootCert = sc.mergeTrustAnchorBytes(ns.RootCert)
  } else {
    // 假如定期刷新获取了新的根证书,触发根证书请求刷新信任锚
    oldRoot := sc.cache.GetRoot()
    if !bytes.Equal(oldRoot, ns.RootCert) {
      cacheLog.Info("Root cert has changed, start rotating root cert")
      // We store the oldRoot only for comparison and not for serving
      sc.cache.SetRoot(ns.RootCert)
      sc.OnSecretUpdate(security.RootCertReqResourceName)
    }
  }

  return ns, nil
}

func (sc *SecretManagerClient) generateNewSecret(resourceName string) (*security.SecretItem, error) {
  trustBundlePEM := []string{}
  var rootCertPEM []byte

  if sc.caClient == nil {
    return nil, fmt.Errorf("attempted to fetch secret, but ca client is nil")
  }
  t0 := time.Now()
  logPrefix := cacheLogPrefix(resourceName)
  // spiffe 身份,spiffe 标准定义了以全自动方式获取和验证加密身份所需的接口和文档
  csrHostName := &spiffe.Identity{
    TrustDomain:    sc.configOptions.TrustDomain,
    Namespace:      sc.configOptions.WorkloadNamespace,
    ServiceAccount: sc.configOptions.ServiceAccount,
  }

  cacheLog.Debugf("constructed host name for CSR: %s", csrHostName.String())
  // CertOptions 用来生成新证书的配置
  options := pkiutil.CertOptions{
    Host:       csrHostName.String(),
    RSAKeySize: sc.configOptions.WorkloadRSAKeySize,
    // 是否是 PKCS#8 私钥
    PKCS8Key:   sc.configOptions.Pkcs8Keys,
    // 设置椭圆曲线签名算法,目前只支持 ECDSA
    ECSigAlg:   pkiutil.SupportedECSignatureAlgorithms(sc.configOptions.ECCSigAlg),
  }

  // 生成 X509 证书签名请求和私钥
  csrPEM, keyPEM, err := pkiutil.GenCSR(options)
  if err != nil {
    cacheLog.Errorf("%s failed to generate key and certificate for CSR: %v", logPrefix, err)
    return nil, err
  }

  numOutgoingRequests.With(RequestType.Value(monitoring.CSR)).Increment()
  timeBeforeCSR := time.Now()
  // 请求 pilot 的 CA 来签发证书
  certChainPEM, err := sc.caClient.CSRSign(csrPEM, int64(sc.configOptions.SecretTTL.Seconds()))
  if err == nil {
    // 获取 CA 根证书
    trustBundlePEM, err = sc.caClient.GetRootCertBundle()
  }
  // ...
  certChain := concatCerts(certChainPEM)

  var expireTime time.Time
  // 获取证书过期时间,默认时 createTime+sc.configOptions.SecretTTL
  if expireTime, err = nodeagentutil.ParseCertAndGetExpiryTimestamp(certChain); err != nil {
    cacheLog.Errorf("%s failed to extract expire time from server certificate in CSR response %+v: %v",
      logPrefix, certChainPEM, err)
    return nil, fmt.Errorf("failed to extract expire time from server certificate in CSR response: %v", err)
  }

  cacheLog.WithLabels("latency", time.Since(t0), "ttl", time.Until(expireTime)).Info("generated new workload certificate")

  if len(trustBundlePEM) > 0 {
    rootCertPEM = concatCerts(trustBundlePEM)
  } else {
    rootCertPEM = []byte(certChainPEM[len(certChainPEM)-1])
  }

  return &security.SecretItem{
    CertificateChain: certChain,
    PrivateKey:       keyPEM,
    ResourceName:     resourceName,
    CreatedTime:      time.Now(),
    ExpireTime:       expireTime,
    RootCert:         rootCertPEM,
  }, nil
}

SDS Grpc 服务

上面的 sds.NewServer 是创建通过 UDS 暴露的 grpc SDS 服务器,其定义如下。其中 OnSecretUpdate 函数,在 secret 有更新时,在 SecretManagerClient.OnSecretUpdate 中调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// security/pkg/nodeagent/sds/server.go
type Server struct {
  workloadSds *sdsservice
  grpcWorkloadListener net.Listener
  grpcWorkloadServer *grpc.Server
  stopped            *atomic.Bool
}

func NewServer(options *security.Options, workloadSecretCache security.SecretManager, pkpConf *mesh.PrivateKeyProvider) *Server {
  s := &Server{stopped: atomic.NewBool(false)}
  // 创建 SDS 服务,此服务实现了 envoy SDS API
  s.workloadSds = newSDSService(workloadSecretCache, options, pkpConf)
  s.initWorkloadSdsService()
  return s
}

func (s *Server) initWorkloadSdsService() {
  s.grpcWorkloadServer = grpc.NewServer(s.grpcServerOptions()...)
  // 添加 SDS handle 到 grpc 服务器
  s.workloadSds.register(s.grpcWorkloadServer)
  var err error
  // UDS 路径上(/var/run/secrets/workload-spiffe-uds/socket)进行监听,envoy 启动时会连接
  s.grpcWorkloadListener, err = uds.NewListener(security.WorkloadIdentitySocketPath)
  go func() {
    sdsServiceLog.Info("Starting SDS grpc server")
    waitTime := time.Second
    started := false
    for i := 0; i < maxRetryTimes; i++ {
      if s.stopped.Load() {
        return
      }
      serverOk := true
      setUpUdsOK := true
      if s.grpcWorkloadListener != nil {
        // 接收请求并处理
        if err = s.grpcWorkloadServer.Serve(s.grpcWorkloadListener); err != nil {
          sdsServiceLog.Errorf("SDS grpc server for workload proxies failed to start: %v", err)
          serverOk = false
        }
      }
      if serverOk && setUpUdsOK {
        sdsServiceLog.Infof("SDS server for workload certificates started, listening on %q", security.WorkloadIdentitySocketPath)
        started = true
        break
      }
      time.Sleep(waitTime)
      waitTime *= 2
    }
    if !started {
      sdsServiceLog.Warn("SDS grpc server could not be started")
    }
  }()
}

func (s *Server) OnSecretUpdate(resourceName string) {
  if s.workloadSds == nil {
    return
  }
  // 使用 ADS 推送配置更新
  s.workloadSds.XdsServer.Push(&model.PushRequest{
    Full: false,
    ConfigsUpdated: map[model.ConfigKey]struct{}{
      {Kind: kind.Secret, Name: resourceName}: {},
    },
    Reason: []model.TriggerReason{model.SecretTrigger},
  })
}

sdsservice

sdsservice 实现了 envoy SDS API。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// security/pkg/nodeagent/sds/sdsservice.go
type sdsservice struct {
  st security.SecretManager

  XdsServer  *xds.DiscoveryServer
  stop       chan struct{}
  rootCaPath string
  pkpConf    *mesh.PrivateKeyProvider
}

func newSDSService(st security.SecretManager, options *security.Options, pkpConf *mesh.PrivateKeyProvider) *sdsservice {
  ret := &sdsservice{
    st:      st,
    stop:    make(chan struct{}),
    pkpConf: pkpConf,
  }
  // pilot 的 grpc 实现,用于接收代理的 SDS 请求
  ret.XdsServer = NewXdsServer(ret.stop, ret)
  // 根 CA,cm 配置挂载到目录中获取的
  ret.rootCaPath = options.CARootPath

  if options.FileMountedCerts {
    return ret
  }
  // 预生成工作负载证书
  go func() {
    b := backoff.NewExponentialBackOff(backoff.DefaultOption())
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
      select {
      case <-ret.stop:
        cancel()
      case <-ctx.Done():
      }
    }()
    defer cancel()
    _ = b.RetryWithContext(ctx, func() error {
      // 获取工作负载证书,设置名称为 default,生成私钥和请求 CA 获取证书
      // 实际调用的是 SecretManagerClient.GenerateSecret 函数
      _, err := st.GenerateSecret(security.WorkloadKeyCertResourceName)
      if err != nil {
        sdsServiceLog.Warnf("failed to warm certificate: %v", err)
        return err
      }
      // 获取根证书,设置名称为 ROOTCA
      _, err = st.GenerateSecret(security.RootCertReqResourceName)
      if err != nil {
        sdsServiceLog.Warnf("failed to warm root certificate: %v", err)
        return err
      }

      return nil
    })
  }()

  return ret
}

// 添加 SDS handler 到 grpc 服务器,即 StreamSecrets、DeltaSecrets、FetchSecrets 三个 handler,这里只实现了 StreamSecrets
func (s *sdsservice) register(rpcs *grpc.Server) {
  // 引用 github.com/envoyproxy/go-control-plane 库
  sds.RegisterSecretDiscoveryServiceServer(rpcs, s)
}

func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error {
  return s.XdsServer.Stream(stream)
}

func (s *sdsservice) DeltaSecrets(stream sds.SecretDiscoveryService_DeltaSecretsServer) error {
  return status.Error(codes.Unimplemented, "DeltaSecrets not implemented")
}

func (s *sdsservice) FetchSecrets(ctx context.Context, discReq *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
  return nil, status.Error(codes.Unimplemented, "FetchSecrets not implemented")
}

// xds 发现服务,用来接收代理的 SDS 请求
func NewXdsServer(stop chan struct{}, gen model.XdsResourceGenerator) *xds.DiscoveryServer {
  // 创建基本的,功能性的发现服务,使用和 istiod 一样的代码,但用的时内存配置和 endpoints 存储
  s := xds.NewXDS(stop)
  // 设置不限流
  s.DiscoveryServer.RequestRateLimit = rate.NewLimiter(0, 1)
  s.DiscoveryServer.Generators = map[string]model.XdsResourceGenerator{
    v3.SecretType: gen,
  }
  // 决定是否需要推送给代理
  s.DiscoveryServer.ProxyNeedsPush = func(proxy *model.Proxy, req *model.PushRequest) bool {
    // ...
  }
  s.DiscoveryServer.Start(stop)
  return s.DiscoveryServer
}

RegisterSecretDiscoveryServiceServer

主要是注册 grcp 的方法,具体方案名称如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// github.com/envoyproxy/go-control-plane/envoy/service/secret/v3/sds.pb.go
func RegisterSecretDiscoveryServiceServer(s *grpc.Server, srv SecretDiscoveryServiceServer) {
  s.RegisterService(&_SecretDiscoveryService_serviceDesc, srv)
}

var _SecretDiscoveryService_serviceDesc = grpc.ServiceDesc{
  ServiceName: "envoy.service.secret.v3.SecretDiscoveryService",
  HandlerType: (*SecretDiscoveryServiceServer)(nil),
  Methods: []grpc.MethodDesc{
    {
      MethodName: "FetchSecrets",
      Handler:    _SecretDiscoveryService_FetchSecrets_Handler,
    },
  },
  Streams: []grpc.StreamDesc{
    {
      StreamName:    "DeltaSecrets",
      Handler:       _SecretDiscoveryService_DeltaSecrets_Handler,
      ServerStreams: true,
      ClientStreams: true,
    },
    {
      StreamName:    "StreamSecrets",
      Handler:       _SecretDiscoveryService_StreamSecrets_Handler,
      ServerStreams: true,
      ClientStreams: true,
    },
  },
  Metadata: "envoy/service/secret/v3/sds.proto",
}

XdsProxy

XdsProxy 代理所有 envoy 到 istiod 的 XDS 请求(为啥 envoy 不直接连接 istiod??看启动配置中有 istiod 配置),即连接逻辑为 envoy<->XDS Proxy<->istiod,此外还允许 agent 内部的子系统可以和 istiod/envoy 通信。目的是将所有到 istiod 和 envoy 的 xds 相关连接,用多个 grpc 流统一到单个 tcp 连接中。

在 handleStream 函数中,涉及到背压流控,即 XdsProxy 充当了管道,连接 envoy 和 istiod,两端的处理速度不一样会导致阻塞,参考 背压与流量控制 了解。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// pkg/istio-agent/xds_proxy.go
type XdsProxy struct {
  stopChan             chan struct{}
  clusterID            string
  downstreamListener   net.Listener
  downstreamGrpcServer *grpc.Server
  istiodAddress        string
  istiodDialOptions    []grpc.DialOption
  optsMutex            sync.RWMutex
  handlers             map[string]ResponseHandler
  healthChecker        *health.WorkloadHealthChecker
  xdsHeaders           map[string]string
  xdsUdsPath           string
  proxyAddresses       []string
  ia                   *Agent

  httpTapServer      *http.Server
  tapMutex           sync.RWMutex
  tapResponseChannel chan *discovery.DiscoveryResponse

  connected                 *ProxyConnection
  initialHealthRequest      *discovery.DiscoveryRequest
  initialDeltaHealthRequest *discovery.DeltaDiscoveryRequest
  connectedMutex            sync.RWMutex

  wasmCache wasm.Cache
  ecdsLastAckVersion    atomic.String
  ecdsLastNonce         atomic.String
  downstreamGrpcOptions []grpc.ServerOption
  istiodSAN             string
}

func initXdsProxy(ia *Agent) (*XdsProxy, error) {
  var err error
  localHostAddr := localHostIPv4
  if ia.cfg.IsIPv6 {
    localHostAddr = localHostIPv6
  }
  var envoyProbe ready.Prober
  if !ia.cfg.DisableEnvoy {
    envoyProbe = &ready.Probe{
      AdminPort:     uint16(ia.proxyConfig.ProxyAdminPort),
      LocalHostAddr: localHostAddr,
    }
  }
  // 创建一个 wasm 缓存,用来在本地下载和存储 wasm 模块文件,见 [istio 扩展性](https://istio.io/latest/docs/concepts/wasm/)
  cache := wasm.NewLocalFileCache(constants.IstioDataDir, ia.cfg.WASMOptions)
  proxy := &XdsProxy{
    istiodAddress:         ia.proxyConfig.DiscoveryAddress,
    istiodSAN:             ia.cfg.IstiodSAN,
    clusterID:             ia.secOpts.ClusterID,
    handlers:              map[string]ResponseHandler{},
    stopChan:              make(chan struct{}),
    healthChecker:         health.NewWorkloadHealthChecker(ia.proxyConfig.ReadinessProbe, envoyProbe, ia.cfg.ProxyIPAddresses, ia.cfg.IsIPv6),
    xdsHeaders:            ia.cfg.XDSHeaders,
    xdsUdsPath:            ia.cfg.XdsUdsPath,
    wasmCache:             cache,
    proxyAddresses:        ia.cfg.ProxyIPAddresses,
    ia:                    ia,
    downstreamGrpcOptions: ia.cfg.DownstreamGrpcOptions,
  }

  if ia.localDNSServer != nil {
    // 设置 NameTable 的 handler,获取后需要更新 dns 服务
    proxy.handlers[v3.NameTableType] = func(resp *anypb.Any) error {
      var nt dnsProto.NameTable
      if err := resp.UnmarshalTo(&nt); err != nil {
        log.Errorf("failed to unmarshal name table: %v", err)
        return err
      }
      ia.localDNSServer.UpdateLookupTable(&nt)
      return nil
    }
  }
  // 开启动态获取 proxy 配置
  if ia.cfg.EnableDynamicProxyConfig && ia.secretCache != nil {
    // 获取代理配置的 handler,主要是更新信任包(包含信任域公钥的文件)
    proxy.handlers[v3.ProxyConfigType] = func(resp *anypb.Any) error {
      pc := &meshconfig.ProxyConfig{}
      if err := resp.UnmarshalTo(pc); err != nil {
        log.Errorf("failed to unmarshal proxy config: %v", err)
        return err
      }
      caCerts := pc.GetCaCertificatesPem()
      log.Debugf("received new certificates to add to mesh trust domain: %v", caCerts)
      trustBundle := []byte{}
      for _, cert := range caCerts {
        trustBundle = util.AppendCertByte(trustBundle, []byte(cert))
      }
      return ia.secretCache.UpdateConfigTrustBundle(trustBundle)
    }
  }
  // 日志示例:Initializing with upstream address "istiod.istio-system.svc:15012" and cluster "cluster1"
  proxyLog.Infof("Initializing with upstream address %q and cluster %q", proxy.istiodAddress, proxy.clusterID)
  // 初始化下游服务器,即创建 grpc 服务,设置聚合发现服务 handler 为 XdsProxy
  if err = proxy.initDownstreamServer(); err != nil {
    return nil, err
  }
  // 初始化上游客户端连接配置,即用证书信息出事 tls 连接
  if err = proxy.initIstiodDialOptions(ia); err != nil {
    return nil, err
  }

  go func() {
    // 开始接收下游连接
    if err := proxy.downstreamGrpcServer.Serve(proxy.downstreamListener); err != nil {
      log.Errorf("failed to accept downstream gRPC connection %v", err)
    }
  }()

  return proxy, nil
}

// XDS API 的实现,用来在 istiod 和 envoy 之间代理,envoy 每次请求 pilot-agent 时,就重建一个连接到上游的 xds,确保 istiod 和 pilot-agent 的新连接不会最终消耗待处理的消息,因为新连接可能不会连接到同一个 istiod。
func (p *XdsProxy) StreamAggregatedResources(downstream xds.DiscoveryStream) error {
  proxyLog.Debugf("accepted XDS connection from Envoy, forwarding to upstream XDS server")
  return p.handleStream(downstream)
}

func (p *XdsProxy) handleStream(downstream adsStream) error {
  con := &ProxyConnection{
    conID:           connectionNumber.Inc(),
    upstreamError:   make(chan error, 2), // can be produced by recv and send
    downstreamError: make(chan error, 2), // can be produced by recv and send
    // 请求 channel 无限大,envoy<->XDS Proxy<->istiod 系统产生了接收和发送天然循环。因为 grpc
    requestsChan: channels.NewUnbounded[*discovery.DiscoveryRequest](),
    // Allow a buffer of 1. This ensures we queue up at most 2 (one in process, 1 pending) responses before forwarding.
    responsesChan: make(chan *discovery.DiscoveryResponse, 1),
    stopChan:      make(chan struct{}),
    downstream:    downstream,
  }
  // 注册下游连接
  p.registerStream(con)
  defer p.unregisterStream(con)

  ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  defer cancel()
  // 新创建到上游 istiod 的连接
  upstreamConn, err := p.buildUpstreamConn(ctx)
  // ...
  defer upstreamConn.Close()
  // 创建上游 ADS 的 client
  xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
  ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
  for k, v := range p.xdsHeaders {
    ctx = metadata.AppendToOutgoingContext(ctx, k, v)
  }
  // s
  return p.handleUpstream(ctx, con, xds)
}

func (p *XdsProxy) handleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
  upstream, err := xds.StreamAggregatedResources(ctx,
    grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
  // ...

  con.upstream = upstream

  // 处理上游的 xds 推送
  go func() {
    for {
      // 获取推送内容
      resp, err := con.upstream.Recv()
      // ...
      select {
      case con.responsesChan <- resp:
      case <-con.stopChan:
      }
    }
  }()
  // 处理对上游的请求,即获取 envoy 的 request,并转发给 istiod
  go p.handleUpstreamRequest(con)
  // 接收上游 istiod 的响应,转发给 envoy
  go p.handleUpstreamResponse(con)

  for {
    select {
      // ... 处理错误
    }
  }
}

func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
  forwardEnvoyCh := make(chan *discovery.DiscoveryResponse, 1)
  for {
    select {
    case resp := <-con.responsesChan:
      proxyLog.Debugf("response for type url %s", resp.TypeUrl)
      metrics.XdsProxyResponses.Increment()
      // v3.NameTableType/v3.ProxyConfigType 更新是在 pilot-agent 处理,是在 initXdsProxy 函数中设置的
      if h, f := p.handlers[resp.TypeUrl]; f {
        if len(resp.Resources) == 0 {
          break
        }
        err := h(resp.Resources[0])
        var errorResp *google_rpc.Status
        // Send ACK/NACK
        con.sendRequest(&discovery.DiscoveryRequest{
          VersionInfo:   resp.VersionInfo,
          TypeUrl:       resp.TypeUrl,
          ResponseNonce: resp.Nonce,
          ErrorDetail:   errorResp,
        })
        continue
      }
      // 其他类型转发给 envoy
      switch resp.TypeUrl {
      case v3.ExtensionConfigurationType:
        if features.WasmRemoteLoadConversion {
          go p.rewriteAndForward(con, resp, func(resp *discovery.DiscoveryResponse) {
            select {
            case forwardEnvoyCh <- resp:
            case <-con.stopChan:
            }
          })
        } else {
          forwardToEnvoy(con, resp)
        }
      default:
        if strings.HasPrefix(resp.TypeUrl, v3.DebugType) {
          p.forwardToTap(resp)
        } else {
          forwardToEnvoy(con, resp)
        }
      }
    case resp := <-forwardEnvoyCh:
      forwardToEnvoy(con, resp)
    case <-con.stopChan:
      return
    }
  }
}

envoy agent

envoy 目录下的 Agent 结构,用于管理运行 envoy 命令的结构 Proxy,负责 Proxy 的启动和清理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// pkg/envoy/agent.go
type Agent struct {
  // proxy commands
  proxy Proxy

  // channel for proxy exit notifications
  statusCh chan exitStatus

  abortCh chan error

  // time to allow for the proxy to drain before terminating all remaining proxy processes
  terminationDrainDuration time.Duration
  minDrainDuration         time.Duration

  adminPort int
  localhost string

  knownIstioListeners sets.String

  exitOnZeroActiveConnections bool
}

// 创建一个 proxy agent,用于 envoy 的启动和清理功能
func NewAgent(proxy Proxy, terminationDrainDuration, minDrainDuration time.Duration, localhost string,
  adminPort, statusPort, prometheusPort int, exitOnZeroActiveConnections bool,
) *Agent {
  knownIstioListeners := sets.New[string](
    fmt.Sprintf("listener.0.0.0.0_%d.downstream_cx_active", statusPort),
    fmt.Sprintf("listener.0.0.0.0_%d.downstream_cx_active", prometheusPort),
    "listener.admin.downstream_cx_active",
    "listener.admin.main_thread.downstream_cx_active",
  )
  return &Agent{
    proxy:                       proxy,
    statusCh:                    make(chan exitStatus, 1), // context might stop drainage
    abortCh:                     make(chan error, 1),
    terminationDrainDuration:    terminationDrainDuration,
    minDrainDuration:            minDrainDuration,
    exitOnZeroActiveConnections: exitOnZeroActiveConnections,
    adminPort:                   adminPort,
    localhost:                   localhost,
    knownIstioListeners:         knownIstioListeners,
  }
}

// 启动 envoy 并一直等待 envoy 终止
func (a *Agent) Run(ctx context.Context) {
  log.Info("Starting proxy agent")
  // 用 go routine 启动命令,等待完成
  go a.runWait(a.abortCh)

  select {
  case status := <-a.statusCh:
    // ...
  case <-ctx.Done():
    a.terminate()
    // ...
  }
}

func (a *Agent) runWait(abortCh <-chan error) {
  log.Infof("starting")
  // 启动 proxy
  err := a.proxy.Run(abortCh)
  a.proxy.Cleanup()
  a.statusCh <- exitStatus{err: err}
}

envoy proxy

envoy 目录下 envoy 结构体实现了 Proxy 接口,用于组装参数,启动 envoy 命令。在上面的 Agent 结构中会调用 envoy 结构的 Run 和 Cleanup 函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// pkg/envoy/proxy.go
type envoy struct {
  ProxyConfig
  extraArgs []string
}
type ProxyConfig struct {
  LogLevel          string
  ComponentLogLevel string
  NodeIPs           []string
  Sidecar           bool
  LogAsJSON         bool
  OutlierLogPath string

  BinaryPath             string
  ConfigPath             string
  ConfigCleanup          bool
  AdminPort              int32
  DrainDuration          *durationpb.Duration
  ParentShutdownDuration *durationpb.Duration
  Concurrency            int32

  TestOnly    bool
  AgentIsRoot bool
}

// 代理控制命令实例
func NewProxy(cfg ProxyConfig) Proxy {
  var args []string
  logLevel, componentLogs := splitComponentLog(cfg.LogLevel)
  if logLevel != "" {
    args = append(args, "-l", logLevel)
  }
  if len(componentLogs) > 0 {
    args = append(args, "--component-log-level", strings.Join(componentLogs, ","))
  } else if cfg.ComponentLogLevel != "" {
    // Use the old setting if we don't set any component log levels in LogLevel
    args = append(args, "--component-log-level", cfg.ComponentLogLevel)
  }

  return &envoy{
    ProxyConfig: cfg,
    extraArgs:   args,
  }
}

func (e *envoy) Run(abort <-chan error) error {
  // 启动一个新的 envoy 进程
  args := e.args(e.ConfigPath, istioBootstrapOverrideVar.Get())
  // 日志示例:Envoy command: [-c etc/istio/proxy/envoy-rev.json --drain-time-s 45 --drain-strategy immediate --parent-shutdown-time-s 60 --local-address-ip-version v4 --file-flush-interval-msec 1000 --disable-hot-restart --log-format %Y-%m-%dT%T.%fZ     %l      envoy %n        %v -l warning --component-log-level misc:error --concurrency 2]
  log.Infof("Envoy command: %v", args)
  // 组装启动 Cmd
  cmd := exec.Command(e.BinaryPath, args...)
  cmd.Stdout = os.Stdout
  cmd.Stderr = os.Stderr
  if e.AgentIsRoot {
    cmd.SysProcAttr = &syscall.SysProcAttr{}
    cmd.SysProcAttr.Credential = &syscall.Credential{
      Uid: 1337,
      Gid: 1337,
    }
  }
  // 启动 envoy 命令
  if err := cmd.Start(); err != nil {
    return err
  }
  done := make(chan error, 1)
  go func() {
    done <- cmd.Wait()
  }()
  // ...
}

func (e *envoy) Cleanup() {
  if e.ConfigCleanup {
    if err := os.Remove(e.ConfigPath); err != nil {
      log.Warnf("Failed to delete config file %s: %v", e.ConfigPath, err)
    }
  }
}

istio 证书逻辑

pilot-agent 代码中主要涉及到证书和 XDS,这里再根据实际的配置实例说明下 istio 的证书逻辑。Istio 使用 X.509 证书安全地为每个工作负载提供强身份,应用 sidecar 容器中的 istio-agent 会负责密钥和证书的自动更新,流程图如下: identify_provision_workflow

Istio 通过下面流程来提供密钥和证书:

  • istiod 提供一个用于接收 CSRs(证书签名请求)的 grpc 服务
  • 当 istio-agent 启动时,创建私钥和 CSR,将带有证书的 CSR 发送给 istiod
  • istiod 中的 CA(认证中心)校验 CSR 中的证书,成功后就签署 CSR 以生成证书
  • 当工作负载启动后,envoy 通过 SDS(secret 服务发现)API 来请求证书和密钥
  • istio-agent 发送私钥和从 istiod 中收到的证书给 envoy
  • istio-agent 监视负载的证书过期时间,一旦过期则重复上面的流程

istio 提供了两种类型的认证器:

  • 对等认证:用于服务对服务身份验证,基于 mTLS
  • 请求认证:用于终端用户认证,校验请求的证书,基于 JWT token

在认证中会多次提到主题备用名称(Subject Alternative Name,缩写 SAN),允许在安全证书中使用 subjectAltName 字段将多种值与证书关联,即为单个 SSL 证书指定其他主机名。

需要注意的是,这里使用的是 mTLS,即双向 TLS。对 Web 服务的访问一般使用单向 TLS,只需要服务端提供身份证明,如果服务端需要验证客户身份时,可以在代码中使用密码、token、双因子认证等方式认证。而双向 TLS 需要同时校验服务端和客户端的身份,运行在应用程序之外,不需要修改应用程序。

sidecar 证书配置

通过下面的命令导出 envoy 的完整配置:

1
istioctl proxy-config all sleep-78ff5975c6-4xpv4 -nsample -o json > sleep.json

envoy 配置的 SDS 服务发现地址配置如下,其中 pipe 设置的是 unix domain socket 地址,这是 pilot-agent 提供的 grpc 服务,两个服务在同一个容器中,因此使用的 socket。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{
    "name": "sds-grpc",
    "type": "STATIC",
    "connect_timeout": "1s",
    "load_assignment": {
     "cluster_name": "sds-grpc",
     "endpoints": [
      {
       "lb_endpoints": [
        {
         "endpoint": {
          "address": {
           "pipe": {
            "path": "./var/run/secrets/workload-spiffe-uds/socket"
           }
          }
         }
        }
       ]
      }
     ]
    },
    "typed_extension_protocol_options": {
     "envoy.extensions.upstreams.http.v3.HttpProtocolOptions": {
      "@type": "type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions",
      "explicit_http_config": {
       "http2_protocol_options": {}
      }
     }
    }
}

在 enovy 边车配置中,有两处需要通过 SDS 来配置证书

  • Inbound Listener,用于接收下游的请求对外提供服务,设置了类型为 DownstreamTlsContext 的 tls 传输协议,里面指定了通过 SDS API 获取的 tls 证书。本地做服务器使用证书逻辑为:
    • default 证书:客户端连接时,向请求方发送 default 证书公钥,自己用 default 证书私钥,来进行 TLS 的认证
    • ROOTCA 证书:客户端连接时,用 ROOTCA 证书校验客户端的证书,并校验客户端证书中的 SAN(安装 Istio 时通过参数 values.global.trustDomain 指定了信任域)
  • Outbound Cluster:用于请求上游服务器,设置了 upstreamTlsContext 中的配置,里面设置了客户端证书、私钥,以及验证上游服务器端的 CA 根证书。本地做客户端使用证书逻辑为:
    • default 证书:连接服务端时,向服务端发送 default 证书公钥,用来证明自己的身份
    • ROOTCA 证书:连接服务端时,用 ROOTCA 证书校验服务端的证书,并校验服务端证书中的 SAN

先来看下 Inbound Listener 相关配置:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
{
    "name": "virtualInbound",
    "active_state": {
     "version_info": "2023-01-17T04:44:27Z/14",
     "listener": { // listener 配置
      "@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
      "name": "virtualInbound",
      "address": {
       "socket_address": {
        "address": "0.0.0.0",
        "port_value": 15006     // 15006 端口监听请求
       }
      },
      "filter_chains": [
       {
        "filter_chain_match": {
         "destination_port": 80,    // 向 sleep 服务的 80 端口请求
         "transport_protocol": "tls",   // tls 协议
         "application_protocols": [
          "istio",
          "istio-peer-exchange",
          "istio-http/1.0",
          "istio-http/1.1",
          "istio-h2"
         ]
        },
        "filters": [
         {
          "name": "istio.metadata_exchange"
         },
         {
          "name": "envoy.filters.network.http_connection_manager",
          "typed_config": {
           "@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager",
           "stat_prefix": "inbound_0.0.0.0_80",
          }
         }
        ],
        "transport_socket": {
         "name": "envoy.transport_sockets.tls", // 见 [tls 传输 socket](https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/transport_sockets/tls/v3/tls.proto.html)
         "typed_config": { // 配置为下游 tls
          "@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext",
          "common_tls_context": {
           "tls_params": {
            "tls_minimum_protocol_version": "TLSv1_2",
            "tls_maximum_protocol_version": "TLSv1_3",
            "cipher_suites": [  // 加密套件
             "ECDHE-ECDSA-AES256-GCM-SHA384",
             "ECDHE-RSA-AES256-GCM-SHA384",
             "ECDHE-ECDSA-AES128-GCM-SHA256",
             "ECDHE-RSA-AES128-GCM-SHA256",
             "AES256-GCM-SHA384",
             "AES128-GCM-SHA256"
            ]
           },
           "alpn_protocols": [
            "h2",
            "http/1.1"
           ],
           "tls_certificate_sds_secret_configs": [ // 用来通过 SDS API 获取 tls 证书的配置
            {
             "name": "default",          // 本地使用的证书名称
             "sds_config": {                    // SDS 服务器配置
              "api_config_source": {
               "api_type": "GRPC",
               "grpc_services": [
                {
                 "envoy_grpc": {
                  "cluster_name": "sds-grpc"    // SDS 服务器的 cluster 名称
                 }
                }
               ],
               "set_node_on_first_message_only": true,
               "transport_api_version": "V3"
              },
              "initial_fetch_timeout": "0s",
              "resource_api_version": "V3"
             }
            }
           ],
           "combined_validation_context": { // 联合的证书验证上下文,包含 default_validation_context 和 validation_context_sds_secret_config 两个教育上下文
            "default_validation_context": { // 验证对等证书
             "match_subject_alt_names": [ // 主题备用名称(SAN)匹配器的列表,假如设置了,envoy 将会校验所提交证书的主题替代名称是否与指定的匹配器之一相匹配
              {
               "prefix": "spiffe://cluster.local/"
              }
             ]
            },
            "validation_context_sds_secret_config": { // 通过 SDS API 验证证书
             "name": "ROOTCA",
             "sds_config": {
              "api_config_source": {
               "api_type": "GRPC",
               "grpc_services": [
                {
                 "envoy_grpc": {
                  "cluster_name": "sds-grpc"    // SDS 服务器 cluster 名称
                 }
                }
               ],
               "set_node_on_first_message_only": true,
               "transport_api_version": "V3"
              },
              "initial_fetch_timeout": "0s",
              "resource_api_version": "V3"
             }
            }
           }
          },
          "require_client_certificate": true
         }
        },
        "name": "0.0.0.0_80"
       },
       {
        "filter_chain_match": {
         "destination_port": 80,
         "transport_protocol": "raw_buffer" // 默认值,没有传输协议时使用
        },
        "name": "0.0.0.0_80"
       }
      ],
      "listener_filters": [
       {
        "name": "envoy.filters.listener.original_dst",
        "typed_config": {
         "@type": "type.googleapis.com/envoy.extensions.filters.listener.original_dst.v3.OriginalDst"
        }
       },
       {
        "name": "envoy.filters.listener.tls_inspector", // tls 检测监听过滤器,检测传输是 tls 还是纯文本
        "typed_config": {
         "@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
        },
        "filter_disabled": {
         "destination_port_range": {
          "start": 15006,
          "end": 15007
         }
        }
       },
       {
        "name": "envoy.filters.listener.http_inspector",  // http 检测监听过滤器
        "typed_config": {
         "@type": "type.googleapis.com/envoy.extensions.filters.listener.http_inspector.v3.HttpInspector"
        },
       }
      ],
      "listener_filters_timeout": "0s",
      "traffic_direction": "INBOUND",
      "continue_on_listener_filters_timeout": true
     },
     "last_updated": "2023-01-17T04:44:32.331Z"
    }
   }

再来看下 Outbound Cluster 相关配置如下所示,sleep pods 的 enovy 通过名为“outbound|5000||helloworld.sample.svc.cluster.local”的 cluster 访问上游 helloworld 服务,因此需要在该 cluster 上配置客户端证书以及验证服务器端证书的 CA 根证书。在这里我们需要注意的是,Envoy 在验证服务器端证书时会同时验证证书中的 主题备用名称(Subject Alternative Name) 字段。该字段中设置的是 helloworld 服务 Pod 关联的 Service Account 名称。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
{
    "version_info": "2023-01-17T04:44:27Z/14",
    "cluster": {
     "@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
     "name": "outbound|5000||helloworld.sample.svc.cluster.local",
     "type": "EDS", // 使用 EDS 服务发现从 pilot 获取的
     "eds_cluster_config": {
      "eds_config": {
       "ads": {},
       "initial_fetch_timeout": "0s",
       "resource_api_version": "V3"
      },
      "service_name": "outbound|5000||helloworld.sample.svc.cluster.local"
     },
     "connect_timeout": "10s",
     "lb_policy": "LEAST_REQUEST",
     "circuit_breakers": {},
      // 访问 helloworld 服务的 5000 端口的出向流量
     "metadata": {
      "filter_metadata": {
       "istio": {
        "services": [
         {
          "namespace": "sample",
          "host": "helloworld.sample.svc.cluster.local",
          "name": "helloworld"
         }
        ],
        "default_original_port": 5000
       }
      }
     },
     "common_lb_config": {
      "locality_weighted_lb_config": {}
     },
     "transport_socket_matches": [ // 为不同的 endpoints 使用不同的传输 socket,见 [envoy cluster 配置](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/cluster.proto.html)
      {
       "name": "tlsMode-istio",
       "match": {
        "tlsMode": "istio"
       },
       "transport_socket": {  // 可选的自定义传输 socket 实现,用于上游连接
        "name": "envoy.transport_sockets.tls",  // tls 传输 socket 协议
        "typed_config": {   // UpstreamTlsContext 表示上游连接
         "@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
         "common_tls_context": {
          "tls_params": {
           "tls_minimum_protocol_version": "TLSv1_2",
           "tls_maximum_protocol_version": "TLSv1_3"
          },
          "alpn_protocols": [
           "istio-peer-exchange",
           "istio"
          ],
          "tls_certificate_sds_secret_configs": [ // 用来通过 SDS API 获取 tls 证书的配置
           {
            "name": "default",
            "sds_config": {
             "api_config_source": {
              "api_type": "GRPC",
              "grpc_services": [
               {
                "envoy_grpc": {
                 "cluster_name": "sds-grpc"  // SDS 服务器的 cluster 名称
                }
               }
              ],
              "set_node_on_first_message_only": true,
              "transport_api_version": "V3"
             },
             "initial_fetch_timeout": "0s",
             "resource_api_version": "V3"
            }
           }
          ],
          "combined_validation_context": {
           "default_validation_context": {  // 验证对等证书
            "match_subject_alt_names": [ // SAN 匹配器的列表,假如设置了,envoy 会检验上游服务器的 SAN 是否一致
             {
              "exact": "spiffe://cluster.local/ns/sample/sa/default" // 上游服务器 service account 名称应该为 default
             }
            ]
           },
           "validation_context_sds_secret_config": {  // 通过 SDS API 验证证书
            "name": "ROOTCA",
            "sds_config": {
             "api_config_source": {
              "api_type": "GRPC",
              "grpc_services": [
               {
                "envoy_grpc": {
                 "cluster_name": "sds-grpc"
                }
               }
              ],
              "set_node_on_first_message_only": true,
              "transport_api_version": "V3"
             },
             "initial_fetch_timeout": "0s",
             "resource_api_version": "V3"
            }
           }
          }
         },
         "sni": "outbound_.5000_._.helloworld.sample.svc.cluster.local"
        }
       }
      },
      {
       "name": "tlsMode-disabled",
       "match": {},
       "transport_socket": {
        "name": "envoy.transport_sockets.raw_buffer",
        "typed_config": {
         "@type": "type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer"
        }
       }
      }
     ]
    },
    "last_updated": "2023-01-17T04:44:32.227Z"
}

因为这里的 helloworld 服务的 deploy 中没有设置 serviceAccount,因此默认为 false,可以查看环境的 serviceAccount 有 default 和 sleep:

1
2
3
4
root@cluster1:~/zt/istio# kubectl get serviceAccount -nsample
NAME      SECRETS   AGE
default   0         15d
sleep     0         15d

在配置中可以看到通过 SDS 服务器获取到的证书,配置如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
    // 通过 SDS 动态获取的 secret 信息
   "@type": "type.googleapis.com/envoy.admin.v3.SecretsConfigDump",
   "dynamic_active_secrets": [
    {
     "name": "default",
     "last_updated": "2023-01-31T16:44:32.716Z",
     "secret": { // 见 [secret 配置协议](https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/transport_sockets/tls/v3/secret.proto.html)
      "@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.Secret",
      "name": "default",
      "tls_certificate": {
       "certificate_chain": {  // tls 证书链
        "inline_bytes": "xxx"
       },
       "private_key": { // tls 私钥
        "inline_bytes": "W3JlZGFjdGVkXQ=="
       }
      }
     }
    },
    {
     "name": "ROOTCA",
     "last_updated": "2023-01-17T04:44:32.235Z",
     "secret": {
      "@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.Secret",
      "name": "ROOTCA",
      "validation_context": {   // 包含认证中心的证书数据,用来验证一个给定对等证书(peer certificate)
       "trusted_ca": {
        "inline_bytes": "xxx"
       }
      }
     }
    }
   ]
}

上面配置中,名为 default 的证书是当前服务的证书,证书中的 SAN 使用了该服务对应的命名空间下的 serviceaccount;名为 ROOTCA 的证书是集群的根证书,可以用于认证各个服务的证书。不同的服务使用的 default 证书不同,但使用的 ROOTCA 证书是相同的。(如何导出 default 和 ROOTCA 证书,分别对应下面的 pod.crt 和 root-cat.crt)

Gateway 证书配置

Gateway 中使用的容器和 sidecar 一样,也是 pilot-agent 和 envoy 进程组成。Gateway 分为 Ingress Gateway 和 Egress Gateway,分别负责集群入向和出向流量,其证书配置逻辑和 sidecar 的差别如下:

  • Ingress:做为服务端和 sidecar 有差异,使用的服务器端证书和私钥,一般是由一个权威 CA 或者第三方 CA 签发
  • Egress:做为客户端和 sizecar 有差异,外部服务器采用了 TLS,需要配置 CA 根证书来进行验证,这个 CA 根证书一般由权威 CA 或者第三方 CA 签发

参考