[netflow-tools] flowd + avici v9 flows problem

Damien Miller djm at mindrot.org
Sun Aug 14 11:18:47 EST 2005


On Sun, 14 Aug 2005, Damien Miller wrote:

> Here is a better diff. It also includes a "flowd-reader -L" mode that can
> be used to convert old flow logs to the new format. E.g.

This diff is really attached this time :)

-d
-------------- next part --------------
Index: Makefile.in
===================================================================
RCS file: /var/cvs/flowd/Makefile.in,v
retrieving revision 1.25
diff -u -p -u -r1.25 Makefile.in
--- Makefile.in	3 Mar 2005 22:43:09 -0000	1.25
+++ Makefile.in	14 Aug 2005 01:09:04 -0000
@@ -40,7 +40,8 @@ TARGETS=flowd flowd-reader
 
 all: $(TARGETS)
 
-LIBFLOWD_OBJS=		atomicio.o addr.o store.o crc32.o strlcpy.o strlcat.o
+LIBFLOWD_OBJS=		atomicio.o addr.o store.o store-v2.o crc32.o \
+			strlcpy.o strlcat.o
 LIBFLOWD_HEADERS=	addr.h crc32.h store.h
 FLOWD_OBJS=		flowd.o privsep_fdpass.o privsep.o filter.o \
 			parse.o log.o daemon.o peer.o \
Index: TODO
===================================================================
RCS file: /var/cvs/flowd/TODO,v
retrieving revision 1.51
diff -u -p -u -r1.51 TODO
--- TODO	14 May 2005 06:08:30 -0000	1.51
+++ TODO	14 Aug 2005 01:09:04 -0000
@@ -53,9 +53,6 @@
 
 - Define net store.h types for:
   - min/max_pkt_lngth
-  - source_id (important!)
-
-- src/dst AS should be increased in size to 4 bytes
 
 - Discard protocol-specific state when we receive a packet in a different
   protocol
Index: configure.ac
===================================================================
RCS file: /var/cvs/flowd/configure.ac,v
retrieving revision 1.21
diff -u -p -u -r1.21 configure.ac
--- configure.ac	14 May 2005 07:22:08 -0000	1.21
+++ configure.ac	14 Aug 2005 01:09:04 -0000
@@ -17,7 +17,7 @@
 AC_INIT
 AC_CONFIG_SRCDIR([flowd.c])
 
-PROGVER=0.8.5
+PROGVER=0.9
 AC_SUBST(PROGVER)
 
 AC_CONFIG_HEADER(config.h)
Index: flowd-reader.8.in
===================================================================
RCS file: /var/cvs/flowd/flowd-reader.8.in,v
retrieving revision 1.4
diff -u -p -u -r1.4 flowd-reader.8.in
--- flowd-reader.8.in	19 Apr 2005 11:57:41 -0000	1.4
+++ flowd-reader.8.in	14 Aug 2005 01:09:04 -0000
@@ -22,7 +22,7 @@
 .Nd Read, filter and concatenate binary flowd logfiles
 .Sh SYNOPSIS
 .Nm flowd-reader
-.Op Fl Uvqd
+.Op Fl LUvqd
 .Op Fl f Ar filter_file
 .Op Fl o Ar output_file
 .Ar flow_log
@@ -59,6 +59,13 @@ to which all the flows that have been re
 .Pp
 The command-line options are as follows:
 .Bl -tag -width Ds
+.It Fl L
+Allows
+.Nm
+to read legacy version 2 flow logs (generated by
+.Xr flowd 8
+versions prior to 0.9).
+This may be used to convert old flow logs to the newer form.
 .It Fl U
 Causes
 .Nm
Index: flowd-reader.c
===================================================================
RCS file: /var/cvs/flowd/flowd-reader.c,v
retrieving revision 1.15
diff -u -p -u -r1.15 flowd-reader.c
--- flowd-reader.c	4 Feb 2005 06:26:10 -0000	1.15
+++ flowd-reader.c	14 Aug 2005 01:09:04 -0000
@@ -29,6 +29,7 @@
 #include "common.h"
 #include "flowd.h"
 #include "store.h"
+#include "store-v2.h"
 #include "atomicio.h"
 
 RCSID("$Id: flowd-reader.c,v 1.15 2005/02/04 06:26:10 djm Exp $");
@@ -40,6 +41,7 @@ usage(void)
 	    PROGNAME);
 	fprintf(stderr, "This is %s version %s. Valid commandline options:\n",
 	    PROGNAME, PROGVER);
+	fprintf(stderr, "  -L       Read/convert legacy flow logs\n");
 	fprintf(stderr, "  -q       Don't print flows to stdout (use with -o)\n");
 	fprintf(stderr, "  -d       Print debugging information\n");
 	fprintf(stderr, "  -f path  Filter flows using rule file\n");
@@ -53,48 +55,16 @@ static int
 open_start_log(const char *path, int debug)
 {
 	int fd;
-	off_t pos;
-	char ebuf[512];
 
 	if (path == NULL) {
 		/* Logfile on stdout */
 		fd = STDOUT_FILENO;
-	} else {
-		if ((fd = open(path, O_RDWR|O_APPEND|O_CREAT, 0600)) == -1)
-			logerr("open(%s)", path);
-
-		/* Only write out the header if we are at the start of the file */
-		switch ((pos = lseek(fd, 0, SEEK_END))) {
-		case 0:
-			/* New file, continue below */
-			break;
-		case -1:
-			logerr("lseek");
-		default:
-			/* Logfile exists, don't write new header */
-			if (lseek(fd, 0, SEEK_SET) != 0)
-				logerr("lseek");
-			if (store_check_header(fd, ebuf, sizeof(ebuf)) != 
-			    STORE_ERR_OK)
-				logerrx("Store error: %s", ebuf);
-			if (lseek(fd, 0, SEEK_END) <= 0) {
-				fprintf(stderr, "lseek: %s\n", strerror(errno));
-				exit(1);
-			}
-			if (debug) {
-				fprintf(stderr, "Continuing with existing "
-				    "logfile len %lld\n", (long long)pos);
-			}
-			return (fd);
-		}
-	}
+	} else if ((fd = open(path, O_RDWR|O_APPEND|O_CREAT, 0600)) == -1)
+		logerr("open(%s)", path);
 
 	if (debug)
 		fprintf(stderr, "Writing new logfile header\n");
 
-	if (store_put_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK)
-		logerrx("Store error: %s", ebuf);
-
 	return (fd);
 }
 
@@ -106,26 +76,30 @@ main(int argc, char **argv)
 	extern char *optarg;
 	extern int optind;
 	struct store_flow_complete flow;
-	struct store_header hdr;
+	struct store_v2_flow_complete flow_v2;
 	char buf[2048], ebuf[512];
 	const char *ffile, *ofile;
 	FILE *ffilef;
-	int ofd;
+	int ofd, read_legacy;
 	u_int32_t disp_mask;
 	struct flowd_config filter_config;
+	struct store_v2_header hdr_v2;
 
-	utc = verbose = debug = 0;
+	utc = verbose = debug = read_legacy = 0;
 	ofile = ffile = NULL;
 	ofd = -1;
 	ffilef = NULL;
 
 	bzero(&filter_config, sizeof(filter_config));
 
