前面几篇讨论了关于 gRPC 方式的前后端连接集成方式. gRPC 也是一个开放的标准, 但讲到普及性就远远不及基于 http/1.1 协议的 web-service 了. 特别是 gRPC 的前端编程还是有一定的门槛, 所以作为一种开放的网络大平台还是必须考虑用 Web-service 方式的集成. 平台服务 API 可以有两样选择: 一种是传统 Web-service 方式, 新的一种是 REST API 款式. REST API 比较适合数据库表的 crud 操作. 在 2017 年我曾经写了一系列博客介绍 akka-http, 这里就不再叙述它的细节了. 这篇我们只聚焦在解决当前问题上. 在 POS 控制平台例子里不会涉及到 POST 操作, 应该全部是 GET 类型的, 如:
- http://192.168.11.189:2588/pos/logon?opr=1010
- http://192.168.11.189:2588/pos/logoff
- http://192.168.11.189:2588/pos/logsales?acct=001&dpt=01&code=978111&qty=3&price=1200
- http://192.168.11.189:2588/pos/subtotal?level=0
- http://192.168.11.189:2588/pos/discount?disctype=2&grouped=true&code=481&percent=20
可以看到, 请求部分只是带参数的 uri, 不含 entity 数据部分, 数据通过 querystring 提供. 但返回会有几种数据类型: POSResponse,TxnsItems,vchState, 这些都曾经在 Protobuffer 用 IDL 定义过:
- message PBVchState { // 单据状态
- string opr = 1; // 收款员
- int64 jseq = 2; //begin journal sequence for read-side replay
- int32 num = 3; // 当前单号
- int32 seq = 4; // 当前序号
- bool void = 5; // 取消模式
- bool refd = 6; // 退款模式
- bool susp = 7; // 挂单
- bool canc = 8; // 废单
- bool due = 9; // 当前余额
- string su = 10; // 主管编号
- string mbr = 11; // 会员号
- int32 mode = 12; // 当前操作流程: 0=logOff, 1=LogOn, 2=Payment
- }
- message PBTxnItem { // 交易记录
- string txndate = 1; // 交易日期
- string txntime = 2; // 录入时间
- string opr = 3; // 操作员
- int32 num = 4; // 销售单号
- int32 seq = 5; // 交易序号
- int32 txntype = 6; // 交易类型
- int32 salestype = 7; // 销售类型
- int32 qty = 8; // 交易数量
- int32 price = 9; // 单价 (分)
- int32 amount = 10; // 码洋 (分)
- int32 disc = 11; // 折扣率 (%)
- int32 dscamt = 12; // 折扣额: 负值 net 实洋 = amount + dscamt
- string member = 13; // 会员卡号
- string code = 14; // 编号 (商品, 卡号...)
- string acct = 15; // 账号
- string dpt = 16; // 部类
- }
- message PBPOSResponse {
- int32 sts = 1;
- string msg = 2;
- PBVchState voucher = 3;
- repeated PBTxnItem txnitems = 4;
- }
那么概括我们现在的主要工作包括: Uri 解析, HttpResponse 实例的构建和传输.
首先, 用 akka-http 搭建一个 http server 框架:
- import akka.actor._
- import akka.stream._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.server.Directives._
- object HttpServerDemo extends App {
- implicit val httpSys = ActorSystem("httpSystem")
- implicit val httpMat = ActorMaterializer()
- implicit val httpEC = httpSys.dispatcher
- val route =
- path("hello") {
- complete {"hello, http server"}
- }
- val (port, host) = (8011,"192.168.11.189")
- val bindingFuture = Http().bindAndHandle(route,host,port)
- println(s"Server running at $host $port. Press any key to exit ...")
- scala.io.StdIn.readLine()
- bindingFuture.flatMap(_.unbind())
- .onComplete(_ => httpSys.terminate())
- /*
- bindingFuture.foreach(s => println(s.localAddress.getHostString))
- bindingFuture.foreach(_.unbind())
- bindingFuture.onComplete {
- case Success(value) => value.unbind()
- }
- */
- }
用 akka-http 的 server API 很快就完成了一个简单的 http-server. 下一步研究一下如何构建返回的 HttpResponse:httpresponse 是从 server 端传送到 client 端的. 这个过程包括把 HttpResponse Entity 里的数据从某种类型转换成通讯用的二进制数据流, 到了客户端再转换成目标类型. akka-http 的数据转换机制 Marshaller/Unmarshaller 是通过类型转换的隐式实例来实现的, akka-http 提供了多个标准类型数据转换的隐式实例, 如 StringMarshaller:
- implicit val ByteArrayMarshaller: ToEntityMarshaller[Array[Byte]] = byteArrayMarshaller(`application/octet-stream`)
- def byteArrayMarshaller(contentType: ContentType): ToEntityMarshaller[Array[Byte]] =
- Marshaller.withFixedContentType(contentType) { bytes => HttpEntity(contentType, bytes) }
- implicit val ByteStringMarshaller: ToEntityMarshaller[ByteString] = byteStringMarshaller(`application/octet-stream`)
- def byteStringMarshaller(contentType: ContentType): ToEntityMarshaller[ByteString] =
- Marshaller.withFixedContentType(contentType) { bytes => HttpEntity(contentType, bytes) }
- implicit val StringMarshaller: ToEntityMarshaller[String] = stringMarshaller(`text/plain`)
- def stringMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[String] =
- Marshaller.withOpenCharset(mediaType) { (s, cs) => HttpEntity(mediaType withCharset cs, s) }
- def stringMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[String] =
- Marshaller.withFixedContentType(mediaType) { s => HttpEntity(mediaType, s) }
- ...
因为 akka-http 提供了 implicit val StringMarshaller, 所以在上面的例子里我可以直接写成: complete("hello world!"), 然后系统自动构建一个含字符类型数据 entity 的 HttpResponse.Entity.dataBytes 中的数据类型是由 Entity.contentType 指明的:
- object ContentTypes {
- val `application/json` = ContentType(MediaTypes.`application/json`)
- val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`)
- val `application/x-www-form-urlencoded` = ContentType(MediaTypes.`application/x-www-form-urlencoded`)
- val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8`
- val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8`
- val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8`
- val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8`
- val `application/grpc+proto` = ContentType(MediaTypes.`application/grpc+proto`)
- // used for explicitly suppressing the rendering of Content-Type headers on requests and responses
- val NoContentType = ContentType(MediaTypes.NoMediaType)
- }
客户端收到 HttpResponse 后把收到的二进制数据流转换成 MediaTypes 指定的类型. 当然, 最基本的数据类型就是 String 了. 所有客户端都提供 String 类型的反序列化 deserialization. 理论上来讲, 我们可以用字符形式来描述任何类型数据, 这样我们可以把一个特殊类型实例转成 String, 然后发送给客户端. 客户端再按照协议好的类型转换规则把字符转换成目标类型:
- case class TextMessage(msg: String)
- val helloMsg: String = TextMessage("hello string message converter").toString
- val route =
- path("hello") {
- complete {helloMsg}
- }
不过, 这种情况只适用于内部系统的数据交换, 因为数据类型转换的规则方式都是内部私有的. xml,JSON 是开放平台系统数据交换的标准数据类型描述语言, 本身是字符 String 形式的, 只是它用 String 描述类型的语法是行业标准的. 客户端可以按行业标准从一个 xml/JSON 文件里提取里面的数据类型和实例. 所以, 自定义类型的数据转换主要包括 类型 ->jsonstring->bytestring->jsonstring-> 类型. 换句话说我们只要有隐式 JsonMarshaller 实例就可以完成大部分的数据交换工作了.
spray-JSON 是 akka-http 自带默认的一个 JSON 工具库, 它提供了通用的针对任何类型 T 的 Marshaller/Unmarshaller: ToEntityMarshaller[T] 和 FromEntityUnmarshaller[T]. 使用 spay-JSON 很简单, 如下:
- import akka.http.scaladsl.marshallers.sprayjson._
- import spray.JSON._
- object JsonMarshaller extends SprayJsonSupport with DefaultJsonProtocol {
- //domain models
- case class Person(name:String, age: Int)
- case class Location(province: String, city: String, zipcode: Int)
- case class Employee(person: Person, loccation: Location)
- //collect your JSON format instances
- implicit val fmtPerson = jsonFormat2(Person.apply)
- implicit val fmtLocation = jsonFormat3(Location.apply)
- implicit val fmtEmployee = jsonFormat2(Employee.apply)
- }
使用 Marshaller 时只要 import JsonMarshaller._ 把几个类型的隐式转换实例带进可视域即可, 如下:
- import JsonMarshaller._
- val person = Person("Jonh Doe", 23)
- val location = Location("GuangDong","ShenZhen",10223)
- val employee = Employee(person,location)
- val route =
- path("json") {
- complete {employee}
- }
就这么简单, 试试看:
- http://192.168.11.189:8011/json
- {
- "loccation":{
- "city":"ShenZhen","province":"GuangDong","zipcode":10223
- },"person":{
- "age":23,"name":"Jonh Doe"
- }
- }
没错, 客户端的确收到正确的 JSON 数据. 还有一项需求是在 Reponse 里返回一个数据流 (多条数据), 如当前交易项目清单. 这个也比较容易: akka-http 本身支持 JSON-streaming. 具体使用方法如下:
- import akka.http.scaladsl.common.EntityStreamingSupport
- import akka.stream.scaladsl._
- implicit val jsonStreamingSupport = EntityStreamingSupport.JSON()
- .withParallelMarshalling(parallelism = 4, unordered = false)
- val persons = List(person,Person("Peter Kung",28), Person("Ketty Wang",16))
- val personDataSource: Source[Person,Any] = Source.fromIterator(() => persons.iterator)
- val route =
- path("json") {
- complete {employee}
- } ~
- path("stream") {
- complete(personDataSource)
- }
在客户端 browser 上测试:
- http://192.168.11.189:8011/stream
- [{
- "age":23,"name":"Jonh Doe"
- },{
- "age":28,"name":"Peter Kung"
- },{
- "age":16,"name":"Ketty Wang"
- }]
也没问题. 下面是本次示范中使用的依赖和它们的版本:
- libraryDependencies ++= Seq(
- "de.heikoseeberger" %% "akka-http-json4s" % "1.26.0",
- "org.json4s" %% "json4s-jackson" % "3.6.6",
- "org.json4s" %% "json4s-ext" % "3.6.6",
- "com.typesafe.akka" %% "akka-http" % "10.1.8" ,
- "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
- "com.typesafe.akka" %% "akka-stream" % "2.5.23"
- )
来源: https://www.cnblogs.com/tiger-xc/p/11052170.html