因为我了解 Akka-http 的主要目的不是为了有关 web-Server 的编程,而是想实现一套系统集成的 api,所以也需要考虑由服务端主动向客户端发送指令的应用场景。比如一个零售店管理平台的服务端在完成了某些数据更新后需要通知各零售门市客户端下载最新数据。虽然 Akka-http 也提供对 websocket 协议的支持,但 websocket 的网络连接是双向恒久的,适合频繁的问答交互式服务端与客户端的交流,消息结构也比较零碎。而我们面临的可能是批次型的大量数据库数据交换,只需要简单的服务端单向消息就行了,所以 websocket 不太合适,而 Akka-http 的 SSE 应该比较适合我们的要求。SSE 模式的基本原理是服务端统一集中发布消息,各客户端持久订阅服务端发布的消息并从消息的内容中筛选出属于自己应该执行的指令,然后进行相应的处理。客户端接收 SSE 是在一个独立的线程里不断进行的,不会影响客户端当前的运算流程。当收到有用的消息后就会调用一个业务功能函数作为后台异步运算任务。
服务端的 SSE 发布是以 Source[ServerSentEvent,NotUsed] 来实现的。ServerSentEvent 类型定义如下:
- /**
- * Representation of a server-sent event. According to the specification, an empty data field designates an event
- * which is to be ignored which is useful for heartbeats.
- *
- * @param data data, may span multiple lines
- * @param eventType optional type, must not contain \n or \r
- * @param id optional id, must not contain \n or \r
- * @param retry optional reconnection delay in milliseconds
- */
- final
- case class ServerSentEvent(data:
- String, eventType: Option[String] = None, id: Option[String] = None, retry: Option[Int] = None) {...
- }
这个类型的参数代表事件消息的数据结构。用户可以根据实际需要充分利用这个数据结构来传递消息。服务端是通过 complete 以 SeverSentEvent 类为元素的 Source 来进行 SSE 的,如下:
- import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
- complete {
- Source
- .tick(2.seconds, 2.seconds, NotUsed)
- .map( _ => processToServerSentEvent)
- .keepAlive(1.second, () => ServerSentEvent.heartbeat)
- }
以上代码代表服务端定时运算 processToServerSentEvent 返回 ServerSentEvent 类型结果后发布给所有订阅的客户端。我们用一个函数 processToServerSentEvent 模拟重复运算的业务功能:
- private def processToServerSentEvent: ServerSentEvent = {
- Thread.sleep(3000) //processing delay
- ServerSentEvent(SyncFiles.fileToSync)
- }
这个函数模拟发布事件数据是某种业务运算结果,在这里代表客户端需要下载文件名称。我们用客户端 request 来模拟设定这个文件名称:
- object SyncFiles {
- var fileToSync: String = ""
- }
- private def route = {
- import Directives._
- import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
- def syncRequests =
- pathPrefix("sync") {
- pathSingleSlash {
- post {
- parameter("file") { filename =>
- complete {
- SyncFiles.fileToSync = filename
- s"set download file to : $filename"
- }
- }
- }
- }
- }
客户端订阅 SSE 的方式如下:
- import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
- import system.dispatcher
- Http()
- .singleRequest(Get("http://localhost:8011/events"))
- .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
- .foreach(_.runForeach(se => downloadFiles(se.data)))
每当客户端收到 SSE 后即运行 downloadFiles(filename) 函数。downloadFiles 函数定义:
- def downloadFiles(file: String) = {
- Thread.sleep(3000) //process delay
- if (file != "")
- println(s"Try to download $file")
- }
下面是客户端程序的测试运算步骤:
- scala.io.StdIn.readLine()
- println("do some thing ...")
- Http().singleRequest(
- HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
- ).onSuccess {
- case msg => println(msg)
- }
- scala.io.StdIn.readLine()
- println("do some other things ...")
- Http().singleRequest(
- HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
- ).onSuccess {
- case msg => println(msg)
- }
运算结果:
- do some thing ...
- HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
- Try to download Orders
- Try to download Orders
- do some other things ...
- HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
- Try to download Orders
- Try to download Orders
- Try to download Items
- Try to download Items
- Try to download Items
- Process finished with exit code 0
下面是本次讨论的示范源代码:
服务端:
- import akka.NotUsed
- import akka.actor.ActorSystem
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.server.Directives
- import akka.stream.ActorMaterializer
- import akka.stream.scaladsl.Source
- import scala.concurrent.duration.DurationInt
- import akka.http.scaladsl.model.sse.ServerSentEvent
- object SSEServer {
- def main(args: Array[String]): Unit = {
- implicit val system = ActorSystem()
- implicit val mat = ActorMaterializer()
- Http().bindAndHandle(route, "localhost", 8011)
- scala.io.StdIn.readLine()
- system.terminate()
- }
- object SyncFiles {
- var fileToSync: String = ""
- }
- private def route = {
- import Directives._
- import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
- def syncRequests =
- pathPrefix("sync") {
- pathSingleSlash {
- post {
- parameter("file") { filename =>
- complete {
- SyncFiles.fileToSync = filename
- s"set download file to : $filename"
- }
- }
- }
- }
- }
- def events =
- path("events") {
- get {
- complete {
- Source
- .tick(2.seconds, 2.seconds, NotUsed)
- .map( _ => processToServerSentEvent)
- .keepAlive(1.second, () => ServerSentEvent.heartbeat)
- }
- }
- }
- syncRequests ~ events
- }
- private def processToServerSentEvent: ServerSentEvent = {
- Thread.sleep(3000) //processing delay
- ServerSentEvent(SyncFiles.fileToSync)
- }
- }
客户端:
我的博客即将同步至腾讯云 + 社区。邀大家一同入驻 http://cloud.tencent.com/developer/support-plan
- import akka.NotUsed
- import akka.actor.ActorSystem
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.client.RequestBuilding.Get
- import akka.http.scaladsl.model.HttpMethods
- import akka.http.scaladsl.unmarshalling.Unmarshal
- import akka.stream.ActorMaterializer
- import akka.stream.scaladsl.Source
- import akka.http.scaladsl.model.sse.ServerSentEvent
- import akka.http.scaladsl.model._
- object SSEClient {
- def downloadFiles(file: String) = {
- Thread.sleep(3000) //process delay
- if (file != "")
- println(s"Try to download $file")
- }
- def main(args: Array[String]): Unit = {
- implicit val system = ActorSystem()
- implicit val mat = ActorMaterializer()
- import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
- import system.dispatcher
- Http()
- .singleRequest(Get("http://localhost:8011/events"))
- .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
- .foreach(_.runForeach(se => downloadFiles(se.data)))
- scala.io.StdIn.readLine()
- println("do some thing ...")
- Http().singleRequest(
- HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
- ).onSuccess {
- case msg => println(msg)
- }
- scala.io.StdIn.readLine()
- println("do some other things ...")
- Http().singleRequest(
- HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
- ).onSuccess {
- case msg => println(msg)
- }
- scala.io.StdIn.readLine()
- system.terminate()
- }
- }
来源: http://www.cnblogs.com/tiger-xc/p/8042765.html