Deepen Understanding of CNI by reading amazon-vpc-cni-k8s

Hajime Terasawa
46 min readJun 23, 2020

--

仕事で amazon EKS を運用していると、時たま network 周りのエラーに遭遇します。都度都度 on-demand に調査をしているのですが、一度全体感を抑えておいた方がいいなと感じたので amazon-vpc-cni-k8s のコードリーディングを通して k8s の network 周りの理解を深めることにしました。

CNI

CNI とは

端的にまとめると、Container Runtime が用意する network namespace に対して network interface を ADD/DEL する操作を実装する interface です。

Container Runtime の仕様となる CRI が CNI に依存して実装されるので、具体的な Network 設計を抽象化して CRI(及びその client となる kubelet)から network resource を利用することが可能となります。これにより Container の lifecycle に合わせて CNI API を介して Container への network interface の着脱を行え、Container Management System への reachability 提供がシンプルに行えるようになります。

各 platform ではその platform 上で上記仕様を満たす CNI-Plugin を用意すればどこでも CRI を実装する client を動作させることが可能となります。

具体的な仕様は containernetworking/cniSPEC が詳しいです。

CNI Plugin

CNI Plugin は CNI の実装であり、aws vpc 上に k8s 用の実装を行ったのが、今回読むことにした aws/amazon-vpc-cni-k8s となります。

CNI Plugin では前述したように、network interface の ADD/DEL の実装を提供しています。例えば ADD すると Container/Host 間を veth でつないで、default gateway を設定して経路を通すといったような操作を行うことになります(詳細は後ほど)。

amazon-vpc-cni-k8s 概要

amazon-vpc-cni-k8s とは aws が提供する VPC 上で k8s network を動作させるための CNI Plugin となります。

EKS は VPC 内の Subnet 上に Cluster を作るので、ENI を介して Worker Node と VPC 内 resource は通信することができます。

これに加えて、Worker Node 上に乗った複数の Pod に対して isolation を維持したまま reachability を提供するのがこの Plugin の仕事となります。

Worker Node の中の動きを軽く説明してみると、amazon-vpc-cni-k8s は Local IP Address Manager Daemon (L-IPAMD) Pod をデプロイし、L-IPAMD が kubelet から ADD/DEL を受け、実際の network resource の調整を行います。

Worker Node に attach した ENI 及びそこに attach されている Private IPは L-IPAMD が memory 上に pool しており、kubelet からの ADD/DEL 呼び出しに応じて Pod に対して割り振ったり回収したりします。 この際、合わせて Host と Pod 間の経路作成なども行われます。

Host Instance からの通信は ENI を介して VPC に出ていけるので、Host と Pod 間の経路を用意した後は VPC fabric を通じて Pod to Pod での communication が可能となります。

https://github.com/aws/amazon-vpc-cni-k8s/blob/bc04604397889430f0a3d5f6e4766b399c1d5fcc/docs/cni-proposal.md#life-of-a-pod-to-pod-ping-packet

同様に Internet に出ていく通信も VPC fabric を経由することで可能となります。

https://github.com/aws/amazon-vpc-cni-k8s/blob/bc04604397889430f0a3d5f6e4766b399c1d5fcc/docs/cni-proposal.md#life-of-a-pod-to-external-packet

L-IPAMD の初期化

このように amazon-vpc-cni-k8s の責務の大半は L-IPAMD が担っているので、L-IPAMD が初期化されるところから見ていくことにします。

IPAMContext

L-IPAMD の実体は IPAMContext です。

type IPAMContext struct {
// client for EC2 metadata service
awsClient awsutils.APIs
// in-memory map for
// ENIs, Private IPs, mappings of pod and Private IP
dataStore *datastore.DataStore
// client for k8s API
k8sClient kubernetes.Interface
useCustomNetworking bool
eniConfig eniconfig.ENIConfig
// client for network operations like iproute2
networkClient networkutils.NetworkAPIs
maxIPsPerENI int
maxENI int
// a set of ENIs tagged with “node.k8s.amazonaws.com/no_manage”
unmanagedENIs UnmanagedENISet
unmanagedENI int warmENITarget int
warmIPTarget int
minimumIPTarget int

// primaryIP is a map from ENI ID to Primary IP of that ENI
primaryIP map[string]string
lastNodeIPPoolAction time.Time
lastDecreaseIPPool time.Time
// keeps timestamps of the last time an IP address was unassigned from an ENI, so that we don’t reconcile and add it back too quickly if IMDS lags behind reality.
reconcileCooldownCache ReconcileCooldownCache
// Flag to warn that the pod is about to shut down.
terminating int32
disableENIProvisioning bool
}

