base commit

This commit is contained in:
Guillermo Roche 2025-05-26 20:45:07 +02:00
commit 1394b5d76c
30 changed files with 3651 additions and 0 deletions

9
.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
### https://raw.github.com/github/gitignore/master/Rust.gitignore
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# These are backup files generated by rustfmt
**/*.rs.bk

2142
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

3
Cargo.toml Normal file
View File

@ -0,0 +1,3 @@
[workspace]
resolver = "2"
members = ["xtask", "net-logger", "net-logger-common"]

41
README.md Normal file
View File

@ -0,0 +1,41 @@
# net2elk
## network tool for generate logs in elasticsearch as tcpdump
Project developed exclusively for educational purposes, use at your own risk
To configure the elasticsearch acces edit the file _net-logger/src/elk/elasticsearch.rs_
## Prerequisites
1. Install bpf-linker: `cargo install bpf-linker`
## Build eBPF
```bash
cargo xtask build-ebpf
```
To perform a release build you can use the `--release` flag.
You may also change the target architecture with the `--target` flag.
## Build Userspace
```bash
cargo build
```
## Build eBPF and Userspace
```bash
cargo xtask build
```
## Run
```bash
RUST_LOG=info cargo xtask run
```
## elasticsearch
To configure the elasticsearch acces edit the file _net-logger/src/elk/elasticsearch.rs_

View File

@ -0,0 +1,15 @@
[package]
name = "net-logger-common"
version = "0.1.0"
edition = "2021"
[features]
default = []
user = ["aya"]
[dependencies]
aya = { version = "0.12", optional = true }
network-types = "0.0.6"
[lib]
path = "src/lib.rs"

View File

@ -0,0 +1,18 @@
#![no_std]
use network_types::ip::{in6_addr, IpProto};
use network_types::eth::EtherType;
use core::net::Ipv6Addr;
#[derive(Clone, Copy)]
#[repr(C)]
pub struct Event {
pub ipv: EtherType,
pub source_port: u16,
pub source_ipv4: u32,
pub source_ipv6: Ipv6Addr,
pub dest_port: u16,
pub dest_ipv4: u32,
pub dest_ipv6: Ipv6Addr,
pub len: u16,
pub proto: IpProto,
}

View File

@ -0,0 +1,6 @@
[build]
target-dir = "../target"
target = "bpfel-unknown-none"
[unstable]
build-std = ["core"]

View File

@ -0,0 +1,2 @@
[editor]
workspace-lsp-roots = []

View File

@ -0,0 +1,4 @@
{
"rust-analyzer.cargo.target": "bpfel-unknown-none",
"rust-analyzer.checkOnSave.allTargets": false
}

4
net-logger-ebpf/.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,4 @@
{
"rust-analyzer.cargo.target": "bpfel-unknown-none",
"rust-analyzer.checkOnSave.allTargets": false
}

197
net-logger-ebpf/Cargo.lock generated Normal file
View File

@ -0,0 +1,197 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "aya-ebpf"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7318de0c49a17873182763831cb22f74fb30d04e2eb7e6d7b7e9b7d86d70ed3"
dependencies = [
"aya-ebpf-bindings",
"aya-ebpf-cty",
"aya-ebpf-macros",
"rustversion",
]
[[package]]
name = "aya-ebpf-bindings"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8536b7e39b232ecd854e587f473ba15640c09afc3e08408fc28144a7404ae75"
dependencies = [
"aya-ebpf-cty",
]
[[package]]
name = "aya-ebpf-cty"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5c130d898322b9698937465b3b749095dae85dba0da4ee648235947eb95738d"
[[package]]
name = "aya-ebpf-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce7820cc83547582284a140ffbdd46ab527d7ee2d9d0cfedf3f184fad3f8e15c"
dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "aya-log-common"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6d38a351ee2d5dc24e04cac6184b1b39408642d9a8b585892c99146f8dd4edb"
dependencies = [
"num_enum",
]
[[package]]
name = "aya-log-ebpf"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a10bbadd0829895a91eb1cd2bb02d7af145704087f03812bed60cb9fe65dbb3"
dependencies = [
"aya-ebpf",
"aya-log-common",
"aya-log-ebpf-macros",
]
[[package]]
name = "aya-log-ebpf-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6d8251a75f56077db51892041aa6b77c70ef2723845d7a210979700b2f01bc4"
dependencies = [
"aya-log-common",
"aya-log-parser",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "aya-log-parser"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14b102eb5c88c9aa0b49102d3fbcee08ecb0dfa81014f39b373311de7a7032cb"
dependencies = [
"aya-log-common",
]
[[package]]
name = "net-logger-common"
version = "0.1.0"
dependencies = [
"network-types",
]
[[package]]
name = "net-logger-ebpf"
version = "0.1.0"
dependencies = [
"aya-ebpf",
"aya-log-ebpf",
"net-logger-common",
"network-types",
]
[[package]]
name = "network-types"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41b13eba62f530cd2ea031938ac4472b9b649694baa1e587c2a2fadc07844d3c"
[[package]]
name = "num_enum"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro2"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rustversion"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6"
[[package]]
name = "syn"
version = "2.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"

View File

@ -0,0 +1,34 @@
cargo-features = ["profile-rustflags"]
[package]
name = "net-logger-ebpf"
version = "0.1.0"
edition = "2021"
[dependencies]
aya-ebpf = "0.1.0"
aya-log-ebpf = "0.1.0"
net-logger-common = { path = "../net-logger-common" }
network-types = "0.0.6"
[[bin]]
name = "net-logger"
path = "src/main.rs"
[profile.dev]
opt-level = 3
debug = false
debug-assertions = false
overflow-checks = false
lto = true
panic = "abort"
incremental = false
codegen-units = 1
rpath = false
[profile.release]
lto = true
panic = "abort"
codegen-units = 1
[workspace]
members = []

View File

@ -0,0 +1,13 @@
[toolchain]
channel = "nightly"
# The source code of rustc, provided by the rust-src component, is needed for
# building eBPF programs.
components = [
"cargo",
"clippy",
"rust-docs",
"rust-src",
"rust-std",
"rustc",
"rustfmt",
]

View File

@ -0,0 +1,23 @@
#![no_std]
#![no_main]
use aya_ebpf::{bindings::xdp_action, macros::xdp, programs::XdpContext};
use aya_log_ebpf::info;
pub mod stract_data;
#[xdp]
pub fn net_logger(ctx: XdpContext) -> u32 {
match stract_data::trafic_router(ctx) {
Ok(ret) => ret,
Err(_) => xdp_action::XDP_ABORTED,
}
}
fn try_net_logger(ctx: XdpContext) -> Result<u32, u32> {
info!(&ctx, "received a packet");
Ok(xdp_action::XDP_PASS)
}
#[panic_handler]
fn panic(_info: &core::panic::PanicInfo) -> ! {
unsafe { core::hint::unreachable_unchecked() }
}

View File

@ -0,0 +1,113 @@
use aya_ebpf::{
bindings::xdp_action,
macros::map,
programs::XdpContext,
maps::PerfEventArray,
maps::PerCpuArray,
};
use aya_log_ebpf::info;
use net_logger_common::Event;
use core::mem;
use network_types::{
eth::{EthHdr, EtherType},
ip::{IpProto, Ipv4Hdr, Ipv6Hdr, in6_addr},
tcp::TcpHdr,
udp::UdpHdr,
icmp::IcmpHdr,
};
#[map]
pub static mut SCRATCH: PerCpuArray<Event> = PerCpuArray::with_max_entries(1, 0); // per-cpu
#[map]
pub static mut EVENTS: PerfEventArray<Event> = PerfEventArray::with_max_entries(0, 0);
#[inline(always)] //
fn ptr_at<T>(ctx: &XdpContext, offset: usize) -> Result<*const T, ()> {
let start = ctx.data();
let end = ctx.data_end();
let len = mem::size_of::<T>();
if start + offset + len > end {
return Err(());
}
Ok((start + offset) as *const T)
}
pub fn trafic_router(ctx: XdpContext) -> Result<u32, ()> {
let ethhdr: *const EthHdr = ptr_at(&ctx, 0)?; //
match unsafe { (*ethhdr).ether_type } {
EtherType::Ipv4 => process_v4(ctx),
EtherType::Ipv6 => process_v6(ctx),
_ => Ok(xdp_action::XDP_PASS),
}
}
pub fn get_ports(ctx: &XdpContext, protocol: IpProto, ip_hdr_len: usize) -> Result<(u16, u16),()> {
let source_port;
let dest_port;
match protocol {
IpProto::Tcp => {
let tcphdr: *const TcpHdr =
ptr_at(ctx, EthHdr::LEN + ip_hdr_len)?;
source_port = u16::from_be(unsafe { (*tcphdr).source });
dest_port = u16::from_be(unsafe { (*tcphdr).dest });
}
IpProto::Udp => {
let udphdr: *const UdpHdr =
ptr_at(ctx, EthHdr::LEN + ip_hdr_len)?;
source_port = u16::from_be(unsafe { (*udphdr).source });
dest_port = u16::from_be(unsafe { (*udphdr).dest });
}
_ => {
source_port = 0;
dest_port = 0;
},
};
Ok((source_port, dest_port))
}
pub fn process_v6(ctx: XdpContext) -> Result<u32, ()> {
let ipv6hdr: *const Ipv6Hdr = ptr_at(&ctx, EthHdr::LEN)?;
let source_addr = unsafe { (*ipv6hdr).src_addr() };
let dest_addr = unsafe { (*ipv6hdr).dst_addr() };
let protocol = unsafe { (*ipv6hdr).next_hdr };
let pack_len = unsafe { (*ipv6hdr).payload_len };
let ports = get_ports(&ctx, protocol, Ipv6Hdr::LEN)?;
let mut event = unsafe { *SCRATCH.get_ptr_mut(0).ok_or(())? };
event.ipv = EtherType::Ipv6;
event.source_port = ports.0;
event.source_ipv6 = source_addr;
event.dest_port = ports.1;
event.dest_ipv6 = dest_addr;
event.proto = protocol;
event.len = pack_len;
unsafe { EVENTS.output(&ctx, &mut event, 0); }
Ok(xdp_action::XDP_PASS)
}
pub fn process_v4(ctx: XdpContext) -> Result<u32, ()> {
let ipv4hdr: *const Ipv4Hdr = ptr_at(&ctx, EthHdr::LEN)?;
let source_addr = u32::from_be(unsafe { (*ipv4hdr).src_addr });
let dest_addr = u32::from_be(unsafe { (*ipv4hdr).dst_addr });
let protocol = unsafe { (*ipv4hdr).proto };
let pack_len = unsafe { (*ipv4hdr).tot_len };
let ports = get_ports(&ctx, protocol, Ipv4Hdr::LEN)?;
let mut event = unsafe { *SCRATCH.get_ptr_mut(0).ok_or(())? };
event.ipv = EtherType::Ipv4;
event.source_port = ports.0;
event.source_ipv4 = source_addr;
event.dest_port = ports.1;
event.dest_ipv4 = dest_addr;
event.proto = protocol;
event.len = pack_len;
unsafe { EVENTS.output(&ctx, &mut event, 0); }
Ok(xdp_action::XDP_PASS)
}

32
net-logger/Cargo.toml Normal file
View File

@ -0,0 +1,32 @@
[package]
name = "net-logger"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
aya = "0.12"
aya-log = "0.2"
clap = { version = "4.1", features = ["derive"] }
net-logger-common = { path = "../net-logger-common", features = ["user"] }
anyhow = "1"
env_logger = "0.10"
libc = "0.2"
log = "0.4"
tokio = { version = "1.25", features = [
"macros",
"rt",
"rt-multi-thread",
"net",
"signal",
] }
bytes = "*"
network-types = "0.0.6"
elasticsearch = "8.17.0-alpha.1"
serde_json = "*"
serde = "*"
chrono = "*"
[[bin]]
name = "net-logger"
path = "src/main.rs"

View File

@ -0,0 +1,252 @@
use std::{
collections::HashMap,
net::{Ipv4Addr, Ipv6Addr},
time::SystemTime,
};
use crate::utils::ip::print_proto;
use network_types::ip::{IpProto, Ipv4Hdr};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
#[derive(Serialize, Deserialize)]
pub struct ConectionCoreAtom {
pub len: u128,
pub ip4_list: Vec<IpAtom<Ipv4Addr>>,
pub ip6_list: Vec<IpAtom<Ipv6Addr>>,
}
#[derive(Serialize, Deserialize)]
pub struct IpAtom<IpType> {
len: u128,
ip_local: IpType,
port_local: u16,
ip_remote: IpType,
port_remote: u16,
//pub proto_len: HashMap<u8, u128>
pub proto_len: Vec<(u8, u128)>,
}
impl ConectionCoreAtom {
pub fn new() -> Self {
ConectionCoreAtom {
len: 0,
ip4_list: Vec::new(),
ip6_list: Vec::new(),
}
}
pub fn addv4(
&mut self,
source_ip: u32,
dest_ip: u32,
source_port: u16,
dest_port: u16,
protocol: IpProto,
size: u16,
) {
self.len += size as u128;
let source_ip_format = Ipv4Addr::from_bits(source_ip);
let dest_ip_format = Ipv4Addr::from_bits(dest_ip);
let local_ip;
let remote_ip;
let local_port;
let remote_port;
if source_ip_format.is_private() {
local_ip = source_ip_format;
remote_ip = dest_ip_format;
local_port = source_port;
remote_port = dest_port;
} else {
local_ip = dest_ip_format;
remote_ip = source_ip_format;
local_port = dest_port;
remote_port = dest_port;
}
match self.check_packagev4(local_ip, remote_ip, local_port, remote_port) {
Some(ip_atom) => {
ip_atom.add(protocol, size as u128);
}
None => {
let mut ip_atom = IpAtom::new_ip(local_ip, remote_ip, local_port, remote_port);
ip_atom.add(protocol, size as u128);
self.ip4_list.push(ip_atom);
}
}
}
pub fn addv6(
&mut self,
source_ip: Ipv6Addr,
dest_ip: Ipv6Addr,
source_port: u16,
dest_port: u16,
protocol: IpProto,
size: u16,
) {
self.len += size as u128;
let local_ip;
let remote_ip;
let local_port;
let remote_port;
if source_ip.is_unique_local() {
local_ip = source_ip;
local_port = source_port;
remote_ip = dest_ip;
remote_port = dest_port;
} else {
local_ip = dest_ip;
local_port = dest_port;
remote_ip = source_ip;
remote_port = source_port;
}
match self.check_packagev6(local_ip, remote_ip, local_port, remote_port) {
Some(ip_atom) => {
ip_atom.add(protocol, size as u128);
}
None => {
let mut ip_atom = IpAtom::new_ip(local_ip, remote_ip, local_port, remote_port);
ip_atom.add(protocol, size as u128);
self.ip6_list.push(ip_atom);
}
}
}
fn check_packagev4(
&mut self,
local_ip: Ipv4Addr,
remote_ip: Ipv4Addr,
local_port: u16,
remote_port: u16,
) -> Option<&mut IpAtom<Ipv4Addr>> {
let mut index = 0;
loop {
match self.ip4_list.get(index) {
Some(ip) => {
if ip.eq_splited(local_ip, remote_ip, local_port, remote_port) {
return self.ip4_list.get_mut(index);
}
}
None => {
return None;
}
}
index += 1;
}
}
fn check_packagev6(
&mut self,
local_ip: Ipv6Addr,
remote_ip: Ipv6Addr,
local_port: u16,
remote_port: u16,
) -> Option<&mut IpAtom<Ipv6Addr>> {
let index = 0;
loop {
match self.ip6_list.get(index) {
Some(ip) => {
if ip.eq_splited(local_ip, remote_ip, local_port, remote_port) {
return self.ip6_list.get_mut(index);
}
}
None => return None,
}
}
}
pub fn generate_json(&self, timestamp: String) -> Value {
json!({
"@timestamp" : timestamp,
"ips_v4" : self.ip4_list,
"ips_v6" : self.ip6_list,
})
}
pub fn reset(&mut self) -> Self {
let old_len = self.len;
self.len = 0;
Self {
len: old_len,
ip4_list: self.ip4_list.drain(..).collect(),
ip6_list: self.ip6_list.drain(..).collect(),
}
}
/*fn create_ipv4(&mut self, source_ip: u32, dest_ip: u32) {
let source_ip_format = Ipv4Addr::from_bits(source_ip);
let dest_ip_format = Ipv4Addr::from_bits(dest_ip);
self.ip4_list.push(if source_ip_format.is_private() {
IpAtom::new_ip(source_ip_format, dest_ip_format)
} else {
IpAtom::new_ip(dest_ip_format, source_ip_format)
})
}
fn create_ipv6(&mut self, source_ip: Ipv6Addr, dest_ip: Ipv6Addr) {
self.ip6_list.push(if source_ip.is_unique_local() {
IpAtom::new_ip(source_ip, dest_ip)
} else {
IpAtom::new_ip(dest_ip, source_ip)
});
}*/
}
impl<IpType: Eq> IpAtom<IpType> {
pub fn new_ip(local_ip: IpType, remote_ip: IpType, local_port: u16, remote_port: u16) -> Self {
IpAtom {
len: 0,
ip_local: local_ip,
port_local: local_port,
ip_remote: remote_ip,
port_remote: remote_port,
proto_len: Vec::new(),
}
}
//hashmap can be less efficient
/*pub fn add(&mut self,protocol: IpProto, size: u128) {
self.len+=size;
match self.proto_len.get_mut(&(protocol as u8)) {
Some(l) => {
*l+=size;
},
None => {
self.proto_len.insert(protocol as u8,size);
},
};
}*/
pub fn add(&mut self, protocol: IpProto, size: u128) {
self.len += size;
let mut index = 0;
loop {
match self.proto_len.get_mut(index) {
Some(proto) => {
if proto.0 == protocol as u8 {
proto.1 += size;
}
}
None => {
self.proto_len.push((protocol as u8, size));
break;
}
}
index += 1;
}
}
pub fn eq_splited(
&self,
local_ip: IpType,
remote_ip: IpType,
local_port: u16,
remote_port: u16,
) -> bool {
((self.ip_local == local_ip && self.port_local == local_port)
&& (self.ip_remote == remote_ip && self.port_remote == remote_port))
|| ((self.ip_local == remote_ip && self.port_local == remote_port)
&& (self.ip_remote == local_ip && self.port_remote == local_port))
}
}

