eBPF_flat_p3

Introduction

In this post we will write the user space part of our eBPF program, flat, to calculate and display the network latency using the data we gather in the kernel space program.

Make sure to check out the previous posts to be able to follow the details of this article.

I made a small modification to the previous post on October 12th, 2023, migrating the eBPF map type from perfbuf to ringbuf in order to lessen the memory overhead and improve performance. If you read it before the update, make sure to have a look at that part.

The Big Picture

The user space program (and this post) will be noticeably larger than the kernel space one since we need to:

  1. Compile and embed the kernel space code into our user space program to load it into the kernel
  2. Similar to the kernel space code, get the system time since boot to compare with the timestamp or ts values in packet_t struct of kernel space program, in order to prune stale entries from the connection or flow table
  3. Define a new qdisc of type clsact to sit in the path of incoming and outgoing network traffic
  4. Define a thread-safe connection or flow table to store the flow hash as key and timestamp as value
  5. Packet operations: unmarshal, hash and display the flow latency by comparing ingress and egress traffic, utilizing the data coming from the kernel space program (packet_t struct) through the ringbuf map
  6. Provide an option for users to select the network interface to attach the program (probe) to
  7. Lock memory for our process
  8. Create and attach an eBPF probe or hook to the specified interface
  9. Remove the qdisc and its filters when the program is stopped or terminated

1 - Compile and Embed eBPF code Using Go

I have already covered how to compile and embed the eBPF code into our Go program in the second bullet-point here and in order to avoid repetition, we will not go through those details again. However, the code used is available and explained in this post.

2 - Get time since system boot

In our kernel space code, we used the bpf_ktime_get_ns helper function as a means to timestamp the packets as we received them. We need a similar approach in our user space program in order to prune the old or stale flows from the flow table. However, since we do not have access to BPF helper functions in user space, this leaves us with no option but to write it ourselves.

Luckily, this is pretty straightforward. Inside the internal/timer/timer.go file, we use the ClockGettime function, passing it CLOCK_MONOTONIC which is a clock that cannot be set and represents monotonic time since some unspecified starting point, usually system boot time.

A monotonic clock is a time source that is not affected by jumps in the system time, for instance when an administrator or an NTP step changes the wall clock.

Here is the code:

package timer

import (
    "log"
    "time"

    "golang.org/x/sys/unix"
)

// GetNanosecSinceBoot returns the nanoseconds since system boot time
func GetNanosecSinceBoot() uint64 {
    var ts unix.Timespec

    err := unix.ClockGettime(unix.CLOCK_MONOTONIC, &ts)

    if err != nil {
        log.Println("Could not get MONOTONIC Clock time ", err)
        return 0
    }
    return uint64(ts.Nsec + ts.Sec*int64(time.Second))
}

The total number of nanoseconds since boot is calculated by summing the number of nanoseconds (ts.Nsec) and seconds (ts.Sec) multiplied by the number of nanoseconds in a second (time.Second). That was a mouthful, hopefully not too confusing 😅.
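To make the arithmetic concrete, here is a minimal sketch that performs the same calculation on a hand-made value. The timespec type and the toNanoseconds function are hypothetical stand-ins introduced for this example so that it runs without the golang.org/x/sys/unix dependency; they are not part of flat.

```go
package main

import (
	"fmt"
	"time"
)

// timespec is a hypothetical stand-in for unix.Timespec on 64-bit Linux,
// keeping only the two fields used in the calculation.
type timespec struct {
	Sec  int64 // whole seconds
	Nsec int64 // nanoseconds past the last whole second
}

// toNanoseconds folds both fields into a single nanosecond count.
func toNanoseconds(ts timespec) uint64 {
	return uint64(ts.Sec*int64(time.Second) + ts.Nsec)
}

func main() {
	// 3 seconds and 500 nanoseconds after the clock's starting point.
	fmt.Println(toNanoseconds(timespec{Sec: 3, Nsec: 500})) // 3000000500
}
```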

Initially, I implemented this functionality using CGO. If you are curious to see how it looked, check it out here

3 - Defining The CLSACT qdisc

The clsact is a special qdisc or queueing discipline that enables us to handle both ingress and egress traffic on an interface. It is particularly helpful when attaching eBPF programs in direct-action mode.

The direct-action mode enables the eBPF program to select what action to take (for example, whether to pass or drop a packet) without consulting the underlying qdiscs. This ability to bypass the traditional traffic control model makes the direct-action mode very powerful and flexible

At the time of this writing, the clsact qdisc is not part of the netlink package, so we need to write the functionality on our own inside the clsact/clsact.go file.

We do so by defining a struct called ClsAct with a single field called attrs which is a pointer to a netlink.QdiscAttrs that represents a qdisc.

Next, we define a function named NewClsAct that takes in the qdisc attributes, constructs the ClsAct struct and returns a pointer to it.

package clsact

import "github.com/vishvananda/netlink"

type ClsAct struct {
    attrs *netlink.QdiscAttrs
}

func NewClsAct(attrs *netlink.QdiscAttrs) *ClsAct {
    return &ClsAct{attrs: attrs}
}

Lastly, we define two additional methods, Attrs and Type to satisfy the qdisc interface.

func (qdisc *ClsAct) Attrs() *netlink.QdiscAttrs {
    return qdisc.attrs
}

func (qdisc *ClsAct) Type() string {
    return "clsact"
}

It is also worthwhile to mention that clsact qdisc does not perform any actual queuing and is mainly used for classifying the traffic
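If you want the compiler to confirm that ClsAct really satisfies the interface, a compile-time assertion is a cheap way to do it. The sketch below assumes local stand-in types (QdiscAttrs and Qdisc) shaped like the ones in the netlink package so that it builds without the dependency; describe is a made-up helper for illustration.

```go
package main

import "fmt"

// QdiscAttrs and Qdisc are hypothetical local stand-ins shaped like the
// netlink package's types, so this sketch compiles without the dependency.
type QdiscAttrs struct {
	LinkIndex int
	Handle    uint32
	Parent    uint32
}

type Qdisc interface {
	Attrs() *QdiscAttrs
	Type() string
}

type ClsAct struct {
	attrs *QdiscAttrs
}

func (q *ClsAct) Attrs() *QdiscAttrs { return q.attrs }
func (q *ClsAct) Type() string       { return "clsact" }

// Compile-time check: the build fails if *ClsAct stops satisfying Qdisc.
var _ Qdisc = (*ClsAct)(nil)

// describe accepts any Qdisc, which is how library code would consume ClsAct.
func describe(q Qdisc) string {
	return fmt.Sprintf("%s on link %d", q.Type(), q.Attrs().LinkIndex)
}

func main() {
	fmt.Println(describe(&ClsAct{attrs: &QdiscAttrs{LinkIndex: 2}})) // clsact on link 2
}
```

In the real package the same one-liner would read var _ netlink.Qdisc = (*ClsAct)(nil).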

4 - Creating a Flow Table

Alright, we need a connection/flow table to contain all of our TCP and UDP flows in the form of a packet hash as a key and timestamp as a value in order to check for a match and determine network latency or simply add a new entry if no match is found.

This flow table has to have certain characteristics:

  • Should be able to handle requests from different goroutines or threads without having a race condition
  • Have a pruning mechanism to periodically (10 seconds would be reasonable) remove stale entries from the table, preventing it from growing unnecessarily large and wasting resources in scenarios like DDoS attacks

Inside the internal/flowtable/flowtable.go file, we start off by defining the package name, importing the necessary packages and then creating the FlowTable struct.

package flowtable

import (
    "log"
    "sync"
    "time"

    "github.com/pouriyajamshidi/flat/internal/timer"
)

type FlowTable struct {
    Ticker *time.Ticker
    sync.Map
}

This struct has two fields:

  • A ticker that enables us to perform a task; pruning in this case, repeatedly at regular intervals
  • A sync map that is a concurrent map implementation that allows many goroutines to safely access and modify its contents efficiently. sync map has its own very specific and debated use cases and I think we are better off utilizing it rather than leveraging RWMutex or Mutex since we are writing unique ephemeral keys and reading them a few times. In other words, different goroutines do not write or update the same keys

Let’s define the function and methods required to operate on the FlowTable.

NewFlowTable creates a flow table, sets the ticker interval to 10 seconds and returns the memory address of the struct.

func NewFlowTable() *FlowTable {
    return &FlowTable{Ticker: time.NewTicker(time.Second * 10)}
}

Insert takes in the flow hash and a timestamp then stores them in the flow table as key and value respectively.

func (table *FlowTable) Insert(hash, timestamp uint64) {
    table.Store(hash, timestamp)
}

Get looks for a flow hash in the flow table and, if it is found, returns the value.

func (table *FlowTable) Get(hash uint64) (uint64, bool) {
    value, ok := table.Load(hash)

    if !ok {
        return 0, ok
    }
    return value.(uint64), ok
}

Remove deletes a flow hash and its timestamp from the flow table.

func (table *FlowTable) Remove(hash uint64) {
    _, found := table.Load(hash)

    if found {
        table.Delete(hash)
    } else {
        log.Printf("hash %v is not in flow table", hash)
    }
}

Prune is invoked every ten seconds, gets the system monotonic time using GetNanosecSinceBoot, looks at all entries in the flow table and if it finds an entry that is older than 10 seconds, removes it from the flow table.

func (table *FlowTable) Prune() {
    now := timer.GetNanosecSinceBoot()

    table.Range(func(hash, timestamp interface{}) bool {
        if (now-timestamp.(uint64))/1000000 > 10000 {
            log.Printf("Pruning stale entry from flow table: %v", hash)

            table.Delete(hash)
        }
        // Keep iterating: returning false would stop Range at this entry
        return true
    })
}
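One detail of sync.Map worth keeping in mind while pruning: Range stops as soon as its callback returns false, so a pass that is meant to visit every entry has to return true each time. Here is a minimal, self-contained sketch of that behaviour. The pruneStale and demo functions, and the idea of passing now in as a parameter, are assumptions made for this example so it can run without a real clock.

```go
package main

import (
	"fmt"
	"sync"
)

// staleAfterNs is the 10 second cutoff expressed in nanoseconds.
const staleAfterNs uint64 = 10_000_000_000

// pruneStale deletes entries older than staleAfterNs relative to now and
// returns how many were removed. now is a parameter (an assumption made for
// this sketch) so the behaviour can be exercised without a real clock.
func pruneStale(table *sync.Map, now uint64) int {
	removed := 0
	table.Range(func(hash, ts interface{}) bool {
		seen := ts.(uint64)
		// Guard against timestamps newer than now: unsigned subtraction would wrap.
		if now > seen && now-seen > staleAfterNs {
			table.Delete(hash)
			removed++
		}
		return true // returning false would stop the walk early
	})
	return removed
}

// demo stores two stale flows and one fresh flow, then prunes.
func demo() (removed, remaining int) {
	var table sync.Map
	now := uint64(60_000_000_000)
	table.Store(uint64(1), now-20_000_000_000) // 20 s old
	table.Store(uint64(2), now-15_000_000_000) // 15 s old
	table.Store(uint64(3), now-1_000_000_000)  // 1 s old

	removed = pruneStale(&table, now)
	table.Range(func(_, _ interface{}) bool { remaining++; return true })
	return removed, remaining
}

func main() {
	fmt.Println(demo()) // 2 1
}
```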

Lastly, we have an optional method called Entries that returns the current number of entries in the flow table. This is a nice-to-have functionality and does not change how the program runs, hence making it optional.

func (table *FlowTable) Entries() int {
    count := 0
    table.Range(func(_, _ interface{}) bool {
        count++
        return true
    })
    return count
}

5 - Packet Operations

This is the heart of our program. Here, we unmarshal the flow data coming from the kernel space program through the ringbuf map, hash the flows and calculate the flow latencies to display them.

Let’s kick off by defining the packet struct inside the internal/packet/packet.go file, resembling the packet_t struct in our kernel space code.

type Packet struct {
    SrcIP     netip.Addr
    DstIP     netip.Addr
    SrcPort   uint16
    DstPort   uint16
    Protocol  uint8
    TTL       uint8
    Syn       bool
    Ack       bool
    TimeStamp uint64
}

A short Primer on Decoding C structs

Before writing out the unmarshalling logic in Go, it is important to understand the size and offsets of our fields inside the kernel space packet_t struct.

One simple approach to accomplish this is to wrap our struct in a tiny C program and utilize the sizeof operator and the offsetof macro to obtain the data we require.

#include <stdio.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <netinet/in.h>
#include <linux/types.h>

struct packet_t {
    struct in6_addr src_ip;
    struct in6_addr dst_ip;
    __be16 src_port;
    __be16 dst_port;
    __u8 protocol;
    __u8 ttl;
    bool syn;
    bool ack;
    uint64_t ts;
};

int main() {
    /* sizeof and offsetof both yield size_t, which is printed with %zu */
    printf("src_ip: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->src_ip), offsetof(struct packet_t, src_ip));
    printf("dst_ip: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->dst_ip), offsetof(struct packet_t, dst_ip));
    printf("src_port: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->src_port), offsetof(struct packet_t, src_port));
    printf("dst_port: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->dst_port), offsetof(struct packet_t, dst_port));
    printf("protocol: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->protocol), offsetof(struct packet_t, protocol));
    printf("ttl: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->ttl), offsetof(struct packet_t, ttl));
    printf("syn: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->syn), offsetof(struct packet_t, syn));
    printf("ack: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->ack), offsetof(struct packet_t, ack));
    printf("ts: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->ts), offsetof(struct packet_t, ts));

    return 0;
}

This will output:

src_ip:     size = 16,   offset = 0
dst_ip:     size = 16,   offset = 16
src_port:   size = 2,    offset = 32
dst_port:   size = 2,    offset = 34
protocol:   size = 1,    offset = 36
ttl:        size = 1,    offset = 37
syn:        size = 1,    offset = 38
ack:        size = 1,    offset = 39
ts:         size = 8,    offset = 40

So, using this information, we know that if we wish to extract the ts or timestamp field, we must start at offset 40 and advance 8 bytes forward.
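The same numbers can be cross-checked from Go. The sketch below assumes a hypothetical packetLayout struct that mirrors packet_t field by field and asks the unsafe package where the timestamp lands. Since the 40 bytes in front of ts are already a multiple of 8, no padding is needed and the whole record comes out at 48 bytes.

```go
package main

import (
	"fmt"
	"unsafe"
)

// packetLayout is a hypothetical Go mirror of packet_t, used only to
// inspect offsets; it is not the type the program decodes into.
type packetLayout struct {
	SrcIP    [16]byte
	DstIP    [16]byte
	SrcPort  uint16
	DstPort  uint16
	Protocol uint8
	TTL      uint8
	Syn      bool
	Ack      bool
	TS       uint64
}

// tsOffset reports the byte offset at which the timestamp starts.
func tsOffset() uintptr {
	var p packetLayout
	return unsafe.Offsetof(p.TS)
}

// totalSize reports how many bytes one record occupies.
func totalSize() uintptr {
	var p packetLayout
	return unsafe.Sizeof(p)
}

func main() {
	fmt.Println(tsOffset(), totalSize()) // 40 48
}
```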

Unmarshalling C structs In Go

Continuing inside the packet.go file, let’s define the unmarshalling logic in which we take in a byte slice and extract the values into the Packet struct.

The source and destination IP addresses are the first 32 bytes of the received data. We take each 16-byte half individually and feed it to the netip.AddrFromSlice function to get our IP addresses.

The source and destination ports are in bytes 32 to 33 and 34 to 35 respectively. The binary.BigEndian.Uint16 function in the binary package takes a byte slice as an argument, treats it as a big endian encoded value and returns its equivalent uint16.

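func UnmarshalBinary(in []byte) (Packet, bool) {
    // packet_t is 48 bytes long; a shorter slice would make the slicing below panic
    if len(in) < 48 {
        return Packet{}, false
    }

    srcIP, ok := netip.AddrFromSlice(in[0:16])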

    if !ok {
        return Packet{}, ok
    }

    dstIP, ok := netip.AddrFromSlice(in[16:32])

    if !ok {
        return Packet{}, ok
    }

    return Packet{
        SrcIP:     srcIP,
        DstIP:     dstIP,
        SrcPort:   binary.BigEndian.Uint16(in[32:34]),
        DstPort:   binary.BigEndian.Uint16(in[34:36]),
        Protocol:  in[36],
        TTL:       in[37],
        Syn:       in[38] == 1,
        Ack:       in[39] == 1,
        TimeStamp: binary.LittleEndian.Uint64(in[40:48]),
    }, true
}

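The rest of the fields are pretty much self-explanatory. Note, however, that the TimeStamp field has to be decoded as a little endian value: unlike the ports, which stay in network byte order (big endian), ts is written by the kernel in host byte order, which is little endian on most machines.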

If a value (like Protocol, TTL, etc.) is just one byte, the big or little endian representation of it is the same
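To see the offsets and the mixed byte orders in action, the following sketch builds a 48-byte record by hand and reads a few fields back. The sampleRecord and decode functions and all the values in them are made up for illustration; the record is laid out the way a little endian host would write it.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net/netip"
)

// sampleRecord builds a 48-byte buffer laid out like packet_t, as a
// little endian host would write it. The values are made up for illustration.
func sampleRecord() []byte {
	buf := make([]byte, 48)
	src := netip.MustParseAddr("::ffff:192.0.2.1").As16()
	dst := netip.MustParseAddr("::ffff:198.51.100.7").As16()
	copy(buf[0:16], src[:])
	copy(buf[16:32], dst[:])
	binary.BigEndian.PutUint16(buf[32:34], 443) // ports stay in network byte order
	binary.BigEndian.PutUint16(buf[34:36], 51000)
	buf[36] = 6                                          // TCP
	buf[37] = 64                                         // TTL
	buf[38] = 1                                          // SYN set
	buf[39] = 0                                          // ACK clear
	binary.LittleEndian.PutUint64(buf[40:48], 123456789) // host byte order
	return buf
}

// decode reads back the source IP, source port and timestamp using the
// same offsets, refusing buffers that are too short.
func decode(buf []byte) (string, uint16, uint64, bool) {
	if len(buf) < 48 {
		return "", 0, 0, false
	}
	addr, ok := netip.AddrFromSlice(buf[0:16])
	if !ok {
		return "", 0, 0, false
	}
	return addr.Unmap().String(),
		binary.BigEndian.Uint16(buf[32:34]),
		binary.LittleEndian.Uint64(buf[40:48]),
		true
}

func main() {
	fmt.Println(decode(sampleRecord())) // 192.0.2.1 443 123456789 true
}
```

Unmap turns the IPv4-mapped IPv6 form back into a plain IPv4 address, which is why the output shows 192.0.2.1 rather than ::ffff:192.0.2.1.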

Hashing Packets

Calculating a hash value for each network flow facilitates the matching of related ingress and egress flows. This is done through the calculation of a five-tuple hash, named for the five properties that define a network flow:

  1. Source IP
  2. Source port
  3. Destination IP
  4. Destination port
  5. Protocol

To calculate this hash, the Hash method first constructs three byte slices from these five properties (by clubbing source IP and port, then destination IP and port, together). The source and destination IP addresses are obtained directly from the Packet structure, while the source and destination ports and the protocol are first converted to bytes using the binary.BigEndian.PutUint16 function.

func (pkt *Packet) Hash() uint64 {
    tmp := make([]byte, 2)

    var src []byte
    var dst []byte
    var proto []byte

    binary.BigEndian.PutUint16(tmp, pkt.SrcPort)
    src = append(pkt.SrcIP.AsSlice(), tmp...)

    binary.BigEndian.PutUint16(tmp, pkt.DstPort)
    dst = append(pkt.DstIP.AsSlice(), tmp...)

    binary.BigEndian.PutUint16(tmp, uint16(pkt.Protocol))
    proto = append(proto, tmp...)

    return hash(src) + hash(dst) + hash(proto)
}

These byte slices are then passed to the hash function, which calculates a 64-bit hash value for each byte slice using the efficient FNV-1a algorithm.

The FNV-1a algorithm is a non-cryptographic hash function that provides a good balance between speed and hash distribution, making it a popular choice for hashing in many applications.


func hash(value []byte) uint64 {
    hash := fnv.New64a()
    hash.Write(value)
    return hash.Sum64()
}

The hash function works by creating a new FNV-1a hash object, writing the byte slice to this object, and then calling the Sum64 method to obtain the 64-bit hash value. This process is repeated for each of the three byte slices (source IP and port, destination IP and port, and protocol), and the three resulting hash values are added together to produce the final hash value.

Calculating Flow Latency

Let’s create a function named CalcLatency that accepts two arguments: a Packet and a pointer to the flow table to:

  • Map the protocol number to its name, for better readability
  • Hash the packet as described in the previous section
  • Search the flow table for the existing hash, and add it to the table and return if it is not found
  • Otherwise, calculate the latency by subtracting the related flow timestamps and dividing the result by 1_000_000 to get the value in milliseconds
  • Print the flow information in a pleasant way for TCP SYN/ACK in cyan and UDP response in light yellow for better distinction using the gookit color package
  • Remove the hash from the flow table, since it is of no use anymore

var ipProtoNums = map[uint8]string{
    6:  "TCP",
    17: "UDP",
}

func CalcLatency(pkt Packet, table *flowtable.FlowTable) {
    proto, ok := ipProtoNums[pkt.Protocol]

    if !ok {
        log.Print("Failed fetching protocol number: ", pkt.Protocol)
        return
    }

    pktHash := pkt.Hash()

    ts, ok := table.Get(pktHash)

    if !ok && pkt.Syn {
        table.Insert(pktHash, pkt.TimeStamp)
        return
    } else if !ok && proto == "UDP" {
        table.Insert(pktHash, pkt.TimeStamp)
        return
    } else if !ok {
        return
    }

    if pkt.Ack {
        colorCyan("(%v) | src: %v:%-7v\tdst: %v:%-9v\tTTL: %-4v\tlatency: %.3f ms\n",
            proto,
            pkt.DstIP.Unmap().String(),
            pkt.DstPort,
            pkt.SrcIP.Unmap().String(),
            pkt.SrcPort,
            pkt.TTL,
            (float64(pkt.TimeStamp)-float64(ts))/1_000_000,
        )
        table.Remove(pktHash)
    } else if proto == "UDP" {
        colorLightYellow("(%v) | src: %v:%-7v\tdst: %v:%-9v\tTTL: %-4v\tlatency: %.3f ms\n",
            proto,
            pkt.DstIP.Unmap().String(),
            pkt.DstPort,
            pkt.SrcIP.Unmap().String(),
            pkt.SrcPort,
            pkt.TTL,
            (float64(pkt.TimeStamp)-float64(ts))/1_000_000,
        )
        table.Remove(pktHash)
    }
}

var (
    colorLightYellow = color.LightYellow.Printf
    colorCyan        = color.Cyan.Printf
)

The number before a print verb sets the minimum field width. A leading -, as in %-7v, pads on the right (left-justifies the value); omitting the - pads on the left
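A quick way to get a feel for the width flags and the millisecond conversion is to try them on fixed values. In this sketch padRight, padLeft and latencyMs are small helpers invented for the example; the trailing | only marks where the padded field ends.

```go
package main

import "fmt"

// padRight and padLeft wrap the two width forms so the result is easy to inspect.
func padRight(v interface{}) string { return fmt.Sprintf("%-7v|", v) }
func padLeft(v interface{}) string  { return fmt.Sprintf("%7v|", v) }

// latencyMs converts the gap between two nanosecond timestamps into milliseconds.
func latencyMs(first, second uint64) float64 {
	return (float64(second) - float64(first)) / 1_000_000
}

func main() {
	fmt.Println(padRight(443)) // "443    |"
	fmt.Println(padLeft(443))  // "    443|"

	// 2,500,000 ns between the two packets is 2.5 ms.
	fmt.Printf("%.3f ms\n", latencyMs(1_000_000, 3_500_000)) // 2.500 ms
}
```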

Here is the link to the completed version.

6 - Selecting The Network Interface

Let’s start off in the cmd/flat.go file by defining a flag to get the interface name to attach to when starting the program as well as ensuring it exists using the netlink package.

func main() {
     ifaceFlag := flag.String("i", "eth0", "interface to attach the probe to")
     flag.Parse()

     iface, err := netlink.LinkByName(*ifaceFlag)

     if err != nil {
         log.Printf("Could not find interface %v: %v", *ifaceFlag, err)
         displayInterfaces()
     }
}

In case the interface does not exist, and for convenience, we’ll show the available network interfaces and exit the program.

func displayInterfaces() {
     interfaces, err := net.Interfaces()

     if err != nil {
         log.Fatal("Failed fetching network interfaces")
         return
     }

     for i, iface := range interfaces {
         fmt.Printf("%d) %s\n", i, iface.Name)
     }
     os.Exit(1)
}

Next, we will create a context and a signal handler so that we can remove the qdisc and its filters when the program is interrupted (SIGINT) or terminated (SIGTERM).

func main() {
     // Previous code

     ctx := context.Background()
     ctx, cancel := context.WithCancel(ctx)

     signalHandler(cancel)
}

func signalHandler(cancel context.CancelFunc) {
     sigChan := make(chan os.Signal, 1)
     signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

     go func() {
         <-sigChan
         log.Println("\nExiting")
         cancel()
     }()
}
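As an aside, the standard library can do the channel and goroutine plumbing for us. The sketch below is an alternative, not what flat does: it uses signal.NotifyContext (available since Go 1.16) to get a context that is cancelled on SIGINT or SIGTERM. The waitForShutdown and demoCancelled helpers are invented for this example.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// waitForShutdown blocks until ctx is cancelled or the timeout elapses
// and reports which of the two happened.
func waitForShutdown(ctx context.Context, timeout time.Duration) string {
	select {
	case <-ctx.Done():
		return "cancelled"
	case <-time.After(timeout):
		return "timeout"
	}
}

// demoCancelled cancels a context by hand to stand in for a delivered signal.
func demoCancelled() string {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return waitForShutdown(ctx, time.Second)
}

func main() {
	// ctx is cancelled when SIGINT or SIGTERM arrives; stop releases the handler.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Prints "timeout" unless a signal arrives within 50 ms.
	fmt.Println(waitForShutdown(ctx, 50*time.Millisecond))
	fmt.Println(demoCancelled()) // cancelled
}
```

Either approach hands the rest of the program a context to watch, so the choice is mostly a matter of taste.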

After that, we invoke our probe function; which will be implemented here, passing it the context and the network interface.

func main() {
     // Previous code

     if err := probe.Run(ctx, iface); err != nil {
         log.Fatalf("Failed running the probe: %v", err)
     }
}

Here is the link to the complete version.

7 - Lock Memory For Resources

Most examples allow the process to lock as much memory as it needs, which prevents those pages from being swapped out. You might, however, have a use case that demands specific limits, and it does not hurt to see how it is done.

Using setrlimit we set a soft (cur) limit of 20 megabytes and a hard (max) limit of 40 megabytes for our process to lock memory, inside the internal/probe/probe.go file:

const tenMegaBytes    = 1024 * 1024 * 10
const twentyMegaBytes = tenMegaBytes * 2
const fortyMegaBytes  = twentyMegaBytes * 2

func setRlimit() error {
     log.Println("Setting rlimit")

     return unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{
         Cur: twentyMegaBytes,
         Max: fortyMegaBytes,
     })
}

Learn more about resource limits here

If we do not lock a specific or unlimited amount of memory for our process, executing the program once it is complete will result in the errors shown below:

TheGrayNode.io~$ sudo ./flat -i eth0

Failed loading probe objects: field Flat: program flat: map pipe: map create: operation not permitted (MEMLOCK may be too low, consider rlimit.RemoveMemlock)
Failed running the probe: field Flat: program flat: map pipe: map create: operation not permitted (MEMLOCK may be too low, consider rlimit.RemoveMemlock)

So, by leveraging setrlimit we ensure our process has access to enough lockable memory to be able to run.

8 - Creating And Attaching An eBPF Hook To An Interface

At the beginning of the internal/probe/probe.go file, we define the package name as probe, import the necessary packages, and then add the go:generate directive, which tells go generate where to find the eBPF code and how to compile it, as described in a previous post.

package probe

import (
    "context"
    "log"
    "github.com/cilium/ebpf/ringbuf"
    "github.com/pouriyajamshidi/flat/clsact"
    "github.com/pouriyajamshidi/flat/internal/flowtable"
    "github.com/pouriyajamshidi/flat/internal/packet"
    "github.com/vishvananda/netlink"
    "golang.org/x/sys/unix"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go probe ../../bpf/flat.c -- -O2 -Wall -Werror -Wno-address-of-packed-member

The probe keyword after the bpf2go command will be the prefix of the functions and files that are automatically generated by the go generate ./... command. On a successful run, the folder will look like this:

[Image: go generate result]

More on that later.

Defining And Attaching The Probe

We define a struct called probe to hold:

  1. iface: The network interface specified by the user
  2. handle: A pointer to a netlink handle of type NETLINK_ROUTE, which is used to add and remove qdiscs and their filters (as well as routes, IP addresses, etc.) in a specific network namespace
  3. qdisc: A pointer to the clsact qdisc
  4. bpfObjects: A pointer to a struct containing all eBPF objects (programs and maps) after they are loaded into the kernel
  5. filters: A slice of pointers to the BPF filters for ingress and egress traffic, whether IPv4 or IPv6

Here is how it looks:

type probe struct {
    iface      netlink.Link
    handle     *netlink.Handle
    qdisc      *clsact.ClsAct
    bpfObjects *probeObjects
    filters    []*netlink.BpfFilter
}

Then, a function to piece things together by first creating the NETLINK_ROUTE handle:

func newProbe(iface netlink.Link) (*probe, error) {
    log.Println("Creating a new probe")

    handle, err := netlink.NewHandle(unix.NETLINK_ROUTE)

    if err != nil {
        log.Printf("Failed getting netlink handle: %v", err)
        return nil, err
    }

    // rest of the code
}

Check out all the available netlink families here.

Instantiate the probe struct, passing it the network interface and the netlink handle

func newProbe(iface netlink.Link) (*probe, error) {
    // previous code

    prbe := probe{
       iface:  iface,
       handle: handle,
    }

   // rest of the code
}

Finally, we load our eBPF program into the kernel using loadObjects, create the qdisc using createQdisc and the classifiers using the createFilters methods.

func newProbe(iface netlink.Link) (*probe, error) {
    // previous code

    if err := prbe.loadObjects(); err != nil {
        log.Printf("Failed loading probe objects: %v", err)
        return nil, err
    }

    if err := prbe.createQdisc(); err != nil {
        log.Printf("Failed creating qdisc: %v", err)
        return nil, err
    }

    if err := prbe.createFilters(); err != nil {
        log.Printf("Failed creating qdisc filters: %v", err)
        return nil, err
    }

    return &prbe, nil
}

Let’s expand on these methods.

The loadObjects method

loadObjects instantiates an empty probeObjects struct that holds the eBPF program and maps, passing it to loadProbeObjects in order to load the program into the kernel and then places the returned objects in bpfObjects field of the probe struct.

func (p *probe) loadObjects() error {
    log.Printf("Loading probe object to kernel")

    objs := probeObjects{}

    if err := loadProbeObjects(&objs, nil); err != nil {
        return err
    }

    p.bpfObjects = &objs

    return nil
}

On little endian systems (most machines) the probeObjects struct and loadProbeObjects function reside inside the probe_bpfel.go file that is automatically generated when running go generate ./...

The createQdisc method

createQdisc creates the clsact qdisc that we demonstrated here, passing it:

  • The network interface index
  • A qdisc handle, built with netlink.MakeHandle from custom major and minor numbers in order to uniquely identify our qdisc
  • netlink.HANDLE_CLSACT as the parent

Then, using the netlink handle in our probe struct, we add the qdisc. If that fails, perhaps due to a previously failed cleanup, we try to replace the qdisc before giving up.

func (p *probe) createQdisc() error {
   log.Printf("Creating qdisc")

   p.qdisc = clsact.NewClsAct(&netlink.QdiscAttrs{
      LinkIndex: p.iface.Attrs().Index,
      Handle:    netlink.MakeHandle(0xffff, 0),
      Parent:    netlink.HANDLE_CLSACT,
   })

   if err := p.handle.QdiscAdd(p.qdisc); err != nil {
      if err := p.handle.QdiscReplace(p.qdisc); err != nil {
         return err
      }
   }

   return nil
}

The createFilters method

createFilters as its name suggests, is where we create our BPF filters or classifiers. It contains a nested function called addFilter that:

  • Takes a netlink filter
  • Builds a BPF filter, passing it the filter attributes and our program’s file descriptor
  • Sets the direct-action to true in order to enable the classifier to decide the packet’s fate (pass or drop)
  • Finally, it appends the filter to the filters field of our probe struct

Next, we invoke our addFilter function four times to cover ingress and egress traffic for both IPv4 and IPv6 protocols followed by a loop to add, or in case of a failure, replace these filters using our netlink handle, similar to how we previously created the clsact qdisc.

func (p *probe) createFilters() error {
    log.Printf("Creating qdisc filters")

    addFilter := func(attrs netlink.FilterAttrs) {
        p.filters = append(p.filters, &netlink.BpfFilter{
            FilterAttrs:  attrs,
            Fd:           p.bpfObjects.probePrograms.Flat.FD(),
            DirectAction: true,
        })
    }

    addFilter(netlink.FilterAttrs{
        LinkIndex: p.iface.Attrs().Index,
        Handle:    netlink.MakeHandle(0xffff, 0),
        Parent:    netlink.HANDLE_MIN_INGRESS,
        Protocol:  unix.ETH_P_IP,
    })

    addFilter(netlink.FilterAttrs{
        LinkIndex: p.iface.Attrs().Index,
        Handle:    netlink.MakeHandle(0xffff, 0),
        Parent:    netlink.HANDLE_MIN_EGRESS,
        Protocol:  unix.ETH_P_IP,
    })

    addFilter(netlink.FilterAttrs{
        LinkIndex: p.iface.Attrs().Index,
        Handle:    netlink.MakeHandle(0xffff, 0),
        Parent:    netlink.HANDLE_MIN_INGRESS,
        Protocol:  unix.ETH_P_IPV6,
    })

    addFilter(netlink.FilterAttrs{
        LinkIndex: p.iface.Attrs().Index,
        Handle:    netlink.MakeHandle(0xffff, 0),
        Parent:    netlink.HANDLE_MIN_EGRESS,
        Protocol:  unix.ETH_P_IPV6,
    })

    for _, filter := range p.filters {
        if err := p.handle.FilterAdd(filter); err != nil {
            if err := p.handle.FilterReplace(filter); err != nil {
                return err
            }
        }
    }

    return nil
}
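The four addFilter calls differ only in direction and protocol, so they could also be generated from two small loops. The sketch below is one possible variation, not the code flat uses. It assumes local stand-ins (filterAttrs, makeHandle and four constants whose values mirror the netlink and unix ones) so that it runs without either dependency, and it also shows how a major and minor number are packed into a 32-bit handle.

```go
package main

import "fmt"

// Hypothetical stand-ins mirroring netlink.HANDLE_MIN_INGRESS,
// netlink.HANDLE_MIN_EGRESS, unix.ETH_P_IP and unix.ETH_P_IPV6.
const (
	handleMinIngress uint32 = 0xFFFFFFF2
	handleMinEgress  uint32 = 0xFFFFFFF3
	ethPIP           uint16 = 0x0800
	ethPIPv6         uint16 = 0x86DD
)

// filterAttrs is a trimmed-down stand-in for netlink.FilterAttrs.
type filterAttrs struct {
	LinkIndex int
	Handle    uint32
	Parent    uint32
	Protocol  uint16
}

// makeHandle packs major:minor into 32 bits, with major in the upper half.
func makeHandle(major, minor uint16) uint32 {
	return uint32(major)<<16 | uint32(minor)
}

// buildFilterAttrs returns one entry per (protocol, direction) pair.
func buildFilterAttrs(linkIndex int) []filterAttrs {
	var out []filterAttrs
	for _, proto := range []uint16{ethPIP, ethPIPv6} {
		for _, parent := range []uint32{handleMinIngress, handleMinEgress} {
			out = append(out, filterAttrs{
				LinkIndex: linkIndex,
				Handle:    makeHandle(0xffff, 0),
				Parent:    parent,
				Protocol:  proto,
			})
		}
	}
	return out
}

func main() {
	for _, a := range buildFilterAttrs(2) {
		fmt.Printf("handle=%#x parent=%#x proto=%#x\n", a.Handle, a.Parent, a.Protocol)
	}
}
```

Whether the loop is clearer than four explicit calls is debatable; with only four combinations the explicit version is arguably easier to scan.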

Processing The Data

Ok, time to write our Run function and set a cap for the amount of memory that our process can lock into RAM as demonstrated in the previous section.

func Run(ctx context.Context, iface netlink.Link) error {
    log.Println("Starting up the probe")

    if err := setRlimit(); err != nil {
        log.Printf("Failed setting rlimit: %v", err)
        return err
    }

    // rest of the code
}

Then, instantiate a new flow table and prune stale entries every 10 seconds using an anonymous goroutine.

func Run(ctx context.Context, iface netlink.Link) error {
    // previous code

    flowtable := flowtable.NewFlowTable()

    go func() {
        for range flowtable.Ticker.C {
            flowtable.Prune()
        }
    }()

    // rest of the code
}

After that, we create the probe with newProbe, get a hold of our eBPF ringbuf map named pipe, and create a ring buffer reader in user space, passing it the map object.

func Run(ctx context.Context, iface netlink.Link) error {
    // previous code

    probe, err := newProbe(iface)

    if err != nil {
        return err
    }

    pipe := probe.bpfObjects.probeMaps.Pipe

    reader, err := ringbuf.NewReader(pipe)

    if err != nil {
        log.Println("Failed creating ring buf reader")
        return err
    }

    // rest of the code
}

Next, we make a byte slice channel named c and using an anonymous goroutine, start polling for events/data that is submitted via bpf_ringbuf_output. Upon receiving data, we pass it up through the channel.

func Run(ctx context.Context, iface netlink.Link) error {
    // previous code

    c := make(chan []byte)

    go func() {
        for {
            event, err := reader.Read()
            if err != nil {
                log.Printf("Failed reading from ringbuf: %v", err)
                return
            }
            c <- event.RawSample
        }
    }()

    // rest of the code
}

Now, we create an infinite loop and leveraging the select statement, we either:

  • Clean up after ourselves when the program is interrupted or terminated
  • Unmarshal, calculate and display the latency of each flow

func Run(ctx context.Context, iface netlink.Link) error {
    // previous code
    for {
        select {
        case <-ctx.Done():
            flowtable.Ticker.Stop()
            return probe.Close()

        case pkt := <-c:
            packetAttrs, ok := packet.UnmarshalBinary(pkt)
            if !ok {
                log.Printf("Could not unmarshall packet: %+v", pkt)
                continue
            }
            packet.CalcLatency(packetAttrs, flowtable)
        }
    }
}

A select statement blocks until one of its cases can be executed, at which point it performs that case. If more than one case is ready, it picks one at random.
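The producer and consumer pattern used in Run can be tried out without any eBPF machinery. In this sketch a fixed list of byte slices stands in for the ring buffer; produce, consume and runDemo are names invented for the example. The producer also watches the context, so it does not stay blocked on a send if the consumer has already gone away.

```go
package main

import (
	"context"
	"fmt"
)

// produce pushes samples into c and gives up if ctx is cancelled while blocked.
func produce(ctx context.Context, samples [][]byte, c chan<- []byte) {
	defer close(c)
	for _, s := range samples {
		select {
		case c <- s:
		case <-ctx.Done():
			return
		}
	}
}

// consume counts samples until the channel closes or ctx is cancelled.
func consume(ctx context.Context, c <-chan []byte) int {
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case _, ok := <-c:
			if !ok {
				return n
			}
			n++ // the real program would unmarshal and process the sample here
		}
	}
}

// runDemo wires a producer with three samples to a consumer.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := make(chan []byte)
	go produce(ctx, [][]byte{{1}, {2}, {3}}, c)
	return consume(ctx, c)
}

func main() {
	fmt.Println(runDemo()) // 3
}
```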

Here is the link to the completed version.

9 - Cleaning Up

We need a mechanism to clean up and remove the:

  • qdisc
  • netlink handle
  • The eBPF program and maps

when the program gets interrupted (SIGINT) or terminated (SIGTERM).

To that end, we create a method called Close to first delete the qdisc, then the netlink handle and finally the eBPF program and maps.

func (p *probe) Close() error {
    log.Println("Removing qdisc")
    if err := p.handle.QdiscDel(p.qdisc); err != nil {
        log.Println("Failed deleting qdisc")
        return err
    }

    log.Println("Deleting handle")
    p.handle.Delete()

    log.Println("Closing eBPF object")
    if err := p.bpfObjects.Close(); err != nil {
        log.Println("Failed closing eBPF object")
        return err
    }

    return nil
}
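One thing to be aware of with early returns in a cleanup routine: if the first step fails, the later ones never run. A possible variation, sketched below under the assumption of Go 1.20 or newer for errors.Join, is to run every step and report all failures together. closeAll and demo are hypothetical helpers for this example and do not touch netlink or eBPF.

```go
package main

import (
	"errors"
	"fmt"
)

// closeAll runs every cleanup step, even if an earlier one fails, and
// returns all failures joined together (nil if none failed).
func closeAll(steps ...func() error) error {
	var errs []error
	for _, step := range steps {
		if err := step(); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// demo runs three steps where the first one fails and reports how many ran.
func demo() (ran int, err error) {
	step := func(fail bool) func() error {
		return func() error {
			ran++
			if fail {
				return errors.New("step failed")
			}
			return nil
		}
	}
	err = closeAll(step(true), step(false), step(false))
	return ran, err
}

func main() {
	ran, err := demo()
	fmt.Println(ran, err) // 3 step failed
}
```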

Running The Program

Let’s take our program for a spin.

From the root of the project directory, compile the C code and generate the helper functions by running:

go generate ./...

Compile the Go program:

go build -ldflags "-s -w" -o flat cmd/flat.go

Run it with elevated privileges:

# Replace eth0 with your desired interface name
sudo ./flat -i eth0

You should get something similar to this output:

[Image: flat output]

If you are making changes to the code and want to quickly see the results without compiling the code every single time, just run:

# Replace eth0 with your desired interface name
go generate ./... && sudo go run cmd/flat.go -i eth0

What’s Next?

Everything appears to be perfect, and it is. However, depending on the type of the eBPF program, if we copied the compiled executable to another machine and tried to run it, we would most likely either get nonsense in the output or the program would not run at all.

The reason for that is:

  • Different kernel versions come with new or modified structs (newly added, removed or renamed fields) resulting in different memory layouts
  • There are various CPU instruction set architectures. Although this is also a problem in non-eBPF programs

The __sk_buff struct that we utilized in this series is not subject to this problem: it is a stable, user-facing view of the kernel’s socket buffer (sk_buff) that is part of the BPF UAPI. It is specifically designed to work with eBPF and is stable across different kernel versions, so we are good.

Nonetheless, in the next post I will demonstrate how to write portable eBPF programs using CO-RE.

Stay tuned and thanks for reading.

References