IPAMContext の初期化処理で configuration の load と、client の初期化処理、そして Host Network の Setup 処理が走ります。

いくつか重要な component について順番に深ぼってみます。

awsClient

Host Network を変更するために必要な instance metadata はこのタイミングで awsClient の初期化処理の一環として memory cache されます。

func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
// retrieve availability-zone
az, err := cache.ec2Metadata.GetMetadata(metadataAZ)
cache.availabilityZone = az

// retrieve eth0 local-ipv4
cache.localIPv4, err = cache.ec2Metadata.GetMetadata(metadataLocalIP)

// retrieve instance-id
cache.instanceID, err = cache.ec2Metadata.GetMetadata(metadataInstanceID)

// retrieve instance-type
cache.instanceType, err = cache.ec2Metadata.GetMetadata(metadataInstanceType)

// retrieve primary interface's mac
mac, err := cache.ec2Metadata.GetMetadata(metadataMAC)
cache.primaryENImac = mac
err = cache.setPrimaryENI()

// retrieve security groups
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
sgIDs := strings.Fields(metadataSGIDs)
for _, sgID := range sgIDs {
cache.securityGroups = append(cache.securityGroups, aws.String(sgID))
}

// retrieve sub-id
cache.subnetID, err = cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSubnetID)

// retrieve vpc-ipv4-cidr-block (Primary CIDR)
cache.vpcIPv4CIDR, err = cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataVPCcidr)

// retrieve vpc-ipv4-cidr-blocks (including Non-Primary CIDR)
metadataVPCIPv4CIDRs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataVPCcidrs)
vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs)
for _, vpcCIDR := range vpcIPv4CIDRs {
cache.vpcIPv4CIDRs = append(cache.vpcIPv4CIDRs, aws.String(vpcCIDR))
}
return nil
}

DataStore

Datastore は Worker Node が利用可能な ENI, IP であったり、それらの Pod への mapping 情報を保持します。

type DataStore struct {
total int
assigned int
eniPool ENIPool // map[string]*ENI
lock sync.Mutex
log logger.Logger
backingStore Checkpointer
cri cri.APIs
}

DataStore は ENI List、ENI に紐づく IP List、IP を割り当てられた Container ID及び Network 情報を Pool します。

L-IPAMD は ADD/DEL および internal reconcile loop を通してこの DataStore を更新することで Node 上の Network を管理することになります。

実際に pool される ENIIP のデータ型はこんな感じです。

type ENI struct {
// AWS ENI ID
ID string
createTime time.Time // IsPrimary indicates whether ENI is a primary ENI
IsPrimary bool
// DeviceNumber is the device number of ENI
// 0 means the primary ENI
DeviceNumber int
// IPv4Addresses shows whether each address is assigned,
// the key is IP address
IPv4Addresses map[string]*AddressInfo
}
type AddressInfo struct {
IPAMKey IPAMKey
Address string
UnassignedTime time.Time
}
type IPAMKey struct {
NetworkName string `json:"networkName"`
ContainerID string `json:"containerID"`
IfName string `json:"ifName"`
}

ENI/IP Target

CNI-Plugin の設定でチェックしておいたほうがよいのが、ENI, IP Target 数です。Node あたりの利用可能な network resource は環境変数を通してこの configuration へと反映されます。

利用可能な network の resource limit は VPC Subnet の CIDR や Worker Node Instance Type に依存するのですが、resource request はここで制御可能です。Node あたりの resource 効率が悪い場合、subnet の available IP を使い切ってしまうということもあるので、確認しておいたほうがよい設定です。

  • warmENITarget int
  • warmIPTarget int
  • minimumIPTarget int

