Building an Efficient Network Flow Monitoring Tool with eBPF - Part 3
Introduction
In this post, we will write the user space part of our eBPF program, flat, which calculates and displays the network latency using the data gathered by the kernel space program.
Make sure to check out the previous posts to be able to follow the details of this article.
- eBPF primer
- Setup an eBPF Development Environment
- Building an Efficient Network Flow Monitoring Tool with eBPF - Part 1
- Network Headers
- Building an Efficient Network Flow Monitoring Tool with eBPF - Part 2
On October 12th 2023, I made a small modification to the previous post, migrating the eBPF map type from perfbuf to ringbuf in order to reduce memory overhead and improve performance. If you read it before the update, make sure to have a look at that part.
The Big Picture
The user space program (and this post) will be noticeably larger than the kernel space one, since we need to:
- Compile and embed the kernel space code into our user space program to load it into the kernel
- Similar to the kernel space code, get the system time since boot to compare with the timestamp (ts) values in the packet_t struct of the kernel space program, in order to prune stale entries from the connection or flow table
- Define a new qdisc of type clsact to sit in the path of incoming and outgoing network traffic
- Define a thread-safe connection or flow table to store the flow hash as key and timestamp as value
- Packet operations: unmarshal, hash and display the flow latency by comparing ingress and egress traffic, using the data coming from the kernel space program (the packet_t struct) through the ringbuf map
- Provide an option for users to select the network interface to attach the program (probe) to
- Lock memory for our process
- Create and attach an eBPF probe or hook to the specified interface
- Remove the qdisc and its filters when the program is stopped or terminated
1 - Compile and Embed eBPF code Using Go
I have already covered how to compile and embed the eBPF code into our Go program in the second bullet-point here and in order to avoid repetition, we will not go through those details again. However, the code used is available and explained in this post.
2 - Get time since system boot
In our kernel space code, we used the bpf_ktime_get_ns helper function to timestamp the packets as we received them. We need a similar clock in our user space program in order to prune old or stale flows from the flow table. Since BPF helper functions are not available in user space, we have to implement this ourselves.
Luckily, this is pretty straightforward. Inside the internal/timer/timer.go file, we use the ClockGettime function, passing it CLOCK_MONOTONIC. This clock cannot be set and represents monotonic time since some unspecified starting point, which on Linux is system boot time. It is also the clock that bpf_ktime_get_ns reads, so the two timestamps are directly comparable.
A monotonic clock is a time source that is not affected by jumps in the system (wall-clock) time, for instance when an administrator or NTP steps the system clock.
Here is the code:
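package timer

import (
	"log"
	"time"

	"golang.org/x/sys/unix"
)

// GetNanosecSinceBoot returns the nanoseconds elapsed since system boot,
// read from CLOCK_MONOTONIC (the same clock bpf_ktime_get_ns uses).
func GetNanosecSinceBoot() uint64 {
	var ts unix.Timespec
	err := unix.ClockGettime(unix.CLOCK_MONOTONIC, &ts)
	if err != nil {
		log.Println("Could not get MONOTONIC Clock time ", err)
		return 0
	}
	// Widen both fields to int64 so this also compiles on 32-bit
	// platforms, where unix.Timespec uses 32-bit fields.
	return uint64(int64(ts.Sec)*int64(time.Second) + int64(ts.Nsec))
}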
The total number of nanoseconds since boot is calculated by multiplying the seconds (ts.Sec) by the number of nanoseconds in a second (time.Second) and adding the remaining nanoseconds (ts.Nsec). That was a mouthful, hopefully not too confusing 😅.
Initially, I implemented this functionality using CGO. If you are curious to see how it looked, check it out here.
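To make the arithmetic concrete, here is a minimal, stdlib-only sketch. nanosFromTimespec and ageMillis are hypothetical helpers and are not part of flat. ageMillis shows the kind of age calculation the flow table performs later. It also guards against unsigned underflow when a timestamp is newer than the sampled "now".

```go
package main

import (
	"fmt"
	"time"
)

// nanosFromTimespec combines the seconds and nanoseconds halves of a
// timespec (as filled in by clock_gettime) into one nanosecond count.
func nanosFromTimespec(sec, nsec int64) uint64 {
	return uint64(sec*int64(time.Second) + nsec)
}

// ageMillis returns how many whole milliseconds separate ts from now.
// It returns 0 instead of wrapping around when ts is newer than now.
func ageMillis(now, ts uint64) uint64 {
	if ts > now {
		return 0
	}
	return (now - ts) / uint64(time.Millisecond)
}

func main() {
	now := nanosFromTimespec(120, 500_000_000) // 120.5 s since boot
	ts := nanosFromTimespec(110, 0)            // 110 s since boot
	fmt.Println(now, ageMillis(now, ts))       // 120500000000 10500
}
```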
3 - Defining The CLSACT qdisc
The clsact is a special qdisc, or queueing discipline, that enables us to handle both ingress and egress traffic on an interface. It is particularly helpful when attaching eBPF programs in direct-action mode.
In direct-action mode, the eBPF program's return value decides what action to take (for example, whether to pass or drop a packet). The program does not need to consult the underlying qdiscs or separate tc actions. This ability to bypass the traditional traffic control model makes the direct-action mode very powerful and flexible.
At the time of this writing, the clsact qdisc is not part of the netlink package, so we need to write this functionality ourselves inside the clsact/clsact.go file.
We do so by defining a struct called ClsAct with a single field called attrs, which is a pointer to a netlink.QdiscAttrs that holds the qdisc's attributes.
Next, we define a function named NewClsAct that takes the qdisc attributes, constructs the ClsAct struct and returns a pointer to it.
package clsact

import "github.com/vishvananda/netlink"

type ClsAct struct {
	attrs *netlink.QdiscAttrs
}

func NewClsAct(attrs *netlink.QdiscAttrs) *ClsAct {
	return &ClsAct{attrs: attrs}
}
Lastly, we define two additional methods, Attrs and Type, so that ClsAct satisfies the netlink.Qdisc interface.
func (qdisc *ClsAct) Attrs() *netlink.QdiscAttrs {
	return qdisc.attrs
}

func (qdisc *ClsAct) Type() string {
	return "clsact"
}
It is also worth mentioning that the clsact qdisc does not perform any actual queuing and is mainly used for classifying traffic.
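A common Go idiom is to let the compiler verify that a type satisfies an interface. The line `var _ netlink.Qdisc = (*ClsAct)(nil)` in clsact.go would make the build fail if Attrs or Type ever drift from the interface. The sketch below shows the idea without depending on the netlink module. QdiscAttrs and Qdisc here are simplified, hypothetical stand-ins for netlink.QdiscAttrs and netlink.Qdisc.

```go
package main

import "fmt"

// QdiscAttrs and Qdisc are simplified stand-ins for netlink.QdiscAttrs
// and the netlink.Qdisc interface, so this sketch has no dependencies.
type QdiscAttrs struct {
	LinkIndex int
	Handle    uint32
	Parent    uint32
}

type Qdisc interface {
	Attrs() *QdiscAttrs
	Type() string
}

type ClsAct struct {
	attrs *QdiscAttrs
}

func NewClsAct(attrs *QdiscAttrs) *ClsAct { return &ClsAct{attrs: attrs} }

func (q *ClsAct) Attrs() *QdiscAttrs { return q.attrs }
func (q *ClsAct) Type() string       { return "clsact" }

// Compile-time check: the build fails if *ClsAct stops satisfying Qdisc.
var _ Qdisc = (*ClsAct)(nil)

func main() {
	var q Qdisc = NewClsAct(&QdiscAttrs{LinkIndex: 2})
	fmt.Println(q.Type(), q.Attrs().LinkIndex) // clsact 2
}
```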
4 - Creating a Flow Table
Alright, we need a connection/flow table to hold all of our TCP and UDP flows, with the packet hash as the key and the timestamp as the value. For each packet, we check the table for a match to determine the network latency, or add a new entry if no match is found.
This flow table has to have certain characteristics:
- It should handle requests from different goroutines or threads without race conditions
- It should have a pruning mechanism that periodically (every 10 seconds seems reasonable) removes stale entries from the table. This prevents the table from growing unnecessarily large and wasting resources in scenarios like DDoS attacks.
Inside the internal/flowtable/flowtable.go file, we start off by defining the package name, importing the necessary packages and then creating the FlowTable struct.
package flowtable

import (
	"log"
	"sync"
	"time"

	"github.com/pouriyajamshidi/flat/internal/timer"
)

type FlowTable struct {
	Ticker *time.Ticker
	sync.Map
}
This struct has two fields:
- A ticker that enables us to perform a task (pruning, in this case) repeatedly at regular intervals
- An embedded sync.Map, a concurrent map implementation that allows many goroutines to safely access and modify its contents. sync.Map has very specific (and debated) use cases. I think we are better off using it than an RWMutex or Mutex, because we write unique, ephemeral keys and read them only a few times. In other words, different goroutines do not write or update the same keys, which is one of the access patterns the sync.Map documentation says it is optimized for.
Let’s define the function and methods required to operate on the FlowTable.
NewFlowTable creates a flow table, sets the ticker interval to 10 seconds and returns a pointer to the struct.
func NewFlowTable() *FlowTable {
	return &FlowTable{Ticker: time.NewTicker(time.Second * 10)}
}
Insert takes the flow hash and a timestamp, then stores them in the flow table as key and value respectively.
func (table *FlowTable) Insert(hash, timestamp uint64) {
	table.Store(hash, timestamp)
}
Get looks up a flow hash in the flow table and, if it is found, returns its value.
func (table *FlowTable) Get(hash uint64) (uint64, bool) {
	value, ok := table.Load(hash)
	if !ok {
		return 0, ok
	}
	return value.(uint64), ok
}
Remove deletes a flow hash and its timestamp from the flow table.
func (table *FlowTable) Remove(hash uint64) {
	// LoadAndDelete checks for and removes the key in a single atomic step.
	if _, found := table.LoadAndDelete(hash); !found {
		log.Printf("hash %v is not in flow table", hash)
	}
}
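Prune is invoked every ten seconds. It gets the system monotonic time using GetNanosecSinceBoot, walks over all entries in the flow table and removes every entry older than 10 seconds. The callback passed to Range must return true for every entry. Returning false stops the iteration, and any remaining stale entries would be skipped.
func (table *FlowTable) Prune() {
	now := timer.GetNanosecSinceBoot()
	table.Range(func(hash, timestamp interface{}) bool {
		ts := timestamp.(uint64)
		// Skip entries stamped after "now" to avoid unsigned underflow.
		if ts <= now && (now-ts)/1000000 > 10000 {
			log.Printf("Pruning stale entry from flow table: %v", hash)
			table.Delete(hash)
		}
		// Always return true so Range keeps visiting the remaining entries.
		return true
	})
}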
Lastly, we have an optional method called Entries that returns the current number of entries in the flow table. It is a nice-to-have and does not change how the program runs, hence optional.
func (table *FlowTable) Entries() int {
	count := 0
	table.Range(func(_, _ interface{}) bool {
		count++
		return true
	})
	return count
}
5 - Packet Operations
This is the heart of our program. Here, we unmarshal the flow data coming from the kernel space program through the ringbuf map, hash the flows, and calculate and display the flow latencies.
Let’s kick off by defining the Packet struct inside the internal/packet/packet.go file, mirroring the packet_t struct in our kernel space code.
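package packet
import (
	"encoding/binary"
	"hash/fnv"
	"log"
	"net/netip"
	"github.com/gookit/color"
	"github.com/pouriyajamshidi/flat/internal/flowtable"
)
type Packet struct {
	SrcIP     netip.Addr
	DstIP     netip.Addr
	SrcPort   uint16
	DstPort   uint16
	Protocol  uint8
	TTL       uint8
	Syn       bool
	Ack       bool
	TimeStamp uint64
}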
A Short Primer on Decoding C Structs
Before writing the unmarshalling logic in Go, it is important to understand the size and offset of each field inside the kernel space packet_t struct.
One simple approach is to wrap our struct in a tiny C program and use the sizeof operator and the offsetof macro to obtain the data we need.
#include <stdio.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <netinet/in.h>
#include <linux/types.h>

struct packet_t {
    struct in6_addr src_ip;
    struct in6_addr dst_ip;
    __be16 src_port;
    __be16 dst_port;
    __u8 protocol;
    __u8 ttl;
    bool syn;
    bool ack;
    uint64_t ts;
};

int main() {
    /* %zu is the correct printf conversion for size_t values */
    printf("src_ip: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->src_ip), offsetof(struct packet_t, src_ip));
    printf("dst_ip: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->dst_ip), offsetof(struct packet_t, dst_ip));
    printf("src_port: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->src_port), offsetof(struct packet_t, src_port));
    printf("dst_port: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->dst_port), offsetof(struct packet_t, dst_port));
    printf("protocol: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->protocol), offsetof(struct packet_t, protocol));
    printf("ttl: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->ttl), offsetof(struct packet_t, ttl));
    printf("syn: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->syn), offsetof(struct packet_t, syn));
    printf("ack: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->ack), offsetof(struct packet_t, ack));
    printf("ts: size = %zu, offset = %zu\n", sizeof(((struct packet_t *)0)->ts), offsetof(struct packet_t, ts));
    return 0;
}
This will output:
src_ip: size = 16, offset = 0
dst_ip: size = 16, offset = 16
src_port: size = 2, offset = 32
dst_port: size = 2, offset = 34
protocol: size = 1, offset = 36
ttl: size = 1, offset = 37
syn: size = 1, offset = 38
ack: size = 1, offset = 39
ts: size = 8, offset = 40
So, using this information, we know that if we wish to extract the ts
or timestamp field, we must start at offset 40 and advance 8 bytes forward.
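The same layout can be cross-checked from the Go side. This is a minimal sketch that assumes a hypothetical Go mirror of packet_t, with the IP addresses kept as raw 16-byte arrays like struct in6_addr. It uses unsafe.Offsetof and unsafe.Sizeof to confirm that the timestamp starts at offset 40 and that one sample is 48 bytes.

```go
package main

import (
	"fmt"
	"unsafe"
)

// packetT is a hypothetical Go mirror of the kernel's packet_t. Raw
// [16]byte arrays stand in for struct in6_addr so that Go lays the
// fields out the same way the C compiler does.
type packetT struct {
	SrcIP    [16]byte
	DstIP    [16]byte
	SrcPort  uint16
	DstPort  uint16
	Protocol uint8
	TTL      uint8
	Syn      bool
	Ack      bool
	Ts       uint64
}

// tsOffsetAndSize reports where Ts starts and how large the struct is.
func tsOffsetAndSize() (uintptr, uintptr) {
	var p packetT
	return unsafe.Offsetof(p.Ts), unsafe.Sizeof(p)
}

func main() {
	var p packetT
	fmt.Println("src_port:", unsafe.Offsetof(p.SrcPort))
	fmt.Println("protocol:", unsafe.Offsetof(p.Protocol))
	fmt.Println("ack:", unsafe.Offsetof(p.Ack))
	off, size := tsOffsetAndSize()
	fmt.Println("ts:", off, "size:", size) // ts: 40 size: 48
}
```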
Unmarshalling C structs In Go
Continuing inside the packet.go file, let’s define the unmarshalling logic, which takes a byte slice and extracts the values into the Packet struct.
The source and destination IP addresses occupy the first 32 bytes of the received sample. We take them 16 bytes at a time and feed each chunk to the netip.AddrFromSlice function to get our IP addresses.
The source and destination ports occupy bytes 32 to 33 and 34 to 35 respectively. The binary.BigEndian.Uint16 function from the encoding/binary package takes a byte slice, treats it as a big endian encoded value and returns its uint16 equivalent.
// packetSize is sizeof(struct packet_t), as printed by the C program above.
const packetSize = 48

func UnmarshalBinary(in []byte) (Packet, bool) {
	// Reject short samples instead of panicking on an out-of-range slice.
	if len(in) < packetSize {
		return Packet{}, false
	}
	srcIP, ok := netip.AddrFromSlice(in[0:16])
	if !ok {
		return Packet{}, ok
	}
	dstIP, ok := netip.AddrFromSlice(in[16:32])
	if !ok {
		return Packet{}, ok
	}
	return Packet{
		SrcIP:     srcIP,
		DstIP:     dstIP,
		SrcPort:   binary.BigEndian.Uint16(in[32:34]),
		DstPort:   binary.BigEndian.Uint16(in[34:36]),
		Protocol:  in[36],
		TTL:       in[37],
		Syn:       in[38] == 1,
		Ack:       in[39] == 1,
		TimeStamp: binary.LittleEndian.Uint64(in[40:48]),
	}, true
}
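The rest of the fields are pretty much self-explanatory. Note, however, that the TimeStamp field is decoded as a little endian value. The kernel program copies the ports from the packet headers in network byte order (big endian). The timestamp, by contrast, is written in the host's native byte order, which is little endian on most machines (for example, x86-64). On a big endian host, this field would need binary.BigEndian (or binary.NativeEndian, available since Go 1.21) instead.
If a value (like Protocol, TTL, etc.) is just one byte, its big and little endian representations are the same.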
Hashing Packets
Calculating a unique hash value for each network flow makes it easy to match related ingress and egress packets. This is done by calculating a five-tuple hash, named after the five properties that define a network flow:
- Source IP
- Source port
- Destination IP
- Destination port
- Protocol
To calculate this hash, the Hash method first builds three byte slices from these five properties: source IP and port, destination IP and port, and the protocol. The IP addresses are taken directly from the Packet struct. The ports and the protocol are converted to bytes using the binary.BigEndian.PutUint16 function.
func (pkt *Packet) Hash() uint64 {
	tmp := make([]byte, 2)
	var src []byte
	var dst []byte
	var proto []byte
	binary.BigEndian.PutUint16(tmp, pkt.SrcPort)
	src = append(pkt.SrcIP.AsSlice(), tmp...)
	binary.BigEndian.PutUint16(tmp, pkt.DstPort)
	dst = append(pkt.DstIP.AsSlice(), tmp...)
	binary.BigEndian.PutUint16(tmp, uint16(pkt.Protocol))
	proto = append(proto, tmp...)
	return hash(src) + hash(dst) + hash(proto)
}
These byte slices are then passed to the hash function, which calculates a 64-bit hash value for each byte slice using the efficient FNV-1a algorithm.
The FNV-1a algorithm is a non-cryptographic hash function that provides a good balance between speed and hash distribution, making it a popular choice for hashing in many applications.
func hash(value []byte) uint64 {
	hash := fnv.New64a()
	hash.Write(value)
	return hash.Sum64()
}
The hash function creates a new FNV-1a hash object, writes the byte slice to it, and then calls the Sum64 method to obtain the 64-bit hash value. This is repeated for each of the three byte slices (source IP and port, destination IP and port, and protocol), and the three results are added together to produce the final hash. Addition is commutative, so swapping the source and destination endpoints yields the same hash. This is what happens between a request and its reply, and it is what lets a reply find its request in the flow table.
Calculating Flow Latency
Let’s create a function named CalcLatency that accepts two arguments, a Packet and a pointer to the flow table, to:
- Map the protocol number to its name, for better readability
- Hash the packet as described in the previous section
- Search the flow table for the hash. If it is not found, add it to the table (for TCP SYN and UDP packets) and return
- Otherwise, calculate the latency by subtracting the related flow timestamps and dividing the result by 1_000_000 to get the value in milliseconds
- Print the flow information in a pleasant way, with TCP SYN/ACK in cyan and UDP responses in light yellow for better distinction, using the gookit color package
- Remove the hash from the flow table, since it is of no use anymore
var ipProtoNums = map[uint8]string{
	6:  "TCP",
	17: "UDP",
}

func CalcLatency(pkt Packet, table *flowtable.FlowTable) {
	proto, ok := ipProtoNums[pkt.Protocol]
	if !ok {
		log.Print("Failed fetching protocol number: ", pkt.Protocol)
		return
	}
	pktHash := pkt.Hash()
	ts, ok := table.Get(pktHash)
	if !ok && pkt.Syn {
		table.Insert(pktHash, pkt.TimeStamp)
		return
	} else if !ok && proto == "UDP" {
		table.Insert(pktHash, pkt.TimeStamp)
		return
	} else if !ok {
		return
	}
	if pkt.Ack {
		colorCyan("(%v) | src: %v:%-7v\tdst: %v:%-9v\tTTL: %-4v\tlatency: %.3f ms\n",
			proto,
			pkt.DstIP.Unmap().String(),
			pkt.DstPort,
			pkt.SrcIP.Unmap().String(),
			pkt.SrcPort,
			pkt.TTL,
			(float64(pkt.TimeStamp)-float64(ts))/1_000_000,
		)
		table.Remove(pktHash)
	} else if proto == "UDP" {
		colorLightYellow("(%v) | src: %v:%-7v\tdst: %v:%-9v\tTTL: %-4v\tlatency: %.3f ms\n",
			proto,
			pkt.DstIP.Unmap().String(),
			pkt.DstPort,
			pkt.SrcIP.Unmap().String(),
			pkt.SrcPort,
			pkt.TTL,
			(float64(pkt.TimeStamp)-float64(ts))/1_000_000,
		)
		table.Remove(pktHash)
	}
}

var (
	colorLightYellow = color.LightYellow.Printf
	colorCyan        = color.Cyan.Printf
)
The numbers before the print verbs, for instance -7, set the minimum field width. The minus sign pads the value with spaces on the right (left-justifying it). Omitting the - pads on the left instead.
Here is the link to the completed version.
6 - Selecting The Network Interface
Let’s start off in the cmd/flat.go file by defining a flag to get the name of the interface to attach to when starting the program, and by ensuring the interface exists using the netlink package.
func main() {
	ifaceFlag := flag.String("i", "eth0", "interface to attach the probe to")
	flag.Parse()
	iface, err := netlink.LinkByName(*ifaceFlag)
	if err != nil {
		log.Printf("Could not find interface %v: %v", *ifaceFlag, err)
		displayInterfaces()
	}
}
In case the interface does not exist, and for convenience, we’ll show the available network interfaces and exit the program.
func displayInterfaces() {
	interfaces, err := net.Interfaces()
	if err != nil {
		// log.Fatalf exits the program, so no return is needed afterwards.
		log.Fatalf("Failed fetching network interfaces: %v", err)
	}
	for i, iface := range interfaces {
		fmt.Printf("%d) %s\n", i, iface.Name)
	}
	os.Exit(1)
}
Next, we will create a context and a signal handler so that we can remove the qdisc and its filters when the program is interrupted (SIGINT) or terminated (SIGTERM).
func main() {
	// Previous code
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	signalHandler(cancel)
}

func signalHandler(cancel context.CancelFunc) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		log.Println("\nExiting")
		cancel()
	}()
}
After that, we invoke the Run function of our probe package, which we implement in the following sections, passing it the context and the network interface.
func main() {
	// Previous code
	if err := probe.Run(ctx, iface); err != nil {
		log.Fatalf("Failed running the probe: %v", err)
	}
}
Here is the link to the complete version.
7 - Lock Memory For Resources
In most examples, the process is allowed to lock as much memory as it needs, which prevents its pages from being swapped out. However, you might have a use case that demands specific limits, and it does not hurt to see how that is done.
Inside the internal/probe/probe.go file, we use setrlimit to set a soft (Cur) limit of 20 megabytes and a hard (Max) limit of 40 megabytes on the amount of memory our process can lock:
const tenMegaBytes = 1024 * 1024 * 10
const twentyMegaBytes = tenMegaBytes * 2
const fortyMegaBytes = twentyMegaBytes * 2

func setRlimit() error {
	log.Println("Setting rlimit")
	return unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{
		Cur: twentyMegaBytes,
		Max: fortyMegaBytes,
	})
}
Learn more about resource limits here.
If we do not raise the locked memory limit (to either a specific amount or unlimited), running the finished program will fail with errors like the ones below:
TheGrayNode.io~$ sudo ./flat -i eth0
Failed loading probe objects: field Flat: program flat: map pipe: map create: operation not permitted (MEMLOCK may be too low, consider rlimit.RemoveMemlock)
Failed running the probe: field Flat: program flat: map pipe: map create: operation not permitted (MEMLOCK may be too low, consider rlimit.RemoveMemlock)
So, by leveraging setrlimit, we ensure our process can lock enough memory to run. Since Linux 5.11, memory used by eBPF maps is charged to the process's memory cgroup rather than to RLIMIT_MEMLOCK, so this limit mostly matters on older kernels.
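To confirm which limits are actually in effect, you can read them back from /proc/self/limits. Here is a minimal, stdlib-only sketch (Linux only). memlockLimits is a hypothetical helper that returns the soft and hard "Max locked memory" values, either in bytes or as "unlimited". After setRlimit runs, they should read 20971520 and 41943040.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// memlockLimits returns the soft and hard "Max locked memory" limits of
// the current process as listed in /proc/self/limits.
func memlockLimits() (soft, hard string, err error) {
	f, err := os.Open("/proc/self/limits")
	if err != nil {
		return "", "", err
	}
	defer f.Close()

	const label = "Max locked memory"
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, label) {
			continue
		}
		fields := strings.Fields(strings.TrimPrefix(line, label))
		if len(fields) < 2 {
			return "", "", fmt.Errorf("unexpected limits line: %q", line)
		}
		return fields[0], fields[1], nil
	}
	if err := scanner.Err(); err != nil {
		return "", "", err
	}
	return "", "", fmt.Errorf("%q not found", label)
}

func main() {
	soft, hard, err := memlockLimits()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("memlock soft:", soft, "hard:", hard)
}
```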
8 - Creating And Attaching An eBPF Hook To An Interface
At the beginning of the internal/probe/probe.go file, we define the package name as probe and import the necessary packages. We then add the go:generate directive, which tells go generate where to find the eBPF code and how to compile it with bpf2go, as described in a previous post.
package probe

import (
	"context"
	"log"

	"github.com/cilium/ebpf/ringbuf"
	"github.com/pouriyajamshidi/flat/clsact"
	"github.com/pouriyajamshidi/flat/internal/flowtable"
	"github.com/pouriyajamshidi/flat/internal/packet"
	"github.com/vishvananda/netlink"
	"golang.org/x/sys/unix"
)

// Everything after "--" is passed to the C compiler as flags.
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go probe ../../bpf/flat.c -- -O2 -Wall -Werror -Wno-address-of-packed-member
The probe argument after the bpf2go command becomes the prefix of the identifiers and files that are generated automatically by the go generate ./... command, for instance probeObjects, loadProbeObjects and probe_bpfel.go. More on that later.
Defining And Attaching The Probe
We define a struct called probe to hold:
- iface: the network interface specified by the user
- handle: a pointer to a netlink handle of type NETLINK_ROUTE, used to add and remove qdiscs and their filters (as well as routes, IP addresses, etc.) in a specific network namespace
- qdisc: a pointer to the clsact qdisc
- bpfObjects: a pointer to a struct containing all eBPF objects (programs and maps) after they are loaded into the kernel
- filters: a slice of pointers to the BPF filters for ingress and egress traffic, both IPv4 and IPv6
Here is how it looks:
type probe struct {
	iface      netlink.Link
	handle     *netlink.Handle
	qdisc      *clsact.ClsAct
	bpfObjects *probeObjects
	filters    []*netlink.BpfFilter
}
Then, we write a function to piece things together, starting by creating the NETLINK_ROUTE handle:
func newProbe(iface netlink.Link) (*probe, error) {
	log.Println("Creating a new probe")
	handle, err := netlink.NewHandle(unix.NETLINK_ROUTE)
	if err != nil {
		log.Printf("Failed getting netlink handle: %v", err)
		return nil, err
	}
	// rest of the code
}
Check out all the available netlink families here.
Instantiate the probe struct, passing it the network interface and the netlink handle:
func newProbe(iface netlink.Link) (*probe, error) {
	// previous code
	prbe := probe{
		iface:  iface,
		handle: handle,
	}
	// rest of the code
}
Finally, we load our eBPF program into the kernel using loadObjects, create the qdisc using createQdisc, and create the classifiers using the createFilters method.
func newProbe(iface netlink.Link) (*probe, error) {
	// previous code
	if err := prbe.loadObjects(); err != nil {
		log.Printf("Failed loading probe objects: %v", err)
		return nil, err
	}
	if err := prbe.createQdisc(); err != nil {
		log.Printf("Failed creating qdisc: %v", err)
		return nil, err
	}
	if err := prbe.createFilters(); err != nil {
		log.Printf("Failed creating qdisc filters: %v", err)
		return nil, err
	}
	return &prbe, nil
}
Let’s expand on these methods.
The loadObjects method
loadObjects instantiates an empty probeObjects struct to hold the eBPF program and maps. It passes this struct to loadProbeObjects to load the program into the kernel, then stores the loaded objects in the bpfObjects field of the probe struct.
func (p *probe) loadObjects() error {
	log.Printf("Loading probe object to kernel")
	objs := probeObjects{}
	if err := loadProbeObjects(&objs, nil); err != nil {
		return err
	}
	p.bpfObjects = &objs
	return nil
}
On little endian systems (most machines), the probeObjects struct and the loadProbeObjects function reside inside the probe_bpfel.go file, which is generated automatically when running go generate ./...
The createQdisc method
createQdisc creates the clsact qdisc that we defined earlier, passing it:
- The network interface index
- A qdisc handle built with netlink.MakeHandle from custom major and minor numbers (0xffff and 0) to uniquely identify our qdisc
- clsact (netlink.HANDLE_CLSACT) as the parent
Then, using the netlink handle in our probe struct, we add the qdisc. If that fails, perhaps because a previous run did not clean up, we try to replace the qdisc before giving up.
func (p *probe) createQdisc() error {
	log.Printf("Creating qdisc")
	p.qdisc = clsact.NewClsAct(&netlink.QdiscAttrs{
		LinkIndex: p.iface.Attrs().Index,
		Handle:    netlink.MakeHandle(0xffff, 0),
		Parent:    netlink.HANDLE_CLSACT,
	})
	if err := p.handle.QdiscAdd(p.qdisc); err != nil {
		if err := p.handle.QdiscReplace(p.qdisc); err != nil {
			return err
		}
	}
	return nil
}
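A tc handle is a 32-bit value split into a 16-bit major and a 16-bit minor number, usually written as major:minor in hex. The sketch below reproduces that arithmetic so you can see what netlink.MakeHandle(0xffff, 0) produces. makeHandle and handleString are hypothetical re-implementations for illustration. The 0xFFFFFFF1 value used in the usage note is, as I understand it, the value of netlink.HANDLE_CLSACT. It is worth double-checking against your netlink version.

```go
package main

import "fmt"

// makeHandle packs the major number into the upper 16 bits and the
// minor number into the lower 16 bits, like netlink.MakeHandle.
func makeHandle(major, minor uint16) uint32 {
	return uint32(major)<<16 | uint32(minor)
}

// handleString renders a handle in hex major:minor notation.
func handleString(h uint32) string {
	return fmt.Sprintf("%x:%x", h>>16, h&0xffff)
}

func main() {
	h := makeHandle(0xffff, 0)
	fmt.Printf("%#x %s\n", h, handleString(h)) // 0xffff0000 ffff:0
}
```

With this notation, the clsact parent 0xFFFFFFF1 reads as ffff:fff1, which is also how `tc qdisc show dev eth0` displays the parent of the clsact qdisc.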
The createFilters method
createFilters, as its name suggests, is where we create our BPF filters, or classifiers. It contains a nested function called addFilter that:
- Takes the netlink filter attributes
- Builds a BPF filter from those attributes and our program's file descriptor
- Sets DirectAction to true, enabling the classifier to decide the packet's fate (pass or drop)
- Appends the filter to the filters slice of our probe struct
Next, we call addFilter four times to cover ingress and egress traffic for both IPv4 and IPv6. This is followed by a loop that adds each filter or, if that fails, replaces it using our netlink handle, similar to how we created the clsact qdisc.
func (p *probe) createFilters() error {
	log.Printf("Creating qdisc filters")
	addFilter := func(attrs netlink.FilterAttrs) {
		p.filters = append(p.filters, &netlink.BpfFilter{
			FilterAttrs:  attrs,
			Fd:           p.bpfObjects.probePrograms.Flat.FD(),
			DirectAction: true,
		})
	}
	addFilter(netlink.FilterAttrs{
		LinkIndex: p.iface.Attrs().Index,
		Handle:    netlink.MakeHandle(0xffff, 0),
		Parent:    netlink.HANDLE_MIN_INGRESS,
		Protocol:  unix.ETH_P_IP,
	})
	addFilter(netlink.FilterAttrs{
		LinkIndex: p.iface.Attrs().Index,
		Handle:    netlink.MakeHandle(0xffff, 0),
		Parent:    netlink.HANDLE_MIN_EGRESS,
		Protocol:  unix.ETH_P_IP,
	})
	addFilter(netlink.FilterAttrs{
		LinkIndex: p.iface.Attrs().Index,
		Handle:    netlink.MakeHandle(0xffff, 0),
		Parent:    netlink.HANDLE_MIN_INGRESS,
		Protocol:  unix.ETH_P_IPV6,
	})
	addFilter(netlink.FilterAttrs{
		LinkIndex: p.iface.Attrs().Index,
		Handle:    netlink.MakeHandle(0xffff, 0),
		Parent:    netlink.HANDLE_MIN_EGRESS,
		Protocol:  unix.ETH_P_IPV6,
	})
	for _, filter := range p.filters {
		if err := p.handle.FilterAdd(filter); err != nil {
			if err := p.handle.FilterReplace(filter); err != nil {
				return err
			}
		}
	}
	return nil
}
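The four addFilter calls differ only in their hook and protocol, so a table-driven loop is one possible way to generate them. This is a stdlib-only sketch of that idea, not flat's code. filterSpec is a hypothetical stand-in for the Parent and Protocol fields of netlink.FilterAttrs. The constants repeat the values that, to my knowledge, unix.ETH_P_IP, unix.ETH_P_IPV6, netlink.HANDLE_MIN_INGRESS and netlink.HANDLE_MIN_EGRESS hold. In real code, you would use the named constants directly.

```go
package main

import "fmt"

// Assumed values of the library constants, copied so the sketch has
// no external dependencies.
const (
	ethPIP           = 0x0800     // unix.ETH_P_IP
	ethPIPv6         = 0x86DD     // unix.ETH_P_IPV6
	handleMinIngress = 0xFFFFFFF2 // netlink.HANDLE_MIN_INGRESS
	handleMinEgress  = 0xFFFFFFF3 // netlink.HANDLE_MIN_EGRESS
)

// filterSpec holds the part of netlink.FilterAttrs that varies.
type filterSpec struct {
	Parent   uint32
	Protocol uint16
}

// filterSpecs enumerates every hook/protocol combination.
func filterSpecs() []filterSpec {
	var specs []filterSpec
	for _, parent := range []uint32{handleMinIngress, handleMinEgress} {
		for _, proto := range []uint16{ethPIP, ethPIPv6} {
			specs = append(specs, filterSpec{Parent: parent, Protocol: proto})
		}
	}
	return specs
}

func main() {
	for _, s := range filterSpecs() {
		fmt.Printf("parent %#x protocol %#x\n", s.Parent, s.Protocol)
	}
}
```

Inside createFilters, each spec would feed one addFilter call, with LinkIndex and Handle filled in exactly as above.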
Processing The Data
Ok, time to write our Run function. We start by capping the amount of memory our process can lock into RAM, as demonstrated in the previous section.
func Run(ctx context.Context, iface netlink.Link) error {
	log.Println("Starting up the probe")
	if err := setRlimit(); err != nil {
		log.Printf("Failed setting rlimit: %v", err)
		return err
	}
	// rest of the code
}
Then, we instantiate a new flow table and prune stale entries every 10 seconds using an anonymous goroutine.
func Run(ctx context.Context, iface netlink.Link) error {
	// previous code
	flowtable := flowtable.NewFlowTable()
	go func() {
		for range flowtable.Ticker.C {
			flowtable.Prune()
		}
	}()
	// rest of the code
}
After that, we create the probe using newProbe and get hold of our eBPF ringbuf map named pipe. We then create a ring buffer reader in user space, passing it the map object.
func Run(ctx context.Context, iface netlink.Link) error {
	// previous code
	probe, err := newProbe(iface)
	if err != nil {
		return err
	}
	pipe := probe.bpfObjects.probeMaps.Pipe
	reader, err := ringbuf.NewReader(pipe)
	if err != nil {
		log.Println("Failed creating ring buf reader")
		return err
	}
	// rest of the code
}
Next, we make a byte slice channel named c. Using an anonymous goroutine, we start polling for the events (data) submitted via bpf_ringbuf_output. Upon receiving data, we pass it along through the channel.
func Run(ctx context.Context, iface netlink.Link) error {
	// previous code
	c := make(chan []byte)
	go func() {
		for {
			event, err := reader.Read()
			if err != nil {
				log.Printf("Failed reading from ringbuf: %v", err)
				return
			}
			c <- event.RawSample
		}
	}()
	// rest of the code
}
Now, we create an infinite loop and, using the select statement, either:
- Clean up after ourselves when the program is interrupted or terminated
- Unmarshal, calculate and display the latency of each flow
func Run(ctx context.Context, iface netlink.Link) error {
	// previous code
	for {
		select {
		case <-ctx.Done():
			flowtable.Ticker.Stop()
			return probe.Close()
		case pkt := <-c:
			packetAttrs, ok := packet.UnmarshalBinary(pkt)
			if !ok {
				log.Printf("Could not unmarshall packet: %+v", pkt)
				continue
			}
			packet.CalcLatency(packetAttrs, flowtable)
		}
	}
}
A select statement blocks until one of its cases can proceed, then executes that case. If more than one case is ready, it picks one at random.
Here is the link to the completed version.
9 - Cleaning Up
We need a mechanism to clean up and remove the:
- qdisc
- netlink handle
- The eBPF program and maps
when the program gets interrupted (SIGINT
) or terminated (SIGTERM
).
To that end, we create a method called Close. It first deletes the qdisc, which also removes the filters attached to it. It then deletes the netlink handle and finally closes the eBPF program and maps.
func (p *probe) Close() error {
	log.Println("Removing qdisc")
	if err := p.handle.QdiscDel(p.qdisc); err != nil {
		log.Println("Failed deleting qdisc")
		return err
	}
	log.Println("Deleting handle")
	p.handle.Delete()
	log.Println("Closing eBPF object")
	if err := p.bpfObjects.Close(); err != nil {
		log.Println("Failed closing eBPF object")
		return err
	}
	return nil
}
Running The Program
Let’s take our program for a spin.
From the root of the project directory, run the following to compile the C code and generate the helper functions:
go generate ./...
Compile the Go program:
go build -ldflags "-s -w" -o flat cmd/flat.go
Run it with elevated privileges:
# Replace eth0 with your desired interface name
sudo ./flat -i eth0
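You should see one line per matched flow, showing the protocol, source, destination, TTL and latency. TCP flows are printed in cyan and UDP flows in light yellow.
If you are making changes to the code and want to quickly see the results without building a binary every time, just run: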
# Replace eth0 with your desired interface name
go generate ./... && sudo go run ./... -i eth0
What’s Next?
Everything appears to be perfect, and it is. However, depending on the type of the eBPF program, copying the compiled executable to another machine and running it would most likely give nonsense output, or the program would not run at all.
The reasons for that are:
- Different kernel versions come with new or modified structs (newly added, removed or renamed fields), resulting in different memory layouts
- There are various CPU instruction set architectures, although this is a problem for non-eBPF programs as well
The __sk_buff struct that we used in this series is part of the stable BPF user API. It exposes a subset of the kernel's socket buffer (sk_buff) fields, and when the program is loaded, the kernel translates accesses to it into the real sk_buff layout. As a result, it stays stable across kernel versions, so we are good.
Nonetheless, in the next post I will demonstrate how to write portable eBPF programs using CO-RE.
Stay tuned and thanks for reading.
2023-10-21 11:23