研究关于 restapi 的初衷是想搞一套通用的平台数据表维护 http 工具. 前面谈过身份验证和使用权限, 文件的上传下载, 这次来到具体的数据库表维护. 我们在这篇示范里设计一套通用的对平台每一个数据表的标准维护方式. http 服务端数据表维护 CRUD 由几个标准的部分组成: Model,Repository,Route. 我们先看看这几个类型的基类:
/** Two-way mapping between a model type `M` and the row type `E` it is stored as.
  *
  * @tparam M the model type
  * @tparam E the row (entity) type
  */
trait ModelBase[M, E] {

  /** Function turning a model value into its row representation. */
  def to: M => E

  /** Function turning a row back into a model value. */
  def from: E => M
}
/** Asynchronous CRUD contract for rows of type `M`.
  *
  * Every operation returns a `Future`, so implementations are free to talk to a
  * database without blocking the caller.
  *
  * @tparam M the model type handled by the repository
  */
trait RepoBase[M] {

  /** Looks up a single row by id; the option is empty when nothing matches. */
  def getById(id: Long): Future[Option[M]]

  /** Fetches every row. */
  def getAll: Future[Seq[M]]

  /** Fetches the rows selected by the predicate `expr`. */
  def filter(expr: M => Boolean): Future[Seq[M]]

  /** Stores `row`; the shape of the result is left to the implementation. */
  def save(row: M): Future[AnyRef]

  /** Deletes the row with the given id.
    * NOTE(review): the Int is presumably the number of affected rows; confirm with implementations.
    */
  def deleteById(id: Long): Future[Int]

  /** Replaces the row with the given id by `row`.
    * NOTE(review): the Int is presumably the number of affected rows; confirm with implementations.
    */
  def updateById(id: Long, row: M): Future[Int]
}
/** Generic REST route that exposes a [[RepoBase]] over HTTP.
  *
  * Routes served:
  *  - `GET    /pathName`      -> `repository.getAll`
  *  - `POST   /pathName`      -> `repository.save` with the JSON body parsed as `M`
  *  - `GET    /pathName/{id}` -> `repository.getById`
  *  - `PUT    /pathName/{id}` -> `repository.updateById` with the JSON body parsed as `M`
  *  - `DELETE /pathName/{id}` -> `repository.deleteById`
  *
  * Request bodies are read as plain strings and parsed with `fromJson`, because no
  * implicit unmarshaller can be resolved for the abstract type parameter `M`.
  *
  * @param pathName   path segment under which the resource is served
  * @param repository data access implementation the route delegates to
  * @param m          manifest of `M`, required by `fromJson` to deserialize request bodies
  * @tparam M the model type
  */
abstract class RouteBase[M](val pathName: String, repository: RepoBase[M])(
    implicit m: Manifest[M]) extends Directives with JsonConverter {

  import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
  import akka.http.scaladsl.server.Route
  import scala.util.{Failure, Success, Try}

  /** Reads the request body, parses it as `M` and hands the value to `inner`.
    *
    * A body that cannot be parsed is a client error, so it is answered with
    * 400 Bad Request instead of letting the parser exception escape the route.
    */
  private def withParsedEntity(inner: M => Route): Route =
    entity(as[String]) { body =>
      Try(fromJson[M](body)) match {
        case Success(row) => inner(row)
        case Failure(_)   => complete(HttpResponse(status = StatusCodes.BadRequest))
      }
    }

  val route: Route = path(pathName) {
    get {
      complete(futureToJson(repository.getAll))
    } ~ post {
      withParsedEntity { row =>
        complete(futureToJson(repository.save(row)))
      }
    }
  } ~ path(pathName / LongNumber) { id =>
    get {
      complete(futureToJson(repository.getById(id)))
    } ~ put {
      withParsedEntity { row =>
        complete(futureToJsonAny(repository.updateById(id, row)))
      }
    } ~ delete {
      complete(futureToJsonAny(repository.deleteById(id)))
    }
  }
}
很明显, Model 是数据库表行类型的表达方式, Repository 是数据库表操作方法, Route 是操作方法的调用. 下面是这几个类型的实例示范:
/** Sample model definitions used by the mock person example. */
object MockModels {

  /** Row representation of a person. */
  case class DataRow(name: String, age: Int)

  /** Person model together with its conversions to and from [[DataRow]]. */
  case class Person(name: String, age: Int) extends ModelBase[Person, DataRow] {

    /** Copies the person's fields into a row. */
    def to: Person => DataRow = person => DataRow(person.name, person.age)

    /** Copies the row's fields into a person. */
    def from: DataRow => Person = row => Person(row.name, row.age)
  }
}
- package com.datatech.restapi
- import MockModels._
- import scala.concurrent.Future
/** In-memory stand-in for a real person repository; nothing is persisted. */
object MockRepo {

  /** Mock [[RepoBase]] implementation answering from fixed sample data. */
  class PersonRepo extends RepoBase[Person] {

    // Single source for the sample rows so getAll and filter cannot drift apart.
    private val samplePeople: Seq[Person] =
      Seq(Person("jonny lee", 23), Person("candy wang", 45), Person("jimmy kowk", 34))

    /** Always answers with the same person, whatever `id` is. */
    override def getById(id: Long): Future[Option[Person]] =
      Future.successful(Some(Person("johnny lee", 23)))

    /** Returns all sample rows. */
    override def getAll: Future[Seq[Person]] = Future.successful(samplePeople)

    /** Returns the sample rows accepted by `expr` (previously the predicate was ignored). */
    override def filter(expr: Person => Boolean): Future[Seq[Person]] =
      Future.successful(samplePeople.filter(expr))

    /** Echoes the row back without storing it. */
    override def save(row: Person): Future[Person] = Future.successful(row)

    /** Pretends exactly one row was deleted. */
    override def deleteById(id: Long): Future[Int] = Future.successful(1)

    /** Pretends exactly one row was updated. */
    override def updateById(id: Long, row: Person): Future[Int] = Future.successful(1)
  }
}
/** Wires the generic [[RouteBase]] to the mock person repository. */
object PersonRoute {

  /** CRUD route for `Person` rows; all behaviour is inherited from [[RouteBase]]. */
  class PersonRoute(pathName: String, repo: RepoBase[Person])
      extends RouteBase[Person](pathName, repo)

  /** Ready-to-mount route serving the "person" resource from a fresh `PersonRepo`. */
  val route = {
    val personRoutes = new PersonRoute("person", new PersonRepo)
    personRoutes.route
  }
}
Model 代表数据表结构以及某种数据库的表行与 Model 之间的转换. 而 repository 则代表某种数据库对库表具体操作的实现. 我们把焦点拉回到 RouteBase 上来, 这里包含了 REST 标准的 get,post,put,delete http 操作. 实际上就是 request/response 处理机制. 因为数据需要在线上 on-the-wire 来回移动, 所以需要进行数据转换. 通用的数据传输模式是: 类 ->JSON-> 类, 即序列化 / 反序列化. akka-http 提供了丰富的 Marshaller 来实现自动的数据转换, 但在编译时要提供 Marshaller 的隐式实例 implicit instance. 而 RouteBase[M] 的类型参数 M 在编译时是未知的, 无法提供对应的隐式实例, 所以直接使用 Marshaller 是无法通过编译的. 只能手工进行类和 JSON 之间的转换. JSON 转换是通过 json4s 实现的:
- import java.text.SimpleDateFormat
- import akka.http.scaladsl.model._
- import org.json4s.JsonAST.{JNull, JString}
- import org.json4s.{CustomSerializer, DefaultFormats, Formats}
- import org.json4s.jackson.Serialization
- import scala.concurrent.ExecutionContext.Implicits.global
- import scala.concurrent.Future
/** Supplies a json4s serializer for `java.sql.Date` values. */
trait DateSerializer {

  /** Converts between JSON strings in `yyyy-MM-dd` form and `java.sql.Date`.
    *
    * Reading: a JSON string is parsed with the `yyyy-MM-dd` pattern, JSON null becomes `null`.
    * Writing: the date is rendered with `java.sql.Date.toString`.
    */
  case object SqlDateSerializer extends CustomSerializer[java.sql.Date](_ => (
    {
      case JString(text) =>
        // A new formatter per call: SimpleDateFormat is not safe to share between threads.
        val parsed = new SimpleDateFormat("yyyy-MM-dd").parse(text)
        new java.sql.Date(parsed.getTime)
      case JNull => null
    },
    {
      case sqlDate: java.sql.Date => JString(sqlDate.toString)
    }
  ))
}
/** json4s-based helpers for converting between objects, JSON text and HTTP responses. */
trait JsonConverter extends DateSerializer {

  /** json4s formats: defaults with `yyyy-MM-dd` dates plus the `java.sql.Date` serializer. */
  implicit val formats: Formats = new DefaultFormats {
    override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd")
  } ++ List(SqlDateSerializer)

  /** Serializes `obj` to a JSON string. */
  def toJson(obj: AnyRef): String = Serialization.write(obj)

  /** Builds a 200 OK response whose body is the given JSON text. */
  private def jsonOk(body: String): HttpResponse =
    HttpResponse(status = StatusCodes.OK, entity = HttpEntity(MediaTypes.`application/json`, body))

  /** Logs the failure and turns it into an empty 500 response. */
  private def serverError(ex: Throwable): HttpResponse = {
    ex.printStackTrace()
    HttpResponse(status = StatusCodes.InternalServerError)
  }

  /** Serializes the future's result as JSON into a 200 response.
    *
    * @param obj the pending result to serialize
    * @return the response; a failed future becomes a 500 response
    */
  def futureToJson(obj: Future[AnyRef]): Future[HttpResponse] =
    obj.map(result => jsonOk(Serialization.write(result)))
      .recover { case ex => serverError(ex) }

  /** Wraps the future's result into a `{"status": ...}` JSON object in a 200 response.
    *
    * The object is produced by the serializer rather than by string interpolation,
    * so the body is always well-formed JSON (the old template emitted an unquoted
    * key and no closing brace). Failures are now logged like in `futureToJson`.
    *
    * @param obj the pending result, e.g. an affected-row count
    * @return the response; a failed future becomes a 500 response
    */
  def futureToJsonAny(obj: Future[Any]): Future[HttpResponse] =
    obj.map(result => jsonOk(Serialization.write(Map[String, Any]("status" -> result))))
      .recover { case ex => serverError(ex) }

  /** Deserializes a JSON string into an `E`.
    *
    * @param JSON the JSON text to parse
    * @param m    manifest of the target type
    * @return the deserialized value; parser exceptions propagate to the caller
    */
  def fromJson[E](JSON: String)(implicit m: Manifest[E]): E =
    Serialization.read[E](JSON)
}
当然对于一些特别的数据库表, 我们还是希望使用 akka-http 强大的功能, 如 streaming. 这时对于每一个这样的数据表就需要定制 Route 了. 下面是一个定制 Route 的例子:
/** Sample model definitions used by the custom address route example. */
object MockModel {

  /** Row representation of an address. */
  case class AddressRow(province: String, city: String, street: String, zip: String)

  /** Address model together with its conversions to and from [[AddressRow]]. */
  case class Address(province: String, city: String, street: String, zip: String)
      extends ModelBase[Address, AddressRow] {

    /** Copies the address fields into a row. */
    def to: Address => AddressRow =
      address => AddressRow(address.province, address.city, address.street, address.zip)

    /** Copies the row fields into an address. */
    def from: AddressRow => Address =
      record => Address(record.province, record.city, record.street, record.zip)
  }
}
/** Signatures of the address repository used by the custom route.
  *
  * Every member is still a `???` placeholder, so calling any of them throws
  * `NotImplementedError` until a real implementation is supplied.
  */
object AddressRepo {

  /** Looks up one address by id. */
  def getById(id: Long): Future[Option[Address]] = ???

  /** Streams all addresses as an akka-stream source. */
  def getAll: Source[Address, _] = ???

  /** Fetches the addresses selected by `expr`. */
  def filter(expr: Address => Boolean): Future[Seq[Address]] = ???

  /** Saves addresses arriving as a stream.
    * NOTE(review): the Int is presumably the number of rows saved; confirm once implemented.
    */
  def saveAll(rows: Source[Address, _]): Future[Int] = ???

  /** Saves addresses that become available as one future batch. */
  def saveAll(rows: Future[Seq[Address]]): Future[Int] = ???

  /** Deletes the address with the given id and yields an address. */
  def deleteById(id: Long): Future[Address] = ???

  /** Updates the address with the given id and yields an address. */
  def updateById(id: Long, row: Address): Future[Address] = ???
}
- package com.datatech.restapi
- import akka.actor._
- import akka.stream._
- import akka.http.scaladsl.common._
- import spray.JSON.DefaultJsonProtocol
- import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
- import akka.http.scaladsl.server._
- import MockModels.Address
- import MockRepo._
/** spray-json support for the address route: provides the JSON format of `Address`. */
trait FormatConverter extends SprayJsonSupport with DefaultJsonProtocol {

  // The type is spelled out: implicit definitions without an explicit type make
  // implicit resolution depend on declaration order and hide the public API.
  implicit val addrFormat: spray.json.RootJsonFormat[Address] = jsonFormat4(Address.apply)
}
/** Hand-written route for addresses that uses akka-http entity streaming.
  *
  * Routes served:
  *  - `GET    /pathName`      -> streams `AddressRepo.getAll` as JSON
  *  - `POST   /pathName`      -> streams the request body into `AddressRepo.saveAll`
  *  - `GET    /pathName/{id}` -> `AddressRepo.getById`
  *  - `PUT    /pathName/{id}` -> `AddressRepo.updateById`
  *  - `DELETE /pathName/{id}` -> `AddressRepo.deleteById`
  *
  * Fixes over the previous listing: the braces are balanced so the id-based
  * routes are siblings of the collection routes (they used to be nested inside
  * `path(pathName)` and `put`, and the block did not compile), and `onSuccess`
  * unwraps the future results so responses no longer contain `Success(...)`;
  * failed futures are left to the route's exception handling.
  *
  * @param pathName path segment under which the resource is served
  * @param akkaSys  actor system used to create the stream materializer
  */
case class AddressRoute(pathName: String)(implicit akkaSys: ActorSystem)
    extends Directives with FormatConverter {

  implicit val mat: ActorMaterializer = ActorMaterializer()

  // JSON framing for streamed entities; marshals up to two elements in parallel
  // while keeping the element order.
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
    EntityStreamingSupport.JSON()
      .withParallelMarshalling(parallelism = 2, unordered = false)

  val route = path(pathName) {
    get {
      complete(AddressRepo.getAll)
    } ~ post {
      // Uploads are streamed, so the default request size limit is lifted.
      withoutSizeLimit {
        entity(asSourceOf[Address]) { source =>
          // Alternative: collect the stream first with
          //   source.runFold(Seq[Address]())((acc, addr) => acc :+ addr)
          // and pass the resulting future to the Future-based saveAll overload.
          onSuccess(AddressRepo.saveAll(source)) { rows =>
            complete(s"$rows address saved.")
          }
        }
      }
    }
  } ~ path(pathName / LongNumber) { id =>
    get {
      complete(AddressRepo.getById(id))
    } ~ put {
      entity(as[Address]) { addr =>
        onSuccess(AddressRepo.updateById(id, addr)) { updated =>
          complete(s"address updated to: $updated")
        }
      }
    } ~ delete {
      onSuccess(AddressRepo.deleteById(id)) { deleted =>
        complete(s"address deleted: $deleted")
      }
    }
  }
}
这样做可以灵活的使用 akka-stream 提供的功能.
上面的例子 Mock PersonRoute.route 可以直接贴在主 route 后面:
// Main routing tree:
//   /auth              basic authentication, POST issues a JWT
//   /openspace/hello   reachable without credentials
//   /api/...           requires a valid bearer token; hosts the greetings and the person CRUD route
val route =
  path("auth") {
    authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
      post {
        complete(authenticator.issueJwt(userinfo))
      }
    }
  } ~
  pathPrefix("openspace") {
    (path("hello") & get) {
      complete("Hello, you are in open space.")
    }
  } ~
  pathPrefix("api") {
    authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
      // A def, so the user info is only looked up when one of the greeting routes answers.
      def userGreeting: String = s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}"

      (path("hello") & get) {
        complete(userGreeting)
      } ~
      (path("how are you") & get) {
        complete(userGreeting)
      } ~
      PersonRoute.route
      // ~ ...
    }
  }
和前面的示范一样, 我们还是写一个客户端来测试:
- import akka.actor._
- import akka.http.scaladsl.model.headers._
- import scala.concurrent._
- import scala.concurrent.duration._
- import akka.http.scaladsl.Http
- import spray.JSON.DefaultJsonProtocol
- import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
- import akka.http.scaladsl.marshalling._
- import akka.http.scaladsl.model._
- import akka.stream.ActorMaterializer
/** Bundles akka-http's spray-json marshalling support with the default protocol formats. */
trait JsonFormats extends SprayJsonSupport with DefaultJsonProtocol

/** Client-side model and JSON format used to build request entities. */
object JsonConverters extends JsonFormats {

  /** Client-side copy of the person payload sent to the server. */
  case class Person(name: String, age: Int)

  // The type is spelled out: implicit definitions without an explicit type make
  // implicit resolution depend on declaration order and hide the public API.
  implicit val fmtPerson: spray.json.RootJsonFormat[Person] = jsonFormat2(Person)
}
/** Command-line client that exercises the auth endpoint and the person CRUD route.
  *
  * Steps: obtain a JWT with basic credentials, list all persons, then post a new
  * person. The program waits for Enter between the steps.
  */
object TestCrudClient {
  type UserInfo = Map[String,Any]

  def main(args: Array[String]): Unit = {
    import akka.util.ByteString

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val baseUrl = "http://192.168.11.189:50081"

    // try/finally so the actor system is shut down even when a request times out;
    // otherwise its threads would keep the JVM alive.
    try {
      val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
      val authRequest = HttpRequest(
        HttpMethods.POST,
        uri = s"$baseUrl/auth",
        headers = List(authorization)
      )
      val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

      // Concatenate the raw bytes first and decode once: decoding chunk by chunk
      // can corrupt a multi-byte character that is split across two chunks.
      val respToken = for {
        resp  <- futToken
        bytes <- resp.entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
      } yield bytes.utf8String

      val jstr = Await.result[String](respToken, 2.seconds)
      println(jstr)

      scala.io.StdIn.readLine()

      // The token just received authenticates the remaining requests.
      val authentication = headers.Authorization(OAuth2BearerToken(jstr))
      val getAllRequest = HttpRequest(
        HttpMethods.GET,
        uri = s"$baseUrl/api/crud/person"
      ).addHeader(authentication)
      val futGet: Future[HttpResponse] = Http().singleRequest(getAllRequest)
      println(Await.result(futGet, 2.seconds))

      scala.io.StdIn.readLine()

      import JsonConverters._
      val saveRequest = HttpRequest(
        HttpMethods.POST,
        uri = s"$baseUrl/api/crud/person"
      ).addHeader(authentication)
      // Marshal the person to a JSON request entity, then send it.
      val futPost: Future[HttpResponse] =
        for {
          reqEntity <- Marshal(Person("tiger chan",18)).to[RequestEntity]
          response  <- Http().singleRequest(saveRequest.copy(entity = reqEntity))
        } yield response
      println(Await.result(futPost, 2.seconds))

      scala.io.StdIn.readLine()
    } finally {
      system.terminate()
    }
  }
}
下面是 restapi 发展到现在状态的源代码:
- build.sbt
// Build definition of the restapi sample service.
name := "restapi"

version := "0.3"

scalaVersion := "2.12.8"

// Artifacts of one library family must share a version. json4s-native was on
// 3.6.1 while json4s-jackson and json4s-ext were on 3.6.7; they are now aligned.
val akkaHttpVersion = "10.1.8"
val json4sVersion   = "3.6.7"

libraryDependencies ++= Seq(
  "com.typesafe.akka"          %% "akka-http"            % akkaHttpVersion,
  "com.typesafe.akka"          %% "akka-stream"          % "2.5.23",
  "com.pauldijou"              %% "jwt-core"             % "3.0.1",
  "de.heikoseeberger"          %% "akka-http-json4s"     % "1.22.0",
  "org.json4s"                 %% "json4s-native"        % json4sVersion,
  "com.typesafe.akka"          %% "akka-http-spray-json" % akkaHttpVersion,
  "com.typesafe.scala-logging" %% "scala-logging"        % "3.9.0",
  "org.slf4j"                  %  "slf4j-simple"         % "1.7.25",
  "org.json4s"                 %% "json4s-jackson"       % json4sVersion,
  "org.json4s"                 %% "json4s-ext"           % json4sVersion
)
- RestApiServer.scala
- package com.datatech.restapi
- import akka.actor._
- import akka.stream._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.server.Directives._
- import pdi.jwt._
- import AuthBase._
- import MockUserAuthService._
/** Entry point of the REST service: builds the routing tree and binds the HTTP server.
  *
  * Routes served:
  *  - `/auth`          basic authentication, POST issues a JWT
  *  - `/api/...`       requires a valid bearer token: file routes
  *  - `/api/crud/...`  requires a valid bearer token: person CRUD route
  */
object RestApiServer extends App {

  implicit val httpSys: ActorSystem = ActorSystem("httpSystem")
  implicit val httpMat: ActorMaterializer = ActorMaterializer()
  implicit val httpEC: scala.concurrent.ExecutionContextExecutor = httpSys.dispatcher

  // NOTE(review): the signing key is hard-coded; load it from configuration or
  // the environment before deploying anywhere real.
  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
    path("auth") {
      authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
        post { complete(authenticator.issueJwt(userinfo)) }
      }
    } ~
    pathPrefix("api") {
      // The CRUD route is mounted only inside the authenticated scope. The old
      // listing mounted it a second time after authenticateOAuth2, so a request
      // whose token was rejected fell through to it and bypassed authentication.
      authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
        FileRoute(validToken).route ~
        pathPrefix("crud") {
          PersonRoute.route
        }
        // ~ ...
      }
    }

  val (port, host) = (50081,"192.168.11.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  // Block until a line is read from stdin, then unbind and stop the actor system.
  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())
}
来源: https://www.cnblogs.com/tiger-xc/p/11229767.html