詳しくは doc をご覧ください。

nodeInit

IPAMContext の最後に走る Host Network の Setup 処理です。

func (c *IPAMContext) nodeInit() error {
nodeMaxENI, err := c.getMaxENI()
c.maxENI = nodeMaxENI
c.maxIPsPerENI, err = c.awsClient.GetENIipLimit()

// VPC CIDRs including Non-Primary CIDRs
var pbVPCcidrs []string
vpcCIDRs := c.awsClient.GetVPCIPv4CIDRs()
for _, cidr := range vpcCIDRs {
pbVPCcidrs = append(pbVPCcidrs, *cidr)
}
_, vpcCIDR, err := net.ParseCIDR(c.awsClient.GetVPCIPv4CIDR())
primaryIP := net.ParseIP(c.awsClient.GetLocalIPv4())

// Primary ENI の設定
c.awsClient.GetPrimaryENImac(), &primaryIP)

// Node に紐付けられた ENIMetadata を取得
c.setUnmanagedENIs(tagMap)
// ENI を setup する
enis := c.filterUnmanagedENIs(eniMetadata)
for _, eni := range enis {
err = c.setupENI(eni.ENIID, eni)
}

// datastore が管理している割り当て済み IP が外部に出ていけるようにする
// その際には先程の gateway が利用されることになる
err := c.dataStore.ReadBackingStore()
rules, err := c.networkClient.GetRuleList()
for _, info := range c.dataStore.AllocatedIPs() {
// Update ip rules in case there is a change in VPC CIDRs, AWS_VPC_K8S_CNI_EXTERNALSNAT setting
srcIPNet := net.IPNet{IP: net.ParseIP(info.IP), Mask: net.IPv4Mask(255, 255, 255, 255)}
err = c.networkClient.UpdateRuleListBySrc(rules, srcIPNet, pbVPCcidrs, !c.networkClient.UseExternalSNAT())
}

// Node に対して指定している分の IP が pool されてない場合は、
// datastore から使っていない ENI を割り当て、その IP を追加する
increasedPool, err := c.tryAssignIPs()
if err == nil && increasedPool {
c.updateLastNodeIPPoolAction()
}

return err
}

まず Primary ENI の Network 設定を行います。
中身は SNAT 及び NodePort traffic mangle の設定を ip rule、iptables で愚直に行う感じです。

  • iptables による SNAT の設定
  • NodePort Service を利用する場合は main route table を利用させる IP Rule 及び Primary ENI を経由する traffic への mangle を追加
err = c.networkClient.SetupHostNetwork(vpcCIDR, vpcCIDRs, c.awsClient.GetPrimaryENImac(), &primaryIP)

次に Node に紐づく ENI の設定全般を行います。

  • ENIs を DataStore に追加
  • Secondary ENIs の Network 設定
  • 全ての ENI の全ての Secondary IP を DataStore に追加する
enis := c.filterUnmanagedENIs(eniMetadata)
for _, eni := range enis {
err = c.setupENI(eni.ENIID, eni)
}

Secondary ENIs の Network 設定だけ詳しく紹介すると、意外とシンプルなもので、下記を行っているに過ぎないことがわかります。

  • ENI Primary IP を MAC の netlink に割り当てて UP
  • ENI subnet CIDR を元に gateway を作り、そこを介する通信のみ許可する
  • ENI Subnet の default route table を disable して、CNI-Plugin 側で用意した route table を利用させる
