webflux 响应式编程利器
先来一张图, 这是 spring 文档的一张截图, 介绍了 spring 如今的两种开发模式, MVC 和 webflux 两种开发模式, 可见 webflux 的重要性
1. 初识 SpringWebFlux
webflux 是 spring5 推出的一种响应式 Web 框架, 它是一种非阻塞的开发模式, 可以在一个线程里处理多个请求 (非阻塞), 运行在 netty 环境, 也可以可以运行在 servlet3.1 之后的容器, 支持异步 servlet, 可以支持更高的并发量
2. 异步 servlet
我们知道同步 servlet 阻塞了 Tomcat 容器的线程, 当一个网络请求到我们的 Tomcat 容器之后, 容器会给每个请求启动一个线程去处理, 线程里面会调用一个 servlet 去处理, 当使用同步 servlet 时, 业务代码花多长时间, 你的线程就要等待多长时间, 这就是堵塞 (同步和异步是服务器后台才有异步这个概念, 对于浏览器来说所有的请求都是异步, 前台都要花费业务逻辑时间)
异步 servlet 的主要作用是它不会堵塞 Tomcat 容器的 servlet 线程, 它可以把一些耗时的操作放在一个独立的线程池, 那么我们的 servlet 就可以立马返回, 处理下一个请求, 以此就可以达到高并发.
通过代码比较一下同步 servlet 与异步 servlet
同步 servlet
- @WebServlet(urlPatterns = "/SyncServlet")
- public class SyncServlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
- public SyncServlet() {
- super();
- }
- protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
- doGet(request, response);
- }
- protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
- long t1 = System.currentTimeMillis();
- // 执行业务代码
- doSomeThing(request, response);
- System.out.println("sync use:" + (System.currentTimeMillis() - t1));
- }
- private void doSomeThing(HttpServletRequest request,
- HttpServletResponse response) throws IOException {
- // 模拟耗时操作
- try {
- TimeUnit.SECONDS.sleep(5);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- response.getWriter().append("done");
- }
- }
异步 servlet
- @WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
- public class AsyncServlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
- public AsyncServlet() {
- super();
- }
- protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
- doGet(request, response);
- }
- protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
- long t1 = System.currentTimeMillis();
- // 开启异步
- AsyncContext asyncContext = request.startAsync();
- // 执行业务代码, 放入一个线程池里
- CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
- asyncContext.getRequest(), asyncContext.getResponse()));
- System.out.println("async use:" + (System.currentTimeMillis() - t1));
- }
- private void doSomeThing(AsyncContext asyncContext,
- ServletRequest servletRequest, ServletResponse servletResponse) {
- // 模拟耗时操作
- try {
- TimeUnit.SECONDS.sleep(5);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- servletResponse.getWriter().append("done");
- } catch (IOException e) {
- e.printStackTrace();
- }
- // 业务代码处理完毕, 通知结束
- asyncContext.complete();
- }
- }
通过以上两段代码控制台的打印结果可以看出, 异步 servlet 把耗时操作放在一个独立的线程池, 那么我们的 servlet 就可以立马返回, 处理下一个请求.
3. CRUD 完整示例
通过下图可以看出 MVC 和 wenflux 的区别
以下通过一个例子了解一下 webflux 开发
实体类
- @Document(collection = "user")
- @Data
- public class User {
- @Id
- private String id;
- @NotBlank
- private String name;
- @Range(min=10, max=100)
- private int age;
- }
Controller 层
- @RestController
- @RequestMapping("/user")
- public class UserController {
- private final UserRepository repository;
- public UserController(UserRepository repository) {
- this.repository = repository;
- }
- /**
- * 以数组形式一次性返回数据
- */
- @GetMapping("/")
- public Flux<User> getAll() {
- return repository.findAll();
- }
- /**
- * 以 SSE 形式多次返回数据
- */
- @GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public Flux<User> streamGetAll() {
- return repository.findAll();
- }
- /**
- * 新增数据
- */
- @PostMapping("/")
- public Mono<User> createUser(@Valid @RequestBody User user) {
- // spring data jpa 里面, 新增和修改都是 save. 有 id 是修改, id 为空是新增
- // 根据实际情况是否置空 id
- user.setId(null);
- CheckUtil.checkName(user.getName());
- return this.repository.save(user);
- }
- /**
- * 根据 id 删除用户 存在的时候返回 200, 不存在返回 404
- */
- @DeleteMapping("/{id}")
- public Mono<ResponseEntity<Void>> deleteUser(
- @PathVariable("id") String id) {
- // deletebyID 没有返回值, 不能判断数据是否存在
- // this.repository.deleteById(id)
- return this.repository.findById(id)
- // 当你要操作数据, 并返回一个 Mono 这个时候使用 flatMap
- // 如果不操作数据, 只是转换数据, 使用 map
- .flatMap(user -> this.repository.delete(user).then(
- Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
- .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
- }
- /**
- * 修改数据 存在的时候返回 200 和修改后的数据, 不存在的时候返回 404
- */
- @PutMapping("/{id}")
- public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id,
- @Valid @RequestBody User user) {
- CheckUtil.checkName(user.getName());
- return this.repository.findById(id)
- // flatMap 操作数据
- .flatMap(u -> {
- u.setAge(user.getAge());
- u.setName(user.getName());
- return this.repository.save(u);
- })
- // map: 转换数据
- .map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
- .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
- }
- /**
- * 根据 ID 查找用户 存在返回用户信息, 不存在返回 404
- */
- @GetMapping("/{id}")
- public Mono<ResponseEntity<User>> findUserById(
- @PathVariable("id") String id) {
- return this.repository.findById(id)
- .map(u -> new ResponseEntity<User>(u, HttpStatus.OK))
- .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
- }
- /**
- * 根据年龄查找用户
- */
- @GetMapping("/age/{start}/{end}")
- public Flux<User> findByAge(@PathVariable("start") int start,
- @PathVariable("end") int end) {
- return this.repository.findByAgeBetween(start, end);
- }
- /**
- * 根据年龄查找用户
- */
- @GetMapping(value = "/stream/age/{start}/{end}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public Flux<User> streamFindByAge(@PathVariable("start") int start,
- @PathVariable("end") int end) {
- return this.repository.findByAgeBetween(start, end);
- }
- /**
- * 得到 20-30 用户
- */
- @GetMapping("/old")
- public Flux<User> oldUser() {
- return this.repository.oldUser();
- }
- /**
- * 得到 20-30 用户
- */
- @GetMapping(value = "/stream/old", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public Flux<User> streamOldUser() {
- return this.repository.oldUser();
- }
- }
Repository 层
- @Repository
- public interface UserRepository extends ReactiveMongoRepository<User, String> {
- /**
- * 根据年龄查找用户
- */
- Flux<User> findByAgeBetween(int start, int end);
- @Query("{'age':{'$gte': 20,'$lte': 30}}")
- Flux<User> oldUser();
- }
以上代码没有进行校验, 当然没有校验的代码是不能用的, 校验代码我就不放了, 想了解的 GitHub https://github.com/FSJAlger/webflux 上有完整代码.
来源: https://www.cnblogs.com/algerfan/p/10305148.html