Debugging distributed database mysteries with Rust, packet capture and Polars

Adam Cimarosti, Core Database Engineer
July 29, 2024 · 12 min read

Tags: engineering, questdb, rust, performance, python

A few months back I was working on the primary-replica replication feature in QuestDB. The feature was nearing completion, but we had a report that it was using a significant amount of network bandwidth. While debugging the issue I ended up implementing my own quick'n'dirty network profiling tooling. This blog post covers how I wrote the tool and how I analysed the data.

The performance problem

A bit of context first: our database instances do not replicate directly point-to-point. Instead, they compress and upload each table's WAL (write-ahead log) files to a central object store such as AWS S3 or Azure Blob Storage. In other words, we store each table's full edit history as a series of files in the object store that we can replay back at any time. This architecture:

* decouples deployment,
* allows rebuilding a database instance up to a specific transaction point, and
* is generally more scalable.

By delegating the work to an external component, we relieve the primary database instance from having to serve the replica instances itself.

Each uploaded file (segment) contains a block of transactions. We overwrite the last file over and over with more transactions until it's large enough to roll over to the next file. We need to do this because the object stores we support, for the most part, do not allow appending.

Since this is a time series database, users typically stream (ingest) records into the database at a relatively constant rate (consuming inbound network bandwidth). The primary database instance then writes artifacts to the object store (consuming outbound network bandwidth).

On an early deployment on DigitalOcean, we noticed that our outbound object store bandwidth usage was significantly higher than the inbound ingestion bandwidth usage. Not only that, but the outbound object store bandwidth usage kept growing hour by hour, despite a constant ingestion rate. I've lost the original metrics and numbers, but it would have looked something like the following diagram (imagine the usual network bandwidth noise on top):

[Diagram: ingestion bandwidth staying roughly constant while replication (object store) bandwidth keeps growing over time]

Needless to say, the replication bandwidth should be roughly proportional to the ingestion bandwidth and should not grow over time!

Capturing the network traffic

I needed an accurate picture of what was going on, so at first I turned to Wireshark. I hoped I could perform a simulated test run and capture a time series of packet sizes for both the inbound and outbound connections, but I struggled to do this. It's likely possible to accomplish in Wireshark, but I find its UI pretty daunting, and I'm more comfortable writing code.
Wireshark is built on packet capture, so instead I grabbed my favourite tool (Rust) and used the pcap crate (which wraps the libpcap C library) to capture the two time series myself. For each connection, whenever I observed a packet I captured two fields:

* the packet's timestamp (an i64 epoch nanoseconds value), and
* the packet's size (a u64).

After writing a script to generate some load, I set up my test with s3s-fs, a binary that emulates an AWS S3 endpoint, and ran a primary QuestDB instance on the same machine. I could now monitor the database's inbound traffic on port 9000 and the replication traffic on port 10101 using a net-traffic-capture tool I wrote for this purpose in Rust.

Here are the first few lines of the packet capture tool's main function. They run a loop over the intercepted traffic for the specified ports on the loopback device. For the full code, see github.com/questdb/replication-stats under net-traffic-capture/.

fn main() -> anyhow::Result<()> {
    // ...
    let ports: HashSet<_> = ports.into_iter().collect();
    let writer_queue = writer::Writer::run(dir);
    let device = get_loopback_device()?;
    let mut cap = Capture::from_device(device)?
        .promisc(false)
        .snaplen(128)
        .timeout(1)
        .buffer_size(4 * 1024 * 1024)
        .open()?;
    cap.filter("tcp", true)?;
    let link_type = cap.get_datalink();
    loop {
        let packet = ignore_timeouts(cap.next_packet())?;
        let Some(packet) = packet else {
            continue;
        };
        let tcp_data = parse_tcp(&packet, link_type)?;
        let Some(tcp_data) = tcp_data else {
            continue;
        };
        // ...
    }
}

The byte parsing logic here is implemented using the excellent etherparse crate.

fn parse_tcp(
    packet: &Packet,
    link_type: Linktype
) -> anyhow::Result<Option<TcpData>> {
    if packet.header.caplen < 32 {
        return Ok(None);
    }
    let ipv4data = match link_type {
        Linktype::NULL => &packet.data[4..],
        Linktype::ETHERNET => skip_ethernet_header(packet.data)?,
        _ => return Err(anyhow::anyhow!(
            "Unsupported link type: {:?}", link_type)),
    };
    let sliced = SlicedPacket::from_ip(ipv4data)?;
    let Some(InternetSlice::Ipv4(ipv4slice, _)) = sliced.ip else {
        return Ok(None);
    };
    let src_addr = ipv4slice.source_addr();
    let dest_addr = ipv4slice.destination_addr();
    let TransportSlice::Tcp(tcp) = sliced.transport.unwrap() else {
        return Ok(None);
    };
    let src_port = tcp.source_port();
    let dest_port = tcp.destination_port();
    let link_bytes_len = 4;
    let data_offset = link_bytes_len
        + ((ipv4slice.ihl() * 4) + (tcp.data_offset() * 4)) as usize;
    let tcp_data = TcpData {
        ts: to_system_time(packet.header.ts),
        src: Addr {
            ip: src_addr,
            port: src_port,
        },
        dest: Addr {
            ip: dest_addr,
            port: dest_port,
        },
        data_offset,
        flags: TcpMeta::from_tcp_header(&tcp),
    };
    Ok(Some(tcp_data))
}

Once parsed, I had to write these time series metrics to disk in a format I could easily analyse later. Being a database engineer, I had this covered: I re-implemented a tiny subset of QuestDB's ingestion logic in Rust. One thread sits on the pcap loop, listening for and parsing network packets, and passes messages over a queue to another thread responsible for appending the time series to disk. I could have captured the data into another QuestDB instance, but I was concerned that this might skew the results.

The thread responsible for disk writing uses growable memory-mapped files to write the data in a columnar format. In other words, for each of the two time series there's a timestamp column and a packet size column.
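The queue between those two threads isn't shown in the snippets here; below is a minimal sketch of the handoff, assuming a plain std::sync::mpsc channel and a hypothetical Sample type (the actual tool uses its own writer::Writer queue).

use std::sync::mpsc;
use std::thread;

// One captured sample: packet timestamp (epoch nanoseconds) and packet size.
struct Sample {
    ts: i64,
    size: u64,
}

// Spawns the writer thread and returns a sender the pcap loop can push into,
// so the capture loop never blocks on disk I/O.
fn spawn_writer() -> mpsc::Sender<Sample> {
    let (tx, rx) = mpsc::channel::<Sample>();
    thread::spawn(move || {
        for sample in rx {
            // The real tool appends sample.ts and sample.size to the
            // memory-mapped column files shown below; this sketch just
            // drops them.
            let _ = (sample.ts, sample.size);
        }
    });
    tx
}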
// writer.rs
struct U64ColWriter {
    file: std::fs::File,
    mmap: MmapMut,
    len: u64,
    cap: u64,
}

impl U64ColWriter {
    fn new(path: &Path) -> io::Result<Self> {
        let cap = increment();
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)
            .unwrap();
        file.set_len(cap)?;
        let mmap = unsafe { MmapMut::map_mut(&file)? };
        Ok(Self {
            file,
            mmap,
            len: 0,
            cap,
        })
    }

    fn may_resize(&mut self) -> io::Result<()> {
        let required = self.len + size_of::<u64>() as u64;
        if required < self.cap {
            return Ok(());
        }
        self.cap += increment();
        self.file.set_len(self.cap)?;
        self.mmap = unsafe { MmapMut::map_mut(&self.file)? };
        Ok(())
    }

    fn append(&mut self, val: u64) -> io::Result<()> {
        self.may_resize()?;
        let offset = self.len as usize;
        self.mmap[offset..offset + size_of::<u64>()]
            .copy_from_slice(&val.to_le_bytes());
        self.len += size_of::<u64>() as u64;
        Ok(())
    }
}

All we need now is an extra .count file to track the number of written rows. The code updates it last, after writing the two column files.

impl DatapointWriter {
    fn new(path: &Path) -> io::Result<Self> {
        let ts_writer = U64ColWriter::new(
            &path.with_extension("ts"))?;
        let val_writer = U64ColWriter::new(
            &path.with_extension("val"))?;
        let count_writer = CountWriter::new(
            &path.with_extension("count"))?;
        Ok(Self {
            ts_writer,
            val_writer,
            count_writer,
        })
    }

    fn append(&mut self, ts: u64, val: u64) -> io::Result<()> {
        self.ts_writer.append(ts)?;
        self.val_writer.append(val)?;
        self.count_writer.increment()
    }
}

Memory-mapped files reduce the number of system calls and let the kernel decide when to flush the data to disk very efficiently. What's more, once written to memory the data is safe within the kernel and can survive a process crash (though not a kernel panic or a sudden power cut). This makes it "transactional enough" for most use cases, and certainly for this one. The reason for using two threads is that writing to memory-mapped files is still a blocking I/O operation, and I did not want to slow down the pcap loop.

Analysing the network traffic

Now over to my second favourite tool: Python. The column files written are effectively just packed 64-bit integers. A few lines of pyarrow later, the collected samples are loaded into a pyarrow.Table and, from there, zero-copy into a polars.DataFrame. For the full code, see github.com/questdb/replication-stats under analisys/.

# analisys/reader.py
import mmap
import struct
from pathlib import Path

import pyarrow as pa


def read_col(count, dtype, path):
    with open(path, 'rb') as col_file:
        mem = mmap.mmap(
            col_file.fileno(),
            length=count * 8,
            access=mmap.ACCESS_READ)
        buf = pa.py_buffer(mem)
        return pa.Array.from_buffers(dtype, count, [None, buf])


def read_port_table(port, name, data_dir):
    data_dir = Path(data_dir)
    with open(data_dir / f'{port}.count', 'rb') as f:
        count = struct.unpack('<q', f.read(8))[0]
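To close the loop on the analysis, here is a minimal sketch (not from the original code) of how such samples can be resampled into a bandwidth-per-second series with Polars. The column names ts and val and the helper name bandwidth_per_second are assumptions, chosen to mirror the .ts and .val column files written by the capture tool.

import polars as pl

def bandwidth_per_second(df: pl.DataFrame) -> pl.DataFrame:
    # Convert epoch-nanosecond timestamps to datetimes, then sum packet
    # sizes into one-second buckets.
    return (
        df.with_columns(pl.from_epoch('ts', time_unit='ns'))
          .sort('ts')
          .group_by_dynamic('ts', every='1s')
          .agg(pl.col('val').sum().alias('bytes_per_sec'))
    )

Comparing such a series for port 9000 (ingestion) against port 10101 (replication) makes a steadily growing outbound bandwidth easy to spot.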