View File

@ -0,0 +1,50 @@
use std::net::{Ipv4Addr, Ipv6Addr};
pub struct Ip4Wrapper {
raw: Ipv4Addr,
}
pub struct Ip6Wrapper {
raw: Ipv6Addr,
}
pub trait IpWrapper<IpType> {
fn is_private(&self) -> bool;
fn get_raw(&self) -> IpType;
}
impl Ip4Wrapper {
pub fn new(ip: u32) -> Self {
Self {
raw: Ipv4Addr::from_bits(ip),
}
}
}
impl Ip6Wrapper {
pub fn new(ip: Ipv6Addr) -> Self {
Self {
raw: ip,
}
}
}
impl IpWrapper<Ipv4Addr> for Ip4Wrapper {
fn is_private(&self) -> bool {
self.raw.is_private()
}
fn get_raw(&self) -> Ipv4Addr {
self.raw
}
}
impl IpWrapper<Ipv6Addr> for Ip6Wrapper {
fn is_private(&self) -> bool {
self.raw.is_unique_local()
}
fn get_raw(&self) -> Ipv6Addr {
self.raw
}
}

View File

@ -0,0 +1,3 @@
pub mod ip_group;
mod ip_wrapper;
pub mod store;

View File

@ -0,0 +1,96 @@
use network_types::ip::IpProto;
use crate::elk::elasticsearch::ElasticConection;
use super::ip_group::ConectionCoreAtom;
use std::{
future::IntoFuture,
net::Ipv6Addr,
time::{Duration, SystemTime, UNIX_EPOCH},
};
pub struct net_stats_storage {
connections_store: ConectionCoreAtom,
elastic_connection: ElasticConection,
old_connections: Vec<ConectionCoreAtom>,
last_insert: SystemTime,
}
impl net_stats_storage {
pub fn new() -> Self {
Self {
connections_store: ConectionCoreAtom::new(),
elastic_connection: ElasticConection::new().unwrap(),
old_connections: Vec::new(),
last_insert: SystemTime::now(),
}
}
pub async fn addv4(
&mut self,
source_ip: u32,
dest_ip: u32,
source_port: u16,
dest_port: u16,
protocol: IpProto,
size: u16,
) {
self.store_or_not().await;
self.connections_store
.addv4(source_ip, dest_ip, source_port, dest_port, protocol, size);
}
pub async fn addv6(
&mut self,
source_ip: Ipv6Addr,
dest_ip: Ipv6Addr,
source_port: u16,
dest_port: u16,
protocol: IpProto,
size: u16,
) {
self.store_or_not().await;
self.connections_store
.addv6(source_ip, dest_ip, source_port, dest_port, protocol, size);
}
pub async fn store_or_not(&mut self) {
if self
.last_insert
.duration_since(SystemTime::now())
.unwrap_or(Duration::from_secs(2))
.as_secs()
> 2
|| self.connections_store.len > 200000
{
self.store_in_elastic().await;
}
}
pub async fn store_in_elastic(&mut self) {
self.old_connections.push(self.connections_store.reset());
self.connections_store = ConectionCoreAtom::new();
println!("entra:{}", self.old_connections.len());
for con in &self.old_connections {
println!(
"datos:{}",
chrono::offset::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
);
match self
.elastic_connection
.send(con.generate_json(
chrono::offset::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
))
.await
{
Ok(_) => continue,
Err(_) => {
println!("No va");
break;
}
}
}
}
}