func setupENINetwork(eniIP string, eniMAC string, eniTable int, eniSubnetCIDR string, netLink netlinkwrapper.NetLink, retryLinkByMacInterval time.Duration, retryRouteAddInterval time.Duration, mtu int) error {
// MAC address から linux netlink を取得し up する
link, err := LinkByMac(eniMAC, netLink, retryLinkByMacInterval)
err = netLink.LinkSetMTU(link, mtu)
err = netLink.LinkSetUp(link)

// subnet CIDR から gateway ip を取得
deviceNumber := link.Attrs().Index
_, ipnet, err := net.ParseCIDR(eniSubnetCIDR)
gw, err := incrementIPv4Addr(ipnet.IP)

// Explicitly set the IP on the device if not already set.
// Required for older kernels.
// ip addr show
// ip add del <eniIP> dev <link> (if necessary)
// ip add add <eniIP> dev <link>
addrs, err := netLink.AddrList(link, unix.AF_INET)
for _, addr := range addrs {
err = netLink.AddrDel(link, &addr)
}

// ENI Primary IP を MAC に紐づく netlink に設定する
eniAddr := &net.IPNet{
IP: net.ParseIP(eniIP),
Mask: ipnet.Mask,
}
err = netLink.AddrAdd(link, &netlink.Addr{IPNet: eniAddr})

// gateway への inbound, local address(VPC) への outbound を netlink に設定する
routes := []netlink.Route{
// Add a direct link route for the host's ENI IP only
{
LinkIndex: deviceNumber,
Dst: &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)},
Scope: netlink.SCOPE_LINK, // RT_SCOPE_LINK
Table: eniTable, // device_number
},
// Route all other traffic via the host's ENI IP
{
LinkIndex: deviceNumber,
Dst: &net.IPNet{IP: net.IPv4zero, Mask: net.CIDRMask(0, 32)},
Scope: netlink.SCOPE_UNIVERSE, // RT_SCOPE_UNIVERSE
Gw: gw,
Table: eniTable, // device_number
},
}
for _, r := range routes {
err := netLink.RouteDel(&r)
err = retry.RetryNWithBackoff(retry.NewSimpleBackoff(500*time.Millisecond, retryRouteAddInterval, 0.15, 2.0), maxRetryRouteAdd, func() error {
err := netLink.RouteReplace(&r)
return nil
})
}

// subnet の main route table を disable
_, cidr, err := net.ParseCIDR(eniSubnetCIDR)
defaultRoute := netlink.Route{
Dst: cidr,
Src: net.ParseIP(eniIP),
Table: mainRoutingTable,
Scope: netlink.SCOPE_LINK,
}
err := netLink.RouteDel(&defaultRoute)
return nil
}

続いて、DataStore から割り当て済み IP の一覧を取得し、その経路を設定します。

err := c.dataStore.ReadBackingStore()
rules, err := c.networkClient.GetRuleList()
for _, info := range c.dataStore.AllocatedIPs() {
// Update ip rules in case there is a change in VPC CIDRs, AWS_VPC_K8S_CNI_EXTERNALSNAT setting
srcIPNet := net.IPNet{IP: net.ParseIP(info.IP), Mask: net.IPv4Mask(255, 255, 255, 255)}
err = c.networkClient.UpdateRuleListBySrc(rules, srcIPNet, pbVPCcidrs, !c.networkClient.UseExternalSNAT())
}

経路設定の部分を詳しく見ると、割り当て済み IP を source とする rule を VPC に出ていけるように書き換えているだけだとわかります。

func (n *linuxNetwork) UpdateRuleListBySrc(ruleList []netlink.Rule, src net.IPNet, toCIDRs []string, requiresSNAT bool) error {
srcRuleList, err := n.GetRuleListBySrc(ruleList, src)
var srcRuleTable int
for _, rule := range srcRuleList {
srcRuleTable = rule.Table
err := n.netLink.RuleDel(&rule)

var toDst string
if rule.Dst != nil {
toDst = rule.Dst.String()
}
}

if requiresSNAT {
allCIDRs := append(toCIDRs, n.excludeSNATCIDRs...)
for _, cidr := range allCIDRs {
podRule := n.netLink.NewRule()
_, podRule.Dst, _ = net.ParseCIDR(cidr)
podRule.Src = &src
podRule.Table = srcRuleTable
podRule.Priority = fromPodRulePriority
err = n.netLink.RuleAdd(podRule)

var toDst string
if podRule.Dst != nil {
toDst = podRule.Dst.String()
}
}
} else {
podRule := n.netLink.NewRule()
podRule.Src = &src
podRule.Table = srcRuleTable
podRule.Priority = fromPodRulePriority
err = n.netLink.RuleAdd(podRule)
}

return nil
}

最後に現在 attach されている ENI から configuration の desired を目指して IP の確保を行い、nodeInit 及び L-IPAMD の初期化が完了します。

increasedPool, err := c.tryAssignIPs()

L-IPAMD の実行

ここまでで L-IPAMD が初期化する際に必要となる処理を見てきました。
Host Instance から VPC への経路をどのように管理しているかがわかったと思います。

続いて、この L-IPAMD をどのように利用するかを見ていきます。

L-IPAMD の Entrypoint を見ると、主に下記の background thread を回していることがわかります。

  • IP Pool 管理用の background thread
  • L-IPAMD の admin server
  • CRI からのリクエストを受ける RPC Server
kubeClient, err := k8sapi.CreateKubeClient()
eniConfigController := eniconfig.NewENIConfigController()
if ipamd.UseCustomNetworkCfg() {
go eniConfigController.Start()
}
// L-IPAMD の初期化
ipamContext, err := ipamd.New(kubeClient, eniConfigController)
// Pool manager
go ipamContext.StartNodeIPPoolManager()
// CNI introspection endpoints
go ipamContext.ServeIntrospection()
// Start the RPC listener
err = ipamContext.RunRPCHandler()

IP Pool 管理用の background thread

Method の中身はこんな感じです。1つ1つ見ていきましょう。

func (c *IPAMContext) StartNodeIPPoolManager() {
sleepDuration := ipPoolMonitorInterval / 2
for {
if !c.disableENIProvisioning {
time.Sleep(sleepDuration)
c.updateIPPoolIfRequired()
}
time.Sleep(sleepDuration)
c.nodeIPPoolReconcile(nodeIPPoolReconcileInterval)
}
}

updateIPPoolIfRequired はおそらくイメージ通りの仕事をしています。

IP が足りない場合

  • 既存の ENI から IP を取得できるか試行。取得できた場合は IP を DataStore に記録しておきます
  • だめなら新しい ENI を Host Instance に Allocate します。Allocated ENI は L-IPAMD の reconcile loop によって将来の iteration で IP を取得できるようになります

IP が余っている場合

  • IP を開放するべく DataStore から削除します

ENI が余っている場合

  • ENI を Deallocate して DataStore から削除します
func (c *IPAMContext) updateIPPoolIfRequired() {
if c.nodeIPPoolTooLow() {
c.increaseIPPool()
} else if c.nodeIPPoolTooHigh() {
c.decreaseIPPool(decreaseIPPoolInterval)
}

if c.shouldRemoveExtraENIs() {
c.tryFreeENI()
}
}

nodeIPPoolReconcile は DataStore の情報を最新のものに更新します。

  • 新規の ENI の Setup 及び既存 ENI の metadata 更新
  • Cooldown 完了した Deallocated IP を DataStore に戻す
  • 使っていない IP を DataStore から削除
  • 使っていない ENI を DataStore から削除
func (c *IPAMContext) nodeIPPoolReconcile(interval time.Duration) {
allENIs, err := c.awsClient.GetAttachedENIs()
attachedENIs := c.filterUnmanagedENIs(allENIs)
currentENIIPPools := c.dataStore.GetENIInfos().ENIs

// Check if a new ENI was added, if so we need to update the tags
needToUpdateTags := false
for _, attachedENI := range attachedENIs {
if _, ok := currentENIIPPools[attachedENI.ENIID]; !ok {
needToUpdateTags = true
break
}
}
if needToUpdateTags {
allENIs, tagMap, err := c.awsClient.DescribeAllENIs()
c.setUnmanagedENIs(tagMap)
attachedENIs = c.filterUnmanagedENIs(allENIs)
}

// Mark phase
for _, attachedENI := range attachedENIs {
eniIPPool, err := c.dataStore.GetENIIPs(attachedENI.ENIID)
if err == nil {
// Reconcile IP pool
c.eniIPPoolReconcile(eniIPPool, attachedENI, attachedENI.ENIID)
// Mark action, remove this ENI from currentENIIPPools map
delete(currentENIIPPools, attachedENI.ENIID)
continue
}

// Add new ENI
err = c.setupENI(attachedENI.ENIID, attachedENI)
}

// Sweep phase: since the marked ENI have been removed, the remaining ones needs to be sweeped
for eni := range currentENIIPPools {
// Force the delete, since aws local metadata has told us that this ENI is no longer attached, so any IPs assigned from this ENI will no longer work.
err = c.dataStore.RemoveENIFromDataStore(eni, true /* force */)
}
}

