Commit 648f17c0 authored by Alexis Hassler's avatar Alexis Hassler
Browse files

Merge branch 'back-pressure' into 'master'

Back pressure

See merge request !2
parents 2f1ceae8 8be7ad51
JAVA_OPTS=-Xmx64m
count_command=$(JAVA_HOME)/bin/java -classpath "target/classes/:target/lib/*" $(JAVA_OPTS) fr.sewatech.vertx.count.BackPressureCountApplication
#download_memory=512m #(minimum for read and send)
download_memory=8m
download_rate=10m
download_parallel=10
count.bounded:
@$(count_command) bounded
count.unbounded:
@$(count_command) unbounded
download.server:
@$(JAVA_HOME)/bin/java -Xmx$(download_memory) -classpath "target/classes/:target/lib/*" fr.sewatech.vertx.download.BackPressureDownloadApplication
download.client.single-read:
@curl http://localhost:8000/0?mode=read --limit-rate $(download_rate) --output /tmp/response-0.bin
download.client.single-nobp:
@curl http://localhost:8000/0?mode=no-bp --limit-rate $(download_rate) --output /tmp/response-0.bin
download.client.single-bp:
@curl http://localhost:8000/0?mode=bp --limit-rate $(download_rate) --output /tmp/response-0.bin
download.client.single-pipe:
@curl http://localhost:8000/0?mode=pipe --limit-rate $(download_rate) --output /tmp/response-0.bin
download.client.single-send:
@curl http://localhost:8000/0?mode=send --limit-rate $(download_rate) --output /tmp/response-0.bin
download.client.parallel-bp:
@seq $(download_parallel) \
| xargs -I{} expr {} % 10 \
| xargs -P $(download_parallel) -I{} \
curl http://localhost:8000/{}?mode=bp --silent --limit-rate $(download_rate) --output /tmp/response-{}.bin
download.client.parallel-send:
@seq $(download_parallel) \
| xargs -I{} expr {} % 10 \
| xargs -P $(download_parallel) -I{} \
curl http://localhost:8000/{}?mode=send --silent --limit-rate $(download_rate) --output /tmp/response-{}.bin
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>fr.sewatech.vertx</groupId>
<artifactId>backpressure-example</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<vertx.version>3.9.1</vertx.version>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<exec.mainClass>fr.sewatech.vertx.count.BackPressureCountApplication</exec.mainClass>
</properties>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<configuration>
<mainClass>${exec.mainClass}</mainClass>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
= Exemple de back-pressure avec Vert.x
== Prérequis
Pour le serveur, il faut Java 11 (avec $JAVA_HOME) et Maven.
Pour le client, les scrpits dans make utilisent curl, seq et parallel.
== Run
La façon la plus simple d'exécuter l'exemple est d'utiliser `make`.
== Run count
Il y a 2 options:
* *unbounded*
C'est la version *sans back-pressure* puisque la file d'attente du WriteStream n'a pas de limite.
----
make count.unbounded
----
Comme le DataStream produit les données plus vite que la consommation, ça doit mener à une erreur OOME.
* *bounded*
C'est la version *avec back-pressure*.
----
make count.bounded
----
La back-pressure permet d'éviter l'erreur OOME.
== Run download
* *server*
Le serveur est fait en Vert.x.
----
make download.server
----
* *client simple*
Le client envoie une requête HTTP de téléchargement avec curl.
----
make download.client.1
----
* *clients en parallèle*
Pour solliciter le serveur, ce client envoie 10 requêtes de téléchargement en parallèle.
----
make download.client.10
----
package fr.sewatech.vertx.common;
import java.util.Base64;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
public class Data {
private static final AtomicLong counter = new AtomicLong(0);
public final String content;
public final Long index;
public Data() {
this(49_876);
}
public Data(int contentSize) {
this.index = counter.getAndIncrement();
byte[] bytes = new byte[contentSize];
ThreadLocalRandom.current().nextBytes(bytes);
content = Base64.getUrlEncoder().encodeToString(bytes).replace('\n', '_');
}
@Override
public String toString() {
return "data: " + index;
}
public String toJson() {
return String.format("{ \"index\": %s, \"content\": \"%s\"}", index, content);
}
}
package fr.sewatech.vertx.common;
import java.lang.management.ManagementFactory;
public class Helper {
public static void println(Object content) {
System.out.println(message(content));
}
public static void sleep(long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static String message(Object content) {
long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
String threadName = Thread.currentThread().getName();
long usedMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024 / 1024;
return String.format("%s - [%s] %s (%s MB)", uptime, threadName, content, usedMemory);
}
}
package fr.sewatech.vertx.count;
import fr.sewatech.vertx.common.Data;
import io.vertx.core.Vertx;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import java.util.stream.Stream;
public class BackPressureCountApplication {
private static final BackPressureCountApplication singleton = new BackPressureCountApplication();
private static final Vertx vertx = Vertx.vertx();
public static void main(String[] args) {
vertx.exceptionHandler(throwable -> {
throwable.printStackTrace();
vertx.close();
});
RunMode.from(args).main.run();
}
private void mainBounded() {
ReadStream<Data> readStream = new DataStream(vertx);
WriteStream<Data> writeStream = new PrintStream(vertx).setWriteQueueMaxSize(100);
readStream
.pipe()
.to(writeStream);
}
private void mainUnbounded() {
ReadStream<Data> readStream = new DataStream(vertx);
WriteStream<Data> writeStream = new PrintStream(vertx).setWriteQueueMaxSize(Integer.MAX_VALUE);
readStream
.pipe()
.to(writeStream);
}
private enum RunMode {
BOUNDED(BackPressureCountApplication.singleton::mainBounded),
UNBOUNDED(BackPressureCountApplication.singleton::mainUnbounded);
private final Runnable main;
RunMode(Runnable main) {
this.main = main;
}
static RunMode from(String[] args) {
return Stream.of(args)
.findFirst()
.map(text -> text.replace('-', '_'))
.map(String::toUpperCase)
.map(RunMode::valueOf)
.orElse(BOUNDED);
}
}
}
package fr.sewatech.vertx.count;
import fr.sewatech.vertx.common.Data;
import io.vertx.core.Handler;
import io.vertx.core.TimeoutStream;
import io.vertx.core.Vertx;
import io.vertx.core.streams.ReadStream;
import static fr.sewatech.vertx.common.Helper.println;
public class DataStream implements ReadStream<Data> {
private static final int MAX_COUNT = 1_000;
private final TimeoutStream periodicStream;
private Handler<Void> endHandler;
public DataStream(Vertx vertx) {
periodicStream = vertx.periodicStream(10L);
}
@Override
public synchronized DataStream fetch(long amount) {
periodicStream.fetch(amount);
return this;
}
@Override
public DataStream exceptionHandler(Handler<Throwable> handler) {
periodicStream.exceptionHandler(handler);
return this;
}
@Override
public synchronized DataStream handler(Handler<Data> handler) {
periodicStream.handler(event -> produce(handler));
return this;
}
private void produce(Handler<Data> handler) {
Data data = new Data();
if (data.index > MAX_COUNT) {
periodicStream.cancel();
endHandler.handle(null);
} else {
println("New data: " + data.index);
handler.handle(data);
}
}
@Override
public synchronized DataStream pause() {
periodicStream.pause();
return this;
}
@Override
public synchronized DataStream resume() {
periodicStream.resume();
return this;
}
@Override
public synchronized DataStream endHandler(Handler<Void> endHandler) {
this.endHandler = endHandler;
return this;
}
}
package fr.sewatech.vertx.count;
import fr.sewatech.vertx.common.Data;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.TimeoutStream;
import io.vertx.core.Vertx;
import io.vertx.core.streams.WriteStream;
import java.util.LinkedList;
import java.util.Queue;
import static fr.sewatech.vertx.common.Helper.println;
public class PrintStream implements WriteStream<Data> {
private final Vertx vertx;
private final Queue<Data> queue = new LinkedList<>();
private int capacity = Integer.MAX_VALUE;
private final TimeoutStream periodicStream;
private boolean finished = false;
private Handler<Void> drainHandler;
public PrintStream(Vertx vertx) {
this.vertx = vertx;
periodicStream = vertx.periodicStream(50L)
.handler(event -> consume());
}
private void consume() {
if (queue.isEmpty()) {
if (finished) {
vertx.close();
}
} else {
if (queue.size() < capacity / 2 && !finished) {
drain();
}
println(queue.poll());
}
}
private void drain() {
if (drainHandler != null) {
drainHandler.handle(null);
}
}
@Override
public WriteStream<Data> exceptionHandler(Handler<Throwable> handler) {
return this;
}
@Override
public WriteStream<Data> write(Data data) {
queue.add(data);
return this;
}
@Override
public WriteStream<Data> write(Data data, Handler<AsyncResult<Void>> handler) {
queue.add(data);
return this;
}
@Override
public void end() {
this.finished = true;
println("Completed");
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
end();
}
@Override
public WriteStream<Data> setWriteQueueMaxSize(int maxSize) {
this.capacity = maxSize;
return this;
}
@Override
public boolean writeQueueFull() {
return queue.size() >= capacity;
}
@Override
public WriteStream<Data> drainHandler(Handler<Void> handler) {
drainHandler = handler;
return this;
}
}
package fr.sewatech.vertx.download;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import java.nio.file.Path;
import static fr.sewatech.vertx.common.Helper.message;
import static fr.sewatech.vertx.download.BackPressureDownloadApplication.vertx;
public interface BackPressureDownloadActions {
Handler<Throwable> EXCEPTION_HANDLER = Throwable::printStackTrace;
static void downloadRead(Path path, HttpServerRequest request) {
HttpServerResponse response = request.response();
vertx
.fileSystem()
.readFile(
path.toString(),
ar -> {
if (ar.succeeded()) {
response
.exceptionHandler(EXCEPTION_HANDLER)
.endHandler(nothing -> System.out.println(message("Done: " + path.getFileName())))
.putHeader("Content-Disposition", "attachment; filename=" + path.getFileName())
.end(ar.result());
} else {
System.err.println(message("Error: " + ar.cause().toString()));
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
}
});
}
static void downloadChunked(Path path, HttpServerRequest request, boolean backPressure) {
vertx
.fileSystem()
.open(
path.toString(),
new OpenOptions().setRead(true),
ar -> {
if (ar.succeeded()) {
downloadAsyncFileChunked(ar.result(), request, backPressure);
} else {
System.err.println(message("Error: " + ar.cause().toString()));
request.response().setStatusCode(500).end(ar.cause().toString());
}
});
}
static void downloadAsyncFileChunked(AsyncFile file, HttpServerRequest request, boolean backPressure) {
HttpServerResponse response = request.response()
.exceptionHandler(EXCEPTION_HANDLER)
.setStatusCode(200)
.setChunked(true);
file
.handler(buffer -> {
response.write(buffer);
if (backPressure && response.writeQueueFull()) {
file.pause();
response.drainHandler(nothing -> file.resume());
}
})
.endHandler(nothing -> {
response.end();
System.out.println(message("Done: " + request.path()));
})
.exceptionHandler(EXCEPTION_HANDLER);
}
static void downloadPipe(Path path, HttpServerRequest request) {
vertx
.fileSystem()
.open(
path.toString(),
new OpenOptions().setRead(true),
ar -> {
if (ar.succeeded()) {
HttpServerResponse response = request.response()
.exceptionHandler(EXCEPTION_HANDLER)
.setStatusCode(200)
.setChunked(true);
ar.result().pipeTo(
response,
arv -> {
if (arv.succeeded()) {
System.out.println(message("Done: " + request.path()));
} else {
arv.cause().printStackTrace();
}
});
} else {
System.err.println(message("Error: " + ar.cause().toString()));
request.response().setStatusCode(500).end(ar.cause().toString());
}
});
}
static void downloadSendFile(Path path, HttpServerRequest request) {
request.response()
.exceptionHandler(EXCEPTION_HANDLER)
.sendFile(
path.toString(),
ar -> {
if (ar.succeeded()) {
System.out.println(message("Done: " + request.path()));
} else {
EXCEPTION_HANDLER.handle(ar.cause());
}
}
);
}
static void notFound(HttpServerRequest request) {
System.out.println(HttpResponseStatus.NOT_FOUND);
request.response().setStatusCode(HttpResponseStatus.NOT_FOUND.code()).end();
}
}
package fr.sewatech.vertx.download;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.BiConsumer;
import static fr.sewatech.vertx.common.Helper.message;
import static fr.sewatech.vertx.download.BackPressureDownloadActions.EXCEPTION_HANDLER;
import static fr.sewatech.vertx.download.BackPressureDownloadActions.notFound;