View File

@ -0,0 +1,71 @@
use elasticsearch::auth::Credentials;
use elasticsearch::http::transport::SingleNodeConnectionPool;
use elasticsearch::http::transport::TransportBuilder;
use elasticsearch::http::Url;
use elasticsearch::Elasticsearch;
use elasticsearch::IndexParts;
use serde_json::Value;
use std::fmt;
pub struct ElasticConection {
conection: Elasticsearch,
}
pub struct ElasticConErr {
content: String,
}
impl ElasticConection {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
let url = Url::parse("http://elastic_ip:9200")?;
let credentials = Credentials::Basic("elastic".into(), "password".into());
let connection_pool = SingleNodeConnectionPool::new(url);
let transport = TransportBuilder::new(connection_pool)
.auth(credentials)
.build()?;
Ok(Self {
conection: Elasticsearch::new(transport),
})
}
pub async fn send(&self, data: Value) -> Result<(), ElasticConErr> {
println!("aquí tambi'en entra");
let raw_response = self
.conection
.index(IndexParts::Index("netlogger-0.1"))
.body(data)
.send()
.await;
let response = match raw_response {
Ok(r) => r,
Err(e) => {
return Err(ElasticConErr {
content: e.to_string(),
})
}
};
let status_code = response.status_code();
match status_code.clone().is_success() {
true => Ok(()),
false => {
let err_ret = match response.text().await {
Ok(ret) => ret,
Err(_e) => status_code.as_str().to_string(),
};
Err(ElasticConErr { content: err_ret })
}
}
}
}
impl fmt::Display for ElasticConErr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.content)
}
}
impl fmt::Debug for ElasticConErr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.content)
}
}

View File

@ -0,0 +1 @@
pub mod elasticsearch;

159
net-logger/src/main.rs Normal file
View File

@ -0,0 +1,159 @@
use anyhow::Context;
use aya::maps::AsyncPerfEventArray;
use aya::programs::{Xdp, XdpFlags};
use aya::util::online_cpus;
use aya::{include_bytes_aligned, Bpf};
use aya_log::BpfLogger;
use aya_log::Formatter;
use bytes::BytesMut;
use clap::Parser;
use log::{debug, info, warn};
use net_logger_common::Event;
use network_types::eth::EtherType;
use tokio::signal;
mod clasificator;
mod elk;
mod utils;
#[derive(Debug, Parser)]
struct Opt {
#[clap(short, long, default_value = "lo")]
iface: String,
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
env_logger::init();
// Bump the memlock rlimit. This is needed for older kernels that don't use the
// new memcg based accounting, see https://lwn.net/Articles/837122/
let rlim = libc::rlimit {
rlim_cur: libc::RLIM_INFINITY,
rlim_max: libc::RLIM_INFINITY,
};
let ret = unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &rlim) };
if ret != 0 {
debug!("remove limit on locked memory failed, ret is: {}", ret);
}
if let Err(e) = load_bpf().await {
eprintln!("error: {:#}", e);
}
Ok(())
}
async fn load_bpf() -> Result<(), aya::BpfError> {
// This will include your eBPF object file as raw bytes at compile-time and load it at
// runtime. This approach is recommended for most real-world use cases. If you would
// like to specify the eBPF program at runtime rather than at compile-time, you can
// reach for `Bpf::load_file` instead.
#[cfg(debug_assertions)]
let mut bpf = Bpf::load(include_bytes_aligned!(
"../../target/bpfel-unknown-none/debug/net-logger"
))?;
#[cfg(not(debug_assertions))]
let mut bpf = Bpf::load(include_bytes_aligned!(
"../../target/bpfel-unknown-none/release/net-logger"
))?;
if let Err(e) = BpfLogger::init(&mut bpf) {
// This can happen if you remove all log statements from your eBPF program.
warn!("failed to initialize eBPF logger: {}", e);
}
proces_events_xdp(bpf).await
}
async fn proces_events_xdp(mut bpf: Bpf) -> Result<(), aya::BpfError> {
let opt = Opt::parse();
// This will include your eBPF object file as raw bytes at compile-time and load it at
// runtime. This approach is recommended for most real-world use cases. If you would
// like to specify the eBPF program at runtime rather than at compile-time, you can
// reach for `Bpf::load_file` instead.
//let program : &mut SockOps = bpf.program_mut("get_package_data").unwrap().try_into().unwrap();
//program.load().unwrap();
let program: &mut Xdp = bpf.program_mut("net_logger").unwrap().try_into().unwrap();
program.load().unwrap();
program.attach(&opt.iface, XdpFlags::default())
.context("failed to attach the XDP program with default flags - try changing XdpFlags::default() to XdpFlags::SKB_MODE").unwrap();
let cpus = online_cpus().unwrap();
let num_cpus = cpus.len();
//let events_raw = bpf.map_mut("EVENTS");
let mut events: AsyncPerfEventArray<_> = bpf
.take_map("EVENTS")
.context("failed to map QUERY_RING")
.unwrap()
.try_into()
.unwrap();
for cpu in cpus {
let mut buf = events.open(cpu, None).unwrap();
tokio::task::spawn(async move {
let mut buffers = (0..num_cpus)
.map(|_| BytesMut::with_capacity(10240))
.collect::<Vec<_>>();
let mut con = clasificator::store::net_stats_storage::new();
loop {
let events = buf.read_events(&mut buffers).await.unwrap();
for i in 0..events.read {
// get timestamp
//let now = Local::now();
// read the event
let buf = &mut buffers[i];
let ptr = buf.as_ptr() as *const Event;
let data = unsafe { ptr.read_unaligned() };
// parse out the data
match data.ipv {
EtherType::Ipv4 => {
println!("source ip:{}, source port:{}, dest ip:{}, dest port:{}, proto:{}, size: {}",
aya_log::Ipv4Formatter::format(data.source_ipv4),
data.source_port,
aya_log::Ipv4Formatter::format(data.dest_ipv4),
data.dest_port,
utils::ip::print_proto(data.proto),
data.len);
con.addv4(
data.source_ipv4,
data.dest_ipv4,
data.source_port,
data.dest_port,
data.proto,
data.len,
)
.await;
//println!("package size:{}, ips:{}", con.len, con.ip4_list.get(con.ip4_list.len()-1).unwrap().proto_len.get(0).unwrap_or(&(0,0)).1);
}
EtherType::Ipv6 => {
println!("source ip:{}, source port:{}, dest ip:{}, dest port:{}, proto:{}, size: {}",
aya_log::Ipv6Formatter::format(data.source_ipv6),
data.source_port,
aya_log::Ipv6Formatter::format(data.dest_ipv6),
data.dest_port,
utils::ip::print_proto(data.proto),
data.len);
con.addv6(
data.source_ipv6,
data.dest_ipv6,
data.source_port,
data.dest_port,
data.proto,
data.len,
)
.await;
}
_ => println!("result not coverd"),
}
}
}
});
}
info!("Waiting for Ctrl-C...");
signal::ctrl_c().await.unwrap();
info!("Exiting...");
Ok(())
}