CRI からのリクエストを受ける RPC Server

CRI からは gRPC でリクエストを受ける必要があるので、その gRPC Server の設定をします。CNI の仕様からもわかるように proto file はとっても簡潔です。

func (c *IPAMContext) RunRPCHandler() error {
listener, err := net.Listen("tcp", ipamdgRPCaddress)
grpcServer := grpc.NewServer()
rpc.RegisterCNIBackendServer(grpcServer, &server{ipamContext: c})
// health check の設定
healthServer := health.NewServer()
healthServer.SetServingStatus(grpcHealthServiceName, healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, healthServer)

// reflection service の設定
reflection.Register(grpcServer)
go c.shutdownListener()
err := grpcServer.Serve(listener)

return nil
}

ADD Network Operation を見てみる

ここまで L-IPAMD の Daemon としての挙動はつかめたので、ついに CRI からリクエストを受けた際の RPC 処理を見ていこうと思います。

下記が CNI-Plugin の ADD implementation です。
これまたシンプルで、やっているのは下記だけです。

  • gRPC server 経由で Container に割り振る IP を DataStore から取得
  • 渡された Container metadata をもとに Container/Host 間を veth でつなぐ
func cmdAdd(args *skel.CmdArgs) error {
return add(args, typeswrapper.New(), grpcwrapper.New(), rpcwrapper.New(), driver.New())
}

func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrapper.GRPC,
rpcClient rpcwrapper.RPC, driverClient driver.NetworkAPIs) error {
c := rpcClient.NewCNIBackendClient(conn)
r, err := c.AddNetwork(context.Background(),
&pb.AddNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
Netns: args.Netns,
ContainerID: args.ContainerID,
NetworkName: conf.Name,
IfName: args.IfName,
})
addr := &net.IPNet{
IP: net.ParseIP(r.IPv4Addr),
Mask: net.IPv4Mask(255, 255, 255, 255),
}

// cni-plugin backend からは IP, device-number(ENI-ID), VPC-CIDR, SNAT-config を受取り、実際の network 設定を行っていく
hostVethName := generateHostVethName(conf.VethPrefix, conf.Name, args.ContainerID, args.IfName)
err = driverClient.SetupNS(hostVethName, args.IfName, args.Netns, addr, int(r.DeviceNumber), r.VPCcidrs, r.UseExternalSNAT, mtu, log)
ips := []*current.IPConfig{
{
Version: "4",
Address: *addr,
},
}
result := &current.Result{
IPs: ips,
}

return cniTypes.PrintResult(result, conf.CNIVersion)
}

AddNetwork は IPAMContext 経由でこれまで見てきた DataStore で管理されている available IP を取得します。

func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rpc.AddNetworkReply, error) {
ipamKey := datastore.IPAMKey{
ContainerID: in.ContainerID,
IfName: in.IfName,
NetworkName: in.NetworkName,
}
addr, deviceNumber, err := s.ipamContext.dataStore.AssignPodIPv4Address(ipamKey)

var pbVPCcidrs []string
for _, cidr := range s.ipamContext.awsClient.GetVPCIPv4CIDRs() {
pbVPCcidrs = append(pbVPCcidrs, *cidr)
}

useExternalSNAT := s.ipamContext.networkClient.UseExternalSNAT()
if !useExternalSNAT {
for _, cidr := range s.ipamContext.networkClient.GetExcludeSNATCIDRs() {
pbVPCcidrs = append(pbVPCcidrs, cidr)
}
}

resp := rpc.AddNetworkReply{
Success: err == nil,
IPv4Addr: addr,
DeviceNumber: int32(deviceNumber),
UseExternalSNAT: useExternalSNAT,
VPCcidrs: pbVPCcidrs,
}

return &resp, nil
}

