基于eBPF-XDP的流量采样

基于eBPF/XDP的流量采样

Ref

https://juejin.cn/post/7156502840888786952

https://ebpf-go.dev/guides/getting-started/

https://github.com/ns1/xdp-workshop

https://eunomia.dev/zh/tutorials/7-execsnoop/

https://ebpf-go.dev/contributing/architecture/

https://www.v2ex.com/t/973562

基本系统信息

➜  WatchGod git:(main) neofetch
       _,met$$$$$gg.          root@debian
    ,g$$$$$$$$$$$$$$$P.       -----------
  ,g$$P"     """Y$$.".        OS: Debian GNU/Linux 12 (bookworm) x86_64
 ,$$P'              `$$$.     Host: VMware Virtual Platform None
',$$P       ,ggs.     `$$b:   Kernel: 6.1.0-29-amd64
`d$$'     ,$P"'   .    $$$    Uptime: 8 hours, 51 mins
 $$P      d$'     ,    $$P    Packages: 1707 (dpkg)
 $$:      $$.   -    ,d$$'    Shell: zsh 5.9
 $$;      Y$b._   _,d$P'      Resolution: 2358x1238
 Y$$.    `.`"Y$$$$P"'         Terminal: /dev/pts/1
 `$$b      "-.__              CPU: AMD Ryzen 9 7940H w/ Radeon 780M Graphics (16) @ 3.992GHz
  `Y$$                        GPU: 00:0f.0 VMware SVGA II Adapter
   `Y$$.                      Memory: 1399MiB / 7362MiB
     `$$b.
       `Y$$b.
          `"Y$b._
              `"""

➜  WatchGod git:(main) go version
go version go1.24.0 linux/amd64
➜  WatchGod git:(main) clang -v
Debian clang version 14.0.6
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin
Found candidate GCC installation: /usr/bin/../lib/gcc/x86_64-linux-gnu/12
Selected GCC installation: /usr/bin/../lib/gcc/x86_64-linux-gnu/12
Candidate multilib: .;@m64
Selected multilib: .;@m64

虚拟机镜像地址(建议IDM):

https://mirrors.aliyun.com/debian-cd/current/amd64/iso-dvd/debian-12.9.0-amd64-DVD-1.iso

需要安装的程序

apt install clang llvm libelf-dev linux-headers-$(uname -r) bpfcc-tools

apt install build-essential

apt install libbpf-dev

什么是eBPF/XDP

建议先阅读Ref部分,可以简单认为是内核中的Hook,你可以在数据包进入正式处理程序(产生sk_buff之前),就对其进行预处理

在数据包通过网络栈的初期就对其进行预处理好处多多:

1.极致的性能

2.更底层的控制

3.良好的可编程性

看前须知

尽量看一眼头上的参考文献,至少看一眼https://ebpf-go.dev/guides/getting-started/#ebpf-c-program

来得知如何使用cilium/ebpf来运行一个比较基本的XDP程序

内核空间

/* SPDX-License-Identifier: GPL-2.0 */
#include <linux/bpf.h>
#include <linux/types.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
#include <linux/if_ether.h>
#include <linux/ip.h>

// 常量定义
#define MAX_PACKET_SIZE 512
#define MAX_CPUS 16
#define ETH_HDR_SIZE 14       // 以太网头部固定大小
#define MIN_CAPTURE 34        // 至少捕获以太网+IPv4头部(14+20)
#define MAX_CAPTURE 54        // 最大尝试捕获到以太网+IPv4+TCP(14+20+20)

// 定义元数据结构
struct packet_metadata {
    __u32 packet_size;    // 原始数据包大小
    __u32 captured_size;  // 实际捕获的大小
    __u32 protocol;       // 协议类型
    __u32 flags;          // 标志位
    __u64 timestamp;      // 时间戳
} __attribute__((packed));

// 定义数据缓冲区结构
struct packet_buffer {
    __u8 data[MAX_PACKET_SIZE];
};

// 定义maps
struct {
    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
    __type(key, __u32);
    __type(value, struct packet_metadata);
    __uint(max_entries, 1);
} metadata_map SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
    __type(key, __u32);
    __type(value, struct packet_buffer);
    __uint(max_entries, 1);
} packet_data_map SEC(".maps");

struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __type(key, int);
    __type(value, int);
    __uint(max_entries, MAX_CPUS);
} events SEC(".maps");

SEC("xdp")
int sampler(struct xdp_md *ctx)
{
    void *data_end = (void *)(long)ctx->data_end;
    void *data = (void *)(long)ctx->data;
    
    // 确保至少有以太网头部
    if (data + ETH_HDR_SIZE > data_end)
        return XDP_PASS;

    // 获取元数据缓冲区
    __u32 zero = 0;
    struct packet_metadata *meta = bpf_map_lookup_elem(&metadata_map, &zero);
    if (!meta)
        return XDP_PASS;

    // 获取数据包缓冲区
    struct packet_buffer *buffer = bpf_map_lookup_elem(&packet_data_map, &zero);
    if (!buffer)
        return XDP_PASS;

    // 安全地计算数据包大小
    __u64 packet_size = data_end - data;
    
    // 解析以太网头部
    struct ethhdr {
        __u8 h_dest[6];
        __u8 h_source[6];
        __u16 h_proto;
    } __attribute__((packed));
    
    // 明确检查边界,确保我们可以安全访问以太网头部
    struct ethhdr *eth = data;
    if ((void*)(eth + 1) > data_end)
        return XDP_PASS;
    
    // 获取以太网协议类型
    __u16 eth_proto = bpf_ntohs(eth->h_proto);
    
    // 计算我们能捕获多少数据
    __u32 capture_size = ETH_HDR_SIZE;  // 至少捕获以太网头部
    
    // 尝试捕获更多数据,但不超过MAX_CAPTURE
    if (data + MIN_CAPTURE <= data_end) {
        // 能捕获至少以太网+IPv4头部
        capture_size = MIN_CAPTURE;
        
        // 如果有更多数据,尝试捕获更多
        if (data + MAX_CAPTURE <= data_end) {
            capture_size = MAX_CAPTURE;  // 能捕获以太网+IPv4+TCP头部
        }
    }
    
    // 填充元数据
    meta->packet_size = packet_size;
    meta->captured_size = capture_size;
    meta->protocol = eth_proto;  // 默认为以太网协议类型
    meta->timestamp = bpf_ktime_get_ns();
    meta->flags = 0;
    
    // 复制以太网头部
    buffer->data[0] = eth->h_dest[0];
    buffer->data[1] = eth->h_dest[1];
    buffer->data[2] = eth->h_dest[2];
    buffer->data[3] = eth->h_dest[3];
    buffer->data[4] = eth->h_dest[4];
    buffer->data[5] = eth->h_dest[5];
    
    buffer->data[6] = eth->h_source[0];
    buffer->data[7] = eth->h_source[1];
    buffer->data[8] = eth->h_source[2];
    buffer->data[9] = eth->h_source[3];
    buffer->data[10] = eth->h_source[4];
    buffer->data[11] = eth->h_source[5];
    
    buffer->data[12] = (__u8)(eth_proto >> 8);
    buffer->data[13] = (__u8)(eth_proto & 0xFF);
    
    // 尝试复制更多数据 - 从以太网头部之后开始
    if (data + ETH_HDR_SIZE < data_end) {
        // 复制第15个字节
        if (data + ETH_HDR_SIZE + 1 <= data_end)
            buffer->data[14] = *(__u8 *)(data + ETH_HDR_SIZE);
            
        // 复制第16个字节
        if (data + ETH_HDR_SIZE + 2 <= data_end)
            buffer->data[15] = *(__u8 *)(data + ETH_HDR_SIZE + 1);
            
        // 复制第17-20个字节
        if (data + ETH_HDR_SIZE + 6 <= data_end) {
            buffer->data[16] = *(__u8 *)(data + ETH_HDR_SIZE + 2);
            buffer->data[17] = *(__u8 *)(data + ETH_HDR_SIZE + 3);
            buffer->data[18] = *(__u8 *)(data + ETH_HDR_SIZE + 4);
            buffer->data[19] = *(__u8 *)(data + ETH_HDR_SIZE + 5);
        }
        
        // 继续复制更多数据...
        // 这里使用静态展开而不是循环,以确保通过验证器
        // 复制IPv4头部的关键字段(协议、源IP、目标IP)
        if (eth_proto == 0x0800) {
            // 协议字段(第10个字节)
            if (data + ETH_HDR_SIZE + 10 <= data_end) {
                buffer->data[ETH_HDR_SIZE + 9] = *(__u8 *)(data + ETH_HDR_SIZE + 9);
                meta->protocol = *(__u8 *)(data + ETH_HDR_SIZE + 9);  // IP协议
            }
            
            // 源IP地址(第13-16个字节)
            if (data + ETH_HDR_SIZE + 16 <= data_end) {
                buffer->data[ETH_HDR_SIZE + 12] = *(__u8 *)(data + ETH_HDR_SIZE + 12);
                buffer->data[ETH_HDR_SIZE + 13] = *(__u8 *)(data + ETH_HDR_SIZE + 13);
                buffer->data[ETH_HDR_SIZE + 14] = *(__u8 *)(data + ETH_HDR_SIZE + 14);
                buffer->data[ETH_HDR_SIZE + 15] = *(__u8 *)(data + ETH_HDR_SIZE + 15);
            }
            
            // 目标IP地址(第17-20个字节)
            if (data + ETH_HDR_SIZE + 20 <= data_end) {
                buffer->data[ETH_HDR_SIZE + 16] = *(__u8 *)(data + ETH_HDR_SIZE + 16);
                buffer->data[ETH_HDR_SIZE + 17] = *(__u8 *)(data + ETH_HDR_SIZE + 17);
                buffer->data[ETH_HDR_SIZE + 18] = *(__u8 *)(data + ETH_HDR_SIZE + 18);
                buffer->data[ETH_HDR_SIZE + 19] = *(__u8 *)(data + ETH_HDR_SIZE + 19);
            }
            
            // 继续复制更多数据,如果是TCP或UDP协议
            if (data + ETH_HDR_SIZE + 10 <= data_end) {
                __u8 ip_proto = *(__u8 *)(data + ETH_HDR_SIZE + 9);
                if (ip_proto == 6 || ip_proto == 17) {  // TCP或UDP
                    // 复制端口号
                    if (data + ETH_HDR_SIZE + 24 <= data_end) {
                        buffer->data[ETH_HDR_SIZE + 20] = *(__u8 *)(data + ETH_HDR_SIZE + 20);
                        buffer->data[ETH_HDR_SIZE + 21] = *(__u8 *)(data + ETH_HDR_SIZE + 21);
                        buffer->data[ETH_HDR_SIZE + 22] = *(__u8 *)(data + ETH_HDR_SIZE + 22);
                        buffer->data[ETH_HDR_SIZE + 23] = *(__u8 *)(data + ETH_HDR_SIZE + 23);
                    }
                }
            }
        }
    }
    
    // 发送元数据
    __u32 meta_size = sizeof(struct packet_metadata);
    int ret = bpf_perf_event_output(ctx, &events, 
                                  BPF_F_CURRENT_CPU | ((__u64)meta_size << 32),
                                  meta, meta_size);
    if (ret < 0)
        return XDP_PASS;

    // 发送数据包内容
    __u32 data_size = capture_size;
    if (data_size > MAX_CAPTURE)
        data_size = MAX_CAPTURE;  // 确保不超过最大值
        
    bpf_perf_event_output(ctx, &events, 
                         BPF_F_CURRENT_CPU | ((__u64)data_size << 32), 
                         buffer->data, data_size);

    return XDP_PASS;
}

char LICENSE[] SEC("license") = "GPL";

eBPF程序的逆天之处

1.内核程序只允许使用最多256字节的栈空间

2.不允许使用过多“循环”,最推荐的使用for循环的实践是,使用#unroll标识来告诉编译器将循环展开

3.严格的对于指针使用的静态分析

总结:内核程序必须尽量保持精简,对于不重要的逻辑尽量在用户空间处理

Maps

当我们将eBPF程序载入到内核之后,内核会返回一个文件描述符(fd),可以通过这个fd来访问eBPF Maps,实现了用户空间和内核空间共享一片内存来读写信息

定义了三个Map:metadata_map, packet_data_map, events

前两个Map的作用是:为程序提供两个缓冲区,前文说过,内核给eBPF提供的沙箱环境只允许其使用256字节的栈空间,我们不得不借助Map来给程序提供缓冲区来存放一些中间值

后面一个Map的类型是BPF_MAP_TYPE_PERF_EVENT_ARRAY,一个环形缓冲区,用于使用perf将内核事件传输到用户空间

sampler程序

逻辑并不难,从两个脚手架map获取缓冲区之后,向缓冲区逐步填充数据,然后使用perf向用户空间发送数据

注意,对于一个xdp context,bpf_perf_event_output将会执行两次,一次是发送数据包i的元数据(时间,协议,大小),另一次是采样的数据包头部

这么做主要是为了在第二次数据包采样之前,先得知这个数据包的存在,防止因为采样失败导致的数据包的丢失

用户空间

从上文不难看出,内核空间做的事情是抓取并传递,相对的,用户空间做的应该是拿取数据并且分析数据包

内核空间采取的是二次发送策略,特点是先先发送数据包元数据,再发送数据包采样数据

也就是说,对于一个数据包,用户空间同时得到两份数据,这里按照数据包大小来区分元数据和数据包采样数据

我们可以这样处理数据

for {
		select {
		case <-sig:
			fmt.Println("\nReceived signal, exiting...")
			return
		default:
			record, err := rd.Read()
			if err != nil {
				if err == perf.ErrClosed {
					return
				}
				log.Printf("Reading perf event: %v", err)
				continue
			}

			// 处理丢失的样本
			if record.LostSamples != 0 {
				log.Printf("Lost %d samples", record.LostSamples)
				continue
			}

			if debug {
				log.Printf("Received event of size %d bytes", len(record.RawSample))
				if len(record.RawSample) > 0 && len(record.RawSample) <= 32 {
					log.Printf("Event hex dump: %s", hex.EncodeToString(record.RawSample))
				}
			}

			// 基于大小的特征识别事件类型
			if len(record.RawSample) == metadataSize {
				// 这是元数据
				var meta PacketMetadata
				if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &meta); err != nil {
					log.Printf("Error parsing metadata: %v", err)
					continue
				}

				metaCopy := meta // 创建一个副本

				// 存储元数据并获取事件ID
				eventID := eventBuffer.AddMetadata(&metaCopy)

				if debug {
					log.Printf("Stored metadata (ID=%d): size=%d captured=%d proto=0x%x",
						eventID, meta.PacketSize, meta.CapturedSize, meta.Protocol)
				}

				// 尝试获取下一个事件作为数据包内容
				dataRecord, err := rd.Read()
				if err != nil {
					if err != perf.ErrClosed {
						log.Printf("Error reading packet data: %v", err)
					}
					continue
				}

				if dataRecord.LostSamples != 0 {
					log.Printf("Lost samples between metadata and data")
					continue
				}

				// 处理数据包
				if len(dataRecord.RawSample) > 0 {
					// 打印数据包
					printer.PrintPacket(&metaCopy, dataRecord.RawSample)
				} else {
					log.Printf("Received empty packet data")
				}

			} else if len(record.RawSample) >= 14 {
				// 这似乎是直接的数据包内容,没有元数据
				// 创建一个合成的元数据并直接处理
				syntheticMeta := PacketMetadata{
					PacketSize:   uint32(len(record.RawSample)),
					CapturedSize: uint32(len(record.RawSample)),
					Protocol:     0, // 未知协议,由PrintPacket解析
					Timestamp:    uint64(time.Now().UnixNano()),
					Flags:        0,
				}

				if debug {
					log.Printf("Processing direct packet data: %d bytes", len(record.RawSample))
				}

				// 打印数据包
				printer.PrintPacket(&syntheticMeta, record.RawSample)

			} else if debug {
				// 未知格式,记录调试信息
				log.Printf("Ignoring unrecognized event format: %d bytes", len(record.RawSample))
			}
		}
	}

理想的情况是:数据包严格按照FIFO队列,每个数据包有序到达,可以先接受元数据再接受数据包采样数据,再将其一并处理

不能保证所有数据包都能正确被用户空间接收,所以对于不符合常理的数据包我们将其暂且存入内存

对于这些“孤儿数据”,这里创建一个goroutine来进行专门处理

go func() {
		for {
			select {
			case <-processTicker.C:
				// 尝试处理任何未匹配的事件
				for {
					// 检查是否有额外的元数据和数据可以匹配
					meta, metaID, hasMeta := eventBuffer.GetExtraMeta()
					data, dataID, hasData := eventBuffer.GetExtraData()

					if !hasMeta && !hasData {
						break // 没有未匹配的事件了
					}

					if hasMeta && hasData {
						if debug {
							log.Printf("Processing unmatched events: meta=%d, data=%d", metaID, dataID)
						}

						// 处理数据包
						printer.PrintPacket(meta, data)
					} else if hasData {
						// 有数据但没有元数据,创建合成元数据
						syntheticMeta := &PacketMetadata{
							PacketSize:   uint32(len(data)),
							CapturedSize: uint32(len(data)),
							Protocol:     0,
							Timestamp:    uint64(time.Now().UnixNano()),
							Flags:        0,
						}

						if debug {
							log.Printf("Processing unmatched data (%d bytes) with synthetic metadata", len(data))
						}

						printer.PrintPacket(syntheticMeta, data)
					} else {
						// 退出循环,等待下一轮检查
						break
					}
				}
			case <-sig:
				return
			}
		}
	}()

后话

1.这里采用了元数据和数据包采样分开发送的方式,但是现在认为意义并不大

2.metaCopy似乎没有必要存在

3.对于孤儿数据其实没有必要处理,存入内存也是一笔开销,可以直接无视

To do list

1.可以将数据包采样数据存入Redis,再懒写入硬盘做持久化,减少IO

2.在Redis这层可以在ip地址,端口,通讯协议这几个维度来分析流量态势,分析流量特征

3.在TC钩子层可以做数据包的自定义拦截(因为xdp层对于协议的解析非常繁琐,vmware加入的头部也会干扰对数据包的分析)

4.对于其他系统调用的恶意行为分析