Skip to content

Commit

Permalink
Merge pull request #213 from network-intelligence/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
davidmcgrew authored and GitHub Enterprise committed Oct 22, 2023
2 parents 6404a19 + b84dd5c commit 6b181c7
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 81 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.5.21
2.5.22
6 changes: 6 additions & 0 deletions doc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# CHANGELOG for Mercury

## Version 2.5.22

* JSON records created from incomplete TCP segments are now highlighted with `"reassembly_properties": { "truncated": true }`.
* Improved TCP segment handling.
* Removed inappropriate output regarding truncation in X509 certificates.

## Version 2.5.21

* Fixed a slow memory leak in TCP reassembly
Expand Down
63 changes: 53 additions & 10 deletions src/libmerc/pkt_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ void stateful_pkt_proc::set_tcp_protocol(protocol &x,
{
struct tls_record rec{pkt};
struct tls_handshake handshake{rec.fragment};
if (reassembler_ptr && tcp_pkt && handshake.additional_bytes_needed) {
if (tcp_pkt && handshake.additional_bytes_needed) {
tcp_pkt->reassembly_needed(handshake.additional_bytes_needed);
// set pkt type as tls CH, so that initial segments can be fingerprinted as best effort for reassembly failed cases
}
Expand Down Expand Up @@ -425,13 +425,18 @@ bool stateful_pkt_proc::process_tcp_data (protocol &x,
return true;
}
else {
// non initial seg, directly put in reassembler
// non initial seg
// call set_tcp_protocol in case there is something worth fingerpriting
set_tcp_protocol(x, pkt, false, &tcp_pkt);
if (!tcp_pkt.additional_bytes_needed && !(std::holds_alternative<unknown_initial_packet>(x) || std::holds_alternative<std::monostate>(x)) ) {
reassembler->curr_reassembly_state = reassembly_none;
reassembler->dump_pkt = false;
return true;
}
reassembler->init_segment(k, ts->tv_sec, seg_context, syn_seq, pkt);
// write_pkt = false; for out of order pkts, write to pcap file only after initial seg is known
reassembler->dump_pkt = false;
reassembler->curr_reassembly_state = reassembly_in_progress;
// call set_tcp_protocol in case there is something worth fingerpriting
set_tcp_protocol(x, pkt, false, &tcp_pkt);
return true;
}
}
Expand All @@ -444,17 +449,30 @@ bool stateful_pkt_proc::process_tcp_data (protocol &x,
is_init_seg = reassembler->is_init_seg(k, seg_context.seq);
if (is_init_seg) {
set_tcp_protocol(x, pkt, true, &tcp_pkt);
seg_context.additional_bytes_needed = tcp_pkt.additional_bytes_needed;
if (!tcp_pkt.additional_bytes_needed && !(std::holds_alternative<unknown_initial_packet>(x) || std::holds_alternative<std::monostate>(x)) ) {
reassembler->curr_reassembly_state = reassembly_none;
reassembler->dump_pkt = false;
reassembler->remove_segment(k);
return true;
}
else {
seg_context.additional_bytes_needed = tcp_pkt.additional_bytes_needed;
}
}
else {
set_tcp_protocol(x, pkt, false, &tcp_pkt);
if (!tcp_pkt.additional_bytes_needed && !(std::holds_alternative<unknown_initial_packet>(x) || std::holds_alternative<std::monostate>(x)) ) {
reassembler->curr_reassembly_state = reassembly_none;
reassembler->dump_pkt = false;
reassembler->remove_segment(k);
return true;
}
}

bool reassembly_consumed = false;
struct tcp_segment *seg = reassembler->check_packet(k, ts->tv_sec, seg_context, pkt_copy, reassembly_consumed);
if (reassembly_consumed) {
// reassmebled data already consumed for this flow
reassembler->pruner.nodes[seg->prune_index].is_in_map = false;
reassembler->remove_segment(k);
reassembler->dump_pkt = false;
reassembler->curr_reassembly_state = reassembly_done;
Expand All @@ -468,7 +486,6 @@ bool stateful_pkt_proc::process_tcp_data (protocol &x,
}

if(seg->done) {
reassembler->pruner.nodes[seg->prune_index].is_in_map = false;
struct datum reassembled_data = seg->get_reassembled_segment();
set_tcp_protocol(x, reassembled_data, true, &tcp_pkt);
reassembler->dump_pkt = false;
Expand All @@ -479,6 +496,10 @@ bool stateful_pkt_proc::process_tcp_data (protocol &x,

reassembler->curr_reassembly_state = reassembly_in_progress;
}

if (!in_reassembly && tcp_pkt.additional_bytes_needed) {
reassembler->curr_reassembly_state = truncated;
}
return true;
}

Expand Down Expand Up @@ -507,6 +528,11 @@ size_t stateful_pkt_proc::ip_write_json(void *buffer,
struct datum pkt{ip_packet, ip_packet+length};
ip ip_pkt{pkt, k};
uint8_t transport_proto = ip_pkt.transport_protocol();
bool truncated_tcp = false;

if (reassembler) {
reassembler->curr_reassembly_state = reassembly_none;
}

// process encapsulations
//
Expand Down Expand Up @@ -560,11 +586,17 @@ size_t stateful_pkt_proc::ip_write_json(void *buffer,
}
// note: we could check for non-empty data field

} else {
} else if (tcp_pkt.is_FIN() || tcp_pkt.is_RST()) {
tcp_flow_table.find_and_erase(k);
}
else {
//bool write_pkt = false;
if (!process_tcp_data(x, pkt, tcp_pkt, k, ts, reassembler)) {
return 0;
}
else if (tcp_pkt.additional_bytes_needed) {
truncated_tcp = true;
}
}

} else if (transport_proto == ip::protocol::udp) {
Expand Down Expand Up @@ -625,7 +657,14 @@ size_t stateful_pkt_proc::ip_write_json(void *buffer,
}
if (crypto_policy) { std::visit(do_crypto_assessment{crypto_policy, record}, x); }

if (reassembler) {
// write indication of truncation or reassembly
//
if (!reassembler && truncated_tcp) {
struct json_object flags{record, "reassembly_properties"};
flags.print_key_bool("truncated", true);
flags.close();
}
else if (reassembler && reassembler->curr_reassembly_state != reassembly_status::reassembly_none) {
reassembler->write_flags(record, "reassembly_properties");
if (reassembler->curr_reassembly_consumed == true) {
reassembler->remove_segment(reassembler->reap_it);
Expand Down Expand Up @@ -773,7 +812,11 @@ bool stateful_pkt_proc::analyze_ip_packet(const uint8_t *packet,
tcp_flow_table.syn_packet(k, ts->tv_sec, ntoh(tcp_pkt.header->seq));
} else if (tcp_pkt.is_SYN_ACK()) {
tcp_flow_table.syn_packet(k, ts->tv_sec, ntoh(tcp_pkt.header->seq));
} else {
}
else if (tcp_pkt.is_FIN() || tcp_pkt.is_RST()) {
tcp_flow_table.find_and_erase(k);
}
else {
bool ret = process_tcp_data(x, pkt, tcp_pkt, k, ts, reassembler);
if (reassembler->curr_reassembly_state == reassembly_in_progress) {
analysis.flow_state_pkts_needed = true;
Expand Down
106 changes: 80 additions & 26 deletions src/libmerc/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ struct tcp_segment {
uint32_t total_bytes_needed;
uint32_t current_bytes;
uint32_t seg_count;
uint16_t prune_index; // index of prune node in pruning table
unsigned int init_time;
bool done;
bool seg_overlap; // current pkt overlaps with a previous segment
Expand All @@ -310,7 +309,7 @@ struct tcp_segment {
//std::vector< std::pair<uint32_t,uint32_t> > seg;

tcp_segment() : seq_init{0}, curr_seq{0}, index{0}, end_index{0}, seg_len{0}, max_index{8192}, total_bytes_needed{8192},
current_bytes{0}, seg_count{0}, prune_index{0}, init_time{0}, done{false}, seg_overlap{false}, max_seg_exceed{false} {}
current_bytes{0}, seg_count{0}, init_time{0}, done{false}, seg_overlap{false}, max_seg_exceed{false} {}

bool init_from_pkt (unsigned int sec, struct tcp_seg_context &tcp_pkt, uint32_t syn_seq, datum &p) {
seq_init = syn_seq;
Expand Down Expand Up @@ -425,6 +424,8 @@ struct tcp_segment {

};

/* Comment reassembly pruning logic
struct prune_node {
unsigned int init_timestamp;
struct key seg_key;
Expand Down Expand Up @@ -549,43 +550,51 @@ struct prune_table {
return force_pruned;
}
};
End comment reassembly pruning logic */

void fprintf_json_string_escaped(FILE *f, const char *key, const uint8_t *data, unsigned int len);

// State of TCP reassembly for the flow associated with the current packet.
enum reassembly_status {
    reassembly_none = 0,        // no reassembly needed or performed for this flow
    reassembly_in_progress = 1, // segments buffered; more bytes still needed
    reassembly_done = 2,        // reassembled data has been produced and consumed
    truncated = 3 // truncated, but can't reassemble because the SYN sequence number is not known.  TODO: try reassembling without the SYN seq.
};

struct tcp_reassembler {
bool dump_pkt; // current pkt involved in reassembly, dump pkt regardless of json
struct prune_table pruner;
uint64_t force_prunes;
bool curr_reassembly_consumed;
enum reassembly_status curr_reassembly_state;

static const uint32_t max_map_entries = 5000;
static const uint32_t force_prune_count = 4000;
static const uint32_t max_map_entries = 10000; // Hard limit to map entries

std::unordered_map<struct key, struct tcp_segment> segment_table;
std::unordered_map<struct key, struct tcp_segment>::iterator reap_it;

tcp_reassembler(unsigned int size) : dump_pkt{false}, pruner{}, force_prunes{0}, curr_reassembly_consumed{false}, curr_reassembly_state{reassembly_none}, segment_table{}, reap_it{segment_table.end()} {
// Constructs a reassembler whose segment table pre-reserves `size` buckets.
//
// reap_it is assigned twice on purpose: the member initializer establishes a
// valid iterator before the body runs, and the body re-assigns it because
// reserve() may rehash the unordered_map, which invalidates all iterators.
tcp_reassembler(unsigned int size) : dump_pkt{false}, curr_reassembly_consumed{false}, curr_reassembly_state{reassembly_none}, segment_table{}, reap_it{segment_table.end()} {
    segment_table.reserve(size);
    reap_it = segment_table.end();  // re-fetch end(): reserve() can invalidate iterators
}

bool init_segment(const struct key &k, unsigned int sec, struct tcp_seg_context &tcp_pkt, uint32_t syn_seq, datum &p) {
active_prune(sec); // try pruning before inserting
if (segment_table.size() >= max_map_entries) {
// aggressive : remove two entries
increment_reap_iterator();
if (reap_it != segment_table.end()) {
reap_it = segment_table.erase(reap_it);
}
increment_reap_iterator();
if (reap_it != segment_table.end()) {
reap_it = segment_table.erase(reap_it);
}
}
else {
reap(sec); // passive: try cleaning expired entries
}

tcp_segment segment;
if (segment.init_from_pkt(sec, tcp_pkt, syn_seq, p)) {
reap_it = segment_table.emplace(k, segment).first;
uint16_t index;
if (pruner.add_node(sec,k,segment_table,index)) {
force_prunes++;
}
reap_it->second.prune_index = index;
reap_it = segment_table.emplace(k, segment).first;
//++reap_it;
return true;
}
Expand All @@ -602,10 +611,10 @@ struct tcp_reassembler {

struct tcp_segment *check_packet(const struct key &k, unsigned int sec, struct tcp_seg_context &tcp_pkt, datum &p, bool &reassembly_consumed) {

reap(sec); // passive cleaning
auto it = segment_table.find(k);
if (it != segment_table.end()) {
if (it->second.expired(sec)) {
pruner.nodes[it->second.prune_index].is_in_map = false;
remove_segment(it);
return nullptr;
}
Expand Down Expand Up @@ -633,16 +642,6 @@ struct tcp_reassembler {
}
}

void active_prune(unsigned int ts) {
if (segment_table.size() >= force_prune_count) {
pruner.do_pruning(ts, segment_table);
}
if (segment_table.size() == max_map_entries) {
pruner.do_force_pruning(segment_table);
}
pruner.check_time_pruning(ts, segment_table);
}

void count_all() {
auto it = segment_table.begin();
while (it != segment_table.end()) {
Expand All @@ -651,6 +650,14 @@ struct tcp_reassembler {
}

void write_flags(struct json_object &record, const char *key) {
if (curr_reassembly_state == truncated) {
// truncated but not in reassembly
struct json_object flags{record, key};
flags.print_key_bool("truncated", true);
flags.close();
return;
}

if (reap_it == segment_table.end()) {
return;
}
Expand All @@ -665,10 +672,35 @@ struct tcp_reassembler {
}
flags.close();
}
else {
struct json_object flags{record, key};
flags.print_key_bool("truncated", true);
flags.close();
}
reap_it = segment_table.end();
return;
}

// Passive cleanup: advance the reaper cursor and remove up to two expired
// flows from the segment table.  Bounding the work per call keeps the cost
// of each packet-processing pass small and predictable.
void reap(unsigned int sec) {

    for (int pass = 0; pass < 2; pass++) {
        increment_reap_iterator();
        bool expired = (reap_it != segment_table.end()) && reap_it->second.expired(sec);
        if (expired) {
            reap_it = segment_table.erase(reap_it);
        }
    }
}

// Step the reaper cursor one entry forward through the segment table,
// wrapping around from end() back to begin() so repeated calls cycle
// over every tracked flow.
void increment_reap_iterator() {
    if (reap_it == segment_table.end()) {
        reap_it = segment_table.begin();
        return;
    }
    ++reap_it;
}
};

struct flow_table {
Expand Down Expand Up @@ -754,20 +786,42 @@ struct tcp_context {
struct flow_table_tcp {
std::unordered_map<struct key, struct tcp_context> table;
std::unordered_map<struct key, struct tcp_context>::iterator reap_it;
static constexpr uint32_t max_entries = 20000;

flow_table_tcp(unsigned int size) : table{}, reap_it{table.end()} {
table.reserve(size);
reap_it = table.end();
}

// Record the SYN (or SYN-ACK) of flow `k`, remembering its timestamp and
// initial sequence number so the first data packet can later be recognized.
//
// Before inserting, the table is kept bounded: at the hard cap we
// aggressively evict up to two entries (expired or not); otherwise we do a
// cheap passive sweep that removes only expired entries.
void syn_packet(const struct key &k, unsigned int sec, uint32_t seq) {
    if (table.size() >= max_entries) {
        // aggressive: try to remove two entries, regardless of expiry
        increment_reap_iterator();
        if (reap_it != table.end()) {
            reap_it = table.erase(reap_it);
        }
        increment_reap_iterator();
        if (reap_it != table.end()) {
            reap_it = table.erase(reap_it);
        }
    }
    else {
        reap(sec); // passive: try to clean expired entries
    }
    // insert() is a no-op when the key is already present, so the previous
    // find()-then-insert() pattern performed a redundant second lookup.
    table.insert({k, {sec, seq}});
    // printf_err(log_debug, "tcp_flow_table size: %zu\n", table.size());
}

// Remove flow `k` from the table if it is being tracked; on removal the
// reaper cursor is advanced to the erased entry's successor so it stays valid.
void find_and_erase(const struct key &k) {
    auto entry = table.find(k);
    if (entry == table.end()) {
        return;  // flow not tracked; nothing to do
    }
    reap_it = table.erase(entry);
}

bool is_first_data_packet(const struct key &k, unsigned int sec, uint32_t seq) {
auto it = table.find(k);
if (it != table.end()) {
Expand Down
4 changes: 4 additions & 0 deletions src/libmerc/tcpip.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ struct tcp_packet : public base_protocol {
return header && TCP_IS_FIN(header->flags);
}

// Reports whether this packet has a parsed TCP header with the RST flag set.
bool is_RST() {
    return header && TCP_IS_RST(header->flags);
}

uint32_t seq() const { return hton(header->seq); }

void set_key(struct key &k) {
Expand Down
1 change: 0 additions & 1 deletion src/libmerc/x509.h
Original file line number Diff line number Diff line change
Expand Up @@ -1722,7 +1722,6 @@ struct x509_cert {
bool signature_is_weak(bool unsigned_is_weak=false) const {

if (signature_algorithm.parameters.is_truncated()) {
fprintf(stdout, "truncated signature_algorithm\n");
return false; // missing data
}

Expand Down
Loading

0 comments on commit 6b181c7

Please sign in to comment.