SetupNS では Container/Host 間の veth 設定を行います。

  • kubelet から指定された netns において Container/Host 間の veth と routing を設定する
  • Host veth は Host netns に戻されるので、Host veth の UP 及び Host netns 側での routing 設定
  • Host で Secondary ENI 経由の from-container traffic を VPC に出ていけるようにする
func (os *linuxNetwork) SetupNS(hostVethName string, contVethName string, netnsPath string, addr *net.IPNet, table int, vpcCIDRs []string, useExternalSNAT bool, mtu int, log logger.Logger) error {
return setupNS(hostVethName, contVethName, netnsPath, addr, table, vpcCIDRs, useExternalSNAT, os.netLink, os.ns, mtu, log, os.procSys)
}

func setupNS(hostVethName string, contVethName string, netnsPath string, addr *net.IPNet, table int, vpcCIDRs []string, useExternalSNAT bool, netLink netlinkwrapper.NetLink, ns nswrapper.NS, mtu int, log logger.Logger, procSys procsyswrapper.ProcSys) error {
// container netns 側での veth, routing 設定
createVethContext := newCreateVethPairContext(contVethName, hostVethName, addr, mtu)
err := ns.WithNetNSPath(netnsPath, createVethContext.run)

// IPv6 の自動 routing を無効化する
hostVeth, err := netLink.LinkByName(hostVethName)
err := procSys.Set(fmt.Sprintf("net/ipv6/conf/%s/accept_ra", hostVethName), "0")
err := procSys.Set(fmt.Sprintf("net/ipv6/conf/%s/accept_redirects", hostVethName), "0")

// host netns 側での veth, routing 設定
err = netLink.LinkSetUp(hostVeth)
addrHostAddr := &net.IPNet{
IP: addr.IP,
Mask: net.CIDRMask(32, 32)}
route := netlink.Route{
LinkIndex: hostVeth.Attrs().Index,
Scope: netlink.SCOPE_LINK,
Dst: addrHostAddr}
err := netLink.RouteReplace(&route)
err = addContainerRule(netLink, true, addr, mainRouteTable)

// Primary ENI でない場合は from-container rule も main route table に 追加
if table > 0 {
if useExternalSNAT {
// add rule: 1536: from <podIP> use table <table>
err = addContainerRule(netLink, false, addr, table)
} else {
// add rule: 1536: list of from <podIP> to <vpcCIDR> use table <table>
for _, cidr := range vpcCIDRs {
podRule := netLink.NewRule()
_, podRule.Dst, _ = net.ParseCIDR(cidr)
podRule.Src = addr
podRule.Table = table
podRule.Priority = fromContainerRulePriority
err = netLink.RuleAdd(podRule)
}
}
}

return nil
}

kubelet から指定された netns において Container/Host 間の Network を設定していく処理がここで、

createVethContext := newCreateVethPairContext(contVethName, hostVethName, addr, mtu)
err := ns.WithNetNSPath(netnsPath, createVethContext.run)

createVethContext.run の中で実際に Container と Host を結ぶ veth を用意して、veth を用いる経路設定を行います。

Container から Host への経路設定も比較的オーソドックスな手法を取っているようで、Container 内に dummy の default gateway を用意して、Static ARP Entry をでっちあげ、dummy gateway への通信が Host veth に流れるようにする感じです。
Container veth には cni-plugin backend から受け取った IP を割り振っておきます。

最後に Host veth を Host Instance 用の netns に移動して完了です。