-	while ((ch = getopt(argc, argv, "Udf:ho:qv")) != -1) {
+	while ((ch = getopt(argc, argv, "LUdf:ho:qv")) != -1) {
 		switch (ch) {
 		case 'h':
 			usage();
 			return (0);
+		case 'L':
+			read_legacy = 1;
+			break;
 		case 'U':
 			utc = 1;
 			break;
@@ -190,24 +164,37 @@ main(int argc, char **argv)
 		else if ((fd = open(argv[i], O_RDONLY)) == -1)
 			logerr("open(%s)", argv[i]);
 
-		if (store_get_header(fd, &hdr, ebuf,
+		if (read_legacy && store_v2_get_header(fd, &hdr_v2, ebuf,
 		    sizeof(ebuf)) != STORE_ERR_OK)
-		    	logerrx("%s", ebuf);
+			logerrx("%s", ebuf);
 
 		if (verbose >= 0) {
-			printf("LOGFILE %s started at %s\n", argv[i],
-			    iso_time(ntohl(hdr.start_time), utc));
+			printf("LOGFILE %s", argv[i]);
+			if (read_legacy)
+				printf(" started at %s",
+				    iso_time(ntohl(hdr_v2.start_time), utc));
+			printf("\n");
 			fflush(stdout);
 		}
 
 		for (;;) {
 			bzero(&flow, sizeof(flow));
 
-			if ((r = store_get_flow(fd, &flow, ebuf,
-			    sizeof(ebuf))) == STORE_ERR_EOF)
+			if (read_legacy)
+				r = store_v2_get_flow(fd, &flow_v2, ebuf,
+				    sizeof(ebuf));
+			else
+				r = store_get_flow(fd, &flow, ebuf,
+				    sizeof(ebuf));
+				
+			if (r == STORE_ERR_EOF)
 				break;
 			else if (r != STORE_ERR_OK)
 			    	logerrx("%s", ebuf);
+
+			if (read_legacy &&
+			    store_v2_flow_convert(&flow_v2, &flow) == -1)
+			    	logerrx("legacy flow conversion failed");
 
 			if (ffile != NULL && filter_flow(&flow,
 			    &filter_config.filter_list) == FF_ACTION_DISCARD)
Index: flowd.c
===================================================================
RCS file: /var/cvs/flowd/flowd.c,v
retrieving revision 1.56
diff -u -p -u -r1.56 flowd.c
--- flowd.c	28 Apr 2005 09:02:58 -0000	1.56
+++ flowd.c	14 Aug 2005 01:09:04 -0000
@@ -119,37 +119,10 @@ static int
 start_log(int monitor_fd)
 {
 	int fd;
-	off_t pos;
-	char ebuf[512];
 
 	if ((fd = client_open_log(monitor_fd)) == -1)
 		logerrx("Logfile open failed, exiting");
 
-	/* Only write out the header if we are at the start of the file */
-	switch ((pos = lseek(fd, 0, SEEK_END))) {
-	case 0:
-		/* New file, continue below */
-		break;
-	case -1:
-		logerr("%s: lseek error, exiting", __func__);
-	default:
-		/* Logfile exists, don't write new header */
-		if (lseek(fd, 0, SEEK_SET) != 0)
-			logerr("%s: lseek error, exiting", __func__);
-		if (store_check_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK)
-			logerrx("%s: Exiting on %s", __func__, ebuf);
-		if (lseek(fd, 0, SEEK_END) <= 0)
-			logerr("%s: lseek error, exiting", __func__);
-		logit(LOG_DEBUG, "Continuing with existing logfile len %lld",
-		    (long long)pos);
-		return (fd);
-	}
-
-	logit(LOG_DEBUG, "Writing new logfile header");
-
-	if (store_put_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK)
-		logerrx("%s: Exiting on %s", __func__, ebuf);
-
 	return (fd);
 }
 
@@ -168,8 +141,8 @@ process_flow(struct store_flow_complete 
 
 	/* Prepare for writing */
 	flow->hdr.fields = htonl(flow->hdr.fields);
-
 	flow->recv_time.recv_secs = htonl(flow->recv_time.recv_secs);
+	flow->recv_time.recv_usecs = htonl(flow->recv_time.recv_usecs);
 
 	if (conf->opts & FLOWD_OPT_VERBOSE) {
 		char fbuf[1024];
@@ -199,6 +172,7 @@ process_netflow_v1(u_int8_t *pkt, size_t
 	struct store_flow_complete flow;
 	size_t offset;
 	u_int i, nflows;
+	struct timeval tv;
 
 	if (len < sizeof(*nf1_hdr)) {
 		peer->ninvalid++;
@@ -240,7 +214,9 @@ process_netflow_v1(u_int8_t *pkt, size_t
 		flow.hdr.fields &= ~STORE_FIELD_AS_INFO;
 		flow.hdr.fields &= ~STORE_FIELD_FLOW_ENGINE_INFO;
 
-		flow.recv_time.recv_secs = time(NULL);
+		gettimeofday(&tv, NULL);
+		flow.recv_time.recv_secs = tv.tv_sec;
+		flow.recv_time.recv_usecs = tv.tv_usec;
 
 		flow.pft.tcp_flags = nf1_flow->tcp_flags;
 		flow.pft.protocol = nf1_flow->protocol;
@@ -288,6 +264,7 @@ process_netflow_v5(u_int8_t *pkt, size_t
 	struct store_flow_complete flow;
 	size_t offset;
 	u_int i, nflows;
+	struct timeval tv;
 
 	if (len < sizeof(*nf5_hdr)) {
 		peer->ninvalid++;
@@ -327,7 +304,9 @@ process_netflow_v5(u_int8_t *pkt, size_t
 		flow.hdr.fields &= ~STORE_FIELD_DST_ADDR6;
 		flow.hdr.fields &= ~STORE_FIELD_GATEWAY_ADDR6;
 
-		flow.recv_time.recv_secs = time(NULL);
+		gettimeofday(&tv, NULL);
+		flow.recv_time.recv_secs = tv.tv_sec;
+		flow.recv_time.recv_usecs = tv.tv_usec;
 
 		flow.pft.tcp_flags = nf5_flow->tcp_flags;
 		flow.pft.protocol = nf5_flow->protocol;
@@ -384,6 +363,7 @@ process_netflow_v7(u_int8_t *pkt, size_t
 	struct store_flow_complete flow;
 	size_t offset;
 	u_int i, nflows;
+	struct timeval tv;
 
 	if (len < sizeof(*nf7_hdr)) {
 		peer->ninvalid++;
@@ -429,7 +409,9 @@ process_netflow_v7(u_int8_t *pkt, size_t
 		 * the Cat5k (e.g. destination-only mls nde mode)
 		 */
 
-		flow.recv_time.recv_secs = time(NULL);
+		gettimeofday(&tv, NULL);
+		flow.recv_time.recv_secs = tv.tv_sec;
+		flow.recv_time.recv_usecs = tv.tv_usec;
 
 		flow.pft.tcp_flags = nf7_flow->tcp_flags;
 		flow.pft.protocol = nf7_flow->protocol;
@@ -660,9 +642,10 @@ nf9_check_rec_len(u_int type, u_int len)
 static int
 nf9_flowset_to_store(u_int8_t *pkt, size_t len, struct xaddr *flow_source,
     struct NF9_HEADER *nf9_hdr, struct peer_nf9_template *template,
-    struct store_flow_complete *flow)
+    u_int32_t source_id, struct store_flow_complete *flow)
 {
 	u_int offset, i;
+	struct timeval tv;
 
 	if (template->total_len > len)
 		return (-1);
@@ -675,7 +658,10 @@ nf9_flowset_to_store(u_int8_t *pkt, size
 	flow->ainfo.time_sec = nf9_hdr->time_sec;
 	flow->ainfo.netflow_version = nf9_hdr->c.version;
 	flow->finf.flow_sequence = nf9_hdr->package_sequence;
-	flow->recv_time.recv_secs = time(NULL);
+	flow->finf.source_id = htonl(source_id);
+	gettimeofday(&tv, NULL);
+	flow->recv_time.recv_secs = tv.tv_sec;
+	flow->recv_time.recv_usecs = tv.tv_usec;
 	memcpy(&flow->agent_addr, flow_source, sizeof(flow->agent_addr));
 
 	offset = 0;
@@ -693,7 +679,7 @@ nf9_flowset_to_store(u_int8_t *pkt, size
 
 static int
 process_netflow_v9_template(u_int8_t *pkt, size_t len, struct peer_state *peer,
-    struct peers *peers, u_int source_id)
+    struct peers *peers, u_int32_t source_id)
 {
 	struct NF9_TEMPLATE_FLOWSET_HEADER *tmplh;
 	struct NF9_TEMPLATE_FLOWSET_RECORD *tmplr;
@@ -785,7 +771,7 @@ process_netflow_v9_template(u_int8_t *pk
 
 static int
 process_netflow_v9_data(u_int8_t *pkt, size_t len, struct peer_state *peer,
-    u_int source_id, struct NF9_HEADER *nf9_hdr, struct flowd_config *conf,
+    u_int32_t source_id, struct NF9_HEADER *nf9_hdr, struct flowd_config *conf,
     int log_fd, u_int *num_flows)
 {
 	struct store_flow_complete *flows;
@@ -835,7 +821,8 @@ process_netflow_v9_data(u_int8_t *pkt, s
 
 	for (i = 0; i < num_flowsets; i++) {
 		if (nf9_flowset_to_store(pkt + offset, template->total_len,
-		    &peer->from, nf9_hdr, template, &flows[i]) == -1) {
+		    &peer->from, nf9_hdr, template, source_id, 
+		    &flows[i]) == -1) {
 			peer->ninvalid++;
 			free(flows);
 			logit(LOG_WARNING, "invalid netflow v.9 data flowset "
@@ -863,8 +850,8 @@ process_netflow_v9(u_int8_t *pkt, size_t
 {
 	struct NF9_HEADER *nf9_hdr = (struct NF9_HEADER *)pkt;
 	struct NF9_FLOWSET_HEADER_COMMON *flowset;
-	u_int i, count, flowset_id, flowset_len, flowset_flows, total_flows;
-	u_int offset, source_id;
+	u_int32_t i, count, flowset_id, flowset_len, flowset_flows;
+	u_int32_t offset, source_id, total_flows;
 
 	if (len < sizeof(*nf9_hdr)) {
 		peer->ninvalid++;
Index: flowd.py
===================================================================
RCS file: /var/cvs/flowd/flowd.py,v
retrieving revision 1.10
diff -u -p -u -r1.10 flowd.py
--- flowd.py	14 May 2005 07:22:08 -0000	1.10
+++ flowd.py	14 Aug 2005 01:09:04 -0000
@@ -14,7 +14,7 @@
 
 # $Id: flowd.py,v 1.10 2005/05/14 07:22:08 djm Exp $
 
-VERSION = "0.8.5"
+VERSION = "0.9"
 
 import struct
 import time
@@ -54,36 +54,8 @@ class log:
 		self.mode = mode
 		if mode == "r":
 			self.flow_file = open(path, "rb")
-			# Read header
-			hdr = self.flow_file.read(16)
-			if len(hdr) != 16:
-				raise ValueError, "Short read on flow header"
-			(self.magic, self.version, self.start_time, \
-			 self.flags) = struct.unpack(">IIII", hdr)
-
-			if self.magic != 0x012cf047:
-				raise ValueError, "Bad magic"
-			if self.version != 0x00000002:
-				raise ValueError, "Unsupported version"
 		elif mode == "w" or mode == "a":
 			self.flow_file = open(path, mode + "b")
-			self.flow_file.seek(0, 2)
-			if self.flow_file.tell() > 0:
-				return
-
-			# Write header
-			self.magic = 0x012cf047
-			self.version = 0x02
-			self.flags = 0x00
-			self.start_time = start_time
-			if start_time is None:
-				self.start_time = int(time.time())
-			hdr = struct.pack(">IIII", self.magic, self.version, \
-			    self.start_time, self.flags)
-			if len(hdr) != 16:
-				raise ValueError, "Internal error: bad header"
-			self.flow_file.write(hdr)
-			self.flow_file.flush()
 		else:
 			raise ValueError, "Invalid mode value";
 
Index: setup.py
===================================================================
RCS file: /var/cvs/flowd/setup.py,v
retrieving revision 1.8
diff -u -p -u -r1.8 setup.py
--- setup.py	14 May 2005 07:22:08 -0000	1.8
+++ setup.py	14 Aug 2005 01:09:04 -0000
@@ -27,11 +27,11 @@ if __name__ == '__main__':
 
 	flowd_serialiser = Extension('flowd_serialiser',
 		sources = ['flowd_python.c'],
-		define_macros = [('PROGVER', '"0.8.5"')],
+		define_macros = [('PROGVER', '"0.9"')],
 		libraries = ['flowd'],
 		library_dirs = ['.', '../..'])
 	setup(	name = "flowd",
-		version = "0.8.5",
+		version = "0.9",
 		author = "Damien Miller",
 		author_email = "djm at mindrot.org",
 		url = "http://www.mindrot.org/flowd.html",
Index: store-v2.c
===================================================================
RCS file: store-v2.c
diff -N store-v2.c
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ store-v2.c	14 Aug 2005 01:09:04 -0000
@@ -0,0 +1,557 @@
+/*
+ * Copyright (c) 2004 Damien Miller <djm at mindrot.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <time.h>
+#include <poll.h>
+
+#include "common.h"
+#include "store-v2.h"
+#include "atomicio.h"
+#include "crc32.h"
+
+RCSID("$Id$");
+
+/* This is a useful abbreviation, used in several places below */
+#define SHASFIELD(flag) (fields & STORE_V2_FIELD_##flag)
+
+/* Stash error message and return */
+#define SFAILX(i, m, f) do {						\
+		if (ebuf != NULL && elen > 0) {				\
+			snprintf(ebuf, elen, "%s%s%s",			\
+			    (f) ? __func__ : "", (f) ? ": " : "", m);	\
+		}							\
+		return (i);						\
+	} while (0)
+
+/* Stash error message, appending strerror into "ebuf" and return */
+#define SFAIL(i, m, f) do {						\
+		if (ebuf != NULL && elen > 0) {				\
+			snprintf(ebuf, elen, "%s%s%s: %s", 		\
+			    (f) ? __func__ : "", (f) ? ": " : "", m, 	\
+			    strerror(errno));				\
+		}							\
+		return (i);						\
+	} while (0)
+
+int
+store_v2_validate_header(struct store_v2_header *hdr, char *ebuf, int elen)
+{
+	if (ntohl(hdr->magic) != STORE_V2_MAGIC)
+		SFAILX(STORE_ERR_BAD_MAGIC, "Bad magic", 0);
+	if (ntohl(hdr->version) != STORE_V2_VERSION)
+		SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0);
+
+	return (STORE_ERR_OK);
+}
+
+int
+store_v2_get_header(int fd, struct store_v2_header *hdr, char *ebuf, int elen)
+{
+	ssize_t r;
+
+	if ((r = atomicio(read, fd, hdr, sizeof(*hdr))) == -1)
+		SFAIL(STORE_ERR_IO, "read error", 0);
+	if (r < (ssize_t)sizeof(*hdr))
+		SFAILX(STORE_ERR_EOF, "premature EOF", 0);
+
+	return (store_v2_validate_header(hdr, ebuf, elen));
+}
+
+int
+store_v2_calc_flow_len(struct store_v2_flow *hdr)
+{
+	int ret = 0;
+	u_int32_t fields;
+
+	fields = ntohl(hdr->fields);
+#define ADDFIELD(flag) do { \
+		if (SHASFIELD(flag)) { \
+			ret += sizeof(struct store_v2_flow_##flag); \
+			fields &= ~STORE_V2_FIELD_##flag; \
+		} } while (0)
+	ADDFIELD(TAG);
+	ADDFIELD(RECV_TIME);
+	ADDFIELD(PROTO_FLAGS_TOS);
+	ADDFIELD(AGENT_ADDR4);
+	ADDFIELD(AGENT_ADDR6);
+	ADDFIELD(SRC_ADDR4);
+	ADDFIELD(SRC_ADDR6);
+	ADDFIELD(DST_ADDR4);
+	ADDFIELD(DST_ADDR6);
+	ADDFIELD(GATEWAY_ADDR4);
+	ADDFIELD(GATEWAY_ADDR6);
+	ADDFIELD(SRCDST_PORT);
+	ADDFIELD(PACKETS);
+	ADDFIELD(OCTETS);
+	ADDFIELD(IF_INDICES);
+	ADDFIELD(AGENT_INFO);
+	ADDFIELD(FLOW_TIMES);
+	ADDFIELD(AS_INFO);
+	ADDFIELD(FLOW_ENGINE_INFO);
+	ADDFIELD(CRC32);
+#undef ADDFIELD
+
+	/* Make sure we have captured everything */
+	if (fields != 0)
+		return (-1);
+
+	return (ret);
+}
+
+int
+store_v2_flow_deserialise(u_int8_t *buf, int len, struct store_v2_flow_complete *f,
+    char *ebuf, int elen)
+{
+	int offset, r;
+	struct store_v2_flow_AGENT_ADDR4 aa4;
+	struct store_v2_flow_AGENT_ADDR6 aa6;
+	struct store_v2_flow_SRC_ADDR4 sa4;
+	struct store_v2_flow_SRC_ADDR6 sa6;
+	struct store_v2_flow_DST_ADDR4 da4;
+	struct store_v2_flow_DST_ADDR6 da6;
+	struct store_v2_flow_GATEWAY_ADDR4 ga4;
+	struct store_v2_flow_GATEWAY_ADDR6 ga6;
+	u_int32_t fields, crc;
+	/* Decode one serialised v2 flow (len bytes at buf) into *f; returns STORE_ERR_* */
+	bzero(f, sizeof(*f));
+	crc32_start(&crc);
+
+	/* Validate len before reading buf; a negative len must not pass */
+	if (len < 0 || (size_t)len < sizeof(f->hdr))
+		SFAILX(STORE_ERR_BUFFER_SIZE,
+		    "supplied length is too small", 1);
+
+	memcpy(&f->hdr.fields, buf, sizeof(f->hdr.fields));
+	if ((r = store_v2_calc_flow_len((struct store_v2_flow *)buf)) == -1)
+		SFAILX(STORE_ERR_FLOW_INVALID,
+		    "unsupported flow fields specified", 0);
+
+	if ((size_t)len - sizeof(f->hdr) < (size_t)r)
+		SFAILX(STORE_ERR_BUFFER_SIZE,
+		    "supplied length is less than calculated flow length", 1);
+
+	crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
+
+	fields = ntohl(f->hdr.fields);
+
+	offset = sizeof(f->hdr);
+
+#define RFIELD(flag, dest) do { \
+		if (SHASFIELD(flag)) { \
+			memcpy(&dest, buf + offset, sizeof(dest)); \
+			offset += sizeof(dest); \
+			if (SHASFIELD(CRC32) && \
+			    STORE_V2_FIELD_##flag != STORE_V2_FIELD_CRC32) { \
+				crc32_update((u_char *)&dest, sizeof(dest), \
+				    &crc); \
+			} \
+		} } while (0)
+
+	RFIELD(TAG, f->tag);
+	RFIELD(RECV_TIME, f->recv_time);
+	RFIELD(PROTO_FLAGS_TOS, f->pft);
+	RFIELD(AGENT_ADDR4, aa4);
+	RFIELD(AGENT_ADDR6, aa6);
+	RFIELD(SRC_ADDR4, sa4);
+	RFIELD(SRC_ADDR6, sa6);
+	RFIELD(DST_ADDR4, da4);
+	RFIELD(DST_ADDR6, da6);
+	RFIELD(GATEWAY_ADDR4, ga4);
+	RFIELD(GATEWAY_ADDR6, ga6);
+	RFIELD(SRCDST_PORT, f->ports);
+	RFIELD(PACKETS, f->packets);
+	RFIELD(OCTETS, f->octets);
+	RFIELD(IF_INDICES, f->ifndx);
+	RFIELD(AGENT_INFO, f->ainfo);
+	RFIELD(FLOW_TIMES, f->ftimes);
+	RFIELD(AS_INFO, f->asinf);
+	RFIELD(FLOW_ENGINE_INFO, f->finf);
+	RFIELD(CRC32, f->crc32);
+
+	/* Sanity check and convert addresses */
+	if (SHASFIELD(AGENT_ADDR4) && SHASFIELD(AGENT_ADDR6))
+		SFAILX(STORE_ERR_FLOW_INVALID, "Flow has both v4/v6 agent addrs", 0);
+	if (SHASFIELD(SRC_ADDR4) && SHASFIELD(SRC_ADDR6))
+		SFAILX(STORE_ERR_FLOW_INVALID, "Flow has both v4/v6 src addrs", 0);
+	if (SHASFIELD(DST_ADDR4) && SHASFIELD(DST_ADDR6))
+		SFAILX(STORE_ERR_FLOW_INVALID, "Flow has both v4/v6 dst addrs", 0);
+	if (SHASFIELD(GATEWAY_ADDR4) && SHASFIELD(GATEWAY_ADDR6))
+		SFAILX(STORE_ERR_FLOW_INVALID, "Flow has both v4/v6 gateway addrs", 0);
+
+#define S_CPYADDR(d, s, fam) do {					\
+		(d).af = (fam == 4) ? AF_INET : AF_INET6;		\
+		memcpy(&d.v##fam, &s, sizeof(d.v##fam));		\
+	} while (0)
+
+	if (SHASFIELD(AGENT_ADDR4))
+		S_CPYADDR(f->agent_addr, aa4.flow_agent_addr, 4);
+	if (SHASFIELD(AGENT_ADDR6))
+		S_CPYADDR(f->agent_addr, aa6.flow_agent_addr, 6);
+	if (SHASFIELD(SRC_ADDR4))
+		S_CPYADDR(f->src_addr, sa4.src_addr, 4);
+	if (SHASFIELD(SRC_ADDR6))
+		S_CPYADDR(f->src_addr, sa6.src_addr, 6);
+	if (SHASFIELD(DST_ADDR4))
+		S_CPYADDR(f->dst_addr, da4.dst_addr, 4);
+	if (SHASFIELD(DST_ADDR6))
+		S_CPYADDR(f->dst_addr, da6.dst_addr, 6);
+	if (SHASFIELD(GATEWAY_ADDR4))
+		S_CPYADDR(f->gateway_addr, ga4.gateway_addr, 4);
+	if (SHASFIELD(GATEWAY_ADDR6))
+		S_CPYADDR(f->gateway_addr, ga6.gateway_addr, 6);
+
+	if (SHASFIELD(CRC32) && crc != ntohl(f->crc32.crc32))
+		SFAILX(STORE_ERR_CRC_MISMATCH, "Flow checksum mismatch", 0);
+
+#undef S_CPYADDR
+#undef RFIELD
+
+	return (STORE_ERR_OK);
+}
+
+int
+store_v2_get_flow(int fd, struct store_v2_flow_complete *f, char *ebuf, int elen)
+{
+	int r, len;
+	u_int8_t buf[512];
+
+	/* Read header */
+	if ((r = atomicio(read, fd, buf, sizeof(struct store_v2_flow))) == -1)
+		SFAIL(STORE_ERR_IO, "read flow header", 0);
+	if (r < sizeof(struct store_v2_flow))
+		SFAILX(STORE_ERR_EOF, "EOF reading flow header", 0);
+
+	if ((len = store_v2_calc_flow_len((struct store_v2_flow *)buf)) == -1)
+		SFAILX(STORE_ERR_FLOW_INVALID,
+		    "unsupported flow fields specified", 0);
+	if (len > sizeof(buf) - sizeof(struct store_v2_flow))
+		SFAILX(STORE_ERR_INTERNAL,
+		    "Internal error: flow buffer too small", 1);
+
+	if ((r = atomicio(read, fd, buf + sizeof(struct store_v2_flow), len)) == -1)
+		SFAIL(STORE_ERR_IO, "read flow data", 0);
+	if (r < len)
+		SFAILX(STORE_ERR_EOF, "EOF reading flow data", 0);
+
+	return (store_v2_flow_deserialise(buf, len + sizeof(struct store_v2_flow),
+	    f, ebuf, elen));
+}
+
+int
+store_v2_check_header(int fd, char *ebuf, int elen)
+{
+	struct store_v2_header hdr;
+	int r;
+
+	if ((r = store_v2_get_header(fd, &hdr, ebuf, elen)) != STORE_ERR_OK)
+		return (r);
+
+	/* store_v2_get_header does all the magic & version checks for us */
+
+	return (STORE_ERR_OK);
+}
+
+int
+store_v2_put_header(int fd, char *ebuf, int elen)
+{
+	struct store_v2_header hdr;
+	int r;
+
+	bzero(&hdr, sizeof(hdr));
+	hdr.magic = htonl(STORE_V2_MAGIC);
+	hdr.version = htonl(STORE_V2_VERSION);
+	hdr.start_time = htonl(time(NULL));
+	hdr.flags = htonl(0);
+
+	r = atomicio(vwrite, fd, &hdr, sizeof(hdr));
+	if (r == -1)
+		SFAIL(STORE_ERR_IO, "write error on header", 0);
+	if (r < (ssize_t)sizeof(hdr))
+		SFAILX(STORE_ERR_EOF, "EOF while writing header", 0);
+
+	return (STORE_ERR_OK);
+}
+
+int
+store_v2_flow_serialise(struct store_v2_flow_complete *f, u_int8_t *buf, int buflen,
+    int *flowlen, char *ebuf, int elen)
+{
+	struct store_v2_flow_AGENT_ADDR4 aa4;
+	struct store_v2_flow_AGENT_ADDR6 aa6;
+	struct store_v2_flow_SRC_ADDR4 sa4;
+	struct store_v2_flow_SRC_ADDR6 sa6;
+	struct store_v2_flow_DST_ADDR4 da4;
+	struct store_v2_flow_DST_ADDR6 da6;
+	struct store_v2_flow_GATEWAY_ADDR4 gwa4;
+	struct store_v2_flow_GATEWAY_ADDR6 gwa6;
+	u_int32_t fields, crc;
+	int offset;
+	/* Encode *f into buf (buflen bytes); *flowlen receives the bytes written */
+	fields = ntohl(f->hdr.fields);
+
+	/* Convert addresses and set AF fields correctly */
+	/* XXX this is too repetitive */
+	switch(f->agent_addr.af) {
+	case AF_INET:
+		if ((fields & STORE_V2_FIELD_AGENT_ADDR4) == 0)
+			break;
+		memcpy(&aa4.flow_agent_addr, &f->agent_addr.v4,
+		    sizeof(aa4.flow_agent_addr));
+		fields |= STORE_V2_FIELD_AGENT_ADDR4;
+		fields &= ~STORE_V2_FIELD_AGENT_ADDR6;
+		break;
+	case AF_INET6:
+		if ((fields & STORE_V2_FIELD_AGENT_ADDR6) == 0)
+			break;
+		memcpy(&aa6.flow_agent_addr, &f->agent_addr.v6,
+		    sizeof(aa6.flow_agent_addr));
+		fields |= STORE_V2_FIELD_AGENT_ADDR6;
+		fields &= ~STORE_V2_FIELD_AGENT_ADDR4;
+		break;
+	default:
+		if ((fields & STORE_V2_FIELD_AGENT_ADDR) == 0)
+			break;
+		SFAILX(STORE_ERR_FLOW_INVALID, "silly agent addr af", 1);
+	}
+
+	switch(f->src_addr.af) {
+	case AF_INET:
+		if ((fields & STORE_V2_FIELD_SRC_ADDR4) == 0)
+			break;
+		memcpy(&sa4.src_addr, &f->src_addr.v4,
+		    sizeof(sa4.src_addr));
+		fields |= STORE_V2_FIELD_SRC_ADDR4;
+		fields &= ~STORE_V2_FIELD_SRC_ADDR6;
+		break;
+	case AF_INET6:
+		if ((fields & STORE_V2_FIELD_SRC_ADDR6) == 0)
+			break;
+		memcpy(&sa6.src_addr, &f->src_addr.v6,
+		    sizeof(sa6.src_addr));
+		fields |= STORE_V2_FIELD_SRC_ADDR6;
+		fields &= ~STORE_V2_FIELD_SRC_ADDR4;
+		break;
+	default:
+		if ((fields & STORE_V2_FIELD_SRC_ADDR) == 0)
+			break;
+		SFAILX(STORE_ERR_FLOW_INVALID, "silly src addrs af", 1);
+	}
+
+	switch(f->dst_addr.af) {
+	case AF_INET:
+		if ((fields & STORE_V2_FIELD_DST_ADDR4) == 0)
+			break;
+		memcpy(&da4.dst_addr, &f->dst_addr.v4,
+		    sizeof(da4.dst_addr));
+		fields |= STORE_V2_FIELD_DST_ADDR4;
+		fields &= ~STORE_V2_FIELD_DST_ADDR6;
+		break;
+	case AF_INET6:
+		if ((fields & STORE_V2_FIELD_DST_ADDR6) == 0)
+			break;
+		memcpy(&da6.dst_addr, &f->dst_addr.v6,
+		    sizeof(da6.dst_addr));
+		fields |= STORE_V2_FIELD_DST_ADDR6;
+		fields &= ~STORE_V2_FIELD_DST_ADDR4;
+		break;
+	default:
+		if ((fields & STORE_V2_FIELD_DST_ADDR) == 0)
+			break;
+		SFAILX(STORE_ERR_FLOW_INVALID, "silly dst addrs af", 1);
+	}
+
+	switch(f->gateway_addr.af) {
+	case AF_INET:
+		if ((fields & STORE_V2_FIELD_GATEWAY_ADDR4) == 0)
+			break;
+		memcpy(&gwa4.gateway_addr, &f->gateway_addr.v4,
+		    sizeof(gwa4.gateway_addr));
+		fields |= STORE_V2_FIELD_GATEWAY_ADDR4;
+		fields &= ~STORE_V2_FIELD_GATEWAY_ADDR6;
+		break;
+	case AF_INET6:
+		if ((fields & STORE_V2_FIELD_GATEWAY_ADDR6) == 0)
+			break;
+		memcpy(&gwa6.gateway_addr, &f->gateway_addr.v6,
+		    sizeof(gwa6.gateway_addr));
+		fields |= STORE_V2_FIELD_GATEWAY_ADDR6;
+		fields &= ~STORE_V2_FIELD_GATEWAY_ADDR4;
+		break;
+	default:
+		if ((fields & STORE_V2_FIELD_GATEWAY_ADDR) == 0)
+			break;
+		SFAILX(STORE_ERR_FLOW_INVALID, "silly gateway addr af", 1);
+	}
+
+	/* Fields may have changed above; buf must hold the header plus body */
+	f->hdr.fields = htonl(fields);
+	if ((offset = store_v2_calc_flow_len(&f->hdr)) == -1)
+		SFAILX(STORE_ERR_FLOW_INVALID, "unsupported flow fields specified", 0);
+	if (offset + (int)sizeof(f->hdr) > buflen)
+		SFAILX(STORE_ERR_BUFFER_SIZE, "flow buffer too small", 1);
+
+	crc32_start(&crc);
+	memcpy(buf, &f->hdr, sizeof(f->hdr));
+	offset = sizeof(f->hdr);
+	crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
+
+#define WFIELD(spec, what) do {						\
+	if (SHASFIELD(spec)) {						\
+		memcpy(buf + offset, &(what), sizeof(what));		\
+		offset += sizeof(what);					\
+		if (SHASFIELD(spec) && 					\
+		    (STORE_V2_FIELD_##spec != STORE_V2_FIELD_CRC32)) {	\
+			crc32_update((u_char *)&(what), sizeof(what),	\
+			    &crc);					\
+		}							\
+	}  } while (0)
+
+	WFIELD(TAG, f->tag);
+	WFIELD(RECV_TIME, f->recv_time);
+	WFIELD(PROTO_FLAGS_TOS, f->pft);
+	WFIELD(AGENT_ADDR4, aa4);
+	WFIELD(AGENT_ADDR6, aa6);
+	WFIELD(SRC_ADDR4, sa4);
+	WFIELD(SRC_ADDR6, sa6);
+	WFIELD(DST_ADDR4, da4);
+	WFIELD(DST_ADDR6, da6);
+	WFIELD(GATEWAY_ADDR4, gwa4);
+	WFIELD(GATEWAY_ADDR6, gwa6);
+	WFIELD(SRCDST_PORT, f->ports);
+	WFIELD(PACKETS, f->packets);
+	WFIELD(OCTETS, f->octets);
+	WFIELD(IF_INDICES, f->ifndx);
+	WFIELD(AGENT_INFO, f->ainfo);
+	WFIELD(FLOW_TIMES, f->ftimes);
+	WFIELD(AS_INFO, f->asinf);
+	WFIELD(FLOW_ENGINE_INFO, f->finf);
+	if (fields & (STORE_V2_FIELD_CRC32))
+		f->crc32.crc32 = htonl(crc);
+	WFIELD(CRC32, f->crc32);
+#undef WFIELD
+
+	*flowlen = offset;
+	return (STORE_ERR_OK);
+}
+
+int
+store_v2_put_flow(int fd, struct store_v2_flow_complete *flow, u_int32_t fieldmask,
+    char *ebuf, int elen)
+{
+	u_int32_t fields, origfields;
+	off_t startpos;
+	u_int8_t buf[512];
+	int len, r, saved_errno, ispipe = 0;
+
+	/* Remember where we started, so we can back errors out */
+	if ((startpos = lseek(fd, 0, SEEK_CUR)) == -1) {
+		if (errno == ESPIPE)
+			ispipe = 1;
+		else
+			SFAIL(STORE_ERR_IO_SEEK, "lseek", 1);
+	}
+
+	origfields = ntohl(flow->hdr.fields);
+	fields = origfields & fieldmask;
+	flow->hdr.fields = htonl(fields);
+
+	r = store_v2_flow_serialise(flow, buf, sizeof(buf), &len, ebuf, elen);
+	if (r != STORE_ERR_OK) {
+		flow->hdr.fields = htonl(origfields);
+		return (r);
+	}
+
+	r = atomicio(vwrite, fd, buf, len);
+	saved_errno = errno;
+	flow->hdr.fields = htonl(origfields);
+
+	if (r == len)
+		return (STORE_ERR_OK);
+
+	if (ispipe)
+		SFAIL(STORE_ERR_CORRUPT, "corrupting failure on pipe", 1);
+
+	/* Try to rewind to starting position, so we don't corrupt flow store */
+	if (lseek(fd, startpos, SEEK_SET) == -1)
+		SFAIL(STORE_ERR_CORRUPT, "corrupting failure on lseek", 1);
+	if (ftruncate(fd, startpos) == -1)
+		SFAIL(STORE_ERR_CORRUPT, "corrupting failure on ftruncate", 1);
+
+	/* Partial flow record has been removed, return with orig. error */
+	errno = saved_errno;
+	if (r == -1)
+		SFAIL(STORE_ERR_IO, "write flow", 0);
+	else
+		SFAILX(STORE_ERR_EOF, "EOF on write flow", 0);
+}
+
+int
+store_v2_flow_convert(struct store_v2_flow_complete *fv2,
+    struct store_flow_complete *f)
+{
+	int len;
+
+	bzero(f, sizeof(*f));
+	f->hdr.version = STORE_VERSION;
+	f->hdr.fields = fv2->hdr.fields;
+	if ((len = store_calc_flow_len(&f->hdr)) == -1)
+		return (-1);
+	f->hdr.len_words = len / 4;
+
+	f->tag.tag = fv2->tag.tag;
+	f->recv_time.recv_secs = fv2->recv_time.recv_secs;
+	f->pft.tcp_flags = fv2->pft.tcp_flags;
+	f->pft.protocol = fv2->pft.protocol;
+	f->pft.tos = fv2->pft.tos;
+	f->pft.pad = fv2->pft.pad;
+	f->agent_addr = fv2->agent_addr;
+	f->src_addr = fv2->src_addr;
+	f->dst_addr = fv2->dst_addr;
+	f->gateway_addr = fv2->gateway_addr;
+	f->ports.src_port = fv2->ports.src_port;
+	f->ports.dst_port = fv2->ports.dst_port;
+	f->packets.flow_packets = fv2->packets.flow_packets;
+	f->octets.flow_octets = fv2->octets.flow_octets;
+	f->ifndx.if_index_in = htonl(ntohs(fv2->ifndx.if_index_in));
+	f->ifndx.if_index_out = htonl(ntohs(fv2->ifndx.if_index_out));
+	f->ainfo.sys_uptime_ms = fv2->ainfo.sys_uptime_ms;
+	f->ainfo.time_sec = fv2->ainfo.time_sec;
+	f->ainfo.time_nanosec = fv2->ainfo.time_nanosec;
+	f->ainfo.netflow_version = fv2->ainfo.netflow_version;
+	f->ainfo.pad = fv2->ainfo.pad;
+	f->ftimes.flow_start = fv2->ftimes.flow_start;
+	f->ftimes.flow_finish = fv2->ftimes.flow_finish;
+	f->asinf.src_as = htonl(ntohs(fv2->asinf.src_as));
+	f->asinf.dst_as = htonl(ntohs(fv2->asinf.dst_as));
+	f->asinf.src_mask = fv2->asinf.src_mask;
+	f->asinf.dst_mask = fv2->asinf.dst_mask;
+	f->asinf.pad = fv2->asinf.pad;
+	f->finf.engine_type = fv2->finf.engine_type;
+	f->finf.engine_id = fv2->finf.engine_id;
+	f->finf.pad = fv2->finf.pad;
+	f->finf.flow_sequence = fv2->finf.flow_sequence;
+	f->crc32.crc32 = fv2->crc32.crc32;
+
+	return (0);
+}
Index: store-v2.h
===================================================================
RCS file: store-v2.h
diff -N store-v2.h
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ store-v2.h	14 Aug 2005 01:09:04 -0000
@@ -0,0 +1,255 @@
+/*	$Id$	*/
+
+/*
+ * Copyright (c) 2004 Damien Miller <djm at mindrot.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/* On-disk storage format */
+
+#ifndef _STORE_V2_H
+#define _STORE_V2_H
+
+#if defined(HAVE_SYS_CDEFS_H)
+# include <sys/cdefs.h> /* For __packed, etc on platforms that have it */
+#endif
+#if defined(__GNUC__) && !defined(__packed)
+# define __packed __attribute__((__packed__))
+#endif
+
+#include "addr.h"
+#include "store.h"
+
+#define STORE_V2_MAGIC			0x012cf047
+#define STORE_V2_VERSION		0x00000002
+/* Start of flow log file */
+struct store_v2_header {
+	u_int32_t		magic;
+	u_int32_t		version;
+	u_int32_t		start_time;
+	u_int32_t		flags;	/* Currently 0 */
+} __packed;
+
+/*
+ * Optional flow fields, specify what is stored for the flow
+ * NB - the flow records appear in this order on disk
+ */
+#define STORE_V2_FIELD_TAG		(1U)
+#define STORE_V2_FIELD_RECV_TIME	(1U<<1)
+#define STORE_V2_FIELD_PROTO_FLAGS_TOS	(1U<<2)
+#define STORE_V2_FIELD_AGENT_ADDR4	(1U<<3)
+#define STORE_V2_FIELD_AGENT_ADDR6	(1U<<4)
+#define STORE_V2_FIELD_SRC_ADDR4	(1U<<5)
+#define STORE_V2_FIELD_SRC_ADDR6	(1U<<6)
+#define STORE_V2_FIELD_DST_ADDR4	(1U<<7)
+#define STORE_V2_FIELD_DST_ADDR6	(1U<<8)
+#define STORE_V2_FIELD_GATEWAY_ADDR4	(1U<<9)
+#define STORE_V2_FIELD_GATEWAY_ADDR6	(1U<<10)
+#define STORE_V2_FIELD_SRCDST_PORT	(1U<<11)
+#define STORE_V2_FIELD_PACKETS		(1U<<12)
+#define STORE_V2_FIELD_OCTETS		(1U<<13)
+#define STORE_V2_FIELD_IF_INDICES	(1U<<14)
+#define STORE_V2_FIELD_AGENT_INFO	(1U<<15)
+#define STORE_V2_FIELD_FLOW_TIMES	(1U<<16)
+#define STORE_V2_FIELD_AS_INFO		(1U<<17)
+#define STORE_V2_FIELD_FLOW_ENGINE_INFO	(1U<<18)
+/* ... more one day */
+
+#define STORE_V2_FIELD_CRC32		(1U<<30)
+#define STORE_V2_FIELD_RESERVED		(1U<<31) /* For extension header */
+#define STORE_V2_FIELD_ALL		(((1U<<19)-1)|STORE_V2_FIELD_CRC32)
+
+/* Useful combinations */
+#define STORE_V2_FIELD_AGENT_ADDR	(STORE_V2_FIELD_AGENT_ADDR4|\
+					 STORE_V2_FIELD_AGENT_ADDR6)
+#define STORE_V2_FIELD_SRC_ADDR		(STORE_V2_FIELD_SRC_ADDR4|\
+					 STORE_V2_FIELD_SRC_ADDR6)
+#define STORE_V2_FIELD_DST_ADDR		(STORE_V2_FIELD_DST_ADDR4|\
+					 STORE_V2_FIELD_DST_ADDR6)
+#define STORE_V2_FIELD_SRCDST_ADDR	(STORE_V2_FIELD_SRC_ADDR|\
+					 STORE_V2_FIELD_DST_ADDR)
+#define STORE_V2_FIELD_GATEWAY_ADDR	(STORE_V2_FIELD_GATEWAY_ADDR4|\
+					 STORE_V2_FIELD_GATEWAY_ADDR6)
+
+#define STORE_V2_DISPLAY_ALL		STORE_V2_FIELD_ALL
+#define STORE_V2_DISPLAY_BRIEF		(STORE_V2_FIELD_TAG|\
+					 STORE_V2_FIELD_RECV_TIME|\
+					 STORE_V2_FIELD_PROTO_FLAGS_TOS|\
+					 STORE_V2_FIELD_SRCDST_PORT|\
+					 STORE_V2_FIELD_PACKETS|\
+					 STORE_V2_FIELD_OCTETS|\
+					 STORE_V2_FIELD_SRCDST_ADDR|\
+					 STORE_V2_FIELD_AGENT_ADDR4|\
+					 STORE_V2_FIELD_AGENT_ADDR6)
+
+/* Start of flow record - present for every flow */
+struct store_v2_flow {
+	u_int32_t		fields;
+} __packed;
+
+/*
+ * Optional flow records
+ * NB. suffixes must match the corresponding STORE_V2_FIELD_ define (see store-v2.c)
+ */
+
+/* Optional flow field - present if STORE_FIELD_TAG */
+struct store_v2_flow_TAG {
+	u_int32_t		tag; /* set by filter */
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_RECV_TIME */
+struct store_v2_flow_RECV_TIME {
+	u_int32_t		recv_secs;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_PROTO_FLAGS_TOS */
+struct store_v2_flow_PROTO_FLAGS_TOS {
+	u_int8_t		tcp_flags;
+	u_int8_t		protocol;
+	u_int8_t		tos;
+	u_int8_t		pad;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_AGENT_ADDR */
+struct store_v2_flow_AGENT_ADDR4 {
+	struct store_addr4	flow_agent_addr;
+} __packed;
+struct store_v2_flow_AGENT_ADDR6 {
+	struct store_addr6	flow_agent_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_SRC_ADDR4 */
+struct store_v2_flow_SRC_ADDR4 {
+	struct store_addr4	src_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_DST_ADDR4 */
+struct store_v2_flow_DST_ADDR4 {
+	struct store_addr4	dst_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_SRC_ADDR6 */
+struct store_v2_flow_SRC_ADDR6 {
+	struct store_addr6	src_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_DST_ADDR6 */
+struct store_v2_flow_DST_ADDR6 {
+	struct store_addr6	dst_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_GATEWAY_ADDR */
+struct store_v2_flow_GATEWAY_ADDR4 {
+	struct store_addr4	gateway_addr;
+} __packed;
+struct store_v2_flow_GATEWAY_ADDR6 {
+	struct store_addr6	gateway_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_SRCDST_PORT */
+struct store_v2_flow_SRCDST_PORT {
+	u_int16_t		src_port;
+	u_int16_t		dst_port;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_PACKETS */
+struct store_v2_flow_PACKETS {
+	u_int64_t		flow_packets;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_OCTETS */
+struct store_v2_flow_OCTETS {
+	u_int64_t		flow_octets;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_IF_INDICES */
+struct store_v2_flow_IF_INDICES {
+	u_int16_t		if_index_in;
+	u_int16_t		if_index_out;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_AGENT_INFO */
+struct store_v2_flow_AGENT_INFO {
+	u_int32_t		sys_uptime_ms;
+	u_int32_t		time_sec;
+	u_int32_t		time_nanosec;
+	u_int16_t		netflow_version;
+	u_int16_t		pad;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_FLOW_TIMES */
+struct store_v2_flow_FLOW_TIMES {
+	u_int32_t		flow_start;
+	u_int32_t		flow_finish;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_AS_INFO */
+struct store_v2_flow_AS_INFO {
+	u_int16_t		src_as;
+	u_int16_t		dst_as;
+	u_int8_t		src_mask;
+	u_int8_t		dst_mask;
+	u_int16_t		pad;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_FLOW_ENGINE_INFO */
+struct store_v2_flow_FLOW_ENGINE_INFO {
+	u_int8_t		engine_type;
+	u_int8_t		engine_id;
+	u_int16_t		pad;
+	u_int32_t		flow_sequence;
+} __packed;
+
+/* Optional flow field - present if STORE_FIELD_CRC32 */
+struct store_v2_flow_CRC32 {
+	u_int32_t		crc32;
+} __packed;
+
+/* An abstract flow record (all fields included) */
+struct store_v2_flow_complete {
+	struct store_v2_flow			hdr;
+	struct store_v2_flow_TAG			tag;
+	struct store_v2_flow_RECV_TIME		recv_time;
+	struct store_v2_flow_PROTO_FLAGS_TOS	pft;
+	struct xaddr				agent_addr;
+	struct xaddr				src_addr;
+	struct xaddr				dst_addr;
+	struct xaddr				gateway_addr;
+	struct store_v2_flow_SRCDST_PORT		ports;
+	struct store_v2_flow_PACKETS		packets;
+	struct store_v2_flow_OCTETS		octets;
+	struct store_v2_flow_IF_INDICES		ifndx;
+	struct store_v2_flow_AGENT_INFO		ainfo;
+	struct store_v2_flow_FLOW_TIMES		ftimes;
+	struct store_v2_flow_AS_INFO		asinf;
+	struct store_v2_flow_FLOW_ENGINE_INFO	finf;
+	struct store_v2_flow_CRC32			crc32;
+} __packed;
+
+int store_v2_get_header(int fd, struct store_v2_header *hdr, char *ebuf, int elen);
+int store_v2_get_flow(int fd, struct store_v2_flow_complete *f, char *ebuf, int elen);
+int store_v2_check_header(int fd, char *ebuf, int elen);
+int store_v2_put_header(int fd, char *ebuf, int elen);
+int store_v2_put_flow(int fd, struct store_v2_flow_complete *flow,
+    u_int32_t fieldmask, char *ebuf, int elen);
+int store_v2_validate_header(struct store_v2_header *hdr, char *ebuf, int elen);
+int store_v2_calc_flow_len(struct store_v2_flow *hdr);
+int store_v2_flow_deserialise(u_int8_t *buf, int len,
+    struct store_v2_flow_complete *f, char *ebuf, int elen);
+int store_v2_flow_serialise(struct store_v2_flow_complete *f, u_int8_t *buf, int buflen,
+    int *flowlen, char *ebuf, int elen);
+int store_v2_flow_convert(struct store_v2_flow_complete *fv2,
+    struct store_flow_complete *f);
+
+#endif /* _STORE_V2_H */
Index: store.c
===================================================================
RCS file: /var/cvs/flowd/store.c,v
retrieving revision 1.30
diff -u -p -u -r1.30 store.c
--- store.c	4 Feb 2005 06:26:10 -0000	1.30
+++ store.c	14 Aug 2005 01:09:04 -0000
@@ -55,30 +55,6 @@ RCSID("$Id: store.c,v 1.30 2005/02/04 06
 	} while (0)
 
 int
-store_validate_header(struct store_header *hdr, char *ebuf, int elen)
-{
-	if (ntohl(hdr->magic) != STORE_MAGIC)
-		SFAILX(STORE_ERR_BAD_MAGIC, "Bad magic", 0);
-	if (ntohl(hdr->version) != STORE_VERSION)
-		SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0);
-
-	return (STORE_ERR_OK);
-}
-
-int
-store_get_header(int fd, struct store_header *hdr, char *ebuf, int elen)
-{
-	ssize_t r;
-
-	if ((r = atomicio(read, fd, hdr, sizeof(*hdr))) == -1)
-		SFAIL(STORE_ERR_IO, "read error", 0);
-	if (r < (ssize_t)sizeof(*hdr))
-		SFAILX(STORE_ERR_EOF, "premature EOF", 0);
-
-	return (store_validate_header(hdr, ebuf, elen));
-}
-
-int
 store_calc_flow_len(struct store_flow *hdr)
 {
 	int ret = 0;
@@ -123,7 +99,7 @@ int
 store_flow_deserialise(u_int8_t *buf, int len, struct store_flow_complete *f,
     char *ebuf, int elen)
 {
-	int offset, r;
+	int offset, allow_extra;
 	struct store_flow_AGENT_ADDR4 aa4;
 	struct store_flow_AGENT_ADDR6 aa6;
 	struct store_flow_SRC_ADDR4 sa4;
@@ -137,24 +113,23 @@ store_flow_deserialise(u_int8_t *buf, in
 	bzero(f, sizeof(*f));
 	crc32_start(&crc);
 
-	memcpy(&f->hdr.fields, buf, sizeof(f->hdr.fields));
-
 	if (len < sizeof(f->hdr))
 		SFAILX(STORE_ERR_BUFFER_SIZE,
 		    "supplied length is too small", 1);
 
-	if ((r = store_calc_flow_len((struct store_flow *)buf)) == -1)
-		SFAILX(STORE_ERR_FLOW_INVALID,
-		    "unsupported flow fields specified", 0);
+	memcpy(&f->hdr, buf, sizeof(f->hdr));
 
-	if (len - sizeof(f->hdr) < r)
+	if (STORE_VER_GET_MAJ(f->hdr.version) != STORE_VER_MAJOR)
+		SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0);
+	allow_extra = (STORE_VER_GET_MIN(f->hdr.version) > STORE_VER_MINOR);
+
+	if (len - sizeof(f->hdr) < (f->hdr.len_words * 4))
 		SFAILX(STORE_ERR_BUFFER_SIZE,
-		    "calulated flow length is less than supplied len", 1);
+		    "incomplete flow record supplied", 1);
 
 	crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
 
 	fields = ntohl(f->hdr.fields);
-
 	offset = sizeof(f->hdr);
 
 #define RFIELD(flag, dest) do { \
@@ -166,6 +141,7 @@ store_flow_deserialise(u_int8_t *buf, in
 				crc32_update((u_char *)&dest, sizeof(dest), \
 				    &crc); \
 			} \
+			fields &= ~STORE_FIELD_##flag; \
 		} } while (0)
 
 	RFIELD(TAG, f->tag);
@@ -187,6 +163,19 @@ store_flow_deserialise(u_int8_t *buf, in
 	RFIELD(FLOW_TIMES, f->ftimes);
 	RFIELD(AS_INFO, f->asinf);
 	RFIELD(FLOW_ENGINE_INFO, f->finf);
+
+	/* Other fields might live here if minor version > ours */
+	if ((fields & ~STORE_FIELD_CRC32) != 0) {
+		if (allow_extra) {
+			/* Skip fields we don't understand */
+			offset = (f->hdr.len_words * 4) + sizeof(f->hdr) -
+			    sizeof(f->crc32);
+			fields = ntohl(f->hdr.fields) & STORE_FIELD_ALL;
+		} else {
+			/* There shouldn't be any extra if minor_ver <= ours */
+			SFAILX(STORE_ERR_FLOW_INVALID, "Flow has unknown fields", 0);
+		}
+	}
 	RFIELD(CRC32, f->crc32);
 
 	/* Sanity check and convert addresses */
@@ -242,9 +231,7 @@ store_get_flow(int fd, struct store_flow
 	if (r < sizeof(struct store_flow))
 		SFAILX(STORE_ERR_EOF, "EOF reading flow header", 0);
 
-	if ((len = store_calc_flow_len((struct store_flow *)buf)) == -1)
-		SFAILX(STORE_ERR_FLOW_INVALID,
-		    "unsupported flow fields specified", 0);
+	len = ((struct store_flow *)buf)->len_words * 4;
 	if (len > sizeof(buf) - sizeof(struct store_flow))
 		SFAILX(STORE_ERR_INTERNAL,
 		    "Internal error: flow buffer too small", 1);
@@ -259,41 +246,6 @@ store_get_flow(int fd, struct store_flow
 }
 
 int
-store_check_header(int fd, char *ebuf, int elen)
-{
-	struct store_header hdr;
-	int r;
-
-	if ((r = store_get_header(fd, &hdr, ebuf, elen)) != STORE_ERR_OK)
-		return (r);
-
-	/* store_get_header does all the magic & version checks for us */
-
-	return (STORE_ERR_OK);
-}
-
-int
-store_put_header(int fd, char *ebuf, int elen)
-{
-	struct store_header hdr;
-	int r;
-
-	bzero(&hdr, sizeof(hdr));
-	hdr.magic = htonl(STORE_MAGIC);
-	hdr.version = htonl(STORE_VERSION);
-	hdr.start_time = htonl(time(NULL));
-	hdr.flags = htonl(0);
-
-	r = atomicio(vwrite, fd, &hdr, sizeof(hdr));
-	if (r == -1)
-		SFAIL(STORE_ERR_IO, "write error on header", 0);
-	if (r < (ssize_t)sizeof(hdr))
-		SFAILX(STORE_ERR_EOF, "EOF while writing header", 0);
-
-	return (STORE_ERR_OK);
-}
-
-int
 store_flow_serialise(struct store_flow_complete *f, u_int8_t *buf, int buflen,
     int *flowlen, char *ebuf, int elen)
 {
@@ -306,8 +258,9 @@ store_flow_serialise(struct store_flow_c
 	struct store_flow_GATEWAY_ADDR4 gwa4;
 	struct store_flow_GATEWAY_ADDR6 gwa6;
 	u_int32_t fields, crc;
-	int offset;
+	int len, offset;
 
+	f->hdr.version = STORE_VERSION;
 	fields = ntohl(f->hdr.fields);
 
 	/* Convert addresses and set AF fields correctly */
@@ -404,16 +357,24 @@ store_flow_serialise(struct store_flow_c
 		SFAILX(STORE_ERR_FLOW_INVALID, "silly gateway addr af", 1);
 	}
 
-	crc32_start(&crc);
-	offset = 0;
-
 	/* Fields have probably changes as a result of address conversion */
 	f->hdr.fields = htonl(fields);
-	if (store_calc_flow_len(&f->hdr) > buflen)
+
+	len = store_calc_flow_len(&f->hdr);
+	if (len == -1)	/* must be tested first: -1 & 3 is non-zero */
+		SFAILX(STORE_ERR_FLOW_INVALID,
+		    "unsupported flow fields specified", 0);
+	if ((len & 3) != 0)
+		SFAILX(STORE_ERR_INTERNAL, "len & 3 != 0", 1);
+	if (len + (int)sizeof(f->hdr) > buflen)	/* len excludes the header */
 		SFAILX(STORE_ERR_BUFFER_SIZE, "flow buffer too small", 1);
+	f->hdr.len_words = len / 4;
+	f->hdr.reserved = 0;
+
+	memcpy(buf, &f->hdr, sizeof(f->hdr));
+	offset = sizeof(f->hdr);
 
-	memcpy(buf + offset, &f->hdr, sizeof(f->hdr));
-	offset += sizeof(f->hdr);
+	crc32_start(&crc);
 	crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
 
 #define WFIELD(spec, what) do {						\
@@ -451,6 +412,9 @@ store_flow_serialise(struct store_flow_c
 	WFIELD(CRC32, f->crc32);
 #undef WFIELD
 
+	if (len + sizeof(f->hdr) != offset)
+		SFAILX(STORE_ERR_INTERNAL, "len != offset", 1);
+
 	*flowlen = offset;
 	return (STORE_ERR_OK);
 }
@@ -567,8 +531,9 @@ store_format_flow(struct store_flow_comp
 		strlcat(buf, tmp, len);
 	}
 	if (SHASFIELD(RECV_TIME)) {
-		snprintf(tmp, sizeof(tmp), "recv_time %s ",
-		    iso_time(ntohl(flow->recv_time.recv_secs), utc_flag));
+		snprintf(tmp, sizeof(tmp), "recv_time %s.%06lu ",
+		    iso_time(ntohl(flow->recv_time.recv_secs), utc_flag),
+		    (u_long)ntohl(flow->recv_time.recv_usecs));
 		strlcat(buf, tmp, len);
 	}
 	if (SHASFIELD(PROTO_FLAGS_TOS)) {
@@ -661,9 +626,10 @@ store_format_flow(struct store_flow_comp
 	}
 	if (SHASFIELD(FLOW_ENGINE_INFO)) {
 		snprintf(tmp, sizeof(tmp),
-		    "engine_type %u engine_id %u seq %lu ",
+		    "engine_type %u engine_id %u seq %lu source %lu ",
 		    flow->finf.engine_type,  flow->finf.engine_id,
-		    (u_long)ntohl(flow->finf.flow_sequence));
+		    (u_long)ntohl(flow->finf.flow_sequence), 
+		    (u_long)ntohl(flow->finf.source_id));
 		strlcat(buf, tmp, len);
 	}
 	if (SHASFIELD(CRC32)) {
Index: store.h
===================================================================
RCS file: /var/cvs/flowd/store.h,v
retrieving revision 1.26
diff -u -p -u -r1.26 store.h
--- store.h	3 Nov 2004 06:34:02 -0000	1.26
+++ store.h	14 Aug 2005 01:09:04 -0000
@@ -38,14 +38,23 @@ struct store_addr4 {
 	u_int8_t	d[4];
 } __packed;
 
-#define STORE_MAGIC			0x012cf047
-#define STORE_VERSION			0x00000002
-/* Start of flow log file */
-struct store_header {
-	u_int32_t		magic;
-	u_int32_t		version;
-	u_int32_t		start_time;
-	u_int32_t		flags;	/* Currently 0 */
+#define STORE_VER_MIN_MASK	((1 << 5) - 1)	/* low 5 bits: minor */
+#define STORE_VER_MAJ_MASK	((1 << 3) - 1)	/* next 3 bits: major */
+#define STORE_MKVER(maj,min)	((((maj) & STORE_VER_MAJ_MASK) << 5) | \
+				  ((min) & STORE_VER_MIN_MASK))
+#define STORE_VER_GET_MAJ(ver)	(((ver) >> 5) & STORE_VER_MAJ_MASK)
+#define STORE_VER_GET_MIN(ver)	((ver) & STORE_VER_MIN_MASK)
+
+#define STORE_VER_MAJOR		3
+#define STORE_VER_MINOR		0
+#define STORE_VERSION		STORE_MKVER(STORE_VER_MAJOR, STORE_VER_MINOR)
+
+/* Start of flow record - present for every flow */
+struct store_flow {
+	u_int8_t		version;
+	u_int8_t		len_words; /* len in 4 byte words not inc hdr */
+	u_int16_t		reserved;
+	u_int32_t		fields;
 } __packed;
 
 /*
@@ -100,11 +109,6 @@ struct store_header {
 					 STORE_FIELD_AGENT_ADDR4|\
 					 STORE_FIELD_AGENT_ADDR6)
 
-/* Start of flow record - present for every flow */
-struct store_flow {
-	u_int32_t		fields;
-} __packed;
-
 /*
  * Optional flow records
  * NB. suffixes must match the corresponding STORE_FIELD_ define (see store.c)
@@ -118,6 +122,7 @@ struct store_flow_TAG {
 /* Optional flow field - present if STORE_FIELD_RECV_TIME */
 struct store_flow_RECV_TIME {
 	u_int32_t		recv_secs;
+	u_int32_t		recv_usecs;
 } __packed;
 
 /* Optional flow field - present if STORE_FIELD_PROTO_FLAGS_TOS */
@@ -182,8 +187,8 @@ struct store_flow_OCTETS {
 
 /* Optional flow field - present if STORE_FIELD_IF_INDICES */
 struct store_flow_IF_INDICES {
-	u_int16_t		if_index_in;
-	u_int16_t		if_index_out;
+	u_int32_t		if_index_in;
+	u_int32_t		if_index_out;
 } __packed;
 
 /* Optional flow field - present if STORE_FIELD_AGENT_INFO */
@@ -203,8 +208,8 @@ struct store_flow_FLOW_TIMES {
 
 /* Optional flow field - present if STORE_FIELD_AS_INFO */
 struct store_flow_AS_INFO {
-	u_int16_t		src_as;
-	u_int16_t		dst_as;
+	u_int32_t		src_as;
+	u_int32_t		dst_as;
 	u_int8_t		src_mask;
 	u_int8_t		dst_mask;
 	u_int16_t		pad;
@@ -216,6 +221,7 @@ struct store_flow_FLOW_ENGINE_INFO {
 	u_int8_t		engine_id;
 	u_int16_t		pad;
 	u_int32_t		flow_sequence;
+	u_int32_t		source_id;
 } __packed;
 
 /* Optional flow field - present if STORE_FIELD_CRC32 */
@@ -257,18 +263,14 @@ struct store_flow_complete {
 #define STORE_ERR_IO_SEEK			0x09
 #define STORE_ERR_CORRUPT			0x10
 
-int store_get_header(int fd, struct store_header *hdr, char *ebuf, int elen);
 int store_get_flow(int fd, struct store_flow_complete *f, char *ebuf, int elen);
-int store_check_header(int fd, char *ebuf, int elen);
-int store_put_header(int fd, char *ebuf, int elen);
 int store_put_flow(int fd, struct store_flow_complete *flow,
     u_int32_t fieldmask, char *ebuf, int elen);
-int store_validate_header(struct store_header *hdr, char *ebuf, int elen);
-int store_calc_flow_len(struct store_flow *hdr);
 int store_flow_deserialise(u_int8_t *buf, int len,
     struct store_flow_complete *f, char *ebuf, int elen);
 int store_flow_serialise(struct store_flow_complete *f, u_int8_t *buf, int buflen,
     int *flowlen, char *ebuf, int elen);
+int store_calc_flow_len(struct store_flow *hdr);
 
 const char *iso_time(time_t t, int utc_flag);
 const char *interval_time(time_t t);


More information about the netflow-tools mailing list