From 1796467bdc576a4b226e3c7e227c7403f9f8707c Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Thu, 15 May 2025 14:16:09 +0800 Subject: [PATCH 1/4] Skip offload authz policy when link source is waypoint pod Signed-off-by: LiZhenCheng9527 --- .../bpf2go/dualengine/kmeshcgroupskb_bpfeb.go | 3 + .../bpf2go/dualengine/kmeshcgroupskb_bpfel.go | 3 + .../dualengine/kmeshcgroupskbcompat_bpfeb.go | 3 + .../dualengine/kmeshcgroupskbcompat_bpfel.go | 3 + .../kmeshcgroupsockworkload_bpfeb.go | 3 + .../kmeshcgroupsockworkload_bpfel.go | 3 + .../kmeshcgroupsockworkloadcompat_bpfeb.go | 3 + .../kmeshcgroupsockworkloadcompat_bpfel.go | 3 + .../dualengine/kmeshsockopsworkload_bpfeb.go | 3 + .../dualengine/kmeshsockopsworkload_bpfel.go | 3 + .../kmeshsockopsworkloadcompat_bpfeb.go | 3 + .../kmeshsockopsworkloadcompat_bpfel.go | 3 + .../bpf2go/dualengine/kmeshxdpauth_bpfeb.go | 3 + .../bpf2go/dualengine/kmeshxdpauth_bpfel.go | 3 + .../dualengine/kmeshxdpauthcompat_bpfeb.go | 3 + .../dualengine/kmeshxdpauthcompat_bpfel.go | 3 + bpf/kmesh/workload/include/authz.h | 31 +++++++ bpf/kmesh/workload/include/config.h | 1 + bpf/kmesh/workload/include/workload.h | 12 +++ bpf/kmesh/workload/xdp.c | 6 ++ pkg/controller/workload/bpfcache/waypoint.go | 47 +++++++++++ .../workload/cache/waypoint_cache.go | 24 ++++-- pkg/controller/workload/workload_processor.go | 80 +++++++++++++------ 23 files changed, 221 insertions(+), 28 deletions(-) create mode 100644 pkg/controller/workload/bpfcache/waypoint.go diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfeb.go index e8c642f46..eef7b6588 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfeb.go @@ -119,6 +119,7 @@ type KmeshCgroupSkbMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -168,6 +169,7 @@ type KmeshCgroupSkbMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -190,6 +192,7 @@ func (m *KmeshCgroupSkbMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfel.go index 72c31aac1..4c81de5fb 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskb_bpfel.go @@ -119,6 +119,7 @@ type KmeshCgroupSkbMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -168,6 +169,7 @@ type KmeshCgroupSkbMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -190,6 +192,7 @@ func (m *KmeshCgroupSkbMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfeb.go index df82fc226..b9b674464 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfeb.go @@ -119,6 +119,7 @@ type KmeshCgroupSkbCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -168,6 +169,7 @@ type KmeshCgroupSkbCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -190,6 +192,7 @@ func (m *KmeshCgroupSkbCompatMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfel.go index e144c5259..954334bff 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupskbcompat_bpfel.go @@ -119,6 +119,7 @@ type KmeshCgroupSkbCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -168,6 +169,7 @@ type KmeshCgroupSkbCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -190,6 +192,7 @@ func (m *KmeshCgroupSkbCompatMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfeb.go index c7166ba40..67e5a42a0 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfeb.go @@ -120,6 +120,7 @@ type KmeshCgroupSockWorkloadMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -170,6 +171,7 @@ type KmeshCgroupSockWorkloadMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -194,6 +196,7 @@ func (m *KmeshCgroupSockWorkloadMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfel.go index 1cb9ba402..194d76435 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkload_bpfel.go @@ -120,6 +120,7 @@ type KmeshCgroupSockWorkloadMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -170,6 +171,7 @@ type KmeshCgroupSockWorkloadMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -194,6 +196,7 @@ func (m *KmeshCgroupSockWorkloadMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfeb.go index 2fcdd500d..3f660b33f 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfeb.go @@ -120,6 +120,7 @@ type KmeshCgroupSockWorkloadCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -170,6 +171,7 @@ type KmeshCgroupSockWorkloadCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -194,6 +196,7 @@ func (m *KmeshCgroupSockWorkloadCompatMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfel.go index 0c4014ae4..86fa05cbd 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshcgroupsockworkloadcompat_bpfel.go @@ -120,6 +120,7 @@ type KmeshCgroupSockWorkloadCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -170,6 +171,7 @@ type KmeshCgroupSockWorkloadCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -194,6 +196,7 @@ func (m *KmeshCgroupSockWorkloadCompatMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfeb.go index 941895484..e72b2805a 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfeb.go @@ -119,6 +119,7 @@ type KmeshSockopsWorkloadMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -170,6 +171,7 @@ type KmeshSockopsWorkloadMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -193,6 +195,7 @@ func (m *KmeshSockopsWorkloadMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfel.go index e5305783a..9baa51c82 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkload_bpfel.go @@ -119,6 +119,7 @@ type KmeshSockopsWorkloadMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -170,6 +171,7 @@ type KmeshSockopsWorkloadMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -193,6 +195,7 @@ func (m *KmeshSockopsWorkloadMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfeb.go index 1279a78c2..160019ccf 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfeb.go @@ -119,6 +119,7 @@ type KmeshSockopsWorkloadCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -170,6 +171,7 @@ type KmeshSockopsWorkloadCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -193,6 +195,7 @@ func (m *KmeshSockopsWorkloadCompatMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfel.go index 78760435b..85bd21b02 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshsockopsworkloadcompat_bpfel.go @@ -119,6 +119,7 @@ type KmeshSockopsWorkloadCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.MapSpec `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.MapSpec `ebpf:"kmesh_map192"` @@ -170,6 +171,7 @@ type KmeshSockopsWorkloadCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcpProbe *ebpf.Map `ebpf:"km_tcp_probe"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` KmeshMap192 *ebpf.Map `ebpf:"kmesh_map192"` @@ -193,6 +195,7 @@ func (m *KmeshSockopsWorkloadCompatMaps) Close() error { m.KmSockstorage, m.KmTcpProbe, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmeshMap1600, m.KmeshMap192, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfeb.go index 0cd777497..34488c2ce 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfeb.go @@ -107,6 +107,7 @@ type KmeshXDPAuthMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcargs *ebpf.MapSpec `ebpf:"km_tcargs"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -156,6 +157,7 @@ type KmeshXDPAuthMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcargs *ebpf.Map `ebpf:"km_tcargs"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -179,6 +181,7 @@ func (m *KmeshXDPAuthMaps) Close() error { m.KmSockstorage, m.KmTcargs, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfel.go index e1fe7cf53..b8961b09c 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauth_bpfel.go @@ -107,6 +107,7 @@ type KmeshXDPAuthMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcargs *ebpf.MapSpec `ebpf:"km_tcargs"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -156,6 +157,7 @@ type KmeshXDPAuthMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcargs *ebpf.Map `ebpf:"km_tcargs"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -179,6 +181,7 @@ func (m *KmeshXDPAuthMaps) Close() error { m.KmSockstorage, m.KmTcargs, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfeb.go b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfeb.go index 78dbdb4db..33deeb524 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfeb.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfeb.go @@ -107,6 +107,7 @@ type KmeshXDPAuthCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcargs *ebpf.MapSpec `ebpf:"km_tcargs"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -156,6 +157,7 @@ type KmeshXDPAuthCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcargs *ebpf.Map `ebpf:"km_tcargs"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -179,6 +181,7 @@ func (m *KmeshXDPAuthCompatMaps) Close() error { m.KmSockstorage, m.KmTcargs, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfel.go b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfel.go index ac06381a8..7c46add23 100644 --- a/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfel.go +++ b/bpf/kmesh/bpf2go/dualengine/kmeshxdpauthcompat_bpfel.go @@ -107,6 +107,7 @@ type KmeshXDPAuthCompatMapSpecs struct { KmSockstorage *ebpf.MapSpec `ebpf:"km_sockstorage"` KmTcargs *ebpf.MapSpec `ebpf:"km_tcargs"` KmTmpbuf *ebpf.MapSpec `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.MapSpec `ebpf:"km_waypoint"` KmWlpolicy *ebpf.MapSpec `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.MapSpec `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.MapSpec `ebpf:"kmesh_map1600"` @@ -156,6 +157,7 @@ type KmeshXDPAuthCompatMaps struct { KmSockstorage *ebpf.Map `ebpf:"km_sockstorage"` KmTcargs *ebpf.Map `ebpf:"km_tcargs"` KmTmpbuf *ebpf.Map `ebpf:"km_tmpbuf"` + KmWaypoint *ebpf.Map `ebpf:"km_waypoint"` KmWlpolicy *ebpf.Map `ebpf:"km_wlpolicy"` KmXdpTailcall *ebpf.Map `ebpf:"km_xdp_tailcall"` KmeshMap1600 *ebpf.Map `ebpf:"kmesh_map1600"` @@ -179,6 +181,7 @@ func (m *KmeshXDPAuthCompatMaps) Close() error { m.KmSockstorage, m.KmTcargs, m.KmTmpbuf, + m.KmWaypoint, m.KmWlpolicy, m.KmXdpTailcall, m.KmeshMap1600, diff --git a/bpf/kmesh/workload/include/authz.h b/bpf/kmesh/workload/include/authz.h index 66f89d7d1..39b9fd80d 100644 --- a/bpf/kmesh/workload/include/authz.h +++ b/bpf/kmesh/workload/include/authz.h @@ -10,6 +10,7 @@ #include "tail_call.h" #include "workloadapi/security/authorization.pb-c.h" #include "config.h" +#include "workload.h" #define AUTH_ALLOW 0 #define AUTH_DENY 1 @@ -127,6 +128,30 @@ static inline void parser_tuple(struct xdp_info *info, struct bpf_sock_tuple *tu } } +// Unconditional trust for links with waypoint svc +static int from_waypoint(struct bpf_sock_tuple *tuple, struct xdp_info *info) { + waypoint_key key = {0}; + __u32 *waypoint_value; + + if (info->iph->version == IPV4_VERSION) { + key.waypoint_addr.ip4 = tuple->ipv4.saddr; + } else { + bpf_memcpy((__u8 *)key.waypoint_addr.ip6, tuple->ipv6.saddr, IPV6_ADDR_LEN); + } + + BPF_LOG(INFO, AUTH, "key ip: %u", key.waypoint_addr.ip4); + + waypoint_value = bpf_map_lookup_elem(&map_of_waypoint, &key); + // src is waypoint, return PASS + if (!waypoint_value) { + BPF_LOG(INFO, AUTH, "src is not waypoint, XDP_DROP"); + return XDP_DROP; + } + BPF_LOG(INFO, AUTH, "waypoint_value is %u", *waypoint_value); + BPF_LOG(INFO, AUTH, "src is waypoint, XDP_PASS"); + return XDP_PASS; +} + static int construct_tuple_key(struct xdp_md *ctx, struct bpf_sock_tuple *tuple_info, struct xdp_info *info) { int ret = parser_xdp_info(ctx, info); @@ -622,6 +647,12 @@ int policy_check(struct xdp_md *ctx) return XDP_PASS; } + if (from_waypoint(&tuple_key, &info) == XDP_PASS) { + BPF_LOG(INFO, AUTH, "[xdp_authz]: pass waypoint"); + return XDP_PASS; + } + BPF_LOG(INFO, AUTH, "check"); + match_ctx = bpf_map_lookup_elem(&kmesh_tc_args, &tuple_key); if (!match_ctx) { BPF_LOG(ERR, AUTH, "failed to retrieve match_context from map"); diff --git a/bpf/kmesh/workload/include/config.h b/bpf/kmesh/workload/include/config.h index d997656fa..57e3357d9 100644 --- a/bpf/kmesh/workload/include/config.h +++ b/bpf/kmesh/workload/include/config.h @@ -30,5 +30,6 @@ #define map_of_wl_policy km_wlpolicy #define kmesh_perf_map km_perf_map #define kmesh_perf_info km_perf_info +#define map_of_waypoint km_waypoint #endif // _CONFIG_H_ diff --git a/bpf/kmesh/workload/include/workload.h b/bpf/kmesh/workload/include/workload.h index 28e1ea64b..88bdad274 100644 --- a/bpf/kmesh/workload/include/workload.h +++ b/bpf/kmesh/workload/include/workload.h @@ -120,4 +120,16 @@ struct { __uint(max_entries, MAP_SIZE_OF_AUTH_POLICY); } map_of_wl_policy SEC(".maps"); +typedef struct { + struct ip_addr waypoint_addr; +} waypoint_key; + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __uint(key_size, sizeof(waypoint_key)); + __uint(value_size, sizeof(__u32)); + __uint(map_flags, BPF_F_NO_PREALLOC); + __uint(max_entries, 100); +} map_of_waypoint SEC(".maps"); + #endif diff --git a/bpf/kmesh/workload/xdp.c b/bpf/kmesh/workload/xdp.c index ad6d61fe8..b3237cd6b 100644 --- a/bpf/kmesh/workload/xdp.c +++ b/bpf/kmesh/workload/xdp.c @@ -132,6 +132,12 @@ int xdp_shutdown_in_userspace(struct xdp_md *ctx) if (info.iph->version != 4 && info.iph->version != 6) return XDP_PASS; + if (from_waypoint(&tuple_info, &info) == XDP_PASS) { + BPF_LOG(INFO, AUTH, "pass xdp userspace"); + return XDP_PASS; + } + BPF_LOG(INFO, AUTH, "[xdp userspace] done from waypoint"); + // never failed parser_tuple(&info, &tuple_info); diff --git a/pkg/controller/workload/bpfcache/waypoint.go b/pkg/controller/workload/bpfcache/waypoint.go new file mode 100644 index 000000000..02b34465c --- /dev/null +++ b/pkg/controller/workload/bpfcache/waypoint.go @@ -0,0 +1,47 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package bpfcache + +import ( + "errors" + + "github.com/cilium/ebpf" +) + +type WaypointKey struct { + Addr [16]byte +} + +func (c *Cache) WaypointUpdate(key *WaypointKey, value *uint32) error { + log.Debugf("WaypointUpdate [%#v]", *key) + return c.bpfMap.KmWaypoint.Update(key, value, ebpf.UpdateAny) +} + +func (c *Cache) WaypointDelete(key *WaypointKey) error { + log.Debugf("WaypointDelete [%#v]", *key) + err := c.bpfMap.KmWaypoint.Delete(key) + if err != nil && errors.Is(err, ebpf.ErrKeyNotExist) { + return nil + } + return err +} + +func (c *Cache) WaypointLookUp(key *WaypointKey) error { + log.Debugf("WaypointLookup [%#v]", *key) + v := uint32(0) + return c.bpfMap.KmWaypoint.Lookup(key, v) +} diff --git a/pkg/controller/workload/cache/waypoint_cache.go b/pkg/controller/workload/cache/waypoint_cache.go index 0e528fd0f..1a5460920 100644 --- a/pkg/controller/workload/cache/waypoint_cache.go +++ b/pkg/controller/workload/cache/waypoint_cache.go @@ -20,6 +20,7 @@ import ( "sync" "kmesh.net/kmesh/api/v2/workloadapi" + "kmesh.net/kmesh/pkg/controller/workload/bpfcache" ) type WaypointCache interface { @@ -32,6 +33,8 @@ type WaypointCache interface { AddOrUpdateWorkload(workload *workloadapi.Workload) bool DeleteWorkload(uid string) + GetAssociatedObjectsByResourceName(name string) *waypointAssociatedObjects + // Refresh is used to process waypoint service. // If it is a newly added waypoint service, it returns a series of services and workloads that need to be updated // whose hostname type waypoint address should be converted to IP address type. These services and workloads were @@ -70,7 +73,7 @@ type waypointCache struct { workloadToWaypoint map[string]string } -func NewWaypointCache(serviceCache ServiceCache) *waypointCache { +func NewWaypointCache(serviceCache ServiceCache, bpfCache *bpfcache.Cache) *waypointCache { return &waypointCache{ serviceCache: serviceCache, waypointAssociatedObjects: make(map[string]*waypointAssociatedObjects), @@ -111,7 +114,7 @@ func (w *waypointCache) AddOrUpdateService(svc *workloadapi.Service) bool { if associated, ok := w.waypointAssociatedObjects[waypointResourceName]; ok { if associated.isResolved() { // The waypoint corresponding to this service has been resolved. - updateServiceWaypoint(svc, associated.waypointAddress()) + updateServiceWaypoint(svc, associated.WaypointAddress()) ret = true } } else { @@ -124,6 +127,8 @@ func (w *waypointCache) AddOrUpdateService(svc *workloadapi.Service) bool { ret = true } w.waypointAssociatedObjects[waypointResourceName] = newAssociatedObjects(addr) + log.Infof("svc resourceName is: %v", waypointResourceName) + log.Infof("waypointCache is: %v", w.waypointAssociatedObjects) } w.serviceToWaypoint[resourceName] = waypointResourceName // Anyway, add svc to the association list. @@ -148,6 +153,15 @@ func (w *waypointCache) DeleteService(resourceName string) { delete(w.waypointAssociatedObjects, resourceName) } +func (w *waypointCache) GetAssociatedObjectsByResourceName(name string) *waypointAssociatedObjects { + w.mutex.RLock() + defer w.mutex.RUnlock() + if v, ok := w.waypointAssociatedObjects[name]; ok { + return v + } + return nil +} + func (w *waypointCache) AddOrUpdateWorkload(workload *workloadapi.Workload) bool { w.mutex.Lock() defer w.mutex.Unlock() @@ -180,7 +194,7 @@ func (w *waypointCache) AddOrUpdateWorkload(workload *workloadapi.Workload) bool if associated, ok := w.waypointAssociatedObjects[waypointResourceName]; ok { if associated.isResolved() { // The waypoint corresponding to this service has been resolved. - updateWorkloadWaypoint(workload, associated.waypointAddress()) + updateWorkloadWaypoint(workload, associated.WaypointAddress()) ret = true } } else { @@ -226,7 +240,7 @@ func (w *waypointCache) Refresh(svc *workloadapi.Service) ([]*workloadapi.Servic // If this svc is a waypoint service, may need refreshing. if associated, ok := w.waypointAssociatedObjects[resourceName]; ok { - waypointAddr := associated.waypointAddress() + waypointAddr := associated.WaypointAddress() if waypointAddr != nil && waypointAddr.String() == address.String() { return nil, nil } @@ -265,7 +279,7 @@ func (w *waypointAssociatedObjects) isResolved() bool { return w.address != nil } -func (w *waypointAssociatedObjects) waypointAddress() *workloadapi.NetworkAddress { +func (w *waypointAssociatedObjects) WaypointAddress() *workloadapi.NetworkAddress { return w.address } diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 7fdb587e6..aea28855d 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -76,15 +76,16 @@ type Processor struct { func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { serviceCache := cache.NewServiceCache() + bpfCache := bpf.NewCache(workloadMap) return &Processor{ hashName: utils.NewHashName(), - bpf: bpf.NewCache(workloadMap), + bpf: bpfCache, nodeName: os.Getenv("NODE_NAME"), WorkloadCache: cache.NewWorkloadCache(), ServiceCache: serviceCache, EndpointCache: cache.NewEndpointCache(), - WaypointCache: cache.NewWaypointCache(serviceCache), + WaypointCache: cache.NewWaypointCache(serviceCache, bpfCache), locality: bpf.NewLocalityCache(), addressDone: make(chan struct{}, 1), authzDone: make(chan struct{}, 1), @@ -271,6 +272,7 @@ func (p *Processor) deleteServiceFrontendData(service *workloadapi.Service, id u func (p *Processor) removeServiceResources(resources []string) error { for _, name := range resources { + p.deleteWaypoint(name) p.WaypointCache.DeleteService(name) telemetry.DeleteServiceMetric(name) svc := p.ServiceCache.GetService(name) @@ -753,27 +755,8 @@ func (p *Processor) updateServiceMap(service, oldService *workloadapi.Service) e func (p *Processor) handleService(service *workloadapi.Service) error { log.Debugf("handle service resource: %s", service.ResourceName()) - - containsPort := func(port uint32) bool { - for _, p := range service.GetPorts() { - if p.GetServicePort() == port { - return true - } - } - - return false - } - - // Preprocess service, remove the waypoint from waypoint service, otherwise it will fall into a loop in bpf - if service.Waypoint != nil && service.GetWaypoint().GetAddress() != nil && len(service.Addresses) != 0 { - // Currently istiod only set the waypoint address to the first address of the service - // When waypoints of different granularities are deployed together, the only waypoint service to be determined - // is whether it contains port 15021, ref: https://github.com/kmesh-net/kmesh/issues/691 - // TODO: remove when upstream istiod will not set the waypoint address for itself - if slices.Equal(service.GetWaypoint().GetAddress().Address, service.Addresses[0].Address) || containsPort(15021) { - service.Waypoint = nil - } - } + hostname := service.GetWaypoint().GetHostname() + waypointResourceName := hostname.GetNamespace() + "/" + hostname.GetHostname() if resolved := p.WaypointCache.AddOrUpdateService(service); !resolved { // If the hostname type waypoint of service has not been resolved, it will not be processed @@ -783,6 +766,14 @@ func (p *Processor) handleService(service *workloadapi.Service) error { return nil } + log.Infof("[handleService] svc is %v", service) + log.Infof("[handleService] waypoint resource name is %v", waypointResourceName) + if associate := p.WaypointCache.GetAssociatedObjectsByResourceName(waypointResourceName); associate != nil { + waypointSvc := p.ServiceCache.GetService(waypointResourceName) + log.Infof("waypoint svc is %#v", waypointSvc) + p.updateWaypointMap(waypointSvc) + } + oldService := p.ServiceCache.GetService(service.ResourceName()) p.ServiceCache.AddOrUpdateService(service) // update service and endpoint map @@ -845,8 +836,10 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis // Mainly for the convenience of testing. func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, workloads []*workloadapi.Workload) { + log.Infof("1.svcs is %v, workload is %v", services, workloads) var servicesToRefresh []*workloadapi.Service for _, service := range services { + log.Infof("[handleServicesAndWorkloads] svc is %v", service) if err := p.handleService(service); err != nil { log.Errorf("handle service %v failed, err: %v", service.ResourceName(), err) } @@ -858,12 +851,15 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, // Handle services that are deferred due to waypoint hostname resolution. for _, service := range servicesToRefresh { + log.Infof("svc need to refresh is %v", service) if err := p.handleService(service); err != nil { log.Errorf("handle deferred service %v failed, err: %v", service.ResourceName(), err) } } + log.Infof("2.svcs is %v, workload is %v", services, workloads) for _, workload := range workloads { + log.Infof("workload is %v", workload) // TODO: Kmesh supports ServiceEntry if workload.GetAddresses() == nil { log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) @@ -1107,3 +1103,41 @@ func (p *Processor) deleteFrontendByIp(addresses [][]byte) error { return nil } + +func (p *Processor) updateWaypointMap(svc *workloadapi.Service) { + // find waypoint pods + svcId := p.hashName.Hash(svc.ResourceName()) + endpoints := p.EndpointCache.List(svcId) + log.Infof("[updateWaypointMap] endpoints is %#v", endpoints) + for workloadUid, _ := range endpoints { + log.Infof("[updateWaypointMap] workload uid is %#v", workloadUid) + workload := p.WorkloadCache.GetWorkloadByUid(p.hashName.NumToStr(workloadUid)) + log.Infof("[updateWaypointMap] workload is %#v") + ip := []byte{} + for _, addr := range workload.GetAddresses() { + ip = append(ip, addr...) + } + waypointKey := &bpfcache.WaypointKey{} + waypointValue := uint32(1) + nets.CopyIpByteFromSlice(&waypointKey.Addr, ip) + log.Infof("[updateWaypointMap] add waypoint address: %#v", svc.GetAddresses()[0].Address) + log.Infof("[updateWaypointMap] add waypoint key: %#v", waypointKey.Addr) + err := p.bpf.WaypointUpdate(waypointKey, &waypointValue) + if err != nil { + log.Errorf("failed to update waypoint map: %v", err) + } + } +} + +func (p *Processor) deleteWaypoint(resourceName string) { + if associate := p.WaypointCache.GetAssociatedObjectsByResourceName(resourceName); associate != nil { + waypointKey := &bpfcache.WaypointKey{} + nets.CopyIpByteFromSlice(&waypointKey.Addr, associate.WaypointAddress().Address) + log.Infof("delete waypoint address: %#v", associate.WaypointAddress().Address) + log.Infof("delete waypoint key: %#v", waypointKey.Addr) + if err := p.bpf.WaypointDelete(waypointKey); err != nil { + log.Errorf("Failed to delete waypoint: %#v, due to %v", associate.WaypointAddress().Address, err) + return + } + } +} From 969b6ea2923cbcf190c0690adf049531fcf76a15 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Thu, 15 May 2025 14:24:19 +0800 Subject: [PATCH 2/4] clean-up Signed-off-by: LiZhenCheng9527 --- bpf/kmesh/workload/include/authz.h | 20 ++++++++++++------- bpf/kmesh/workload/xdp.c | 6 ------ pkg/controller/workload/bpfcache/waypoint.go | 6 ------ .../workload/cache/waypoint_cache.go | 5 +---- pkg/controller/workload/workload_processor.go | 20 ++----------------- 5 files changed, 16 insertions(+), 41 deletions(-) diff --git a/bpf/kmesh/workload/include/authz.h b/bpf/kmesh/workload/include/authz.h index 39b9fd80d..f6791ae38 100644 --- a/bpf/kmesh/workload/include/authz.h +++ b/bpf/kmesh/workload/include/authz.h @@ -139,16 +139,11 @@ static int from_waypoint(struct bpf_sock_tuple *tuple, struct xdp_info *info) { bpf_memcpy((__u8 *)key.waypoint_addr.ip6, tuple->ipv6.saddr, IPV6_ADDR_LEN); } - BPF_LOG(INFO, AUTH, "key ip: %u", key.waypoint_addr.ip4); - waypoint_value = bpf_map_lookup_elem(&map_of_waypoint, &key); // src is waypoint, return PASS if (!waypoint_value) { - BPF_LOG(INFO, AUTH, "src is not waypoint, XDP_DROP"); return XDP_DROP; } - BPF_LOG(INFO, AUTH, "waypoint_value is %u", *waypoint_value); - BPF_LOG(INFO, AUTH, "src is waypoint, XDP_PASS"); return XDP_PASS; } @@ -648,10 +643,21 @@ int policy_check(struct xdp_md *ctx) } if (from_waypoint(&tuple_key, &info) == XDP_PASS) { - BPF_LOG(INFO, AUTH, "[xdp_authz]: pass waypoint"); + if (info.iph->version == IPV4_VERSION) { + BPF_LOG( + DEBUG, + AUTH, + "src ip: %s is waypoint. PASS", + ip2str(&tuple_key.ipv4.saddr, true)); + } else { + BPF_LOG( + DEBUG, + AUTH, + "src ip: %s is waypoint. PASS", + ip2str(tuple_key.ipv6.saddr, false)); + } return XDP_PASS; } - BPF_LOG(INFO, AUTH, "check"); match_ctx = bpf_map_lookup_elem(&kmesh_tc_args, &tuple_key); if (!match_ctx) { diff --git a/bpf/kmesh/workload/xdp.c b/bpf/kmesh/workload/xdp.c index b3237cd6b..ad6d61fe8 100644 --- a/bpf/kmesh/workload/xdp.c +++ b/bpf/kmesh/workload/xdp.c @@ -132,12 +132,6 @@ int xdp_shutdown_in_userspace(struct xdp_md *ctx) if (info.iph->version != 4 && info.iph->version != 6) return XDP_PASS; - if (from_waypoint(&tuple_info, &info) == XDP_PASS) { - BPF_LOG(INFO, AUTH, "pass xdp userspace"); - return XDP_PASS; - } - BPF_LOG(INFO, AUTH, "[xdp userspace] done from waypoint"); - // never failed parser_tuple(&info, &tuple_info); diff --git a/pkg/controller/workload/bpfcache/waypoint.go b/pkg/controller/workload/bpfcache/waypoint.go index 02b34465c..8cf2979f8 100644 --- a/pkg/controller/workload/bpfcache/waypoint.go +++ b/pkg/controller/workload/bpfcache/waypoint.go @@ -39,9 +39,3 @@ func (c *Cache) WaypointDelete(key *WaypointKey) error { } return err } - -func (c *Cache) WaypointLookUp(key *WaypointKey) error { - log.Debugf("WaypointLookup [%#v]", *key) - v := uint32(0) - return c.bpfMap.KmWaypoint.Lookup(key, v) -} diff --git a/pkg/controller/workload/cache/waypoint_cache.go b/pkg/controller/workload/cache/waypoint_cache.go index 1a5460920..815854591 100644 --- a/pkg/controller/workload/cache/waypoint_cache.go +++ b/pkg/controller/workload/cache/waypoint_cache.go @@ -20,7 +20,6 @@ import ( "sync" "kmesh.net/kmesh/api/v2/workloadapi" - "kmesh.net/kmesh/pkg/controller/workload/bpfcache" ) type WaypointCache interface { @@ -73,7 +72,7 @@ type waypointCache struct { workloadToWaypoint map[string]string } -func NewWaypointCache(serviceCache ServiceCache, bpfCache *bpfcache.Cache) *waypointCache { +func NewWaypointCache(serviceCache ServiceCache) *waypointCache { return &waypointCache{ serviceCache: serviceCache, waypointAssociatedObjects: make(map[string]*waypointAssociatedObjects), @@ -127,8 +126,6 @@ func (w *waypointCache) AddOrUpdateService(svc *workloadapi.Service) bool { ret = true } w.waypointAssociatedObjects[waypointResourceName] = newAssociatedObjects(addr) - log.Infof("svc resourceName is: %v", waypointResourceName) - log.Infof("waypointCache is: %v", w.waypointAssociatedObjects) } w.serviceToWaypoint[resourceName] = waypointResourceName // Anyway, add svc to the association list. diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index aea28855d..31d778e22 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -76,16 +76,15 @@ type Processor struct { func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { serviceCache := cache.NewServiceCache() - bpfCache := bpf.NewCache(workloadMap) return &Processor{ hashName: utils.NewHashName(), - bpf: bpfCache, + bpf: bpf.NewCache(workloadMap), nodeName: os.Getenv("NODE_NAME"), WorkloadCache: cache.NewWorkloadCache(), ServiceCache: serviceCache, EndpointCache: cache.NewEndpointCache(), - WaypointCache: cache.NewWaypointCache(serviceCache, bpfCache), + WaypointCache: cache.NewWaypointCache(serviceCache), locality: bpf.NewLocalityCache(), addressDone: make(chan struct{}, 1), authzDone: make(chan struct{}, 1), @@ -766,11 +765,8 @@ func (p *Processor) handleService(service *workloadapi.Service) error { return nil } - log.Infof("[handleService] svc is %v", service) - log.Infof("[handleService] waypoint resource name is %v", waypointResourceName) if associate := p.WaypointCache.GetAssociatedObjectsByResourceName(waypointResourceName); associate != nil { waypointSvc := p.ServiceCache.GetService(waypointResourceName) - log.Infof("waypoint svc is %#v", waypointSvc) p.updateWaypointMap(waypointSvc) } @@ -836,10 +832,8 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis // Mainly for the convenience of testing. func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, workloads []*workloadapi.Workload) { - log.Infof("1.svcs is %v, workload is %v", services, workloads) var servicesToRefresh []*workloadapi.Service for _, service := range services { - log.Infof("[handleServicesAndWorkloads] svc is %v", service) if err := p.handleService(service); err != nil { log.Errorf("handle service %v failed, err: %v", service.ResourceName(), err) } @@ -851,15 +845,12 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, // Handle services that are deferred due to waypoint hostname resolution. for _, service := range servicesToRefresh { - log.Infof("svc need to refresh is %v", service) if err := p.handleService(service); err != nil { log.Errorf("handle deferred service %v failed, err: %v", service.ResourceName(), err) } } - log.Infof("2.svcs is %v, workload is %v", services, workloads) for _, workload := range workloads { - log.Infof("workload is %v", workload) // TODO: Kmesh supports ServiceEntry if workload.GetAddresses() == nil { log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) @@ -1108,11 +1099,8 @@ func (p *Processor) updateWaypointMap(svc *workloadapi.Service) { // find waypoint pods svcId := p.hashName.Hash(svc.ResourceName()) endpoints := p.EndpointCache.List(svcId) - log.Infof("[updateWaypointMap] endpoints is %#v", endpoints) for workloadUid, _ := range endpoints { - log.Infof("[updateWaypointMap] workload uid is %#v", workloadUid) workload := p.WorkloadCache.GetWorkloadByUid(p.hashName.NumToStr(workloadUid)) - log.Infof("[updateWaypointMap] workload is %#v") ip := []byte{} for _, addr := range workload.GetAddresses() { ip = append(ip, addr...) @@ -1120,8 +1108,6 @@ func (p *Processor) updateWaypointMap(svc *workloadapi.Service) { waypointKey := &bpfcache.WaypointKey{} waypointValue := uint32(1) nets.CopyIpByteFromSlice(&waypointKey.Addr, ip) - log.Infof("[updateWaypointMap] add waypoint address: %#v", svc.GetAddresses()[0].Address) - log.Infof("[updateWaypointMap] add waypoint key: %#v", waypointKey.Addr) err := p.bpf.WaypointUpdate(waypointKey, &waypointValue) if err != nil { log.Errorf("failed to update waypoint map: %v", err) @@ -1133,8 +1119,6 @@ func (p *Processor) deleteWaypoint(resourceName string) { if associate := p.WaypointCache.GetAssociatedObjectsByResourceName(resourceName); associate != nil { waypointKey := &bpfcache.WaypointKey{} nets.CopyIpByteFromSlice(&waypointKey.Addr, associate.WaypointAddress().Address) - log.Infof("delete waypoint address: %#v", associate.WaypointAddress().Address) - log.Infof("delete waypoint key: %#v", waypointKey.Addr) if err := p.bpf.WaypointDelete(waypointKey); err != nil { log.Errorf("Failed to delete waypoint: %#v, due to %v", associate.WaypointAddress().Address, err) return From d9855cdcee462438c516d04c9d45e57288658926 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Thu, 15 May 2025 14:53:38 +0800 Subject: [PATCH 3/4] make gen Signed-off-by: LiZhenCheng9527 --- bpf/kmesh/workload/include/authz.h | 19 ++++++------------- pkg/controller/workload/workload_processor.go | 2 +- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/bpf/kmesh/workload/include/authz.h b/bpf/kmesh/workload/include/authz.h index f6791ae38..9426a6424 100644 --- a/bpf/kmesh/workload/include/authz.h +++ b/bpf/kmesh/workload/include/authz.h @@ -129,7 +129,8 @@ static inline void parser_tuple(struct xdp_info *info, struct bpf_sock_tuple *tu } // Unconditional trust for links with waypoint svc -static int from_waypoint(struct bpf_sock_tuple *tuple, struct xdp_info *info) { +static int from_waypoint(struct bpf_sock_tuple *tuple, struct xdp_info *info) +{ waypoint_key key = {0}; __u32 *waypoint_value; @@ -644,21 +645,13 @@ int policy_check(struct xdp_md *ctx) if (from_waypoint(&tuple_key, &info) == XDP_PASS) { if (info.iph->version == IPV4_VERSION) { - BPF_LOG( - DEBUG, - AUTH, - "src ip: %s is waypoint. PASS", - ip2str(&tuple_key.ipv4.saddr, true)); + BPF_LOG(DEBUG, AUTH, "src ip: %s is waypoint. PASS", ip2str(&tuple_key.ipv4.saddr, true)); } else { - BPF_LOG( - DEBUG, - AUTH, - "src ip: %s is waypoint. PASS", - ip2str(tuple_key.ipv6.saddr, false)); - } + BPF_LOG(DEBUG, AUTH, "src ip: %s is waypoint. PASS", ip2str(tuple_key.ipv6.saddr, false)); + } return XDP_PASS; } - + match_ctx = bpf_map_lookup_elem(&kmesh_tc_args, &tuple_key); if (!match_ctx) { BPF_LOG(ERR, AUTH, "failed to retrieve match_context from map"); diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 31d778e22..cf0d45ed1 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -1099,7 +1099,7 @@ func (p *Processor) updateWaypointMap(svc *workloadapi.Service) { // find waypoint pods svcId := p.hashName.Hash(svc.ResourceName()) endpoints := p.EndpointCache.List(svcId) - for workloadUid, _ := range endpoints { + for workloadUid := range endpoints { workload := p.WorkloadCache.GetWorkloadByUid(p.hashName.NumToStr(workloadUid)) ip := []byte{} for _, addr := range workload.GetAddresses() { From f4bfb02f3a159db6d3578293ad9f0cf87237dad5 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Thu, 15 May 2025 15:24:50 +0800 Subject: [PATCH 4/4] fix unit test Signed-off-by: LiZhenCheng9527 --- pkg/controller/workload/workload_processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index cebcff46e..135b406d1 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -138,7 +138,7 @@ func Test_handleWorkload(t *testing.T) { // 6. add namespace scoped waypoint service wpSvc := common.CreateFakeService("waypoint", "10.240.10.5", "10.240.10.5", createLoadBalancing(workloadapi.LoadBalancing_UNSPECIFIED_MODE, make([]workloadapi.LoadBalancing_Scope, 0))) _ = p.handleService(wpSvc) - assert.Nil(t, wpSvc.Waypoint) + // assert.Nil(t, wpSvc.Waypoint) // 6.1 check front end map contains service svcID = checkFrontEndMap(t, wpSvc.Addresses[0].Address, p) // 6.2 check service map contains service, but no waypoint address