Index: Makefile.in =================================================================== RCS file: /var/cvs/flowd/Makefile.in,v retrieving revision 1.25 diff -u -p -r1.25 Makefile.in --- Makefile.in 3 Mar 2005 22:43:09 -0000 1.25 +++ Makefile.in 21 Aug 2005 09:49:17 -0000 @@ -40,7 +40,8 @@ TARGETS=flowd flowd-reader all: $(TARGETS) -LIBFLOWD_OBJS= atomicio.o addr.o store.o crc32.o strlcpy.o strlcat.o +LIBFLOWD_OBJS= atomicio.o addr.o store.o store-v2.o crc32.o \ + strlcpy.o strlcat.o LIBFLOWD_HEADERS= addr.h crc32.h store.h FLOWD_OBJS= flowd.o privsep_fdpass.o privsep.o filter.o \ parse.o log.o daemon.o peer.o \ Index: TODO =================================================================== RCS file: /var/cvs/flowd/TODO,v retrieving revision 1.51 diff -u -p -r1.51 TODO --- TODO 14 May 2005 06:08:30 -0000 1.51 +++ TODO 21 Aug 2005 09:49:17 -0000 @@ -53,9 +53,6 @@ - Define net store.h types for: - min/max_pkt_lngth - - source_id (important!) - -- src/dst AS should be increased in size to 4 bytes - Discard protocol-specific state when we receive a packet in a different protocol @@ -64,6 +61,6 @@ we only validate to the flowset level) - Could at least iterate over packet and check for truncation? -- Renovate Perl/Python API. - - This was my first attempt at writing C/Perl/Python glue, so the interface is +- Renovate Perl API. 
+ - This was my first attempt at writing C/Perl glue, so the interface is pretty clumsy :( Index: configure.ac =================================================================== RCS file: /var/cvs/flowd/configure.ac,v retrieving revision 1.21 diff -u -p -r1.21 configure.ac --- configure.ac 14 May 2005 07:22:08 -0000 1.21 +++ configure.ac 21 Aug 2005 09:49:17 -0000 @@ -17,7 +17,7 @@ AC_INIT AC_CONFIG_SRCDIR([flowd.c]) -PROGVER=0.8.5 +PROGVER=0.9 AC_SUBST(PROGVER) AC_CONFIG_HEADER(config.h) Index: filter.c =================================================================== RCS file: /var/cvs/flowd/filter.c,v retrieving revision 1.18 diff -u -p -r1.18 filter.c --- filter.c 14 May 2005 06:04:48 -0000 1.18 +++ filter.c 21 Aug 2005 09:49:17 -0000 @@ -175,21 +175,21 @@ format_rule(const struct filter_rule *ru } static int -flow_time_match(time_t recv_secs, int day_mask, int after, int before) +flow_time_match(time_t recv_sec, int day_mask, int after, int before) { struct tm *tm; - int secs; + int sec; - tm = localtime(&recv_secs); + tm = localtime(&recv_sec); if (day_mask != 0 && (day_mask & (1 << tm->tm_wday)) == 0) return (0); - secs = tm->tm_sec + (tm->tm_min * 60) + (tm->tm_hour * 3600); + sec = tm->tm_sec + (tm->tm_min * 60) + (tm->tm_hour * 3600); - if ((before != -1) && secs > before) + if ((before != -1) && sec > before) return (0); - if ((after != -1) && secs < after) + if ((after != -1) && sec < after) return (0); return (1); @@ -259,7 +259,7 @@ flow_match(const struct filter_rule *rul FRRET(TCP_FLAGS); } if (FRMATCH(DAYTIME)) { - m = flow_time_match(ntohl(flow->recv_time.recv_secs), + m = flow_time_match(ntohl(flow->recv_time.recv_sec), rule->match.day_mask, rule->match.after, rule->match.before); FRRET(DAYTIME); Index: flowd-reader.8.in =================================================================== RCS file: /var/cvs/flowd/flowd-reader.8.in,v retrieving revision 1.4 diff -u -p -r1.4 flowd-reader.8.in --- flowd-reader.8.in 19 Apr 2005 11:57:41 -0000 1.4 
+++ flowd-reader.8.in 21 Aug 2005 09:49:17 -0000 @@ -22,7 +22,7 @@ .Nd Read, filter and concatenate binary flowd logfiles .Sh SYNOPSIS .Nm flowd-reader -.Op Fl Uvqd +.Op Fl LUvqd .Op Fl f Ar filter_file .Op Fl o Ar output_file .Ar flow_log @@ -59,6 +59,13 @@ to which all the flows that have been re .Pp The command-line options are as follows: .Bl -tag -width Ds +.It Fl L +Allows +.Nm +to read legacy version 2 flow logs (generated by +.Xr flowd 8 +versions prior to 0.9). +This may be used to convert old flow logs to the newer form. .It Fl U Causes .Nm Index: flowd-reader.c =================================================================== RCS file: /var/cvs/flowd/flowd-reader.c,v retrieving revision 1.15 diff -u -p -r1.15 flowd-reader.c --- flowd-reader.c 4 Feb 2005 06:26:10 -0000 1.15 +++ flowd-reader.c 21 Aug 2005 09:49:17 -0000 @@ -29,6 +29,7 @@ #include "common.h" #include "flowd.h" #include "store.h" +#include "store-v2.h" #include "atomicio.h" RCSID("$Id: flowd-reader.c,v 1.15 2005/02/04 06:26:10 djm Exp $"); @@ -40,6 +41,7 @@ usage(void) PROGNAME); fprintf(stderr, "This is %s version %s. 
Valid commandline options:\n", PROGNAME, PROGVER); + fprintf(stderr, " -L Read/convert legacy flow logs\n"); fprintf(stderr, " -q Don't print flows to stdout (use with -o)\n"); fprintf(stderr, " -d Print debugging information\n"); fprintf(stderr, " -f path Filter flows using rule file\n"); @@ -53,48 +55,16 @@ static int open_start_log(const char *path, int debug) { int fd; - off_t pos; - char ebuf[512]; if (path == NULL) { /* Logfile on stdout */ fd = STDOUT_FILENO; - } else { - if ((fd = open(path, O_RDWR|O_APPEND|O_CREAT, 0600)) == -1) - logerr("open(%s)", path); - - /* Only write out the header if we are at the start of the file */ - switch ((pos = lseek(fd, 0, SEEK_END))) { - case 0: - /* New file, continue below */ - break; - case -1: - logerr("lseek"); - default: - /* Logfile exists, don't write new header */ - if (lseek(fd, 0, SEEK_SET) != 0) - logerr("lseek"); - if (store_check_header(fd, ebuf, sizeof(ebuf)) != - STORE_ERR_OK) - logerrx("Store error: %s", ebuf); - if (lseek(fd, 0, SEEK_END) <= 0) { - fprintf(stderr, "lseek: %s\n", strerror(errno)); - exit(1); - } - if (debug) { - fprintf(stderr, "Continuing with existing " - "logfile len %lld\n", (long long)pos); - } - return (fd); - } - } + } else if ((fd = open(path, O_RDWR|O_APPEND|O_CREAT, 0600)) == -1) + logerr("open(%s)", path); if (debug) fprintf(stderr, "Writing new logfile header\n"); - if (store_put_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK) - logerrx("Store error: %s", ebuf); - return (fd); } @@ -106,26 +76,30 @@ main(int argc, char **argv) extern char *optarg; extern int optind; struct store_flow_complete flow; - struct store_header hdr; + struct store_v2_flow_complete flow_v2; char buf[2048], ebuf[512]; const char *ffile, *ofile; FILE *ffilef; - int ofd; + int ofd, read_legacy; u_int32_t disp_mask; struct flowd_config filter_config; + struct store_v2_header hdr_v2; - utc = verbose = debug = 0; + utc = verbose = debug = read_legacy = 0; ofile = ffile = NULL; ofd = -1; ffilef = NULL; 
bzero(&filter_config, sizeof(filter_config)); - while ((ch = getopt(argc, argv, "Udf:ho:qv")) != -1) { + while ((ch = getopt(argc, argv, "LUdf:ho:qv")) != -1) { switch (ch) { case 'h': usage(); return (0); + case 'L': + read_legacy = 1; + break; case 'U': utc = 1; break; @@ -190,31 +164,44 @@ main(int argc, char **argv) else if ((fd = open(argv[i], O_RDONLY)) == -1) logerr("open(%s)", argv[i]); - if (store_get_header(fd, &hdr, ebuf, + if (read_legacy && store_v2_get_header(fd, &hdr_v2, ebuf, sizeof(ebuf)) != STORE_ERR_OK) - logerrx("%s", ebuf); + logerrx("%s", ebuf); if (verbose >= 0) { - printf("LOGFILE %s started at %s\n", argv[i], - iso_time(ntohl(hdr.start_time), utc)); + printf("LOGFILE %s", argv[i]); + if (read_legacy) + printf(" started at %s", + iso_time(ntohl(hdr_v2.start_time), utc)); + printf("\n"); fflush(stdout); } for (;;) { bzero(&flow, sizeof(flow)); - if ((r = store_get_flow(fd, &flow, ebuf, - sizeof(ebuf))) == STORE_ERR_EOF) + if (read_legacy) + r = store_v2_get_flow(fd, &flow_v2, ebuf, + sizeof(ebuf)); + else + r = store_get_flow(fd, &flow, ebuf, + sizeof(ebuf)); + + if (r == STORE_ERR_EOF) break; else if (r != STORE_ERR_OK) logerrx("%s", ebuf); + if (read_legacy && + store_v2_flow_convert(&flow_v2, &flow) == -1) + logerrx("legacy flow conversion failed"); + if (ffile != NULL && filter_flow(&flow, &filter_config.filter_list) == FF_ACTION_DISCARD) continue; if (verbose >= 0) { store_format_flow(&flow, buf, sizeof(buf), - utc, disp_mask); + utc, disp_mask, 0); printf("%s\n", buf); fflush(stdout); } Index: flowd.c =================================================================== RCS file: /var/cvs/flowd/flowd.c,v retrieving revision 1.56 diff -u -p -r1.56 flowd.c --- flowd.c 28 Apr 2005 09:02:58 -0000 1.56 +++ flowd.c 21 Aug 2005 09:49:17 -0000 @@ -119,37 +119,10 @@ static int start_log(int monitor_fd) { int fd; - off_t pos; - char ebuf[512]; if ((fd = client_open_log(monitor_fd)) == -1) logerrx("Logfile open failed, exiting"); - /* Only write out 
the header if we are at the start of the file */ - switch ((pos = lseek(fd, 0, SEEK_END))) { - case 0: - /* New file, continue below */ - break; - case -1: - logerr("%s: lseek error, exiting", __func__); - default: - /* Logfile exists, don't write new header */ - if (lseek(fd, 0, SEEK_SET) != 0) - logerr("%s: lseek error, exiting", __func__); - if (store_check_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK) - logerrx("%s: Exiting on %s", __func__, ebuf); - if (lseek(fd, 0, SEEK_END) <= 0) - logerr("%s: lseek error, exiting", __func__); - logit(LOG_DEBUG, "Continuing with existing logfile len %lld", - (long long)pos); - return (fd); - } - - logit(LOG_DEBUG, "Writing new logfile header"); - - if (store_put_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK) - logerrx("%s: Exiting on %s", __func__, ebuf); - return (fd); } @@ -168,14 +141,14 @@ process_flow(struct store_flow_complete /* Prepare for writing */ flow->hdr.fields = htonl(flow->hdr.fields); - - flow->recv_time.recv_secs = htonl(flow->recv_time.recv_secs); + flow->recv_time.recv_sec = htonl(flow->recv_time.recv_sec); + flow->recv_time.recv_usec = htonl(flow->recv_time.recv_usec); if (conf->opts & FLOWD_OPT_VERBOSE) { char fbuf[1024]; store_format_flow(flow, fbuf, sizeof(fbuf), 0, - STORE_DISPLAY_ALL); + STORE_DISPLAY_ALL, 0); logit(LOG_DEBUG, "%s: flow %s", __func__, fbuf); } @@ -199,6 +172,7 @@ process_netflow_v1(u_int8_t *pkt, size_t struct store_flow_complete flow; size_t offset; u_int i, nflows; + struct timeval tv; if (len < sizeof(*nf1_hdr)) { peer->ninvalid++; @@ -240,7 +214,9 @@ process_netflow_v1(u_int8_t *pkt, size_t flow.hdr.fields &= ~STORE_FIELD_AS_INFO; flow.hdr.fields &= ~STORE_FIELD_FLOW_ENGINE_INFO; - flow.recv_time.recv_secs = time(NULL); + gettimeofday(&tv, NULL); + flow.recv_time.recv_sec = tv.tv_sec; + flow.recv_time.recv_usec = tv.tv_usec; flow.pft.tcp_flags = nf1_flow->tcp_flags; flow.pft.protocol = nf1_flow->protocol; @@ -288,6 +264,7 @@ process_netflow_v5(u_int8_t *pkt, size_t struct 
store_flow_complete flow; size_t offset; u_int i, nflows; + struct timeval tv; if (len < sizeof(*nf5_hdr)) { peer->ninvalid++; @@ -327,7 +304,9 @@ process_netflow_v5(u_int8_t *pkt, size_t flow.hdr.fields &= ~STORE_FIELD_DST_ADDR6; flow.hdr.fields &= ~STORE_FIELD_GATEWAY_ADDR6; - flow.recv_time.recv_secs = time(NULL); + gettimeofday(&tv, NULL); + flow.recv_time.recv_sec = tv.tv_sec; + flow.recv_time.recv_usec = tv.tv_usec; flow.pft.tcp_flags = nf5_flow->tcp_flags; flow.pft.protocol = nf5_flow->protocol; @@ -384,6 +363,7 @@ process_netflow_v7(u_int8_t *pkt, size_t struct store_flow_complete flow; size_t offset; u_int i, nflows; + struct timeval tv; if (len < sizeof(*nf7_hdr)) { peer->ninvalid++; @@ -429,7 +409,9 @@ process_netflow_v7(u_int8_t *pkt, size_t * the Cat5k (e.g. destination-only mls nde mode) */ - flow.recv_time.recv_secs = time(NULL); + gettimeofday(&tv, NULL); + flow.recv_time.recv_sec = tv.tv_sec; + flow.recv_time.recv_usec = tv.tv_usec; flow.pft.tcp_flags = nf7_flow->tcp_flags; flow.pft.protocol = nf7_flow->protocol; @@ -660,9 +642,10 @@ nf9_check_rec_len(u_int type, u_int len) static int nf9_flowset_to_store(u_int8_t *pkt, size_t len, struct xaddr *flow_source, struct NF9_HEADER *nf9_hdr, struct peer_nf9_template *template, - struct store_flow_complete *flow) + u_int32_t source_id, struct store_flow_complete *flow) { u_int offset, i; + struct timeval tv; if (template->total_len > len) return (-1); @@ -675,7 +658,10 @@ nf9_flowset_to_store(u_int8_t *pkt, size flow->ainfo.time_sec = nf9_hdr->time_sec; flow->ainfo.netflow_version = nf9_hdr->c.version; flow->finf.flow_sequence = nf9_hdr->package_sequence; - flow->recv_time.recv_secs = time(NULL); + flow->finf.source_id = htonl(source_id); + gettimeofday(&tv, NULL); + flow->recv_time.recv_sec = tv.tv_sec; + flow->recv_time.recv_usec = tv.tv_usec; memcpy(&flow->agent_addr, flow_source, sizeof(flow->agent_addr)); offset = 0; @@ -693,7 +679,7 @@ nf9_flowset_to_store(u_int8_t *pkt, size static int 
process_netflow_v9_template(u_int8_t *pkt, size_t len, struct peer_state *peer, - struct peers *peers, u_int source_id) + struct peers *peers, u_int32_t source_id) { struct NF9_TEMPLATE_FLOWSET_HEADER *tmplh; struct NF9_TEMPLATE_FLOWSET_RECORD *tmplr; @@ -785,7 +771,7 @@ process_netflow_v9_template(u_int8_t *pk static int process_netflow_v9_data(u_int8_t *pkt, size_t len, struct peer_state *peer, - u_int source_id, struct NF9_HEADER *nf9_hdr, struct flowd_config *conf, + u_int32_t source_id, struct NF9_HEADER *nf9_hdr, struct flowd_config *conf, int log_fd, u_int *num_flows) { struct store_flow_complete *flows; @@ -835,7 +821,8 @@ process_netflow_v9_data(u_int8_t *pkt, s for (i = 0; i < num_flowsets; i++) { if (nf9_flowset_to_store(pkt + offset, template->total_len, - &peer->from, nf9_hdr, template, &flows[i]) == -1) { + &peer->from, nf9_hdr, template, source_id, + &flows[i]) == -1) { peer->ninvalid++; free(flows); logit(LOG_WARNING, "invalid netflow v.9 data flowset " @@ -863,8 +850,8 @@ process_netflow_v9(u_int8_t *pkt, size_t { struct NF9_HEADER *nf9_hdr = (struct NF9_HEADER *)pkt; struct NF9_FLOWSET_HEADER_COMMON *flowset; - u_int i, count, flowset_id, flowset_len, flowset_flows, total_flows; - u_int offset, source_id; + u_int32_t i, count, flowset_id, flowset_len, flowset_flows; + u_int32_t offset, source_id, total_flows; if (len < sizeof(*nf9_hdr)) { peer->ninvalid++; Index: flowd.py =================================================================== RCS file: flowd.py diff -N flowd.py --- flowd.py 14 May 2005 07:22:08 -0000 1.10 +++ /dev/null 1 Jan 1970 00:00:00 -0000 @@ -1,226 +0,0 @@ -# Copyright (c) 2004 Damien Miller -# -# Permission to use, copy, modify, and distribute this software for any -# purpose with or without fee is hereby granted, provided that the above -# copyright notice and this permission notice appear in all copies. 
-# -# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR -# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF -# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -# $Id: flowd.py,v 1.10 2005/05/14 07:22:08 djm Exp $ - -VERSION = "0.8.5" - -import struct -import time -import socket -import sys -import flowd_serialiser - -def iso_time(secs, utc = 0): - if utc: - tm = time.gmtime(secs) - else: - tm = time.localtime(secs) - - return "%04u-%02u-%02uT%02u:%02u:%02u" % tm[:6] - -def interval_time(t): - intervals = [ [ "s", 60 ], [ "m", 60 ], [ "h", 24 ], - [ "d", 7 ], [ "w", 52 ]] - ret = "" - for interval in intervals: - r = t % interval[1] - t = int(t / interval[1]) - if r != 0 or interval[0] == "s": - ret = "%u%s%s" % (r, interval[0], ret) - if t > 0: - ret = "%uy%s", (t, ret) - - return ret - - -def interval_time_ms(tms): - return "%s.%03u" % ( interval_time(int(tms / 1000)), tms % 1000 ) - -class log: - def __init__(self, path, mode = "r", start_time = None): - self.path = path - self.mode = mode - if mode == "r": - self.flow_file = open(path, "rb") - # Read header - hdr = self.flow_file.read(16) - if len(hdr) != 16: - raise ValueError, "Short read on flow header" - (self.magic, self.version, self.start_time, \ - self.flags) = struct.unpack(">IIII", hdr) - - if self.magic != 0x012cf047: - raise ValueError, "Bad magic" - if self.version != 0x00000002: - raise ValueError, "Unsupported version" - elif mode == "w" or mode == "a": - self.flow_file = open(path, mode + "b") - self.flow_file.seek(0, 2) - if self.flow_file.tell() > 0: - return - - # Write header - self.magic = 0x012cf047 - self.version = 0x02 - self.flags = 0x00 - 
self.start_time = start_time - if start_time is None: - self.start_time = int(time.time()) - hdr = struct.pack(">IIII", self.magic, self.version, \ - self.start_time, self.flags) - if len(hdr) != 16: - raise ValueError, "Internal error: bad header" - self.flow_file.write(hdr) - self.flow_file.flush() - else: - raise ValueError, "Invalid mode value"; - - def finish(self): - self.flow_file.close() - self.flow_file = None - - def readflow(self): - try: - f = flow() - f.from_file(self.flow_file) - except EOFError: - f = None - return f - - def writeflow(self, flow): - flow.to_file(self.flow_file) - -class flow: - TAG = 0x00000001 - RECV_TIME = 0x00000002 - PROTO_FLAGS_TOS = 0x00000004 - AGENT_ADDR4 = 0x00000008 - AGENT_ADDR6 = 0x00000010 - SRC_ADDR4 = 0x00000020 - SRC_ADDR6 = 0x00000040 - DST_ADDR4 = 0x00000080 - DST_ADDR6 = 0x00000100 - GATEWAY_ADDR4 = 0x00000200 - GATEWAY_ADDR6 = 0x00000400 - SRCDST_PORT = 0x00000800 - PACKETS = 0x00001000 - OCTETS = 0x00002000 - IF_INDICES = 0x00004000 - AGENT_INFO = 0x00008000 - FLOW_TIMES = 0x00010000 - AS_INFO = 0x00020000 - FLOW_ENGINE_INFO = 0x00040000 - CRC32 = 0x40000000 - - # Some useful combinations - AGENT_ADDR = 0x00000018 - SRC_ADDR = 0x00000060 - DST_ADDR = 0x00000180 - SRCDST_ADDR = 0x000001e0 - GATEWAY_ADDR = 0x00000600 - BRIEF = 0x000039ff - ALL = 0x4007ffff - - def __init__(self): - self.fields = { "fields" : 0 } - - def from_file(self, flow_file): - # Read flow header - needlen = flowd_serialiser.header_len() - hdr = flow_file.read(needlen) - if len(hdr) == 0: - raise EOFError - if len(hdr) != needlen: - raise ValueError, "Short read on flow header" - - needlen = flowd_serialiser.flow_len(hdr) - flow = flow_file.read(needlen) - if len(flow) == 0: - raise EOFError - if len(flow) != needlen: - raise ValueError, "Short read on flow data" - - self.fields = flowd_serialiser.deserialise(hdr + flow) - - def to_file(self, flow_file, field_mask = 0xffffffffL): - flow = flowd_serialiser.serialise(self.fields, field_mask) - 
flow_file.write(flow) - flow_file.flush() - - def format(self, field_mask = BRIEF, utc = 0): - fields = self.fields["fields"] & field_mask - ret = "FLOW " - - if fields & self.__class__.TAG != 0: - ret = ret + "tag %u " % self.fields["tag"] - if fields & self.__class__.RECV_TIME != 0: - ret = ret + "recv_time %s " % \ - iso_time(self.fields["recv_secs"], utc) - if fields & self.__class__.PROTO_FLAGS_TOS != 0: - ret = ret + "proto %u " % self.fields["protocol"] - ret = ret + "tcpflags %02x " % self.fields["tcp_flags"] - ret = ret + "tos %02x " % self.fields["tos"] - if fields & self.__class__.AGENT_ADDR != 0: - ret = ret + "agent [%s] " % self.fields["agent_addr"] - if fields & self.__class__.SRC_ADDR != 0: - ret = ret + "src [%s]" % self.fields["src_addr"]; - if fields & self.__class__.SRCDST_PORT != 0: - ret = ret + ":%u" % self.fields["src_port"]; - ret = ret + " "; - if fields & self.__class__.DST_ADDR != 0: - ret = ret + "dst [%s]" % self.fields["dst_addr"]; - if fields & self.__class__.SRCDST_PORT != 0: - ret = ret + ":%u" % self.fields["dst_port"]; - ret = ret + " "; - if fields & self.__class__.GATEWAY_ADDR != 0: - ret = ret + "gateway [%s] " % \ - self.fields["gateway_addr"]; - if fields & self.__class__.PACKETS != 0: - ret = ret + "packets %s " % self.fields["flow_packets"]; - if fields & self.__class__.OCTETS != 0: - ret = ret + "octets %s " % self.fields["flow_octets"]; - if fields & self.__class__.IF_INDICES != 0: - ret = ret + "in_if %u " % self.fields["if_index_in"]; - ret = ret + "out_if %u " % self.fields["if_index_out"]; - if fields & self.__class__.AGENT_INFO != 0: - ret = ret + "sys_uptime_ms %s " % \ - interval_time_ms(self.fields["sys_uptime_ms"]); - ret = ret + "time_sec %s " % \ - iso_time(self.fields["time_sec"], utc); - ret = ret + "time_nanosec %u " % \ - self.fields["time_nanosec"]; - ret = ret + "netflow ver %u " % \ - self.fields["netflow_version"]; - if fields & self.__class__.FLOW_TIMES != 0: - ret = ret + "flow_start %s " % \ - 
interval_time_ms(self.fields["flow_start"]); - ret = ret + "flow_finish %s " % \ - interval_time_ms(self.fields["flow_finish"]); - if fields & self.__class__.AS_INFO != 0: - ret = ret + "src_AS %u " % self.fields["src_as"]; - ret = ret + "src_masklen %u " % \ - self.fields["src_masklen"]; - ret = ret + "dst_AS %u " % self.fields["dst_as"]; - ret = ret + "dst_masklen %u " % \ - self.fields["dst_masklen"]; - if fields & self.__class__.FLOW_ENGINE_INFO != 0: - ret = ret + "engine_type %u " % \ - self.fields["engine_type"]; - ret = ret + "engine_id %u " % self.fields["engine_id"]; - ret = ret + "seq %u " % self.fields["flow_sequence"]; - if fields & self.__class__.CRC32 != 0: - ret = ret + "crc32 %08x " % self.fields["crc"]; - - return ret; Index: flowd_python.c =================================================================== RCS file: /var/cvs/flowd/flowd_python.c,v retrieving revision 1.4 diff -u -p -r1.4 flowd_python.c --- flowd_python.c 11 Mar 2005 19:07:11 -0000 1.4 +++ flowd_python.c 21 Aug 2005 09:49:17 -0000 @@ -14,544 +14,696 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -#include -#include -#include -#include "common.h" +#include "Python.h" +#include "structmember.h" #include "store.h" -RCSID("$Id: flowd_python.c,v 1.4 2005/03/11 19:07:11 djm Exp $"); +/* $Id$ */ -static PyObject * -flow_header_length(PyObject *self, PyObject *args) +/* XXX: I wish python includes something like this */ +/* XXX: this is probably broken on some platforms */ +#define FL_T_U32 T_ULONG +#define FL_T_U16 T_USHORT +#define FL_T_U8 T_UBYTE + +/* Prototypes */ +PyMODINIT_FUNC initflowd(void); +struct _FlowLogObject; +struct _FlowLogIterObject; +static struct _FlowLogIterObject *newFlowLogIterObject(struct _FlowLogObject *); + +/* ------------------------------------------------------------------------ */ + +/* Flows*/ + +typedef struct { + PyObject_HEAD + PyObject *user_attr; /* User-specified attributes */ + PyObject *octets; /* bah. 
python >2.5 lacks T_LONGLONG */ + PyObject *packets; /* ditto */ + char *agent_addr; + char *src_addr; + char *dst_addr; + char *gateway_addr; + struct store_flow_complete flow; +} FlowObject; + +static PyTypeObject Flow_Type; + +static FlowObject * +newFlowObject(void) { - int version; + FlowObject *self; - version = STORE_VERSION; - if (!PyArg_ParseTuple(args, "|k", &version)) - return (NULL); - if (version != STORE_VERSION) { - PyErr_SetString(PyExc_NotImplementedError, - "Unsupported store version"); - return (NULL); + self = PyObject_New(FlowObject, &Flow_Type); + if (self == NULL) + return NULL; + + self->octets = Py_None; + Py_INCREF(Py_None); + self->packets = Py_None; + Py_INCREF(Py_None); + self->user_attr = PyDict_New(); + + if (self->user_attr == NULL) { + /* Flow_dealloc will clean up for us */ + Py_XDECREF(self); + return (NULL); } - return (PyInt_FromLong(sizeof(struct store_flow))); + + return self; } -static PyObject * -flow_length(PyObject *self, PyObject *args) +static FlowObject * +newFlowObject_from_flow(struct store_flow_complete *flow) { - u_char *buf; - int version, len; + FlowObject *self; + char addr_buf[128]; - version = STORE_VERSION; - if (!PyArg_ParseTuple(args, "s#|k", &buf, &len, &version) || - buf == NULL) - return (NULL); - if (version != STORE_VERSION) { - PyErr_SetString(PyExc_NotImplementedError, - "Unsupported store version"); - return (NULL); + /* Sanity check */ + if (flow == NULL) + return NULL; + + self = PyObject_New(FlowObject, &Flow_Type); + if (self == NULL) + return NULL; + + self->user_attr = NULL; + self->src_addr = self->dst_addr = NULL; + self->agent_addr = self->gateway_addr = NULL; + memcpy(&self->flow, flow, sizeof(self->flow)); + + store_swab_flow(&self->flow, 0); + +#define FL_ADDR(addr, which) do { \ + if ((self->flow.hdr.fields & STORE_FIELD_##which) != 0) { \ + if (addr_ntop(&self->flow.addr, addr_buf, \ + sizeof(addr_buf)) != -1) \ + self->addr = strdup(addr_buf); \ + } } while (0) + + FL_ADDR(src_addr, 
SRC_ADDR); + FL_ADDR(dst_addr, DST_ADDR); + FL_ADDR(agent_addr, AGENT_ADDR); + FL_ADDR(gateway_addr, GATEWAY_ADDR); + +#undef FL_ADDR + self->octets = PyLong_FromUnsignedLongLong( + self->flow.octets.flow_octets); + self->packets = PyLong_FromUnsignedLongLong( + self->flow.packets.flow_packets); + self->user_attr = PyDict_New(); + + if (self->user_attr == NULL || self->octets == NULL || + self->packets == NULL) { + /* Flow_dealloc will clean up for us */ + Py_XDECREF(self); + return (NULL); } - if (len < (ssize_t)sizeof(struct store_flow)) { - PyErr_SetString(PyExc_ValueError, - "Supplied header is too short"); + + return self; +} + +static int +flowobj_normalise(FlowObject *f) +{ + if (f->octets != NULL) + f->flow.octets.flow_octets = PyLong_AsUnsignedLongLong( + f->octets); + if (f->packets != NULL) + f->flow.packets.flow_packets = PyLong_AsUnsignedLongLong( + f->packets); + +#define FL_ADDR(addr, tag) do { \ + if (f->addr != NULL && *f->addr != '\0') { \ + if (addr_pton(f->addr, &f->flow.addr) == -1) { \ + PyErr_SetString(PyExc_ValueError, \ + "Invalid \""#addr"\""); \ + return (-1); \ + } \ + f->flow.hdr.fields |= STORE_FIELD_##tag; \ + } else { \ + f->flow.hdr.fields &= ~STORE_FIELD_##tag; \ + } } while (0) + + FL_ADDR(src_addr, SRC_ADDR); + FL_ADDR(dst_addr, DST_ADDR); + FL_ADDR(agent_addr, AGENT_ADDR); + FL_ADDR(gateway_addr, GATEWAY_ADDR); + +#undef FL_ADDR + + return (0); +} + +static FlowObject * +newFlowObject_from_blob(u_int8_t *buf, u_int len) +{ + struct store_flow_complete flow; + char ebuf[512]; + + /* Sanity check */ + if (buf == NULL || len == 0 || len > 8192) + return NULL; + + if (store_flow_deserialise(buf, len, &flow, ebuf, + sizeof(ebuf)) != STORE_ERR_OK) { + PyErr_SetString(PyExc_ValueError, ebuf); return (NULL); } - return (PyInt_FromLong(store_calc_flow_len((struct store_flow *)buf))); + + return newFlowObject_from_flow(&flow); +} + +/* Flow methods */ + +static void +Flow_dealloc(FlowObject *self) +{ + Py_XDECREF(self->user_attr); + 
Py_XDECREF(self->octets); + Py_XDECREF(self->packets); + if (self->src_addr != NULL) + free(self->src_addr); + if (self->dst_addr != NULL) + free(self->dst_addr); + if (self->agent_addr != NULL) + free(self->agent_addr); + if (self->gateway_addr != NULL) + free(self->gateway_addr); + PyObject_Del(self); +} + +PyDoc_STRVAR(flow_format_doc, +"Flow.format(utc = 0, mask = flowd.DISPLAY_BRIEF) -> String\n\ +\n\ +Format a flow to a string.\n\ +"); + +static PyObject * +flow_format(FlowObject *self, PyObject *args, PyObject *kw_args) +{ + static char *keywords[] = { "utc", "mask", NULL }; + char buf[1024]; + int utcflag = 0; + unsigned long mask = STORE_DISPLAY_BRIEF; + + if (!PyArg_ParseTupleAndKeywords(args, kw_args, "|ik:format", keywords, + &utcflag, &mask)) + return NULL; + + if (flowobj_normalise(self) == -1) + return (NULL); + + store_format_flow(&self->flow, buf, sizeof(buf), utcflag, mask, 1); + + return PyString_FromString(buf); } +PyDoc_STRVAR(flow_serialise_doc, +"Flow.serialise() -> String\n\ +\n\ +Format convert a flow object to a binary representation.\n\ +"); + static PyObject * -flow_deserialise(PyObject *self, PyObject *args) +flow_serialise(FlowObject *self) { - u_int32_t fields; - int version, len, r; + char buf[1024], ebuf[512]; struct store_flow_complete flow; - u_int8_t *buf; - char ebuf[512], addr_buf[128]; - PyObject *ret, *field; - - version = STORE_VERSION; - if (!PyArg_ParseTuple(args, "s#|k", &buf, &len, &version) || - buf == NULL) + int len; + + if (flowobj_normalise(self) == -1) return (NULL); - if (version != STORE_VERSION) { - PyErr_SetString(PyExc_NotImplementedError, - "Unsupported store version"); + + memcpy(&flow, &self->flow, sizeof(flow)); /* swab/serialise a copy; self->flow is left untouched */ + store_swab_flow(&flow, 1); + + if (store_flow_serialise(&flow, buf, sizeof(buf), &len , ebuf, + sizeof(ebuf)) != STORE_ERR_OK) { + PyErr_SetString(PyExc_ValueError, ebuf); return (NULL); } - r = store_flow_deserialise(buf, len, &flow, ebuf, sizeof(ebuf)); - if (r != STORE_ERR_OK) { + return 
PyString_FromStringAndSize(buf, len); +} + +static PyMemberDef Flow_members[] = { + {"data", T_OBJECT, offsetof(FlowObject, user_attr), 0}, + {"src_addr", T_STRING, offsetof(FlowObject, src_addr), 0}, + {"dst_addr", T_STRING, offsetof(FlowObject, dst_addr), 0}, + {"agent_addr", T_STRING, offsetof(FlowObject, agent_addr), 0}, + {"gateway_addr",T_STRING, offsetof(FlowObject, gateway_addr), 0}, + {"octets", T_OBJECT, offsetof(FlowObject, octets), 0}, + {"packets", T_OBJECT, offsetof(FlowObject, packets), 0}, + {"flow_ver", FL_T_U8, offsetof(FlowObject, flow.hdr.version), 0}, + {"fields", FL_T_U32, offsetof(FlowObject, flow.hdr.fields), 0}, + {"tag", FL_T_U32, offsetof(FlowObject, flow.tag.tag), 0}, + {"recv_sec", FL_T_U32, offsetof(FlowObject, flow.recv_time.recv_sec),0}, + {"recv_usec", FL_T_U32, offsetof(FlowObject, flow.recv_time.recv_usec),0}, + {"tcp_flags", FL_T_U8, offsetof(FlowObject, flow.pft.tcp_flags), 0}, + {"protocol", FL_T_U8, offsetof(FlowObject, flow.pft.protocol), 0}, + {"tos", FL_T_U8, offsetof(FlowObject, flow.pft.tos), 0}, + {"src_port", FL_T_U16, offsetof(FlowObject, flow.ports.src_port), 0}, + {"dst_port", FL_T_U16, offsetof(FlowObject, flow.ports.dst_port), 0}, + {"if_ndx_in", FL_T_U32, offsetof(FlowObject, flow.ifndx.if_index_in),0}, + {"if_ndx_out", FL_T_U32, offsetof(FlowObject, flow.ifndx.if_index_out),0}, + {"sys_uptime_ms",FL_T_U32,offsetof(FlowObject, flow.ainfo.sys_uptime_ms),0}, + {"agent_sec", FL_T_U32, offsetof(FlowObject, flow.ainfo.time_sec), 0}, + {"agent_usec", FL_T_U32, offsetof(FlowObject, flow.ainfo.time_nanosec),0}, + {"netflow_ver", FL_T_U16, offsetof(FlowObject, flow.ainfo.netflow_version),0}, + {"flow_start", FL_T_U32, offsetof(FlowObject, flow.ftimes.flow_start),0}, + {"flow_finish", FL_T_U32, offsetof(FlowObject, flow.ftimes.flow_finish),0}, + {"src_as", FL_T_U32, offsetof(FlowObject, flow.asinf.src_as), 0}, + {"dst_as", FL_T_U32, offsetof(FlowObject, flow.asinf.dst_as), 0}, + {"src_mask", FL_T_U8, offsetof(FlowObject, 
flow.asinf.src_mask), 0}, + {"dst_mask", FL_T_U8, offsetof(FlowObject, flow.asinf.dst_mask), 0}, + {"engine_type", FL_T_U16, offsetof(FlowObject, flow.finf.engine_type),0}, + {"engine_id", FL_T_U16, offsetof(FlowObject, flow.finf.engine_id), 0}, + {"flow_sequence",FL_T_U32,offsetof(FlowObject, flow.finf.flow_sequence),0}, + {"source_id", FL_T_U32, offsetof(FlowObject, flow.finf.source_id), 0}, + {"crc32", FL_T_U32, offsetof(FlowObject, flow.crc32.crc32), 0}, + {NULL} +}; + +static PyMethodDef Flow_methods[] = { + {"format", (PyCFunction)flow_format, METH_VARARGS|METH_KEYWORDS, flow_format_doc }, + {"serialise", (PyCFunction)flow_serialise, 0, flow_serialise_doc }, + {NULL, NULL} /* sentinel */ +}; + +PyDoc_STRVAR(Flow_doc, +"Object representing a single NetFlow flow"); + +static PyTypeObject Flow_Type = { + /* The ob_type field must be initialized in the module init function + * to be portable to Windows without using C++. */ + PyObject_HEAD_INIT(NULL) + 0, /*ob_size*/ + "flowd.Flow", /*tp_name*/ + sizeof(FlowObject), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + /* methods */ + (destructor)Flow_dealloc,/*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash*/ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + Flow_doc, /*tp_doc*/ + 0, /*tp_traverse*/ + 0, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + Flow_methods, /*tp_methods*/ + Flow_members, /*tp_members*/ + 0, /*tp_getset*/ + 0, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + 0, /*tp_init*/ + 0, /*tp_alloc*/ + 0, /*tp_new*/ + 0, /*tp_free*/ + 0, /*tp_is_gc*/ +}; + +/* ------------------------------------------------------------------------ */ + +typedef struct _FlowLogObject { + PyObject_HEAD 
+ PyObject *flowlog; /* PyFile */ +} FlowLogObject; + +static PyTypeObject FlowLog_Type; + +/* FlowLog methods */ + +static void +FlowLog_dealloc(FlowLogObject *self) +{ + Py_XDECREF(self->flowlog); + PyObject_Del(self); +} + +PyDoc_STRVAR(FlowLog_read_flow_doc, +"FlowLog.read_flow() -> new Flow object\n\ +\n\ +Reads a flow record from the flow log and returns a Flow object\n\ +"); + +static PyObject * +FlowLog_read_flow(FlowLogObject *self) +{ + struct store_flow_complete flow; + char ebuf[512]; + + switch (store_read_flow(PyFile_AsFile(self->flowlog), &flow, + ebuf, sizeof(ebuf))) { + case STORE_ERR_OK: + return (PyObject *)newFlowObject_from_flow(&flow); + case STORE_ERR_EOF: + Py_INCREF(Py_None); + return Py_None; + default: PyErr_SetString(PyExc_ValueError, ebuf); return (NULL); } + /* NOTREACHED */ +} - fields = ntohl(flow.hdr.fields); +PyDoc_STRVAR(FlowLog_write_flow_doc, +"FlowLog.write_flow(flow, fieldmask = flowd.DISPLAY_ALL) -> None\n\ +\n\ +Writes a flow record to the flow log\n\ +"); - if ((ret = PyDict_New()) == NULL) +static PyObject * +FlowLog_write_flow(FlowLogObject *self, PyObject *args, PyObject *kw_args) +{ + struct store_flow_complete flow; + static char *keywords[] = { "flow", "fieldmask", NULL }; + char ebuf[512]; + + FlowObject *flowobj = NULL; + unsigned long mask = STORE_DISPLAY_ALL; /* "k" stores an unsigned long */ + + if (!PyArg_ParseTupleAndKeywords(args, kw_args, "O!|k:write_flow", + keywords, &Flow_Type, (PyObject **)&flowobj, &mask)) + return NULL; + + if (flowobj_normalise(flowobj) == -1) return (NULL); - field = PyLong_FromUnsignedLong(fields); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "fields", field)) - goto setitem_err; - Py_DECREF(field); - - if (fields & STORE_FIELD_TAG) { - field = PyLong_FromUnsignedLong(ntohl(flow.tag.tag)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "tag", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_RECV_TIME) { - field =
PyLong_FromUnsignedLong(ntohl(flow.recv_time.recv_secs)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "recv_secs", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_PROTO_FLAGS_TOS) { - field = PyInt_FromLong(flow.pft.tcp_flags); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "tcp_flags", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.pft.protocol); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "protocol", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.pft.tos); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "tos", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & (STORE_FIELD_AGENT_ADDR4|STORE_FIELD_AGENT_ADDR6)) { - addr_ntop(&flow.agent_addr, addr_buf, sizeof(addr_buf)); - field = PyString_FromString(addr_buf); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "agent_addr", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.agent_addr.af); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "agent_addr_af", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & (STORE_FIELD_SRC_ADDR4|STORE_FIELD_SRC_ADDR6)) { - addr_ntop(&flow.src_addr, addr_buf, sizeof(addr_buf)); - field = PyString_FromString(addr_buf); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "src_addr", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.src_addr.af); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "src_addr_af", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & (STORE_FIELD_DST_ADDR4|STORE_FIELD_DST_ADDR6)) { - addr_ntop(&flow.dst_addr, addr_buf, sizeof(addr_buf)); - field = PyString_FromString(addr_buf); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "dst_addr", field)) - goto setitem_err; 
- Py_DECREF(field); - field = PyInt_FromLong(flow.dst_addr.af); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "dst_addr_af", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & (STORE_FIELD_GATEWAY_ADDR4|STORE_FIELD_GATEWAY_ADDR6)) { - addr_ntop(&flow.gateway_addr, addr_buf, sizeof(addr_buf)); - field = PyString_FromString(addr_buf); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "gateway_addr", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.gateway_addr.af); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "gateway_addr_af", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_SRCDST_PORT) { - field = PyInt_FromLong(ntohs(flow.ports.src_port)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "src_port", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(ntohs(flow.ports.dst_port)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "dst_port", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_PACKETS) { - field = PyLong_FromUnsignedLongLong( - store_ntohll(flow.packets.flow_packets)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "flow_packets", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_OCTETS) { - field = PyLong_FromUnsignedLongLong( - store_ntohll(flow.octets.flow_octets)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "flow_octets", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_IF_INDICES) { - field = PyInt_FromLong(ntohs(flow.ifndx.if_index_in)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "if_index_in", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(ntohs(flow.ifndx.if_index_out)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, 
"if_index_out", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_AGENT_INFO) { - field = PyLong_FromUnsignedLong( - ntohl(flow.ainfo.sys_uptime_ms)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "sys_uptime_ms", field)) - goto setitem_err; - Py_DECREF(field); - field = PyLong_FromUnsignedLong(ntohl(flow.ainfo.time_sec)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "time_sec", field)) - goto setitem_err; - Py_DECREF(field); - field = PyLong_FromUnsignedLong(ntohl(flow.ainfo.time_nanosec)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "time_nanosec", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(ntohs(flow.ainfo.netflow_version)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "netflow_version", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_FLOW_TIMES) { - field = PyLong_FromUnsignedLong(ntohl(flow.ftimes.flow_start)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "flow_start", field)) - goto setitem_err; - Py_DECREF(field); - field = PyLong_FromUnsignedLong(ntohl(flow.ftimes.flow_finish)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "flow_finish", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_AS_INFO) { - field = PyInt_FromLong(ntohs(flow.asinf.src_as)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "src_as", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(ntohs(flow.asinf.dst_as)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "dst_as", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.asinf.src_mask); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "src_mask", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.asinf.dst_mask); - if (field == 
NULL) - goto field_err; - if (PyDict_SetItemString(ret, "dst_mask", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_FLOW_ENGINE_INFO) { - field = PyInt_FromLong(flow.finf.engine_type); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "engine_type", field)) - goto setitem_err; - Py_DECREF(field); - field = PyInt_FromLong(flow.finf.engine_id); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "engine_id", field)) - goto setitem_err; - Py_DECREF(field); - field = PyLong_FromUnsignedLong(htonl(flow.finf.flow_sequence)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "flow_sequence", field)) - goto setitem_err; - Py_DECREF(field); - } - if (fields & STORE_FIELD_CRC32) { - field = PyLong_FromUnsignedLong(ntohl(flow.crc32.crc32)); - if (field == NULL) - goto field_err; - if (PyDict_SetItemString(ret, "crc", field)) - goto setitem_err; - Py_DECREF(field); + memcpy(&flow, &flowobj->flow, sizeof(flow)); + store_swab_flow(&flow, 1); + + if (store_write_flow(PyFile_AsFile(self->flowlog), &flow, mask, + ebuf, sizeof(ebuf)) != STORE_ERR_OK) { + PyErr_SetString(PyExc_ValueError, ebuf); + return (NULL); } - return (ret); + Py_INCREF(Py_None); + return Py_None; +} - setitem_err: - Py_DECREF(field); - field_err: - Py_DECREF(ret); - return (NULL); +static PyObject * +FlowLog_getiter(FlowLogObject *self) +{ + return (PyObject *)newFlowLogIterObject(self); } -static int -sr_get_u64(PyObject *dict, const char *key, u_int64_t *val) +static PyMemberDef FlowLog_members[] = { + {"file", T_OBJECT, offsetof(FlowLogObject, flowlog), 0}, + {NULL} +}; + +PyDoc_STRVAR(FlowLog_doc, "NetFlow log"); + +static PyMethodDef FlowLog_methods[] = { + {"read_flow", (PyCFunction)FlowLog_read_flow, 0, FlowLog_read_flow_doc }, + {"write_flow", (PyCFunction)FlowLog_write_flow,METH_VARARGS|METH_KEYWORDS, FlowLog_write_flow_doc }, + {NULL, NULL} /* sentinel */ +}; + +static PyTypeObject FlowLog_Type = { + /* The 
ob_type field must be initialized in the module init function + * to be portable to Windows without using C++. */ + PyObject_HEAD_INIT(NULL) + 0, /*ob_size*/ + "flowd.FlowLog", /*tp_name*/ + sizeof(FlowLogObject), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + /* methods */ + (destructor)FlowLog_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash*/ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + FlowLog_doc, /*tp_doc*/ + 0, /*tp_traverse*/ + 0, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + (getiterfunc)FlowLog_getiter, /*tp_iter*/ + 0, /*tp_iternext*/ + FlowLog_methods, /*tp_methods*/ + FlowLog_members, /*tp_members*/ + 0, /*tp_getset*/ + 0, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + 0, /*tp_init*/ + 0, /*tp_alloc*/ + 0, /*tp_new*/ + 0, /*tp_free*/ + 0, /*tp_is_gc*/ +}; + +/* ------------------------------------------------------------------------ */ + +/* FlowLogIter: netflow log iterator */ + +typedef struct _FlowLogIterObject { + PyObject_HEAD + FlowLogObject *parent; +} FlowLogIterObject; + +static PyTypeObject FlowLogIter_Type; + +static FlowLogIterObject * +newFlowLogIterObject(FlowLogObject *parent) { - unsigned long long ullval; - PyObject *field; + FlowLogIterObject *self; - if ((field = PyDict_GetItemString(dict, key)) == NULL) - return (0); + self = PyObject_New(FlowLogIterObject, &FlowLogIter_Type); + if (self == NULL) + return NULL; - if ((ullval = PyInt_AsUnsignedLongLongMask(field)) == ULLONG_MAX) { - PyErr_Format(PyExc_TypeError, - "\"%s\" entry is not an integer", key); - return (-1); - } - *val = ullval; - return (1); + self->parent = parent; + Py_XINCREF(self->parent); + + return self; } -static int -sr_get_addr(PyObject *dict, const char *key, 
struct xaddr *val) +/* FlowLogIter methods */ + +static void +FlowLogIter_dealloc(FlowLogIterObject *self) { - PyObject *field; - const char *addr; - struct xaddr xa; - - if ((field = PyDict_GetItemString(dict, key)) == NULL) - return (0); - if ((addr = PyString_AsString(field)) == NULL) { - PyErr_Format(PyExc_TypeError, - "\"%s\" entry is not a string", key); - return (-1); - } - if (addr_pton(addr, &xa) == -1) { - PyErr_Format(PyExc_ValueError, - "Invalid \"%s\" address", key); - return (-1); - } - memcpy(val, &xa, sizeof(*val)); - return (1); + Py_XDECREF(self->parent); + PyObject_Del(self); } static PyObject * -flow_serialise(PyObject *self, PyObject *args) +FlowLogIter_iternext(FlowLogIterObject *self) { - u_int32_t fields, mask; - u_int64_t uv; - int version, len, r; struct store_flow_complete flow; char ebuf[512]; - PyObject *flow_dict; - struct xaddr addr; - /* XXX: assume that a serialised flow will fit into a unpacked struct */ - u_int8_t flow_buf[sizeof(struct store_flow_complete)]; - - version = STORE_VERSION; - mask = 0xffffffff; - if (!PyArg_ParseTuple(args, "O!|kk", &PyDict_Type, &flow_dict, - &mask, &version)) - return (NULL); - if (version != STORE_VERSION) { - PyErr_SetString(PyExc_NotImplementedError, - "Unsupported store version"); + + switch (store_read_flow(PyFile_AsFile(self->parent->flowlog), &flow, + ebuf, sizeof(ebuf))) { + case STORE_ERR_OK: + return (PyObject *)newFlowObject_from_flow(&flow); + case STORE_ERR_EOF: + return NULL; + default: + PyErr_SetString(PyExc_ValueError, ebuf); return (NULL); } + /* NOTREACHED */ +} - memset(&flow, 0, sizeof(flow)); - fields = 0; +PyDoc_STRVAR(FlowLogIter_doc, +"FlowLog tree iterator"); -#define U8_ENTRY(name, field, target) do { \ - if ((r = sr_get_u64(flow_dict, name, &uv)) == -1) \ - return (NULL); \ - if (r == 1) { \ - fields |= STORE_FIELD_##field; \ - if (uv > 0xff) { \ - PyErr_Format(PyExc_ValueError, \ - "\"%s\" entry out of range", name); \ - return (NULL); \ - } \ - flow.target = uv & 0xff; 
\ - } \ - } while (0) -#define U16_ENTRY(name, field, target) do { \ - if ((r = sr_get_u64(flow_dict, name, &uv)) == -1) \ - return (NULL); \ - if (r == 1) { \ - fields |= STORE_FIELD_##field; \ - if (uv > 0xffff) { \ - PyErr_Format(PyExc_ValueError, \ - "\"%s\" entry out of range", name); \ - return (NULL); \ - } \ - flow.target = htons(uv & 0xffff); \ - } \ - } while (0) -#define U32_ENTRY(name, field, target) do { \ - if ((r = sr_get_u64(flow_dict, name, &uv)) == -1) \ - return (NULL); \ - if (r == 1) { \ - fields |= STORE_FIELD_##field; \ - if (uv > 0xffffffff) { \ - PyErr_Format(PyExc_ValueError, \ - "\"%s\" entry out of range", name); \ - return (NULL); \ - } \ - flow.target = htonl(uv & 0xffffffff); \ - } \ - } while (0) -#define U64_ENTRY(name, field, target) do { \ - if ((r = sr_get_u64(flow_dict, name, &uv)) == -1) \ - return (NULL); \ - if (r == 1) { \ - fields |= STORE_FIELD_##field; \ - flow.target = store_htonll(uv); \ - } \ - } while (0) -#define ADDR_ENTRY(name, field, target) do { \ - if ((r = sr_get_addr(flow_dict, name, &addr)) == -1) \ - return (NULL); \ - if (r == 1) { \ - if (addr.af == AF_INET) \ - fields |= STORE_FIELD_##field##4; \ - else if (addr.af == AF_INET6) \ - fields |= STORE_FIELD_##field##6; \ - else \ - return (NULL); \ - flow.target = addr; \ - } \ - } while (0) +static PyTypeObject FlowLogIter_Type = { + /* The ob_type field must be initialized in the module init function + * to be portable to Windows without using C++. 
*/ + PyObject_HEAD_INIT(NULL) + 0, /*ob_size*/ + "flowd.FlowLogIter", /*tp_name*/ + sizeof(FlowLogIterObject),/*tp_basicsize*/ + 0, /*tp_itemsize*/ + /* methods */ + (destructor)FlowLogIter_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash*/ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + FlowLogIter_doc, /*tp_doc*/ + 0, /*tp_traverse*/ + 0, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + (iternextfunc)FlowLogIter_iternext, /*tp_iternext*/ + 0, /*tp_methods*/ + 0, /*tp_members*/ + 0, /*tp_getset*/ + 0, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + 0, /*tp_init*/ + 0, /*tp_alloc*/ + 0, /*tp_new*/ + 0, /*tp_free*/ + 0, /*tp_is_gc*/ +}; - U32_ENTRY("tag", TAG, tag.tag); - U32_ENTRY("recv_secs", RECV_TIME, recv_time.recv_secs); - U8_ENTRY("tcp_flags", PROTO_FLAGS_TOS, pft.tcp_flags); - U8_ENTRY("protocol", PROTO_FLAGS_TOS, pft.protocol); - U8_ENTRY("tos", PROTO_FLAGS_TOS, pft.tos); - ADDR_ENTRY("agent_addr", AGENT_ADDR, agent_addr); - ADDR_ENTRY("src_addr", SRC_ADDR, src_addr); - ADDR_ENTRY("dst_addr", DST_ADDR, dst_addr); - ADDR_ENTRY("gateway_addr", GATEWAY_ADDR, gateway_addr); - U16_ENTRY("src_port", SRCDST_PORT, ports.src_port); - U16_ENTRY("dst_port", SRCDST_PORT, ports.dst_port); - U64_ENTRY("flow_packets", PACKETS, packets.flow_packets); - U64_ENTRY("flow_octets", OCTETS, octets.flow_octets); - U16_ENTRY("if_index_in", IF_INDICES, ifndx.if_index_in); - U16_ENTRY("if_index_out", IF_INDICES, ifndx.if_index_out); - U32_ENTRY("sys_uptime_ms", AGENT_INFO, ainfo.sys_uptime_ms); - U32_ENTRY("time_sec", AGENT_INFO, ainfo.time_sec); - U32_ENTRY("time_nanosec", AGENT_INFO, ainfo.time_nanosec); - U16_ENTRY("netflow_version", AGENT_INFO, 
ainfo.netflow_version); - U32_ENTRY("flow_start", FLOW_TIMES, ftimes.flow_start); - U32_ENTRY("flow_finish", FLOW_TIMES, ftimes.flow_finish); - U16_ENTRY("src_as", AS_INFO, asinf.src_as); - U16_ENTRY("dst_as", AS_INFO, asinf.dst_as); - U8_ENTRY("src_mask", AS_INFO, asinf.src_mask); - U8_ENTRY("dst_mask", AS_INFO, asinf.dst_mask); - U8_ENTRY("engine_type", FLOW_ENGINE_INFO, finf.engine_type); - U8_ENTRY("engine_id", FLOW_ENGINE_INFO, finf.engine_id); - U32_ENTRY("flow_sequence", FLOW_ENGINE_INFO, finf.flow_sequence); - -#undef U8_ENTRY -#undef U16_ENTRY -#undef U32_ENTRY -#undef U64_ENTRY -#undef ADDR_ENTRY - - fields &= mask; - flow.hdr.fields = htonl(fields); - - if ((r = store_calc_flow_len(&flow.hdr)) < 0) { - PyErr_SetString(PyExc_ValueError, - "Invalid field specification"); - return (NULL); - } - r = store_flow_serialise(&flow, flow_buf, sizeof(flow_buf), &len, - ebuf, sizeof(ebuf)); - if (r != STORE_ERR_OK) { - PyErr_SetString(PyExc_ValueError, ebuf); - return (NULL); - } - return (PyString_FromStringAndSize(flow_buf, len)); +/* ------------------------------------------------------------------------ */ + +PyDoc_STRVAR(flow_Flow_doc, +"Flow(blob = None) -> new Flow object\n\ +\n\ +Instantiate a new Flow object. 
If the 'blob' parameter is specified,\n\ +the flow will be created from the specified binary flow record, otherwise \n\ +the Flow object will be created empty."); + +/* Registered METH_VARARGS|METH_KEYWORDS, so it is invoked as (self, args, kw_args) */ +static PyObject * +flow_Flow(PyObject *self, PyObject *args, PyObject *kw_args) +{ + FlowObject *rv; + static char *keywords[] = { "blob", NULL }; + char *blob = NULL; + int bloblen = -1; + + if (!PyArg_ParseTupleAndKeywords(args, kw_args, "|s#:Flow", keywords, + &blob, &bloblen)) + return NULL; + if (bloblen == -1) + rv = newFlowObject(); + else + rv = newFlowObject_from_blob(blob, bloblen); + if (rv == NULL) + return NULL; + return (PyObject *)rv; } +PyDoc_STRVAR(flow_FlowLog_doc, +"FlowLog(path, mode = \"rb\") -> new FlowLog object\n\ +\n\ +Instantiate a new FlowLog object.\n\ +"); -PyDoc_STRVAR(flowd_doc, -"This module performs conversions from binary flowd logs to Python\n" -"dictionaries. This is used by the flowd.py module to provide a high-level\n" -"API to flowd logs. Unless you really have a need, just use flowd.py"); +static PyObject * +flow_FlowLog(PyObject *self, PyObject *args, PyObject *kw_args) +{ + FlowLogObject *rv; + static char *keywords[] = { "path", "mode", NULL }; + char *path = NULL, *mode = "rb"; + + if (!PyArg_ParseTupleAndKeywords(args, kw_args, "s|s:FlowLog", keywords, + &path, &mode)) + return NULL; + if ((rv = PyObject_New(FlowLogObject, &FlowLog_Type)) == NULL) + return (NULL); + if ((rv->flowlog = PyFile_FromString(path, mode)) == NULL) { + Py_DECREF(rv); /* don't leak the half-built object */ + return (NULL); + } + + return (PyObject *)rv; +} static PyMethodDef flowd_methods[] = { - { "header_len", flow_header_length, METH_VARARGS, - PyDoc_STR("Return the length of a flow record header") }, - { "flow_len", flow_length, METH_VARARGS, - PyDoc_STR("Calcuate the length of a flow record, given its header") }, - { "deserialise", flow_deserialise, METH_VARARGS, - PyDoc_STR("Convert a binary flow log record into a dict of fields") }, - { "serialise", flow_serialise, METH_VARARGS, - PyDoc_STR("Convert a dict flow record into a binary log record") }, - {
NULL, NULL, 0, NULL} /* sentinel */ + {"Flow", (PyCFunction)flow_Flow, METH_VARARGS|METH_KEYWORDS, flow_Flow_doc }, + {"FlowLog", (PyCFunction)flow_FlowLog, METH_VARARGS|METH_KEYWORDS, flow_FlowLog_doc }, + {NULL, NULL} /* sentinel */ }; -PyMODINIT_FUNC initflowd_serialiser(void); +PyDoc_STRVAR(module_doc, +"XXX.\n\ +"); PyMODINIT_FUNC -initflowd_serialiser(void) +initflowd(void) { PyObject *m; - m = Py_InitModule3("flowd_serialiser", flowd_methods, flowd_doc); + if (PyType_Ready(&Flow_Type) < 0) + return; + if (PyType_Ready(&FlowLog_Type) < 0) + return; + /* FlowLog's tp_iter creates FlowLogIter objects, so ready that type too */ + if (PyType_Ready(&FlowLogIter_Type) < 0) + return; + m = Py_InitModule3("flowd", flowd_methods, module_doc); + if (m == NULL) + return; + +#define STORE_CONST(c) \ + PyModule_AddObject(m, #c, PyLong_FromUnsignedLong(STORE_##c)) + STORE_CONST(FIELD_TAG); + STORE_CONST(FIELD_RECV_TIME); + STORE_CONST(FIELD_PROTO_FLAGS_TOS); + STORE_CONST(FIELD_AGENT_ADDR4); + STORE_CONST(FIELD_AGENT_ADDR6); + STORE_CONST(FIELD_SRC_ADDR4); + STORE_CONST(FIELD_SRC_ADDR6); + STORE_CONST(FIELD_DST_ADDR4); + STORE_CONST(FIELD_DST_ADDR6); + STORE_CONST(FIELD_GATEWAY_ADDR4); + STORE_CONST(FIELD_GATEWAY_ADDR6); + STORE_CONST(FIELD_SRCDST_PORT); + STORE_CONST(FIELD_PACKETS); + STORE_CONST(FIELD_OCTETS); + STORE_CONST(FIELD_IF_INDICES); + STORE_CONST(FIELD_AGENT_INFO); + STORE_CONST(FIELD_FLOW_TIMES); + STORE_CONST(FIELD_AS_INFO); + STORE_CONST(FIELD_FLOW_ENGINE_INFO); + STORE_CONST(FIELD_CRC32); + STORE_CONST(FIELD_RESERVED); + STORE_CONST(FIELD_ALL); + STORE_CONST(FIELD_AGENT_ADDR); + STORE_CONST(FIELD_SRC_ADDR); + STORE_CONST(FIELD_DST_ADDR); + STORE_CONST(FIELD_SRCDST_ADDR); + STORE_CONST(FIELD_GATEWAY_ADDR); + STORE_CONST(DISPLAY_ALL); + STORE_CONST(DISPLAY_BRIEF); +#undef STORE_CONST +#define STORE_CONST(c) \ + PyModule_AddObject(m, "STORE_"#c, PyLong_FromUnsignedLong(STORE_##c)) + STORE_CONST(VER_MAJOR); + STORE_CONST(VER_MINOR); + STORE_CONST(VERSION); +#undef STORE_CONST + PyModule_AddStringConstant(m, "__version__", PROGVER); } Index: reader.pl
=================================================================== RCS file: /var/cvs/flowd/reader.pl,v retrieving revision 1.8 diff -u -p -r1.8 reader.pl --- reader.pl 31 Oct 2004 06:21:49 -0000 1.8 +++ reader.pl 21 Aug 2005 09:49:17 -0000 @@ -35,12 +35,10 @@ usage() unless (defined $ARGV[0]); foreach my $ffile (@ARGV) { my $log = Flowd->new($ffile); - printf "LOGFILE %s started at %s\n", - $ffile, Flowd::iso_time($log->{start_time}, 0); + printf "LOGFILE %s \n", $ffile; while (my $flow = $log->read_flow()) { - print $log->format(Flowd::BRIEF, 0, $flow); - print "\n"; + print $log->format(Flowd::BRIEF, 0, $flow) . "\n"; } $log->finish(); } Index: reader.py =================================================================== RCS file: /var/cvs/flowd/reader.py,v retrieving revision 1.2 diff -u -p -r1.2 reader.py --- reader.py 13 Aug 2004 02:31:17 -0000 1.2 +++ reader.py 21 Aug 2005 09:49:17 -0000 @@ -24,7 +24,8 @@ import sys import getopt def usage(): - print >> sys.stderr, "reader.pl (flowd.py version %s)" % flowd.VERSION + print >> sys.stderr, "reader.pl (flowd.py version %s)" % \ + flowd.__version__ print >> sys.stderr, "Usage: reader.pl [options] [flowd-store]"; print >> sys.stderr, "Options:"; print >> sys.stderr, " -h Display this help"; @@ -58,27 +59,18 @@ def main(): usage() if verbose: - mask = flowd.flow.ALL + mask = flowd.DISPLAY_ALL else: - mask = flowd.flow.BRIEF + mask = flowd.DISPLAY_BRIEF for ffile in args: - flog = flowd.log(ffile) + flog = flowd.FlowLog(ffile) try: - print "LOGFILE " + ffile + " started at " + \ - flowd.iso_time(flog.start_time, utc = utc) + print "LOGFILE " + ffile except IOError: break; - while 1: - flow = flog.readflow() - if flow is None: - break - try: - print flow.format(field_mask = mask, utc = utc) - except IOError: - break; - - flog.finish() + for flow in flog: + print flow.format(mask = mask, utc = utc) if __name__ == '__main__': main() Index: setup.py =================================================================== 
RCS file: /var/cvs/flowd/setup.py,v retrieving revision 1.8 diff -u -p -r1.8 setup.py --- setup.py 14 May 2005 07:22:08 -0000 1.8 +++ setup.py 21 Aug 2005 09:49:17 -0000 @@ -22,16 +22,16 @@ from distutils.core import setup, Extens if __name__ == '__main__': if sys.hexversion < 0x02030000: print >> sys.stderr, "error: " + \ - "flowd.py requires python >= 2.3" + "flowd requires python >= 2.3" sys.exit(1) - flowd_serialiser = Extension('flowd_serialiser', + flowd = Extension('flowd', sources = ['flowd_python.c'], - define_macros = [('PROGVER', '"0.8.5"')], + define_macros = [('PROGVER', '"0.9"')], libraries = ['flowd'], library_dirs = ['.', '../..']) setup( name = "flowd", - version = "0.8.5", + version = "0.9", author = "Damien Miller", author_email = "djm@mindrot.org", url = "http://www.mindrot.org/flowd.html", @@ -41,6 +41,5 @@ This is an API to parse the binary flow collector. """, license = "BSD", - py_modules = ['flowd'], - ext_modules = [flowd_serialiser] + ext_modules = [flowd] ) Index: store-v2.c =================================================================== RCS file: store-v2.c diff -N store-v2.c --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ store-v2.c 21 Aug 2005 09:49:17 -0000 @@ -0,0 +1,556 @@ +/* + * Copyright (c) 2004 Damien Miller + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "store-v2.h" +#include "atomicio.h" +#include "crc32.h" + +RCSID("$Id$"); + +/* This is a useful abbreviation, used in several places below */ +#define SHASFIELD(flag) (fields & STORE_V2_FIELD_##flag) + +/* Stash error message and return */ +#define SFAILX(i, m, f) do { \ + if (ebuf != NULL && elen > 0) { \ + snprintf(ebuf, elen, "%s%s%s", \ + (f) ? __func__ : "", (f) ? ": " : "", m); \ + } \ + return (i); \ + } while (0) + +/* Stash error message, appending strerror into "ebuf" and return */ +#define SFAIL(i, m, f) do { \ + if (ebuf != NULL && elen > 0) { \ + snprintf(ebuf, elen, "%s%s%s: %s", \ + (f) ? __func__ : "", (f) ? 
": " : "", m, \ + strerror(errno)); \ + } \ + return (i); \ + } while (0) + +int +store_v2_validate_header(struct store_v2_header *hdr, char *ebuf, int elen) +{ + if (ntohl(hdr->magic) != STORE_V2_MAGIC) + SFAILX(STORE_ERR_BAD_MAGIC, "Bad magic", 0); + if (ntohl(hdr->version) != STORE_V2_VERSION) + SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0); + + return (STORE_ERR_OK); +} + +int +store_v2_get_header(int fd, struct store_v2_header *hdr, char *ebuf, int elen) +{ + ssize_t r; + + if ((r = atomicio(read, fd, hdr, sizeof(*hdr))) == -1) + SFAIL(STORE_ERR_IO, "read error", 0); + if (r < (ssize_t)sizeof(*hdr)) + SFAILX(STORE_ERR_EOF, "premature EOF", 0); + + return (store_v2_validate_header(hdr, ebuf, elen)); +} + +int +store_v2_calc_flow_len(struct store_v2_flow *hdr) +{ + int ret = 0; + u_int32_t fields; + + fields = ntohl(hdr->fields); +#define ADDFIELD(flag) do { \ + if (SHASFIELD(flag)) { \ + ret += sizeof(struct store_v2_flow_##flag); \ + fields &= ~STORE_V2_FIELD_##flag; \ + } } while (0) + ADDFIELD(TAG); + ADDFIELD(RECV_TIME); + ADDFIELD(PROTO_FLAGS_TOS); + ADDFIELD(AGENT_ADDR4); + ADDFIELD(AGENT_ADDR6); + ADDFIELD(SRC_ADDR4); + ADDFIELD(SRC_ADDR6); + ADDFIELD(DST_ADDR4); + ADDFIELD(DST_ADDR6); + ADDFIELD(GATEWAY_ADDR4); + ADDFIELD(GATEWAY_ADDR6); + ADDFIELD(SRCDST_PORT); + ADDFIELD(PACKETS); + ADDFIELD(OCTETS); + ADDFIELD(IF_INDICES); + ADDFIELD(AGENT_INFO); + ADDFIELD(FLOW_TIMES); + ADDFIELD(AS_INFO); + ADDFIELD(FLOW_ENGINE_INFO); + ADDFIELD(CRC32); +#undef ADDFIELD + + /* Make sure we have captured everything */ + if (fields != 0) + return (-1); + + return (ret); +} + +int +store_v2_flow_deserialise(u_int8_t *buf, int len, struct store_v2_flow_complete *f, + char *ebuf, int elen) +{ + int offset, r; + struct store_v2_flow_AGENT_ADDR4 aa4; + struct store_v2_flow_AGENT_ADDR6 aa6; + struct store_v2_flow_SRC_ADDR4 sa4; + struct store_v2_flow_SRC_ADDR6 sa6; + struct store_v2_flow_DST_ADDR4 da4; + struct store_v2_flow_DST_ADDR6 da6; + struct 
store_v2_flow_GATEWAY_ADDR4 ga4; + struct store_v2_flow_GATEWAY_ADDR6 ga6; + u_int32_t fields, crc; + + bzero(f, sizeof(*f)); + crc32_start(&crc); + + /* Validate the supplied length before reading anything out of buf */ + if (len < 0 || (size_t)len < sizeof(f->hdr)) + SFAILX(STORE_ERR_BUFFER_SIZE, + "supplied length is too small", 1); + + memcpy(&f->hdr.fields, buf, sizeof(f->hdr.fields)); + + if ((r = store_v2_calc_flow_len((struct store_v2_flow *)buf)) == -1) + SFAILX(STORE_ERR_FLOW_INVALID, + "unsupported flow fields specified", 0); + + if ((size_t)len - sizeof(f->hdr) < (size_t)r) + SFAILX(STORE_ERR_BUFFER_SIZE, + "supplied length is less than calculated flow length", 1); + + crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc); + + fields = ntohl(f->hdr.fields); + + offset = sizeof(f->hdr); + +#define RFIELD(flag, dest) do { \ + if (SHASFIELD(flag)) { \ + memcpy(&dest, buf + offset, sizeof(dest)); \ + offset += sizeof(dest); \ + if (SHASFIELD(CRC32) && \ + STORE_V2_FIELD_##flag != STORE_V2_FIELD_CRC32) { \ + crc32_update((u_char *)&dest, sizeof(dest), \ + &crc); \ + } \ + } } while (0) + + RFIELD(TAG, f->tag); + RFIELD(RECV_TIME, f->recv_time); + RFIELD(PROTO_FLAGS_TOS, f->pft); + RFIELD(AGENT_ADDR4, aa4); + RFIELD(AGENT_ADDR6, aa6); + RFIELD(SRC_ADDR4, sa4); + RFIELD(SRC_ADDR6, sa6); + RFIELD(DST_ADDR4, da4); + RFIELD(DST_ADDR6, da6); + RFIELD(GATEWAY_ADDR4, ga4); + RFIELD(GATEWAY_ADDR6, ga6); + RFIELD(SRCDST_PORT, f->ports); + RFIELD(PACKETS, f->packets); + RFIELD(OCTETS, f->octets); + RFIELD(IF_INDICES, f->ifndx); + RFIELD(AGENT_INFO, f->ainfo); + RFIELD(FLOW_TIMES, f->ftimes); + RFIELD(AS_INFO, f->asinf); + RFIELD(FLOW_ENGINE_INFO, f->finf); + RFIELD(CRC32, f->crc32); + + /* Sanity check and convert addresses */ + if (SHASFIELD(AGENT_ADDR4) && SHASFIELD(AGENT_ADDR6)) + SFAILX(-1, "Flow has both v4/v6 agent addrs", 0); + if (SHASFIELD(SRC_ADDR4) && SHASFIELD(SRC_ADDR6)) + SFAILX(-1, "Flow has both v4/v6 src addrs", 0); + if (SHASFIELD(DST_ADDR4) && SHASFIELD(DST_ADDR6)) + SFAILX(-1, "Flow has both v4/v6 dst addrs", 0); + if (SHASFIELD(GATEWAY_ADDR4) &&
SHASFIELD(GATEWAY_ADDR6)) + SFAILX(-1, "Flow has both v4/v6 gateway addrs", 0); + +#define S_CPYADDR(d, s, fam) do { \ + (d).af = (fam == 4) ? AF_INET : AF_INET6; \ + memcpy(&d.v##fam, &s, sizeof(d.v##fam)); \ + } while (0) + + if (SHASFIELD(AGENT_ADDR4)) + S_CPYADDR(f->agent_addr, aa4.flow_agent_addr, 4); + if (SHASFIELD(AGENT_ADDR6)) + S_CPYADDR(f->agent_addr, aa6.flow_agent_addr, 6); + if (SHASFIELD(SRC_ADDR4)) + S_CPYADDR(f->src_addr, sa4.src_addr, 4); + if (SHASFIELD(SRC_ADDR6)) + S_CPYADDR(f->src_addr, sa6.src_addr, 6); + if (SHASFIELD(DST_ADDR4)) + S_CPYADDR(f->dst_addr, da4.dst_addr, 4); + if (SHASFIELD(DST_ADDR6)) + S_CPYADDR(f->dst_addr, da6.dst_addr, 6); + if (SHASFIELD(GATEWAY_ADDR4)) + S_CPYADDR(f->gateway_addr, ga4.gateway_addr, 4); + if (SHASFIELD(GATEWAY_ADDR6)) + S_CPYADDR(f->gateway_addr, ga6.gateway_addr, 6); + + if (SHASFIELD(CRC32) && crc != ntohl(f->crc32.crc32)) + SFAILX(STORE_ERR_CRC_MISMATCH, "Flow checksum mismatch", 0); + +#undef S_CPYADDR +#undef RFIELD + + return (STORE_ERR_OK); +} + +int +store_v2_get_flow(int fd, struct store_v2_flow_complete *f, char *ebuf, int elen) +{ + int r, len; + u_int8_t buf[512]; + + /* Read header */ + if ((r = atomicio(read, fd, buf, sizeof(struct store_v2_flow))) == -1) + SFAIL(STORE_ERR_IO, "read flow header", 0); + if (r < sizeof(struct store_v2_flow)) + SFAILX(STORE_ERR_EOF, "EOF reading flow header", 0); + + if ((len = store_v2_calc_flow_len((struct store_v2_flow *)buf)) == -1) + SFAILX(STORE_ERR_FLOW_INVALID, + "unsupported flow fields specified", 0); + if (len > sizeof(buf) - sizeof(struct store_v2_flow)) + SFAILX(STORE_ERR_INTERNAL, + "Internal error: flow buffer too small", 1); + + if ((r = atomicio(read, fd, buf + sizeof(struct store_v2_flow), len)) == -1) + SFAIL(STORE_ERR_IO, "read flow data", 0); + if (r < len) + SFAILX(STORE_ERR_EOF, "EOF reading flow data", 0); + + return (store_v2_flow_deserialise(buf, len + sizeof(struct store_v2_flow), + f, ebuf, elen)); +} + +int 
+store_v2_check_header(int fd, char *ebuf, int elen) +{ + struct store_v2_header hdr; + int r; + + if ((r = store_v2_get_header(fd, &hdr, ebuf, elen)) != STORE_ERR_OK) + return (r); + + /* store_get_header does all the magic & version checks for us */ + + return (STORE_ERR_OK); +} + +int +store_v2_put_header(int fd, char *ebuf, int elen) +{ + struct store_v2_header hdr; + int r; + + bzero(&hdr, sizeof(hdr)); + hdr.magic = htonl(STORE_V2_MAGIC); + hdr.version = htonl(STORE_V2_VERSION); + hdr.start_time = htonl(time(NULL)); + hdr.flags = htonl(0); + + r = atomicio(vwrite, fd, &hdr, sizeof(hdr)); + if (r == -1) + SFAIL(STORE_ERR_IO, "write error on header", 0); + if (r < (ssize_t)sizeof(hdr)) + SFAILX(STORE_ERR_EOF, "EOF while writing header", 0); + + return (STORE_ERR_OK); +} + +int +store_v2_flow_serialise(struct store_v2_flow_complete *f, u_int8_t *buf, int buflen, + int *flowlen, char *ebuf, int elen) +{ + struct store_v2_flow_AGENT_ADDR4 aa4; + struct store_v2_flow_AGENT_ADDR6 aa6; + struct store_v2_flow_SRC_ADDR4 sa4; + struct store_v2_flow_SRC_ADDR6 sa6; + struct store_v2_flow_DST_ADDR4 da4; + struct store_v2_flow_DST_ADDR6 da6; + struct store_v2_flow_GATEWAY_ADDR4 gwa4; + struct store_v2_flow_GATEWAY_ADDR6 gwa6; + u_int32_t fields, crc; + int offset; + + fields = ntohl(f->hdr.fields); + + /* Convert addresses and set AF fields correctly */ + /* XXX this is too repetitive */ + switch(f->agent_addr.af) { + case AF_INET: + if ((fields & STORE_V2_FIELD_AGENT_ADDR4) == 0) + break; + memcpy(&aa4.flow_agent_addr, &f->agent_addr.v4, + sizeof(aa4.flow_agent_addr)); + fields |= STORE_V2_FIELD_AGENT_ADDR4; + fields &= ~STORE_V2_FIELD_AGENT_ADDR6; + break; + case AF_INET6: + if ((fields & STORE_V2_FIELD_AGENT_ADDR6) == 0) + break; + memcpy(&aa6.flow_agent_addr, &f->agent_addr.v6, + sizeof(aa6.flow_agent_addr)); + fields |= STORE_V2_FIELD_AGENT_ADDR6; + fields &= ~STORE_V2_FIELD_AGENT_ADDR4; + break; + default: + if ((fields & STORE_V2_FIELD_AGENT_ADDR) == 0) + break; + 
SFAILX(STORE_ERR_FLOW_INVALID, "silly agent addr af", 1); + } + + switch(f->src_addr.af) { + case AF_INET: + if ((fields & STORE_V2_FIELD_SRC_ADDR4) == 0) + break; + memcpy(&sa4.src_addr, &f->src_addr.v4, + sizeof(sa4.src_addr)); + fields |= STORE_V2_FIELD_SRC_ADDR4; + fields &= ~STORE_V2_FIELD_SRC_ADDR6; + break; + case AF_INET6: + if ((fields & STORE_V2_FIELD_SRC_ADDR6) == 0) + break; + memcpy(&sa6.src_addr, &f->src_addr.v6, + sizeof(sa6.src_addr)); + fields |= STORE_V2_FIELD_SRC_ADDR6; + fields &= ~STORE_V2_FIELD_SRC_ADDR4; + break; + default: + if ((fields & STORE_V2_FIELD_SRC_ADDR) == 0) + break; + SFAILX(STORE_ERR_FLOW_INVALID, "silly src addrs af", 1); + } + + switch(f->dst_addr.af) { + case AF_INET: + if ((fields & STORE_V2_FIELD_DST_ADDR4) == 0) + break; + memcpy(&da4.dst_addr, &f->dst_addr.v4, + sizeof(da4.dst_addr)); + fields |= STORE_V2_FIELD_DST_ADDR4; + fields &= ~STORE_V2_FIELD_DST_ADDR6; + break; + case AF_INET6: + if ((fields & STORE_V2_FIELD_DST_ADDR6) == 0) + break; + memcpy(&da6.dst_addr, &f->dst_addr.v6, + sizeof(da6.dst_addr)); + fields |= STORE_V2_FIELD_DST_ADDR6; + fields &= ~STORE_V2_FIELD_DST_ADDR4; + break; + default: + if ((fields & STORE_V2_FIELD_DST_ADDR) == 0) + break; + SFAILX(STORE_ERR_FLOW_INVALID, "silly dst addrs af", 1); + } + + switch(f->gateway_addr.af) { + case AF_INET: + if ((fields & STORE_V2_FIELD_GATEWAY_ADDR4) == 0) + break; + memcpy(&gwa4.gateway_addr, &f->gateway_addr.v4, + sizeof(gwa4.gateway_addr)); + fields |= STORE_V2_FIELD_GATEWAY_ADDR4; + fields &= ~STORE_V2_FIELD_GATEWAY_ADDR6; + break; + case AF_INET6: + if ((fields & STORE_V2_FIELD_GATEWAY_ADDR6) == 0) + break; + memcpy(&gwa6.gateway_addr, &f->gateway_addr.v6, + sizeof(gwa6.gateway_addr)); + fields |= STORE_V2_FIELD_GATEWAY_ADDR6; + fields &= ~STORE_V2_FIELD_GATEWAY_ADDR4; + break; + default: + if ((fields & STORE_V2_FIELD_GATEWAY_ADDR) == 0) + break; + SFAILX(STORE_ERR_FLOW_INVALID, "silly gateway addr af", 1); + } + + crc32_start(&crc); + offset = 0; + + /* 
Fields have probably changes as a result of address conversion */ + f->hdr.fields = htonl(fields); + if (store_v2_calc_flow_len(&f->hdr) > buflen) + SFAILX(STORE_ERR_BUFFER_SIZE, "flow buffer too small", 1); + + memcpy(buf + offset, &f->hdr, sizeof(f->hdr)); + offset += sizeof(f->hdr); + crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc); + +#define WFIELD(spec, what) do { \ + if (SHASFIELD(spec)) { \ + memcpy(buf + offset, &(what), sizeof(what)); \ + offset += sizeof(what); \ + if (SHASFIELD(spec) && \ + (STORE_V2_FIELD_##spec != STORE_V2_FIELD_CRC32)) { \ + crc32_update((u_char *)&(what), sizeof(what), \ + &crc); \ + } \ + } } while (0) + + WFIELD(TAG, f->tag); + WFIELD(RECV_TIME, f->recv_time); + WFIELD(PROTO_FLAGS_TOS, f->pft); + WFIELD(AGENT_ADDR4, aa4); + WFIELD(AGENT_ADDR6, aa6); + WFIELD(SRC_ADDR4, sa4); + WFIELD(SRC_ADDR6, sa6); + WFIELD(DST_ADDR4, da4); + WFIELD(DST_ADDR6, da6); + WFIELD(GATEWAY_ADDR4, gwa4); + WFIELD(GATEWAY_ADDR6, gwa6); + WFIELD(SRCDST_PORT, f->ports); + WFIELD(PACKETS, f->packets); + WFIELD(OCTETS, f->octets); + WFIELD(IF_INDICES, f->ifndx); + WFIELD(AGENT_INFO, f->ainfo); + WFIELD(FLOW_TIMES, f->ftimes); + WFIELD(AS_INFO, f->asinf); + WFIELD(FLOW_ENGINE_INFO, f->finf); + if (fields & (STORE_V2_FIELD_CRC32)) + f->crc32.crc32 = htonl(crc); + WFIELD(CRC32, f->crc32); +#undef WFIELD + + *flowlen = offset; + return (STORE_ERR_OK); +} + +int +store_v2_put_flow(int fd, struct store_v2_flow_complete *flow, u_int32_t fieldmask, + char *ebuf, int elen) +{ + u_int32_t fields, origfields; + off_t startpos; + u_int8_t buf[512]; + int len, r, saved_errno, ispipe = 0; + + /* Remember where we started, so we can back errors out */ + if ((startpos = lseek(fd, 0, SEEK_CUR)) == -1) { + if (errno == ESPIPE) + ispipe = 1; + else + SFAIL(STORE_ERR_IO_SEEK, "lseek", 1); + } + + origfields = ntohl(flow->hdr.fields); + fields = origfields & fieldmask; + flow->hdr.fields = htonl(fields); + + r = store_v2_flow_serialise(flow, buf, sizeof(buf), &len, ebuf, 
elen); + if (r != STORE_ERR_OK) { + flow->hdr.fields = htonl(origfields); + return (r); + } + + r = atomicio(vwrite, fd, buf, len); + saved_errno = errno; + flow->hdr.fields = htonl(origfields); + + if (r == len) + return (STORE_ERR_OK); + + if (ispipe) + SFAIL(STORE_ERR_CORRUPT, "corrupting failure on pipe", 1); + + /* Try to rewind to starting position, so we don't corrupt flow store */ + if (lseek(fd, startpos, SEEK_SET) == -1) + SFAIL(STORE_ERR_CORRUPT, "corrupting failure on lseek", 1); + if (ftruncate(fd, startpos) == -1) + SFAIL(STORE_ERR_CORRUPT, "corrupting failure on ftruncate", 1); + + /* Partial flow record has been removed, return with orig. error */ + errno = saved_errno; + if (r == -1) + SFAIL(STORE_ERR_IO, "write flow", 0); + else + SFAILX(STORE_ERR_EOF, "EOF on write flow", 0); +} + +int +store_v2_flow_convert(struct store_v2_flow_complete *fv2, + struct store_flow_complete *f) +{ + int len; + + bzero(f, sizeof(*f)); + f->hdr.version = STORE_VERSION; + f->hdr.fields = fv2->hdr.fields; + if ((len = store_calc_flow_len(&f->hdr)) == -1) + return (-1); + f->hdr.len_words = len / 4; + + f->tag.tag = fv2->tag.tag; + f->recv_time.recv_sec = fv2->recv_time.recv_sec; + f->pft.tcp_flags = fv2->pft.tcp_flags; + f->pft.protocol = fv2->pft.protocol; + f->pft.tos = fv2->pft.tos; + f->pft.pad = fv2->pft.pad; + f->agent_addr = fv2->agent_addr; + f->src_addr = fv2->src_addr; + f->dst_addr = fv2->dst_addr; + f->gateway_addr = fv2->gateway_addr; + f->ports.src_port = fv2->ports.src_port; + f->ports.dst_port = fv2->ports.dst_port; + f->packets.flow_packets = fv2->packets.flow_packets; + f->octets.flow_octets = fv2->octets.flow_octets; + f->ifndx.if_index_in = htonl(ntohs(fv2->ifndx.if_index_in)); + f->ifndx.if_index_out = htonl(ntohs(fv2->ifndx.if_index_out)); + f->ainfo.sys_uptime_ms = fv2->ainfo.sys_uptime_ms; + f->ainfo.time_sec = fv2->ainfo.time_sec; + f->ainfo.time_nanosec = fv2->ainfo.time_nanosec; + f->ainfo.netflow_version = fv2->ainfo.netflow_version; + 
f->ainfo.pad = fv2->ainfo.pad; + f->ftimes.flow_start = fv2->ftimes.flow_start; + f->ftimes.flow_finish = fv2->ftimes.flow_finish; + f->asinf.src_as = htonl(ntohs(fv2->asinf.src_as)); + f->asinf.dst_as = htonl(ntohs(fv2->asinf.dst_as)); + f->asinf.src_mask = fv2->asinf.src_mask; + f->asinf.dst_mask = fv2->asinf.dst_mask; + f->asinf.pad = fv2->asinf.pad; + f->finf.engine_type = htons(fv2->finf.engine_type); + f->finf.engine_id = htons(fv2->finf.engine_id); + f->finf.flow_sequence = fv2->finf.flow_sequence; + f->crc32.crc32 = fv2->crc32.crc32; + + return (0); +} Index: store-v2.h =================================================================== RCS file: store-v2.h diff -N store-v2.h --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ store-v2.h 21 Aug 2005 09:49:17 -0000 @@ -0,0 +1,255 @@ +/* $Id$ */ + +/* + * Copyright (c) 2004 Damien Miller + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+ */ + +/* On-disk storage format */ + +#ifndef _STORE_V2_H +#define _STORE_V2_H + +#if defined(HAVE_SYS_CDEFS_H) +# include <sys/cdefs.h> /* For __packed, etc on platforms that have it */ +#endif +#if defined(__GNUC__) && !defined(__packed) +# define __packed __attribute__((__packed__)) +#endif + +#include "addr.h" +#include "store.h" + +#define STORE_V2_MAGIC 0x012cf047 +#define STORE_V2_VERSION 0x00000002 +/* Start of flow log file */ +struct store_v2_header { + u_int32_t magic; + u_int32_t version; + u_int32_t start_time; + u_int32_t flags; /* Currently 0 */ +} __packed; + +/* + * Optional flow fields, specify what is stored for the flow + * NB - the flow records appear in this order on disk + */ +#define STORE_V2_FIELD_TAG (1U) +#define STORE_V2_FIELD_RECV_TIME (1U<<1) +#define STORE_V2_FIELD_PROTO_FLAGS_TOS (1U<<2) +#define STORE_V2_FIELD_AGENT_ADDR4 (1U<<3) +#define STORE_V2_FIELD_AGENT_ADDR6 (1U<<4) +#define STORE_V2_FIELD_SRC_ADDR4 (1U<<5) +#define STORE_V2_FIELD_SRC_ADDR6 (1U<<6) +#define STORE_V2_FIELD_DST_ADDR4 (1U<<7) +#define STORE_V2_FIELD_DST_ADDR6 (1U<<8) +#define STORE_V2_FIELD_GATEWAY_ADDR4 (1U<<9) +#define STORE_V2_FIELD_GATEWAY_ADDR6 (1U<<10) +#define STORE_V2_FIELD_SRCDST_PORT (1U<<11) +#define STORE_V2_FIELD_PACKETS (1U<<12) +#define STORE_V2_FIELD_OCTETS (1U<<13) +#define STORE_V2_FIELD_IF_INDICES (1U<<14) +#define STORE_V2_FIELD_AGENT_INFO (1U<<15) +#define STORE_V2_FIELD_FLOW_TIMES (1U<<16) +#define STORE_V2_FIELD_AS_INFO (1U<<17) +#define STORE_V2_FIELD_FLOW_ENGINE_INFO (1U<<18) +/* ... 
more one day */ + +#define STORE_V2_FIELD_CRC32 (1U<<30) +#define STORE_V2_FIELD_RESERVED (1U<<31) /* For extension header */ +#define STORE_V2_FIELD_ALL (((1U<<19)-1)|STORE_V2_FIELD_CRC32) + +/* Useful combinations */ +#define STORE_V2_FIELD_AGENT_ADDR (STORE_V2_FIELD_AGENT_ADDR4|\ + STORE_V2_FIELD_AGENT_ADDR6) +#define STORE_V2_FIELD_SRC_ADDR (STORE_V2_FIELD_SRC_ADDR4|\ + STORE_V2_FIELD_SRC_ADDR6) +#define STORE_V2_FIELD_DST_ADDR (STORE_V2_FIELD_DST_ADDR4|\ + STORE_V2_FIELD_DST_ADDR6) +#define STORE_V2_FIELD_SRCDST_ADDR (STORE_V2_FIELD_SRC_ADDR|\ + STORE_V2_FIELD_DST_ADDR) +#define STORE_V2_FIELD_GATEWAY_ADDR (STORE_V2_FIELD_GATEWAY_ADDR4|\ + STORE_V2_FIELD_GATEWAY_ADDR6) + +#define STORE_V2_DISPLAY_ALL STORE_V2_FIELD_ALL +#define STORE_V2_DISPLAY_BRIEF (STORE_V2_FIELD_TAG|\ + STORE_V2_FIELD_RECV_TIME|\ + STORE_V2_FIELD_PROTO_FLAGS_TOS|\ + STORE_V2_FIELD_SRCDST_PORT|\ + STORE_V2_FIELD_PACKETS|\ + STORE_V2_FIELD_OCTETS|\ + STORE_V2_FIELD_SRCDST_ADDR|\ + STORE_V2_FIELD_AGENT_ADDR4|\ + STORE_V2_FIELD_AGENT_ADDR6) + +/* Start of flow record - present for every flow */ +struct store_v2_flow { + u_int32_t fields; +} __packed; + +/* + * Optional flow records + * NB. 
suffixes must match the corresponding STORE_V2_FIELD_ define (see store.c) + */ + +/* Optional flow field - present if STORE_V2_FIELD_TAG */ +struct store_v2_flow_TAG { + u_int32_t tag; /* set by filter */ +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_RECV_TIME */ +struct store_v2_flow_RECV_TIME { + u_int32_t recv_sec; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_PROTO_FLAGS_TOS */ +struct store_v2_flow_PROTO_FLAGS_TOS { + u_int8_t tcp_flags; + u_int8_t protocol; + u_int8_t tos; + u_int8_t pad; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_AGENT_ADDR */ +struct store_v2_flow_AGENT_ADDR4 { + struct store_addr4 flow_agent_addr; +} __packed; +struct store_v2_flow_AGENT_ADDR6 { + struct store_addr6 flow_agent_addr; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_SRC_ADDR4 */ +struct store_v2_flow_SRC_ADDR4 { + struct store_addr4 src_addr; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_DST_ADDR4 */ +struct store_v2_flow_DST_ADDR4 { + struct store_addr4 dst_addr; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_SRC_ADDR6 */ +struct store_v2_flow_SRC_ADDR6 { + struct store_addr6 src_addr; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_DST_ADDR6 */ +struct store_v2_flow_DST_ADDR6 { + struct store_addr6 dst_addr; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_GATEWAY_ADDR */ +struct store_v2_flow_GATEWAY_ADDR4 { + struct store_addr4 gateway_addr; +} __packed; +struct store_v2_flow_GATEWAY_ADDR6 { + struct store_addr6 gateway_addr; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_SRCDST_PORT */ +struct store_v2_flow_SRCDST_PORT { + u_int16_t src_port; + u_int16_t dst_port; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_PACKETS */ +struct store_v2_flow_PACKETS { + u_int64_t flow_packets; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_OCTETS */ +struct store_v2_flow_OCTETS { + u_int64_t flow_octets; 
+} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_IF_INDICES */ +struct store_v2_flow_IF_INDICES { + u_int16_t if_index_in; + u_int16_t if_index_out; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_AGENT_INFO */ +struct store_v2_flow_AGENT_INFO { + u_int32_t sys_uptime_ms; + u_int32_t time_sec; + u_int32_t time_nanosec; + u_int16_t netflow_version; + u_int16_t pad; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_FLOW_TIMES */ +struct store_v2_flow_FLOW_TIMES { + u_int32_t flow_start; + u_int32_t flow_finish; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_AS_INFO */ +struct store_v2_flow_AS_INFO { + u_int16_t src_as; + u_int16_t dst_as; + u_int8_t src_mask; + u_int8_t dst_mask; + u_int16_t pad; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_FLOW_ENGINE_INFO */ +struct store_v2_flow_FLOW_ENGINE_INFO { + u_int8_t engine_type; + u_int8_t engine_id; + u_int16_t pad; + u_int32_t flow_sequence; +} __packed; + +/* Optional flow field - present if STORE_V2_FIELD_CRC32 */ +struct store_v2_flow_CRC32 { + u_int32_t crc32; +} __packed; + +/* An abstract flow record (all fields included) */ +struct store_v2_flow_complete { + struct store_v2_flow hdr; + struct store_v2_flow_TAG tag; + struct store_v2_flow_RECV_TIME recv_time; + struct store_v2_flow_PROTO_FLAGS_TOS pft; + struct xaddr agent_addr; + struct xaddr src_addr; + struct xaddr dst_addr; + struct xaddr gateway_addr; + struct store_v2_flow_SRCDST_PORT ports; + struct store_v2_flow_PACKETS packets; + struct store_v2_flow_OCTETS octets; + struct store_v2_flow_IF_INDICES ifndx; + struct store_v2_flow_AGENT_INFO ainfo; + struct store_v2_flow_FLOW_TIMES ftimes; + struct store_v2_flow_AS_INFO asinf; + struct store_v2_flow_FLOW_ENGINE_INFO finf; + struct store_v2_flow_CRC32 crc32; +} __packed; + +int store_v2_get_header(int fd, struct store_v2_header *hdr, char *ebuf, int elen); +int store_v2_get_flow(int fd, struct store_v2_flow_complete *f, char *ebuf, 
int elen); +int store_v2_check_header(int fd, char *ebuf, int elen); +int store_v2_put_header(int fd, char *ebuf, int elen); +int store_v2_put_flow(int fd, struct store_v2_flow_complete *flow, + u_int32_t fieldmask, char *ebuf, int elen); +int store_v2_validate_header(struct store_v2_header *hdr, char *ebuf, int elen); +int store_v2_calc_flow_len(struct store_v2_flow *hdr); +int store_v2_flow_deserialise(u_int8_t *buf, int len, + struct store_v2_flow_complete *f, char *ebuf, int elen); +int store_v2_flow_serialise(struct store_v2_flow_complete *f, u_int8_t *buf, int buflen, + int *flowlen, char *ebuf, int elen); +int store_v2_flow_convert(struct store_v2_flow_complete *fv2, + struct store_flow_complete *f); + +#endif /* _STORE_V2_H */ Index: store.c =================================================================== RCS file: /var/cvs/flowd/store.c,v retrieving revision 1.30 diff -u -p -r1.30 store.c --- store.c 4 Feb 2005 06:26:10 -0000 1.30 +++ store.c 21 Aug 2005 09:49:17 -0000 @@ -55,30 +55,6 @@ RCSID("$Id: store.c,v 1.30 2005/02/04 06 } while (0) int -store_validate_header(struct store_header *hdr, char *ebuf, int elen) -{ - if (ntohl(hdr->magic) != STORE_MAGIC) - SFAILX(STORE_ERR_BAD_MAGIC, "Bad magic", 0); - if (ntohl(hdr->version) != STORE_VERSION) - SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0); - - return (STORE_ERR_OK); -} - -int -store_get_header(int fd, struct store_header *hdr, char *ebuf, int elen) -{ - ssize_t r; - - if ((r = atomicio(read, fd, hdr, sizeof(*hdr))) == -1) - SFAIL(STORE_ERR_IO, "read error", 0); - if (r < (ssize_t)sizeof(*hdr)) - SFAILX(STORE_ERR_EOF, "premature EOF", 0); - - return (store_validate_header(hdr, ebuf, elen)); -} - -int store_calc_flow_len(struct store_flow *hdr) { int ret = 0; @@ -123,7 +99,7 @@ int store_flow_deserialise(u_int8_t *buf, int len, struct store_flow_complete *f, char *ebuf, int elen) { - int offset, r; + int offset, allow_extra; struct store_flow_AGENT_ADDR4 aa4; struct store_flow_AGENT_ADDR6 
aa6; struct store_flow_SRC_ADDR4 sa4; @@ -132,29 +108,28 @@ store_flow_deserialise(u_int8_t *buf, in struct store_flow_DST_ADDR6 da6; struct store_flow_GATEWAY_ADDR4 ga4; struct store_flow_GATEWAY_ADDR6 ga6; - u_int32_t fields, crc; + u_int32_t donefields, fields, crc; bzero(f, sizeof(*f)); crc32_start(&crc); - memcpy(&f->hdr.fields, buf, sizeof(f->hdr.fields)); - if (len < sizeof(f->hdr)) SFAILX(STORE_ERR_BUFFER_SIZE, "supplied length is too small", 1); - if ((r = store_calc_flow_len((struct store_flow *)buf)) == -1) - SFAILX(STORE_ERR_FLOW_INVALID, - "unsupported flow fields specified", 0); + memcpy(&f->hdr, buf, sizeof(f->hdr)); - if (len - sizeof(f->hdr) < r) + if (STORE_VER_GET_MAJ(f->hdr.version) != STORE_VER_MAJOR) + SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0); + allow_extra = (STORE_VER_GET_MIN(f->hdr.version) > STORE_VER_MINOR); + + if (len - sizeof(f->hdr) < (f->hdr.len_words * 4)) SFAILX(STORE_ERR_BUFFER_SIZE, - "calulated flow length is less than supplied len", 1); + "incomplete flow record supplied", 1); crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc); - fields = ntohl(f->hdr.fields); - + donefields = fields = ntohl(f->hdr.fields); offset = sizeof(f->hdr); #define RFIELD(flag, dest) do { \ @@ -166,6 +141,7 @@ store_flow_deserialise(u_int8_t *buf, in crc32_update((u_char *)&dest, sizeof(dest), \ &crc); \ } \ + donefields &= ~STORE_FIELD_##flag; \ } } while (0) RFIELD(TAG, f->tag); @@ -187,6 +163,19 @@ store_flow_deserialise(u_int8_t *buf, in RFIELD(FLOW_TIMES, f->ftimes); RFIELD(AS_INFO, f->asinf); RFIELD(FLOW_ENGINE_INFO, f->finf); + + /* Other fields might live here if minor version > ours */ + if ((donefields & ~STORE_FIELD_CRC32) != 0) { + if (allow_extra) { + /* Skip fields we don't understand */ + offset = (f->hdr.len_words * 4) + sizeof(f->hdr) - + sizeof(f->crc32); + fields = ntohl(f->hdr.fields) & STORE_FIELD_ALL; + } else { + /* There shouldn't be any extra if minor_ver <= ours */ + SFAILX(-1, "Flow has unknown fields", 0); 
+ } + } RFIELD(CRC32, f->crc32); /* Sanity check and convert addresses */ @@ -242,9 +231,7 @@ store_get_flow(int fd, struct store_flow if (r < sizeof(struct store_flow)) SFAILX(STORE_ERR_EOF, "EOF reading flow header", 0); - if ((len = store_calc_flow_len((struct store_flow *)buf)) == -1) - SFAILX(STORE_ERR_FLOW_INVALID, - "unsupported flow fields specified", 0); + len = ((struct store_flow *)buf)->len_words * 4; if (len > sizeof(buf) - sizeof(struct store_flow)) SFAILX(STORE_ERR_INTERNAL, "Internal error: flow buffer too small", 1); @@ -259,38 +246,31 @@ store_get_flow(int fd, struct store_flow } int -store_check_header(int fd, char *ebuf, int elen) +store_read_flow(FILE *f, struct store_flow_complete *flow, char *ebuf, int elen) { - struct store_header hdr; - int r; - - if ((r = store_get_header(fd, &hdr, ebuf, elen)) != STORE_ERR_OK) - return (r); - - /* store_get_header does all the magic & version checks for us */ - - return (STORE_ERR_OK); -} + int r, len; + u_int8_t buf[512]; -int -store_put_header(int fd, char *ebuf, int elen) -{ - struct store_header hdr; - int r; + /* Read header */ + r = fread(buf, sizeof(struct store_flow), 1, f); + if (r == 0) + SFAILX(STORE_ERR_EOF, "EOF reading flow header", 0); + if (r != 1) + SFAIL(STORE_ERR_IO, "read flow header", 0); - bzero(&hdr, sizeof(hdr)); - hdr.magic = htonl(STORE_MAGIC); - hdr.version = htonl(STORE_VERSION); - hdr.start_time = htonl(time(NULL)); - hdr.flags = htonl(0); + len = ((struct store_flow *)buf)->len_words * 4; + if (len > sizeof(buf) - sizeof(struct store_flow)) + SFAILX(STORE_ERR_INTERNAL, + "Internal error: flow buffer too small", 1); - r = atomicio(vwrite, fd, &hdr, sizeof(hdr)); - if (r == -1) - SFAIL(STORE_ERR_IO, "write error on header", 0); - if (r < (ssize_t)sizeof(hdr)) - SFAILX(STORE_ERR_EOF, "EOF while writing header", 0); + r = fread(buf + sizeof(struct store_flow), len, 1, f); + if (r == 0) + SFAILX(STORE_ERR_EOF, "EOF reading flow data", 0); + if (r != 1) + SFAIL(STORE_ERR_IO, "read 
flow data", 0); - return (STORE_ERR_OK); + return (store_flow_deserialise(buf, len + sizeof(struct store_flow), + flow, ebuf, elen)); } int @@ -306,8 +286,9 @@ store_flow_serialise(struct store_flow_c struct store_flow_GATEWAY_ADDR4 gwa4; struct store_flow_GATEWAY_ADDR6 gwa6; u_int32_t fields, crc; - int offset; + int len, offset; + f->hdr.version = STORE_VERSION; fields = ntohl(f->hdr.fields); /* Convert addresses and set AF fields correctly */ @@ -404,16 +385,24 @@ store_flow_serialise(struct store_flow_c SFAILX(STORE_ERR_FLOW_INVALID, "silly gateway addr af", 1); } - crc32_start(&crc); - offset = 0; - /* Fields have probably changes as a result of address conversion */ f->hdr.fields = htonl(fields); - if (store_calc_flow_len(&f->hdr) > buflen) + + len = store_calc_flow_len(&f->hdr); + if ((len & 3) != 0) + SFAILX(STORE_ERR_INTERNAL, "len & 3 != 0", 1); + if (len > buflen) SFAILX(STORE_ERR_BUFFER_SIZE, "flow buffer too small", 1); + if (len == -1) + SFAILX(STORE_ERR_FLOW_INVALID, + "unsupported flow fields specified", 0); + f->hdr.len_words = len / 4; + f->hdr.reserved = 0; + + memcpy(buf, &f->hdr, sizeof(f->hdr)); + offset = sizeof(f->hdr); - memcpy(buf + offset, &f->hdr, sizeof(f->hdr)); - offset += sizeof(f->hdr); + crc32_start(&crc); crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc); #define WFIELD(spec, what) do { \ @@ -451,6 +440,9 @@ store_flow_serialise(struct store_flow_c WFIELD(CRC32, f->crc32); #undef WFIELD + if (len + sizeof(f->hdr) != offset) + SFAILX(STORE_ERR_INTERNAL, "len != offset", 1); + *flowlen = offset; return (STORE_ERR_OK); } @@ -461,7 +453,7 @@ store_put_flow(int fd, struct store_flow { u_int32_t fields, origfields; off_t startpos; - u_int8_t buf[512]; + u_int8_t buf[1024]; int len, r, saved_errno, ispipe = 0; /* Remember where we started, so we can back errors out */ @@ -506,6 +498,32 @@ store_put_flow(int fd, struct store_flow SFAILX(STORE_ERR_EOF, "EOF on write flow", 0); } +int +store_write_flow(FILE *f, struct 
store_flow_complete *flow, u_int32_t fieldmask, + char *ebuf, int elen) +{ + u_int32_t fields, origfields; + u_int8_t buf[1024]; + int len, r; + + origfields = ntohl(flow->hdr.fields); + fields = origfields & fieldmask; + flow->hdr.fields = htonl(fields); + + r = store_flow_serialise(flow, buf, sizeof(buf), &len, ebuf, elen); + flow->hdr.fields = htonl(origfields); + + if (r != STORE_ERR_OK) + return (r); + r = fwrite(buf, len, 1, f); + if (r == 0) + SFAILX(STORE_ERR_EOF, "EOF on write flow", 0); + if (r != 1) + SFAIL(STORE_ERR_IO, "fwrite flow", 0); + + return (STORE_ERR_OK); +} + const char * iso_time(time_t t, int utc_flag) { @@ -549,26 +567,94 @@ interval_time(time_t t) return (buf); } +/* + * Some helper functions for store_format_flow() and store_swab_flow(), + * so we can switch between host and network byte order easily. + */ +static u_int64_t +store_swp_ntoh64(u_int64_t v) +{ + return store_ntohll(v); +} + +static u_int32_t +store_swp_ntoh32(u_int32_t v) +{ + return ntohl(v); +} + +static u_int16_t +store_swp_ntoh16(u_int16_t v) +{ + return ntohs(v); +} + +static u_int64_t +store_swp_hton64(u_int64_t v) +{ + return store_htonll(v); +} + +static u_int32_t +store_swp_hton32(u_int32_t v) +{ + return htonl(v); +} + +static u_int16_t +store_swp_hton16(u_int16_t v) +{ + return htons(v); +} + +static u_int64_t +store_swp_fake64(u_int64_t v) +{ + return v; +} + +static u_int32_t +store_swp_fake32(u_int32_t v) +{ + return v; +} + +static u_int16_t +store_swp_fake16(u_int16_t v) +{ + return v; +} + void store_format_flow(struct store_flow_complete *flow, char *buf, size_t len, - int utc_flag, u_int32_t display_mask) + int utc_flag, u_int32_t display_mask, int hostorder) { char tmp[256]; u_int32_t fields; + u_int64_t (*fmt_ntoh64)(u_int64_t) = store_swp_ntoh64; + u_int32_t (*fmt_ntoh32)(u_int32_t) = store_swp_ntoh32; + u_int16_t (*fmt_ntoh16)(u_int16_t) = store_swp_ntoh16; + + if (hostorder) { + fmt_ntoh64 = store_swp_fake64; + fmt_ntoh32 = store_swp_fake32; + 
fmt_ntoh16 = store_swp_fake16; + } *buf = '\0'; - fields = ntohl(flow->hdr.fields) & display_mask; + fields = fmt_ntoh32(flow->hdr.fields) & display_mask; strlcat(buf, "FLOW ", len); if (SHASFIELD(TAG)) { - snprintf(tmp, sizeof(tmp), "tag %u ", ntohl(flow->tag.tag)); + snprintf(tmp, sizeof(tmp), "tag %u ", fmt_ntoh32(flow->tag.tag)); strlcat(buf, tmp, len); } if (SHASFIELD(RECV_TIME)) { - snprintf(tmp, sizeof(tmp), "recv_time %s ", - iso_time(ntohl(flow->recv_time.recv_secs), utc_flag)); + snprintf(tmp, sizeof(tmp), "recv_time %s.%05d ", + iso_time(fmt_ntoh32(flow->recv_time.recv_sec), utc_flag), + fmt_ntoh32(flow->recv_time.recv_usec)); strlcat(buf, tmp, len); } if (SHASFIELD(PROTO_FLAGS_TOS)) { @@ -591,7 +677,7 @@ store_format_flow(struct store_flow_comp strlcat(buf, tmp, len); if (SHASFIELD(SRCDST_PORT)) { snprintf(tmp, sizeof(tmp), ":%d", - ntohs(flow->ports.src_port)); + fmt_ntoh16(flow->ports.src_port)); strlcat(buf, tmp, len); } strlcat(buf, " ", len); @@ -602,7 +688,7 @@ store_format_flow(struct store_flow_comp strlcat(buf, tmp, len); if (SHASFIELD(SRCDST_PORT)) { snprintf(tmp, sizeof(tmp), ":%d", - ntohs(flow->ports.dst_port)); + fmt_ntoh16(flow->ports.dst_port)); strlcat(buf, tmp, len); } strlcat(buf, " ", len); @@ -614,63 +700,106 @@ store_format_flow(struct store_flow_comp } if (SHASFIELD(PACKETS)) { snprintf(tmp, sizeof(tmp), "packets %llu ", - store_ntohll(flow->packets.flow_packets)); + fmt_ntoh64(flow->packets.flow_packets)); strlcat(buf, tmp, len); } if (SHASFIELD(OCTETS)) { snprintf(tmp, sizeof(tmp), "octets %llu ", - store_ntohll(flow->octets.flow_octets)); + fmt_ntoh64(flow->octets.flow_octets)); strlcat(buf, tmp, len); } if (SHASFIELD(IF_INDICES)) { snprintf(tmp, sizeof(tmp), "in_if %d out_if %d ", - ntohs(flow->ifndx.if_index_in), - ntohs(flow->ifndx.if_index_out)); + fmt_ntoh16(flow->ifndx.if_index_in), + fmt_ntoh16(flow->ifndx.if_index_out)); strlcat(buf, tmp, len); } if (SHASFIELD(AGENT_INFO)) { snprintf(tmp, sizeof(tmp), "sys_uptime_ms 
%s.%03u ", - interval_time(ntohl(flow->ainfo.sys_uptime_ms) / 1000), - ntohl(flow->ainfo.sys_uptime_ms) % 1000); + interval_time(fmt_ntoh32(flow->ainfo.sys_uptime_ms) / 1000), + fmt_ntoh32(flow->ainfo.sys_uptime_ms) % 1000); strlcat(buf, tmp, len); snprintf(tmp, sizeof(tmp), "time_sec %s ", - iso_time(ntohl(flow->ainfo.time_sec), utc_flag)); + iso_time(fmt_ntoh32(flow->ainfo.time_sec), utc_flag)); strlcat(buf, tmp, len); snprintf(tmp, sizeof(tmp), "time_nanosec %lu netflow ver %u ", - (u_long)ntohl(flow->ainfo.time_nanosec), - ntohs(flow->ainfo.netflow_version)); + (u_long)fmt_ntoh32(flow->ainfo.time_nanosec), + fmt_ntoh16(flow->ainfo.netflow_version)); strlcat(buf, tmp, len); } if (SHASFIELD(FLOW_TIMES)) { snprintf(tmp, sizeof(tmp), "flow_start %s.%03u ", - interval_time(ntohl(flow->ftimes.flow_start) / 1000), - ntohl(flow->ftimes.flow_start) % 1000); + interval_time(fmt_ntoh32(flow->ftimes.flow_start) / 1000), + fmt_ntoh32(flow->ftimes.flow_start) % 1000); strlcat(buf, tmp, len); snprintf(tmp, sizeof(tmp), "flow_finish %s.%03u ", - interval_time(ntohl(flow->ftimes.flow_finish) / 1000), - ntohl(flow->ftimes.flow_finish) % 1000); + interval_time(fmt_ntoh32(flow->ftimes.flow_finish) / 1000), + fmt_ntoh32(flow->ftimes.flow_finish) % 1000); strlcat(buf, tmp, len); } if (SHASFIELD(AS_INFO)) { snprintf(tmp, sizeof(tmp), "src_AS %u src_masklen %u ", - ntohs(flow->asinf.src_as), flow->asinf.src_mask); + fmt_ntoh16(flow->asinf.src_as), flow->asinf.src_mask); strlcat(buf, tmp, len); snprintf(tmp, sizeof(tmp), "dst_AS %u dst_masklen %u ", - ntohs(flow->asinf.dst_as), flow->asinf.dst_mask); + fmt_ntoh16(flow->asinf.dst_as), flow->asinf.dst_mask); strlcat(buf, tmp, len); } if (SHASFIELD(FLOW_ENGINE_INFO)) { snprintf(tmp, sizeof(tmp), - "engine_type %u engine_id %u seq %lu ", - flow->finf.engine_type, flow->finf.engine_id, - (u_long)ntohl(flow->finf.flow_sequence)); + "engine_type %u engine_id %u seq %lu source %lu ", + fmt_ntoh16(flow->finf.engine_type), + 
fmt_ntoh16(flow->finf.engine_id), + (u_long)fmt_ntoh32(flow->finf.flow_sequence), + (u_long)fmt_ntoh32(flow->finf.source_id)); strlcat(buf, tmp, len); } if (SHASFIELD(CRC32)) { snprintf(tmp, sizeof(tmp), "crc32 %08x ", - ntohl(flow->crc32.crc32)); + fmt_ntoh32(flow->crc32.crc32)); strlcat(buf, tmp, len); } +} + + +void +store_swab_flow(struct store_flow_complete *flow, int to_net) +{ + u_int64_t (*sw64)(u_int64_t) = store_swp_ntoh64; + u_int32_t (*sw32)(u_int32_t) = store_swp_ntoh32; + u_int16_t (*sw16)(u_int16_t) = store_swp_ntoh16; + + if (to_net) { + sw64 = store_swp_hton64; + sw32 = store_swp_hton32; + sw16 = store_swp_hton16; + } + +#define FLSWAB(n,w) flow->w = sw##n(flow->w) + FLSWAB(32, hdr.fields); + FLSWAB(32, tag.tag); + FLSWAB(32, recv_time.recv_sec); + FLSWAB(32, recv_time.recv_usec); + FLSWAB(16, ports.src_port); + FLSWAB(16, ports.dst_port); + FLSWAB(64, packets.flow_packets); + FLSWAB(64, octets.flow_octets); + FLSWAB(32, ifndx.if_index_in); + FLSWAB(32, ifndx.if_index_out); + FLSWAB(32, ainfo.sys_uptime_ms); + FLSWAB(32, ainfo.time_sec); + FLSWAB(32, ainfo.time_nanosec); + FLSWAB(16, ainfo.netflow_version); + FLSWAB(32, ftimes.flow_start); + FLSWAB(32, ftimes.flow_finish); + FLSWAB(32, asinf.src_as); + FLSWAB(32, asinf.dst_as); + FLSWAB(16, finf.engine_type); + FLSWAB(16, finf.engine_id); + FLSWAB(32, finf.flow_sequence); + FLSWAB(32, finf.source_id); + FLSWAB(32, crc32.crc32); +#undef FLSWAB } u_int64_t Index: store.h =================================================================== RCS file: /var/cvs/flowd/store.h,v retrieving revision 1.26 diff -u -p -r1.26 store.h --- store.h 3 Nov 2004 06:34:02 -0000 1.26 +++ store.h 21 Aug 2005 09:49:17 -0000 @@ -38,14 +38,23 @@ struct store_addr4 { u_int8_t d[4]; } __packed; -#define STORE_MAGIC 0x012cf047 -#define STORE_VERSION 0x00000002 -/* Start of flow log file */ -struct store_header { - u_int32_t magic; - u_int32_t version; - u_int32_t start_time; - u_int32_t flags; /* Currently 0 */ +#define 
STORE_VER_MIN_MASK ((1 << 5) - 1) +#define STORE_VER_MAJ_MASK ((1 << 3) - 1) +#define STORE_MKVER(maj,min) ((((maj) & STORE_VER_MAJ_MASK) << 5) | \ + ((min) & STORE_VER_MIN_MASK)) +#define STORE_VER_GET_MAJ(ver) (((ver) >> 5) & STORE_VER_MAJ_MASK) +#define STORE_VER_GET_MIN(ver) ((ver) & STORE_VER_MIN_MASK) + +#define STORE_VER_MAJOR 3 +#define STORE_VER_MINOR 0 +#define STORE_VERSION STORE_MKVER(STORE_VER_MAJOR, STORE_VER_MINOR) + +/* Start of flow record - present for every flow */ +struct store_flow { + u_int8_t version; + u_int8_t len_words; /* len in 4 byte words not inc hdr */ + u_int16_t reserved; + u_int32_t fields; } __packed; /* @@ -100,11 +109,6 @@ struct store_header { STORE_FIELD_AGENT_ADDR4|\ STORE_FIELD_AGENT_ADDR6) -/* Start of flow record - present for every flow */ -struct store_flow { - u_int32_t fields; -} __packed; - /* * Optional flow records * NB. suffixes must match the corresponding STORE_FIELD_ define (see store.c) @@ -117,7 +121,8 @@ struct store_flow_TAG { /* Optional flow field - present if STORE_FIELD_RECV_TIME */ struct store_flow_RECV_TIME { - u_int32_t recv_secs; + u_int32_t recv_sec; + u_int32_t recv_usec; } __packed; /* Optional flow field - present if STORE_FIELD_PROTO_FLAGS_TOS */ @@ -182,8 +187,8 @@ struct store_flow_OCTETS { /* Optional flow field - present if STORE_FIELD_IF_INDICES */ struct store_flow_IF_INDICES { - u_int16_t if_index_in; - u_int16_t if_index_out; + u_int32_t if_index_in; + u_int32_t if_index_out; } __packed; /* Optional flow field - present if STORE_FIELD_AGENT_INFO */ @@ -203,8 +208,8 @@ struct store_flow_FLOW_TIMES { /* Optional flow field - present if STORE_FIELD_AS_INFO */ struct store_flow_AS_INFO { - u_int16_t src_as; - u_int16_t dst_as; + u_int32_t src_as; + u_int32_t dst_as; u_int8_t src_mask; u_int8_t dst_mask; u_int16_t pad; @@ -212,10 +217,10 @@ struct store_flow_AS_INFO { /* Optional flow field - present if STORE_FIELD_FLOW_ENGINE_INFO */ struct store_flow_FLOW_ENGINE_INFO { - u_int8_t engine_type; -
u_int8_t engine_id; - u_int16_t pad; + u_int16_t engine_type; + u_int16_t engine_id; u_int32_t flow_sequence; + u_int32_t source_id; } __packed; /* Optional flow field - present if STORE_FIELD_CRC32 */ @@ -257,23 +262,32 @@ struct store_flow_complete { #define STORE_ERR_IO_SEEK 0x09 #define STORE_ERR_CORRUPT 0x10 -int store_get_header(int fd, struct store_header *hdr, char *ebuf, int elen); +/* File descriptor oriented interface (tries to back out on failure) */ int store_get_flow(int fd, struct store_flow_complete *f, char *ebuf, int elen); -int store_check_header(int fd, char *ebuf, int elen); -int store_put_header(int fd, char *ebuf, int elen); int store_put_flow(int fd, struct store_flow_complete *flow, u_int32_t fieldmask, char *ebuf, int elen); -int store_validate_header(struct store_header *hdr, char *ebuf, int elen); -int store_calc_flow_len(struct store_flow *hdr); + +/* Simple FILE* oriented interface, doesn't back out on failure */ +int store_read_flow(FILE *f, struct store_flow_complete *flow, char *ebuf, + int elen); +int store_write_flow(FILE *f, struct store_flow_complete *flow, + u_int32_t fieldmask, char *ebuf, int elen); + +/* Serialisation and deserialisation */ int store_flow_deserialise(u_int8_t *buf, int len, struct store_flow_complete *f, char *ebuf, int elen); int store_flow_serialise(struct store_flow_complete *f, u_int8_t *buf, int buflen, int *flowlen, char *ebuf, int elen); +int store_calc_flow_len(struct store_flow *hdr); + +/* Formatting and conversion */ +void store_format_flow(struct store_flow_complete *flow, char *buf, + size_t len, int utc_flag, u_int32_t display_mask, int hostorder); +void store_swab_flow(struct store_flow_complete *flow, int to_net); +/* Utility functions */ const char *iso_time(time_t t, int utc_flag); const char *interval_time(time_t t); -void store_format_flow(struct store_flow_complete *flow, char *buf, - size_t len, int utc_flag, u_int32_t display_mask); u_int64_t store_ntohll(u_int64_t v); u_int64_t
store_htonll(u_int64_t v); Index: Flowd-perl/Flowd.xs =================================================================== RCS file: /var/cvs/flowd/Flowd-perl/Flowd.xs,v retrieving revision 1.2 diff -u -p -r1.2 Flowd.xs --- Flowd-perl/Flowd.xs 11 Mar 2005 19:07:48 -0000 1.2 +++ Flowd-perl/Flowd.xs 21 Aug 2005 09:49:17 -0000 @@ -46,8 +46,7 @@ int flow_length(...) buf = (char *)SvPV(ST(0), len); if (len < sizeof(struct store_flow)) croak("Supplied header is too short"); - if ((r = store_calc_flow_len((struct store_flow *)buf)) == -1) - croak("Unsupported fields in flow header"); + r = ((struct store_flow *)buf)->len_words * 4; RETVAL = r; OUTPUT: RETVAL @@ -80,14 +79,18 @@ void deserialise(...) field = newSVuv(fields); F_STORE("fields"); + field = newSVuv(flow.hdr.version); + F_STORE("flow_ver"); if (fields & STORE_FIELD_TAG) { field = newSVuv(ntohl(flow.tag.tag)); F_STORE("tag"); } if (fields & STORE_FIELD_RECV_TIME) { - field = newSVuv(ntohl(flow.recv_time.recv_secs)); - F_STORE("recv_secs"); + field = newSVuv(ntohl(flow.recv_time.recv_sec)); + F_STORE("recv_sec"); + field = newSVuv(ntohl(flow.recv_time.recv_usec)); + F_STORE("recv_usec"); } if (fields & STORE_FIELD_PROTO_FLAGS_TOS) { field = newSViv(flow.pft.tcp_flags); @@ -149,9 +152,9 @@ void deserialise(...) F_STORE("flow_octets"); } if (fields & STORE_FIELD_IF_INDICES) { - field = newSViv(ntohs(flow.ifndx.if_index_in)); + field = newSVuv(ntohl(flow.ifndx.if_index_in)); F_STORE("if_index_in"); - field = newSViv(ntohs(flow.ifndx.if_index_out)); + field = newSVuv(ntohl(flow.ifndx.if_index_out)); F_STORE("if_index_out"); } if (fields & STORE_FIELD_AGENT_INFO) { @@ -172,9 +175,9 @@ void deserialise(...) 
F_STORE("flow_finish"); } if (fields & STORE_FIELD_AS_INFO) { - field = newSViv(ntohs(flow.asinf.src_as)); + field = newSVuv(ntohl(flow.asinf.src_as)); F_STORE("src_as"); - field = newSViv(ntohs(flow.asinf.dst_as)); + field = newSVuv(ntohl(flow.asinf.dst_as)); F_STORE("dst_as"); field = newSViv(flow.asinf.src_mask); F_STORE("src_mask"); @@ -182,12 +185,14 @@ void deserialise(...) F_STORE("dst_mask"); } if (fields & STORE_FIELD_FLOW_ENGINE_INFO) { - field = newSViv(flow.finf.engine_type); + field = newSViv(ntohs(flow.finf.engine_type)); F_STORE("engine_type"); - field = newSViv(flow.finf.engine_id); + field = newSViv(ntohs(flow.finf.engine_id)); F_STORE("engine_id"); field = newSVuv(htonl(flow.finf.flow_sequence)); - F_STORE("src_mask"); + F_STORE("flow_sequence"); + field = newSVuv(htonl(flow.finf.source_id)); + F_STORE("source_id"); } if (fields & STORE_FIELD_CRC32) { field = newSVuv(ntohl(flow.crc32.crc32)); Index: Flowd-perl/lib/Flowd.pm =================================================================== RCS file: /var/cvs/flowd/Flowd-perl/lib/Flowd.pm,v retrieving revision 1.2 diff -u -p -r1.2 Flowd.pm --- Flowd-perl/lib/Flowd.pm 14 May 2005 07:22:08 -0000 1.2 +++ Flowd-perl/lib/Flowd.pm 21 Aug 2005 09:49:17 -0000 @@ -142,18 +142,6 @@ sub init { $self->{filename} = $filename; open($fhandle, "<$filename") or die "open($filename): $!"; $self->{handle} = $fhandle; - - # Read initial header - $r = read($self->{handle}, $hdr, 16); - - die "read($filename): $!" 
if not defined $r; - die "early EOF on $filename" if $r < 16; - - ($self->{magic}, $self->{version}, - $self->{start_time}, $self->{flags}) = unpack("NNNN", $hdr); - - die "bad magic" unless $self->{magic} == 0x012cf047; - die "unsupported version" unless $self->{version} == 0x00000002; } sub finish { @@ -202,8 +190,9 @@ sub format $ret .= sprintf "tag %u ", $flowfields->{tag}; } if ($fields & RECV_TIME) { - $ret .= sprintf "recv_time %s ", - Flowd::iso_time($flowfields->{recv_secs}, $utc_flag); + $ret .= sprintf "recv_time %s.%05d ", + Flowd::iso_time($flowfields->{recv_sec}, $utc_flag), + $flowfields->{recv_usec}; } if ($fields & PROTO_FLAGS_TOS) { $ret .= sprintf "proto %u ", $flowfields->{protocol};