package com.cubetiqs.live.service import com.fasterxml.jackson.core.JsonProcessingException import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication import org.springframework.context.ApplicationEventPublisher import org.springframework.scheduling.annotation.Async import org.springframework.scheduling.annotation.EnableAsync import org.springframework.web.bind.annotation.* import reactor.core.publisher.Flux import reactor.core.publisher.Flux.generate import java.util.concurrent.ThreadLocalRandom @SpringBootApplication @EnableAsync class LiveServiceApplication fun main(args: Array) { runApplication(*args) } @RestController @RequestMapping("/api") class HelloController( eventPublisher: MessageCreatedEventPublisher, private val applicationEventPublisher: ApplicationEventPublisher, ) { private val events: Flux private val objectMapper = jacksonObjectMapper() init { events = Flux.create(eventPublisher).share() } @Async @RequestMapping("/publish") fun publish(@RequestParam(value = "message", defaultValue = "Hello World") message: String): Map { applicationEventPublisher.publishEvent(MessageCreatedEvent(message)) return mapOf( "status" to "Message was published!", "message" to message, ) } @GetMapping("/stream/chat", produces = ["text/event-stream"]) @CrossOrigin(value = ["*"]) fun streamChat(): Flux { return this.events.map { event -> try { objectMapper.writeValueAsString(event) + "\n\n" } catch (e: JsonProcessingException) { throw RuntimeException(e) } } } @GetMapping("/stream", produces = ["text/event-stream"]) fun stream(): Flux { return generate { sink -> val random = ThreadLocalRandom.current().nextDouble() sink.next(("$random").toByteArray()) Thread.sleep(1000) } } }