Commit 937f9ff1 authored by Georg Mittendorfer's avatar Georg Mittendorfer

Remove pre 1.5.5 compatibility for gTTA errors. Add rate limit weights that...

Remove pre 1.5.5 compatibility for gTTA errors. Add rate limit weights that count successful requests several times (mainly for retried gTTA requests with status 400 that should not get limited too much).
parent b7c9b738
Pipeline #46450872 passed with stage
in 3 minutes and 8 seconds
......@@ -26,7 +26,7 @@ import com.mio.piri.nodes.selection.*;
import com.mio.piri.service.CommandChecker;
import com.mio.piri.service.validation.*;
import com.mio.piri.tolerance.CircuitBreakerFactory;
import com.mio.piri.tolerance.RateLimiterRegistry;
import com.mio.piri.tolerance.RateLimitersRegistry;
import com.mio.piri.util.ClientSessionExtractor;
import com.mio.piri.util.UrlValidator;
import io.micrometer.core.instrument.MeterRegistry;
......@@ -58,26 +58,14 @@ public class PiriConfiguration {
return new MeterFactory(registry);
}
// @Bean
// public ReactorResourceFactory reactorResourceFactory() {
// ConnectionProvider connectionProvider = ConnectionProvider.fixed("piri-fixed");
// ConnectionProvider connectionProvider = ConnectionProvider.elastic("piri-elastic");
// ConnectionProvider connectionProvider = ConnectionProvider.newConnection();
// LoggerFactory.getLogger(getClass()).info("Set http connection provider: [{}}", connectionProvider);
// HttpResources.set(connectionProvider);
// not working with global resource sharing:
// resourceFactory.setConnectionProviderSupplier(() -> ConnectionProvider.fixed("piri-webflux"));
// return new ReactorResourceFactory();
// }
@Bean
public CircuitBreakerFactory circuitBreakerFactory(MeterFactory meterFactory, Environment env) {
return new CircuitBreakerFactory(meterFactory, env);
}
@Bean
public RateLimiterRegistry rateLimiterFactory(Environment env, MeterFactory meterFactory) {
return new RateLimiterRegistry(env, meterFactory);
public RateLimitersRegistry rateLimiterFactory(Environment env, MeterFactory meterFactory) {
return new RateLimitersRegistry(env, meterFactory);
}
@Bean
......
......@@ -20,8 +20,6 @@
package com.mio.piri.commands;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.mio.piri.commands.response.GetTransactionsToApproveErrorHandler;
import com.mio.piri.commands.response.ResponseErrorHandler;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
......@@ -48,8 +46,4 @@ public class GetTransactionsToApprove extends IriCommand {
@JsonInclude(value=NON_EMPTY)
private String reference;
@Override
public ResponseErrorHandler getResponseErrorHandler() {
return new GetTransactionsToApproveErrorHandler();
}
}
......@@ -60,6 +60,9 @@ import java.util.List;
})
public abstract class IriCommand {
@JsonIgnore
private static final ResponseErrorHandler DEFAULT_ERROR_HANDLER = new DefaultErrorHandler();
@Null
@JsonIgnore
private String sessionId;
......@@ -96,7 +99,7 @@ public abstract class IriCommand {
@JsonIgnore
public ResponseErrorHandler getResponseErrorHandler() {
return new DefaultErrorHandler();
return IriCommand.DEFAULT_ERROR_HANDLER;
}
}
/*
* Copyright (c) 2019.
*
* This file is part of Piri.
*
* Piri is free software: you can redistribute it and/or modify it under
* the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* Piri is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License along with Piri. If not, see <http://www.gnu.org/licenses/>.
*/
package com.mio.piri.commands.response;
import org.springframework.http.HttpStatus;
public class GetTransactionsToApproveErrorHandler extends DefaultErrorHandler {
@Override
public boolean isCriticalError(HttpStatus httpStatus) {
// for IRI 1.5.4 and before gTTA might return a 5xx error that needs to be returned and is not critical
return super.isCriticalError(httpStatus) && httpStatus != HttpStatus.INTERNAL_SERVER_ERROR;
}
}
......@@ -26,7 +26,7 @@ import com.mio.piri.metrics.MeterFactory;
import com.mio.piri.nodes.Node;
import com.mio.piri.nodes.selection.NodeSelector;
import com.mio.piri.service.validation.DelegatingValidator;
import com.mio.piri.tolerance.RateLimiterRegistry;
import com.mio.piri.tolerance.RateLimitersRegistry;
import com.mio.piri.util.ClientSessionExtractor;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
......@@ -77,7 +77,7 @@ public class IriApiHandler {
private ClientSessionExtractor session;
@Autowired
private RateLimiterRegistry rateLimiterRegistry;
private RateLimitersRegistry rateLimiterRegistry;
@Autowired
private NodeSelector nodeSelector;
......@@ -144,10 +144,16 @@ public class IriApiHandler {
.map(responseEntity -> {
HttpStatus httpStatus = responseEntity.getStatusCode();
// TODO add error handling for unsynced get node info
if (httpStatus.isError() && command.getResponseErrorHandler().isCriticalError(httpStatus)) {
throw WebClientResponseException.create(httpStatus.value(), httpStatus.getReasonPhrase(), responseEntity.getHeaders(),
responseEntity.getBody() != null ? responseEntity.getBody().getBytes() : null,
Charset.forName("UTF-8"));
if (httpStatus.isError()) {
if (command.getResponseErrorHandler().isCriticalError(httpStatus)) {
throw WebClientResponseException.create(httpStatus.value(), httpStatus.getReasonPhrase(), responseEntity.getHeaders(),
responseEntity.getBody() != null ? responseEntity.getBody().getBytes() : null,
Charset.forName("UTF-8"));
}
} else {
// successful requests might count multiple times. this is a workaround for not being able
// to extend the rate limit dynamically within one rate limiting period.
rateLimiterRegistry.applyRateLimitWeight(command.getIp(), command.getCommand());
}
return responseEntity;
});
......
......@@ -43,13 +43,14 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class RateLimiterRegistry {
// class name is plural because resilience4j has conflicting class name
public class RateLimitersRegistry {
private static final String NAME_IP_COMMAND = "%s-%s";
private static final String NAME_IP = "%s";
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final int DEFAULT_LIMIT = 10;
private static final int DEFAULT_LIMIT = 50;
private static final int DEFAULT_MAX_RATE_LIMITERS_COUNT = 1000;
private static final String NAME_MUST_NOT_BE_NULL = "Name must not be null";
......@@ -57,6 +58,7 @@ public class RateLimiterRegistry {
private final RateLimiterConfig ipRateLimiterConfig;
private final Map<String, RateLimiterConfig> rateLimitedCommandsConfig = new HashMap<>();
private final Map<String, Integer> commandsSuccessWeight = new HashMap<>();
private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<>();
......@@ -68,7 +70,7 @@ public class RateLimiterRegistry {
private final RateLimiter global;
public RateLimiterRegistry(Environment env, MeterFactory meterFactory) {
public RateLimitersRegistry(Environment env, MeterFactory meterFactory) {
ipRateLimiterConfig = RateLimiterConfig.custom()
.limitRefreshPeriod(getRefreshPeriod(env, "ip"))
.limitForPeriod(getLimitPerPeriod(env, "ip"))
......@@ -91,6 +93,10 @@ public class RateLimiterRegistry {
.timeoutDuration(getTimeoutDuration(env, "ip." + str))
.build()
));
// get maximum dynamic limit (limit that can be increased, e.g. for ignoring erroneous responses).
Arrays.stream(getRateLimitedCommands(env)).forEach(str -> commandsSuccessWeight.put(str,
getRateLimitWeight(env, "ip." + str)));
meterFactory.createRateLimiterCountGauge(rateLimiters);
String noLimitPattern = StringUtils.stripToNull(env.getProperty("piri.rate.ip.unlimited.pattern"));
......@@ -115,6 +121,26 @@ public class RateLimiterRegistry {
return rateLimiter(limiterSpecification._1, limiterSpecification._2);
}
public void applyRateLimitWeight(final String ip, final String command) {
if (isCommandRateLimited(command) && commandsSuccessWeight.getOrDefault(command, 1) > 1) {
RateLimiter rateLimiter = rateLimiter(ip, command);
int weight = commandsSuccessWeight.get(command);
int processed = 1;
long waitTime = 0;
while (processed < weight && waitTime >= 0) {
// negative wait time means there is no reservation possible
waitTime = rateLimiter.reservePermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration());
processed++;
if (waitTime != 0) {
logger.debug("Reservation wait time for rate limiter [{}]: [{}] nanos.", rateLimiter.getName(), waitTime);
}
}
logger.debug("Applied rate limit weight of [{}] for rate limiter [{}].", processed, rateLimiter.getName());
} // else do nothing
}
public boolean isIpRateLimited(String ip) {
// rate limit if no exception defined or ip does not match pattern
return noRateLimitForPattern == null || StringUtils.isBlank(ip) || !noRateLimitForPattern.matcher(ip).matches();
......@@ -177,6 +203,12 @@ public class RateLimiterRegistry {
return value;
}
private int getRateLimitWeight(Environment env, String type) {
int value = getInt(env, "piri.rate." + type + ".weight", 1);
logger.info("Configured [{}] weight: [{}].", type, value);
return value;
}
private int getMaxRateLimitersCount(Environment env) {
int value = getInt(env, "piri.rate.cleanup.max.count", DEFAULT_MAX_RATE_LIMITERS_COUNT);
logger.info("Configured [ip] rate limiters cache size: [{}].", value);
......
......@@ -46,9 +46,10 @@ piri.rate.ip.cleanup.idle=10m
# You can configure rate limits per ip and command.
# piri.rate.ip.limited.commands=getTransactionsToApprove
# piri.rate.ip.getTransactionsToApprove.limit=2
# piri.rate.ip.getTransactionsToApprove.period=2s
# piri.rate.ip.getTransactionsToApprove.timeout=2s
# piri.rate.ip.getTransactionsToApprove.limit=10
# optional: piri.rate.ip.getTransactionsToApprove.weight=5
# piri.rate.ip.getTransactionsToApprove.period=5s
# piri.rate.ip.getTransactionsToApprove.timeout=5s
# circuit breaker config
piri.circuit.open.duration=120s
......@@ -151,6 +152,7 @@ iri.allow.checkConsistency=true
# Switch on debug logging
# logging.level.root=DEBUG
# logging.level.com.mio.piri=DEBUG
# logging.level.com.mio.piri.tolerance=DEBUG
# Next line is needed for logging network packages. Add client and or server log in addition to this.
# logging.level.reactor.ipc.netty.channel.ContextHandler=debug
# web client logs (node communication)
......
/*
* Copyright (c) 2019.
*
* This file is part of Piri.
*
* Piri is free software: you can redistribute it and/or modify it under
* the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* Piri is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License along with Piri. If not, see <http://www.gnu.org/licenses/>.
*/
package com.mio.piri.commands;
import com.mio.piri.commands.response.GetTransactionsToApproveErrorHandler;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class GetTransactionsToApproveTest {
private final GetTransactionsToApprove gTTA = new GetTransactionsToApprove();
@Test
public void whenGetResponseErrorHandlerThenReturnCommandSpecific() {
assertThat(gTTA.getResponseErrorHandler()).isInstanceOf(GetTransactionsToApproveErrorHandler.class);
}
}
/*
* Copyright (c) 2019.
*
* This file is part of Piri.
*
* Piri is free software: you can redistribute it and/or modify it under
* the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* Piri is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
* the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public
* License along with Piri. If not, see <http://www.gnu.org/licenses/>.
*/
package com.mio.piri.commands.response;
import org.junit.Test;
import org.springframework.http.HttpStatus;
import static org.assertj.core.api.Assertions.assertThat;
public class GetTransactionsToApproveErrorHandlerTest {
private final ResponseErrorHandler errorHandler = new GetTransactionsToApproveErrorHandler();
@Test
public void givenNoErrorWhenIsCriticalErrorThenFalse() {
assertThat(errorHandler.isCriticalError(HttpStatus.OK)).isFalse();
assertThat(errorHandler.isCriticalError(HttpStatus.BAD_REQUEST)).isFalse();
// special case for old IRI 1.5.4 behaviour
assertThat(errorHandler.isCriticalError(HttpStatus.INTERNAL_SERVER_ERROR)).isFalse();
}
@Test
public void givenErrorWhenIsCriticalErrorThenTrue() {
assertThat(errorHandler.isCriticalError(HttpStatus.TOO_MANY_REQUESTS)).isTrue();
assertThat(errorHandler.isCriticalError(HttpStatus.NOT_FOUND)).isTrue();
assertThat(errorHandler.isCriticalError(HttpStatus.FORBIDDEN)).isTrue();
assertThat(errorHandler.isCriticalError(HttpStatus.UNAUTHORIZED)).isTrue();
assertThat(errorHandler.isCriticalError(HttpStatus.SERVICE_UNAVAILABLE)).isTrue();
}
}
\ No newline at end of file
......@@ -182,7 +182,8 @@ public class IriApiHandlerCommandsSysIT {
@Test
public void givenBroadcastTransactionsWhenPostThenOk() {
ResponseTrytes responseTrytes = postCommandExpectOk(getTrytes("LAZZPOJXCVBKLYJJNMWC9IYPYBKBCUVEDMJOCJZZRSCPEIWA9TVQLCXMVWGSYMVDMSI9LSMKH9DMZ9999"), ResponseTrytes.class);
ResponseHashes tips = postCommandExpectOk(command("getTips"), ResponseHashes.class); // get random transactions
ResponseTrytes responseTrytes = postCommandExpectOk(getTrytes(tips.getHashes().get(0)), ResponseTrytes.class); // convert to trytes
String trytes = responseTrytes.getTrytes().get(0);
String result = postCommandExpectOk(broadcastTransactions(trytes));
assertTrue(result, result.contains("duration"));
......@@ -190,7 +191,8 @@ public class IriApiHandlerCommandsSysIT {
@Test
public void givenStoreTransactionsWhenPostThenOk() {
ResponseTrytes responseTrytes = postCommandExpectOk(getTrytes("LAZZPOJXCVBKLYJJNMWC9IYPYBKBCUVEDMJOCJZZRSCPEIWA9TVQLCXMVWGSYMVDMSI9LSMKH9DMZ9999"), ResponseTrytes.class);
ResponseHashes tips = postCommandExpectOk(command("getTips"), ResponseHashes.class); // get random transactions
ResponseTrytes responseTrytes = postCommandExpectOk(getTrytes(tips.getHashes().get(0)), ResponseTrytes.class); // convert to trytes
String trytes = responseTrytes.getTrytes().get(0);
String result = postCommandExpectOk(storeTransactions(trytes));
assertTrue(result, result.contains("duration"));
......
......@@ -20,6 +20,9 @@
package com.mio.piri.service;
import com.mio.piri.commands.GetNodeInfo;
import com.mio.piri.commands.GetTips;
import com.mio.piri.commands.GetTrytes;
import com.mio.piri.util.TestCommandFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
......@@ -29,7 +32,20 @@ import org.springframework.http.HttpStatus;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(properties = {"piri.rate.global.limit=1", "piri.rate.global.timeout=1ms", "piri.rate.global.period=1s", "iri.nodes="}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@SpringBootTest(properties = {
// "piri.rate.global.limit=1",
// "piri.rate.global.timeout=1ms",
// "piri.rate.global.period=1s",
"piri.rate.ip.limited.commands=getTips,getTrytes,getNodeInfo",
"piri.rate.ip.getNodeInfo.limit=1",
"piri.rate.ip.getNodeInfo.period=1m",
"piri.rate.ip.getTips.limit=2",
"piri.rate.ip.getTips.weight=3",
"piri.rate.ip.getTips.period=1h",
"piri.rate.ip.getTrytes.limit=4",
"piri.rate.ip.getTrytes.weight=2",
"piri.rate.ip.getTrytes.period=1h",
"iri.nodes="}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class IriApiHandlerRateLimitIT extends AbstractIriApiHandlerTest {
private final Logger logger = LoggerFactory.getLogger(getClass());
......@@ -40,6 +56,10 @@ public class IriApiHandlerRateLimitIT extends AbstractIriApiHandlerTest {
.setHeader("Content-Type", "application/json")
.setBody(syncedNodeInfo));
prepareResponse(response -> response
.setHeader("Content-Type", "application/json")
.setBody(syncedNodeInfo));
util.postCommand(new GetNodeInfo()).expectStatus().isOk(); // rate limit not reached yet
String responseBody = util.postCommand(new GetNodeInfo())
......@@ -49,4 +69,61 @@ public class IriApiHandlerRateLimitIT extends AbstractIriApiHandlerTest {
logger.debug(responseBody);
}
@Test
public void givenBadRequestWhenExchangeThenCountWithoutWeight() {
prepareResponse(response -> response
.setHeader("Content-Type", "application/json")
.setResponseCode(400)
.setBody(syncedNodeInfo));
prepareResponse(response -> response
.setHeader("Content-Type", "application/json")
.setResponseCode(400)
.setBody(syncedNodeInfo));
// to make sure 429 is not triggered in retry because of timeout
prepareResponse(response -> response
.setHeader("Content-Type", "application/json")
.setResponseCode(400)
.setBody(syncedNodeInfo));
GetTips command = TestCommandFactory.getTips();
util.postCommand(command).expectStatus().isEqualTo(HttpStatus.BAD_REQUEST); // rate limit not reached yet
util.postCommand(command).expectStatus().isEqualTo(HttpStatus.BAD_REQUEST); // rate limit not reached yet
String responseBody = util.postCommand(command)
.expectStatus().isEqualTo(HttpStatus.TOO_MANY_REQUESTS)
.expectBody(String.class).returnResult().getResponseBody();
logger.debug(responseBody);
}
@Test
public void givenOkWhenExchangeThenCountWithWeight() {
prepareResponse(response -> response
.setHeader("Content-Type", "application/json")
.setBody(syncedNodeInfo));
prepareResponse(response -> response
.setHeader("Content-Type", "application/json")
.setBody(syncedNodeInfo));
// to make sure 429 is not triggered in retry because of timeout
prepareResponse(response -> response
.setHeader("Content-Type", "application/json")
.setBody(syncedNodeInfo));
GetTrytes command = TestCommandFactory.getTrytes("blah");
util.postCommand(command).expectStatus().isEqualTo(HttpStatus.OK); // rate limit not reached yet
util.postCommand(command).expectStatus().isEqualTo(HttpStatus.OK); // rate limit not reached yet
String responseBody = util.postCommand(command)
.expectStatus().isEqualTo(HttpStatus.TOO_MANY_REQUESTS)
.expectBody(String.class).returnResult().getResponseBody();
logger.debug(responseBody);
}
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ import com.mio.piri.exceptions.NoNodeAvailable;
import com.mio.piri.metrics.MeterFactory;
import com.mio.piri.nodes.IriNode;
import com.mio.piri.nodes.selection.NodeSelector;
import com.mio.piri.tolerance.RateLimiterRegistry;
import com.mio.piri.tolerance.RateLimitersRegistry;
import com.mio.piri.util.ClientSessionExtractor;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
......@@ -49,6 +49,8 @@ import reactor.test.StepVerifier;
import java.time.Duration;
import static io.github.resilience4j.ratelimiter.RateLimiterRegistry.of;
import static io.github.resilience4j.ratelimiter.RateLimiterRegistry.ofDefaults;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
......@@ -68,7 +70,7 @@ public class IriApiHandlerTest {
private ClientSessionExtractor sessionReader;
@Mock
private RateLimiterRegistry rateLimiterRegistry;
private RateLimitersRegistry rateLimiterRegistry;
@InjectMocks
private final IriApiHandler iriApiHandler = new IriApiHandler();
......@@ -88,8 +90,8 @@ public class IriApiHandlerTest {
@Mock
private ResponseErrorHandler responseErrorHandler;
private final RateLimiter dummyLimiter = io.github.resilience4j.ratelimiter.RateLimiterRegistry.ofDefaults().rateLimiter("test-rate-limiter");
private final RateLimiter dummyGlobalLimiter = io.github.resilience4j.ratelimiter.RateLimiterRegistry.ofDefaults().rateLimiter("test-global-limiter");
private final RateLimiter dummyLimiter = ofDefaults().rateLimiter("test-rate-limiter");
private final RateLimiter dummyGlobalLimiter = ofDefaults().rateLimiter("test-global-limiter");
private final MeterRegistry simpleMeterRegistry = new SimpleMeterRegistry();
private final Counter rejectedCounter = simpleMeterRegistry.counter("test-rejected-counter");
private final Counter failedCounter = simpleMeterRegistry.counter("test-error-counter");
......@@ -162,7 +164,7 @@ public class IriApiHandlerTest {
@Test
public void givenRateLimitTriggeredWhenExchangeThenReturnRateLimitReached() {
RateLimiter triggered = io.github.resilience4j.ratelimiter.RateLimiterRegistry.of(RateLimiterConfig.custom()
RateLimiter triggered = of(RateLimiterConfig.custom()
.limitForPeriod(2)
.limitRefreshPeriod(Duration.ofHours(1))
.timeoutDuration(Duration.ofMillis(100)) // duration for requests to wait until RequestNotPermitted
......@@ -201,7 +203,7 @@ public class IriApiHandlerTest {
@Test
public void givenLongTimeOutWhenRateLimitThenDoNotThrow() {
RateLimiter triggered = io.github.resilience4j.ratelimiter.RateLimiterRegistry.of(RateLimiterConfig.custom()
RateLimiter triggered = of(RateLimiterConfig.custom()
.limitForPeriod(1)
.limitRefreshPeriod(Duration.ofMillis(500))
.timeoutDuration(Duration.ofMillis(500))
......@@ -223,7 +225,7 @@ public class IriApiHandlerTest {
}
@Test
public void givenRateLimitExceptionWhenExchangeThenIgnoreRateLimit() {
public void givenIpExcludedFromLimitWhenExchangeThenIgnoreRateLimit() {
when(sessionReader.getClientIp(httpRequest)).thenReturn("some-ip");
when(rateLimiterRegistry.isIpRateLimited("some-ip")).thenReturn(false);
when(node.call(any(IriCommand.class))).thenReturn(Mono.just(response));
......@@ -307,4 +309,28 @@ public class IriApiHandlerTest {
verify(node, times(2)).call(any(IriCommand.class));
}
@Test
public void givenSuccessWhenExchangeThenApplyRateLimitWeight() {
when(command.getIp()).thenReturn("some-ip");
when(response.getStatusCode()).thenReturn(HttpStatus.OK);
when(node.call(any(IriCommand.class))).thenReturn(Mono.just(response));
Mono<ResponseEntity<String>> responseMono = iriApiHandler.exchangeWithIri(command, httpRequest).log();
StepVerifier.create(responseMono)
.expectNext(response)
.verifyComplete();
verify(rateLimiterRegistry).applyRateLimitWeight("some-ip", "test-command");
}
@Test
public void givenErrorWhenExchangeThenDoNotApplyRateLimitWeight() {
when(command.getIp()).thenReturn("some-ip");
when(response.getStatusCode()).thenReturn(HttpStatus.BAD_REQUEST);
when(node.call(any(IriCommand.class))).thenReturn(Mono.just(response));
Mono<ResponseEntity<String>> responseMono = iriApiHandler.exchangeWithIri(command, httpRequest).log();
StepVerifier.create(responseMono)
.expectNext(response)
.verifyComplete();
verify(rateLimiterRegistry, never()).applyRateLimitWeight(anyString(), anyString());
}
}
\ No newline at end of file
......@@ -24,17 +24,18 @@ import io.github.resilience4j.ratelimiter.RateLimiter;
import org.junit.Test;
import org.springframework.core.env.Environment;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
public class RateLimiterRegistryTest {
public class RateLimitersRegistryTest {
private final Environment env = mock(Environment.class);
private final MeterFactory meterFactory = mock(MeterFactory.class);
private final RateLimiterRegistry registry = new RateLimiterRegistry(env, meterFactory);
private final RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
@Test
public void whenGlobalRateLimiterThenReturnSame() {
......@@ -50,7 +51,7 @@ public class RateLimiterRegistryTest {
public void whenRateLimiterThenCleanUpCache() throws InterruptedException {
when(env.getProperty("piri.rate.cleanup.max.count", Integer.class)).thenReturn(1);
when(env.getProperty("piri.rate.cleanup.idle")).thenReturn("50ms");
RateLimiterRegistry registry = new RateLimiterRegistry(env, meterFactory);
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
RateLimiter limiter = registry.rateLimiter("foo", null);
TimeUnit.MILLISECONDS.sleep(100);
......@@ -61,7 +62,7 @@ public class RateLimiterRegistryTest {
@Test
public void givenRateLimitedCommandWhenRateLimiterThenReturnSpecificLimiter() {
when(env.getProperty("piri.rate.ip.limited.commands")).thenReturn("foo-command");
RateLimiterRegistry registry = new RateLimiterRegistry(env, meterFactory);
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
RateLimiter limiter = registry.rateLimiter("foo", "foo-command");
RateLimiter another = registry.rateLimiter("foo", "bar-command");
......@@ -72,7 +73,7 @@ public class RateLimiterRegistryTest {
@Test
public void givenUnlimitedCommandWhenRateLimiterThenReturnSameLimiter() {
when(env.getProperty("piri.rate.ip.limited.commands")).thenReturn("some-command");
RateLimiterRegistry registry = new RateLimiterRegistry(env, meterFactory);
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
RateLimiter limiter = registry.rateLimiter("foo", "foo-command");
assertThat(limiter).isSameAs(registry.rateLimiter("foo", "bar-command"));
......@@ -85,7 +86,7 @@ public class RateLimiterRegistryTest {
@Test
public void givenNoConfigWhenIsRateLimitedIpThenReturnTrue() {
RateLimiterRegistry registry = new RateLimiterRegistry(env, meterFactory);
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
assertThat(registry.isIpRateLimited("foo")).isTrue();
assertThat(registry.isIpRateLimited(null)).isTrue();
}
......@@ -93,7 +94,7 @@ public class RateLimiterRegistryTest {
@Test
public void givenNoMatchWhenIsRateLimitedIpThenReturnTrue() {
when(env.getProperty("piri.rate.ip.unlimited.pattern")).thenReturn("foo");
RateLimiterRegistry registry = new RateLimiterRegistry(env, meterFactory);
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
assertThat(registry.isIpRateLimited("bar")).isTrue();
assertThat(registry.isIpRateLimited(null)).isTrue();
}
......@@ -101,8 +102,40 @@ public class RateLimiterRegistryTest {
@Test
public void givenMatchWhenIsRateLimitedIpThenReturnFalse() {
when(env.getProperty("piri.rate.ip.unlimited.pattern")).thenReturn("foo");
RateLimiterRegistry registry = new RateLimiterRegistry(env, meterFactory);
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
assertThat(registry.isIpRateLimited("foo")).isFalse();
}
@Test
public void whenApplyRateLimitWeightThenDecreaseLimit() {
when(env.getProperty("piri.rate.ip.limited.commands")).thenReturn("some");
when(env.getProperty("piri.rate.ip.some.weight", Integer.class)).thenReturn(55);
when(env.getProperty("piri.rate.ip.some.timeout")).thenReturn("1ms");
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
RateLimiter limiter = registry.rateLimiter("foo", "some");
limiter.getPermission(Duration.ofNanos(0)); // one for the call
registry.applyRateLimitWeight("foo", "some"); // decreases by 49 (one
assertThat(limiter.getMetrics().getAvailablePermissions()).isZero();
}
@Test
public void givenWaitTimeWhenApplyRateLimitWeightThenDecreaseLimitForNextPeriod() {
when(env.getProperty("piri.rate.ip.limited.commands")).thenReturn("some");
when(env.getProperty("piri.rate.ip.some.weight", Integer.class)).thenReturn(53);
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
RateLimiter limiter = registry.rateLimiter("foo", "some");
limiter.getPermission(Duration.ofNanos(0)); // one for the call
registry.applyRateLimitWeight("foo", "some"); // decrease weight
assertThat(limiter.getMetrics().getAvailablePermissions()).isEqualTo(-3); // 3 reservations waiting
}
@Test
public void givenNoWeightConfigWhenApplyRateLimitWeightThenDoNotDecrease() {
RateLimitersRegistry registry = new RateLimitersRegistry(env, meterFactory);
RateLimiter limiter = registry.rateLimiter("foo", "some");
int limitForPeriod = limiter.getRateLimiterConfig().getLimitForPeriod();
registry.applyRateLimitWeight("foo", "some");
assertThat(limiter.getMetrics().getAvailablePermissions()).isEqualTo(limitForPeriod);
}
}
\ No newline at end of file
......@@ -81,6 +81,12 @@ public class TestCommandFactory {
return gt;
}
public static GetTips getTips() {
GetTips gt = new GetTips();
gt.setCommand("getTips");
return gt;
}
public static GetInclusionStates getInclusionStates(String tx, String tip) {
GetInclusionStates gis = new GetInclusionStates();
gis.setCommand("getInclusionStates");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment