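Akka-http provides convenient programming APIs for both ends of a connection: the Server-Side-Api and the Client-Side-Api. Both can greatly improve programming productivity. Of course, as mentioned in the previous post, the Http-Server is the core of Akka-http, and all system-integration functionality is implemented on the server side. The Akka-http Server-Side-Api is arguably among the most advanced Http-Server programming tools; it supports HTTP persistent connections, HTTP pipelining, asynchronous HTTP streaming (including chunked transfer encoding), optional SSL/TLS encryption, and WebSocket.
The Server-Side-Api has two layers: the Low-level-Server-Side-Api and the High-level-Server-Side-Api. The Low-level-server-api implements the essential functionality of an HTTP/1.1 server: connection management, parsing and rendering of messages and headers, timeout management (for requests and connections), and response ordering (for transparent pipelining support).
Other server features such as request routing, file serving, and compression live in the High-level-server-api. Akka-http is built on Akka-stream, so its types are best understood in terms of the Akka-stream execution model.
An Http-Server binds to a socket, receives the requests sent by clients, and serves them. In Akka-http, the server's socket binding is expressed as a stream: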
- val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
- Http().bind(interface = "localhost", port = 8080)
The server-side socket binding is in fact an Akka-stream source, a Source[Http.IncomingConnection, Future[Http.ServerBinding]], as the definition of bind shows:
- /**
- * Creates a [[akka.stream.scaladsl.Source]] of [[akka.http.scaladsl.Http.IncomingConnection]] instances which represents a prospective HTTP server binding
- * on the given `endpoint`.
- *
- * If the given port is 0 the resulting source can be materialized several times. Each materialization will
- * then be assigned a new local port by the operating system, which can then be retrieved by the materialized
- * [[akka.http.scaladsl.Http.ServerBinding]].
- *
- * If the given port is non-zero subsequent materialization attempts of the produced source will immediately
- * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
- * [[akka.http.scaladsl.Http.ServerBinding]].
- *
- * If an [[ConnectionContext]] is given it will be used for setting up TLS encryption on the binding.
- * Otherwise the binding will be unencrypted.
- *
- * If no `port` is explicitly given (or the port value is negative) the protocol's default port will be used,
- * which is 80 for HTTP and 443 for HTTPS.
- *
- * To configure additional settings for a server started using this method,
- * use the `akka.http.server` config section or pass in a [[akka.http.scaladsl.settings.ServerSettings]] explicitly.
- */
- def bind(interface: String, port: Int = DefaultPortForProtocol,
- connectionContext: ConnectionContext = defaultServerHttpContext,
- settings: ServerSettings = ServerSettings(system),
- log: LoggingAdapter = system.log)(implicit fm: Materializer): Source[Http.IncomingConnection, Future[ServerBinding]] = {
- val fullLayer = fuseServerBidiFlow(settings, connectionContext, log)
- tcpBind(interface, choosePort(port, connectionContext), settings)
- .map(incoming ⇒ {
- val serverFlow = fullLayer.addAttributes(prepareAttributes(settings, incoming)) join incoming.flow
- IncomingConnection(incoming.localAddress, incoming.remoteAddress, serverFlow)
- })
- .mapMaterializedValue(materializeTcpBind)
- }
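The scaladoc above says that a source bound to port 0 gets a fresh local port from the operating system on each materialization, and that the port can be read from the materialized ServerBinding. Here is a minimal sketch of that behavior, assuming the Akka HTTP 10.0/10.1-era API (Http().bind with ActorMaterializer) used in this post. The names BindSketch and boundPort are made up for illustration, and Sink.ignore accepts connections without serving them.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.http.scaladsl.Http
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BindSketch {
  // Binds to an OS-assigned port, reports it, then releases it again.
  def boundPort(): Int = {
    implicit val system: ActorSystem = ActorSystem("bindSketch")
    implicit val mat: Materializer = ActorMaterializer()

    // Only a blueprint so far: nothing is bound until the source is run.
    val source: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
      Http().bind(interface = "localhost", port = 0)

    // `to` keeps the source's materialized value, i.e. the Future[ServerBinding].
    val bindingFuture: Future[Http.ServerBinding] = source.to(Sink.ignore).run()

    try {
      val binding = Await.result(bindingFuture, 5.seconds)
      val port = binding.localAddress.getPort // the port chosen by the OS
      Await.result(binding.unbind(), 5.seconds)
      port
    } finally system.terminate()
  }
}
```

Blocking with Await keeps the sketch short; real server code would normally stay asynchronous.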
Running this Source[IncomingConnection] produces a stream of connections:
- /**
- * Represents one accepted incoming HTTP connection.
- */
- final case class IncomingConnection(
- localAddress: InetSocketAddress,
- remoteAddress: InetSocketAddress,
- flow: Flow[HttpResponse, HttpRequest, NotUsed]) {
- /**
- * Handles the connection with the given flow, which is materialized exactly once
- * and the respective materialization result returned.
- */
- def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: Materializer): Mat =
- flow.joinMat(handler)(Keep.right).run()
- /**
- * Handles the connection with the given handler function.
- */
- def handleWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: Materializer): Unit =
- handleWith(Flow[HttpRequest].map(handler))
- /**
- * Handles the connection with the given handler function.
- */
- def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse], parallelism: Int = 1)(implicit fm: Materializer): Unit =
- handleWith(Flow[HttpRequest].mapAsync(parallelism)(handler))
- }
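The IncomingConnection type provides the streaming function handleWith, which turns requests into responses. Users can supply their own conversion in any of the following ways:
Call handleWith with a Flow[HttpRequest, HttpResponse, _], for example: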
- def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
- HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
- val flow = Flow.fromFunction(req2Resp)
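Because the handler is an ordinary Akka-stream Flow, it can be tried out without opening a socket by feeding it a request from a Source. The sketch below assumes the same Akka HTTP 10.0/10.1-era setup as the rest of this post. FlowHandlerSketch and runOnce are made-up names, and the response body uses `<h1>` instead of the post's `<h>`.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.http.scaladsl.model._
import scala.concurrent.Await
import scala.concurrent.duration._

object FlowHandlerSketch {
  // Same shape as req2Resp above: every request gets the same HTML response.
  val req2Resp: HttpRequest => HttpResponse = _ =>
    HttpResponse(entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Hello World!</h1>"))

  val flow = Flow.fromFunction(req2Resp)

  // Pushes one request through the flow and returns the response it produced.
  def runOnce(): HttpResponse = {
    implicit val system: ActorSystem = ActorSystem("flowHandlerSketch")
    implicit val mat: Materializer = ActorMaterializer()
    try Await.result(Source.single(HttpRequest(uri = "/")).via(flow).runWith(Sink.head), 5.seconds)
    finally system.terminate()
  }
}
```

In the server this same `flow` value is what gets passed to `connection.handleWith(flow)`.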
Pass an HttpRequest => HttpResponse function to handleWithSyncHandler:
- def syncHandler: HttpRequest => HttpResponse = {
- case HttpRequest(HttpMethods.GET,Uri.Path("/"),_headers,_entity,_protocol) =>
- HttpResponse(entity=
- HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
- case req: HttpRequest =>
- req.discardEntityBytes() // important to drain incoming HTTP Entity stream
- HttpResponse(404, entity = "Unknown resource!")
- }
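A synchronous handler is a plain function, so its pattern match can be checked by calling it directly. This sketch assumes the same API version as above and adds a `/ping` route that is purely hypothetical and not part of the original example. SyncHandlerSketch and statuses are likewise made-up names. Note that discardEntityBytes() needs an implicit Materializer in scope.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.http.scaladsl.model._

object SyncHandlerSketch {
  def syncHandler(implicit mat: Materializer): HttpRequest => HttpResponse = {
    case HttpRequest(HttpMethods.GET, Uri.Path("/"), _, _, _) =>
      HttpResponse(entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Hello World!</h1>"))
    // Hypothetical extra route, added only to show a second match case.
    case HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "PONG!")
    case req: HttpRequest =>
      req.discardEntityBytes() // drain the request entity before replying
      HttpResponse(404, entity = "Unknown resource!")
  }

  // Calls the handler directly for three requests and collects the status codes.
  def statuses(): List[StatusCode] = {
    implicit val system: ActorSystem = ActorSystem("syncHandlerSketch")
    implicit val mat: Materializer = ActorMaterializer()
    val handle = syncHandler
    try List(
      handle(HttpRequest(uri = "/")),
      handle(HttpRequest(uri = "/ping")),
      handle(HttpRequest(HttpMethods.POST, uri = "/"))
    ).map(_.status)
    finally system.terminate()
  }
}
```

The POST to `/` falls through to the last case, since the first case only matches GET.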
Pass an HttpRequest => Future[HttpResponse] function to handleWithAsyncHandler:
- def asyncHandler: HttpRequest => Future[HttpResponse] = {
- case HttpRequest(HttpMethods.GET, Uri.Path("/"), _headers, _entity, _protocol) => Future {
- HttpResponse(entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h> Hello World! </h>"))
- }
- case req: HttpRequest => Future {
- req.discardEntityBytes() // important to drain incoming HTTP Entity stream
- HttpResponse(404, entity = "Unknown resource!")
- }
- }
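As the IncomingConnection source quoted earlier shows, handleWithAsyncHandler is just Flow[HttpRequest].mapAsync(parallelism)(handler). mapAsync may run up to `parallelism` futures at once but emits results in input order, so responses still line up with their requests. Below is a small sketch of that, assuming the same API version as above. AsyncHandlerSketch, bodies and the path-echoing handler are all hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.http.scaladsl.model._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncHandlerSketch {
  // Hypothetical handler: replies with the request path, computed inside a Future.
  def asyncHandler(implicit ec: ExecutionContext): HttpRequest => Future[HttpResponse] =
    req => Future(HttpResponse(entity = req.uri.path.toString))

  // Runs one request per path through the handler and returns the response bodies.
  def bodies(paths: List[String], parallelism: Int): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("asyncHandlerSketch")
    implicit val mat: Materializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    // Same construction that handleWithAsyncHandler uses internally.
    val handlerFlow = Flow[HttpRequest].mapAsync(parallelism)(asyncHandler)

    val result = Source(paths.map(p => HttpRequest(uri = p)))
      .via(handlerFlow)
      .mapAsync(1)(_.entity.toStrict(1.second).map(_.data.utf8String))
      .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

The default `parallelism = 1` handles one request per connection at a time; a larger value mainly helps with pipelined requests on the same connection.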
Running the Source[IncomingConnection, Future[ServerBinding]] returns a Future[ServerBinding]:
- val futBinding: Future[Http.ServerBinding] =
- connSource.to { Sink.foreach{ connection =>
- println(s"client address ${connection.remoteAddress}")
- // connection handleWith flow
- // connection handleWithSyncHandler syncHandler
- connection handleWithAsyncHandler asyncHandler
- }}.run()
The ServerBinding lets us release the bound socket:
- /**
- * Represents a prospective HTTP server binding.
- *
- * @param localAddress The local address of the endpoint bound by the materialization of the `connections` [[akka.stream.scaladsl.Source]]
- *
- */
- final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) {
- /**
- * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
- * [[akka.stream.scaladsl.Source]]
- *
- * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed.
- */
- def unbind(): Future[Unit] = unbindAction()
- }
We can call this unbind():
- futBinding.flatMap(_.unbind())
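To see the binding lifecycle end to end, the sketch below binds to an OS-assigned port, sends one request with Http().singleRequest, unbinds, and only then terminates the ActorSystem. It assumes the same Akka HTTP 10.0/10.1-era API as the rest of this post. UnbindSketch, roundTrip and the "ok" response body are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Sink
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnbindSketch {
  // Starts a server, calls it once, unbinds, and returns the status the client saw.
  def roundTrip(): StatusCode = {
    implicit val system: ActorSystem = ActorSystem("unbindSketch")
    implicit val mat: Materializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    val futBinding: Future[Http.ServerBinding] =
      Http().bind("localhost", 0) // port 0: let the OS pick a free port
        .to(Sink.foreach[Http.IncomingConnection](
          _.handleWithSyncHandler(_ => HttpResponse(entity = "ok"))))
        .run()

    val status: Future[StatusCode] = for {
      binding  <- futBinding
      response <- Http().singleRequest(
                    HttpRequest(uri = s"http://localhost:${binding.localAddress.getPort}/"))
      _         = response.discardEntityBytes() // drain the response entity
      _        <- binding.unbind()              // stop accepting new connections
    } yield response.status

    // Terminate only after the futures above are done: their callbacks run on
    // system.dispatcher, which stops executing tasks once the system has shut down.
    try Await.result(status, 10.seconds)
    finally system.terminate()
  }
}
```

Calling terminate() last mirrors the onComplete(_ => httpSys.terminate()) ordering used in the full example.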
The complete example source code follows:
- import akka.actor._
- import akka.stream._
- import akka.stream.scaladsl._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.model._
- import scala.concurrent._
- object LowLevelServerApi extends App {
- implicit val httpSys = ActorSystem("actorSystem")
- implicit val httpMat = ActorMaterializer()
- implicit val httpEc = httpSys.dispatcher
- val (interface,port) = ("localhost",8088)
- val connSource: Source[Http.IncomingConnection,Future[Http.ServerBinding]] =
- Http().bind(interface,port)
- def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
- HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
- val flow = Flow.fromFunction(req2Resp)
- def syncHandler: HttpRequest => HttpResponse = {
- case HttpRequest(HttpMethods.GET,Uri.Path("/"),_headers,_entity,_protocol) =>
- HttpResponse(entity=
- HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
- case req: HttpRequest =>
- req.discardEntityBytes() // important to drain incoming HTTP Entity stream
- HttpResponse(404, entity = "Unknown resource!")
- }
- def asyncHandler: HttpRequest => Future[HttpResponse] = {
- case HttpRequest(HttpMethods.GET,Uri.Path("/"),_headers,_entity,_protocol) => Future {
- HttpResponse(entity=
- HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>")) }
- case req: HttpRequest => Future {
- req.discardEntityBytes() // important to drain incoming HTTP Entity stream
- HttpResponse(404, entity = "Unknown resource!")
- }
- }
- val futBinding: Future[Http.ServerBinding] =
- connSource.to { Sink.foreach{ connection =>
- println(s"client address ${connection.remoteAddress}")
- // connection handleWith flow
- // connection handleWithSyncHandler syncHandler
- connection handleWithAsyncHandler asyncHandler
- }}.run()
- println(s"Server running at $interface $port. Press RETURN to exit ...")
- scala.io.StdIn.readLine()
- futBinding.flatMap(_.unbind())
- .onComplete(_ => httpSys.terminate())
- }
Source: http://www.cnblogs.com/tiger-xc/p/7709377.html