GitLab Commit is coming up on August 3-4. Learn how to innovate together using GitLab, the DevOps platform. Register for free: gitlabcommitvirtual2021.com

Commit cfa23740 authored by Alexis Hassler's avatar Alexis Hassler
Browse files

Back-pressure example

parents
export MAVEN_OPTS=-Xmx64m
observable-sync:
mvn compile exec:java -Dexec.args=observable-sync
observable:
mvn compile exec:java -Dexec.args=observable-async
flowable:
mvn compile exec:java -Dexec.args=flowable-async
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>fr.sewatech.rx</groupId>
<artifactId>backpressure-example</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<rxjava.version>3.0.0</rxjava.version>
<junit.version>4.12</junit.version>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<exec.mainClass>fr.sewatech.rx.BackPressureApplication</exec.mainClass>
</properties>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<configuration>
<mainClass>${exec.mainClass}</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
package fr.sewatech.rx;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Semaphore;
import static fr.sewatech.rx.Helper.message;
class AppLifecycle {
private final Semaphore sem = new Semaphore(0);
void letsGo() {
// Avoid waiting for end of thread pool, esp. with Maven
System.setProperty("rx3.purge-enabled", "false");
// Handle errors, esp OOME
RxJavaPlugins.setErrorHandler(throwable -> Schedulers.shutdown());
}
void waitUntilTheEnd() {
try {
sem.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
void thisIsTheEnd() {
System.out.println(message("Completed"));
sem.release();
}
}
package fr.sewatech.rx;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.stream.Stream;
import static fr.sewatech.rx.Helper.message;
import static fr.sewatech.rx.Helper.sleep;
public class BackPressureApplication {
private static final BackPressureApplication singleton = new BackPressureApplication();
private static final AppLifecycle lifecycle = new AppLifecycle();
private static final int MAX_COUNT = 1_000;
public static void main(String[] args) {
lifecycle.letsGo();
RunMode.from(args).main.run();
lifecycle.waitUntilTheEnd();
}
private void mainSyncObservable() {
Observable
.range(0, MAX_COUNT)
.map(Data::new)
.subscribe(
this::handleNext,
this::handleError,
lifecycle::thisIsTheEnd
);
}
private void mainAsyncObservable() {
Observable
.range(0, 1_000)
.map(Data::new)
.observeOn(Schedulers.computation(), false, 32)
.subscribe(
this::handleNext,
this::handleError,
lifecycle::thisIsTheEnd
);
}
private void mainFlowable() {
Flowable
.range(0, 1_000)
.map(Data::new)
.observeOn(Schedulers.computation())
.subscribe(
this::handleNext,
this::handleError,
lifecycle::thisIsTheEnd
);
}
private void handleNext(Data data) {
sleep(data.value == 0 ? 500 : 50);
System.out.println(message(data));
}
private void handleError(Throwable throwable) {
System.err.println(message(throwable));
}
private enum RunMode {
OBSERVABLE_SYNC(BackPressureApplication.singleton::mainSyncObservable),
OBSERVABLE_ASYNC(BackPressureApplication.singleton::mainAsyncObservable),
FLOWABLE_ASYNC(BackPressureApplication.singleton::mainFlowable);
private final Runnable main;
RunMode(Runnable main) {
this.main = main;
}
static RunMode from(String[] args) {
return Stream.of(args)
.findFirst()
.map(text -> text.replace('-', '_'))
.map(String::toUpperCase)
.map(RunMode::valueOf)
.orElse(OBSERVABLE_SYNC);
}
}
}
package fr.sewatech.rx;
import static fr.sewatech.rx.Helper.message;
import static fr.sewatech.rx.Helper.sleep;
class Data {
final Byte[] bytes;
final Integer value;
Data(Integer value) {
this.value = value;
System.out.println(message("New data: " + value));
bytes = new Byte[50_000];
sleep(10);
}
@Override
public String toString() {
return "Handled: " + value;
}
}
package fr.sewatech.rx;
import java.lang.management.ManagementFactory;
public class Helper {
static String message(Object content) {
long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
String threadName = Thread.currentThread().getName();
long usedMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024 / 1024;
return String.format("%s - [%s] %s (%s MB)", uptime, threadName, content, usedMemory);
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment