Flux转Mono next()

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class TestFindResult {
  private static final Map<String, String> templates;
  private static final int sleep = 1000;

  static {
    templates = new LinkedHashMap<>();
    templates.put("aDB", "a");
    templates.put("bDB", "b");
    templates.put("cDB", "c");
  }

  public Mono<String> findResult(Function<String, Mono<String>> query) {
    return Flux.fromIterable(templates.values())
        .flatMap(query)
        .next()
        .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
        .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new);
  }

  public static void main(String[] args) {
    TestFindResult test = new TestFindResult();
    Function<String, Mono<String>> query = (value) -> {
      try {
        Thread.sleep(sleep); // mock DB query
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      log.info(
          "Thread id:{}, Thread name:{}, value:{}, used ms:{}",
          Thread.currentThread().getId(),
          Thread.currentThread().getName(),
          value,
          sleep);
      return Mono.just(value);
    };
    System.out.println(test.findResult(query).subscribe());
  }
}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class TestFindMongo {
  private static final Map<String, String> templates;
  private static final int sleep = 1000;

  static {
    templates = new LinkedHashMap<>();
    templates.put("aDB", "a");
    templates.put("bDB", "b");
    templates.put("cDB", "c");
  }

  public Mono<String> findMongo() {
    StopWatch stopWatch = StopWatch.createStarted();
    return Flux.fromIterable(templates.entrySet())
        .filterWhen(
            template -> {
              String key = template.getKey();
              String value = template.getValue();
              try {
                Thread.sleep(sleep); // mock DB query
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
              log.info(
                  "Thread id:{}, Thread name:{}, query:{}, value:{} , used ms:{}",
                  Thread.currentThread().getId(),
                  Thread.currentThread().getName(),
                  key,
                  value,
                  sleep);
              return Mono.just(value.equals("b"));
            })
        .next()
        .doOnSuccess(templateEntry -> log.info("Match {} ", templateEntry.getKey()))
        .map(Entry::getValue)
        .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
        .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new)
        .doOnTerminate(() -> log.info("Database recon took {} ms", stopWatch.getTime()));
  }

  public static void main(String[] args) {
    TestFindMongo test = new TestFindMongo();
    System.out.println(test.findMongo().subscribe());
  }
}
import static org.springframework.http.HttpStatus.*;

import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;

public class MultipleUpstreamException extends ResponseStatusException {

  private static final String MULTILPLE_UPSTREAM_MATCH_ERR =
      "Your query contains properties matching multiple upstreams. "
          + "Data for multiple upstreams can‘t be returned in one query. "
          + "Please either specify upstream by providing publisherSystem "
          + "(GSM,MUNI_ITICKET,MUNI_OASYS,TPSDERIV,EDLR) "
          + "and region or request deal properties matching only one upstream";

  MultipleUpstreamException() {
    super(BAD_REQUEST, MULTILPLE_UPSTREAM_MATCH_ERR);
  }

  /**
   * This constructor has syntax adapted to Mono API
   *
   * @param indexOutOfBoundsException emitted on {@link Flux#single()} when Flux has more than one
   *     elements
   * @see Mono#onErrorMap(Class, java.util.function.Function))
   */
  MultipleUpstreamException(IndexOutOfBoundsException indexOutOfBoundsException) {
    this();
  }
}

原文地址:https://www.cnblogs.com/tonggc1668/p/12055711.html

时间: 2024-11-11 22:56:34

Flux转Mono next()的相关文章

Flux 和 Mono 的区别

Flux 和 Mono 是 Reactor 中的两个基本概念.Flux 表示的是包含 0 到 N 个元素的异步序列.在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息.序列结束的消息和序列出错的消息.当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用.Mono 表示的是包含 0 或者 1 个元素的异步序列.该序列中同样可以包含与 Flux 相同的三种类型的消息通知.Flux 和 Mono 之间可以进行转换.对一个 Flux

Reactor系列(十六)disposable停止Flux流

#java#reactor#flux#disposable# 停止flux流 视频讲解: https://www.bilibili.com/video/av81385859/ FluxMonoTestCase.java package com.example.reactor; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import reactor.core.Disposable; import rea

Spring 5 新特性:函数式Web框架

举例 我们先从示例应用程序的一些摘录开始.下面是暴露Person对象的响应信息库.很类似于传统的,非响应信息库,只不过它返回Flux<Person>而传统的返回List<Person>,以及返回Mono<Person>的地方返回Person.Mono<Void>用作完成标识:指出何时保存被完成.关于Reactor类型的更多信息,请参阅此博客文章. public interface PersonRepository { Mono<Person> g

(12)Reactor 3 自定义数据流——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式流 | Reactor 3快速上手 | 响应式流规范本文源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水池的样子.如下图所示: 下面介绍到的方法都有一个

(12)自定义数据流(实战Docker事件推送的REST API)——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | Spring WebFlux快速上手 | 响应式流规范本文 测试源码 | 实战源码 2.2 自定义数据流 这一小节介绍如何通过定义相应的事件(onNext.onError和onComplete) 创建一个 Flux 或 Mono.Reactor提供了generate.create.push和handle等方法,所有这些方法都使用 sink(池)来生成数据流. sink,顾名思义,就是池子,可以想象一下厨房水

附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor Operators 本节的内容来自我翻译的Reactor 3 参考文档--如何选择操作符.由于部分朋友打开github.io网速比较慢或上不去,贴出来方便大家查阅. 如果一个操作符是专属于 Flux 或 Mono 的,那么会给它注明前缀.公共的操作符没有前缀.如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现,会以一个点(.)开头,并将参数置于圆括号内,比如: .methodCall(parameter).

(15)Reactor 3 Operators——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 Reactor 3快速上手 | 响应式流规范 2.5 Reactor 3 Operators 虽然响应式流规范中对Operator(以下均称作"操作符")并未做要求,但是与RxJava等响应式开发库一样,Reactor也提供了非常丰富的操作符. 2.5.1 丰富的操作符 本系列前边的文章中,陆续介绍了一些常用的操作符.但那也只是冰山之一角,Reactor 3提供了丰富的操作符,如果要一个一个介绍,那篇幅大了去了,授人以

《Spring5官方文档》新功能(4,3)

<Spring5官方文档>新功能 原文链接 译者:supriseli Spring框架的新功能 这一章主要提供Spring框架新的功能和变更. 升级到新版本的框架可以参考.Spring git. 内容列表 Spring 5.x框架新的功能 Spring 4.x框架新的功能 Spring 3.x框架新的功能 Spring FrameWork 5.0新的功能 JDK 8+和Java EE7+以上版本 整个框架的代码基于java8 通过使用泛型等特性提高可读性 对java8提高直接的代码支撑 运行时

零基础快速入门SpringBoot2.0教程 (四)

一.JMS介绍和使用场景及基础编程模型简介:讲解什么是小写队列,JMS的基础知识和使用场景 1.什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口 2.JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity).这里,JDBC 是可以用来访问许多不同关系数据库的 API 3.使用场景: 1)跨平台 2)多语言 3)多项目 4)解耦 5)分布式事务 6)流量控