Jetty I/O Architecture
Jetty libraries (both client and server) use Java NIO to handle I/O, so that at its core Jetty I/O is completely non-blocking.
Jetty I/O: SelectorManager
The core class of Jetty I/O is SelectorManager.
SelectorManager manages internally a configurable number of ManagedSelectors.
Each ManagedSelector wraps an instance of java.nio.channels.Selector that in turn manages a number of java.nio.channels.SocketChannel instances.
SocketChannel instances can be created by clients when connecting to a server and by a server when accepting connections from clients.
In both cases the SocketChannel instance is passed to SelectorManager (which passes it to ManagedSelector and eventually to java.nio.channels.Selector) to be registered for use within Jetty.
It is possible for an application to create the SocketChannel instances outside Jetty, even perform some initial network traffic also outside Jetty (for example for authentication purposes), and then pass the SocketChannel instance to SelectorManager for use within Jetty.
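The registration that eventually happens on java.nio.channels.Selector can be sketched with plain Java NIO. The snippet below is only an illustration under simplifying assumptions: RegistrationSketch and its register() method are hypothetical names, no Jetty class is involved, and the channel is registered with no interest operations.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class RegistrationSketch
{
    // Registers a non-blocking channel with a Selector, attaching a
    // context object, and reports whether the registration is in place.
    static boolean register(Object context)
    {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open())
        {
            // A channel must be non-blocking before it can be registered.
            channel.configureBlocking(false);
            // No interest operations yet: they can be set later on the key.
            SelectionKey key = channel.register(selector, 0, context);
            return key.isValid() &&
                key.attachment() == context &&
                selector.keys().contains(key);
        }
        catch (IOException x)
        {
            throw new UncheckedIOException(x);
        }
    }
}
```

The attachment plays a role similar to the context or attachment object passed to SelectorManager in the examples of this section: it travels with the registration so that it can be retrieved when events happen.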
This example shows how a client can connect to a server:
public void connect(SelectorManager selectorManager, Map<String, Object> context) throws IOException
{
    String host = "host";
    int port = 8080;

    // Create an unconnected SocketChannel.
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);

    // Connect and register to Jetty.
    if (socketChannel.connect(new InetSocketAddress(host, port)))
        selectorManager.accept(socketChannel, context);
    else
        selectorManager.connect(socketChannel, context);
}
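The two branches above exist because a non-blocking SocketChannel.connect() may or may not complete immediately. As a rough illustration of what finishing a pending connect involves at the plain Java NIO level, here is a minimal sketch; it is not Jetty's implementation, and NonBlockingConnectSketch, connect() and connectToLocalServer() are hypothetical names that rely on a throwaway server bound to the loopback address.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectSketch
{
    // Connects without ever blocking inside connect();
    // returns true once the channel is connected.
    static boolean connect(InetSocketAddress address) throws IOException
    {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open())
        {
            channel.configureBlocking(false);
            if (!channel.connect(address))
            {
                // The connect is still in progress: ask the Selector
                // to report when it can be finished.
                channel.register(selector, SelectionKey.OP_CONNECT);
                while (!channel.finishConnect())
                    selector.select(1000);
            }
            return channel.isConnected();
        }
    }

    // Hypothetical helper: connects to a throwaway server on loopback.
    static boolean connectToLocalServer()
    {
        try (ServerSocketChannel server = ServerSocketChannel.open())
        {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return connect((InetSocketAddress)server.getLocalAddress());
        }
        catch (IOException x)
        {
            throw new UncheckedIOException(x);
        }
    }
}
```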
This example shows how a server accepts a client connection:
public void accept(ServerSocketChannel acceptor, SelectorManager selectorManager) throws IOException
{
    // Wait until a client connects.
    SocketChannel socketChannel = acceptor.accept();
    socketChannel.configureBlocking(false);

    // Accept and register to Jetty.
    Object attachment = null;
    selectorManager.accept(socketChannel, attachment);
}
Jetty I/O: EndPoint and Connection
SocketChannels that are passed to SelectorManager are wrapped into two related components: an EndPoint and a Connection.
EndPoint is the Jetty abstraction for a SocketChannel or a DatagramChannel: you can read bytes from an EndPoint, you can write bytes to an EndPoint, you can close an EndPoint, etc.
Connection is the Jetty abstraction that is responsible for reading bytes from the EndPoint and for deserializing the read bytes into objects.
For example, an HTTP/1.1 server-side Connection implementation is responsible for deserializing HTTP/1.1 request bytes into an HTTP request object.
Conversely, an HTTP/1.1 client-side Connection implementation is responsible for deserializing HTTP/1.1 response bytes into an HTTP response object.
Connection is the abstraction that implements the reading side of a specific protocol such as HTTP/1.1, HTTP/2, HTTP/3, or WebSocket: it is able to read incoming communication in that protocol.
The writing side for a specific protocol may be implemented in the Connection but may also be implemented in other components, although eventually the bytes to be written will be written through the EndPoint.
While there are primarily only two implementations of EndPoint, SocketChannelEndPoint for TCP and DatagramChannelEndPoint for UDP (used both on the client-side and on the server-side), there are many implementations of Connection, typically two for each protocol (one for the client-side and one for the server-side).
The EndPoint and Connection pairs can be chained, for example in case of encrypted communication using the TLS protocol.
There is an EndPoint and Connection TLS pair where the EndPoint reads the encrypted bytes from the socket and the Connection decrypts them; next in the chain there is an EndPoint and Connection pair where the EndPoint "reads" decrypted bytes (provided by the previous Connection) and the Connection deserializes them into specific protocol objects (for example HTTP/2 frame objects).
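The chaining idea can be illustrated with a small, self-contained sketch. Everything in it is hypothetical and much simpler than the Jetty classes: ByteSource stands in for an EndPoint, a toy XOR "cipher" stands in for TLS, and the last link deserializes bytes into a plain String rather than into protocol objects.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ChainSketch
{
    // Hypothetical, much simplified stand-in for an EndPoint:
    // something bytes can be read from.
    interface ByteSource
    {
        // Copies available bytes into the buffer, returns how many.
        int fill(ByteBuffer buffer);
    }

    // Toy "cipher" (XOR with a key), standing in for TLS.
    static byte[] xor(byte[] bytes, byte key)
    {
        byte[] result = new byte[bytes.length];
        for (int i = 0; i < bytes.length; i++)
            result[i] = (byte)(bytes[i] ^ key);
        return result;
    }

    // First link: yields the encrypted bytes "from the socket".
    static ByteSource network(byte[] wire)
    {
        ByteBuffer data = ByteBuffer.wrap(wire);
        return buffer ->
        {
            int count = Math.min(buffer.remaining(), data.remaining());
            for (int i = 0; i < count; i++)
                buffer.put(data.get());
            return count;
        };
    }

    // Decrypting stage: reads from the previous link and exposes
    // the decrypted bytes to the next link as another ByteSource.
    static ByteSource decrypting(ByteSource encrypted, byte key)
    {
        return buffer ->
        {
            int start = buffer.position();
            int count = encrypted.fill(buffer);
            for (int i = start; i < start + count; i++)
                buffer.put(i, (byte)(buffer.get(i) ^ key));
            return count;
        };
    }

    // Last link: deserializes decrypted bytes into an object,
    // here just a String.
    static String parse(ByteSource plain)
    {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        while (plain.fill(buffer) > 0)
        {
            // Keep filling until no more bytes are available.
        }
        buffer.flip();
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }
}
```

The point of the design is that the last link does not know, or care, whether the bytes it reads came straight from the network or from a decrypting stage in front of it.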
Certain protocols, such as WebSocket, start the communication with the server using one protocol (for example, HTTP/1.1), but then change the communication to use another protocol (for example, WebSocket).
EndPoint supports changing the Connection object on-the-fly via EndPoint.upgrade(Connection).
This makes it possible to use the HTTP/1.1 Connection during the initial communication and to replace it later with a WebSocket Connection.
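As a rough mental model of an upgrade, consider the following sketch. MiniEndPoint and MiniConnection are hypothetical, simplified stand-ins and not the Jetty types; the sketch only assumes that replacing a Connection disassociates the old one and associates the new one with the same endpoint.

```java
import java.util.ArrayList;
import java.util.List;

class UpgradeSketch
{
    // Hypothetical, simplified stand-in for a Connection.
    interface MiniConnection
    {
        void onOpen();

        void onClose(Throwable cause);

        String protocol();
    }

    // Hypothetical, simplified stand-in for an EndPoint that can
    // replace its Connection while the socket stays open.
    static class MiniEndPoint
    {
        private MiniConnection connection;

        MiniConnection getConnection()
        {
            return connection;
        }

        // The first call simply installs the initial Connection.
        void upgrade(MiniConnection newConnection)
        {
            MiniConnection oldConnection = connection;
            // Disassociate the old Connection normally (no failure).
            if (oldConnection != null)
                oldConnection.onClose(null);
            // Associate the new Connection with this endpoint.
            connection = newConnection;
            newConnection.onOpen();
        }
    }

    // Creates a MiniConnection that records its lifecycle events.
    static MiniConnection recording(String protocol, List<String> events)
    {
        return new MiniConnection()
        {
            @Override
            public void onOpen()
            {
                events.add(protocol + ":open");
            }

            @Override
            public void onClose(Throwable cause)
            {
                events.add(protocol + ":close");
            }

            @Override
            public String protocol()
            {
                return protocol;
            }
        };
    }

    static List<String> demo()
    {
        List<String> events = new ArrayList<>();
        MiniEndPoint endPoint = new MiniEndPoint();
        endPoint.upgrade(recording("http/1.1", events));
        endPoint.upgrade(recording("websocket", events));
        events.add("current:" + endPoint.getConnection().protocol());
        return events;
    }
}
```

In this sketch, demo() records the HTTP/1.1 connection being opened and closed, then the WebSocket connection being opened, all on the same endpoint object.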
SelectorManager is an abstract class because while it knows how to create concrete EndPoint instances, it does not know how to create protocol specific Connection instances.
Creating Connection instances is performed on the server-side by ConnectionFactorys and on the client-side by ClientConnectionFactorys.
On the server-side, the component that aggregates a SelectorManager with a set of ConnectionFactorys is ServerConnector for TCP sockets, QuicServerConnector for QUIC sockets, and UnixDomainServerConnector for Unix-Domain sockets (see the server-side architecture section for more information).
On the client-side, the components that aggregate a SelectorManager with a set of ClientConnectionFactorys are HttpClientTransport subclasses (see the client-side architecture section for more information).
Jetty I/O: EndPoint
The Jetty I/O library uses Java NIO to handle I/O, so that I/O is non-blocking.
At the Java NIO level, in order to be notified when a SocketChannel or DatagramChannel has data to be read, the SelectionKey.OP_READ flag must be set.
In the Jetty I/O library, you can call EndPoint.fillInterested(Callback) to declare interest in the "read" (also called "fill") event, and the Callback parameter is the object that is notified when such an event occurs.
At the Java NIO level, a SocketChannel or DatagramChannel is always writable, unless it becomes congested.
In order to be notified when a channel uncongests and it is therefore writable again, the SelectionKey.OP_WRITE flag must be set.
In the Jetty I/O library, you can call EndPoint.write(Callback, ByteBuffer...) to write the ByteBuffers and the Callback parameter is the object that is notified when the whole write is finished (i.e. all ByteBuffers have been fully written, even if they are delayed by congestion/uncongestion).
The EndPoint APIs abstract out the Java NIO details by providing non-blocking APIs based on Callback objects for I/O operations.
The EndPoint APIs are typically called by Connection implementations, see the Jetty I/O: Connection section.
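For comparison, this is roughly what declaring read interest looks like when using Java NIO directly, which is the kind of detail the EndPoint APIs hide. The sketch is only an illustration: ReadInterestSketch and echoThroughPipe() are hypothetical names, and a java.nio.channels.Pipe is used instead of a socket to keep the example self-contained. It is meant for short messages only, since the write side is blocking.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class ReadInterestSketch
{
    // Writes a short message into a pipe and reads it back, reading
    // only after the Selector has reported the channel as readable.
    static String echoThroughPipe(String message)
    {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open())
        {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source())
            {
                source.configureBlocking(false);
                // Declare interest in read events for this channel.
                source.register(selector, SelectionKey.OP_READ);

                // Make some bytes available to read.
                ByteBuffer toWrite = ByteBuffer.wrap(bytes);
                while (toWrite.hasRemaining())
                    sink.write(toWrite);

                ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
                while (buffer.hasRemaining())
                {
                    // Wait for the read event, then read what is there.
                    selector.select();
                    selector.selectedKeys().clear();
                    source.read(buffer);
                }
                buffer.flip();
                return StandardCharsets.UTF_8.decode(buffer).toString();
            }
        }
        catch (IOException x)
        {
            throw new UncheckedIOException(x);
        }
    }
}
```

Note that the thread in this sketch waits inside select(); a callback-based API lets the caller return immediately and be notified later instead.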
Jetty I/O: Connection
Connection is the abstraction that deserializes incoming bytes into objects, for example an HTTP request object or a WebSocket frame object, that can be used by more abstract layers.
Connection instances have two lifecycle methods:
- Connection.onOpen(), invoked when the Connection is associated with the EndPoint
- Connection.onClose(Throwable), invoked when the Connection is disassociated from the EndPoint, where the Throwable parameter indicates whether the disassociation was normal (when the parameter is null) or was due to an error (when the parameter is not null)
When a Connection is first created, it is not registered for any Java NIO event.
It is therefore typical to implement onOpen() to call EndPoint.fillInterested(Callback) so that the Connection declares interest for read events and it is invoked (via the Callback) when the read event happens.
Abstract class AbstractConnection partially implements Connection and provides simpler APIs.
The example below shows a typical implementation that extends AbstractConnection:
// Extend AbstractConnection to inherit basic implementation.
class MyConnection extends AbstractConnection
{
    public MyConnection(EndPoint endPoint, Executor executor)
    {
        super(endPoint, executor);
    }

    @Override
    public void onOpen()
    {
        super.onOpen();

        // Declare interest for fill events.
        fillInterested();
    }

    @Override
    public void onFillable()
    {
        // Called when a fill event happens.
    }
}
Jetty I/O: TCP Network Echo
With the concepts above it is now possible to write a simple, fully non-blocking, Connection implementation that simply echoes the bytes that it reads back to the other peer.
A naive, but wrong, implementation may be the following:
class WrongEchoConnection extends AbstractConnection implements Callback
{
    public WrongEchoConnection(EndPoint endPoint, Executor executor)
    {
        super(endPoint, executor);
    }

    @Override
    public void onOpen()
    {
        super.onOpen();

        // Declare interest for fill events.
        fillInterested();
    }

    @Override
    public void onFillable()
    {
        try
        {
            ByteBuffer buffer = BufferUtil.allocate(1024);
            int filled = getEndPoint().fill(buffer);
            if (filled > 0)
            {
                // Filled some bytes, echo them back.
                getEndPoint().write(this, buffer);
            }
            else if (filled == 0)
            {
                // No more bytes to fill, declare
                // again interest for fill events.
                fillInterested();
            }
            else
            {
                // The other peer closed the
                // connection, close it back.
                getEndPoint().close();
            }
        }
        catch (Exception x)
        {
            getEndPoint().close(x);
        }
    }

    @Override
    public void succeeded()
    {
        // The write is complete, fill again.
        onFillable();
    }

    @Override
    public void failed(Throwable x)
    {
        getEndPoint().close(x);
    }
}
The implementation above is wrong and leads to StackOverflowError.
The problem with this implementation is that if the writes always complete synchronously (i.e. without being delayed by TCP congestion), you end up with this sequence of calls:
Connection.onFillable()
  EndPoint.write()
    Connection.succeeded()
      Connection.onFillable()
        EndPoint.write()
          Connection.succeeded()
            ...
which leads to StackOverflowError.
This is a typical side effect of asynchronous programming using non-blocking APIs, and happens in the Jetty I/O library as well.
The callback is invoked synchronously for efficiency reasons.
Submitting the invocation of the callback to an Executor to be invoked in a different thread would cause a context switch and make simple writes extremely inefficient.
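The growth of the call stack can be observed with a small, Jetty-independent sketch. All names in it are hypothetical: write() is a stand-in for a write that always completes synchronously, and a counter tracks how deeply onFillable() is nested instead of letting the stack actually overflow.

```java
class SynchronousCallbackDepth
{
    // Hypothetical, minimal callback type for this sketch.
    interface Callback
    {
        void succeeded();
    }

    private int remaining;
    private int depth;
    private int maxDepth;

    SynchronousCallbackDepth(int chunks)
    {
        this.remaining = chunks;
    }

    // Stand-in for a write that always completes synchronously:
    // the callback is invoked before write() returns.
    private void write(Callback callback)
    {
        callback.succeeded();
    }

    void onFillable()
    {
        if (remaining == 0)
            return;
        remaining--;
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        // succeeded() re-enters onFillable() while this
        // invocation is still on the stack.
        write(this::onFillable);
        depth--;
    }

    // Returns the deepest nesting of onFillable() reached
    // while echoing the given number of chunks.
    static int maxDepthFor(int chunks)
    {
        SynchronousCallbackDepth connection = new SynchronousCallbackDepth(chunks);
        connection.onFillable();
        return connection.maxDepth;
    }
}
```

Here maxDepthFor(100) returns 100: the nesting grows linearly with the number of synchronously completed writes, so a long enough run of them exhausts the stack.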
This side effect of asynchronous programming leading to StackOverflowError is so common that the Jetty libraries have a generic solution for it: a specialized Callback implementation named org.eclipse.jetty.util.IteratingCallback that turns recursion into iteration, therefore avoiding the StackOverflowError.
IteratingCallback is a Callback implementation that should be passed to non-blocking APIs such as EndPoint.write(Callback, ByteBuffer...) when they are performed in a loop.
IteratingCallback works by starting the loop with IteratingCallback.iterate().
In turn, this calls IteratingCallback.process(), an abstract method that must be implemented with the code that should be executed for each loop.
Method process() must return:
- Action.SCHEDULED, to indicate that the loop has performed a non-blocking, possibly asynchronous, operation
- Action.IDLE, to indicate that the loop should temporarily be suspended to be resumed later
- Action.SUCCEEDED, to indicate that the loop exited successfully
Any exception thrown within process() exits the loop with a failure.
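To give an intuition of how recursion can be turned into iteration, here is a deliberately reduced sketch of the idea. It is not the Jetty implementation: MiniIteratingCallback, its internal states and DepthDemo are hypothetical, and handling of write failures is left out. The key assumption is that a completion arriving while process() is still running is only recorded, so that the loop already on the stack iterates again instead of being re-entered.

```java
abstract class MiniIteratingCallback
{
    enum Action { SCHEDULED, IDLE, SUCCEEDED }
    private enum State { IDLE, PROCESSING, PENDING, CALLED, DONE }
    private State state = State.IDLE;

    // One iteration of the loop, implemented by subclasses.
    protected abstract Action process() throws Throwable;
    protected void onCompleteSuccess() {}
    protected void onCompleteFailure(Throwable cause) {}

    // Starts the loop, or resumes it if it was idle.
    public void iterate()
    {
        synchronized (this)
        {
            if (state != State.IDLE)
                return;
            state = State.PROCESSING;
        }
        loop();
    }

    // Invoked when the operation scheduled by process() completes.
    public void succeeded()
    {
        synchronized (this)
        {
            if (state == State.PROCESSING)
            {
                // process() is still running: only record the completion.
                state = State.CALLED;
                return;
            }
            if (state != State.PENDING)
                return;
            state = State.PROCESSING;
        }
        // Completed after process() returned: resume the loop.
        loop();
    }

    private void loop()
    {
        while (true)
        {
            Action action;
            try
            {
                action = process();
            }
            catch (Throwable x)
            {
                synchronized (this)
                {
                    state = State.DONE;
                }
                onCompleteFailure(x);
                return;
            }
            synchronized (this)
            {
                if (action == Action.SCHEDULED && state == State.CALLED)
                {
                    // Completed synchronously: iterate, do not recurse.
                    state = State.PROCESSING;
                    continue;
                }
                if (action == Action.SCHEDULED)
                    state = State.PENDING;
                else
                    state = action == Action.IDLE ? State.IDLE : State.DONE;
            }
            if (action == Action.SUCCEEDED)
                onCompleteSuccess();
            return;
        }
    }
}

// Usage: every "write" completes synchronously, yet process()
// is never nested more than one level deep.
class DepthDemo extends MiniIteratingCallback
{
    int remaining, depth, maxDepth;

    @Override
    protected Action process()
    {
        if (remaining == 0)
            return Action.SUCCEEDED;
        remaining--;
        maxDepth = Math.max(maxDepth, ++depth);
        succeeded(); // Stand-in for a write completing synchronously.
        depth--;
        return Action.SCHEDULED;
    }
}
```

Setting remaining to a large value and calling iterate() on a DepthDemo processes all chunks while maxDepth stays at 1, no matter how many writes complete synchronously.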
Now that you know how IteratingCallback works, a correct implementation for the echo Connection is the following:
class EchoConnection extends AbstractConnection
{
    private final IteratingCallback callback = new EchoIteratingCallback();

    public EchoConnection(EndPoint endp, Executor executor)
    {
        super(endp, executor);
    }

    @Override
    public void onOpen()
    {
        super.onOpen();

        // Declare interest for fill events.
        fillInterested();
    }

    @Override
    public void onFillable()
    {
        // Start the iteration loop that reads and echoes back.
        callback.iterate();
    }

    class EchoIteratingCallback extends IteratingCallback
    {
        private ByteBuffer buffer;

        @Override
        protected Action process() throws Throwable
        {
            // Obtain a buffer if we don't already have one.
            if (buffer == null)
                buffer = BufferUtil.allocate(1024);

            int filled = getEndPoint().fill(buffer);
            if (filled > 0)
            {
                // We have filled some bytes, echo them back.
                getEndPoint().write(this, buffer);

                // Signal that the iteration should resume
                // when the write() operation is completed.
                return Action.SCHEDULED;
            }
            else if (filled == 0)
            {
                // We don't need the buffer anymore, so
                // don't keep it around while we are idle.
                buffer = null;

                // No more bytes to read, declare
                // again interest for fill events.
                fillInterested();

                // Signal that the iteration is now IDLE.
                return Action.IDLE;
            }
            else
            {
                // The other peer closed the connection,
                // the iteration completed successfully.
                return Action.SUCCEEDED;
            }
        }

        @Override
        protected void onCompleteSuccess()
        {
            // The iteration completed successfully.
            getEndPoint().close();
        }

        @Override
        protected void onCompleteFailure(Throwable cause)
        {
            // The iteration completed with a failure.
            getEndPoint().close(cause);
        }

        @Override
        public InvocationType getInvocationType()
        {
            return InvocationType.NON_BLOCKING;
        }
    }
}
When onFillable() is called, for example the first time that bytes are available from the network, the iteration is started.
Starting the iteration calls process(), where a buffer is allocated and filled with bytes read from the network via EndPoint.fill(ByteBuffer); the buffer is subsequently written back via EndPoint.write(Callback, ByteBuffer...). Note that the callback passed to EndPoint.write() is this, i.e. the IteratingCallback itself. Finally, Action.SCHEDULED is returned, returning from the process() method.
At this point, the call to EndPoint.write(Callback, ByteBuffer...) may have completed synchronously; IteratingCallback would know that and call process() again. Within process(), the buffer has already been allocated so it will be reused, saving further allocations; the buffer will be filled and possibly written again; Action.SCHEDULED is returned again, returning again from the process() method.
At this point, the call to EndPoint.write(Callback, ByteBuffer...) may not have completed synchronously, so IteratingCallback will not call process() again; the processing thread is free to return to the Jetty I/O system where it may be put back into the thread pool.
If this was the only active network connection, the system would now be idle, with no threads blocked waiting for the write() to complete. This thread-less wait is one of the most important features that make non-blocking asynchronous servers more scalable: they use fewer resources.
Eventually, the Jetty I/O system will notify that the write() completed; this notifies the IteratingCallback that can now resume the loop and call process() again.
When process() is called, it is possible that zero bytes are read from the network. In this case, you want to deallocate the buffer since the other peer may never send more bytes for the Connection to read, or it may send them after a long pause; in both cases you do not want to retain the memory allocated by the buffer. Next, you want to call fillInterested() to declare again interest for read events, and return Action.IDLE since there is nothing to write back and therefore the loop may be suspended.
When more bytes are again available to be read from the network, onFillable() will be called again and that will start the iteration again.
Another possibility is that during process() the read returns -1 indicating that the other peer has closed the connection; this means that there will not be more bytes to read and the loop can be exited, so you return Action.SUCCEEDED; IteratingCallback will then call onCompleteSuccess() where you can close the EndPoint.
The last case is that during process() an exception is thrown, for example by EndPoint.fill(ByteBuffer) or, in more advanced implementations, by code that parses the bytes that have been read and finds them unacceptable. Any exception thrown within process() will be caught by IteratingCallback, which will exit the loop with a failure and call onCompleteFailure(Throwable) with the exception that has been thrown; there you can close the EndPoint, passing the exception that is the reason for closing the EndPoint prematurely.
Asynchronous programming is hard. Rely on the Jetty classes to implement |