func (createVethContext *createVethPairContext) run(hostNS ns.NetNS) error {
// Equivalent to: `ip link add $link`
veth := &netlink.Veth{
LinkAttrs: netlink.LinkAttrs{
Name: createVethContext.contVethName,
Flags: net.FlagUp,
MTU: createVethContext.mtu,
},
PeerName: createVethContext.hostVethName,
}
err := createVethContext.netLink.LinkAdd(veth)

// Equivalent to: `ip link set $link up`
hostVeth, err := createVethContext.netLink.LinkByName(createVethContext.hostVethName)
err = createVethContext.netLink.LinkSetUp(hostVeth)
contVeth, err := createVethContext.netLink.LinkByName(createVethContext.contVethName)
err = createVethContext.netLink.LinkSetUp(contVeth)

// Equivalent to: `ip route replace $route`
// Equivalent to: `ip route add $route`
// Equivalent to: `ip addr add $addr dev $link`
gw := net.IPv4(169, 254, 1, 1)
gwNet := &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)}
err = createVethContext.netLink.RouteReplace(&netlink.Route{
LinkIndex: contVeth.Attrs().Index,
Scope: netlink.SCOPE_LINK,
Dst: gwNet})
err = createVethContext.ip.AddDefaultRoute(gwNet.IP, contVeth)
err = createVethContext.netLink.AddrAdd(contVeth, &netlink.Addr{IPNet: createVethContext.addr})

// Equivalent to: `bridge fdb append...`
neigh := &netlink.Neigh{
LinkIndex: contVeth.Attrs().Index,
State: netlink.NUD_PERMANENT,
IP: gwNet.IP,
HardwareAddr: hostVeth.Attrs().HardwareAddr,
}
err = createVethContext.netLink.NeighAdd(neigh)

// pod setup が完了したので最後に host veth を netns に移動させる
// Similar to: `ip link set $link netns $ns`
err = createVethContext.netLink.LinkSetNsFd(hostVeth, int(hostNS.Fd()))

return nil
}

Container netns 側での処理が終わったので、Host netns 側の処理に戻ります。Host veth を Host netns で再度 UP して host to container の ruleを main route table に設定します。

err = netLink.LinkSetUp(hostVeth)
addrHostAddr := &net.IPNet{
IP: addr.IP,
Mask: net.CIDRMask(32, 32)}
route := netlink.Route{
LinkIndex: hostVeth.Attrs().Index,
Scope: netlink.SCOPE_LINK,
Dst: addrHostAddr}
err := netLink.RouteReplace(&route)
err = addContainerRule(netLink, true, addr, mainRouteTable)

IP rule を設定する実装はこんな感じ。

func addContainerRule(netLink netlinkwrapper.NetLink, isToContainer bool, addr *net.IPNet, table int) error {
containerRule := netLink.NewRule()
if isToContainer {
// Example: 512: from all to 10.200.202.222 lookup main
containerRule.Dst = addr
containerRule.Priority = toContainerRulePriority
} else {
// Example: 1536: from 10.200.202.222 to 10.200.0.0/16 lookup 2
containerRule.Src = addr
containerRule.Priority = fromContainerRulePriority
}
containerRule.Table = table

err := netLink.RuleDel(containerRule)
err = netLink.RuleAdd(containerRule)

return nil
}

ここまででちょうどこの図のような設定にたどり着きます。

https://github.com/aws/amazon-vpc-cni-k8s/blob/bc04604397889430f0a3d5f6e4766b399c1d5fcc/docs/cni-proposal.md#pod-to-pod-communication

最後に Secondary ENI 経由の container to host の rule も別途 main route table に追加してあげることで、VPC へ出ていけるようにします。

if table > 0 {
if useExternalSNAT {
// add rule: 1536: from <podIP> use table <table>
err = addContainerRule(netLink, false, addr, table)
} else {
// add rule: 1536: list of from <podIP> to <vpcCIDR> use table <table>
for _, cidr := range vpcCIDRs {
podRule := netLink.NewRule()
_, podRule.Dst, _ = net.ParseCIDR(cidr)
podRule.Src = addr
podRule.Table = table
podRule.Priority = fromContainerRulePriority
err = netLink.RuleAdd(podRule)
}
}
}

まとめ

実際に読んでみるとそんなに難しいことをやっていないことがわかるのですが、この全体像を掴めているかどうかで何か調査したいときにかかる時間がぐっと変わってきそうだと感じました。

何か誤ったことを書いてたらご指摘いただけると幸いです。

--

--