154
net-logger/src/utils/ip.rs Normal file
View File

@ -0,0 +1,154 @@
use network_types::ip::IpProto;
pub fn print_proto(proto: IpProto) -> String {
String::from(match proto {
IpProto::HopOpt => "HopOpt",
IpProto::Icmp => "Icmp",
IpProto::Igmp => "Igmp",
IpProto::Ggp => "Ggp",
IpProto::Ipv4 => "Ipv4",
IpProto::Stream => "Stream",
IpProto::Tcp => "Tcp",
IpProto::Cbt => "Cbt",
IpProto::Egp => "Egp",
IpProto::Igp => "Igp",
IpProto::BbnRccMon => "BbnRccMon",
IpProto::NvpII => "NvpII",
IpProto::Pup => "Pup",
IpProto::Argus => "Argus",
IpProto::Emcon => "Emcon",
IpProto::Xnet => "Xnet",
IpProto::Chaos => "Chaos",
IpProto::Udp => "Udp",
IpProto::Mux => "Mux",
IpProto::DcnMeas => "DcnMeas",
IpProto::Hmp => "Hmp",
IpProto::Prm => "Prm",
IpProto::Idp => "Idp",
IpProto::Trunk1 => "Trunk1",
IpProto::Trunk2 => "Trunk2",
IpProto::Leaf1 => "Leaf1",
IpProto::Leaf2 => "Leaf2",
IpProto::Rdp => "Rdp",
IpProto::Irtp => "Irtp",
IpProto::Tp4 => "Tp4",
IpProto::Netblt => "Netblt",
IpProto::MfeNsp => "MfeNsp",
IpProto::MeritInp => "MeritInp",
IpProto::Dccp => "Dccp",
IpProto::ThirdPartyConnect => "ThirdPartyConnect",
IpProto::Idpr => "Idpr",
IpProto::Xtp => "Xtp",
IpProto::Ddp => "Ddp",
IpProto::IdprCmtp => "IdprCmtp",
IpProto::TpPlusPlus => "TpPlusPlus",
IpProto::Il => "Il",
IpProto::Ipv6 => "Ipv6",
IpProto::Sdrp => "Sdrp",
IpProto::Ipv6Route => "Ipv6Route",
IpProto::Ipv6Frag => "Ipv6Frag",
IpProto::Idrp => "Idrp",
IpProto::Rsvp => "Rsvp",
IpProto::Gre => "Gre",
IpProto::Dsr => "Dsr",
IpProto::Bna => "Bna",
IpProto::Esp => "Esp",
IpProto::Ah => "Ah",
IpProto::Inlsp => "Inlsp",
IpProto::Swipe => "Swipe",
IpProto::Narp => "Narp",
IpProto::Mobile => "Mobile",
IpProto::Tlsp => "Tlsp",
IpProto::Skip => "Skip",
IpProto::Ipv6Icmp => "Ipv6Icmp",
IpProto::Ipv6NoNxt => "Ipv6NoNxt",
IpProto::Ipv6Opts => "Ipv6Opts",
IpProto::AnyHostInternal => "AnyHostInternal",
IpProto::Cftp => "Cftp",
IpProto::AnyLocalNetwork => "AnyLocalNetwork",
IpProto::SatExpak => "SatExpak",
IpProto::Kryptolan => "Kryptolan",
IpProto::Rvd => "Rvd",
IpProto::Ippc => "Ippc",
IpProto::AnyDistributedFileSystem => "AnyDistributedFileSystem",
IpProto::SatMon => "SatMon",
IpProto::Visa => "Visa",
IpProto::Ipcv => "Ipcv",
IpProto::Cpnx => "Cpnx",
IpProto::Cphb => "Cphb",
IpProto::Wsn => "Wsn",
IpProto::Pvp => "Pvp",
IpProto::BrSatMon => "BrSatMon",
IpProto::SunNd => "SunNd",
IpProto::WbMon => "WbMon",
IpProto::WbExpak => "WbExpak",
IpProto::IsoIp => "IsoIp",
IpProto::Vmtp => "Vmtp",
IpProto::SecureVmtp => "SecureVmtp",
IpProto::Vines => "Vines",
IpProto::Ttp => "Ttp",
IpProto::NsfnetIgp => "NsfnetIgp",
IpProto::Dgp => "Dgp",
IpProto::Tcf => "Tcf",
IpProto::Eigrp => "Eigrp",
IpProto::Ospfigp => "Ospfigp",
IpProto::SpriteRpc => "SpriteRpc",
IpProto::Larp => "Larp",
IpProto::Mtp => "Mtp",
IpProto::Ax25 => "Ax25",
IpProto::Ipip => "Ipip",
IpProto::Micp => "Micp",
IpProto::SccSp => "SccSp",
IpProto::Etherip => "Etherip",
IpProto::Encap => "Encap",
IpProto::AnyPrivateEncryptionScheme => "AnyPrivateEncryptionScheme",
IpProto::Gmtp => "Gmtp",
IpProto::Ifmp => "Ifmp",
IpProto::Pnni => "Pnni",
IpProto::Pim => "Pim",
IpProto::Aris => "Aris",
IpProto::Scps => "Scps",
IpProto::Qnx => "Qnx",
IpProto::ActiveNetworks => "ActiveNetworks",
IpProto::IpComp => "IpComp",
IpProto::Snp => "Snp",
IpProto::CompaqPeer => "CompaqPeer",
IpProto::IpxInIp => "IpxInIp",
IpProto::Vrrp => "Vrrp",
IpProto::Pgm => "Pgm",
IpProto::AnyZeroHopProtocol => "AnyZeroHopProtocol",
IpProto::L2tp => "L2tp",
IpProto::Ddx => "Ddx",
IpProto::Iatp => "Iatp",
IpProto::Stp => "Stp",
IpProto::Srp => "Srp",
IpProto::Uti => "Uti",
IpProto::Smp => "Smp",
IpProto::Sm => "Sm",
IpProto::Ptp => "Ptp",
IpProto::IsisOverIpv4 => "IsisOverIpv4",
IpProto::Fire => "Fire",
IpProto::Crtp => "Crtp",
IpProto::Crudp => "Crudp",
IpProto::Sscopmce => "Sscopmce",
IpProto::Iplt => "Iplt",
IpProto::Sps => "Sps",
IpProto::Pipe => "Pipe",
IpProto::Sctp => "Sctp",
IpProto::Fc => "Fc",
IpProto::RsvpE2eIgnore => "RsvpE2eIgnore",
IpProto::MobilityHeader => "MobilityHeader",
IpProto::UdpLite => "UdpLite",
IpProto::Mpls => "Mpls",
IpProto::Manet => "Manet",
IpProto::Hip => "Hip",
IpProto::Shim6 => "Shim6",
IpProto::Wesp => "Wesp",
IpProto::Rohc => "Rohc",
IpProto::EthernetInIpv4 => "EthernetInIpv4",
IpProto::Aggfrag => "Aggfrag",
IpProto::Test1 => "Test1",
IpProto::Test2 => "Test2",
IpProto::Reserved => "Reserved",
})
}

