Init to bean components

This commit is contained in:
Sambo Chea 2020-07-11 21:17:44 +07:00
parent a26812a6c2
commit 45b6f104fb
5 changed files with 70 additions and 19 deletions

View File

@ -0,0 +1,31 @@
package com.cubetiqs.wsclient
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.web.reactive.function.client.WebClient
@Configuration
class ClientConfig {
@Bean
@ConditionalOnMissingBean
fun webClient(): WebClient {
return WebClient.builder().build()
}
@Bean
fun webClientStockClient(webClient: WebClient): WebClientStockClient {
return WebClientStockClient(webClient)
}
@Bean
fun stockClient(requester: RSocketRequester): RSocketStockClient {
return RSocketStockClient(requester)
}
@Bean
fun createRequester(builder: RSocketRequester.Builder): RSocketRequester {
return builder.connectTcp("localhost", 7000).block() ?: throw Exception("rsocket requester not loaded")
}
}

View File

@ -0,0 +1,14 @@
package com.cubetiqs.wsclient
import org.springframework.messaging.rsocket.RSocketRequester
import reactor.core.publisher.Flux
class RSocketStockClient(
private val requester: RSocketRequester
) : StockClient {
override fun priceFor(symbol: String): Flux<StockPrice> {
return requester.route("stockPrices")
.data(symbol)
.retrieveFlux(StockPrice::class.java)
}
}

View File

@ -1,25 +1,7 @@
package com.cubetiqs.wsclient
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
@Service
interface StockClient {
fun priceFor(symbol: String): Flux<StockPrice>
}
@Service
class RSocketStockClient (
private val requesterBuilder: RSocketRequester.Builder
) : StockClient {
private fun createRSocketRequester(): RSocketRequester {
return requesterBuilder.connectTcp("localhost", 7000).block() ?: throw Exception("requester not loaded")
}
override fun priceFor(symbol: String): Flux<StockPrice> {
return createRSocketRequester().route("stockPrices")
.data(symbol)
.retrieveFlux(StockPrice::class.java)
}
}

View File

@ -0,0 +1,19 @@
package com.cubetiqs.wsclient
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import reactor.kotlin.extra.retry.retryExponentialBackoff
import java.time.Duration
class WebClientStockClient(val webClient: WebClient) : StockClient {
override fun priceFor(symbol: String): Flux<StockPrice> {
return webClient.get()
.uri("http://localhost:8080/stocks/{symbol}", symbol)
.retrieve()
.bodyToFlux(StockPrice::class.java)
.retryExponentialBackoff(5, Duration.ofSeconds(10), Duration.ofSeconds(20))
.doOnError {
println(it.message)
}
}
}

View File

@ -5,6 +5,7 @@ import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
@SpringBootTest
class RSocketClientApplicationTests @Autowired constructor(
@ -18,6 +19,10 @@ class RSocketClientApplicationTests @Autowired constructor(
val fivePrices = prices.take(5)
Assertions.assertEquals(5, fivePrices.count().block())
Assertions.assertEquals("USD", fivePrices.blockFirst()?.symbol)
}
StepVerifier.create(prices.take(2))
.expectNextMatches { it.symbol == "USD" }
.expectNextMatches { it.symbol == "USD" }
.verifyComplete()
}
}