[RxJS] Sharing Streams with Share

A stream will run with each new subscription added to it. This lesson shows the benefits of using share so that the same stream can be shared across multiple subscriptions.

const timer$ = starters$
    .switchMap(intervalActions)
    .startWith(data)
    .scan((acc, curr)=> curr(acc))

timer$
    .do((x)=> console.log(x))
    .takeWhile((data)=> data.count <= 3)
    .withLatestFrom(
        input$.do((x)=> console.log(x)),
        (timer, input)=> ({count: timer.count, text: input})
    )
    .filter((data)=> data.count === parseInt(data.text))
    .reduce((acc, curr)=> acc + 1, 0)
    .repeat()
    .subscribe(
        (x)=> document.querySelector(‘#score‘).innerHTML = `
            ${x}
        `,
        err=> console.log(err),
        ()=> console.log(‘complete‘)
    );

Current code has a problem every time we type in the input, we need to click delete keyborad to delete the input. This is not conveninet.

So what we want is every time when the code check whether the value match, then we clean the input:

const runningGame$ = timer$
    .do((x)=> console.log(x))
    .takeWhile((data)=> data.count <= 3)
    .withLatestFrom(
        input$.do((x)=> console.log(x)),
        (timer, input)=> ({count: timer.count, text: input})
    );

runningGame$
  .subscribe( (x) => input.value = "");

 runningGame$
    .filter((data)=> data.count === parseInt(data.text))
    .reduce((acc, curr)=> acc + 1, 0)
    .repeat()
    .subscribe(
        (x)=> document.querySelector(‘#score‘).innerHTML = `
            ${x}
        `,
        err=> console.log(err),
        ()=> console.log(‘complete‘)
    );

So we split the code and add another subscribe to clean the input, but now the problem is everytime it console log twice. This is because we have two subscribe on the runningGame$.

TO solve this problem, we need to share() the stream:

const runnintGame$ = timer$
    .do((x)=> console.log(x))
    .takeWhile((data)=> data.count <= 3)
    .withLatestFrom(
        input$.do((x)=> console.log(x)),
        (timer, input)=> ({count: timer.count, text: input})
    )
.share();

Now it only log out once, but it doesn‘t clean the input when we click start again. THis is because we only repeat() this part of code:

 runningGame$
    .filter((data)=> data.count === parseInt(data.text))
    .reduce((acc, curr)=> acc + 1, 0)
    .repeat()

What we need to do is also make runningGame$ (clean the input) stream repeat itself:

// To clean the input
runningGame$
  .repeat()
  .subscribe( (x) => input.value = "");

----------------------------

Code:

const timer$ = starters$
    .switchMap(intervalActions)
    .startWith(data)
    .scan((acc, curr)=> curr(acc))

const runningGame$ = timer$
    .do((x)=> console.log(x))
    .takeWhile((data)=> data.count <= 3)
    .withLatestFrom(
        input$.do((x)=> console.log(x)),
        (timer, input)=> ({count: timer.count, text: input})
    )
.share();

// To clean the input
runningGame$
  .repeat()
  .subscribe( (x) => input.value = "");

 runningGame$
    .filter((data)=> data.count === parseInt(data.text))
    .reduce((acc, curr)=> acc + 1, 0)
    .repeat()
    .subscribe(
        (x)=> document.querySelector(‘#score‘).innerHTML = `
            ${x}
        `,
        err=> console.log(err),
        ()=> console.log(‘complete‘)
    );
时间: 2024-10-10 21:45:15

[RxJS] Sharing Streams with Share的相关文章

[RxJS] Combining streams in RxJS

Source: Link We will looking some opreators for combining stream in RxJS: merge combineLatest withLatestFrom concat forkJoin flatMap / switchMap  Merge:  Observable.merge behaves like a "logical OR" to have your stream handle one interaction OR

[RxJS] Combining Streams with CombineLatest

Two streams often need to work together to produce the values you’ll need. This lesson shows how to use an input stream and an interval stream together and push an object with both values through the stream. const Observable = Rx.Observable; const st

[RxJS] Aggregating Streams With Reduce And Scan using RxJS

What is the RxJS equivalent of Array reduce? What if I want to emit my reduced or aggregated value at each event? This brief tutorial covers Observable operators reduce() and scan(), their differences and gotchas. In ES5, the Array's reduce function

Methods and systems for sharing common job information

Apparatus and methods are provided for utilizing a plurality of processing units. A method comprises selecting a pending job from a plurality of unassigned jobs based on a plurality of assigned jobs for the plurality of processing units and assigning

【专家坐堂】四种并发编程模型简介

本文来自网易云社区 概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" ?       并发 :同一时间 对待 多件事情 (逻辑层面) ?       并行 :同一时间 做(执行) 多件事情 (物理层面) 并发可以构造出一种问题解决方法,该方法能够被用于并行化,从而让原本只能串行处理的事务并行化,更好地发挥出当前多核CPU,分布式集群的能力. 但是,并发编程和人们正常的思维方式是不一样的,因此才有了各种编程模型的抽象来帮助我们更方便,更不容

windows 2008 R2通过NFS共享磁盘给HP-UX或Linux挂载使用

此次任务是把windows 2008 R2的本地硬盘通过NFS的方式共享给HP-UX做DB的备份使用: 本次模拟环境是在VMware workstation 10上安装了一台windows 2008 R2 Server和一台Centos 5.4,接下来开始实验: 这里虽然使用的是centos 5.4,但是linux和HP-UX下命令都一样,经过本人实践. 首先要给windows 2008安装NFS特性,才能通过NFS将本地文件共享给centos,接下来添加"文件服务"角色和"

(转)Selenium-11: Execute JavaScript with JavascriptExecutor

Outline Sometimes we cannot handle some conditions or problems with Webdriver, web controls don't react well against selenium commands. In this kind of situations, we use Javascript. It is useful for custom synchronizations, hide or show the web elem

[转]Golang适合高并发场景的原因分析

来源:http://blog.csdn.net/ghj1976/article/details/27996095 作者:蝈蝈俊 典型的两个现实案例: 我们先看两个用Go做消息推送的案例实际处理能力. 360消息推送的数据: 16台机器,标配:24个硬件线程.64GB内存 Linux Kernel 2.6.32 x86_64 单机80万并发连接,load 0.2~0.4,CPU 总使用率 7%~10%,内存占用20GB (res) 眼下接入的产品约1280万在线用户 2分钟一次GC.停顿2秒 (1

说说Golang的使用心得

13年上半年接触了Golang,对Golang十分喜爱.现在是2015年,离春节还有几天,从开始学习到现在的一年半时间里,前前后后也用Golang写了些代码,其中包括业余时间的,也有产品项目中的.一直有想法写点Golang相关的总结或者感想,决定还是在年前总结下吧.注明下:我只是Golang的喜好者,不是脑残粉,也无意去挑起什么语言之争. 特性少,语法简单.GO是崇尚极简主义的,提倡少即是多.这点在它的Spec上尤其凸显,一下午的时间绝对可以看完.GO的特性很少,很多GO的使用者都反馈,GO的关