Skip to content
This repository was archived by the owner on Dec 9, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ Install the latest stable version of DraNet using the provided manifest:
kubectl apply -f https://raw.githubusercontent.com/google/dranet/refs/heads/main/install.yaml
```

### Development

To build your own image for testing. Here is an example with a custom registry `ghcr.io/converged-computing`:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only necessary when developing on "real" clusters, right? We might clarify that.

A lot of this is developed locally, where the image name is ~arbitrary, because we don't push it to a remote host anyhow.


```sh
REGISTRY=ghcr.io/converged-computing make image
```

### How to Use It

Once DraNet is running, you can inspect the network interfaces and their
Expand Down
91 changes: 48 additions & 43 deletions pkg/inventory/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package inventory
import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -118,74 +117,79 @@ func (db *DB) GetPodNamespace(pod string) string {
func (db *DB) Run(ctx context.Context) error {
defer close(db.notifications)

nlHandle, err := netlink.NewHandle()
if err != nil {
return fmt.Errorf("error creating netlink handle %v", err)
}
// Resources are published periodically or if there is a netlink notification
// indicating a new interfaces was added or changed
// indicating a new interfaces was added or changed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// indicating a new interfaces was added or changed.
// indicating a new interface was added or changed.

nlChannel := make(chan netlink.LinkUpdate)
doneCh := make(chan struct{})
defer close(doneCh)
if err := netlink.LinkSubscribe(nlChannel, doneCh); err != nil {
klog.Error(err, "error subscribing to netlink interfaces, only syncing periodically", "interval", maxInterval.String())
}

// Obtain data that will not change after the startup
// Obtain data that will not change after the startup.
db.instance = getInstanceProperties(ctx)
// TODO: it is not common but may happen in edge cases that the default gateway changes
// revisit once we have more evidence this can be a potential problem or break some use
// cases.
gwInterfaces := getDefaultGwInterfaces()

for {
err := db.rateLimiter.Wait(ctx)
if err != nil {
klog.Error(err, "unexpected rate limited error trying to get system interfaces")
}

devices := []resourceapi.Device{}
ifaces, err := nlHandle.LinkList()
// Device lookup map is used to prevent duplicated.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Device lookup map is used to prevent duplicated.
// Device lookup map is used to prevent duplication.

devices := make(map[string]*resourceapi.Device)

// Keep track of seen devices to not register an RDMA device twice.
seenRdmaDevices := sets.New[string]()

// Kernel network interfaces are first priority.
netlinkDevices, err := db.discoverNetlinkDevices()
if err != nil {
klog.Error(err, "unexpected error trying to get system interfaces")
return err
}
for _, iface := range ifaces {
klog.V(7).InfoS("Checking network interface", "name", iface.Attrs().Name)
if gwInterfaces.Has(iface.Attrs().Name) {
klog.V(4).Infof("iface %s is an uplink interface", iface.Attrs().Name)
continue
}
for pciAddr, device := range netlinkDevices {

if ignoredInterfaceNames.Has(iface.Attrs().Name) {
klog.V(4).Infof("iface %s is in the list of ignored interfaces", iface.Attrs().Name)
// Do not add un-named netowrk device interfaces.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Do not add un-named netowrk device interfaces.
// Do not add unnamed network device interfaces.

ifName, ok := device.Basic.Attributes["dra.net/ifName"]
if !ok {
continue
}

// skip loopback interfaces
if iface.Attrs().Flags&net.FlagLoopback != 0 {
continue
// If it has RDMA, mark as seen.
if rdmaName, err := rdmamap.GetRdmaDeviceForNetdevice(*ifName.StringValue); err == nil && rdmaName != "" {
klog.V(4).Infof("Found netdev '%s' with associated RDMA device '%s'. Merging.", *ifName.StringValue, rdmaName)
seenRdmaDevices.Insert(pciAddr)
}
devices[*ifName.StringValue] = device

}
// We only allow rdma devices that have PCI addresses.
for pciAddr, rdmaDevice := range db.discoverRawRdmaDevices() {

// publish this network interface
device, err := db.netdevToDRAdev(iface)
if err != nil {
klog.V(2).Infof("could not obtain attributes for iface %s : %v", iface.Attrs().Name, err)
// Have we already seen it?
_, ok := seenRdmaDevices[pciAddr]
if ok {
continue
}
devices[rdmaDevice.Name] = rdmaDevice
}

devices = append(devices, *device)
klog.V(4).Infof("Found following network interface %s", iface.Attrs().Name)
// Create the final list to publish.
finalDevices := make([]resourceapi.Device, 0, len(devices))
for _, device := range devices {
finalDevices = append(finalDevices, *device)
}

klog.V(4).Infof("Found %d devices", len(devices))
if len(devices) > 0 || db.hasDevices {
db.hasDevices = len(devices) > 0
db.notifications <- devices
klog.V(4).Infof("Found %d devices", len(finalDevices))
if len(finalDevices) > 0 || db.hasDevices {
db.hasDevices = len(finalDevices) > 0
db.notifications <- finalDevices
}

// Wait for the next event or timeout.
select {
// trigger a reconcile
// Trigger a reconcile.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Trigger a reconcile.
// Trigger a reconciliation.

Note: reconcile may be acceptable since it's an established term.

case <-nlChannel:
// drain the channel so we only sync once
// Drain the channel so we only sync once.
for len(nlChannel) > 0 {
<-nlChannel
}
Expand All @@ -208,13 +212,14 @@ func (db *DB) netdevToDRAdev(link netlink.Link) (*resourceapi.Device, error) {
}
// Set the device name. It will be normalized only if necessary.
device.Name = names.SetDeviceName(ifName)
// expose the real interface name as an attribute in case it is normalized.

// expose the real interface name as an attribute in case it is normalized.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// expose the real interface name as an attribute in case it is normalized.
// Expose the real interface name as an attribute in case it is normalized.

device.Attributes["dra.net/ifName"] = resourceapi.DeviceAttribute{StringValue: &ifName}

linkType := link.Type()
linkAttrs := link.Attrs()

// identify the namespace holding the link as the other end of a veth pair
// Identify the namespace holding the link as the other end of a veth pair.
netnsid := link.Attrs().NetNsID
if podName := db.GetPodName(netnsid); podName != "" {
device.Attributes["dra.net/pod"] = resourceapi.DeviceAttribute{StringValue: &podName}
Expand Down Expand Up @@ -252,15 +257,15 @@ func (db *DB) netdevToDRAdev(link netlink.Link) (*resourceapi.Device, error) {
device.Attributes["dra.net/alias"] = resourceapi.DeviceAttribute{StringValue: &linkAttrs.Alias}
device.Attributes["dra.net/type"] = resourceapi.DeviceAttribute{StringValue: &linkType}

// Get eBPF properties from the interface using the legacy tc hooks
// Get eBPF properties from the interface using the legacy tc hooks.
isEbpf := false
filterNames, ok := getTcFilters(link)
if ok {
isEbpf = true
device.Attributes["dra.net/tcFilterNames"] = resourceapi.DeviceAttribute{StringValue: ptr.To(strings.Join(filterNames, ","))}
}

// Get eBPF properties from the interface using the tcx hooks
// Get eBPF properties from the interface using the tcx hooks.
programNames, ok := getTcxFilters(link)
if ok {
isEbpf = true
Expand Down Expand Up @@ -300,7 +305,7 @@ func addPCIAttributes(device *resourceapi.Device, ifName string, path string) {
klog.Infof("Could not get bdf address : %v", err)
} else {
if err := setPciRootAttr(device, address); err != nil {
klog.Infof("Could not get pci root attribute : %v", err)
klog.Infof("could not get pci root for %s: %v", ifName, err)
}
}

Expand All @@ -316,7 +321,7 @@ func addPCIAttributes(device *resourceapi.Device, ifName string, path string) {
device.Attributes["dra.net/pciSubsystem"] = resourceapi.DeviceAttribute{StringValue: &entry.Subsystem}
}
} else {
klog.Infof("could not get pci vendor information : %v", err)
klog.Infof("could not get pci vendor information for %s: %v", ifName, err)
}

numa, err := numaNode(ifName, path)
Expand Down
107 changes: 107 additions & 0 deletions pkg/inventory/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ limitations under the License.
package inventory

import (
"fmt"
"net"

"github.com/Mellanox/rdmamap"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/google/dranet/pkg/names"
"github.com/vishvananda/netlink"

resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)

func getDefaultGwInterfaces() sets.Set[string] {
Expand Down Expand Up @@ -118,3 +125,103 @@ func getTcxFilters(device netlink.Link) ([]string, bool) {
}
return programNames.UnsortedList(), isTcxEBPF
}

// discoverNetlinkDevices scans for kernel network interfaces
func (db *DB) discoverNetlinkDevices() (map[string]*resourceapi.Device, error) {
klog.V(4).Info("Starting netlink device discovery...")
devices := make(map[string]*resourceapi.Device)

// TODO: it is not common but may happen in edge cases that the default gateway changes
// revisit once we have more evidence this can be a potential problem or break some use
// cases.
gwInterfaces := getDefaultGwInterfaces()
nlHandle, err := netlink.NewHandle()
if err != nil {
return devices, fmt.Errorf("error creating netlink handle %v", err)
}

// Don't return early - we want print to user at end of function
// @vsoch This is logic from previous refactored version.
ifaces, err := nlHandle.LinkList()
if err != nil {
klog.Error(err, "unexpected error trying to get system interfaces")
}

for _, iface := range ifaces {
attrs := iface.Attrs()

klog.V(7).InfoS("Checking network interface", "name", attrs.Name)
if gwInterfaces.Has(attrs.Name) {
klog.V(4).Infof("iface %s is an uplink interface", attrs.Name)
continue
}

if ignoredInterfaceNames.Has(attrs.Name) {
klog.V(4).Infof("iface %s is in the list of ignored interfaces", attrs.Name)
continue
}

// Skip loopback interfaces.
if attrs.Flags&net.FlagLoopback != 0 {
continue
}

// Publish this network interface.
device, err := db.netdevToDRAdev(iface)
if err != nil {
klog.V(2).Infof("could not obtain attributes for iface %s : %v", attrs.Name, err)
continue
}

// This could be error prone if a missing address leads to a second entry in rdma devices.
pciAddress, err := bdfAddress(attrs.Name, rdmamap.RdmaClassDir)
pciAddr := pciAddress.device
if err != nil {
klog.Warningf("could not get PCI address for netdev %s, using fallback key. error: %v", attrs.Name, err)
pciAddr = "netdev-" + attrs.Name
}
devices[pciAddr] = device
}
klog.V(4).Infof("Finished netlink discovery. Found %d devices.", len(devices))
return devices, nil
}

// discoverRawRdmaDevices scans for raw RDMA devices using rdmamap listing
func (db *DB) discoverRawRdmaDevices() map[string]*resourceapi.Device {
klog.V(4).Info("Starting raw RDMA device discovery...")
devices := make(map[string]*resourceapi.Device)

// This was tested to work to list an Infiniband device without an associated netlink.
deviceNames := rdmamap.GetRdmaDeviceList()

for _, rdmaName := range deviceNames {
pciAddr, err := bdfAddress(rdmaName, rdmamap.RdmaClassDir)

// Assume that a missing PCI address would be missing for both netlink and rdma (not sure if this is true).
// I think there are cases when we wouldn't have one, but I want to be conservative and only
// allow RDMA interfaces with associated PCI addresses. This can change if needed.
if err != nil {
klog.Warningf("could not get PCI address for RDMA device %s, skipping: %v", rdmaName, err)
continue
}
sanitizedName := names.SetDeviceName(rdmaName)

// Create a new resourceapi device for the RDMA raw device.
device := &resourceapi.Device{
Name: sanitizedName,
Basic: &resourceapi.BasicDevice{
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
"dra.net/rdma": {BoolValue: ptr.To(true)},
"dra.net/ifName": {StringValue: &rdmaName},
// https://github.com/vishvananda/netlink/blob/master/nl/nl_linux.go#L143
// This could also be ib, but "infiniband" is more clear
"dra.net/type": {StringValue: ptr.To("infiniband")},
},
},
}
addPCIAttributes(device.Basic, rdmaName, rdmamap.RdmaClassDir)
devices[pciAddr.device] = device
}
klog.V(4).Infof("Finished raw RDMA discovery. Found %d devices.", len(devices))
return devices
}