加入收藏 | 设为首页 | 会员中心 | 我要投稿 辽源站长网 (https://www.0437zz.com/)- 云专线、云连接、智能数据、边缘计算、数据安全!
当前位置: 首页 > 运营中心 > 建站资源 > 优化 > 正文

1小时让你掌握响应式编程,并入门Reactor

发布时间:2019-09-24 18:32:17 所属栏目:优化 来源:Java之高级架构
导读:我看同步阻塞 你知道什么是同步阻塞吗,当然知道了。那你怎么看它呢,这个。。。 在同步阻塞的世界里,代码执行到哪里,数据就跟到哪里。如果数据很慢跟不上来,代码就停在那里等待数据的到来,然后再带着数据一起往下执行。 可以说是,代码执行和数据是结

那就开始建立对应关系。因为“反应”是一系列行为动作,所以应该和“执行逻辑”对应。那“变化”只能和“数据”对应,其实这是对的,“数据”由不可用到可用,本身就是发生了一个“变化”。

这个对应关系建立的很完美,但是逻辑顺序却完全冲突。响应式是由变化主导反应,这很好理解,我都没有变化,你无须做出反应。同步阻塞是由执行逻辑主导数据,这也很好理解,我代码都没执行呢,根本不需要数据。

可见,它们的对应关系非常完美,但主导顺序完全相反,这就是一个非常非常有价值的发现。

因为我们只需把同步阻塞倒过来,就是实现响应式的大致方向。这样的推理貌似是对的,但实际当中是这样的吗?嗯,是这样的。

现在请大家和我一起扭转思维。原来以逻辑代码执行作为主线,数据作为参与者。现在以数据作为主线,逻辑代码执行作为参与者。说的再白一些,原来是数据传递到逻辑代码里,现在是逻辑代码传递到数据里。

有人也许会问,逻辑代码怎么传递?哈哈,Lambda表达式呀,函数式编程呀。

想象一下,有一个长长的管子,里面的水一直在流。

如果你想让水变成橙色的,只需在管子上开个口,加装一个可以持续投放橙色染料的装置,结果流经它的水都变成橙色的了。

如果你想让橙色的水变甜的话,只需在后面的管子上开个口,加装一个可以持续投放白糖的装置,结果流经它的水都变成甜的了。

同理,可以在后面继续加装投放柠檬酸的装置,让水变酸,在后面继续加装压入二氧化碳的装置,让水带气泡。

最后发现,自来水经过多道工序处理后变成了芬达。

如果把水流看作是数据流,把投放装置看作是逻辑代码,就变成了,数据先流入第一个逻辑代码,处理后再流入第二个逻辑代码,依次流下去直至结束。

这就是以数据作为主线,逻辑代码只是参与者,同时它也是Reactor实现响应式编程的原理,Spring官方使用的响应式类库就是Reactor。

其中,“以数据为主线”和“在变化时通知处理者”这两个功能Reactor库都已经实现了,我们需要做的就是“对变化做出反应”,即插入逻辑代码。

Reactor入门

在Reactor中,有两个非常重要的类,就是Mono和Flux,它们都是数据源,在它们内部都已经实现了“以数据为主线”和“在变化时通知处理者”这两个功能,而且还提供了方法让我们来插入逻辑代码用于“对变化做出反应”。

Mono表示0个或1个数据,Flux表示0到多个数据。先从简单的Mono开始。

设计一个简单的示例,首先创建一个数据源,只包含一个数据10,第一个处理就是加1,第二个处理就是奇偶性过滤,第三个处理就是把这个数据消费掉,然后就结束了。

为了清楚地看出来主线程执行的是哪些代码,工作线程执行的是哪些代码,特意打印了很多信息。

  1. public static void main(String[] args) { 
  2.  displayCurrTime(1); 
  3.  displayCurrThreadId(1); 
  4.  //创建一个数据源 
  5.  Mono.just(10) 
  6.  //延迟5秒再发射数据 
  7.  .delayElement(Duration.ofSeconds(5)) 
  8.  //在数据上执行一个转换 
  9.  .map(n -> { 
  10.  displayCurrTime(2); 
  11.  displayCurrThreadId(2); 
  12.  displayValue(n); 
  13.  delaySeconds(2); 
  14.  return n + 1; 
  15.  }) 
  16.  //在数据上执行一个过滤 
  17.  .filter(n -> { 
  18.  displayCurrTime(3); 
  19.  displayCurrThreadId(3); 
  20.  displayValue(n); 
  21.  delaySeconds(3); 
  22.  return n % 2 == 0; 
  23.  }) 
  24.  //如果数据没了就用默认值 
  25.  .defaultIfEmpty(9) 
  26.  //订阅一个消费者把数据消费了 
  27.  .subscribe(n -> { 
  28.  displayCurrTime(4); 
  29.  displayCurrThreadId(4); 
  30.  displayValue(n); 
  31.  delaySeconds(2); 
  32.  System.out.println(n + " consumed, worker Thread over, exit."); 
  33.  }); 
  34.  displayCurrTime(5); 
  35.  displayCurrThreadId(5); 
  36.  pause(); 
  37. //显示当前时间 
  38. static void displayCurrTime(int point) { 
  39.  System.out.println(point + " : " + LocalTime.now()); 
  40. //显示当前线程Id 
  41. static void displayCurrThreadId(int point) { 
  42.  System.out.println(point + " : " + Thread.currentThread().getId()); 
  43. //显示当前的数值 
  44. static void displayValue(int n) { 
  45.  System.out.println("input : " + n); 
  46. //延迟若干秒 
  47. static void delaySeconds(int seconds) { 
  48.  try { 
  49.  TimeUnit.SECONDS.sleep(seconds); 
  50.  } catch (InterruptedException e) { 
  51.  e.printStackTrace(); 
  52.  } 
  53. //主线程暂停 
  54. static void pause() { 
  55.  try { 
  56.  System.out.println("main Thread over, paused."); 
  57.  System.in.read(); 
  58.  } catch (IOException e) { 
  59.  e.printStackTrace(); 
  60.  } 

(编辑:辽源站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读