View File

@ -0,0 +1 @@
pub mod ip;

8
xtask/Cargo.toml Normal file
View File

@ -0,0 +1,8 @@
[package]
name = "xtask"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
clap = { version = "4.1", features = ["derive"] }

42
xtask/src/build.rs Normal file
View File

@ -0,0 +1,42 @@
use std::process::Command;
use anyhow::Context as _;
use clap::Parser;
use crate::build_ebpf::{build_ebpf, Architecture, Options as BuildOptions};
#[derive(Debug, Parser)]
pub struct Options {
/// Set the endianness of the BPF target
#[clap(default_value = "bpfel-unknown-none", long)]
pub bpf_target: Architecture,
/// Build and run the release target
#[clap(long)]
pub release: bool,
}
/// Build the project
fn build_project(opts: &Options) -> Result<(), anyhow::Error> {
let mut args = vec!["build"];
if opts.release {
args.push("--release")
}
let status = Command::new("cargo")
.args(&args)
.status()
.expect("failed to build userspace");
assert!(status.success());
Ok(())
}
/// Build our ebpf program and the project
pub fn build(opts: Options) -> Result<(), anyhow::Error> {
// build our ebpf program followed by our application
build_ebpf(BuildOptions {
target: opts.bpf_target,
release: opts.release,
})
.context("Error while building eBPF program")?;
build_project(&opts).context("Error while building userspace application")?;
Ok(())
}

