Index: flowd.py
===================================================================
RCS file: /var/cvs/flowd/flowd.py,v
retrieving revision 1.10
diff -u -p -r1.10 flowd.py
--- flowd.py	14 May 2005 07:22:08 -0000	1.10
+++ flowd.py	13 Jul 2005 12:43:17 -0000
@@ -137,22 +141,7 @@ class flow:
 		self.fields = { "fields" : 0 }
 
 	def from_file(self, flow_file):
-		# Read flow header
-		needlen = flowd_serialiser.header_len()
-		hdr = flow_file.read(needlen)
-		if len(hdr) == 0:
-			raise EOFError
-		if len(hdr) != needlen:
-			raise ValueError, "Short read on flow header"
-
-		needlen = flowd_serialiser.flow_len(hdr)
-		flow = flow_file.read(needlen)
-		if len(flow) == 0:
-			raise EOFError
-		if len(flow) != needlen:
-			raise ValueError, "Short read on flow data"
-
-		self.fields = flowd_serialiser.deserialise(hdr + flow)
+		flowd_serialiser.read_flow(self.fields, flow_file)
 
 	def to_file(self, flow_file, field_mask = 0xffffffffL):
 		flow = flowd_serialiser.serialise(self.fields, field_mask)
Index: flowd_python.c
===================================================================
RCS file: /var/cvs/flowd/flowd_python.c,v
retrieving revision 1.4
diff -u -p -r1.4 flowd_python.c
--- flowd_python.c	11 Mar 2005 19:07:11 -0000	1.4
+++ flowd_python.c	13 Jul 2005 12:43:17 -0000
@@ -23,6 +23,361 @@ RCSID("$Id: flowd_python.c,v 1.4 2005/03/11 19:07:11 djm Exp $");
 
 
 static PyObject *
