[rxjs] Creating An Observable with RxJS

Create an observable

var Observable = Rx.Observable;

// Build an observable that synchronously pushes one value and then completes.
// The function passed to create() runs once for every subscriber.
var source = Observable.create(function(observer){
  var person = {
    name: "Zhentian",
    message: "Hello World!"
  };

  observer.onNext(person);   // deliver the value to the subscriber
  observer.onCompleted();    // signal that no more values will follow
});

// subscribe() takes up to three callbacks: next value, error, completion.
var sub = source.subscribe(function onNext(person){
  console.log(person.name + ' say ' + person.message);
}, function onError(err){
  console.log(err);
}, function onCompleted(){
  console.log("done");
});

// Expected output:
//Zhentian say Hello World!
//done

Async

var Observable = Rx.Observable;

// Build an observable that emits its value asynchronously, one second after
// subscription.
var source = Observable.create(function(observer){

  setTimeout(function(){

    var person = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(person);
    observer.onCompleted();
  }, 1000);

  // Runs immediately: setTimeout only schedules the callback above, so this
  // line is logged before the value is delivered.
  console.log("ansyc finished!");

});

var sub = source.subscribe(function onNext(person){
  console.log(person.name + ' say ' + person.message);
}, function onError(err){
  console.log(err);
}, function onCompleted(){
  console.log("done");
});

// Expected output:
//"ansyc finished!"
//"Zhentian say Hello World!"
//"done"

Dispose the async

When we dispose of the subscription before the timer fires, we can see that the timeout callback still runs and logs "Starat timeout". That is not good: onNext() will never reach the subscriber, so what we really want is for the code inside the setTimeout callback not to run at all.

var Observable = Rx.Observable;

// Same async observable as before, but with a log line inside the timer
// callback so we can see whether it still runs after the subscription has
// been disposed.
var source = Observable.create(function(observer){

  // No cleanup function is returned here, so nothing cancels this timer
  // when the subscriber disposes.
  setTimeout(function(){
    console.log("Starat timeout");
    var person = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(person);
    observer.onCompleted();
  }, 1000);

  console.log("ansyc finished!");

});

var sub = source.subscribe(function onNext(person){
  console.log(person.name + ' say ' + person.message);
}, function onError(err){
  console.log(err);
}, function onCompleted(){
  console.log("done");
});

// Dispose of the subscription after 500ms, i.e. before the 1000ms timer fires.
setTimeout(function(){

  sub.dispose();
}, 500);

// Expected output: the timer callback still runs, but the subscriber's
// handlers are not called.
/*
"ansyc finished!"
"Starat timeout"
*/

Define the dispose

We can store the id returned by setTimeout, and clear that timeout inside the function we return from the create() callback.

var Observable = Rx.Observable;

// Async observable with cleanup: the function returned from the create()
// callback cancels the pending timer.
var source = Observable.create(function(observer){

  // Keep the timer id so the cleanup function below can cancel it.
  var id = setTimeout(function(){
    console.log("Starat timeout");
    var person = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(person);
    observer.onCompleted();
  }, 1000);

  console.log("ansyc finished!");

  // Note that this is optional, you do not have to return this if you require no cleanup
  return function(){
    clearTimeout(id);
  };

});

var sub = source.subscribe(function onNext(person){
  console.log(person.name + ' say ' + person.message);
}, function onError(err){
  console.log(err);
}, function onCompleted(){
  console.log("done");
});

// Dispose of the subscription after 500ms, i.e. before the 1000ms timer fires.
setTimeout(function(){

  sub.dispose();
}, 500);

// Expected output: the timer callback never runs.
/*
"ansyc finished!"
*/

Catch error

If we throw an error inside the setTimeout callback, we find that it is not actually caught by the onError handler.

var Observable = Rx.Observable;

// Demonstrates that an exception thrown inside the timer callback is NOT
// delivered to the subscriber's onError handler.
var source = Observable.create(function(observer){

  var id = setTimeout(function(){
    throw "there is an error"; //Throw an error here

    // Deliberately unreachable: the throw above aborts the callback before
    // any value is delivered.
    var person = {
      name: "Zhentian",
      message: "Hello World!"
    };

    observer.onNext(person);
    observer.onCompleted();
  }, 1000);

  // Note that this is optional, you do not have to return this if you require no cleanup
  return function(){
    clearTimeout(id);
  };

});

