Jetty I/O Architecture
The Jetty libraries (both client and server) use Java NIO to handle I/O, so that at its core Jetty I/O is completely non-blocking.
Jetty I/O: SelectorManager
The main class of the Jetty I/O library is SelectorManager.
SelectorManager internally manages a configurable number of ManagedSelectors.
Each ManagedSelector wraps an instance of java.nio.channels.Selector that in turn manages a number of java.nio.channels.SocketChannel instances.
SocketChannel instances are typically created by the Jetty implementation, on client-side when connecting to a server and on server-side when accepting connections from clients.
In both cases the SocketChannel instance is passed to SelectorManager (which passes it to ManagedSelector and eventually to java.nio.channels.Selector) to be registered for use within Jetty.
It is possible for an application to create the SocketChannel instances outside Jetty, even perform some initial network traffic outside Jetty (for example for authentication purposes), and then pass the SocketChannel instance to SelectorManager for use within Jetty.
This example shows how a client can connect to a server:
public void connect(SelectorManager selectorManager, Map<String, Object> context) throws IOException
{
    String host = "host";
    int port = 8080;
    // Create an unconnected SocketChannel.
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    // Connect and register to Jetty.
    if (socketChannel.connect(new InetSocketAddress(host, port)))
        selectorManager.accept(socketChannel, context);
    else
        selectorManager.connect(socketChannel, context);
}
This example shows how a server accepts a client connection:
public void accept(ServerSocketChannel acceptor, SelectorManager selectorManager) throws IOException
{
    // Wait until a client connects.
    SocketChannel socketChannel = acceptor.accept();
    socketChannel.configureBlocking(false);
    // Accept and register to Jetty.
    Object attachment = null;
    selectorManager.accept(socketChannel, attachment);
}
Jetty I/O: EndPoint and Connection
SocketChannels that are passed to SelectorManager are wrapped into two related components: an EndPoint and a Connection.
EndPoint is the Jetty abstraction for a SocketChannel or a DatagramChannel: you can read bytes from an EndPoint, you can write bytes to an EndPoint, you can close an EndPoint, etc.
Connection is the Jetty abstraction that is responsible for reading bytes from the EndPoint and for deserializing the read bytes into objects.
For example, an HTTP/1.1 server-side Connection implementation is responsible for deserializing HTTP/1.1 request bytes into an HTTP request object.
Conversely, an HTTP/1.1 client-side Connection implementation is responsible for deserializing HTTP/1.1 response bytes into an HTTP response object.
Connection is the abstraction that implements the reading side of a specific protocol such as HTTP/1.1, HTTP/2, HTTP/3, or WebSocket: it is able to read and parse incoming bytes in that protocol.
The writing side for a specific protocol may be implemented in the Connection but may also be implemented in other components, although eventually the bytes to write will be written through the EndPoint.
While there are primarily only two implementations of EndPoint, SocketChannelEndPoint for TCP and DatagramChannelEndPoint for UDP (used both on the client-side and on the server-side), there are many implementations of Connection, typically two for each protocol (one for the client-side and one for the server-side).
The EndPoint and Connection pairs can be chained, for example in case of encrypted communication using the TLS protocol.
There is an EndPoint and Connection TLS pair where the EndPoint reads the encrypted bytes from the socket and the Connection decrypts them; next in the chain there is an EndPoint and Connection pair where the EndPoint "reads" decrypted bytes (provided by the previous Connection) and the Connection deserializes them into specific protocol objects (for example HTTP/2 frame objects).
Certain protocols, such as WebSocket, start the communication with the server using one protocol (for example, HTTP/1.1), but then change the communication to use another protocol (for example, WebSocket).
EndPoint supports changing the Connection object on-the-fly via EndPoint.upgrade(Connection).
This allows you to use the HTTP/1.1 Connection during the initial communication and later to replace it with a WebSocket Connection.
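The idea of swapping the protocol handler while keeping the same underlying endpoint can be sketched with plain Java. The types below (SketchEndPoint, SketchConnection, NamedConnection, UpgradeDemo) are hypothetical stand-ins and not the Jetty interfaces; the order in which the old handler is closed and the new one is opened is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a protocol handler; NOT the Jetty Connection interface.
interface SketchConnection
{
    void onOpen(SketchEndPoint endPoint);
    void onClose(SketchEndPoint endPoint, Throwable cause);
    String protocol();
}

// Hypothetical stand-in for an endpoint that holds one connection at a time.
class SketchEndPoint
{
    final List<String> events = new ArrayList<>();
    private SketchConnection connection;

    // Associates the initial connection with this endpoint.
    void open(SketchConnection initial)
    {
        connection = initial;
        initial.onOpen(this);
    }

    // Replaces the current connection; the endpoint itself stays the same.
    // Assumption: the old connection is closed normally (null cause) first.
    void upgrade(SketchConnection next)
    {
        connection.onClose(this, null);
        connection = next;
        next.onOpen(this);
    }

    String currentProtocol()
    {
        return connection.protocol();
    }
}

// A connection that only records its lifecycle events.
class NamedConnection implements SketchConnection
{
    private final String protocol;

    NamedConnection(String protocol)
    {
        this.protocol = protocol;
    }

    @Override
    public void onOpen(SketchEndPoint endPoint)
    {
        endPoint.events.add("open " + protocol);
    }

    @Override
    public void onClose(SketchEndPoint endPoint, Throwable cause)
    {
        endPoint.events.add("close " + protocol);
    }

    @Override
    public String protocol()
    {
        return protocol;
    }
}

class UpgradeDemo
{
    // Starts with HTTP/1.1 and then upgrades to WebSocket on the same endpoint.
    static List<String> run()
    {
        SketchEndPoint endPoint = new SketchEndPoint();
        endPoint.open(new NamedConnection("HTTP/1.1"));
        endPoint.upgrade(new NamedConnection("WebSocket"));
        return endPoint.events;
    }
}
```

In this sketch the endpoint object never changes, only the handler attached to it does, which mirrors the description above of replacing the HTTP/1.1 Connection with a WebSocket Connection.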
SelectorManager is an abstract class because while it knows how to create concrete EndPoint instances, it does not know how to create protocol specific Connection instances.
Creating Connection instances is performed on the server-side by ConnectionFactorys and on the client-side by ClientConnectionFactorys.
On the server-side, the component that aggregates a SelectorManager with a set of ConnectionFactorys is ServerConnector for TCP sockets, QuicServerConnector for QUIC sockets, and UnixDomainServerConnector for Unix-Domain sockets (see the server-side architecture section for more information).
On the client-side, the components that aggregate a SelectorManager with a set of ClientConnectionFactorys are HttpClientTransport subclasses (see the client-side architecture section for more information).
Jetty I/O: EndPoint
The Jetty I/O library uses Java NIO to handle I/O, so that I/O is non-blocking.
At the Java NIO level, in order to be notified when a SocketChannel or DatagramChannel has data to be read, the SelectionKey.OP_READ flag must be set.
In the Jetty I/O library, you can call EndPoint.fillInterested(Callback) to declare interest in the "read" (also called "fill") event, and the Callback parameter is the object that is notified when such an event occurs.
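The relationship between read interest and notification can be seen in isolation with plain Java NIO. The sketch below is not Jetty code: it uses a java.nio.channels.Pipe instead of a socket (an assumption made so that it runs without a network), registers the readable end with SelectionKey.OP_READ, and reads only after the Selector reports the key as readable. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class ReadInterestSketch
{
    // Writes the message into a pipe and reads it back only
    // after the selector signals that the pipe is readable.
    static String readWhenReady(String message)
    {
        try
        {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink())
            {
                // Selectable channels must be non-blocking to be registered.
                source.configureBlocking(false);
                // Declare interest in read events.
                source.register(selector, SelectionKey.OP_READ);

                // Make some bytes available to be read.
                sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                // Wait (at most 5 seconds) for the read event.
                selector.select(5000);

                StringBuilder result = new StringBuilder();
                for (SelectionKey key : selector.selectedKeys())
                {
                    if (key.isReadable())
                    {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        source.read(buffer);
                        buffer.flip();
                        result.append(StandardCharsets.UTF_8.decode(buffer));
                    }
                }
                return result.toString();
            }
        }
        catch (IOException x)
        {
            throw new UncheckedIOException(x);
        }
    }
}
```

The Callback passed to EndPoint.fillInterested(Callback) plays the role that the explicit selectedKeys() loop plays here: application code is told that a read will make progress, rather than polling for it.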
At the Java NIO level, a SocketChannel or DatagramChannel is always writable, unless it becomes congested.
In order to be notified when a channel uncongests and it is therefore writable again, the SelectionKey.OP_WRITE flag must be set.
In the Jetty I/O library, you can call EndPoint.write(Callback, ByteBuffer…) to write the ByteBuffers, and the Callback parameter is the object that is notified when the whole write is finished (i.e. all ByteBuffers have been fully written, even if they are delayed by congestion/uncongestion).
The EndPoint APIs abstract out the Java NIO details by providing non-blocking APIs based on Callback objects for I/O operations.
The EndPoint APIs are typically called by Connection implementations, see this section.
Jetty I/O: Connection
Connection is the abstraction that deserializes incoming bytes into objects, for example an HTTP request object or a WebSocket frame object, that can be used by more abstract layers.
Connection instances have two lifecycle methods:
- Connection.onOpen(), invoked when the Connection is associated with the EndPoint.
- Connection.onClose(Throwable), invoked when the Connection is disassociated from the EndPoint, where the Throwable parameter indicates whether the disassociation was normal (when the parameter is null) or was due to an error (when the parameter is not null).
When a Connection is first created, it is not registered for any Java NIO event.
It is therefore typical to implement onOpen() to call EndPoint.fillInterested(Callback) so that the Connection declares interest for read events, and it is invoked (via the Callback) when the read event happens.
The abstract class AbstractConnection partially implements Connection and provides simpler APIs.
The example below shows a typical implementation that extends AbstractConnection:
// Extend AbstractConnection to inherit basic implementation.
class MyConnection extends AbstractConnection
{
    public MyConnection(EndPoint endPoint, Executor executor)
    {
        super(endPoint, executor);
    }
    @Override
    public void onOpen()
    {
        super.onOpen();
        // Declare interest for fill events.
        // When the fill event happens, method onFillable() below is invoked.
        fillInterested();
    }
    @Override
    public void onFillable()
    {
        // Invoked when a fill event happens.
    }
}
Jetty I/O: Connection.Listener
The Jetty I/O library allows applications to register event listeners for the Connection events "opened" and "closed" via the interface Connection.Listener.
This is useful in many cases, for example:
- Gather statistics about connection lifecycle, such as time of creation and duration.
- Gather statistics about the number of concurrent connections, and take action if a threshold is exceeded.
- Gather statistics about the number of bytes read and written, and the number of "messages" read and written, where "messages" may mean HTTP/1.1 requests or responses, or WebSocket frames, or HTTP/2 frames, etc.
- Gather statistics about the different types of connections being opened (TLS, HTTP/1.1, HTTP/2, WebSocket, etc.).
- Etc.
Connection.Listener implementations must be added as beans to a server-side Connector, or to client-side HttpClient, WebSocketClient, HTTP2Client or HTTP3Client.
You can add many Connection.Listener objects as beans, each with its own logic, so that you can separate different concerns into different Connection.Listener implementations.
The Jetty I/O library provides useful Connection.Listener implementations that you should evaluate before writing your own, such as the ConnectionStatistics class used in the example below.
Here is a simple example of a Connection.Listener used both on the client and on the server:
class ThresholdConnectionListener implements Connection.Listener
{
    private final AtomicInteger connections = new AtomicInteger();
    private final int threshold;
    // Volatile because the listener may be invoked by different threads.
    private volatile boolean notified;
    public ThresholdConnectionListener(int threshold)
    {
        this.threshold = threshold;
    }
    @Override
    public void onOpened(Connection connection)
    {
        int count = connections.incrementAndGet();
        if (count > threshold && !notified)
        {
            notified = true;
            System.getLogger("connection.threshold").log(System.Logger.Level.WARNING, "Connection threshold exceeded");
        }
    }
    @Override
    public void onClosed(Connection connection)
    {
        int count = connections.decrementAndGet();
        // Reset the alert when we are below 90% of the threshold.
        if (count < threshold * 0.9F)
            notified = false;
    }
}
// Configure server-side connectors with Connection.Listeners.
Server server = new Server();
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
// Add statistics.
connector.addBean(new ConnectionStatistics());
// Add your own Connection.Listener.
connector.addBean(new ThresholdConnectionListener(2048));
server.start();
// Configure client-side HttpClient with Connection.Listeners.
HttpClient httpClient = new HttpClient();
// Add statistics.
httpClient.addBean(new ConnectionStatistics());
// Add your own Connection.Listener.
httpClient.addBean(new ThresholdConnectionListener(512));
httpClient.start();
Jetty I/O: TCP Network Echo
With the concepts above it is now possible to write a simple, fully non-blocking, Connection implementation that simply echoes the bytes that it reads back to the other peer.
A naive, but wrong, implementation may be the following:
class WrongEchoConnection extends AbstractConnection implements Callback
{
    public WrongEchoConnection(EndPoint endPoint, Executor executor)
    {
        super(endPoint, executor);
    }
    @Override
    public void onOpen()
    {
        super.onOpen();
        // Declare interest for fill events.
        fillInterested();
    }
    @Override
    public void onFillable()
    {
        try
        {
            ByteBuffer buffer = BufferUtil.allocate(1024);
            int filled = getEndPoint().fill(buffer);
            if (filled > 0)
            {
                // Filled some bytes, echo them back.
                getEndPoint().write(this, buffer);
            }
            else if (filled == 0)
            {
                // No more bytes to fill, declare
                // again interest for fill events.
                fillInterested();
            }
            else
            {
                // The other peer closed the
                // connection, close it back.
                getEndPoint().close();
            }
        }
        catch (Exception x)
        {
            getEndPoint().close(x);
        }
    }
    @Override
    public void succeeded()
    {
        // The write is complete, fill again.
        onFillable();
    }
    @Override
    public void failed(Throwable x)
    {
        getEndPoint().close(x);
    }
}
The implementation above is wrong and leads to StackOverflowError.
The problem with this implementation is that if the writes always complete synchronously (i.e. without being delayed by TCP congestion), you end up with this sequence of calls:
Connection.onFillable()
  EndPoint.write()
    Connection.succeeded()
      Connection.onFillable()
        EndPoint.write()
          Connection.succeeded()
          ...
which leads to StackOverflowError.
This is a typical side effect of asynchronous programming using non-blocking APIs, and happens in the Jetty I/O library as well.
The callback is invoked synchronously for efficiency reasons.
Submitting the invocation of the callback to an Executor to be invoked in a different thread would cause a context switch and make simple writes extremely inefficient.
This side effect of asynchronous programming leading to StackOverflowError is so common that the Jetty libraries have a generic solution for it: a specialized Callback implementation named org.eclipse.jetty.util.IteratingCallback that turns recursion into iteration, therefore avoiding the StackOverflowError.
IteratingCallback is a Callback implementation that should be passed to non-blocking APIs such as EndPoint.write(Callback, ByteBuffer…) when they are performed in a loop.
IteratingCallback works by starting the loop with IteratingCallback.iterate().
In turn, this calls IteratingCallback.process(), an abstract method that must be implemented with the code that should be executed for each iteration of the loop.
Method process() must return:
- Action.SCHEDULED, to indicate that the loop has performed a non-blocking, possibly asynchronous, operation.
- Action.IDLE, to indicate that the loop should temporarily be suspended to be resumed later.
- Action.SUCCEEDED, to indicate that the loop exited successfully.
Any exception thrown within process() exits the loop with a failure.
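To make the "recursion into iteration" idea concrete, here is a deliberately simplified, single-threaded sketch. It is not the Jetty IteratingCallback implementation (which is thread-safe and also handles failures); the names LoopingCallback and CountdownDemo are hypothetical. The key point is that a completion arriving while process() is still on the stack only sets a flag, and the loop in iterate() picks it up instead of recursing.

```java
// Hypothetical, single-threaded sketch; NOT the Jetty IteratingCallback.
abstract class LoopingCallback
{
    enum Action { SCHEDULED, IDLE, SUCCEEDED }

    // True while the loop in iterate() is running.
    private boolean processing;
    // True when succeeded() was called while the loop was running.
    private boolean completedSynchronously;

    protected abstract Action process();

    protected void onCompleteSuccess()
    {
    }

    public void iterate()
    {
        if (processing)
            return;
        processing = true;
        try
        {
            while (true)
            {
                completedSynchronously = false;
                Action action = process();
                if (action == Action.SUCCEEDED)
                {
                    onCompleteSuccess();
                    return;
                }
                if (action == Action.IDLE)
                    return;
                // SCHEDULED: loop again only if the operation already completed,
                // otherwise return and wait for succeeded() to resume the loop.
                if (!completedSynchronously)
                    return;
            }
        }
        finally
        {
            processing = false;
        }
    }

    // Invoked when the scheduled operation completes.
    public void succeeded()
    {
        if (processing)
            completedSynchronously = true; // Let the running loop continue.
        else
            iterate();                     // Asynchronous completion: resume.
    }
}

class CountdownDemo
{
    // Runs the given number of operations that all complete synchronously.
    static int run(int iterations)
    {
        int[] calls = {0};
        LoopingCallback callback = new LoopingCallback()
        {
            @Override
            protected Action process()
            {
                if (calls[0] == iterations)
                    return Action.SUCCEEDED;
                calls[0]++;
                // Simulate a write that completes synchronously.
                succeeded();
                return Action.SCHEDULED;
            }
        };
        callback.iterate();
        return calls[0];
    }
}
```

With this structure the stack depth stays constant regardless of how many operations complete synchronously, whereas calling process() directly from succeeded() would add stack frames for every operation.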
Now that you know how IteratingCallback works, a correct implementation for the echo Connection is the following:
class EchoConnection extends AbstractConnection
{
    private final IteratingCallback callback = new EchoIteratingCallback();
    public EchoConnection(EndPoint endp, Executor executor)
    {
        super(endp, executor);
    }
    @Override
    public void onOpen()
    {
        super.onOpen();
        // Declare interest for fill events.
        fillInterested();
    }
    @Override
    public void onFillable()
    {
        // Start the iteration loop that reads and echoes back.
        callback.iterate();
    }
    class EchoIteratingCallback extends IteratingCallback
    {
        private ByteBuffer buffer;
        @Override
        protected Action process() throws Throwable
        {
            // Obtain a buffer if we don't already have one.
            if (buffer == null)
                buffer = BufferUtil.allocate(1024);
            int filled = getEndPoint().fill(buffer);
            if (filled > 0)
            {
                // We have filled some bytes, echo them back.
                getEndPoint().write(this, buffer);
                // Signal that the iteration should resume
                // when the write() operation is completed.
                return Action.SCHEDULED;
            }
            else if (filled == 0)
            {
                // We don't need the buffer anymore, so
                // don't keep it around while we are idle.
                buffer = null;
                // No more bytes to read, declare
                // again interest for fill events.
                fillInterested();
                // Signal that the iteration is now IDLE.
                return Action.IDLE;
            }
            else
            {
                // The other peer closed the connection,
                // the iteration completed successfully.
                return Action.SUCCEEDED;
            }
        }
        @Override
        protected void onCompleteSuccess()
        {
            // The iteration completed successfully.
            getEndPoint().close();
        }
        @Override
        protected void onCompleteFailure(Throwable cause)
        {
            // The iteration completed with a failure.
            getEndPoint().close(cause);
        }
        @Override
        public InvocationType getInvocationType()
        {
            return InvocationType.NON_BLOCKING;
        }
    }
}
When onFillable() is called, for example the first time that bytes are available from the network, the iteration is started.
Starting the iteration calls process(), where a buffer is allocated and filled with bytes read from the network via EndPoint.fill(ByteBuffer); the buffer is subsequently written back via EndPoint.write(Callback, ByteBuffer…) (note that the callback passed to EndPoint.write() is this, i.e. the IteratingCallback itself); finally Action.SCHEDULED is returned, returning from the process() method.
At this point, the call to EndPoint.write(Callback, ByteBuffer…) may have completed synchronously; IteratingCallback would know that and call process() again; within process(), the buffer has already been allocated so it will be reused, saving further allocations; the buffer will be filled and possibly written again; Action.SCHEDULED is returned again, returning again from the process() method.
At this point, the call to EndPoint.write(Callback, ByteBuffer…) may not have completed synchronously, so IteratingCallback will not call process() again; the processing thread is free to return to the Jetty I/O system where it may be put back into the thread pool.
If this was the only active network connection, the system would now be idle, with no threads blocked waiting for the write() to complete. This thread-less wait is one of the most important features that make non-blocking asynchronous servers more scalable: they use fewer resources.
Eventually, the Jetty I/O system will notify that the write() completed; this notifies the IteratingCallback that can now resume the loop and call process() again.
When process() is called, it is possible that zero bytes are read from the network; in this case, you want to deallocate the buffer since the other peer may never send more bytes for the Connection to read, or it may send them after a long pause; in both cases you do not want to retain the memory allocated by the buffer. Next, you want to call fillInterested() to declare again interest for read events, and return Action.IDLE since there is nothing to write back and therefore the loop may be suspended.
When more bytes are again available to be read from the network, onFillable() will be called again and that will start the iteration again.
Another possibility is that during process() the read returns -1, indicating that the other peer has closed the connection; this means that there will not be more bytes to read and the loop can be exited, so you return Action.SUCCEEDED; IteratingCallback will then call onCompleteSuccess() where you can close the EndPoint.
The last case is that during process() an exception is thrown, for example by EndPoint.fill(ByteBuffer) or, in more advanced implementations, by code that parses the bytes that have been read and finds them unacceptable; any exception thrown within process() will be caught by IteratingCallback that will exit the loop with a failure and call onCompleteFailure(Throwable) with the exception that has been thrown, where you can close the EndPoint, passing the exception that is the reason for prematurely closing the EndPoint.
Asynchronous programming is hard. Rely on the Jetty classes, such as AbstractConnection and IteratingCallback, to implement Connection.
Content.Source
The high-level abstraction that Jetty offers to read bytes is org.eclipse.jetty.io.Content.Source.
Content.Source offers a non-blocking demand/read model where a read returns a Content.Chunk (see also this section).
A Content.Chunk groups the following information:
- A ByteBuffer with the bytes that have been read; it may be empty.
- Whether the read reached end-of-file, via its last flag.
- A failure that might have happened during the read, via its getFailure() method.
The Content.Chunk returned from Content.Source.read() can either be a normal chunk (a chunk containing a ByteBuffer and a null failure), or a failure chunk (a chunk containing an empty ByteBuffer and a non-null failure).
A failure chunk also indicates (via the last flag) whether the failure is fatal (when last=true) or transient (when last=false).
A transient failure is a temporary failure that happened during the read; it may be ignored, and it is recoverable: it is possible to call read() again and obtain a normal chunk (or a null chunk).
Typical cases of transient failures are idle timeout failures, where the read timed out, but the application may decide to keep reading until some other event happens.
The application may convert a transient failure into a fatal failure by calling Content.Source.fail(Throwable).
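The possible outcomes of a read described above can be summarized as a small decision function. The SketchChunk class below is a hypothetical, simplified stand-in that only carries the three pieces of information listed earlier; it is not the Jetty Content.Chunk API, and the returned labels are purely illustrative.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for a chunk; NOT the Jetty Content.Chunk API.
final class SketchChunk
{
    final ByteBuffer buffer;
    final boolean last;
    final Throwable failure;

    SketchChunk(ByteBuffer buffer, boolean last, Throwable failure)
    {
        this.buffer = buffer;
        this.last = last;
        this.failure = failure;
    }

    // Maps the result of a read to what the reader should do next.
    static String classify(SketchChunk chunk)
    {
        // A null chunk means that nothing is available yet.
        if (chunk == null)
            return "demand";
        // A failure chunk is fatal when last=true, transient when last=false.
        if (chunk.failure != null)
            return chunk.last ? "fatal failure" : "transient failure";
        // A normal chunk: consume it, and stop reading if it is the last.
        return chunk.last ? "consume and stop" : "consume and read again";
    }
}
```

A reader would typically branch on these cases in the order shown: first the null check, then the failure check, and only then the consumption of the bytes.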
A Content.Source must be fully consumed by reading all its content, or failed by calling Content.Source.fail(Throwable) to signal that the reader is not interested in reading anymore, otherwise it may leak underlying resources.
Fully consuming a Content.Source means reading from it until it returns a Content.Chunk whose last flag is true.
Reading or demanding from an already fully consumed Content.Source is always immediately serviced with the last state of the Content.Source: a Content.Chunk with the last flag set to true, either an end-of-file chunk, or a failure chunk.
Once failed, a Content.Source is considered fully consumed.
Further attempts to read from a failed Content.Source return a failure chunk whose getFailure() method returns the exception passed to Content.Source.fail(Throwable).
When reading a normal chunk, its ByteBuffer is typically a slice of a different ByteBuffer that has been read by a lower layer.
There may be multiple layers between the bottom layer (where the initial read typically happens) and the application layer that calls Content.Source.read().
By slicing the ByteBuffer (rather than copying its bytes), there is no copy of the bytes between the layers, which yields greater performance.
However, this comes with the cost that the ByteBuffer, and the associated Content.Chunk, have an intrinsic lifecycle: the final consumer of a Content.Chunk at the application layer must indicate when it has consumed the chunk, so that the bottom layer may reuse/recycle the ByteBuffer.
Consuming the chunk means that the bytes in the ByteBuffer are read (or ignored), and that the application will not look at or reference that ByteBuffer ever again.
Content.Chunk offers a retain/release model to deal with the ByteBuffer lifecycle, with a simple rule:
A Content.Chunk returned by a call to Content.Source.read() must be released, except for Content.Chunks that are failure chunks.
Failure chunks may be released, but they do not need to be.
The example below is the idiomatic way of reading from a Content.Source:
public void read(Content.Source source)
{
    // Read from the source in a loop.
    while (true)
    {
        // Read a chunk, must be eventually released.
        Content.Chunk chunk = source.read(); // (1)
        // If no chunk, demand to be called back when there are more chunks.
        if (chunk == null)
        {
            source.demand(() -> read(source));
            return;
        }
        // If there is a failure reading, handle it.
        if (Content.Chunk.isFailure(chunk))
        {
            boolean fatal = chunk.isLast();
            if (fatal)
            {
                // A fatal failure, such as a network failure.
                handleFatalFailure(chunk.getFailure());
                // No recovery is possible, stop reading
                // by returning without demanding.
                return;
            }
            else
            {
                // A transient failure such as a read timeout.
                handleTransientFailure(chunk.getFailure());
                // Recovery is possible, try to read again.
                continue;
            }
        }
        // A normal chunk of content, consume it.
        consume(chunk);
        // Release the chunk.
        chunk.release(); // (2)
        // Stop reading if EOF was reached.
        if (chunk.isLast())
            return;
        // Loop around to read another chunk.
    }
}
(1) The read() that must be paired with a release().
(2) The release() that pairs the read().
Note how the reads happen in a loop, consuming the Content.Source as soon as it has content available to be read, and therefore no backpressure is applied to the reads.
Calling Content.Chunk.release() must be done only after the bytes in the ByteBuffer returned by Content.Chunk.getByteBuffer() have been consumed.
When the Content.Chunk is released, the implementation may reuse the ByteBuffer and overwrite the bytes with different bytes; if the application looks at the ByteBuffer after having released the Content.Chunk, it may see other, unrelated, bytes.
An alternative way to read from a Content.Source, to use when the chunk is consumed asynchronously and you don't want to read again until the Content.Chunk is consumed, is the following:
public void read(Content.Source source)
{
    // Read a chunk, must be eventually released.
    Content.Chunk chunk = source.read(); // (1)
    // If no chunk, demand to be called back when there are more chunks.
    if (chunk == null)
    {
        source.demand(() -> read(source));
        return;
    }
    // If there is a failure reading, always treat it as fatal.
    if (Content.Chunk.isFailure(chunk))
    {
        // If the failure is transient, fail the source
        // to indicate that there will be no more reads.
        if (!chunk.isLast())
            source.fail(chunk.getFailure());
        // Handle the failure and stop reading by not demanding.
        handleFatalFailure(chunk.getFailure());
        return;
    }
    // Consume the chunk asynchronously, and do not
    // read more chunks until this has been consumed.
    CompletableFuture<Void> consumed = consumeAsync(chunk);
    // Release the chunk.
    chunk.release(); // (2)
    // Only when the chunk has been consumed try to read more.
    consumed.whenComplete((result, failure) ->
    {
        if (failure == null)
        {
            // Continue reading if EOF was not reached.
            if (!chunk.isLast())
                source.demand(() -> read(source));
        }
        else
        {
            // If there is a failure reading, handle it,
            // and stop reading by not demanding.
            handleFatalFailure(failure);
        }
    });
}
(1) The read() that must be paired with a release().
(2) The release() that pairs the read().
Note how the reads do not happen in a loop, and therefore backpressure is applied to the reads, because there is not a next read until the chunk from the previous read has been consumed (and this may take time).
Since the Chunk is consumed asynchronously, you may need to retain it to extend its lifecycle, as explained in this section.
You can use Content.Source static methods to conveniently read (in a blocking way or non-blocking way), for example via static Content.Source.asStringAsync(Content.Source, Charset), or via an InputStream using static Content.Source.asInputStream(Content.Source).
Refer to the Content.Source javadocs for further details.
Content.Chunk
Content.Chunk offers a retain/release API to control the lifecycle of its ByteBuffer.
When Content.Chunks are consumed synchronously, no additional retain/release API call is necessary, for example:
public void consume(Content.Chunk chunk) throws IOException
{
    // Consume the chunk synchronously within this method.
    // For example, parse the bytes into other objects,
    // or copy the bytes elsewhere (e.g. the file system).
    fileChannel.write(chunk.getByteBuffer());
    if (chunk.isLast())
        fileChannel.close();
}
On the other hand, if the Content.Chunk is not consumed immediately, then it must be retained, and you must arrange for the Content.Chunk to be released at a later time, thus pairing the retain.
For example, you may accumulate the Content.Chunks in a List to convert them to a String when all the Content.Chunks have been read.
Since reading from a Content.Source is asynchronous, the String result is produced via a CompletableFuture:
// CompletableTask is-a CompletableFuture.
public class ChunksToString extends CompletableTask<String>
{
    private final List<Content.Chunk> chunks = new ArrayList<>();
    private final Content.Source source;
    public ChunksToString(Content.Source source)
    {
        this.source = source;
    }
    @Override
    public void run()
    {
        while (true)
        {
            // Read a chunk, must be eventually released.
            Content.Chunk chunk = source.read(); // (1)
            if (chunk == null)
            {
                source.demand(this);
                return;
            }
            if (Content.Chunk.isFailure(chunk))
            {
                handleFatalFailure(chunk.getFailure());
                return;
            }
            // A normal chunk of content, consume it.
            consume(chunk);
            // Release the chunk.
            // This pairs the call to read() above.
            chunk.release(); // (2)
            if (chunk.isLast())
            {
                // Produce the result.
                String result = getResult();
                // Complete this CompletableFuture with the result.
                complete(result);
                // The reading is complete.
                return;
            }
        }
    }
    public void consume(Content.Chunk chunk)
    {
        // The chunk is not consumed within this method, but
        // stored away for later use, so it must be retained.
        chunk.retain(); // (3)
        chunks.add(chunk);
    }
    public String getResult()
    {
        Utf8StringBuilder builder = new Utf8StringBuilder();
        // Iterate over the chunks, copying and releasing.
        for (Content.Chunk chunk : chunks)
        {
            // Copy the chunk bytes into the builder.
            builder.append(chunk.getByteBuffer());
            // The chunk has been consumed, release it.
            // This pairs the retain() in consume().
            chunk.release(); // (4)
        }
        return builder.toCompleteString();
    }
}
(1) The read() that must be paired with a release().
(2) The release() that pairs the read().
(3) The retain() that must be paired with a release().
(4) The release() that pairs the retain().
Note how method consume(Content.Chunk) retains the Content.Chunk because it does not consume it, but rather stores it away for later use.
With this additional retain, the retain count is now 2: one implicitly from the read() that returned the Content.Chunk, and one explicit in consume(Content.Chunk).
However, just after returning from consume(Content.Chunk) the Content.Chunk is released (pairing the implicit retain from read()), so that the retain count goes to 1, and an additional release is still necessary.
Method getResult() arranges to release all the Content.Chunks that have been accumulated, pairing the retains done in consume(Content.Chunk), so that the retain count for the Content.Chunks goes finally to 0.
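The retain count arithmetic described above (2, then 1, then 0) can be reproduced with a minimal reference counter. The CountedBuffer class below is a hypothetical sketch and not Jetty's implementation; it only assumes that the count starts at 1 for the implicit retain held by whoever obtained the buffer, and that reaching 0 is the point where a pool could recycle it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted resource; a sketch, NOT Jetty's implementation.
class CountedBuffer
{
    // Starts at 1: the implicit retain held by whoever obtained the buffer.
    private final AtomicInteger references = new AtomicInteger(1);

    // Adds one reference; fails if the buffer was already fully released.
    void retain()
    {
        references.updateAndGet(count ->
        {
            if (count == 0)
                throw new IllegalStateException("already released");
            return count + 1;
        });
    }

    // Removes one reference; returns true when the count reaches 0,
    // which is when the buffer could be recycled.
    boolean release()
    {
        int remaining = references.updateAndGet(count ->
        {
            if (count == 0)
                throw new IllegalStateException("released too many times");
            return count - 1;
        });
        return remaining == 0;
    }

    int count()
    {
        return references.get();
    }
}
```

Following the sequence in the text: after the read the count is 1, the retain in consume() brings it to 2, the release that pairs the read brings it back to 1, and the release in getResult() brings it to 0.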
Content.Sink
The high-level abstraction that Jetty offers to write bytes is org.eclipse.jetty.io.Content.Sink.
The primary method to use is Content.Sink.write(boolean, ByteBuffer, Callback), which performs a non-blocking write of the given ByteBuffer, with the indication of whether the write is the last.
The Callback parameter is completed, successfully or with a failure, and possibly asynchronously by a different thread, when the write is complete.
Your application can typically perform zero or more non-last writes, and one final last write.
However, because the writes may be asynchronous, you cannot start a next write before the previous write is completed.
This code is wrong:
public void wrongWrite(Content.Sink sink, ByteBuffer content1, ByteBuffer content2)
{
    // Initiate a first write.
    sink.write(false, content1, Callback.NOOP);
    // WRONG! Cannot initiate a second write before the first is complete.
    sink.write(true, content2, Callback.NOOP);
}
You must initiate a second write only when the first is finished, for example:
public void manyWrites(Content.Sink sink, ByteBuffer content1, ByteBuffer content2)
{
    // Initiate a first write.
    Callback.Completable resultOfWrites = Callback.Completable.with(callback1 -> sink.write(false, content1, callback1))
        // Chain a second write only when the first is complete.
        .compose(callback2 -> sink.write(true, content2, callback2));
    // Use the resulting Callback.Completable as you would use a CompletableFuture.
    // For example:
    resultOfWrites.whenComplete((ignored, failure) ->
    {
        if (failure == null)
            System.getLogger("sink").log(INFO, "writes completed successfully");
        else
            System.getLogger("sink").log(INFO, "writes failed", failure);
    });
}
When you need to perform an unknown number of writes, you must use an IteratingCallback, explained in this section, to avoid StackOverflowErrors.
For example, to copy from a Content.Source to a Content.Sink you should use the convenience method Content.copy(Content.Source, Content.Sink, Callback).
For illustrative purposes, below you can find the implementation of copy(Content.Source, Content.Sink, Callback) that uses an IteratingCallback:
@SuppressWarnings("InnerClassMayBeStatic")
class Copy extends IteratingCallback
{
    private final Content.Source source;
    private final Content.Sink sink;
    private final Callback callback;
    private Content.Chunk chunk;
    public Copy(Content.Source source, Content.Sink sink, Callback callback)
    {
        this.source = source;
        this.sink = sink;
        // The callback to notify when the copy is completed.
        this.callback = callback;
    }
    @Override
    protected Action process() throws Throwable
    {
        // If the last write completed, succeed this IteratingCallback,
        // causing onCompleteSuccess() to be invoked.
        if (chunk != null && chunk.isLast())
            return Action.SUCCEEDED;
        // Read a chunk.
        chunk = source.read();
        // No chunk, demand to be called back when there will be more chunks.
        if (chunk == null)
        {
            source.demand(this::iterate);
            return Action.IDLE;
        }
        // The read failed, re-throw the failure
        // causing onCompleteFailure() to be invoked.
        if (Content.Chunk.isFailure(chunk))
            throw chunk.getFailure();
        // Copy the chunk.
        sink.write(chunk.isLast(), chunk.getByteBuffer(), this);
        return Action.SCHEDULED;
    }
    @Override
    protected void onSuccess()
    {
        // After every successful write, release the chunk.
        chunk.release();
    }
    @Override
    protected void onCompleteFailure(Throwable failure)
    {
        // In case of a failure, either on the
        // read or on the write, release the chunk.
        // The chunk may be null if the failure
        // happened before any chunk was read.
        if (chunk != null)
            chunk.release();
        // The copy is failed, fail the callback.
        callback.failed(failure);
    }
    @Override
    protected void onCompleteSuccess()
    {
        // The copy is succeeded, succeed the callback.
        callback.succeeded();
    }
    @Override
    public InvocationType getInvocationType()
    {
        return InvocationType.NON_BLOCKING;
    }
}
Non-blocking writes can be easily turned into blocking writes. This perhaps leads to code that is simpler to read, but it comes with a price: greater resource usage that may lead to less scalability and less performance.
public void blockingWrite(Content.Sink sink, ByteBuffer content1, ByteBuffer content2) throws IOException
{
    // First blocking write, returns only when the write is complete.
    Content.Sink.write(sink, false, content1);
    // Second blocking write, returns only when the write is complete.
    // It is legal to perform the writes sequentially, since they are blocking.
    Content.Sink.write(sink, true, content2);
}
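To see where the extra resource usage comes from, the following sketch shows one generic way a blocking write can be layered on top of a completion-based one: the calling thread simply waits until the completion arrives. This is an illustration with hypothetical types (AsyncSink, BlockingWrites) and a CompletableFuture as the completion signal; it is not how Content.Sink.write(Content.Sink, boolean, ByteBuffer) is implemented in Jetty.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical completion-based sink; NOT the Jetty Content.Sink API.
interface AsyncSink
{
    // Starts a write and completes the given future when the write is finished.
    void write(ByteBuffer buffer, CompletableFuture<Void> completion);
}

class BlockingWrites
{
    // Turns a non-blocking write into a blocking one.
    static void blockingWrite(AsyncSink sink, ByteBuffer buffer)
    {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        sink.write(buffer, completion);
        // The calling thread waits here, doing nothing else, until the
        // write completes; join() rethrows a failure, if there was one.
        completion.join();
    }

    // Performs two sequential blocking writes against a sink that
    // completes its writes asynchronously on another thread.
    static int demo()
    {
        AtomicInteger written = new AtomicInteger();
        AsyncSink sink = (buffer, completion) -> CompletableFuture.runAsync(() ->
        {
            written.addAndGet(buffer.remaining());
            completion.complete(null);
        });
        blockingWrite(sink, ByteBuffer.wrap(new byte[]{1, 2, 3}));
        // Safe to start the second write: the first has completed.
        blockingWrite(sink, ByteBuffer.wrap(new byte[]{4, 5}));
        return written.get();
    }
}
```

While a thread is parked in join() it cannot serve other connections, which is the cost in scalability mentioned above.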