基于eBPF-XDP的流量采样
基于eBPF/XDP的流量采样
Ref
https://juejin.cn/post/7156502840888786952
https://ebpf-go.dev/guides/getting-started/
https://github.com/ns1/xdp-workshop
https://eunomia.dev/zh/tutorials/7-execsnoop/
https://ebpf-go.dev/contributing/architecture/
基本系统信息
➜ WatchGod git:(main) neofetch
_,met$$$$$gg. root@debian
,g$$$$$$$$$$$$$$$P. -----------
,g$$P" """Y$$.". OS: Debian GNU/Linux 12 (bookworm) x86_64
,$$P' `$$$. Host: VMware Virtual Platform None
',$$P ,ggs. `$$b: Kernel: 6.1.0-29-amd64
`d$$' ,$P"' . $$$ Uptime: 8 hours, 51 mins
$$P d$' , $$P Packages: 1707 (dpkg)
$$: $$. - ,d$$' Shell: zsh 5.9
$$; Y$b._ _,d$P' Resolution: 2358x1238
Y$$. `.`"Y$$$$P"' Terminal: /dev/pts/1
`$$b "-.__ CPU: AMD Ryzen 9 7940H w/ Radeon 780M Graphics (16) @ 3.992GHz
`Y$$ GPU: 00:0f.0 VMware SVGA II Adapter
`Y$$. Memory: 1399MiB / 7362MiB
`$$b.
`Y$$b.
`"Y$b._
`"""
➜ WatchGod git:(main) go version
go version go1.24.0 linux/amd64
➜ WatchGod git:(main) clang -v
Debian clang version 14.0.6
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin
Found candidate GCC installation: /usr/bin/../lib/gcc/x86_64-linux-gnu/12
Selected GCC installation: /usr/bin/../lib/gcc/x86_64-linux-gnu/12
Candidate multilib: .;@m64
Selected multilib: .;@m64
虚拟机镜像地址(建议IDM):
https://mirrors.aliyun.com/debian-cd/current/amd64/iso-dvd/debian-12.9.0-amd64-DVD-1.iso
需要安装的程序
apt install clang llvm libelf-dev linux-headers-$(uname -r) bpfcc-tools
apt install build-essential
apt install libbpf-dev
什么是eBPF/XDP
建议先阅读Ref部分,可以简单认为是内核中的Hook,你可以在数据包进入正式处理程序(产生sk_buff之前),就对其进行预处理
在数据包通过网络栈的初期就对其进行预处理好处多多:
1.极致的性能
2.更底层的控制
3.良好的可编程性
看前须知
尽量看一眼头上的参考文献,至少看一眼https://ebpf-go.dev/guides/getting-started/#ebpf-c-program
来得知如何使用cilium/ebpf来运行一个比较基本的XDP程序
内核空间
/* SPDX-License-Identifier: GPL-2.0 */
#include <linux/bpf.h>
#include <linux/types.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
// 常量定义
#define MAX_PACKET_SIZE 512
#define MAX_CPUS 16
#define ETH_HDR_SIZE 14 // 以太网头部固定大小
#define MIN_CAPTURE 34 // 至少捕获以太网+IPv4头部(14+20)
#define MAX_CAPTURE 54 // 最大尝试捕获到以太网+IPv4+TCP(14+20+20)
// 定义元数据结构
struct packet_metadata {
__u32 packet_size; // 原始数据包大小
__u32 captured_size; // 实际捕获的大小
__u32 protocol; // 协议类型
__u32 flags; // 标志位
__u64 timestamp; // 时间戳
} __attribute__((packed));
// 定义数据缓冲区结构
struct packet_buffer {
__u8 data[MAX_PACKET_SIZE];
};
// 定义maps
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct packet_metadata);
__uint(max_entries, 1);
} metadata_map SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct packet_buffer);
__uint(max_entries, 1);
} packet_data_map SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__type(key, int);
__type(value, int);
__uint(max_entries, MAX_CPUS);
} events SEC(".maps");
SEC("xdp")
int sampler(struct xdp_md *ctx)
{
void *data_end = (void *)(long)ctx->data_end;
void *data = (void *)(long)ctx->data;
// 确保至少有以太网头部
if (data + ETH_HDR_SIZE > data_end)
return XDP_PASS;
// 获取元数据缓冲区
__u32 zero = 0;
struct packet_metadata *meta = bpf_map_lookup_elem(&metadata_map, &zero);
if (!meta)
return XDP_PASS;
// 获取数据包缓冲区
struct packet_buffer *buffer = bpf_map_lookup_elem(&packet_data_map, &zero);
if (!buffer)
return XDP_PASS;
// 安全地计算数据包大小
__u64 packet_size = data_end - data;
// 解析以太网头部
struct ethhdr {
__u8 h_dest[6];
__u8 h_source[6];
__u16 h_proto;
} __attribute__((packed));
// 明确检查边界,确保我们可以安全访问以太网头部
struct ethhdr *eth = data;
if ((void*)(eth + 1) > data_end)
return XDP_PASS;
// 获取以太网协议类型
__u16 eth_proto = bpf_ntohs(eth->h_proto);
// 计算我们能捕获多少数据
__u32 capture_size = ETH_HDR_SIZE; // 至少捕获以太网头部
// 尝试捕获更多数据,但不超过MAX_CAPTURE
if (data + MIN_CAPTURE <= data_end) {
// 能捕获至少以太网+IPv4头部
capture_size = MIN_CAPTURE;
// 如果有更多数据,尝试捕获更多
if (data + MAX_CAPTURE <= data_end) {
capture_size = MAX_CAPTURE; // 能捕获以太网+IPv4+TCP头部
}
}
// 填充元数据
meta->packet_size = packet_size;
meta->captured_size = capture_size;
meta->protocol = eth_proto; // 默认为以太网协议类型
meta->timestamp = bpf_ktime_get_ns();
meta->flags = 0;
// 复制以太网头部
buffer->data[0] = eth->h_dest[0];
buffer->data[1] = eth->h_dest[1];
buffer->data[2] = eth->h_dest[2];
buffer->data[3] = eth->h_dest[3];
buffer->data[4] = eth->h_dest[4];
buffer->data[5] = eth->h_dest[5];
buffer->data[6] = eth->h_source[0];
buffer->data[7] = eth->h_source[1];
buffer->data[8] = eth->h_source[2];
buffer->data[9] = eth->h_source[3];
buffer->data[10] = eth->h_source[4];
buffer->data[11] = eth->h_source[5];
buffer->data[12] = (__u8)(eth_proto >> 8);
buffer->data[13] = (__u8)(eth_proto & 0xFF);
// 尝试复制更多数据 - 从以太网头部之后开始
if (data + ETH_HDR_SIZE < data_end) {
// 复制第15个字节
if (data + ETH_HDR_SIZE + 1 <= data_end)
buffer->data[14] = *(__u8 *)(data + ETH_HDR_SIZE);
// 复制第16个字节
if (data + ETH_HDR_SIZE + 2 <= data_end)
buffer->data[15] = *(__u8 *)(data + ETH_HDR_SIZE + 1);
// 复制第17-20个字节
if (data + ETH_HDR_SIZE + 6 <= data_end) {
buffer->data[16] = *(__u8 *)(data + ETH_HDR_SIZE + 2);
buffer->data[17] = *(__u8 *)(data + ETH_HDR_SIZE + 3);
buffer->data[18] = *(__u8 *)(data + ETH_HDR_SIZE + 4);
buffer->data[19] = *(__u8 *)(data + ETH_HDR_SIZE + 5);
}
// 继续复制更多数据...
// 这里使用静态展开而不是循环,以确保通过验证器
// 复制IPv4头部的关键字段(协议、源IP、目标IP)
if (eth_proto == 0x0800) {
// 协议字段(第10个字节)
if (data + ETH_HDR_SIZE + 10 <= data_end) {
buffer->data[ETH_HDR_SIZE + 9] = *(__u8 *)(data + ETH_HDR_SIZE + 9);
meta->protocol = *(__u8 *)(data + ETH_HDR_SIZE + 9); // IP协议
}
// 源IP地址(第13-16个字节)
if (data + ETH_HDR_SIZE + 16 <= data_end) {
buffer->data[ETH_HDR_SIZE + 12] = *(__u8 *)(data + ETH_HDR_SIZE + 12);
buffer->data[ETH_HDR_SIZE + 13] = *(__u8 *)(data + ETH_HDR_SIZE + 13);
buffer->data[ETH_HDR_SIZE + 14] = *(__u8 *)(data + ETH_HDR_SIZE + 14);
buffer->data[ETH_HDR_SIZE + 15] = *(__u8 *)(data + ETH_HDR_SIZE + 15);
}
// 目标IP地址(第17-20个字节)
if (data + ETH_HDR_SIZE + 20 <= data_end) {
buffer->data[ETH_HDR_SIZE + 16] = *(__u8 *)(data + ETH_HDR_SIZE + 16);
buffer->data[ETH_HDR_SIZE + 17] = *(__u8 *)(data + ETH_HDR_SIZE + 17);
buffer->data[ETH_HDR_SIZE + 18] = *(__u8 *)(data + ETH_HDR_SIZE + 18);
buffer->data[ETH_HDR_SIZE + 19] = *(__u8 *)(data + ETH_HDR_SIZE + 19);
}
// 继续复制更多数据,如果是TCP或UDP协议
if (data + ETH_HDR_SIZE + 10 <= data_end) {
__u8 ip_proto = *(__u8 *)(data + ETH_HDR_SIZE + 9);
if (ip_proto == 6 || ip_proto == 17) { // TCP或UDP
// 复制端口号
if (data + ETH_HDR_SIZE + 24 <= data_end) {
buffer->data[ETH_HDR_SIZE + 20] = *(__u8 *)(data + ETH_HDR_SIZE + 20);
buffer->data[ETH_HDR_SIZE + 21] = *(__u8 *)(data + ETH_HDR_SIZE + 21);
buffer->data[ETH_HDR_SIZE + 22] = *(__u8 *)(data + ETH_HDR_SIZE + 22);
buffer->data[ETH_HDR_SIZE + 23] = *(__u8 *)(data + ETH_HDR_SIZE + 23);
}
}
}
}
}
// 发送元数据
__u32 meta_size = sizeof(struct packet_metadata);
int ret = bpf_perf_event_output(ctx, &events,
BPF_F_CURRENT_CPU | ((__u64)meta_size << 32),
meta, meta_size);
if (ret < 0)
return XDP_PASS;
// 发送数据包内容
__u32 data_size = capture_size;
if (data_size > MAX_CAPTURE)
data_size = MAX_CAPTURE; // 确保不超过最大值
bpf_perf_event_output(ctx, &events,
BPF_F_CURRENT_CPU | ((__u64)data_size << 32),
buffer->data, data_size);
return XDP_PASS;
}
char LICENSE[] SEC("license") = "GPL";
eBPF程序的逆天之处
1.内核程序只允许使用最多256字节的栈空间
2.不允许使用过多“循环”,最推荐的使用for循环的实践是,使用#unroll标识来告诉编译器将循环展开
3.严格的对于指针使用的静态分析
总结:内核程序必须尽量保持精简,对于不重要的逻辑尽量在用户空间处理
Maps
当我们将eBPF程序载入到内核之后,内核会返回一个文件描述符(fd),可以通过这个fd来访问eBPF Maps,实现了用户空间和内核空间共享一片内存来读写信息
定义了三个Map:metadata_map, packet_data_map, events
前两个Map的作用是:为程序提供两个缓冲区,前文说过,内核给eBPF提供的沙箱环境只允许其使用256字节的栈空间,我们不得不借助Map来给程序提供缓冲区来存放一些中间值
后面一个Map的类型是BPF_MAP_TYPE_PERF_EVENT_ARRAY,一个环形缓冲区,用于使用perf将内核事件传输到用户空间
sampler程序
逻辑并不难,从两个脚手架map获取缓冲区之后,向缓冲区逐步填充数据,然后使用perf向用户空间发送数据
注意,对于一个xdp context,bpf_perf_event_output将会执行两次,一次是发送数据包i的元数据(时间,协议,大小),另一次是采样的数据包头部
这么做主要是为了在第二次数据包采样之前,先得知这个数据包的存在,防止因为采样失败导致的数据包的丢失
用户空间
从上文不难看出,内核空间做的事情是抓取并传递,相对的,用户空间做的应该是拿取数据并且分析数据包
内核空间采取的是二次发送策略,特点是先先发送数据包元数据,再发送数据包采样数据
也就是说,对于一个数据包,用户空间同时得到两份数据,这里按照数据包大小来区分元数据和数据包采样数据
我们可以这样处理数据
for {
select {
case <-sig:
fmt.Println("\nReceived signal, exiting...")
return
default:
record, err := rd.Read()
if err != nil {
if err == perf.ErrClosed {
return
}
log.Printf("Reading perf event: %v", err)
continue
}
// 处理丢失的样本
if record.LostSamples != 0 {
log.Printf("Lost %d samples", record.LostSamples)
continue
}
if debug {
log.Printf("Received event of size %d bytes", len(record.RawSample))
if len(record.RawSample) > 0 && len(record.RawSample) <= 32 {
log.Printf("Event hex dump: %s", hex.EncodeToString(record.RawSample))
}
}
// 基于大小的特征识别事件类型
if len(record.RawSample) == metadataSize {
// 这是元数据
var meta PacketMetadata
if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &meta); err != nil {
log.Printf("Error parsing metadata: %v", err)
continue
}
metaCopy := meta // 创建一个副本
// 存储元数据并获取事件ID
eventID := eventBuffer.AddMetadata(&metaCopy)
if debug {
log.Printf("Stored metadata (ID=%d): size=%d captured=%d proto=0x%x",
eventID, meta.PacketSize, meta.CapturedSize, meta.Protocol)
}
// 尝试获取下一个事件作为数据包内容
dataRecord, err := rd.Read()
if err != nil {
if err != perf.ErrClosed {
log.Printf("Error reading packet data: %v", err)
}
continue
}
if dataRecord.LostSamples != 0 {
log.Printf("Lost samples between metadata and data")
continue
}
// 处理数据包
if len(dataRecord.RawSample) > 0 {
// 打印数据包
printer.PrintPacket(&metaCopy, dataRecord.RawSample)
} else {
log.Printf("Received empty packet data")
}
} else if len(record.RawSample) >= 14 {
// 这似乎是直接的数据包内容,没有元数据
// 创建一个合成的元数据并直接处理
syntheticMeta := PacketMetadata{
PacketSize: uint32(len(record.RawSample)),
CapturedSize: uint32(len(record.RawSample)),
Protocol: 0, // 未知协议,由PrintPacket解析
Timestamp: uint64(time.Now().UnixNano()),
Flags: 0,
}
if debug {
log.Printf("Processing direct packet data: %d bytes", len(record.RawSample))
}
// 打印数据包
printer.PrintPacket(&syntheticMeta, record.RawSample)
} else if debug {
// 未知格式,记录调试信息
log.Printf("Ignoring unrecognized event format: %d bytes", len(record.RawSample))
}
}
}
理想的情况是:数据包严格按照FIFO队列,每个数据包有序到达,可以先接受元数据再接受数据包采样数据,再将其一并处理
不能保证所有数据包都能正确被用户空间接收,所以对于不符合常理的数据包我们将其暂且存入内存
对于这些“孤儿数据”,这里创建一个goroutine来进行专门处理
go func() {
for {
select {
case <-processTicker.C:
// 尝试处理任何未匹配的事件
for {
// 检查是否有额外的元数据和数据可以匹配
meta, metaID, hasMeta := eventBuffer.GetExtraMeta()
data, dataID, hasData := eventBuffer.GetExtraData()
if !hasMeta && !hasData {
break // 没有未匹配的事件了
}
if hasMeta && hasData {
if debug {
log.Printf("Processing unmatched events: meta=%d, data=%d", metaID, dataID)
}
// 处理数据包
printer.PrintPacket(meta, data)
} else if hasData {
// 有数据但没有元数据,创建合成元数据
syntheticMeta := &PacketMetadata{
PacketSize: uint32(len(data)),
CapturedSize: uint32(len(data)),
Protocol: 0,
Timestamp: uint64(time.Now().UnixNano()),
Flags: 0,
}
if debug {
log.Printf("Processing unmatched data (%d bytes) with synthetic metadata", len(data))
}
printer.PrintPacket(syntheticMeta, data)
} else {
// 退出循环,等待下一轮检查
break
}
}
case <-sig:
return
}
}
}()
后话
1.这里采用了元数据和数据包采样分开发送的方式,但是现在认为意义并不大
2.metaCopy似乎没有必要存在
3.对于孤儿数据其实没有必要处理,存入内存也是一笔开销,可以直接无视
To do list
1.可以将数据包采样数据存入Redis,再懒写入硬盘做持久化,减少IO
2.在Redis这层可以在ip地址,端口,通讯协议这几个维度来分析流量态势,分析流量特征
3.在TC钩子层可以做数据包的自定义拦截(因为xdp层对于协议的解析非常繁琐,vmware加入的头部也会干扰对数据包的分析)
4.对于其他系统调用的恶意行为分析