This is part 1 of the Kafka series, a set of blog posts that introduce the design and implementation details of Kafka.

The source code referenced in this article uses Kafka’s trunk branch. I’ve pushed it to my personal repo for version alignment with this article:

https://github.com/cyrilwongmy/kafka-src-reading

TCP and Packet Fragmentation

Why Does TCP Require Packet Fragmentation?

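Kafka uses TCP as its underlying transport layer protocol. TCP delivers a continuous byte stream with no inherent message boundaries: two messages written separately may arrive merged in one read, and a single message may arrive split across several reads. Therefore, any application using TCP must implement its own mechanism to delineate message boundaries, a technique often called message framing.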

How to Implement Packet Fragmentation?

When writing data to TCP, the sender must explicitly mark message boundaries. Typically, the sender first writes a fixed 4-byte header holding the size of the message that follows. An unsigned 4-byte value can describe messages up to 4 GiB; Kafka reads the header as a signed Java int and rejects negative values, so its limit is about 2 GiB. The receiver first reads these 4 bytes to determine message_size, then reads exactly message_size bytes from the network to reconstruct the complete message.

| message size (4 bytes) | message | message size (4 bytes) | message | ...
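The layout above can be tried out without any networking. Here is a minimal sketch, assuming the whole byte stream is already in memory and contains only complete frames; the class LengthPrefixedFraming and its methods are hypothetical names for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixedFraming {

    /** Prepends a 4-byte big-endian size header to the payload. */
    static byte[] encode(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length); // ByteBuffer is big-endian (network order) by default
        frame.put(payload);
        return frame.array();
    }

    /** Joins several frames into one byte stream, as TCP might deliver them. */
    static byte[] concat(byte[]... parts) {
        int total = 0;
        for (byte[] part : parts) total += part.length;
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] part : parts) out.put(part);
        return out.array();
    }

    /** Splits a stream of complete frames back into the original payloads. */
    static List<byte[]> decodeAll(byte[] stream) {
        ByteBuffer in = ByteBuffer.wrap(stream);
        List<byte[]> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            int size = in.getInt(); // read the size header
            if (size < 0 || size > in.remaining()) {
                throw new IllegalArgumentException("Invalid or truncated frame, size=" + size);
            }
            byte[] payload = new byte[size];
            in.get(payload); // read exactly `size` bytes of content
            messages.add(payload);
        }
        if (in.hasRemaining()) {
            throw new IllegalArgumentException("Trailing bytes do not form a size header");
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] stream = concat(
            encode("hello".getBytes(StandardCharsets.UTF_8)),
            encode("kafka".getBytes(StandardCharsets.UTF_8)));
        for (byte[] message : decodeAll(stream)) {
            System.out.println(new String(message, StandardCharsets.UTF_8));
        }
    }
}
```

Even though the two frames are glued together into one stream, the size headers are enough to recover each message separately.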

Kafka’s Packet Fragmentation Implementation

The class handling TCP packet fragmentation in Kafka is NetworkReceive, which implements the approach described above.

Note its definition and constructor. The size and buffer fields correspond directly to the message size header and message content in the above section:

/**
 * A size-delimited Receive consisting of a 4-byte network-ordered size N 
 * followed by N bytes of content
 */
public class NetworkReceive implements Receive {

    public static final String UNKNOWN_SOURCE = "";
    public static final int UNLIMITED = -1;
    private static final Logger log = LoggerFactory.getLogger(NetworkReceive.class);
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

    private final String source;
    private final ByteBuffer size;
    private final int maxSize;
    private final MemoryPool memoryPool;
    private int requestedBufferSize = -1;
    private ByteBuffer buffer;

    // ...

    public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
        this.source = source;
        this.size = ByteBuffer.allocate(4); // 4-byte size header
        this.buffer = null; // Will hold message content
        this.maxSize = maxSize;
        this.memoryPool = memoryPool;
    }

    // ...
}

Implementing Fragmentation Logic: NetworkReceive.readFrom

This method reads data from a ScatteringByteChannel (representing network data) into the NetworkReceive object. The process has two phases:

  1. Read size header: If the 4-byte size header isn’t fully read, continue reading into the size buffer.
  2. Read message content: Once the size header is complete, convert it to an integer (receiveSize), allocate a buffer of exactly that size, and read from the channel into it until receiveSize bytes have arrived. Because the buffer is sized up front, no dynamically growing byte array is needed, so data from the network is written into memory only once and is never copied during a resize.

public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    // Phase 1: Read 4-byte size header
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0) throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) { // Header complete
            size.rewind();
            int receiveSize = size.getInt(); // Extract message size
            if (receiveSize < 0) throw new InvalidReceiveException("Invalid size: " + receiveSize);
            if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Size " + receiveSize + " exceeds limit " + maxSize);
            requestedBufferSize = receiveSize;
            if (receiveSize == 0) buffer = EMPTY_BUFFER;
        }
    }
    // Phase 2: Read message content
    if (buffer == null && requestedBufferSize != -1) {
        buffer = memoryPool.tryAllocate(requestedBufferSize); // Allocate buffer
        if (buffer == null) log.trace("Memory low - failed allocation size {} for {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer); // Read content
        if (bytesRead < 0) throw new EOFException();
        read += bytesRead;
    }
    return read;
}

Other helper methods are omitted for brevity. Explore the full source in the repo.
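To see why the two-phase logic matters, it helps to feed it a channel that delivers only a few bytes per read, as a real socket may. The following is a minimal sketch under simplifying assumptions: FrameReceive is a hypothetical, stripped-down stand-in for NetworkReceive (no MemoryPool, no maxSize check), and TrickleChannel is a made-up in-memory channel used only for the demonstration.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

final class FrameReceiveSketch {

    /** Hypothetical simplified receive: 4-byte size header, then the content. */
    static final class FrameReceive {
        private final ByteBuffer size = ByteBuffer.allocate(4);
        private ByteBuffer buffer; // allocated once the header is complete

        long readFrom(ReadableByteChannel channel) throws IOException {
            long read = 0;
            if (size.hasRemaining()) { // Phase 1: size header
                int n = channel.read(size);
                if (n < 0) throw new EOFException();
                read += n;
                if (!size.hasRemaining()) {
                    size.rewind();
                    int receiveSize = size.getInt();
                    if (receiveSize < 0) throw new IOException("Invalid size: " + receiveSize);
                    buffer = ByteBuffer.allocate(receiveSize);
                }
            }
            if (buffer != null && buffer.hasRemaining()) { // Phase 2: content
                int n = channel.read(buffer);
                if (n < 0) throw new EOFException();
                read += n;
            }
            return read;
        }

        boolean complete() {
            return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
        }

        ByteBuffer payload() {
            ByteBuffer view = buffer.duplicate();
            view.flip(); // expose the bytes written so far for reading
            return view;
        }
    }

    /** In-memory channel that hands out at most maxPerRead (> 0) bytes per call. */
    static final class TrickleChannel implements ReadableByteChannel {
        private final ByteBuffer source;
        private final int maxPerRead;

        TrickleChannel(byte[] data, int maxPerRead) {
            this.source = ByteBuffer.wrap(data);
            this.maxPerRead = maxPerRead;
        }

        @Override public int read(ByteBuffer dst) {
            if (!source.hasRemaining()) return -1;
            int n = Math.min(maxPerRead, Math.min(dst.remaining(), source.remaining()));
            for (int i = 0; i < n; i++) dst.put(source.get());
            return n;
        }

        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    /** Builds one frame: size header followed by the UTF-8 bytes of text. */
    static byte[] frame(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Calls readFrom repeatedly until one full message has been received. */
    static String receiveOne(byte[] wire, int maxPerRead) {
        TrickleChannel channel = new TrickleChannel(wire, maxPerRead);
        FrameReceive receive = new FrameReceive();
        try {
            while (!receive.complete()) receive.readFrom(channel);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return StandardCharsets.UTF_8.decode(receive.payload()).toString();
    }

    public static void main(String[] args) {
        System.out.println(receiveOne(frame("hello kafka"), 3));
    }
}
```

With 3 bytes per read, the first call cannot even finish the header, yet the message is reassembled correctly because the receive object keeps its state between calls.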

Sender Implementation: NetworkSend

On the sender side, we must format messages as message size (4 bytes) + message.

Kafka’s NetworkSend differs slightly from NetworkReceive due to zero-copy optimizations. We’ll focus on the basic in-memory approach here (zero-copy will be covered later).

Send

NetworkSend uses composition with the Send interface instead of direct size/buffer fields. Crucially, NetworkSend.writeTo() delegates to its contained Send object:

public class NetworkSend implements Send {
    private final String destinationId;
    private final Send send; // Delegates core functionality

    // Constructor and methods omitted...

    @Override
    public long writeTo(TransferableChannel channel) throws IOException {
        return send.writeTo(channel); // Delegation
    }
}

The Send interface defines the critical packet writing behavior:

public interface Send {
    boolean completed();
    long writeTo(TransferableChannel channel) throws IOException; // Core method
    long size();
}

The simplest implementation is ByteBufferSend. Its static factory sizePrefixed() constructs the required 4-byte header + message format:

public static ByteBufferSend sizePrefixed(ByteBuffer buffer) {
    ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    sizeBuffer.putInt(0, buffer.remaining()); // Write size header
    return new ByteBufferSend(sizeBuffer, buffer); // Header + Content
}
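One detail in sizePrefixed() is easy to miss: putInt(0, value) is the absolute form of the method, so it writes at index 0 without moving the buffer's position. The size buffer is therefore immediately ready to be written to a channel, with no flip() needed. Here is a small sketch that contrasts the two forms; SizePrefixSketch and its methods are hypothetical names, and the array of buffers stands in for the ByteBufferSend that Kafka constructs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class SizePrefixSketch {

    /** Mirrors the idea of sizePrefixed(): a size buffer followed by the payload. */
    static ByteBuffer[] sizePrefixed(ByteBuffer payload) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(0, payload.remaining()); // absolute put: position stays at 0
        return new ByteBuffer[] {sizeBuffer, payload};
    }

    /** For contrast: a relative put advances the position to 4. */
    static ByteBuffer relativePutWithoutFlip(int value) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(value); // relative put: nothing left to write until flip()
        return sizeBuffer;
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer[] buffers = sizePrefixed(payload);
        System.out.println("absolute put, remaining = " + buffers[0].remaining());
        System.out.println("relative put, remaining = " + relativePutWithoutFlip(5).remaining());
    }
}
```

The header records payload.remaining() rather than the buffer's capacity, so only the bytes between the payload's position and limit are counted as the message.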

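ByteBufferSend's writeTo() method passes all of its buffers to the channel in a single gathering write. The channel may accept only part of the data, so the method subtracts the bytes written from remaining, and the caller invokes writeTo() again until completed() returns true: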

@Override
public long writeTo(TransferableChannel channel) throws IOException {
    long written = channel.write(buffers); // Write all buffers
    if (written < 0) throw new EOFException();
    remaining -= written;
    pending = channel.hasPendingWrites();
    return written;
}
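A gathering write may accept fewer bytes than the buffers hold, which is why the remaining count has to be tracked across calls. The sketch below illustrates this with a hypothetical ThrottledChannel that accepts at most a few bytes per call; it is an in-memory stand-in for illustration, not Kafka's TransferableChannel, and the simple loop in writeFully assumes the channel always makes progress.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;

final class GatheringWriteSketch {

    /** Hypothetical channel that accepts at most maxPerWrite bytes per call. */
    static final class ThrottledChannel implements GatheringByteChannel {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerWrite;

        ThrottledChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }

        @Override public long write(ByteBuffer[] srcs, int offset, int length) {
            int budget = maxPerWrite;
            long written = 0;
            // Drain the buffers in order until this call's budget is used up.
            for (int i = offset; i < offset + length; i++) {
                while (budget > 0 && srcs[i].hasRemaining()) {
                    sink.write(srcs[i].get());
                    budget--;
                    written++;
                }
            }
            return written;
        }

        @Override public long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); }
        @Override public int write(ByteBuffer src) { return (int) write(new ByteBuffer[] {src}, 0, 1); }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }

        byte[] received() { return sink.toByteArray(); }
    }

    /** Writes until every buffer is drained; returns the number of write calls made. */
    static int writeFully(GatheringByteChannel channel, ByteBuffer[] buffers) {
        long remaining = 0;
        for (ByteBuffer buffer : buffers) remaining += buffer.remaining();
        int calls = 0;
        try {
            while (remaining > 0) {
                remaining -= channel.write(buffers); // may be a partial write
                calls++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer size = ByteBuffer.allocate(4);
        size.putInt(0, payload.remaining());
        ThrottledChannel channel = new ThrottledChannel(4);
        int calls = writeFully(channel, new ByteBuffer[] {size, payload});
        System.out.println(calls + " write calls, " + channel.received().length + " bytes on the wire");
    }
}
```

Each buffer remembers its own position, so passing the same array again simply continues where the previous call stopped. With a real non-blocking socket, a write can return 0, so a production loop would wait for the channel to become writable instead of spinning.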

Kafka’s Network Channel Abstraction

If you’re not familiar with the Java NIO API, it is worth reading an introduction to channels and buffers before continuing.

Kafka wraps the Java NIO APIs. For plaintext connections, the ScatteringByteChannel that NetworkReceive reads from is implemented by PlaintextTransportLayer.

PlaintextTransportLayer holds a SocketChannel field, which it uses to communicate over the network socket. It delegates all read calls directly to this JDK SocketChannel, which performs the actual read from the socket.

@Override
public int read(ByteBuffer dst) throws IOException {
    return socketChannel.read(dst); // Direct delegation
}

@Override
public long read(ByteBuffer[] dsts) throws IOException {
    return socketChannel.read(dsts);
}

@Override
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
    return socketChannel.read(dsts, offset, length);
}

Conclusion

This post demonstrated how Kafka implements TCP packet fragmentation. You can adapt this pattern to define custom message formats by encoding and decoding your content once fragmentation is handled. If you have any questions, feel free to open an issue in the repo.