67
xtask/src/build_ebpf.rs Normal file
View File

@ -0,0 +1,67 @@
use std::{path::PathBuf, process::Command};
use clap::Parser;
#[derive(Debug, Copy, Clone)]
pub enum Architecture {
BpfEl,
BpfEb,
}
impl std::str::FromStr for Architecture {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"bpfel-unknown-none" => Architecture::BpfEl,
"bpfeb-unknown-none" => Architecture::BpfEb,
_ => return Err("invalid target".to_owned()),
})
}
}
impl std::fmt::Display for Architecture {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Architecture::BpfEl => "bpfel-unknown-none",
Architecture::BpfEb => "bpfeb-unknown-none",
})
}
}
#[derive(Debug, Parser)]
pub struct Options {
/// Set the endianness of the BPF target
#[clap(default_value = "bpfel-unknown-none", long)]
pub target: Architecture,
/// Build the release target
#[clap(long)]
pub release: bool,
}
pub fn build_ebpf(opts: Options) -> Result<(), anyhow::Error> {
let dir = PathBuf::from("net-logger-ebpf");
let target = format!("--target={}", opts.target);
let mut args = vec![
"build",
target.as_str(),
"-Z",
"build-std=core",
];
if opts.release {
args.push("--release")
}
// Command::new creates a child process which inherits all env variables. This means env
// vars set by the cargo xtask command are also inherited. RUSTUP_TOOLCHAIN is removed
// so the rust-toolchain.toml file in the -ebpf folder is honored.
let status = Command::new("cargo")
.current_dir(dir)
.env_remove("RUSTUP_TOOLCHAIN")
.args(&args)
.status()
.expect("failed to build bpf program");
assert!(status.success());
Ok(())
}