+flow_read_flow(PyObject *self, PyObject *args)
+{
+	/*
+	 * read_flow(fields_dict, file[, version]) -> None
+	 *
+	 * Read one binary flow record from an open Python file object and
+	 * store its decoded fields in fields_dict. Only entries whose bit
+	 * is set in the record's "fields" mask are written; keys already
+	 * present in fields_dict are never removed.
+	 *
+	 * Raises EOFError at end of file, ValueError for a truncated or
+	 * invalid record, IOError if the stdio read fails,
+	 * NotImplementedError for an unsupported store version and
+	 * RuntimeError if the file object yields no FILE pointer.
+	 */
+	PyObject *fields_dict, *infile, *field;
+	FILE *inf;
+	u_int8_t buf[sizeof(struct store_flow_complete)];
+	size_t len, nread;
+	char ebuf[512], addr_buf[128];
+	int version, need, r;
+	u_int32_t fields;
+	struct store_flow_complete flow;
+
+	version = STORE_VERSION;
+	/* "i" matches the int that receives the optional version. */
+	if (!PyArg_ParseTuple(args, "O!O!|i", &PyDict_Type, &fields_dict,
+	    &PyFile_Type, &infile, &version) || fields_dict == NULL ||
+	    infile == NULL)
+		return (NULL);
+
+	if ((inf = PyFile_AsFile(infile)) == NULL) {
+		PyErr_SetString(PyExc_RuntimeError, "File is invalid");
+		return (NULL);
+	}
+
+	if (version != STORE_VERSION) {
+		PyErr_SetString(PyExc_NotImplementedError,
+		    "Unsupported store version");
+		return (NULL);
+	}
+
+	/*
+	 * Read the fixed-size header. Counting bytes (rather than whole
+	 * items) separates a clean end of file from a truncated header,
+	 * as the Python from_file() this replaces did.
+	 */
+	len = sizeof(struct store_flow);
+	nread = fread(buf, 1, len, inf);
+	if (nread != len) {
+		if (ferror(inf))
+			PyErr_SetString(PyExc_IOError, strerror(errno));
+		else if (nread == 0)
+			PyErr_SetString(PyExc_EOFError, "End of file");
+		else
+			PyErr_SetString(PyExc_ValueError,
+			    "Short read on flow header");
+		return (NULL);
+	}
+
+	if ((need = store_calc_flow_len((struct store_flow *)buf)) < 0) {
+		PyErr_SetString(PyExc_ValueError, "Flow header invalid");
+		return (NULL);
+	}
+
+	if (need + sizeof(struct store_flow) > sizeof(buf)) {
+		PyErr_SetString(PyExc_ValueError,
+		    "Internal error: buf too small");
+		return (NULL);
+	}
+
+	/* A zero-length body reads zero bytes and is not an error here. */
+	nread = fread(buf + len, 1, need, inf);
+	if (nread != (size_t)need) {
+		if (ferror(inf))
+			PyErr_SetString(PyExc_IOError, strerror(errno));
+		else if (nread == 0)
+			PyErr_SetString(PyExc_EOFError, "End of file");
+		else
+			PyErr_SetString(PyExc_ValueError,
+			    "Short read on flow data");
+		return (NULL);
+	}
+	len += nread;
+
+	r = store_flow_deserialise(buf, len, &flow, ebuf, sizeof(ebuf));
+	if (r != STORE_ERR_OK) {
+		PyErr_SetString(PyExc_ValueError, ebuf);
+		return (NULL);
+	}
+
+	fields = ntohl(flow.hdr.fields);
+
+	field = PyLong_FromUnsignedLong(fields);
+	if (field == NULL)
+		return (NULL);
+	if (PyDict_SetItemString(fields_dict, "fields", field))
+		goto setitem_err;
+	Py_DECREF(field);
+
+	/*
+	 * Copy every field flagged in the mask into the dict, converting
+	 * multi-byte integers from network to host byte order.
+	 */
+	if (fields & STORE_FIELD_TAG) {
+		field = PyLong_FromUnsignedLong(ntohl(flow.tag.tag));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "tag", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_RECV_TIME) {
+		field = PyLong_FromUnsignedLong(ntohl(
+		    flow.recv_time.recv_secs));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "recv_secs", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_PROTO_FLAGS_TOS) {
+		field = PyInt_FromLong(flow.pft.tcp_flags);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "tcp_flags", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.pft.protocol);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "protocol", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.pft.tos);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "tos", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & (STORE_FIELD_AGENT_ADDR4|STORE_FIELD_AGENT_ADDR6)) {
+		addr_ntop(&flow.agent_addr, addr_buf, sizeof(addr_buf));
+		field = PyString_FromString(addr_buf);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "agent_addr", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.agent_addr.af);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "agent_addr_af", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & (STORE_FIELD_SRC_ADDR4|STORE_FIELD_SRC_ADDR6)) {
+		addr_ntop(&flow.src_addr, addr_buf, sizeof(addr_buf));
+		field = PyString_FromString(addr_buf);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "src_addr", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.src_addr.af);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "src_addr_af", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & (STORE_FIELD_DST_ADDR4|STORE_FIELD_DST_ADDR6)) {
+		addr_ntop(&flow.dst_addr, addr_buf, sizeof(addr_buf));
+		field = PyString_FromString(addr_buf);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "dst_addr", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.dst_addr.af);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "dst_addr_af", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & (STORE_FIELD_GATEWAY_ADDR4|STORE_FIELD_GATEWAY_ADDR6)) {
+		addr_ntop(&flow.gateway_addr, addr_buf, sizeof(addr_buf));
+		field = PyString_FromString(addr_buf);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "gateway_addr", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.gateway_addr.af);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "gateway_addr_af",
+		    field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_SRCDST_PORT) {
+		field = PyInt_FromLong(ntohs(flow.ports.src_port));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "src_port", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(ntohs(flow.ports.dst_port));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "dst_port", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_PACKETS) {
+		field = PyLong_FromUnsignedLongLong(
+		    store_ntohll(flow.packets.flow_packets));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "flow_packets", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_OCTETS) {
+		field = PyLong_FromUnsignedLongLong(
+		    store_ntohll(flow.octets.flow_octets));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "flow_octets", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_IF_INDICES) {
+		field = PyInt_FromLong(ntohs(flow.ifndx.if_index_in));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "if_index_in", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(ntohs(flow.ifndx.if_index_out));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "if_index_out", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_AGENT_INFO) {
+		field = PyLong_FromUnsignedLong(
+		    ntohl(flow.ainfo.sys_uptime_ms));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "sys_uptime_ms", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyLong_FromUnsignedLong(ntohl(flow.ainfo.time_sec));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "time_sec", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyLong_FromUnsignedLong(ntohl(flow.ainfo.time_nanosec));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "time_nanosec", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(ntohs(flow.ainfo.netflow_version));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "netflow_version",
+		    field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_FLOW_TIMES) {
+		field = PyLong_FromUnsignedLong(ntohl(flow.ftimes.flow_start));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "flow_start", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyLong_FromUnsignedLong(ntohl(flow.ftimes.flow_finish));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "flow_finish", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_AS_INFO) {
+		field = PyInt_FromLong(ntohs(flow.asinf.src_as));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "src_as", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(ntohs(flow.asinf.dst_as));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "dst_as", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.asinf.src_mask);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "src_mask", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.asinf.dst_mask);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "dst_mask", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_FLOW_ENGINE_INFO) {
+		field = PyInt_FromLong(flow.finf.engine_type);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "engine_type", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyInt_FromLong(flow.finf.engine_id);
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "engine_id", field))
+			goto setitem_err;
+		Py_DECREF(field);
+		field = PyLong_FromUnsignedLong(ntohl(flow.finf.flow_sequence));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "flow_sequence", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+	if (fields & STORE_FIELD_CRC32) {
+		field = PyLong_FromUnsignedLong(ntohl(flow.crc32.crc32));
+		if (field == NULL)
+			return (NULL);
+		if (PyDict_SetItemString(fields_dict, "crc", field))
+			goto setitem_err;
+		Py_DECREF(field);
+	}
+
+	Py_INCREF(Py_None);
+	return Py_None;
+
+ setitem_err:
+	/* The dict did not take the value; release our own reference. */
+	Py_DECREF(field);
+	return (NULL);
+}
+
+static PyObject *
 flow_header_length(PyObject *self, PyObject *args)
 {
 	int version;
@@ -542,6 +897,8 @@ static PyMethodDef flowd_methods[] = {
 	    PyDoc_STR("Convert a binary flow log record into a dict of fields") },
 	{ "serialise", flow_serialise, METH_VARARGS,
 	    PyDoc_STR("Convert a dict flow record into a binary log record") },
+	{ "read_flow", flow_read_flow, METH_VARARGS,
+	    PyDoc_STR("Read and convert a binary flow log record") },
 	{ NULL, NULL, 0, NULL} /* sentinel */
 };
 