var sub = source.subscribe(function onNext(person){
  console.log(person.name + ' say ' + person.message);
}, function onError(err){
  console.log("Error: " + err);
}, function onCompleted(){
  console.log("done");
});

// Expected output: the error surfaces as an uncaught exception instead of
// going through onError (no "Error: ..." line is logged).
/*
"error"
"Uncaught there is an error (line 6)"
*/

What we can do is wrap the body of the callback in a try/catch block and pass the caught error to the observer's onError().

var Observable = Rx.Observable;

// Wraps the timer callback in try/catch so that a thrown exception is
// forwarded to the subscriber through onError.
var source = Observable.create(function(observer){

  var id = setTimeout(function(){
    try{
      throw "there is an error"; //Throw an error here

      // Deliberately unreachable: the throw above jumps straight to catch.
      var person = {
        name: "Zhentian",
        message: "Hello World!"
      };

      observer.onNext(person);
      observer.onCompleted();
    }catch(error){
      // Hand the failure to the subscriber's onError handler.
      observer.onError(error);
    }

  }, 1000);

  // Note that this is optional, you do not have to return this if you require no cleanup
  return function(){
    clearTimeout(id);
  };

});

var sub = source.subscribe(function onNext(person){
  console.log(person.name + ' say ' + person.message);
}, function onError(err){
  console.log("Error: " + err);
}, function onCompleted(){
  console.log("done");
});

// Expected output:
/*
"Error: there is an error"
*/
时间: 2024-10-12 12:19:40

[rxjs] Creating An Observable with RxJS的相关文章

[RxJS] Creating Observable From Scratch

Get a better understanding of the RxJS Observable by implementing one that's similar from the ground up. class SafeObserver { constructor(destination) { this.destination = destination; } next(value) { const destination = this.destination; if (destina

[Javascript + rxjs] Introducing the Observable

In this lesson we will get introduced to the Observable type. An Observable is a collection that arrives over time. Observables can be used to model events, asynchronous requests, and animations. Observables can also be transformed, combined, and con

[RxJS] Subject: an Observable and Observer hybrid

This lesson teaches you how a Subject is simply a hybrid of Observable and Observer which can act as a bridge between the source Observable and multiple observers, effectively making it possible for multiple observers to share the same Observable exe

rxjs——subject和Observable的区别

一个 Observable 是可以被多个 observer 订阅的,只是每个订阅都是一个新的独立的 Observable execution : const { Observable } = Rx const clock$ = Observable.interval(1000).take(3); const observerA = { next(v) { console.log('A next: ' + v) } } const observerB = { next(v) { console.l

[RxJS] Use groupBy in real RxJS applications

This lesson will show when to apply groupBy in the real world. This RxJS operator is best suited when a source observable represents many data sources, e.g. an observable for multitouch events. const busObservable = Rx.Observable.of( {code: 'en-us',

[RxJS] Reactive Programming - Why choose RxJS?

RxJS is super when dealing with the dynamic value. Let's see an example which not using RxJS: var a = 4; var b = a * 10; console.log(b); // 40 a = 5; console.log(b); // 40 So you change a, it won't affect b's value because b is already defined.... So

[RxJS] Learn How To Use RxJS 5.5 Beta 2

The main changes is about how you import rxjs opreators from now on. And introduce lettable opreator. import { range } from 'rxjs/observable/range'; import { map, filter, scan } from 'rxjs/operators'; const source$ = range(0, 10); source$.pipe( filte

[RxJS] Reactive Programming - What is RxJS?

First thing need to understand is, Reactive programming is dealing with the event stream. Event streams happens overtime, which not stay in the memory. For example, the array we have: var source = ['1', '1', 'foo', '2', '3', '5', 'bar', '8', '13']; W

构建流式应用—RxJS详解

讲之前先说一点哈,插入的图片,不能正常显示,图片一半被遮盖了,如果想看,鼠标右击然后图片地址,图片另存为去看.如果有知道怎么解决的,可以在下面给我评论,我会及时的修改. 好啦,开始: 目录 常规方式实现搜索功能 RxJS · 流 Stream RxJS 实现原理简析 观察者模式 迭代器模式 RxJS 的观察者 + 迭代器模式 RxJS 基础实现 Observable Observer RxJS · Operators Operators ·入门 一系列的 Operators 操作 使用 RxJS