36
xtask/src/main.rs Normal file
View File

@ -0,0 +1,36 @@
mod build_ebpf;
mod build;
mod run;
use std::process::exit;
use clap::Parser;
#[derive(Debug, Parser)]
pub struct Options {
#[clap(subcommand)]
command: Command,
}
#[derive(Debug, Parser)]
enum Command {
BuildEbpf(build_ebpf::Options),
Build(build::Options),
Run(run::Options),
}
fn main() {
let opts = Options::parse();
use Command::*;
let ret = match opts.command {
BuildEbpf(opts) => build_ebpf::build_ebpf(opts),
Run(opts) => run::run(opts),
Build(opts) => build::build(opts),
};
if let Err(e) = ret {
eprintln!("{e:#}");
exit(1);
}
}

55
xtask/src/run.rs Normal file
View File

@ -0,0 +1,55 @@
use std::process::Command;
use anyhow::Context as _;
use clap::Parser;
use crate::{build::{build, Options as BuildOptions}, build_ebpf::Architecture};
#[derive(Debug, Parser)]
pub struct Options {
/// Set the endianness of the BPF target
#[clap(default_value = "bpfel-unknown-none", long)]
pub bpf_target: Architecture,
/// Build and run the release target
#[clap(long)]
pub release: bool,
/// The command used to wrap your application
#[clap(short, long, default_value = "sudo -E")]
pub runner: String,
/// Arguments to pass to your application
#[clap(name = "args", last = true)]
pub run_args: Vec<String>,
}
/// Build and run the project
pub fn run(opts: Options) -> Result<(), anyhow::Error> {
// Build our ebpf program and the project
build(BuildOptions{
bpf_target: opts.bpf_target,
release: opts.release,
}).context("Error while building project")?;
// profile we are building (release or debug)
let profile = if opts.release { "release" } else { "debug" };
let bin_path = format!("target/{profile}/net-logger");
// arguments to pass to the application
let mut run_args: Vec<_> = opts.run_args.iter().map(String::as_str).collect();
// configure args
let mut args: Vec<_> = opts.runner.trim().split_terminator(' ').collect();
args.push(bin_path.as_str());
args.append(&mut run_args);
// run the command
let status = Command::new(args.first().expect("No first argument"))
.args(args.iter().skip(1))
.status()
.expect("failed to run the command");
if !status.success() {
anyhow::bail!("Failed to run `{}`", args.join(" "));
}
Ok(())
}