Client

Overview

The Reactive System client allows applications to consume remote services through the asynchronous message exchange pattern exposed by a Reactive System server.

To consume a remote service, all you need is an instance of ReactiveClient:

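// Akka and Scala imports used below; the import for the Reactive System client classes is not shown.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.util.Timeout
import scala.concurrent.duration._

implicit val system = ActorSystem("ReactiveClient")
implicit val materializer = ActorMaterializer()
implicit val timeout = Timeout(5 seconds)

import system.dispatcher

val client = new ReactiveKafkaClient(KafkaReactiveClientConfig.default)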

Simple Request

We are now going to create a simple client that calls the Reactive System server we created in the previous section.

For clarity, here is the code snippet for the server definition:


import org.patricknoir.kafka.reactive.server.dsl._

implicit val system = ActorSystem("SimpleService")
implicit val materializer = ActorMaterializer()

import system.dispatcher

var counter: Int = 0

def getCounter(): Int = counter
def incrementCounter(step: Int): Unit = counter += step

val route: ReactiveRoute =
  request.aSync[Unit, Int]("getCounter") { _ => getCounter() } ~
    request.sync[Int, Unit]("incrementCounter") { step => incrementCounter(step) }

val atLeastOnceSource = ReactiveKafkaSource.atLeastOnce(
  requestTopic = "simple",
  bootstrapServers = Set("localhost:9092"),
  clientId = "simpleService"
)

val atLeastOnceSink = ReactiveKafkaSink.atLeastOnce(
  bootstrapServers = Set("localhost:9092"),
  concurrency = 8,
  commitMaxBatchSize = 10,
  commitTimeWindow = 5 seconds
)

val reactiveSystem = atLeastOnceSource ~> route ~> atLeastOnceSink

reactiveSystem.run()

As we can see, the server exposes two services:

  • incrementCounter - accepts a step:Int and returns Unit
  • getCounter - doesn’t accept any parameter and returns Int

The services are bound to the Kafka topic simple, as defined by the Kafka source used to create the ReactiveSystem instance.

Here is the client code that invokes incrementCounter:

implicit val system = ActorSystem("ReactiveClient")
implicit val materializer = ActorMaterializer()
implicit val timeout = Timeout(5 seconds)

import system.dispatcher

val client = new ReactiveKafkaClient(KafkaReactiveClientConfig.default)

val result: Future[Unit] = client.request[Int, Unit]("kafka:simple/incrementCounter", 1)

result.onComplete {
  case Success(_)   => println("incrementCounter request successfully completed")
  case Failure(err) => println(s"error requesting incrementCounter: ${err.getMessage}")
}

Await.ready(result, Duration.Inf)
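
The destination string passed to client.request above, kafka:simple/incrementCounter, appears to combine a medium (kafka), the topic the services are bound to (simple) and the service name (incrementCounter). The exact grammar is not spelled out here, so the following is only a sketch under that assumption. Destination and its parse method are hypothetical names used for illustration and are not part of the Reactive System API.

```scala
// Hypothetical helper, NOT part of the Reactive System API.
// Assumes destinations follow the layout "<medium>:<topic>/<service>".
final case class Destination(medium: String, topic: String, service: String)

object Destination {
  // A Regex used as an extractor must match the whole input string.
  private val Pattern = """([^:/]+):([^:/]+)/([^:/]+)""".r

  // Returns None when the input does not follow the assumed layout.
  def parse(raw: String): Option[Destination] = raw match {
    case Pattern(medium, topic, service) => Some(Destination(medium, topic, service))
    case _                               => None
  }
}
```

Under this assumption, Destination.parse("kafka:simple/incrementCounter") yields the medium kafka, the topic simple and the service incrementCounter, while a string without the medium prefix, such as simple/incrementCounter, yields None.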

One-Way Simple Request

The Reactive System client can also send a message to a remote service without waiting for a reply from the target service.

The following snippet shows how to invoke the API to send a message to the incrementCounter service without waiting for a response:

val result: Future[Unit] = client.send[Int]("kafka:simple/incrementCounter", 1, confirmSend = true)

result.onComplete {
  case Success(_)   => println("incrementCounter request successfully sent")
  case Failure(err) => println(s"error sending incrementCounter request: ${err.getMessage}")
}

Await.ready(result, Duration.Inf)

Please note that even though this is a fire-and-forget way of sending a message, setting the confirmSend flag lets the caller confirm whether the message was sent successfully.
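
Both client examples above end with Await.ready(result, Duration.Inf), which blocks the calling thread until the future completes. If a bound on the wait is preferred, the following sketch uses only the Scala standard library to wait for a limited time and capture the outcome as a Try. BoundedAwait and awaitOutcome are hypothetical names, and the futures in the demo are stand-ins for the ones returned by the client.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, NOT part of the Reactive System API.
object BoundedAwait {
  // Waits at most `atMost` for the future and captures the outcome as a Try,
  // so a timeout or a failed request does not throw at the call site.
  def awaitOutcome[A](result: Future[A], atMost: FiniteDuration): Try[A] =
    Try(Await.result(result, atMost))
}

object BoundedAwaitDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for a request that has already completed.
    val completed: Future[Unit] = Future.successful(())
    // Stand-in for a request whose reply never arrives.
    val neverCompletes: Future[Unit] = Promise[Unit]().future

    Seq(completed, neverCompletes).foreach { future =>
      BoundedAwait.awaitOutcome(future, 200.millis) match {
        case Success(_)   => println("request completed in time")
        case Failure(err) => println(s"request did not complete: ${err.getClass.getSimpleName}")
      }
    }
  }
}
```

The completed future produces a Success, and the one that never completes produces a Failure wrapping a TimeoutException once the 200 milliseconds have elapsed.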