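After finishing a standard REST-style httpserver for database CRUD operations, I found it had many shortcomings. Mainly, in chasing the word "generic", I tried to make every service interface as generic as possible, which ended up suppressing the distinctive features of the target database and produced a weak toy. For example: a standard REST-style getById requires every table to have an id field, which is a bit silly. And there is no flexible way to control the result set returned by get, such as the number of rows, the fields, or the sort order. This matters especially for MongoDB, a distributed database whose query capabilities come close to those of a relational database: as mentioned in the previous post, its queries are powerful and conditions can be combined flexibly, and it would be a pity not to expose this in the web service API. So this post discusses a REST server designed specifically for MongoDB. My goal is this: the backend database is MongoDB, an httpserver exposes CRUD operations on MongoDB, and clients call the CRUD services over http. Backend developers add a new set of CRUD services for each collection by following one uniform standard. Hopefully this improves development efficiency and reduces the chance of coding errors.
MongoDB is a document database, so its data formats are more varied. In this demo I hope to cover MongoDB's characteristic data types and how to handle them, including dates and binary blobs (images). By the way: ordinary large text files can also be stored in MongoDB as binary blobs. Because a file has to travel over http as bytes, the backend httpserver receives it as a sequence of bytes, which can be stored directly in a MongoDB blob field without any format conversion. After downloading, the client only needs to decode the bytes as UTF-8 characters to recover the file content.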
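The round trip for a text file stored as a blob can be sketched with the JDK alone. This is only an illustration: `BlobText` and its two functions are hypothetical names, and it assumes the file was UTF-8 encoded in the first place.

```scala
import java.nio.charset.StandardCharsets

object BlobText {
  // Text travels over http, and is stored in the blob field, as raw bytes.
  def toBytes(text: String): Array[Byte] =
    text.getBytes(StandardCharsets.UTF_8)

  // After download the client decodes the same bytes back into text,
  // assuming the original file was UTF-8.
  def fromBytes(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}
```

Nothing is converted between upload and download; only the client at the very end interprets the bytes as characters.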
First, let's start with the Model, which in Scala is represented by a case class. A Model corresponds to a MongoDB Document; in Scala code we work with case classes in place of Documents. Every Model we design extends a ModelBase trait:
- trait ModelBase[E] {
- def to: E
- }
- case class Person(
- userid: String = "",
- name: String = "",
- age: Option[Int] = None,
- dob: Option[MGODate] = None, // date of birth
- address: Option[String] = None
- ) extends ModelBase[Document] {
- import org.mongodb.scala.bson._
- override def to: Document = {
- var doc = Document(
- "userid" -> this.userid,
- "name" -> this.name)
- if (this.age != None)
- doc = doc + ("age" -> this.age.get)
- if (this.dob != None)
- doc = doc + ("dob" -> this.dob.get)
- if (this.address != None)
- doc = doc + ("address" -> this.address.getOrElse(""))
- doc
- }
- }
- object Person {
- val fromDocument: Document => Person = doc => {
- val keyset = doc.keySet
- Person(
- userid = doc.getString("userid"),
- name = doc.getString("name"),
- age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],
- dob = {if (keyset.contains("dob"))
- Some(doc.getDate("dob"))
- else None },
- address = mgoGetStringOrNone(doc,"address")
- )
- }
- }
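In the example above, Person corresponds to a Document in MongoDB. Besides making sure the property types match the field types of the collection, it provides two conversion functions, to and fromDocument. The to function is inherited from ModelBase, which means every MongoDB Model must have it. This matters because, when a Model is built from JSON, anything that is a ModelBase is guaranteed to have a to function we can call: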
- class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
- ...
- post {
- entity(as[String]) { JSON =>
- val extractedEntity: M = fromJson[M](JSON)
- val doc: Document = extractedEntity.to
- val futmsg = repository.insert(doc).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(c) => c.toString()
- case None => "insert may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
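Note extractedEntity: we cannot yet tell its concrete type (Person, Animal, Machine?), but we do know it is of type M, and M <: ModelBase[Document], so M is a MongoDB Model and we can call extractedEntity.to to obtain a Document.
Look closely: Person contains no blob field. So far I have not found a way to send several fields together with an image in a single http request, so uploading one Document would take two requests. The httpserver would then have to match up the two requests, which is very complicated. A Document that contains a blob therefore has to split the blob out into another Document, and the two are linked through a single unique id field: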
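The role of the type bound can be shown in isolation. The sketch below is not the article's code: it replaces the driver's Document with a plain `Map[String, Any]` (the alias `Doc` is an assumption) so that it runs without the MongoDB driver, and `Machine` and `toDocument` are hypothetical names.

```scala
object ModelBaseSketch {
  // Stand-in for org.mongodb.scala.Document, so the sketch needs no driver.
  type Doc = Map[String, Any]

  trait ModelBase[E] { def to: E }

  case class Person(userid: String, name: String, age: Option[Int] = None)
      extends ModelBase[Doc] {
    // Optional fields are added only when they are present.
    override def to: Doc =
      Map[String, Any]("userid" -> userid, "name" -> name) ++
        age.map(a => "age" -> a).toList
  }

  case class Machine(serial: String) extends ModelBase[Doc] {
    override def to: Doc = Map("serial" -> serial)
  }

  // Compiles for any M bounded by ModelBase[Doc]; the concrete type is irrelevant.
  def toDocument[M <: ModelBase[Doc]](entity: M): Doc = entity.to
}
```

`toDocument` never mentions Person or Machine, which is the same reason the route can call `extractedEntity.to` without knowing what M is.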
- case class Photo (
- id: String,
- photo: Option[MGOBlob]
- ) extends ModelBase[Document] {
- override def to: Document = {
- var doc = Document("id" -> this.id)
- if (photo != None)
- doc = doc + ("photo" -> this.photo)
- doc
- }
- }
- object Photo {
- def fromDocument: Document => Photo = doc => {
- val keyset = doc.keySet
- Photo(
- id = doc.getString("id"),
- photo = mgoGetBlobOrNone(doc, "photo")
- )
- }
- }
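From another angle, storing blobs separately from ordinary fields has its advantages; at worst it costs two queries.
The second part is the repository, the database operation functions: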
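To make the "two queries" concrete, here is a minimal sketch that uses in-memory maps as stand-ins for the person and photo collections. `PersonRow`, `PhotoRow` and `load` are hypothetical names, and the real lookups would of course go through the repository.

```scala
object SplitBlobSketch {
  case class PersonRow(userid: String, name: String)
  case class PhotoRow(id: String, photo: Array[Byte])

  // In-memory stand-ins for the "person" and "photo" collections.
  val persons: Map[String, PersonRow] =
    Map("c001" -> PersonRow("c001", "tiger chan"))
  val photos: Map[String, PhotoRow] =
    Map("c001" -> PhotoRow("c001", Array[Byte](1, 2, 3)))

  // First lookup: the ordinary fields. Second lookup: the blob, only on request.
  def load(userid: String, withPhoto: Boolean): Option[(PersonRow, Option[PhotoRow])] =
    persons.get(userid).map { p =>
      val pic = if (withPhoto) photos.get(p.userid) else None
      (p, pic)
    }
}
```

A listing screen can call `load(id, withPhoto = false)` and never touch the large binary data.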
- class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
- def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
- var res = Seq[ResultOptions]()
- next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
- sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
- fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
- top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
- val ctxFind = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
- .setCommand(Find(andThen = res))
- mgoQuery[Seq[R]](ctxFind,converter)
- }
- def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
- var res = Seq[ResultOptions]()
- next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
- sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
- fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
- top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
- val ctxFind = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
- .setCommand(Find(filter = Some(filtr),andThen = res))
- mgoQuery[Seq[R]](ctxFind,converter)
- }
- def getOneDocument(filtr: Bson): DBOResult[Document] = {
- val ctxFind = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
- .setCommand(Find(filter = Some(filtr),firstOnly = true))
- mgoQuery[Document](ctxFind,converter)
- }
- def insert(doc: Document): DBOResult[Completed] = {
- val ctxInsert = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Insert(Seq(doc)))
- mgoUpdate[Completed](ctxInsert)
- }
- def delete(filter: Bson): DBOResult[DeleteResult] = {
- val ctxDelete = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Delete(filter))
- mgoUpdate[DeleteResult](ctxDelete)
- }
- def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
- val ctxUpdate = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Update(filter,update,None,!many))
- mgoUpdate[UpdateResult](ctxUpdate)
- }
- def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
- val ctxUpdate = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Replace(filter,row))
- mgoUpdate[UpdateResult](ctxUpdate)
- }
- }
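Both getAll and query assemble their result options from four optional parameters by appending to a var. The same idea can be written without mutation; the sketch below uses simplified stand-in types (`Opt`, `Filter`, `Sort`, `Projection`, `Limit` are assumptions, not the ResultOptions and FOD_TYPE of the actual engine).

```scala
object ResultOptionsSketch {
  // Simplified stand-ins for the ResultOptions / FOD_TYPE values used above.
  sealed trait Opt
  case class Filter(json: String) extends Opt
  case class Sort(json: String) extends Opt
  case class Projection(json: String) extends Opt
  case class Limit(n: Int) extends Opt

  // Every Some(...) contributes one option and every None contributes nothing;
  // the order matches the four foreach calls in getAll and query.
  def build(next: Option[String], sort: Option[String],
            fields: Option[String], top: Option[Int]): Seq[Opt] =
    Seq[Option[Opt]](
      next.map(Filter(_)),
      sort.map(Sort(_)),
      fields.map(Projection(_)),
      top.map(Limit(_))
    ).flatten
}
```

A request with only sort and top set therefore produces exactly two options, in that order.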
The repository was discussed in the previous post. Last comes the core akka-http part, the Route, which is the external API of the MongoDB CRUD service:
- (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
- (filter,fields,sort,top,next) => {
- dbor = {
- filter match {
- case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
- case None => repository.getAll(next,sort,fields,top)
- }
- }
- val futRows = dbor.value.value.runToFuture.map {
- eolr =>
- eolr match {
- case Right(olr) => olr match {
- case Some(lr) => lr
- case None => Seq[M]()
- }
- case Left(_) => Seq[M]()
- }
- }
- complete(futureToJson(futRows))
- }
- } ~ post {
- entity(as[String]) { JSON =>
- val extractedEntity: M = fromJson[M](JSON)
- val doc: Document = extractedEntity.to
- val futmsg = repository.insert(doc).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(c) => c.toString()
- case None => "insert may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futmsg)
- }
- } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
- val bson = Document(filter)
- if (set == None) {
- entity(as[String]) { JSON =>
- val extractedEntity: M = fromJson[M](JSON)
- val doc: Document = extractedEntity.to
- val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
- case None => "update may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futureToJson(futmsg))
- }
- } else {
- set match {
- case Some(u) =>
- val ubson = Document(u)
- dbou = repository.update(bson, ubson, many.getOrElse(true))
- case None =>
- dbou = Left(new IllegalArgumentException("missing set statement for update!"))
- }
- val futmsg = dbou.value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
- case None => "update may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futureToJson(futmsg))
- }
- } ~ (delete & parameters('filter,'many.as[Boolean].?)) { (filter,many) =>
- val bson = Document(filter)
- val futmsg = repository.delete(bson).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(d) => s"${d.getDeletedCount} rows deleted."
- case None => "delete may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futureToJson(futmsg))
- }
- }
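The PUT branch above serves two purposes, and which one applies depends only on whether the set parameter is present. A minimal sketch of that decision, using hypothetical `Action` types in place of the repository calls:

```scala
object PutDispatchSketch {
  sealed trait Action
  case class Replace(filter: String) extends Action
  case class Update(filter: String, set: String, many: Boolean) extends Action

  // No `set` parameter: replace the matched document with the request body.
  // With `set`: apply the update expression; `many` defaults to true when omitted.
  def decide(filter: String, set: Option[String], many: Option[Boolean]): Action =
    set match {
      case None    => Replace(filter)
      case Some(u) => Update(filter, u, many.getOrElse(true))
    }
}
```

So a client that wants to replace a whole document sends a PUT with only filter plus a JSON body, while a partial update sends filter and set and needs no body.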
The biggest difference from the previous post is that this Route supports MongoDB-specific query strings, that is, bson-typed parameters. For example:
- http://192.168.0.189:50081/private/crud/person
- http://192.168.0.189:50081/private/crud/person?filter={
- "userid":"c001"
- }
- http://192.168.0.189:50081/private/crud/person?sort={
- "userid":-1
- }
- http://192.168.0.189:50081/private/crud/person?filter={
- "userid":{
- $gt:"c000"
- }
- }&sort={
- "userid":-1
- }&top=3
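Unfortunately, some characters in bson expressions are not allowed in a url, so they must be encoded first. You can convert them with an online URL encoder:
https://www.url-encoder.com {"userid":"c001"} -> %7B%22userid%22%3A%22c001%22%7D
In a program you can use a library: "com.github.tasubo" % "jurl-tools" % "0.6" URLEncode.encode(xyz)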
- val sort =
- """
- |{userid:-1}
- """.stripMargin
- val getAllRequest = HttpRequest(
- HttpMethods.GET,
- uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
- ).addHeader(authentication)
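If you would rather not add a dependency, the JDK's own `java.net.URLEncoder` can do the same job for these parameters. The helper below is a sketch: `CrudUrlSketch`, `enc` and `crudUrl` are hypothetical names, and note that `URLEncoder` performs form encoding, so a space becomes `+`, which may differ from what jurl-tools emits.

```scala
import java.net.URLEncoder

object CrudUrlSketch {
  // Percent-encode a single parameter value as UTF-8.
  def enc(value: String): String = URLEncoder.encode(value, "UTF-8")

  // Build base?k1=v1&k2=v2 with every value encoded; keys are assumed URL-safe.
  def crudUrl(base: String, params: Seq[(String, String)]): String =
    if (params.isEmpty) base
    else base + "?" + params.map { case (k, v) => s"$k=${enc(v)}" }.mkString("&")
}
```

For instance, `CrudUrlSketch.crudUrl(base, Seq("sort" -> "{userid:-1}", "top" -> "3"))` builds a complete query string with the bson expression already encoded.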
The API Route of the blob service:
- pathPrefix("blob") {
- (get & path(Remaining)) { id =>
- val filtr = equal("id", id)
- val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
- eodoc =>
- eodoc match {
- case Right(odoc) => odoc match {
- case Some(doc) =>
- if (doc == null) None
- else mgoGetBlobOrNone(doc, "photo")
- case None => None
- }
- case Left(_) => None
- }
- }
- onComplete(futOptPic) {
- case Success(optBlob) => optBlob match {
- case Some(blob) =>
- withoutSizeLimit {
- encodeResponseWith(Gzip) {
- complete(
- HttpEntity(
- ContentTypes.`application/octet-stream`,
- ByteArrayToSource(blob.getData))
- )
- }
- }
- case None => complete(StatusCodes.NotFound)
- }
- case Failure(err) => complete(err)
- }
- } ~
- (post & parameter('id)) { id =>
- withoutSizeLimit {
- decodeRequest {
- extractDataBytes { bytes =>
- val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
- hd ++ bs
- }
- onComplete(fut) {
- case Success(b) =>
- val doc = Document("id" -> id, "photo" -> b.toArray)
- val futmsg = repository.insert(doc).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(c) => c.toString()
- case None => "insert may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futmsg)
- case Failure(err) => complete(err)
- }
- }
- }
- }
- }
- }
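The upload branch collects the incoming chunks with runFold before building the Document. Stripped of akka-stream, the fold amounts to the following sketch, where plain `Array[Byte]` chunks stand in for ByteString and `ChunkFoldSketch.collect` is a hypothetical name:

```scala
object ChunkFoldSketch {
  // Concatenate the chunks in arrival order, starting from an empty buffer,
  // in the same way runFold(ByteString())(_ ++ _) accumulates the request body.
  def collect(chunks: Seq[Array[Byte]]): Array[Byte] =
    chunks.foldLeft(Array.empty[Byte])(_ ++ _)
}
```

The whole blob ends up in memory at once, which is worth keeping in mind given that the route also lifts the request size limit with withoutSizeLimit.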
Note: MongoRoute[M] is a generic type. I want the Route for any Model to need nothing more than specifying M, for example:
- implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))
- implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)
- ...
- pathPrefix("public") {
- (pathPrefix("crud")) {
- new MongoRoute[Person]("person")(personDao)
- .route ~
- new MongoRoute[Photo]("photo")(picDao)
- .route
- }
- }
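Much less effort, isn't it? But back to the earlier point: a blob needs no format conversion anywhere along the way, so the blob route builds and looks up the Document directly rather than going through a Model. As a result the field name id is fixed in the route, which is something to keep in mind when designing the collection structure.
Testing an httpserver is still a bit of a headache. A browser can only test GET; how should POST, PUT and DELETE be tested? curl can do it: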
- curl -i -X GET http://rest-api.io/items
- curl -i -X GET http://rest-api.io/items/5069b47aa892630aae059584
- curl -i -X DELETE http://rest-api.io/items/5069b47aa892630aae059584
- curl -i -X POST -H 'Content-Type: application/json' -d '{"name":"New item","year":"2009"}' http://rest-api.io/items
- curl -i -X PUT -H 'Content-Type: application/json' -d '{"name":"Updated item","year":"2010"}' http://rest-api.io/items/5069b47aa892630aae059584
Below are two clients that test crud and blob respectively:
- TestCrudClient.scala
- import akka.actor._
- import akka.http.scaladsl.model.headers._
- import scala.concurrent._
- import scala.concurrent.duration._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.marshalling._
- import akka.http.scaladsl.model._
- import akka.stream.ActorMaterializer
- import com.github.tasubo.jurl.URLEncode
- import com.datatech.REST.mongo.MongoModels.Person
- import de.heikoseeberger.akkahttpjson4s.Json4sSupport
- import org.json4s.jackson
- import com.datatech.sdp.mongo.engine.MGOClasses._
- trait JsonCodec extends Json4sSupport {
- import org.json4s.DefaultFormats
- import org.json4s.ext.JodaTimeSerializers
- implicit val serilizer = jackson.Serialization
- implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
- }
- object JsConverters extends JsonCodec
- object TestCrudClient {
- type UserInfo = Map[String,Any]
- def main(args: Array[String]): Unit = {
- import JsConverters._
- implicit val system = ActorSystem()
- implicit val materializer = ActorMaterializer()
- // needed for the future flatMap/onComplete in the end
- implicit val executionContext = system.dispatcher
- val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
- val authRequest = HttpRequest(
- HttpMethods.POST,
- uri = "http://192.168.0.189:50081/auth",
- headers = List(authorization)
- )
- val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)
- val respToken = for {
- resp <- futToken
- jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
- } yield jstr
- val jstr = Await.result[String](respToken,2 seconds)
- println(jstr)
- scala.io.StdIn.readLine()
- val authentication = headers.Authorization(OAuth2BearerToken(jstr))
- val sort =
- """
- |{userid:-1}
- """.stripMargin
- val getAllRequest = HttpRequest(
- HttpMethods.GET,
- uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort),
- ).addHeader(authentication)
- val futGetAll: Future[HttpResponse] = Http().singleRequest(getAllRequest)
- println(Await.result(futGetAll,2 seconds))
- scala.io.StdIn.readLine()
- var bf =
- """ |{"userid":"c888"}
- """.stripMargin
- println(URLEncode.encode(bf))
- val delRequest = HttpRequest(
- HttpMethods.DELETE,
- uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
- ).addHeader(authentication)
- val futDel: Future[HttpResponse] = Http().singleRequest(delRequest)
- println(Await.result(futDel,2 seconds))
- scala.io.StdIn.readLine()
- bf =
- """ |{"userid":"c001"}
- """.stripMargin
- val getRequest = HttpRequest(
- HttpMethods.GET,
- uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf),
- ).addHeader(authentication)
- val futGet: Future[HttpResponse] = Http().singleRequest(getRequest)
- println(Await.result(futGet,2 seconds))
- scala.io.StdIn.readLine()
- val tiger = Person("c001","tiger chan",Some(56))
- val john = Person("c002", "johnny dep", Some(60))
- val peter = Person("c003", "pete brad", Some(58))
- val susan = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )
- val ns = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) )
- val saveRequest = HttpRequest(
- HttpMethods.POST,
- uri = "http://192.168.0.189:50081/public/crud/person"
- ).addHeader(authentication)
- val futPost: Future[HttpResponse] =
- for {
- reqEntity <- Marshal(peter).to[RequestEntity]
- response <- Http().singleRequest(saveRequest.copy(entity=reqEntity))
- } yield response
- println(Await.result(futPost,2 seconds))
- scala.io.StdIn.readLine()
- var set =
- """
- | {$set:
- | {
- | name:"tiger the king",
- | age:18
- | }
- | }
- """.stripMargin
- val updateRequest = HttpRequest(
- HttpMethods.PUT,
- uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(
- bf)+"&set="+URLEncode.encode(set)+"&many=true"
- ).addHeader(authentication)
- val futUpdate: Future[HttpResponse] = Http().singleRequest(updateRequest)
- println(Await.result(futUpdate,2 seconds))
- scala.io.StdIn.readLine()
- val repRequest = HttpRequest(
- HttpMethods.PUT,
- uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf)
- ).addHeader(authentication)
- val futReplace: Future[HttpResponse] =
- for {
- reqEntity <- Marshal(susan).to[RequestEntity]
- response <- Http().singleRequest(repRequest.copy(entity=reqEntity))
- } yield response
- println(Await.result(futReplace,2 seconds))
- scala.io.StdIn.readLine()
- system.terminate()
- }
- }
- TestFileClient.scala
- import akka.stream._
- import java.nio.file._
- import java.io._
- import akka.http.scaladsl.model.headers._
- import scala.concurrent._
- import com.datatech.REST.mongo.FileStreaming._
- import scala.concurrent.duration._
- import akka.actor.ActorSystem
- import akka.http.scaladsl.marshalling.Marshal
- import akka.http.scaladsl.model._
- import akka.http.scaladsl.Http
- import akka.stream.scaladsl.{FileIO, Source}
- import scala.util._
- case class FileUtil(implicit sys: ActorSystem) {
- import sys.dispatcher
- implicit val mat = ActorMaterializer()
- def createEntity(file: File): RequestEntity = {
- require(file.exists())
- val formData =
- Multipart.FormData(
- Source.single(
- Multipart.FormData.BodyPart(
- "test",
- HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
- Map("filename" -> file.getName))))
- Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
- }
- def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
- implicit val mat = ActorMaterializer()
- import sys.dispatcher
- val futResp = Http(sys).singleRequest(
- // Gzip.encodeMessage(
- request.copy(entity = dataEntity) //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
- // )
- )
- futResp
- .andThen {
- case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
- entity.dataBytes.map(_.utf8String).runForeach(println)
- case Success(r@HttpResponse(code, _, _, _)) =>
- println(s"Upload request failed, response code: $code")
- r.discardEntityBytes()
- case Success(_) => println("Unable to Upload file!")
- case Failure(err) => println(s"Upload failed: ${err.getMessage}")
- }
- }
- def downloadFileTo(request: HttpRequest, destPath: String) = {
- // val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip))
- val futResp = Http(sys).singleRequest(request) //.map(Gzip.decodeMessage(_))
- futResp
- .andThen {
- case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
- entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
- .onComplete { case _ => println(s"Download file saved to: $destPath") }
- case Success(r@HttpResponse(code, _, _, _)) =>
- println(s"Download request failed, response code: $code")
- r.discardEntityBytes()
- case Success(_) => println("Unable to download file!")
- case Failure(err) => println(s"Download failed: ${err.getMessage}")
- }
- }
- }
- object TestFileClient {
- type UserInfo = Map[String,Any]
- def main(args: Array[String]): Unit = {
- implicit val system = ActorSystem()
- implicit val materializer = ActorMaterializer()
- // needed for the future flatMap/onComplete in the end
- implicit val executionContext = system.dispatcher
- val helloRequest = HttpRequest(uri = "http://192.168.0.189:50081/")
- val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
- val authRequest = HttpRequest(
- HttpMethods.POST,
- uri = "http://192.168.0.189:50081/auth",
- headers = List(authorization)
- )
- val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)
- val respToken = for {
- resp <- futToken
- jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
- } yield jstr
- val jstr = Await.result[String](respToken,2 seconds)
- println(jstr)
- scala.io.StdIn.readLine()
- val authentication = headers.Authorization(OAuth2BearerToken(jstr))
- val entity = HttpEntity(
- ContentTypes.`application/octet-stream`,
- fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
- )
- //
- val chunked = HttpEntity.Chunked.fromData(
- ContentTypes.`application/octet-stream`,
- fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024)
- )
- val uploadRequest = HttpRequest(
- HttpMethods.POST,
- // uri = "http://192.168.0.189:50081/private/file?filename=tiger.jpg",
- uri = "http://192.168.0.189:50081/public/crud/photo/blob?id=tiger.jpg",
- ).addHeader(authentication)
- //upload file
- Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
- //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
- val dlRequest = HttpRequest(
- HttpMethods.GET,
- // uri = "http://192.168.0.189:50081/api/file/mypic.jpg",
- uri = "http://192.168.0.189:50081/public/crud/photo/blob/tiger.jpg",
- ).addHeader(authentication)
- FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")
- scala.io.StdIn.readLine()
- system.terminate()
- }
- }
Below is the source code for this demo:
- build.sbt
- name := "rest-mongo"
- version := "0.1"
- scalaVersion := "2.12.8"
- scalacOptions += "-Ypartial-unification"
- val akkaVersion = "2.5.23"
- val akkaHttpVersion = "10.1.8"
- libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-http" % "10.1.8",
- "com.typesafe.akka" %% "akka-stream" % "2.5.23",
- "com.pauldijou" %% "jwt-core" % "3.0.1",
- "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
- "org.json4s" %% "json4s-native" % "3.6.1",
- "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
- "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
- "org.slf4j" % "slf4j-simple" % "1.7.25",
- "org.json4s" %% "json4s-jackson" % "3.6.7",
- "org.json4s" %% "json4s-ext" % "3.6.7",
- // for scalikejdbc
- "org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
- "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
- "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
- "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
- "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
- "com.h2database" % "h2" % "1.4.199",
- "com.zaxxer" % "HikariCP" % "2.7.4",
- "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
- "com.typesafe.slick" %% "slick" % "3.3.2",
- //for cassandra 3.6.0
- "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
- "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
- "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",
- //for MongoDB 4.0
- "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
- "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",
- "ch.qos.logback" % "logback-classic" % "1.2.3",
- "io.monix" %% "monix" % "3.0.0-RC3",
- "org.typelevel" %% "cats-core" % "2.0.0-M4",
- "com.github.tasubo" % "jurl-tools" % "0.6"
- )
- MongoHttpServer.scala
- package com.datatech.REST.mongo
- import akka.actor._
- import akka.stream._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.server.Directives._
- import pdi.jwt._
- import AuthBase._
- import MockUserAuthService._
- import org.mongodb.scala._
- import scala.collection.JavaConverters._
- import MongoModels._
- import MongoRepo._
- import MongoRoute._
- object MongoHttpServer extends App {
- implicit val httpSys = ActorSystem("httpSystem")
- implicit val httpMat = ActorMaterializer()
- implicit val httpEC = httpSys.dispatcher
- val settings: MongoClientSettings = MongoClientSettings.builder()
- .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))
- .build()
- implicit val client: MongoClient = MongoClient(settings)
- implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument))
- implicit val picDao = new MongoRepo[Photo]("testdb","photo", None)
- implicit val authenticator = new AuthBase()
- .withAlgorithm(JwtAlgorithm.HS256)
- .withSecretKey("OpenSesame")
- .withUserFunc(getValidUser)
- val route =
- path("auth") {
- authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
- post { complete(authenticator.issueJwt(userinfo))}
- }
- } ~
- pathPrefix("private") {
- authenticateOAuth2(realm = "private", authenticator.authenticateToken) { validToken =>
- FileRoute(validToken)
- .route
- // ~ ...
- }
- } ~
- pathPrefix("public") {
- (pathPrefix("crud")) {
- new MongoRoute[Person]("person")(personDao)
- .route ~
- new MongoRoute[Photo]("photo")(picDao)
- .route
- }
- }
- val (port, host) = (50081,"192.168.0.189")
- val bindingFuture = Http().bindAndHandle(route,host,port)
- println(s"Server running at $host $port. Press any key to exit ...")
- scala.io.StdIn.readLine()
- bindingFuture.flatMap(_.unbind())
- .onComplete(_ => httpSys.terminate())
- }
- ModalBase.scala
- package com.datatech.REST.mongo
- trait ModelBase[E] {
- def to: E
- }
- MongoModel.scala
- package com.datatech.REST.mongo
- import org.mongodb.scala._
- import com.datatech.sdp.mongo.engine._
- import MGOClasses._
- object MongoModels {
- case class Person(
- userid: String = "",
- name: String = "",
- age: Option[Int] = None,
- dob: Option[MGODate] = None,
- address: Option[String] = None
- ) extends ModelBase[Document] {
- import org.mongodb.scala.bson._
- override def to: Document = {
- var doc = Document(
- "userid" -> this.userid,
- "name" -> this.name)
- if (this.age != None)
- doc = doc + ("age" -> this.age.get)
- if (this.dob != None)
- doc = doc + ("dob" -> this.dob.get)
- if (this.address != None)
- doc = doc + ("address" -> this.address.getOrElse(""))
- doc
- }
- }
- object Person {
- val fromDocument: Document => Person = doc => {
- val keyset = doc.keySet
- Person(
- userid = doc.getString("userid"),
- name = doc.getString("name"),
- age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]],
- dob = {if (keyset.contains("dob"))
- Some(doc.getDate("dob"))
- else None },
- address = mgoGetStringOrNone(doc,"address")
- )
- }
- }
- case class Photo (
- id: String,
- photo: Option[MGOBlob]
- ) extends ModelBase[Document] {
- override def to: Document = {
- var doc = Document("id" -> this.id)
- if (photo != None)
- doc = doc + ("photo" -> this.photo)
- doc
- }
- }
- object Photo {
- def fromDocument: Document => Photo = doc => {
- val keyset = doc.keySet
- Photo(
- id = doc.getString("id"),
- photo = mgoGetBlobOrNone(doc, "photo")
- )
- }
- }
- }
- MongoRepo.scala
- package com.datatech.REST.mongo
- import org.mongodb.scala._
- import org.bson.conversions.Bson
- import org.mongodb.scala.result._
- import com.datatech.sdp.mongo.engine._
- import MGOClasses._
- import MGOEngine._
- import MGOCommands._
- import com.datatech.sdp.result.DBOResult.DBOResult
- import MongoModels._
- object MongoRepo {
- class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) {
- def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
- var res = Seq[ResultOptions]()
- next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
- sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
- fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
- top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
- val ctxFind = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
- .setCommand(Find(andThen = res))
- mgoQuery[Seq[R]](ctxFind,converter)
- }
- def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {
- var res = Seq[ResultOptions]()
- next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))}
- sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))}
- fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))}
- top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)}
- val ctxFind = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
- .setCommand(Find(filter = Some(filtr),andThen = res))
- mgoQuery[Seq[R]](ctxFind,converter)
- }
- def getOneDocument(filtr: Bson): DBOResult[Document] = {
- val ctxFind = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_QUERY)
- .setCommand(Find(filter = Some(filtr),firstOnly = true))
- mgoQuery[Document](ctxFind,converter)
- }
- def insert(doc: Document): DBOResult[Completed] = {
- val ctxInsert = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Insert(Seq(doc)))
- mgoUpdate[Completed](ctxInsert)
- }
- def delete(filter: Bson): DBOResult[DeleteResult] = {
- val ctxDelete = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Delete(filter))
- mgoUpdate[DeleteResult](ctxDelete)
- }
- def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = {
- val ctxUpdate = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Update(filter,update,None,!many))
- mgoUpdate[UpdateResult](ctxUpdate)
- }
- def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = {
- val ctxUpdate = MGOContext(dbName = db,collName=coll)
- .setActionType(MGO_ACTION_TYPE.MGO_UPDATE)
- .setCommand(Replace(filter,row))
- mgoUpdate[UpdateResult](ctxUpdate)
- }
- }
- }
- MongoRoute.scala
- package com.datatech.REST.mongo
- import akka.http.scaladsl.server.Directives
- import scala.util._
- import org.mongodb.scala._
- import com.datatech.sdp.file.Streaming._
- import org.mongodb.scala.result._
- import MongoRepo._
- import akka.stream.ActorMaterializer
- import com.datatech.sdp.result.DBOResult._
- import org.mongodb.scala.model.Filters._
- import com.datatech.sdp.mongo.engine.MGOClasses._
- import monix.execution.CancelableFuture
- import akka.util._
- import akka.http.scaladsl.model._
- import akka.http.scaladsl.coding.Gzip
- object MongoRoute {
- class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])(
- implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter {
- import monix.execution.Scheduler.Implicits.global
- var dbor: DBOResult[Seq[M]] = _
- var dbou: DBOResult[UpdateResult] = _
- val route = pathPrefix(pathName) {
- pathPrefix("blob") {
- (get & path(Remaining)) { id =>
- val filtr = equal("id", id)
- val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map {
- eodoc =>
- eodoc match {
- case Right(odoc) => odoc match {
- case Some(doc) =>
- if (doc == null) None
- else mgoGetBlobOrNone(doc, "photo")
- case None => None
- }
- case Left(_) => None
- }
- }
- onComplete(futOptPic) {
- case Success(optBlob) => optBlob match {
- case Some(blob) =>
- withoutSizeLimit {
- encodeResponseWith(Gzip) {
- complete(
- HttpEntity(
- ContentTypes.`application/octet-stream`,
- ByteArrayToSource(blob.getData))
- )
- }
- }
- case None => complete(StatusCodes.NotFound)
- }
- case Failure(err) => complete(err)
- }
- } ~
- (post & parameter('id)) { id =>
- withoutSizeLimit {
- decodeRequest {
- extractDataBytes { bytes =>
- val fut = bytes.runFold(ByteString()) { case (hd, bs) =>
- hd ++ bs
- }
- onComplete(fut) {
- case Success(b) =>
- val doc = Document("id" -> id, "photo" -> b.toArray)
- val futmsg = repository.insert(doc).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(c) => c.toString()
- case None => "insert may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futmsg)
- case Failure(err) => complete(err)
- }
- }
- }
- }
- }
- } ~
- (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) {
- (filter,fields,sort,top,next) => {
- dbor = {
- filter match {
- case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top)
- case None => repository.getAll(next,sort,fields,top)
- }
- }
- val futRows = dbor.value.value.runToFuture.map {
- eolr =>
- eolr match {
- case Right(olr) => olr match {
- case Some(lr) => lr
- case None => Seq[M]()
- }
- case Left(_) => Seq[M]()
- }
- }
- complete(futureToJson(futRows))
- }
- } ~ post {
- entity(as[String]) { JSON =>
- val extractedEntity: M = fromJson[M](JSON)
- val doc: Document = extractedEntity.to
- val futmsg = repository.insert(doc).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(c) => c.toString()
- case None => "insert may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futmsg)
- }
- } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) =>
- val bson = Document(filter)
- if (set == None) {
- entity(as[String]) { JSON =>
- val extractedEntity: M = fromJson[M](JSON)
- val doc: Document = extractedEntity.to
- val futmsg = repository.replace(bson, doc).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
- case None => "update may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futureToJson(futmsg))
- }
- } else {
- set match {
- case Some(u) =>
- val ubson = Document(u)
- dbou = repository.update(bson, ubson, many.getOrElse(true))
- case None =>
- dbou = Left(new IllegalArgumentException("missing set statement for update!"))
- }
- val futmsg = dbou.value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated."
- case None => "update may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futureToJson(futmsg))
- }
- } ~ (delete & parameters('filter,'many.as[Boolean].?)) { (filter,many) =>
- val bson = Document(filter)
- val futmsg = repository.delete(bson).value.value.runToFuture.map {
- eoc =>
- eoc match {
- case Right(oc) => oc match {
- case Some(d) => s"${d.getDeletedCount} rows deleted."
- case None => "delete may not complete!"
- }
- case Left(err) => err.getMessage
- }
- }
- complete(futureToJson(futmsg))
- }
- }
- }
- }
Source: https://www.cnblogs.com/tiger-xc/p/11325996.html