This is part 1 of the Kafka series, a set of blog posts that introduces the design and implementation details of Kafka.
The source code referenced in this article comes from Kafka’s trunk branch. I’ve pushed a snapshot to my personal repo so the code stays aligned with this article:
https://github.com/cyrilwongmy/kafka-src-reading
TCP and Packet Fragmentation
Why Does TCP Require Packet Fragmentation?
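Kafka uses TCP as its underlying transport layer protocol. TCP delivers a continuous byte stream with no inherent message boundaries: a single read may return part of a message or several messages at once. Therefore, any application using TCP must implement its own mechanism to delineate message boundaries (often called message framing; this article calls it packet fragmentation).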
How to Implement Packet Fragmentation?
When writing data to TCP, the sender must explicitly mark message boundaries. Typically, we first write a fixed 4-byte header indicating the size of the subsequent message. (An unsigned 4-byte size can describe messages up to 4 GiB; Kafka reads the header as a signed Java int, so its ceiling is about 2 GiB.) The receiver first reads these 4 bytes to determine message_size, then reads exactly message_size bytes from the network to reconstruct the complete message.
| message size (4 bytes) | message | message size (4 bytes) | message | ...
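The layout above can be sketched in a few lines of plain Java. This is a minimal illustration, not Kafka code: FrameCodec, encode, and decodeAll are hypothetical names, and the sketch assumes all bytes already sit in one in-memory buffer rather than arriving from a socket.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka.
final class FrameCodec {

    // Prepend a 4-byte big-endian size header to the payload.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length); // ByteBuffer is big-endian (network order) by default
        frame.put(payload);
        frame.flip();
        return frame;
    }

    // Split a buffer holding zero or more complete frames back into payloads.
    // A trailing incomplete frame is left unread in the buffer.
    static List<byte[]> decodeAll(ByteBuffer stream) {
        List<byte[]> messages = new ArrayList<>();
        while (stream.remaining() >= 4) {
            int size = stream.getInt(stream.position()); // peek at the header
            if (size < 0) throw new IllegalArgumentException("negative size: " + size);
            if (stream.remaining() - 4 < size) break;     // body not fully available yet
            stream.getInt();                              // consume the header
            byte[] payload = new byte[size];
            stream.get(payload);
            messages.add(payload);
        }
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer a = encode("hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer b = encode("kafka!".getBytes(StandardCharsets.UTF_8));
        ByteBuffer stream = ByteBuffer.allocate(a.remaining() + b.remaining());
        stream.put(a).put(b).flip();
        for (byte[] message : decodeAll(stream)) {
            System.out.println(new String(message, StandardCharsets.UTF_8));
        }
    }
}
```

Peeking at the header before consuming it lets the decoder stop cleanly when only part of a frame has arrived, which is the situation a real socket reader has to handle.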
Kafka’s Packet Fragmentation Implementation
The class handling TCP packet fragmentation in Kafka is NetworkReceive, which implements the approach described above.
Note its definition and constructor. The size and buffer fields correspond directly to the message size header and message content described in the previous section:
/**
 * A size-delimited Receive consisting of a 4-byte network-ordered size N
 * followed by N bytes of content
 */
public class NetworkReceive implements Receive {
    public static final String UNKNOWN_SOURCE = "";
    public static final int UNLIMITED = -1;
    private static final Logger log = LoggerFactory.getLogger(NetworkReceive.class);
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

    private final String source;
    private final ByteBuffer size;
    private final int maxSize;
    private final MemoryPool memoryPool;
    private int requestedBufferSize = -1;
    private ByteBuffer buffer;

    // ...

    public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
        this.source = source;
        this.size = ByteBuffer.allocate(4); // 4-byte size header
        this.buffer = null;                 // Will hold message content
        this.maxSize = maxSize;
        this.memoryPool = memoryPool;
    }

    // ...
}
Implementing Fragmentation Logic: NetworkReceive.readFrom
This method reads data from a ScatteringByteChannel (representing network data) into the NetworkReceive object. The process has two phases:
- Read size header: If the 4-byte size header isn’t fully read, continue reading into the size buffer.
- Read message content: After obtaining the full size header, convert it to an integer (receiveSize), allocate a buffer of that size, and read receiveSize bytes from the channel.
Because the buffer is allocated at its exact final size, there is no need for a dynamically expandable byte array, which avoids extra copies of the underlying data: bytes from the network are written into the receive buffer only once. A single call may return after a partial read, which is why the progress is kept in fields and the method can be called again until the message is complete.
public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    // Phase 1: Read 4-byte size header
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0) throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) { // Header complete
            size.rewind();
            int receiveSize = size.getInt(); // Extract message size
            if (receiveSize < 0) throw new InvalidReceiveException("Invalid size: " + receiveSize);
            if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Size " + receiveSize + " exceeds limit " + maxSize);
            requestedBufferSize = receiveSize;
            if (receiveSize == 0) buffer = EMPTY_BUFFER;
        }
    }
    // Phase 2: Read message content
    if (buffer == null && requestedBufferSize != -1) {
        buffer = memoryPool.tryAllocate(requestedBufferSize); // Allocate buffer
        if (buffer == null) log.trace("Memory low - failed allocation size {} for {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer); // Read content
        if (bytesRead < 0) throw new EOFException();
        read += bytesRead;
    }
    return read;
}
Other helper methods are omitted for brevity. Explore the full source in the repo.
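To see why readFrom keeps its progress in fields, here is a simplified sketch of the same two-phase logic driven by a channel that returns only a few bytes per read. SimpleReceive, complete, payload, and the trickle helper are hypothetical names used for illustration; the memory pool, the maxSize check, and logging are left out.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for NetworkReceive (no MemoryPool, no maxSize).
final class SimpleReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer buffer; // allocated once the header is complete

    long readFrom(ReadableByteChannel channel) throws IOException {
        long read = 0;
        if (size.hasRemaining()) {                 // Phase 1: size header
            int n = channel.read(size);
            if (n < 0) throw new EOFException();
            read += n;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0) throw new IOException("Invalid size: " + receiveSize);
                buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        if (buffer != null) {                      // Phase 2: message content
            int n = channel.read(buffer);
            if (n < 0) throw new EOFException();
            read += n;
        }
        return read;
    }

    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    byte[] payload() {
        return buffer.array();
    }

    // Test channel that hands out at most `chunk` bytes per read, like a slow socket.
    static ReadableByteChannel trickle(byte[] data, int chunk) {
        ByteBuffer src = ByteBuffer.wrap(data);
        return new ReadableByteChannel() {
            @Override public int read(ByteBuffer dst) {
                if (!dst.hasRemaining()) return 0;
                if (!src.hasRemaining()) return -1;
                int n = Math.min(chunk, Math.min(src.remaining(), dst.remaining()));
                for (int i = 0; i < n; i++) dst.put(src.get());
                return n;
            }
            @Override public boolean isOpen() { return true; }
            @Override public void close() { }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] body = "hello kafka".getBytes(StandardCharsets.UTF_8);
        byte[] wire = ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
        ReadableByteChannel channel = trickle(wire, 3); // 3 bytes per read
        SimpleReceive receive = new SimpleReceive();
        int calls = 0;
        while (!receive.complete()) {
            receive.readFrom(channel);
            calls++;
        }
        System.out.println(new String(receive.payload(), StandardCharsets.UTF_8)
                + " (readFrom calls: " + calls + ")");
    }
}
```

The loop in main plays the role of the caller: it keeps invoking readFrom until the receive reports that both the header and the body are full.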
Sender Implementation: NetworkSend
On the sender side, we must format messages as message size (4 bytes) + message.
Kafka’s NetworkSend differs slightly from NetworkReceive due to zero-copy optimizations. We’ll focus on the basic in-memory approach here (zero-copy will be covered later).
NetworkSend uses composition with the Send interface instead of direct size/buffer fields. Crucially, NetworkSend.writeTo() delegates to its contained Send object:
public class NetworkSend implements Send {
    private final String destinationId;
    private final Send send; // Delegates core functionality

    // Constructor and methods omitted...

    @Override
    public long writeTo(TransferableChannel channel) throws IOException {
        return send.writeTo(channel); // Delegation
    }
}
The Send interface defines the critical packet writing behavior:
public interface Send {
    boolean completed();
    long writeTo(TransferableChannel channel) throws IOException; // Core method
    long size();
}
The simplest implementation is ByteBufferSend. Its static factory sizePrefixed() constructs the required 4-byte header + message format:
public static ByteBufferSend sizePrefixed(ByteBuffer buffer) {
    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    sizeBuffer.putInt(0, buffer.remaining()); // Write size header
    return new ByteBufferSend(sizeBuffer, buffer); // Header + Content
}
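Its writeTo() method hands all buffers to the channel in a single gathering write. The channel may accept only part of the data, so the method tracks how many bytes remain and is called again until the send is completed: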
@Override
public long writeTo(TransferableChannel channel) throws IOException {
    long written = channel.write(buffers); // Write all buffers
    if (written < 0) throw new EOFException();
    remaining -= written;
    pending = channel.hasPendingWrites();
    return written;
}
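The sender side can be sketched the same way. The following is a simplified stand-in, not Kafka’s ByteBufferSend: SimpleSend and CappedChannel are hypothetical names, TransferableChannel is replaced by the JDK’s GatheringByteChannel, and the pending-writes flag is omitted. The test channel accepts only a few bytes per call to show why the remaining byte count is tracked across calls.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for ByteBufferSend; not Kafka's class.
final class SimpleSend {
    private final ByteBuffer[] buffers;
    private long remaining;

    private SimpleSend(ByteBuffer... buffers) {
        this.buffers = buffers;
        for (ByteBuffer b : buffers) remaining += b.remaining();
    }

    static SimpleSend sizePrefixed(ByteBuffer payload) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(0, payload.remaining()); // absolute put: position stays at 0
        return new SimpleSend(sizeBuffer, payload);
    }

    boolean completed() {
        return remaining <= 0;
    }

    long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers); // header and payload in one gathering write
        remaining -= written;
        return written;
    }

    // Test channel that accepts at most `cap` bytes per call, like a full socket buffer.
    static final class CappedChannel implements GatheringByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int cap;

        CappedChannel(int cap) { this.cap = cap; }

        @Override public long write(ByteBuffer[] srcs, int offset, int length) {
            long n = 0;
            for (int i = offset; i < offset + length; i++) {
                while (srcs[i].hasRemaining() && n < cap) {
                    sink.write(srcs[i].get());
                    n++;
                }
            }
            return n;
        }
        @Override public long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); }
        @Override public int write(ByteBuffer src) { return (int) write(new ByteBuffer[] {src}, 0, 1); }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    public static void main(String[] args) throws IOException {
        SimpleSend send = sizePrefixed(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        CappedChannel channel = new CappedChannel(4);
        while (!send.completed()) {
            send.writeTo(channel); // each call writes at most 4 bytes
        }
        System.out.println(channel.sink.size() + " bytes on the wire");
    }
}
```

Keeping the header and the payload as two separate buffers avoids copying the payload into a new, larger array just to prepend four bytes.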
Kafka’s Network Channel Abstraction
If you’re not familiar with the Java NIO API, there are good introductions available online.
Kafka wraps the Java NIO APIs. For plaintext (unencrypted) connections, the ScatteringByteChannel used by NetworkReceive is implemented by PlaintextTransportLayer.
PlaintextTransportLayer holds a SocketChannel field, which it uses to communicate over the network socket. It delegates reads directly to this JDK SocketChannel, which performs the actual read from the socket:
@Override
public int read(ByteBuffer dst) throws IOException {
    return socketChannel.read(dst); // Direct delegation
}

@Override
public long read(ByteBuffer[] dsts) throws IOException {
    return socketChannel.read(dsts);
}

@Override
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
    return socketChannel.read(dsts, offset, length);
}
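One consequence of this delegation is that framing code depends only on the channel interface, not on what sits underneath it. The sketch below assumes a hypothetical DelegatingTransport wrapper and a hypothetical readFrame helper, and it uses a temporary file’s FileChannel in place of a socket purely so the example runs without a network.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical wrapper for illustration; Kafka's PlaintextTransportLayer wraps a SocketChannel.
final class DelegatingTransport implements ScatteringByteChannel {
    private final ScatteringByteChannel delegate;

    DelegatingTransport(ScatteringByteChannel delegate) {
        this.delegate = delegate;
    }

    @Override public int read(ByteBuffer dst) throws IOException {
        return delegate.read(dst); // direct delegation
    }
    @Override public long read(ByteBuffer[] dsts) throws IOException {
        return delegate.read(dsts);
    }
    @Override public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
        return delegate.read(dsts, offset, length);
    }
    @Override public boolean isOpen() { return delegate.isOpen(); }
    @Override public void close() throws IOException { delegate.close(); }

    // Reads one size-prefixed frame through any ScatteringByteChannel (blocking style).
    static byte[] readFrame(ScatteringByteChannel channel) throws IOException {
        ByteBuffer size = ByteBuffer.allocate(4);
        while (size.hasRemaining()) {
            if (channel.read(size) < 0) throw new EOFException();
        }
        size.flip();
        ByteBuffer body = ByteBuffer.allocate(size.getInt());
        while (body.hasRemaining()) {
            if (channel.read(body) < 0) throw new EOFException();
        }
        return body.array();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("frame", ".bin");
        try {
            Files.write(file, new byte[] {0, 0, 0, 2, 'o', 'k'}); // size = 2, body = "ok"
            try (DelegatingTransport transport =
                         new DelegatingTransport(FileChannel.open(file, StandardOpenOption.READ))) {
                System.out.println(new String(readFrame(transport), StandardCharsets.US_ASCII));
            }
        } finally {
            Files.delete(file);
        }
    }
}
```

Because readFrame only sees the interface, the same code would work over any other implementation of the channel, for example one that decrypts data before handing it over.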
Conclusion
This post demonstrated how Kafka implements TCP packet fragmentation. You can adapt this pattern to define custom message formats by encoding and decoding your content after handling fragmentation. If you have any questions, feel free to open an issue in the repo.