在云计算的推动下, 软件系统发展趋于平台化. 云平台系统一般都是分布式的集群系统, 采用大数据技术. 在这方面 akka 提供了比较完整的开发技术支持. 我在上一个系列有关 CQRS 的博客中按照实际应用的要求对 akka 的一些开发技术进行了介绍. CQRS 模式着重操作流程控制, 主要涉及交易数据的管理. 那么, 作为交易数据产生过程中发挥验证作用的一系列基础数据如用户信息, 商品信息, 支付类型信息等又应该怎样维护呢? 首先基础数据也应该是在平台水平上的, 但数据的采集, 维护是在系统前端的, 比如一些 web 界面. 所以平台基础数据维护系统是一套前后台结合的系统. 对于一个开放的平台系统来说, 应该能够适应各式各样的前端系统. 一般来讲, 平台通过定义一套 API 与前端系统集成是通用的方法. 这套 API 必须遵循行业标准, 技术要普及通用, 这样才能支持各种异类前端系统功能开发. 在这些要求背景下, 相对 gRPC, GraphQL 来说, REST 风格的 http 集成模式能得到更多开发人员的接受.
在有关 CQRS 系列博客里, 我以 akka-http 作为系统集成工具的一种, 零星地针对实际需要对 http 通信进行了介绍. 在 restapi 这个系列里我想系统化的用 akka-http 构建一套完整的, REST 风格数据维护和数据交换 API, 除 CRUD 之外还包括网络安全, 文件交换等功能. 我的计划是用 akka-http 搭建一个平台数据维护 API 的 REST-CRUD 框架, 包含所有标配功能如用户验证, 异常处理等. CRUD 部分要尽量做成通用的 generic, 框架型的, 能用一套标准的方法对任何数据表进行操作.
akka-http 是一套 http 程序开发工具. 它的 Routing-DSL 及数据序列化 marshalling 等都功能强大. 特别是 HttpResponse 处理, 一句 complete 解决了一大堆问题, magnet-pattern 结合 marshalling 让它的使用更加方便.
在这篇讨论里先搭一个 restapi 的基本框架, 包括客户端身份验证和使用权限. 主要是示范如何达到通用框架的目的. 这个在 akka-http 编程里主要体现在 Routing-DSL 的结构上, 要求 Route 能够简洁易懂, 如下:
- val route =
- path("auth") {
- authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
- post { complete(authenticator.issueJwt(userinfo))}
- }
- } ~
- pathPrefix("api") {
- authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
- (path("hello") & get) {
- complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
- } ~
- (path("how are you") & get) {
- complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
- }
- // ~ ...
- }
- }
我觉着这应该是框架型正确的方向: 把所有功能都放在 API 下, 统统经过权限验证. 可以直接在后面不断加功能 Route.
身份验证和使用权限也应该是一套标准的东西, 但身份验证方法可能有所不同, 特别是用户身份验证可能是通过独立的身份验证服务器实现的, 对不同的验证机制应该有针对性的定制函数. 构建身份管理的对象应该很方便或者很通用, 如下:
- val authenticator = new AuthBase()
- .withAlgorithm(JwtAlgorithm.HS256)
- .withSecretKey("OpenSesame")
- .withUserFunc(getValidUser)
AuthBase 源码如下:
- package com.datatech.restapi
- import akka.http.scaladsl.server.directives.Credentials
- import pdi.jwt._
- import org.json4s.native.JSON
- import org.json4s._
- import org.json4s.jackson.JsonMethods._
- import pdi.jwt.algorithms._
- import scala.util._
- object AuthBase {
- type UserInfo = Map[String, Any]
- case class AuthBase(
- algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
- secret: String = "OpenSesame",
- getUserInfo: Credentials => Option[UserInfo] = null) {
- ctx =>
- def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm=algo)
- def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)
- def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)
- def authenticateToken(credentials: Credentials): Option[String] =
- credentials match {
- case Credentials.Provided(token) =>
- algorithm match {
- case algo: JwtAsymmetricAlgorithm =>
- Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {
- case true => Some(token)
- case _ => None
- }
- case _ =>
- Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {
- case true => Some(token)
- case _ => None
- }
- }
- case _ => None
- }
- def getUserInfo(token: String): Option[UserInfo] = {
- algorithm match {
- case algo: JwtAsymmetricAlgorithm =>
- Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {
- case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
- case Failure(err) => None
- }
- case _ =>
- Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {
- case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
- case Failure(err) => None
- }
- }
- }
- def issueJwt(userinfo: UserInfo): String = {
- val claims = JwtClaim() + JSON(DefaultFormats).write(("userinfo", userinfo))
- Jwt.encode(claims, secret, algorithm)
- }
- }
- }
我已经把多个通用的函数封装在里面了. 再模拟一个用户身份管理对象:
- package com.datatech.restapi
- import akka.http.scaladsl.server.directives.Credentials
- import AuthBase._
- object MockUserAuthService {
- case class User(username: String, password: String, userInfo: UserInfo)
- val validUsers = Seq(User("johnny", "p4ssw0rd",Map("shopid" -> "1101", "userid" -> "101"))
- ,User("tiger", "secret", Map("shopid" -> "1101" , "userid" -> "102")))
- def getValidUser(credentials: Credentials): Option[UserInfo] =
- credentials match {
- case p @ Credentials.Provided(_) =>
- validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {
- case Some(user) => Some(user.userInfo)
- case _ => None
- }
- case _ => None
- }
- }
好了, 服务端示范代码中可以直接构建或者调用这些标准的类型了:
- package com.datatech.restapi
- import akka.actor._
- import akka.stream._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.server.Directives._
- import pdi.jwt._
- import AuthBase._
- import MockUserAuthService._
- object RestApiServer extends App {
- implicit val httpSys = ActorSystem("httpSystem")
- implicit val httpMat = ActorMaterializer()
- implicit val httpEC = httpSys.dispatcher
- val authenticator = new AuthBase()
- .withAlgorithm(JwtAlgorithm.HS256)
- .withSecretKey("OpenSesame")
- .withUserFunc(getValidUser)
- val route =
- path("auth") {
- authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
- post { complete(authenticator.issueJwt(userinfo))}
- }
- } ~
- pathPrefix("api") {
- authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
- (path("hello") & get) {
- complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
- } ~
- (path("how are you") & get) {
- complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
- }
- // ~ ...
- }
- }
- val (port, host) = (50081,"192.168.11.189")
- val bindingFuture = Http().bindAndHandle(route,host,port)
- println(s"Server running at $host $port. Press any key to exit ...")
- scala.io.StdIn.readLine()
- bindingFuture.flatMap(_.unbind())
- .onComplete(_ => httpSys.terminate())
- }
就是说后面的 http 功能可以直接插进这个框架, 精力可以完全聚焦于具体每项功能的开发上了.
然后用下面的客户端测试代码:
- import akka.actor._
- import akka.stream._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.model.headers._
- import scala.concurrent._
- import akka.http.scaladsl.model._
- import pdi.jwt._
- import org.json4s._
- import org.json4s.jackson.JsonMethods._
- import scala.util._
- import scala.concurrent.duration._
- object RestApiClient {
- type UserInfo = Map[String,Any]
- def main(args: Array[String]): Unit = {
- implicit val system = ActorSystem()
- implicit val materializer = ActorMaterializer()
- // needed for the future flatMap/onComplete in the end
- implicit val executionContext = system.dispatcher
- val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")
- val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
- val authRequest = HttpRequest(
- HttpMethods.POST,
- uri = "http://192.168.11.189:50081/auth",
- headers = List(authorization)
- )
- val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)
- val respToken = for {
- resp <- futToken
- jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
- } yield jstr
- val jstr = Await.result[String](respToken,2 seconds)
- println(jstr)
- scala.io.StdIn.readLine()
- val parts = Jwt.decodeRawAll(jstr, "OpenSesame", Seq(JwtAlgorithm.HS256)) match {
- case Failure(exception) => println(s"Error: ${exception.getMessage}")
- case Success(value) =>
- println(((parse(value._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
- }
- scala.io.StdIn.readLine()
- val authentication = headers.Authorization(OAuth2BearerToken(jstr))
- val apiRequest = HttpRequest(
- HttpMethods.GET,
- uri = "http://192.168.11.189:50081/api/hello",
- ).addHeader(authentication)
- val futAuth: Future[HttpResponse] = Http().singleRequest(apiRequest)
- println(Await.result(futAuth,2 seconds))
- scala.io.StdIn.readLine()
- system.terminate()
- }
- }
- build.sbt
- name := "restapi"
- version := "0.1"
- scalaVersion := "2.12.8"
- libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-http" % "10.1.8",
- "com.typesafe.akka" %% "akka-stream" % "2.5.23",
- "com.pauldijou" %% "jwt-core" % "3.0.1",
- "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
- "org.json4s" %% "json4s-native" % "3.6.1",
- "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
- "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
- "org.slf4j" % "slf4j-simple" % "1.7.25",
- "org.json4s" %% "json4s-jackson" % "3.6.7",
- "org.json4s" %% "json4s-ext" % "3.6.7"
- )
来源: https://www.cnblogs.com/tiger-xc/p/11169197.html