From 45b6f104fb086b5844830017a139d34c9dd503a5 Mon Sep 17 00:00:00 2001 From: Sambo Chea Date: Sat, 11 Jul 2020 21:17:44 +0700 Subject: [PATCH] Init to bean components --- .../com/cubetiqs/wsclient/ClientConfig.kt | 31 +++++++++++++++++++ .../cubetiqs/wsclient/RSocketStockClient.kt | 14 +++++++++ .../com/cubetiqs/wsclient/StockClient.kt | 18 ----------- .../cubetiqs/wsclient/WebClientStockClient.kt | 19 ++++++++++++ .../wsclient/RSocketClientApplicationTests.kt | 7 ++++- 5 files changed, 70 insertions(+), 19 deletions(-) create mode 100644 src/main/kotlin/com/cubetiqs/wsclient/ClientConfig.kt create mode 100644 src/main/kotlin/com/cubetiqs/wsclient/RSocketStockClient.kt create mode 100644 src/main/kotlin/com/cubetiqs/wsclient/WebClientStockClient.kt diff --git a/src/main/kotlin/com/cubetiqs/wsclient/ClientConfig.kt b/src/main/kotlin/com/cubetiqs/wsclient/ClientConfig.kt new file mode 100644 index 0000000..215a47b --- /dev/null +++ b/src/main/kotlin/com/cubetiqs/wsclient/ClientConfig.kt @@ -0,0 +1,31 @@ +package com.cubetiqs.wsclient + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.rsocket.RSocketRequester +import org.springframework.web.reactive.function.client.WebClient + +@Configuration +class ClientConfig { + @Bean + @ConditionalOnMissingBean + fun webClient(): WebClient { + return WebClient.builder().build() + } + + @Bean + fun webClientStockClient(webClient: WebClient): WebClientStockClient { + return WebClientStockClient(webClient) + } + + @Bean + fun stockClient(requester: RSocketRequester): RSocketStockClient { + return RSocketStockClient(requester) + } + + @Bean + fun createRequester(builder: RSocketRequester.Builder): RSocketRequester { + return builder.connectTcp("localhost", 7000).block() ?: throw Exception("rsocket requester not loaded") + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/cubetiqs/wsclient/RSocketStockClient.kt b/src/main/kotlin/com/cubetiqs/wsclient/RSocketStockClient.kt new file mode 100644 index 0000000..a68530f --- /dev/null +++ b/src/main/kotlin/com/cubetiqs/wsclient/RSocketStockClient.kt @@ -0,0 +1,14 @@ +package com.cubetiqs.wsclient + +import org.springframework.messaging.rsocket.RSocketRequester +import reactor.core.publisher.Flux + +class RSocketStockClient( + private val requester: RSocketRequester +) : StockClient { + override fun priceFor(symbol: String): Flux { + return requester.route("stockPrices") + .data(symbol) + .retrieveFlux(StockPrice::class.java) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/cubetiqs/wsclient/StockClient.kt b/src/main/kotlin/com/cubetiqs/wsclient/StockClient.kt index 18d6e84..900d690 100644 --- a/src/main/kotlin/com/cubetiqs/wsclient/StockClient.kt +++ b/src/main/kotlin/com/cubetiqs/wsclient/StockClient.kt @@ -1,25 +1,7 @@ package com.cubetiqs.wsclient -import org.springframework.messaging.rsocket.RSocketRequester -import org.springframework.stereotype.Service import reactor.core.publisher.Flux -@Service interface StockClient { fun priceFor(symbol: String): Flux -} - -@Service -class RSocketStockClient ( - private val requesterBuilder: RSocketRequester.Builder -) : StockClient { - private fun createRSocketRequester(): RSocketRequester { - return requesterBuilder.connectTcp("localhost", 7000).block() ?: throw Exception("requester not loaded") - } - - override fun priceFor(symbol: String): Flux { - return createRSocketRequester().route("stockPrices") - .data(symbol) - .retrieveFlux(StockPrice::class.java) - } } \ No newline at end of file diff --git a/src/main/kotlin/com/cubetiqs/wsclient/WebClientStockClient.kt b/src/main/kotlin/com/cubetiqs/wsclient/WebClientStockClient.kt new file mode 100644 index 0000000..93c1809 --- /dev/null +++ b/src/main/kotlin/com/cubetiqs/wsclient/WebClientStockClient.kt @@ -0,0 +1,19 @@ +package com.cubetiqs.wsclient + +import org.springframework.web.reactive.function.client.WebClient +import reactor.core.publisher.Flux +import reactor.kotlin.extra.retry.retryExponentialBackoff +import java.time.Duration + +class WebClientStockClient(val webClient: WebClient) : StockClient { + override fun priceFor(symbol: String): Flux { + return webClient.get() + .uri("http://localhost:8080/stocks/{symbol}", symbol) + .retrieve() + .bodyToFlux(StockPrice::class.java) + .retryExponentialBackoff(5, Duration.ofSeconds(10), Duration.ofSeconds(20)) + .doOnError { + println(it.message) + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/cubetiqs/wsclient/RSocketClientApplicationTests.kt b/src/test/kotlin/com/cubetiqs/wsclient/RSocketClientApplicationTests.kt index a47b42f..483238f 100644 --- a/src/test/kotlin/com/cubetiqs/wsclient/RSocketClientApplicationTests.kt +++ b/src/test/kotlin/com/cubetiqs/wsclient/RSocketClientApplicationTests.kt @@ -5,6 +5,7 @@ import org.junit.jupiter.api.Test import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import reactor.core.publisher.Flux +import reactor.test.StepVerifier @SpringBootTest class RSocketClientApplicationTests @Autowired constructor( @@ -18,6 +19,10 @@ class RSocketClientApplicationTests @Autowired constructor( val fivePrices = prices.take(5) Assertions.assertEquals(5, fivePrices.count().block()) Assertions.assertEquals("USD", fivePrices.blockFirst()?.symbol) - } + StepVerifier.create(prices.take(2)) + .expectNextMatches { it.symbol == "USD" } + .expectNextMatches { it.symbol == "USD" } + .verifyComplete() + } }