Client
Overview
The Reactive System client lets applications consume remote services through the asynchronous message exchange pattern exposed by a Reactive System server.
To consume a remote service, all you need is an instance of ReactiveClient:
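import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.util.Timeout
import scala.concurrent.duration._
implicit val system = ActorSystem("ReactiveClient")
implicit val materializer = ActorMaterializer()
implicit val timeout = Timeout(5.seconds)
import system.dispatcher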
val client = new ReactiveKafkaClient(KafkaReactiveClientConfig.default)
Simple Request
We will now create a simple client that calls the Reactive System server created in the previous section.
For clarity, here is the code snippet for the server definition:
import org.patricknoir.kafka.reactive.server.dsl._
implicit val system = ActorSystem("SimpleService")
implicit val materializer = ActorMaterializer()
import system.dispatcher
var counter: Int = 0
def getCounter(): Int = counter
def incrementCounter(step: Int): Unit = counter += step
val route: ReactiveRoute = request.sync[Unit, Int]("getCounter") { _ =>
  getCounter()
} ~ request.sync[Int, Unit]("incrementCounter") { step =>
  incrementCounter(step)
}
val atLeastOnceSource = ReactiveKafkaSource.atLeastOnce(
  requestTopic = "simple",
  bootstrapServers = Set("localhost:9092"),
  clientId = "simpleService"
)
val atLeastOnceSink = ReactiveKafkaSink.atLeastOnce(
  bootstrapServers = Set("localhost:9092"),
  concurrency = 8,
  commitMaxBatchSize = 10,
  commitTimeWindow = 5 seconds
)
val reactiveSystem = atLeastOnceSource ~> route ~> atLeastOnceSink
reactiveSystem.run()
As we can see, the server exposes 2 services:
- incrementCounter - accepts a step: Int and returns Unit
- getCounter - doesn’t accept any parameter and returns Int
The services are bound to the Kafka topic simple, as defined by the Kafka source used to create the ReactiveSystem instance.
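The server snippet keeps its state in a plain var while the sink is configured with concurrency = 8. Whether the library ever runs handlers in parallel is not stated on this page, so the following is only a cautious sketch of a counter that stays correct under concurrent calls. SafeCounter and SafeCounterDemo are hypothetical names, and only the Scala and Java standard libraries are used.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical replacement for `var counter: Int = 0` in the server snippet.
final class SafeCounter {
  private val counter = new AtomicInteger(0)

  def getCounter(): Int = counter.get()

  // addAndGet performs the read-modify-write as one atomic step.
  def incrementCounter(step: Int): Unit = {
    counter.addAndGet(step)
    ()
  }
}

object SafeCounterDemo {
  // Fires `calls` concurrent increments of 1 and returns the final value.
  def run(calls: Int): Int = {
    val counter = new SafeCounter
    val all = Future.traverse((1 to calls).toList)(_ => Future(counter.incrementCounter(1)))
    Await.result(all, 10.seconds)
    counter.getCounter()
  }

  def main(args: Array[String]): Unit =
    println(run(1000))
}
```

The two methods keep the same signatures as getCounter and incrementCounter in the server definition, so the route would not need to change if this approach were adopted.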
Here is the client code that invokes incrementCounter:
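import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
implicit val system = ActorSystem("ReactiveClient")
implicit val materializer = ActorMaterializer()
implicit val timeout = Timeout(5.seconds)
import system.dispatcher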
val client = new ReactiveKafkaClient(KafkaReactiveClientConfig.default)
val result: Future[Unit] = client.request[Int, Unit]("kafka:simple/incrementCounter", 1)
result.onComplete {
  case Success(_) => println("incrementCounter request successfully completed")
  case Failure(err) => println(s"error requesting incrementCounter: ${err.getMessage}")
}
Await.ready(result, Duration.Inf)
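Because a request returns a Future, several calls can be sequenced without blocking between them. The sketch below increments the counter and then reads it back with a for-comprehension. To run without Kafka it uses FakeCounterService, a hypothetical in-memory stand-in that is not part of the library; with the real client, each step would presumably be a client.request call like the one shown above.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the two remote services; NOT the library API.
final class FakeCounterService {
  private val counter = new AtomicInteger(0)

  def incrementCounter(step: Int): Future[Unit] =
    Future { counter.addAndGet(step); () }

  def getCounter(): Future[Int] =
    Future(counter.get())
}

object ChainedRequests {
  // getCounter starts only after incrementCounter has completed successfully;
  // a failure in either step fails the resulting Future.
  def incrementThenRead(service: FakeCounterService, step: Int): Future[Int] =
    for {
      _     <- service.incrementCounter(step)
      value <- service.getCounter()
    } yield value

  def main(args: Array[String]): Unit = {
    val value = Await.result(incrementThenRead(new FakeCounterService, 2), 5.seconds)
    println(s"counter after increment: $value")
  }
}
```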
One-Way Simple Request
The Reactive System client can also send a message to a remote service without waiting for a reply from the target service.
The following snippet shows how to send a message to the incrementCounter service without waiting for a response:
val result: Future[Unit] = client.send[Int]("kafka:simple/incrementCounter", 1, confirmSend = true)
result.onComplete {
  case Success(_) => println("incrementCounter request successfully sent")
  case Failure(err) => println(s"error sending incrementCounter request: ${err.getMessage}")
}
Await.ready(result, Duration.Inf)
Note that although this is a fire-and-forget way of sending a message, setting the confirmSend flag lets the caller confirm that the message was sent successfully.
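The snippets on this page block with Await.ready(result, Duration.Inf), which never returns if the Future is never completed. A minimal sketch of a bounded wait follows, using only the Scala standard library. waitForConfirmation is a hypothetical helper name, and the never-completing Future merely stands in for a send whose confirmation does not arrive.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object BoundedWait {
  // Hypothetical helper: waits at most `limit` for the confirmation Future
  // and captures a timeout or failure as a Failure instead of throwing.
  def waitForConfirmation(confirmation: Future[Unit], limit: FiniteDuration): Try[Unit] =
    Try(Await.result(confirmation, limit))

  def main(args: Array[String]): Unit = {
    // A Future that is never completed, standing in for a lost confirmation.
    val lost: Future[Unit] = Promise[Unit]().future

    waitForConfirmation(lost, 200.millis) match {
      case Success(_)                   => println("send confirmed")
      case Failure(_: TimeoutException) => println("no confirmation within 200 ms")
      case Failure(err)                 => println(s"send failed: ${err.getMessage}")
    }
  }
}
```

A finite limit turns a silent hang into an explicit outcome the caller can log or retry on; the appropriate limit depends on the deployment.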