[netflow-tools] flowd + avici v9 flows problem
Damien Miller
djm at mindrot.org
Sun Aug 14 11:18:47 EST 2005
On Sun, 14 Aug 2005, Damien Miller wrote:
> Here is a better diff. It also includes a "flowd-reader -L" mode that can
> be used to convert old flow logs to the new format. E.g.
This diff is really attached this time :)
-d
-------------- next part --------------
Index: Makefile.in
===================================================================
RCS file: /var/cvs/flowd/Makefile.in,v
retrieving revision 1.25
diff -u -p -u -r1.25 Makefile.in
--- Makefile.in 3 Mar 2005 22:43:09 -0000 1.25
+++ Makefile.in 14 Aug 2005 01:09:04 -0000
@@ -40,7 +40,8 @@ TARGETS=flowd flowd-reader
all: $(TARGETS)
-LIBFLOWD_OBJS= atomicio.o addr.o store.o crc32.o strlcpy.o strlcat.o
+LIBFLOWD_OBJS= atomicio.o addr.o store.o store-v2.o crc32.o \
+ strlcpy.o strlcat.o
LIBFLOWD_HEADERS= addr.h crc32.h store.h
FLOWD_OBJS= flowd.o privsep_fdpass.o privsep.o filter.o \
parse.o log.o daemon.o peer.o \
Index: TODO
===================================================================
RCS file: /var/cvs/flowd/TODO,v
retrieving revision 1.51
diff -u -p -u -r1.51 TODO
--- TODO 14 May 2005 06:08:30 -0000 1.51
+++ TODO 14 Aug 2005 01:09:04 -0000
@@ -53,9 +53,6 @@
- Define net store.h types for:
- min/max_pkt_lngth
- - source_id (important!)
-
-- src/dst AS should be increased in size to 4 bytes
- Discard protocol-specific state when we receive a packet in a different
protocol
Index: configure.ac
===================================================================
RCS file: /var/cvs/flowd/configure.ac,v
retrieving revision 1.21
diff -u -p -u -r1.21 configure.ac
--- configure.ac 14 May 2005 07:22:08 -0000 1.21
+++ configure.ac 14 Aug 2005 01:09:04 -0000
@@ -17,7 +17,7 @@
AC_INIT
AC_CONFIG_SRCDIR([flowd.c])
-PROGVER=0.8.5
+PROGVER=0.9
AC_SUBST(PROGVER)
AC_CONFIG_HEADER(config.h)
Index: flowd-reader.8.in
===================================================================
RCS file: /var/cvs/flowd/flowd-reader.8.in,v
retrieving revision 1.4
diff -u -p -u -r1.4 flowd-reader.8.in
--- flowd-reader.8.in 19 Apr 2005 11:57:41 -0000 1.4
+++ flowd-reader.8.in 14 Aug 2005 01:09:04 -0000
@@ -22,7 +22,7 @@
.Nd Read, filter and concatenate binary flowd logfiles
.Sh SYNOPSIS
.Nm flowd-reader
-.Op Fl Uvqd
+.Op Fl LUvqd
.Op Fl f Ar filter_file
.Op Fl o Ar output_file
.Ar flow_log
@@ -59,6 +59,13 @@ to which all the flows that have been re
.Pp
The command-line options are as follows:
.Bl -tag -width Ds
+.It Fl L
+Allows
+.Nm
+to read legacy version 2 flow logs (generated by
+.Xr flowd 8
+versions prior to 0.9).
+This may be used to convert old flow logs to the newer form.
.It Fl U
Causes
.Nm
Index: flowd-reader.c
===================================================================
RCS file: /var/cvs/flowd/flowd-reader.c,v
retrieving revision 1.15
diff -u -p -u -r1.15 flowd-reader.c
--- flowd-reader.c 4 Feb 2005 06:26:10 -0000 1.15
+++ flowd-reader.c 14 Aug 2005 01:09:04 -0000
@@ -29,6 +29,7 @@
#include "common.h"
#include "flowd.h"
#include "store.h"
+#include "store-v2.h"
#include "atomicio.h"
RCSID("$Id: flowd-reader.c,v 1.15 2005/02/04 06:26:10 djm Exp $");
@@ -40,6 +41,7 @@ usage(void)
PROGNAME);
fprintf(stderr, "This is %s version %s. Valid commandline options:\n",
PROGNAME, PROGVER);
+ fprintf(stderr, " -L Read/convert legacy flow logs\n");
fprintf(stderr, " -q Don't print flows to stdout (use with -o)\n");
fprintf(stderr, " -d Print debugging information\n");
fprintf(stderr, " -f path Filter flows using rule file\n");
@@ -53,48 +55,16 @@ static int
open_start_log(const char *path, int debug)
{
int fd;
- off_t pos;
- char ebuf[512];
if (path == NULL) {
/* Logfile on stdout */
fd = STDOUT_FILENO;
- } else {
- if ((fd = open(path, O_RDWR|O_APPEND|O_CREAT, 0600)) == -1)
- logerr("open(%s)", path);
-
- /* Only write out the header if we are at the start of the file */
- switch ((pos = lseek(fd, 0, SEEK_END))) {
- case 0:
- /* New file, continue below */
- break;
- case -1:
- logerr("lseek");
- default:
- /* Logfile exists, don't write new header */
- if (lseek(fd, 0, SEEK_SET) != 0)
- logerr("lseek");
- if (store_check_header(fd, ebuf, sizeof(ebuf)) !=
- STORE_ERR_OK)
- logerrx("Store error: %s", ebuf);
- if (lseek(fd, 0, SEEK_END) <= 0) {
- fprintf(stderr, "lseek: %s\n", strerror(errno));
- exit(1);
- }
- if (debug) {
- fprintf(stderr, "Continuing with existing "
- "logfile len %lld\n", (long long)pos);
- }
- return (fd);
- }
- }
+ } else if ((fd = open(path, O_RDWR|O_APPEND|O_CREAT, 0600)) == -1)
+ logerr("open(%s)", path);
if (debug)
fprintf(stderr, "Writing new logfile header\n");
- if (store_put_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK)
- logerrx("Store error: %s", ebuf);
-
return (fd);
}
@@ -106,26 +76,30 @@ main(int argc, char **argv)
extern char *optarg;
extern int optind;
struct store_flow_complete flow;
- struct store_header hdr;
+ struct store_v2_flow_complete flow_v2;
char buf[2048], ebuf[512];
const char *ffile, *ofile;
FILE *ffilef;
- int ofd;
+ int ofd, read_legacy;
u_int32_t disp_mask;
struct flowd_config filter_config;
+ struct store_v2_header hdr_v2;
- utc = verbose = debug = 0;
+ utc = verbose = debug = read_legacy = 0;
ofile = ffile = NULL;
ofd = -1;
ffilef = NULL;
bzero(&filter_config, sizeof(filter_config));
- while ((ch = getopt(argc, argv, "Udf:ho:qv")) != -1) {
+ while ((ch = getopt(argc, argv, "LUdf:ho:qv")) != -1) {
switch (ch) {
case 'h':
usage();
return (0);
+ case 'L':
+ read_legacy = 1;
+ break;
case 'U':
utc = 1;
break;
@@ -190,24 +164,37 @@ main(int argc, char **argv)
else if ((fd = open(argv[i], O_RDONLY)) == -1)
logerr("open(%s)", argv[i]);
- if (store_get_header(fd, &hdr, ebuf,
+ if (read_legacy && store_v2_get_header(fd, &hdr_v2, ebuf,
sizeof(ebuf)) != STORE_ERR_OK)
- logerrx("%s", ebuf);
+ logerrx("%s", ebuf);
if (verbose >= 0) {
- printf("LOGFILE %s started at %s\n", argv[i],
- iso_time(ntohl(hdr.start_time), utc));
+ printf("LOGFILE %s", argv[i]);
+ if (read_legacy)
+ printf(" started at %s",
+ iso_time(ntohl(hdr_v2.start_time), utc));
+ printf("\n");
fflush(stdout);
}
for (;;) {
bzero(&flow, sizeof(flow));
- if ((r = store_get_flow(fd, &flow, ebuf,
- sizeof(ebuf))) == STORE_ERR_EOF)
+ if (read_legacy)
+ r = store_v2_get_flow(fd, &flow_v2, ebuf,
+ sizeof(ebuf));
+ else
+ r = store_get_flow(fd, &flow, ebuf,
+ sizeof(ebuf));
+
+ if (r == STORE_ERR_EOF)
break;
else if (r != STORE_ERR_OK)
logerrx("%s", ebuf);
+
+ if (read_legacy &&
+ store_v2_flow_convert(&flow_v2, &flow) == -1)
+ logerrx("legacy flow conversion failed");
if (ffile != NULL && filter_flow(&flow,
&filter_config.filter_list) == FF_ACTION_DISCARD)
Index: flowd.c
===================================================================
RCS file: /var/cvs/flowd/flowd.c,v
retrieving revision 1.56
diff -u -p -u -r1.56 flowd.c
--- flowd.c 28 Apr 2005 09:02:58 -0000 1.56
+++ flowd.c 14 Aug 2005 01:09:04 -0000
@@ -119,37 +119,10 @@ static int
start_log(int monitor_fd)
{
int fd;
- off_t pos;
- char ebuf[512];
if ((fd = client_open_log(monitor_fd)) == -1)
logerrx("Logfile open failed, exiting");
- /* Only write out the header if we are at the start of the file */
- switch ((pos = lseek(fd, 0, SEEK_END))) {
- case 0:
- /* New file, continue below */
- break;
- case -1:
- logerr("%s: lseek error, exiting", __func__);
- default:
- /* Logfile exists, don't write new header */
- if (lseek(fd, 0, SEEK_SET) != 0)
- logerr("%s: lseek error, exiting", __func__);
- if (store_check_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK)
- logerrx("%s: Exiting on %s", __func__, ebuf);
- if (lseek(fd, 0, SEEK_END) <= 0)
- logerr("%s: lseek error, exiting", __func__);
- logit(LOG_DEBUG, "Continuing with existing logfile len %lld",
- (long long)pos);
- return (fd);
- }
-
- logit(LOG_DEBUG, "Writing new logfile header");
-
- if (store_put_header(fd, ebuf, sizeof(ebuf)) != STORE_ERR_OK)
- logerrx("%s: Exiting on %s", __func__, ebuf);
-
return (fd);
}
@@ -168,8 +141,8 @@ process_flow(struct store_flow_complete
/* Prepare for writing */
flow->hdr.fields = htonl(flow->hdr.fields);
-
flow->recv_time.recv_secs = htonl(flow->recv_time.recv_secs);
+ flow->recv_time.recv_usecs = htonl(flow->recv_time.recv_usecs);
if (conf->opts & FLOWD_OPT_VERBOSE) {
char fbuf[1024];
@@ -199,6 +172,7 @@ process_netflow_v1(u_int8_t *pkt, size_t
struct store_flow_complete flow;
size_t offset;
u_int i, nflows;
+ struct timeval tv;
if (len < sizeof(*nf1_hdr)) {
peer->ninvalid++;
@@ -240,7 +214,9 @@ process_netflow_v1(u_int8_t *pkt, size_t
flow.hdr.fields &= ~STORE_FIELD_AS_INFO;
flow.hdr.fields &= ~STORE_FIELD_FLOW_ENGINE_INFO;
- flow.recv_time.recv_secs = time(NULL);
+ gettimeofday(&tv, NULL);
+ flow.recv_time.recv_secs = tv.tv_sec;
+ flow.recv_time.recv_usecs = tv.tv_usec;
flow.pft.tcp_flags = nf1_flow->tcp_flags;
flow.pft.protocol = nf1_flow->protocol;
@@ -288,6 +264,7 @@ process_netflow_v5(u_int8_t *pkt, size_t
struct store_flow_complete flow;
size_t offset;
u_int i, nflows;
+ struct timeval tv;
if (len < sizeof(*nf5_hdr)) {
peer->ninvalid++;
@@ -327,7 +304,9 @@ process_netflow_v5(u_int8_t *pkt, size_t
flow.hdr.fields &= ~STORE_FIELD_DST_ADDR6;
flow.hdr.fields &= ~STORE_FIELD_GATEWAY_ADDR6;
- flow.recv_time.recv_secs = time(NULL);
+ gettimeofday(&tv, NULL);
+ flow.recv_time.recv_secs = tv.tv_sec;
+ flow.recv_time.recv_usecs = tv.tv_usec;
flow.pft.tcp_flags = nf5_flow->tcp_flags;
flow.pft.protocol = nf5_flow->protocol;
@@ -384,6 +363,7 @@ process_netflow_v7(u_int8_t *pkt, size_t
struct store_flow_complete flow;
size_t offset;
u_int i, nflows;
+ struct timeval tv;
if (len < sizeof(*nf7_hdr)) {
peer->ninvalid++;
@@ -429,7 +409,9 @@ process_netflow_v7(u_int8_t *pkt, size_t
* the Cat5k (e.g. destination-only mls nde mode)
*/
- flow.recv_time.recv_secs = time(NULL);
+ gettimeofday(&tv, NULL);
+ flow.recv_time.recv_secs = tv.tv_sec;
+ flow.recv_time.recv_usecs = tv.tv_usec;
flow.pft.tcp_flags = nf7_flow->tcp_flags;
flow.pft.protocol = nf7_flow->protocol;
@@ -660,9 +642,10 @@ nf9_check_rec_len(u_int type, u_int len)
static int
nf9_flowset_to_store(u_int8_t *pkt, size_t len, struct xaddr *flow_source,
struct NF9_HEADER *nf9_hdr, struct peer_nf9_template *template,
- struct store_flow_complete *flow)
+ u_int32_t source_id, struct store_flow_complete *flow)
{
u_int offset, i;
+ struct timeval tv;
if (template->total_len > len)
return (-1);
@@ -675,7 +658,10 @@ nf9_flowset_to_store(u_int8_t *pkt, size
flow->ainfo.time_sec = nf9_hdr->time_sec;
flow->ainfo.netflow_version = nf9_hdr->c.version;
flow->finf.flow_sequence = nf9_hdr->package_sequence;
- flow->recv_time.recv_secs = time(NULL);
+ flow->finf.source_id = htonl(source_id);
+ gettimeofday(&tv, NULL);
+ flow->recv_time.recv_secs = tv.tv_sec;
+ flow->recv_time.recv_usecs = tv.tv_usec;
memcpy(&flow->agent_addr, flow_source, sizeof(flow->agent_addr));
offset = 0;
@@ -693,7 +679,7 @@ nf9_flowset_to_store(u_int8_t *pkt, size
static int
process_netflow_v9_template(u_int8_t *pkt, size_t len, struct peer_state *peer,
- struct peers *peers, u_int source_id)
+ struct peers *peers, u_int32_t source_id)
{
struct NF9_TEMPLATE_FLOWSET_HEADER *tmplh;
struct NF9_TEMPLATE_FLOWSET_RECORD *tmplr;
@@ -785,7 +771,7 @@ process_netflow_v9_template(u_int8_t *pk
static int
process_netflow_v9_data(u_int8_t *pkt, size_t len, struct peer_state *peer,
- u_int source_id, struct NF9_HEADER *nf9_hdr, struct flowd_config *conf,
+ u_int32_t source_id, struct NF9_HEADER *nf9_hdr, struct flowd_config *conf,
int log_fd, u_int *num_flows)
{
struct store_flow_complete *flows;
@@ -835,7 +821,8 @@ process_netflow_v9_data(u_int8_t *pkt, s
for (i = 0; i < num_flowsets; i++) {
if (nf9_flowset_to_store(pkt + offset, template->total_len,
- &peer->from, nf9_hdr, template, &flows[i]) == -1) {
+ &peer->from, nf9_hdr, template, source_id,
+ &flows[i]) == -1) {
peer->ninvalid++;
free(flows);
logit(LOG_WARNING, "invalid netflow v.9 data flowset "
@@ -863,8 +850,8 @@ process_netflow_v9(u_int8_t *pkt, size_t
{
struct NF9_HEADER *nf9_hdr = (struct NF9_HEADER *)pkt;
struct NF9_FLOWSET_HEADER_COMMON *flowset;
- u_int i, count, flowset_id, flowset_len, flowset_flows, total_flows;
- u_int offset, source_id;
+ u_int32_t i, count, flowset_id, flowset_len, flowset_flows;
+ u_int32_t offset, source_id, total_flows;
if (len < sizeof(*nf9_hdr)) {
peer->ninvalid++;
Index: flowd.py
===================================================================
RCS file: /var/cvs/flowd/flowd.py,v
retrieving revision 1.10
diff -u -p -u -r1.10 flowd.py
--- flowd.py 14 May 2005 07:22:08 -0000 1.10
+++ flowd.py 14 Aug 2005 01:09:04 -0000
@@ -14,7 +14,7 @@
# $Id: flowd.py,v 1.10 2005/05/14 07:22:08 djm Exp $
-VERSION = "0.8.5"
+VERSION = "0.9"
import struct
import time
@@ -54,36 +54,8 @@ class log:
self.mode = mode
if mode == "r":
self.flow_file = open(path, "rb")
- # Read header
- hdr = self.flow_file.read(16)
- if len(hdr) != 16:
- raise ValueError, "Short read on flow header"
- (self.magic, self.version, self.start_time, \
- self.flags) = struct.unpack(">IIII", hdr)
-
- if self.magic != 0x012cf047:
- raise ValueError, "Bad magic"
- if self.version != 0x00000002:
- raise ValueError, "Unsupported version"
elif mode == "w" or mode == "a":
self.flow_file = open(path, mode + "b")
- self.flow_file.seek(0, 2)
- if self.flow_file.tell() > 0:
- return
-
- # Write header
- self.magic = 0x012cf047
- self.version = 0x02
- self.flags = 0x00
- self.start_time = start_time
- if start_time is None:
- self.start_time = int(time.time())
- hdr = struct.pack(">IIII", self.magic, self.version, \
- self.start_time, self.flags)
- if len(hdr) != 16:
- raise ValueError, "Internal error: bad header"
- self.flow_file.write(hdr)
- self.flow_file.flush()
else:
raise ValueError, "Invalid mode value";
Index: setup.py
===================================================================
RCS file: /var/cvs/flowd/setup.py,v
retrieving revision 1.8
diff -u -p -u -r1.8 setup.py
--- setup.py 14 May 2005 07:22:08 -0000 1.8
+++ setup.py 14 Aug 2005 01:09:04 -0000
@@ -27,11 +27,11 @@ if __name__ == '__main__':
flowd_serialiser = Extension('flowd_serialiser',
sources = ['flowd_python.c'],
- define_macros = [('PROGVER', '"0.8.5"')],
+ define_macros = [('PROGVER', '"0.9"')],
libraries = ['flowd'],
library_dirs = ['.', '../..'])
setup( name = "flowd",
- version = "0.8.5",
+ version = "0.9",
author = "Damien Miller",
author_email = "djm at mindrot.org",
url = "http://www.mindrot.org/flowd.html",
Index: store-v2.c
===================================================================
RCS file: store-v2.c
diff -N store-v2.c
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ store-v2.c 14 Aug 2005 01:09:04 -0000
@@ -0,0 +1,557 @@
+/*
+ * Copyright (c) 2004 Damien Miller <djm at mindrot.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <time.h>
+#include <poll.h>
+
+#include "common.h"
+#include "store-v2.h"
+#include "atomicio.h"
+#include "crc32.h"
+
+RCSID("$Id$");
+
+/* This is a useful abbreviation, used in several places below */
+#define SHASFIELD(flag) (fields & STORE_V2_FIELD_##flag)
+
+/* Stash error message and return */
+#define SFAILX(i, m, f) do { \
+ if (ebuf != NULL && elen > 0) { \
+ snprintf(ebuf, elen, "%s%s%s", \
+ (f) ? __func__ : "", (f) ? ": " : "", m); \
+ } \
+ return (i); \
+ } while (0)
+
+/* Stash error message, appending strerror into "ebuf" and return */
+#define SFAIL(i, m, f) do { \
+ if (ebuf != NULL && elen > 0) { \
+ snprintf(ebuf, elen, "%s%s%s: %s", \
+ (f) ? __func__ : "", (f) ? ": " : "", m, \
+ strerror(errno)); \
+ } \
+ return (i); \
+ } while (0)
+
+int
+store_v2_validate_header(struct store_v2_header *hdr, char *ebuf, int elen)
+{
+ if (ntohl(hdr->magic) != STORE_V2_MAGIC)
+ SFAILX(STORE_ERR_BAD_MAGIC, "Bad magic", 0);
+ if (ntohl(hdr->version) != STORE_V2_VERSION)
+ SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0);
+
+ return (STORE_ERR_OK);
+}
+
+int
+store_v2_get_header(int fd, struct store_v2_header *hdr, char *ebuf, int elen)
+{
+ ssize_t r;
+
+ if ((r = atomicio(read, fd, hdr, sizeof(*hdr))) == -1)
+ SFAIL(STORE_ERR_IO, "read error", 0);
+ if (r < (ssize_t)sizeof(*hdr))
+ SFAILX(STORE_ERR_EOF, "premature EOF", 0);
+
+ return (store_v2_validate_header(hdr, ebuf, elen));
+}
+
+int
+store_v2_calc_flow_len(struct store_v2_flow *hdr)
+{
+ int ret = 0;
+ u_int32_t fields;
+
+ fields = ntohl(hdr->fields);
+#define ADDFIELD(flag) do { \
+ if (SHASFIELD(flag)) { \
+ ret += sizeof(struct store_v2_flow_##flag); \
+ fields &= ~STORE_V2_FIELD_##flag; \
+ } } while (0)
+ ADDFIELD(TAG);
+ ADDFIELD(RECV_TIME);
+ ADDFIELD(PROTO_FLAGS_TOS);
+ ADDFIELD(AGENT_ADDR4);
+ ADDFIELD(AGENT_ADDR6);
+ ADDFIELD(SRC_ADDR4);
+ ADDFIELD(SRC_ADDR6);
+ ADDFIELD(DST_ADDR4);
+ ADDFIELD(DST_ADDR6);
+ ADDFIELD(GATEWAY_ADDR4);
+ ADDFIELD(GATEWAY_ADDR6);
+ ADDFIELD(SRCDST_PORT);
+ ADDFIELD(PACKETS);
+ ADDFIELD(OCTETS);
+ ADDFIELD(IF_INDICES);
+ ADDFIELD(AGENT_INFO);
+ ADDFIELD(FLOW_TIMES);
+ ADDFIELD(AS_INFO);
+ ADDFIELD(FLOW_ENGINE_INFO);
+ ADDFIELD(CRC32);
+#undef ADDFIELD
+
+ /* Make sure we have captured everything */
+ if (fields != 0)
+ return (-1);
+
+ return (ret);
+}
+/* Decode the serialised v2 flow held in buf[0..len) into *f; returns STORE_ERR_OK, else an error code with text in ebuf */
+int
+store_v2_flow_deserialise(u_int8_t *buf, int len, struct store_v2_flow_complete *f,
+ char *ebuf, int elen)
+{
+ int offset, r;
+ struct store_v2_flow_AGENT_ADDR4 aa4;
+ struct store_v2_flow_AGENT_ADDR6 aa6;
+ struct store_v2_flow_SRC_ADDR4 sa4;
+ struct store_v2_flow_SRC_ADDR6 sa6;
+ struct store_v2_flow_DST_ADDR4 da4;
+ struct store_v2_flow_DST_ADDR6 da6;
+ struct store_v2_flow_GATEWAY_ADDR4 ga4;
+ struct store_v2_flow_GATEWAY_ADDR6 ga6;
+ u_int32_t fields, crc;
+
+ bzero(f, sizeof(*f));
+ crc32_start(&crc);
+
+ /* Check len before reading buf; a negative len would otherwise convert to a huge unsigned value and pass */
+ if (len < 0 || (size_t)len < sizeof(f->hdr))
+ SFAILX(STORE_ERR_BUFFER_SIZE,
+ "supplied length is too small", 1);
+ memcpy(&f->hdr.fields, buf, sizeof(f->hdr.fields));
+
+ if ((r = store_v2_calc_flow_len((struct store_v2_flow *)buf)) == -1)
+ SFAILX(STORE_ERR_FLOW_INVALID,
+ "unsupported flow fields specified", 0);
+
+ if ((size_t)len - sizeof(f->hdr) < (size_t)r)
+ SFAILX(STORE_ERR_BUFFER_SIZE,
+ "supplied length is less than calculated flow length", 1);
+
+ crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
+
+ fields = ntohl(f->hdr.fields);
+
+ offset = sizeof(f->hdr);
+
+#define RFIELD(flag, dest) do { \
+ if (SHASFIELD(flag)) { \
+ memcpy(&dest, buf + offset, sizeof(dest)); \
+ offset += sizeof(dest); \
+ if (SHASFIELD(CRC32) && \
+ STORE_V2_FIELD_##flag != STORE_V2_FIELD_CRC32) { \
+ crc32_update((u_char *)&dest, sizeof(dest), \
+ &crc); \
+ } \
+ } } while (0)
+
+ RFIELD(TAG, f->tag);
+ RFIELD(RECV_TIME, f->recv_time);
+ RFIELD(PROTO_FLAGS_TOS, f->pft);
+ RFIELD(AGENT_ADDR4, aa4);
+ RFIELD(AGENT_ADDR6, aa6);
+ RFIELD(SRC_ADDR4, sa4);
+ RFIELD(SRC_ADDR6, sa6);
+ RFIELD(DST_ADDR4, da4);
+ RFIELD(DST_ADDR6, da6);
+ RFIELD(GATEWAY_ADDR4, ga4);
+ RFIELD(GATEWAY_ADDR6, ga6);
+ RFIELD(SRCDST_PORT, f->ports);
+ RFIELD(PACKETS, f->packets);
+ RFIELD(OCTETS, f->octets);
+ RFIELD(IF_INDICES, f->ifndx);
+ RFIELD(AGENT_INFO, f->ainfo);
+ RFIELD(FLOW_TIMES, f->ftimes);
+ RFIELD(AS_INFO, f->asinf);
+ RFIELD(FLOW_ENGINE_INFO, f->finf);
+ RFIELD(CRC32, f->crc32);
+
+ /* Sanity check and convert addresses */
+ if (SHASFIELD(AGENT_ADDR4) && SHASFIELD(AGENT_ADDR6))
+ SFAILX(-1, "Flow has both v4/v6 agent addrs", 0);
+ if (SHASFIELD(SRC_ADDR4) && SHASFIELD(SRC_ADDR6))
+ SFAILX(-1, "Flow has both v4/v6 src addrs", 0);
+ if (SHASFIELD(DST_ADDR4) && SHASFIELD(DST_ADDR6))
+ SFAILX(-1, "Flow has both v4/v6 dst addrs", 0);
+ if (SHASFIELD(GATEWAY_ADDR4) && SHASFIELD(GATEWAY_ADDR6))
+ SFAILX(-1, "Flow has both v4/v6 gateway addrs", 0);
+
+#define S_CPYADDR(d, s, fam) do { \
+ (d).af = (fam == 4) ? AF_INET : AF_INET6; \
+ memcpy(&d.v##fam, &s, sizeof(d.v##fam)); \
+ } while (0)
+
+ if (SHASFIELD(AGENT_ADDR4))
+ S_CPYADDR(f->agent_addr, aa4.flow_agent_addr, 4);
+ if (SHASFIELD(AGENT_ADDR6))
+ S_CPYADDR(f->agent_addr, aa6.flow_agent_addr, 6);
+ if (SHASFIELD(SRC_ADDR4))
+ S_CPYADDR(f->src_addr, sa4.src_addr, 4);
+ if (SHASFIELD(SRC_ADDR6))
+ S_CPYADDR(f->src_addr, sa6.src_addr, 6);
+ if (SHASFIELD(DST_ADDR4))
+ S_CPYADDR(f->dst_addr, da4.dst_addr, 4);
+ if (SHASFIELD(DST_ADDR6))
+ S_CPYADDR(f->dst_addr, da6.dst_addr, 6);
+ if (SHASFIELD(GATEWAY_ADDR4))
+ S_CPYADDR(f->gateway_addr, ga4.gateway_addr, 4);
+ if (SHASFIELD(GATEWAY_ADDR6))
+ S_CPYADDR(f->gateway_addr, ga6.gateway_addr, 6);
+
+ if (SHASFIELD(CRC32) && crc != ntohl(f->crc32.crc32))
+ SFAILX(STORE_ERR_CRC_MISMATCH, "Flow checksum mismatch", 0);
+
+#undef S_CPYADDR
+#undef RFIELD
+
+ return (STORE_ERR_OK);
+}
+
+int
+store_v2_get_flow(int fd, struct store_v2_flow_complete *f, char *ebuf, int elen)
+{
+ int r, len;
+ u_int8_t buf[512];
+
+ /* Read header */
+ if ((r = atomicio(read, fd, buf, sizeof(struct store_v2_flow))) == -1)
+ SFAIL(STORE_ERR_IO, "read flow header", 0);
+ if (r < sizeof(struct store_v2_flow))
+ SFAILX(STORE_ERR_EOF, "EOF reading flow header", 0);
+
+ if ((len = store_v2_calc_flow_len((struct store_v2_flow *)buf)) == -1)
+ SFAILX(STORE_ERR_FLOW_INVALID,
+ "unsupported flow fields specified", 0);
+ if (len > sizeof(buf) - sizeof(struct store_v2_flow))
+ SFAILX(STORE_ERR_INTERNAL,
+ "Internal error: flow buffer too small", 1);
+
+ if ((r = atomicio(read, fd, buf + sizeof(struct store_v2_flow), len)) == -1)
+ SFAIL(STORE_ERR_IO, "read flow data", 0);
+ if (r < len)
+ SFAILX(STORE_ERR_EOF, "EOF reading flow data", 0);
+
+ return (store_v2_flow_deserialise(buf, len + sizeof(struct store_v2_flow),
+ f, ebuf, elen));
+}
+
+int
+store_v2_check_header(int fd, char *ebuf, int elen)
+{
+ struct store_v2_header hdr;
+ int r;
+
+ if ((r = store_v2_get_header(fd, &hdr, ebuf, elen)) != STORE_ERR_OK)
+ return (r);
+
+ /* store_v2_get_header does all the magic & version checks for us */
+
+ return (STORE_ERR_OK);
+}
+
+int
+store_v2_put_header(int fd, char *ebuf, int elen)
+{
+ struct store_v2_header hdr;
+ int r;
+
+ bzero(&hdr, sizeof(hdr));
+ hdr.magic = htonl(STORE_V2_MAGIC);
+ hdr.version = htonl(STORE_V2_VERSION);
+ hdr.start_time = htonl(time(NULL));
+ hdr.flags = htonl(0);
+
+ r = atomicio(vwrite, fd, &hdr, sizeof(hdr));
+ if (r == -1)
+ SFAIL(STORE_ERR_IO, "write error on header", 0);
+ if (r < (ssize_t)sizeof(hdr))
+ SFAILX(STORE_ERR_EOF, "EOF while writing header", 0);
+
+ return (STORE_ERR_OK);
+}
+/* Serialise *f into buf (buflen bytes available), setting *flowlen to the bytes written; STORE_ERR_OK on success */
+int
+store_v2_flow_serialise(struct store_v2_flow_complete *f, u_int8_t *buf, int buflen,
+ int *flowlen, char *ebuf, int elen)
+{
+ struct store_v2_flow_AGENT_ADDR4 aa4;
+ struct store_v2_flow_AGENT_ADDR6 aa6;
+ struct store_v2_flow_SRC_ADDR4 sa4;
+ struct store_v2_flow_SRC_ADDR6 sa6;
+ struct store_v2_flow_DST_ADDR4 da4;
+ struct store_v2_flow_DST_ADDR6 da6;
+ struct store_v2_flow_GATEWAY_ADDR4 gwa4;
+ struct store_v2_flow_GATEWAY_ADDR6 gwa6;
+ u_int32_t fields, crc;
+ int offset;
+
+ fields = ntohl(f->hdr.fields);
+
+ /* Convert addresses and set AF fields correctly */
+ /* XXX this is too repetitive */
+ switch(f->agent_addr.af) {
+ case AF_INET:
+ if ((fields & STORE_V2_FIELD_AGENT_ADDR4) == 0)
+ break;
+ memcpy(&aa4.flow_agent_addr, &f->agent_addr.v4,
+ sizeof(aa4.flow_agent_addr));
+ fields |= STORE_V2_FIELD_AGENT_ADDR4;
+ fields &= ~STORE_V2_FIELD_AGENT_ADDR6;
+ break;
+ case AF_INET6:
+ if ((fields & STORE_V2_FIELD_AGENT_ADDR6) == 0)
+ break;
+ memcpy(&aa6.flow_agent_addr, &f->agent_addr.v6,
+ sizeof(aa6.flow_agent_addr));
+ fields |= STORE_V2_FIELD_AGENT_ADDR6;
+ fields &= ~STORE_V2_FIELD_AGENT_ADDR4;
+ break;
+ default:
+ if ((fields & STORE_V2_FIELD_AGENT_ADDR) == 0)
+ break;
+ SFAILX(STORE_ERR_FLOW_INVALID, "silly agent addr af", 1);
+ }
+
+ switch(f->src_addr.af) {
+ case AF_INET:
+ if ((fields & STORE_V2_FIELD_SRC_ADDR4) == 0)
+ break;
+ memcpy(&sa4.src_addr, &f->src_addr.v4,
+ sizeof(sa4.src_addr));
+ fields |= STORE_V2_FIELD_SRC_ADDR4;
+ fields &= ~STORE_V2_FIELD_SRC_ADDR6;
+ break;
+ case AF_INET6:
+ if ((fields & STORE_V2_FIELD_SRC_ADDR6) == 0)
+ break;
+ memcpy(&sa6.src_addr, &f->src_addr.v6,
+ sizeof(sa6.src_addr));
+ fields |= STORE_V2_FIELD_SRC_ADDR6;
+ fields &= ~STORE_V2_FIELD_SRC_ADDR4;
+ break;
+ default:
+ if ((fields & STORE_V2_FIELD_SRC_ADDR) == 0)
+ break;
+ SFAILX(STORE_ERR_FLOW_INVALID, "silly src addrs af", 1);
+ }
+
+ switch(f->dst_addr.af) {
+ case AF_INET:
+ if ((fields & STORE_V2_FIELD_DST_ADDR4) == 0)
+ break;
+ memcpy(&da4.dst_addr, &f->dst_addr.v4,
+ sizeof(da4.dst_addr));
+ fields |= STORE_V2_FIELD_DST_ADDR4;
+ fields &= ~STORE_V2_FIELD_DST_ADDR6;
+ break;
+ case AF_INET6:
+ if ((fields & STORE_V2_FIELD_DST_ADDR6) == 0)
+ break;
+ memcpy(&da6.dst_addr, &f->dst_addr.v6,
+ sizeof(da6.dst_addr));
+ fields |= STORE_V2_FIELD_DST_ADDR6;
+ fields &= ~STORE_V2_FIELD_DST_ADDR4;
+ break;
+ default:
+ if ((fields & STORE_V2_FIELD_DST_ADDR) == 0)
+ break;
+ SFAILX(STORE_ERR_FLOW_INVALID, "silly dst addrs af", 1);
+ }
+
+ switch(f->gateway_addr.af) {
+ case AF_INET:
+ if ((fields & STORE_V2_FIELD_GATEWAY_ADDR4) == 0)
+ break;
+ memcpy(&gwa4.gateway_addr, &f->gateway_addr.v4,
+ sizeof(gwa4.gateway_addr));
+ fields |= STORE_V2_FIELD_GATEWAY_ADDR4;
+ fields &= ~STORE_V2_FIELD_GATEWAY_ADDR6;
+ break;
+ case AF_INET6:
+ if ((fields & STORE_V2_FIELD_GATEWAY_ADDR6) == 0)
+ break;
+ memcpy(&gwa6.gateway_addr, &f->gateway_addr.v6,
+ sizeof(gwa6.gateway_addr));
+ fields |= STORE_V2_FIELD_GATEWAY_ADDR6;
+ fields &= ~STORE_V2_FIELD_GATEWAY_ADDR4;
+ break;
+ default:
+ if ((fields & STORE_V2_FIELD_GATEWAY_ADDR) == 0)
+ break;
+ SFAILX(STORE_ERR_FLOW_INVALID, "silly gateway addr af", 1);
+ }
+
+ crc32_start(&crc);
+ offset = 0;
+
+ /* Address conversion may have changed fields; buf must hold the header plus the selected fields */
+ f->hdr.fields = htonl(fields);
+ if (store_v2_calc_flow_len(&f->hdr) + (int)sizeof(f->hdr) > buflen)
+ SFAILX(STORE_ERR_BUFFER_SIZE, "flow buffer too small", 1);
+
+ memcpy(buf + offset, &f->hdr, sizeof(f->hdr));
+ offset += sizeof(f->hdr);
+ crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
+
+#define WFIELD(spec, what) do { \
+ if (SHASFIELD(spec)) { \
+ memcpy(buf + offset, &(what), sizeof(what)); \
+ offset += sizeof(what); \
+ if (SHASFIELD(spec) && \
+ (STORE_V2_FIELD_##spec != STORE_V2_FIELD_CRC32)) { \
+ crc32_update((u_char *)&(what), sizeof(what), \
+ &crc); \
+ } \
+ } } while (0)
+
+ WFIELD(TAG, f->tag);
+ WFIELD(RECV_TIME, f->recv_time);
+ WFIELD(PROTO_FLAGS_TOS, f->pft);
+ WFIELD(AGENT_ADDR4, aa4);
+ WFIELD(AGENT_ADDR6, aa6);
+ WFIELD(SRC_ADDR4, sa4);
+ WFIELD(SRC_ADDR6, sa6);
+ WFIELD(DST_ADDR4, da4);
+ WFIELD(DST_ADDR6, da6);
+ WFIELD(GATEWAY_ADDR4, gwa4);
+ WFIELD(GATEWAY_ADDR6, gwa6);
+ WFIELD(SRCDST_PORT, f->ports);
+ WFIELD(PACKETS, f->packets);
+ WFIELD(OCTETS, f->octets);
+ WFIELD(IF_INDICES, f->ifndx);
+ WFIELD(AGENT_INFO, f->ainfo);
+ WFIELD(FLOW_TIMES, f->ftimes);
+ WFIELD(AS_INFO, f->asinf);
+ WFIELD(FLOW_ENGINE_INFO, f->finf);
+ if (fields & (STORE_V2_FIELD_CRC32))
+ f->crc32.crc32 = htonl(crc);
+ WFIELD(CRC32, f->crc32);
+#undef WFIELD
+
+ *flowlen = offset;
+ return (STORE_ERR_OK);
+}
+
+int
+store_v2_put_flow(int fd, struct store_v2_flow_complete *flow, u_int32_t fieldmask,
+ char *ebuf, int elen)
+{
+ u_int32_t fields, origfields;
+ off_t startpos;
+ u_int8_t buf[512];
+ int len, r, saved_errno, ispipe = 0;
+
+ /* Remember where we started, so we can back errors out */
+ if ((startpos = lseek(fd, 0, SEEK_CUR)) == -1) {
+ if (errno == ESPIPE)
+ ispipe = 1;
+ else
+ SFAIL(STORE_ERR_IO_SEEK, "lseek", 1);
+ }
+
+ origfields = ntohl(flow->hdr.fields);
+ fields = origfields & fieldmask;
+ flow->hdr.fields = htonl(fields);
+
+ r = store_v2_flow_serialise(flow, buf, sizeof(buf), &len, ebuf, elen);
+ if (r != STORE_ERR_OK) {
+ flow->hdr.fields = htonl(origfields);
+ return (r);
+ }
+
+ r = atomicio(vwrite, fd, buf, len);
+ saved_errno = errno;
+ flow->hdr.fields = htonl(origfields);
+
+ if (r == len)
+ return (STORE_ERR_OK);
+
+ if (ispipe)
+ SFAIL(STORE_ERR_CORRUPT, "corrupting failure on pipe", 1);
+
+ /* Try to rewind to starting position, so we don't corrupt flow store */
+ if (lseek(fd, startpos, SEEK_SET) == -1)
+ SFAIL(STORE_ERR_CORRUPT, "corrupting failure on lseek", 1);
+ if (ftruncate(fd, startpos) == -1)
+ SFAIL(STORE_ERR_CORRUPT, "corrupting failure on ftruncate", 1);
+
+ /* Partial flow record has been removed, return with orig. error */
+ errno = saved_errno;
+ if (r == -1)
+ SFAIL(STORE_ERR_IO, "write flow", 0);
+ else
+ SFAILX(STORE_ERR_EOF, "EOF on write flow", 0);
+}
+
+int
+store_v2_flow_convert(struct store_v2_flow_complete *fv2,
+ struct store_flow_complete *f)
+{
+ int len;
+
+ bzero(f, sizeof(*f));
+ f->hdr.version = STORE_VERSION;
+ f->hdr.fields = fv2->hdr.fields;
+ if ((len = store_calc_flow_len(&f->hdr)) == -1)
+ return (-1);
+ f->hdr.len_words = len / 4;
+
+ f->tag.tag = fv2->tag.tag;
+ f->recv_time.recv_secs = fv2->recv_time.recv_secs;
+ f->pft.tcp_flags = fv2->pft.tcp_flags;
+ f->pft.protocol = fv2->pft.protocol;
+ f->pft.tos = fv2->pft.tos;
+ f->pft.pad = fv2->pft.pad;
+ f->agent_addr = fv2->agent_addr;
+ f->src_addr = fv2->src_addr;
+ f->dst_addr = fv2->dst_addr;
+ f->gateway_addr = fv2->gateway_addr;
+ f->ports.src_port = fv2->ports.src_port;
+ f->ports.dst_port = fv2->ports.dst_port;
+ f->packets.flow_packets = fv2->packets.flow_packets;
+ f->octets.flow_octets = fv2->octets.flow_octets;
+ f->ifndx.if_index_in = htonl(ntohs(fv2->ifndx.if_index_in));
+ f->ifndx.if_index_out = htonl(ntohs(fv2->ifndx.if_index_out));
+ f->ainfo.sys_uptime_ms = fv2->ainfo.sys_uptime_ms;
+ f->ainfo.time_sec = fv2->ainfo.time_sec;
+ f->ainfo.time_nanosec = fv2->ainfo.time_nanosec;
+ f->ainfo.netflow_version = fv2->ainfo.netflow_version;
+ f->ainfo.pad = fv2->ainfo.pad;
+ f->ftimes.flow_start = fv2->ftimes.flow_start;
+ f->ftimes.flow_finish = fv2->ftimes.flow_finish;
+ f->asinf.src_as = htonl(ntohs(fv2->asinf.src_as));
+ f->asinf.dst_as = htonl(ntohs(fv2->asinf.dst_as));
+ f->asinf.src_mask = fv2->asinf.src_mask;
+ f->asinf.dst_mask = fv2->asinf.dst_mask;
+ f->asinf.pad = fv2->asinf.pad;
+ f->finf.engine_type = fv2->finf.engine_type;
+ f->finf.engine_id = fv2->finf.engine_id;
+ f->finf.pad = fv2->finf.pad;
+ f->finf.flow_sequence = fv2->finf.flow_sequence;
+ f->crc32.crc32 = fv2->crc32.crc32;
+
+ return (0);
+}
Index: store-v2.h
===================================================================
RCS file: store-v2.h
diff -N store-v2.h
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ store-v2.h 14 Aug 2005 01:09:04 -0000
@@ -0,0 +1,255 @@
+/* $Id$ */
+
+/*
+ * Copyright (c) 2004 Damien Miller <djm at mindrot.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/* On-disk storage format */
+
+#ifndef _STORE_V2_H
+#define _STORE_V2_H /* spelling must match the #ifndef or the guard is inert */
+
+#if defined(HAVE_SYS_CDEFS_H)
+# include <sys/cdefs.h> /* For __packed, etc on platforms that have it */
+#endif
+#if defined(__GNUC__) && !defined(__packed)
+# define __packed __attribute__((__packed__))
+#endif
+
+#include "addr.h"
+#include "store.h"
+
+#define STORE_V2_MAGIC 0x012cf047
+#define STORE_V2_VERSION 0x00000002
+/* Header found at the start of a v2 flow log file */
+struct store_v2_header {
+ u_int32_t magic; /* presumably STORE_V2_MAGIC - confirm in store-v2.c */
+ u_int32_t version; /* presumably STORE_V2_VERSION - confirm in store-v2.c */
+ u_int32_t start_time;
+ u_int32_t flags; /* Currently 0 */
+} __packed;
+
+/*
+ * Optional flow fields, specify what is stored for the flow
+ * NB - the flow records appear in this order on disk
+ */
+#define STORE_V2_FIELD_TAG (1U)
+#define STORE_V2_FIELD_RECV_TIME (1U<<1)
+#define STORE_V2_FIELD_PROTO_FLAGS_TOS (1U<<2)
+#define STORE_V2_FIELD_AGENT_ADDR4 (1U<<3)
+#define STORE_V2_FIELD_AGENT_ADDR6 (1U<<4)
+#define STORE_V2_FIELD_SRC_ADDR4 (1U<<5)
+#define STORE_V2_FIELD_SRC_ADDR6 (1U<<6)
+#define STORE_V2_FIELD_DST_ADDR4 (1U<<7)
+#define STORE_V2_FIELD_DST_ADDR6 (1U<<8)
+#define STORE_V2_FIELD_GATEWAY_ADDR4 (1U<<9)
+#define STORE_V2_FIELD_GATEWAY_ADDR6 (1U<<10)
+#define STORE_V2_FIELD_SRCDST_PORT (1U<<11)
+#define STORE_V2_FIELD_PACKETS (1U<<12)
+#define STORE_V2_FIELD_OCTETS (1U<<13)
+#define STORE_V2_FIELD_IF_INDICES (1U<<14)
+#define STORE_V2_FIELD_AGENT_INFO (1U<<15)
+#define STORE_V2_FIELD_FLOW_TIMES (1U<<16)
+#define STORE_V2_FIELD_AS_INFO (1U<<17)
+#define STORE_V2_FIELD_FLOW_ENGINE_INFO (1U<<18)
+/* ... more one day */
+
+#define STORE_V2_FIELD_CRC32 (1U<<30)
+#define STORE_V2_FIELD_RESERVED (1U<<31) /* For extension header */
+#define STORE_V2_FIELD_ALL (((1U<<19)-1)|STORE_V2_FIELD_CRC32)
+
+/* Useful combinations */
+#define STORE_V2_FIELD_AGENT_ADDR (STORE_V2_FIELD_AGENT_ADDR4|\
+ STORE_V2_FIELD_AGENT_ADDR6)
+#define STORE_V2_FIELD_SRC_ADDR (STORE_V2_FIELD_SRC_ADDR4|\
+ STORE_V2_FIELD_SRC_ADDR6)
+#define STORE_V2_FIELD_DST_ADDR (STORE_V2_FIELD_DST_ADDR4|\
+ STORE_V2_FIELD_DST_ADDR6)
+#define STORE_V2_FIELD_SRCDST_ADDR (STORE_V2_FIELD_SRC_ADDR|\
+ STORE_V2_FIELD_DST_ADDR)
+#define STORE_V2_FIELD_GATEWAY_ADDR (STORE_V2_FIELD_GATEWAY_ADDR4|\
+ STORE_V2_FIELD_GATEWAY_ADDR6)
+
+#define STORE_V2_DISPLAY_ALL STORE_V2_FIELD_ALL
+#define STORE_V2_DISPLAY_BRIEF (STORE_V2_FIELD_TAG|\
+ STORE_V2_FIELD_RECV_TIME|\
+ STORE_V2_FIELD_PROTO_FLAGS_TOS|\
+ STORE_V2_FIELD_SRCDST_PORT|\
+ STORE_V2_FIELD_PACKETS|\
+ STORE_V2_FIELD_OCTETS|\
+ STORE_V2_FIELD_SRCDST_ADDR|\
+ STORE_V2_FIELD_AGENT_ADDR4|\
+ STORE_V2_FIELD_AGENT_ADDR6)
+
+/* Start of a v2 flow record - present for every flow */
+struct store_v2_flow {
+ u_int32_t fields; /* STORE_V2_FIELD_* flags saying what is stored */
+} __packed;
+
+/*
+ * Optional flow records
+ * NB. suffixes must match the corresponding STORE_V2_FIELD_ define (see store-v2.c)
+ */
+
+/* Optional flow field - present if STORE_V2_FIELD_TAG */
+struct store_v2_flow_TAG {
+ u_int32_t tag; /* set by filter */
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_RECV_TIME */
+struct store_v2_flow_RECV_TIME {
+ u_int32_t recv_secs; /* the only member: v2 has no sub-second part */
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_PROTO_FLAGS_TOS */
+struct store_v2_flow_PROTO_FLAGS_TOS {
+ u_int8_t tcp_flags;
+ u_int8_t protocol;
+ u_int8_t tos;
+ u_int8_t pad;
+} __packed;
+
+/* Optional flow fields - present if STORE_V2_FIELD_AGENT_ADDR4 / _AGENT_ADDR6 */
+struct store_v2_flow_AGENT_ADDR4 {
+ struct store_addr4 flow_agent_addr;
+} __packed;
+struct store_v2_flow_AGENT_ADDR6 {
+ struct store_addr6 flow_agent_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_SRC_ADDR4 */
+struct store_v2_flow_SRC_ADDR4 {
+ struct store_addr4 src_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_DST_ADDR4 */
+struct store_v2_flow_DST_ADDR4 {
+ struct store_addr4 dst_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_SRC_ADDR6 */
+struct store_v2_flow_SRC_ADDR6 {
+ struct store_addr6 src_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_DST_ADDR6 */
+struct store_v2_flow_DST_ADDR6 {
+ struct store_addr6 dst_addr;
+} __packed;
+
+/* Optional flow fields - present if STORE_V2_FIELD_GATEWAY_ADDR4 / _GATEWAY_ADDR6 */
+struct store_v2_flow_GATEWAY_ADDR4 {
+ struct store_addr4 gateway_addr;
+} __packed;
+struct store_v2_flow_GATEWAY_ADDR6 {
+ struct store_addr6 gateway_addr;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_SRCDST_PORT */
+struct store_v2_flow_SRCDST_PORT {
+ u_int16_t src_port;
+ u_int16_t dst_port;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_PACKETS */
+struct store_v2_flow_PACKETS {
+ u_int64_t flow_packets;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_OCTETS */
+struct store_v2_flow_OCTETS {
+ u_int64_t flow_octets;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_IF_INDICES */
+struct store_v2_flow_IF_INDICES {
+ u_int16_t if_index_in; /* 16 bits wide in v2 */
+ u_int16_t if_index_out; /* 16 bits wide in v2 */
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_AGENT_INFO */
+struct store_v2_flow_AGENT_INFO {
+ u_int32_t sys_uptime_ms;
+ u_int32_t time_sec;
+ u_int32_t time_nanosec;
+ u_int16_t netflow_version;
+ u_int16_t pad;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_FLOW_TIMES */
+struct store_v2_flow_FLOW_TIMES {
+ u_int32_t flow_start;
+ u_int32_t flow_finish;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_AS_INFO */
+struct store_v2_flow_AS_INFO {
+ u_int16_t src_as; /* 16 bits wide in v2 */
+ u_int16_t dst_as; /* 16 bits wide in v2 */
+ u_int8_t src_mask;
+ u_int8_t dst_mask;
+ u_int16_t pad;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_FLOW_ENGINE_INFO */
+struct store_v2_flow_FLOW_ENGINE_INFO {
+ u_int8_t engine_type;
+ u_int8_t engine_id;
+ u_int16_t pad;
+ u_int32_t flow_sequence;
+} __packed;
+
+/* Optional flow field - present if STORE_V2_FIELD_CRC32 */
+struct store_v2_flow_CRC32 {
+ u_int32_t crc32;
+} __packed;
+
+/* An abstract v2 flow record (all fields included) */
+struct store_v2_flow_complete {
+ struct store_v2_flow hdr;
+ struct store_v2_flow_TAG tag;
+ struct store_v2_flow_RECV_TIME recv_time;
+ struct store_v2_flow_PROTO_FLAGS_TOS pft;
+ struct xaddr agent_addr; /* struct xaddr, not the *_ADDR4/_ADDR6 structs */
+ struct xaddr src_addr;
+ struct xaddr dst_addr;
+ struct xaddr gateway_addr;
+ struct store_v2_flow_SRCDST_PORT ports;
+ struct store_v2_flow_PACKETS packets;
+ struct store_v2_flow_OCTETS octets;
+ struct store_v2_flow_IF_INDICES ifndx;
+ struct store_v2_flow_AGENT_INFO ainfo;
+ struct store_v2_flow_FLOW_TIMES ftimes;
+ struct store_v2_flow_AS_INFO asinf;
+ struct store_v2_flow_FLOW_ENGINE_INFO finf;
+ struct store_v2_flow_CRC32 crc32;
+} __packed;
+
+int store_v2_get_header(int fd, struct store_v2_header *hdr, char *ebuf, int elen);
+int store_v2_get_flow(int fd, struct store_v2_flow_complete *f, char *ebuf, int elen);
+int store_v2_check_header(int fd, char *ebuf, int elen);
+int store_v2_put_header(int fd, char *ebuf, int elen);
+int store_v2_put_flow(int fd, struct store_v2_flow_complete *flow,
+ u_int32_t fieldmask, char *ebuf, int elen);
+int store_v2_validate_header(struct store_v2_header *hdr, char *ebuf, int elen);
+int store_v2_calc_flow_len(struct store_v2_flow *hdr);
+int store_v2_flow_deserialise(u_int8_t *buf, int len,
+ struct store_v2_flow_complete *f, char *ebuf, int elen);
+int store_v2_flow_serialise(struct store_v2_flow_complete *f, u_int8_t *buf, int buflen,
+ int *flowlen, char *ebuf, int elen);
+int store_v2_flow_convert(struct store_v2_flow_complete *fv2,
+ struct store_flow_complete *f);
+
+#endif /* _STORE_V2_H */
Index: store.c
===================================================================
RCS file: /var/cvs/flowd/store.c,v
retrieving revision 1.30
diff -u -p -u -r1.30 store.c
--- store.c 4 Feb 2005 06:26:10 -0000 1.30
+++ store.c 14 Aug 2005 01:09:04 -0000
@@ -55,30 +55,6 @@ RCSID("$Id: store.c,v 1.30 2005/02/04 06
} while (0)
int
-store_validate_header(struct store_header *hdr, char *ebuf, int elen)
-{
- if (ntohl(hdr->magic) != STORE_MAGIC)
- SFAILX(STORE_ERR_BAD_MAGIC, "Bad magic", 0);
- if (ntohl(hdr->version) != STORE_VERSION)
- SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0);
-
- return (STORE_ERR_OK);
-}
-
-int
-store_get_header(int fd, struct store_header *hdr, char *ebuf, int elen)
-{
- ssize_t r;
-
- if ((r = atomicio(read, fd, hdr, sizeof(*hdr))) == -1)
- SFAIL(STORE_ERR_IO, "read error", 0);
- if (r < (ssize_t)sizeof(*hdr))
- SFAILX(STORE_ERR_EOF, "premature EOF", 0);
-
- return (store_validate_header(hdr, ebuf, elen));
-}
-
-int
store_calc_flow_len(struct store_flow *hdr)
{
int ret = 0;
@@ -123,7 +99,7 @@ int
store_flow_deserialise(u_int8_t *buf, int len, struct store_flow_complete *f,
char *ebuf, int elen)
{
- int offset, r;
+ int offset, allow_extra;
struct store_flow_AGENT_ADDR4 aa4;
struct store_flow_AGENT_ADDR6 aa6;
struct store_flow_SRC_ADDR4 sa4;
@@ -137,24 +113,23 @@ store_flow_deserialise(u_int8_t *buf, in
bzero(f, sizeof(*f));
crc32_start(&crc);
- memcpy(&f->hdr.fields, buf, sizeof(f->hdr.fields));
-
if (len < sizeof(f->hdr))
SFAILX(STORE_ERR_BUFFER_SIZE,
"supplied length is too small", 1);
- if ((r = store_calc_flow_len((struct store_flow *)buf)) == -1)
- SFAILX(STORE_ERR_FLOW_INVALID,
- "unsupported flow fields specified", 0);
+ memcpy(&f->hdr, buf, sizeof(f->hdr));
- if (len - sizeof(f->hdr) < r)
+ if (STORE_VER_GET_MAJ(f->hdr.version) != STORE_VER_MAJOR)
+ SFAILX(STORE_ERR_UNSUP_VERSION, "Unsupported version", 0);
+ allow_extra = (STORE_VER_GET_MIN(f->hdr.version) > STORE_VER_MINOR);
+
+ if (len - sizeof(f->hdr) < (f->hdr.len_words * 4))
SFAILX(STORE_ERR_BUFFER_SIZE,
- "calulated flow length is less than supplied len", 1);
+ "incomplete flow record supplied", 1);
crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
fields = ntohl(f->hdr.fields);
-
offset = sizeof(f->hdr);
#define RFIELD(flag, dest) do { \
@@ -166,6 +141,7 @@ store_flow_deserialise(u_int8_t *buf, in
crc32_update((u_char *)&dest, sizeof(dest), \
&crc); \
} \
+ fields &= ~STORE_FIELD_##flag; \
} } while (0)
RFIELD(TAG, f->tag);
@@ -187,6 +163,19 @@ store_flow_deserialise(u_int8_t *buf, in
RFIELD(FLOW_TIMES, f->ftimes);
RFIELD(AS_INFO, f->asinf);
RFIELD(FLOW_ENGINE_INFO, f->finf);
+
+ /* Other fields might live here if minor version > ours */
+ if ((fields & ~STORE_FIELD_CRC32) != 0) {
+ if (allow_extra) {
+ /* Skip fields we don't understand */
+ offset = (f->hdr.len_words * 4) + sizeof(f->hdr) -
+ sizeof(f->crc32);
+ fields = ntohl(f->hdr.fields) & STORE_FIELD_ALL;
+ } else {
+ /* There shouldn't be any extra if minor_ver <= ours */
+ SFAILX(-1, "Flow has unknown fields", 0);
+ }
+ }
RFIELD(CRC32, f->crc32);
/* Sanity check and convert addresses */
@@ -242,9 +231,7 @@ store_get_flow(int fd, struct store_flow
if (r < sizeof(struct store_flow))
SFAILX(STORE_ERR_EOF, "EOF reading flow header", 0);
- if ((len = store_calc_flow_len((struct store_flow *)buf)) == -1)
- SFAILX(STORE_ERR_FLOW_INVALID,
- "unsupported flow fields specified", 0);
+ len = ((struct store_flow *)buf)->len_words * 4;
if (len > sizeof(buf) - sizeof(struct store_flow))
SFAILX(STORE_ERR_INTERNAL,
"Internal error: flow buffer too small", 1);
@@ -259,41 +246,6 @@ store_get_flow(int fd, struct store_flow
}
int
-store_check_header(int fd, char *ebuf, int elen)
-{
- struct store_header hdr;
- int r;
-
- if ((r = store_get_header(fd, &hdr, ebuf, elen)) != STORE_ERR_OK)
- return (r);
-
- /* store_get_header does all the magic & version checks for us */
-
- return (STORE_ERR_OK);
-}
-
-int
-store_put_header(int fd, char *ebuf, int elen)
-{
- struct store_header hdr;
- int r;
-
- bzero(&hdr, sizeof(hdr));
- hdr.magic = htonl(STORE_MAGIC);
- hdr.version = htonl(STORE_VERSION);
- hdr.start_time = htonl(time(NULL));
- hdr.flags = htonl(0);
-
- r = atomicio(vwrite, fd, &hdr, sizeof(hdr));
- if (r == -1)
- SFAIL(STORE_ERR_IO, "write error on header", 0);
- if (r < (ssize_t)sizeof(hdr))
- SFAILX(STORE_ERR_EOF, "EOF while writing header", 0);
-
- return (STORE_ERR_OK);
-}
-
-int
store_flow_serialise(struct store_flow_complete *f, u_int8_t *buf, int buflen,
int *flowlen, char *ebuf, int elen)
{
@@ -306,8 +258,9 @@ store_flow_serialise(struct store_flow_c
struct store_flow_GATEWAY_ADDR4 gwa4;
struct store_flow_GATEWAY_ADDR6 gwa6;
u_int32_t fields, crc;
- int offset;
+ int len, offset;
+ f->hdr.version = STORE_VERSION;
fields = ntohl(f->hdr.fields);
/* Convert addresses and set AF fields correctly */
@@ -404,16 +357,24 @@ store_flow_serialise(struct store_flow_c
SFAILX(STORE_ERR_FLOW_INVALID, "silly gateway addr af", 1);
}
- crc32_start(&crc);
- offset = 0;
-
/* Fields have probably changes as a result of address conversion */
f->hdr.fields = htonl(fields);
- if (store_calc_flow_len(&f->hdr) > buflen)
+
+ len = store_calc_flow_len(&f->hdr); /* payload bytes, header excluded */
+ if (len == -1) /* test first: -1 would also trip the "& 3" check below */
+ SFAILX(STORE_ERR_FLOW_INVALID,
+ "unsupported flow fields specified", 0);
+ if ((len & 3) != 0)
+ SFAILX(STORE_ERR_INTERNAL, "len & 3 != 0", 1);
+ if (len + (int)sizeof(f->hdr) > buflen) /* the header is written too */
+ SFAILX(STORE_ERR_BUFFER_SIZE, "flow buffer too small", 1);
+ f->hdr.len_words = len / 4;
+ f->hdr.reserved = 0;
+
+ memcpy(buf, &f->hdr, sizeof(f->hdr));
+ offset = sizeof(f->hdr);
- memcpy(buf + offset, &f->hdr, sizeof(f->hdr));
- offset += sizeof(f->hdr);
+ crc32_start(&crc);
crc32_update((u_char *)&f->hdr, sizeof(f->hdr), &crc);
#define WFIELD(spec, what) do { \
@@ -451,6 +412,9 @@ store_flow_serialise(struct store_flow_c
WFIELD(CRC32, f->crc32);
#undef WFIELD
+ if (len + sizeof(f->hdr) != offset)
+ SFAILX(STORE_ERR_INTERNAL, "len != offset", 1);
+
*flowlen = offset;
return (STORE_ERR_OK);
}
@@ -567,8 +531,9 @@ store_format_flow(struct store_flow_comp
strlcat(buf, tmp, len);
}
if (SHASFIELD(RECV_TIME)) {
- snprintf(tmp, sizeof(tmp), "recv_time %s ",
- iso_time(ntohl(flow->recv_time.recv_secs), utc_flag));
+ snprintf(tmp, sizeof(tmp), "recv_time %s.%06lu ", /* usecs: 6 digits */
+ iso_time(ntohl(flow->recv_time.recv_secs), utc_flag),
+ (u_long)ntohl(flow->recv_time.recv_usecs));
strlcat(buf, tmp, len);
}
if (SHASFIELD(PROTO_FLAGS_TOS)) {
@@ -661,9 +626,10 @@ store_format_flow(struct store_flow_comp
}
if (SHASFIELD(FLOW_ENGINE_INFO)) {
snprintf(tmp, sizeof(tmp),
- "engine_type %u engine_id %u seq %lu ",
+ "engine_type %u engine_id %u seq %lu source %lu ",
flow->finf.engine_type, flow->finf.engine_id,
- (u_long)ntohl(flow->finf.flow_sequence));
+ (u_long)ntohl(flow->finf.flow_sequence),
+ (u_long)ntohl(flow->finf.source_id));
strlcat(buf, tmp, len);
}
if (SHASFIELD(CRC32)) {
Index: store.h
===================================================================
RCS file: /var/cvs/flowd/store.h,v
retrieving revision 1.26
diff -u -p -u -r1.26 store.h
--- store.h 3 Nov 2004 06:34:02 -0000 1.26
+++ store.h 14 Aug 2005 01:09:04 -0000
@@ -38,14 +38,23 @@ struct store_addr4 {
u_int8_t d[4];
} __packed;
-#define STORE_MAGIC 0x012cf047
-#define STORE_VERSION 0x00000002
-/* Start of flow log file */
-struct store_header {
- u_int32_t magic;
- u_int32_t version;
- u_int32_t start_time;
- u_int32_t flags; /* Currently 0 */
+#define STORE_VER_MIN_MASK ((1 << 5) - 1) /* minor version: low 5 bits */
+#define STORE_VER_MAJ_MASK ((1 << 3) - 1) /* major version: 3 bits above it */
+#define STORE_MKVER(maj,min) ((((maj) & STORE_VER_MAJ_MASK) << 5) | \
+ ((min) & STORE_VER_MIN_MASK))
+#define STORE_VER_GET_MAJ(ver) (((ver) >> 5) & STORE_VER_MAJ_MASK)
+#define STORE_VER_GET_MIN(ver) ((ver) & STORE_VER_MIN_MASK)
+
+#define STORE_VER_MAJOR 3
+#define STORE_VER_MINOR 0
+#define STORE_VERSION STORE_MKVER(STORE_VER_MAJOR, STORE_VER_MINOR)
+
+/* Start of flow record - present for every flow */
+struct store_flow {
+ u_int8_t version;
+ u_int8_t len_words; /* len in 4 byte words not inc hdr */
+ u_int16_t reserved;
+ u_int32_t fields;
} __packed;
/*
@@ -100,11 +109,6 @@ struct store_header {
STORE_FIELD_AGENT_ADDR4|\
STORE_FIELD_AGENT_ADDR6)
-/* Start of flow record - present for every flow */
-struct store_flow {
- u_int32_t fields;
-} __packed;
-
/*
* Optional flow records
* NB. suffixes must match the corresponding STORE_FIELD_ define (see store.c)
@@ -118,6 +122,7 @@ struct store_flow_TAG {
/* Optional flow field - present if STORE_FIELD_RECV_TIME */
struct store_flow_RECV_TIME {
u_int32_t recv_secs;
+ u_int32_t recv_usecs;
} __packed;
/* Optional flow field - present if STORE_FIELD_PROTO_FLAGS_TOS */
@@ -182,8 +187,8 @@ struct store_flow_OCTETS {
/* Optional flow field - present if STORE_FIELD_IF_INDICES */
struct store_flow_IF_INDICES {
- u_int16_t if_index_in;
- u_int16_t if_index_out;
+ u_int32_t if_index_in;
+ u_int32_t if_index_out;
} __packed;
/* Optional flow field - present if STORE_FIELD_AGENT_INFO */
@@ -203,8 +208,8 @@ struct store_flow_FLOW_TIMES {
/* Optional flow field - present if STORE_FIELD_AS_INFO */
struct store_flow_AS_INFO {
- u_int16_t src_as;
- u_int16_t dst_as;
+ u_int32_t src_as;
+ u_int32_t dst_as;
u_int8_t src_mask;
u_int8_t dst_mask;
u_int16_t pad;
@@ -216,6 +221,7 @@ struct store_flow_FLOW_ENGINE_INFO {
u_int8_t engine_id;
u_int16_t pad;
u_int32_t flow_sequence;
+ u_int32_t source_id;
} __packed;
/* Optional flow field - present if STORE_FIELD_CRC32 */
@@ -257,18 +263,14 @@ struct store_flow_complete {
#define STORE_ERR_IO_SEEK 0x09
#define STORE_ERR_CORRUPT 0x10
-int store_get_header(int fd, struct store_header *hdr, char *ebuf, int elen);
int store_get_flow(int fd, struct store_flow_complete *f, char *ebuf, int elen);
-int store_check_header(int fd, char *ebuf, int elen);
-int store_put_header(int fd, char *ebuf, int elen);
int store_put_flow(int fd, struct store_flow_complete *flow,
u_int32_t fieldmask, char *ebuf, int elen);
-int store_validate_header(struct store_header *hdr, char *ebuf, int elen);
-int store_calc_flow_len(struct store_flow *hdr);
int store_flow_deserialise(u_int8_t *buf, int len,
struct store_flow_complete *f, char *ebuf, int elen);
int store_flow_serialise(struct store_flow_complete *f, u_int8_t *buf, int buflen,
int *flowlen, char *ebuf, int elen);
+int store_calc_flow_len(struct store_flow *hdr);
const char *iso_time(time_t t, int utc_flag);
const char *interval_time(time_t t);
More information about the netflow-tools
mailing list