[C#]使用rx.net来帮助完成主动通知和订阅的功能

前言

rxjs已经算是在前端很红的一个主题了,不过相对来说rx.net则比较少看到有人讨论,其实rx提供了很多类似functional programming的概念,甚至还提供了类似event emit的想法,也有更多处理多个异步的强大operator,而今天想要来笔记一下,如何使用rx来完成推播和订阅的需求,而这边的主题最主要会使用到的就是subject这个名词。

导览

  1. 安装rx.net
  2. 情境模拟
  3. 完成订阅
  4. 发送主动通知
  5. 结论

安装rx.net

首先我们需要在visual studio安装rx.net,开起nuget并搜寻reactive的关键字,然后如下图下载

此举会同时帮你安装core和interface的部分,而我们会使用到的很多operator都是包装在Linq里面,同时也会看到很多已经熟悉的lambda语法,而如果爬文比较多看到的都是rxjs的介绍,就连官方对rx.net的说明也少得可怜,所以如果需要使用到rx.net的话,必须觉悟可能得花很多时间自行摸索,而如果对很多观念不懂的话,如果对前端熟悉的话,可以由rxjs来了解观念。

情境模拟

试想一下我们传统在做缓存数据的时候,都是用时间性来达成,但如果我们有新数据想要通知缓存更新的话,目前有另一种redis的做法,默认就有主动通知的api可以使用,但redis缓存比较适合在很多系统的共享数据情境,如果我们只是想要在各别系统缓存的话,还是放在各系统的ap上比较适合,而这时候我们其实就可以使用rx.net来达成即时通知缓存更新数据的做法。

首先我来摸拟一个会员登入数据,当有其他会员登入数据的时候,api会即时传给前端总共有哪些会员登入,先来实做一个Member.cs的dto

public class MemberModel
{
    public int Id { get; set; }
    public string Name { get; set; }
}                                          

接着我们先做两笔假数据,然后使用memory cache的方式来做缓存,程序如下

    public class CacheService
    {
        ObjectCache cache = MemoryCache.Default;

        public void Init()
        {
            if (cache.Get("Members") == null)
            {
                var Members = new List
                {
                    new MemberModel {Id=1,Name="kin" },
                    new MemberModel {Id=2,Name="anson" },
                };
                var policy = new CacheItemPolicy { Priority = CacheItemPriority.NotRemovable };
                cache.Add("Members", Members, policy);
            }
        }

        public List GetMembers()
        {
            return cache.Get("Members") as List;
        }
    }

因为我打算在系统一开始的时候,就先把缓存写进去了,所以我选择在Global.asax里面去Init缓存数据

    public class WebApiApplication : System.Web.HttpApplication
    {
        protected void Application_Start()
        {
            AreaRegistration.RegisterAllAreas();
            GlobalConfiguration.Configure(WebApiConfig.Register);
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            CacheService service = new CacheService();
            service.Init();
            BundleConfig.RegisterBundles(BundleTable.Bundles);

        }
    }

接着我是使用swagger直接测试,以确保数据是有写进memory里面

    public class ValuesController : ApiController
    {
        CacheService _service = new CacheService();
        // GET: api/Values
        public IHttpActionResult Get()
        {
            return Ok(_service.GetMembers());
        }
    }

结果如下图

完成订阅

接着我们需要把写缓存的时机点,改成使用订阅的方式来实现,也就是我们去订阅Subject,而何时要去缓存的机制则改成用rx来实现,首先来看一下我们如何实做这个部分,新增一支EventSubjects.cs

    public static class EventSubjects
    {
        public static Subject SubjectMembers { get; set; } = new Subject();
    }

接着我希望订阅的时候一样在系统一开始就去执行,所以一样在Global.asax动手脚

    public class WebApiApplication : System.Web.HttpApplication
    {
        protected void Application_Start()
        {
            AreaRegistration.RegisterAllAreas();
            GlobalConfiguration.Configure(WebApiConfig.Register);
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            CacheService service = new CacheService();
            service.Init();
            service.SubscribeSubjects(); //新增订阅subject
            BundleConfig.RegisterBundles(BundleTable.Bundles);

        }
    }

然后替CacheService新增订阅的方法

    public class CacheService
    {
        ObjectCache cache = MemoryCache.Default;
        CacheItemPolicy policy = new CacheItemPolicy { Priority = CacheItemPriority.NotRemovable };

        public void Init()
        {
            if (cache.Get("Members") == null)
            {
                var Members = new List
                {
                    new MemberModel {Id=1,Name="kin" },
                    new MemberModel {Id=2,Name="anson" },
                };
                cache.Add("Members", Members, policy);
            }
        }

       ///

       /// 订阅的方法
       /// 

        public void SubscribeSubjects()
        {
            EventSubjects.SubjectMembers.Subscribe(x =>
            {
                var result = GetMembers();
                var member=result.FirstOrDefault(r => r.Id == x.Id);
                if (member == null) result.Add(x);
                cache.Set("Members", result, policy);
            });
        }

        public List GetMembers()
        {
            return cache.Get("Members") as List;
        }
    }

大功告成了,自此之后什么时候重写缓存就是用订阅的方式来实现了

发送主动通知

这边则是关键点了,当我们打web api的post时候,假设成功就去触发subject去做主动通知

    public class ValuesController : ApiController
    {
        CacheService _service = new CacheService();
        // GET: api/Values
        public IHttpActionResult Get()
        {
            return Ok(_service.GetMembers());
        }

        public void Post(MemberModel model)
        {
            EventSubjects.SubjectMembers.OnNext(model);
        }
    }

整个完成之后我们就可以测试看看,是否会如预期的当我新增数据的时候,数据是否有顺利写进缓存。

一开始的数据

接着打post

重新再Get一次去拉缓存数据

确认我们是使用主动通知的方式,去通知写进缓存数据了。

结论

研究rx.net的时候,文档确实很缺乏,这就造成要研究这个技术确实不是那么容易入门和学习,毕竟文档不全相关文章也不多,但相对性的来说rxjs则是有很多相关应用,而rxjs光是operator的相关应用就博大精深了,所以有兴趣的笔者就请至rxjs30天这系列文章去做一些深入了解(https://ithelp.ithome.com.tw/articles/10186104),而这个连结更是用图示的方式让你理解各种operator的应用(http://rxmarbles.com/),此篇没什么用到operator的部分,主要是使用到subject的特性,如果对此篇文章有任何疑问或意见,再请回复啰。

原文:大专栏  [C#]使用rx.net来帮助完成主动通知和订阅的功能

原文地址:https://www.cnblogs.com/petewell/p/11445706.html

时间: 2024-10-10 10:57:13

[C#]使用rx.net来帮助完成主动通知和订阅的功能的相关文章

支付宝主动通知调用的页面

/// <summary> /// 功能:支付宝主动通知调用的页面(通知页) /// 版本:3.0 /// 日期:2010-06-30 /// 说明: /// 以下代码只是为了方便商户测试而提供的样例代码,商户可以根据自己网站的需要,按照技术文档编写,并非一定要使用该代码. /// 该代码仅供学习和研究支付宝接口使用,只是提供一个参考. /// /// ///////////////////页面功能说明/////////////////// /// 创建该页面文件时,请留心该页面文件中无任何H

WebSocket安卓客户端实现详解(三)–服务端主动通知

WebSocket安卓客户端实现详解(三)–服务端主动通知 本篇依旧是接着上一篇继续扩展,还没看过之前博客的小伙伴,这里附上前几篇地址 WebSocket安卓客户端实现详解(一)–连接建立与重连 WebSocket安卓客户端实现详解(二)–客户端发送请求 终于是最后一篇啦,有点激动\ ( ≧▽≦ ) /啦啦啦, 服务端主动通知 热身完毕,我们先回顾下第一篇中讲到的服务端主动通知的流程 根据notify中事件类型找到对应的处理类,处理对应逻辑. 然后用eventbus通知对应的ui界面更新. 如果

主动通知Android系统图库进行更新

项目中遇到调用图库进行图片的选择,因为不能主动及时更新,遂实现代码调用实现主动及时更新. 废话不多刷,看代码. 方式一,发送一个广播, sendBroadcast(new Intent(Intent.ACTION_MEDIA_SCANNER_SCAN_FILE,Uri.parse("file://"+fileSD_file))); 方式二,通过MediaScannerConnection 类 MediaScannerConnection.scanFile(context, new St

【Java面试题】50 垃圾回收器的基本原理是什么?垃圾回收器可以马上回收内存吗?有什么办法主动通知虚拟机进行垃圾回收?

1.对于GC来说,当程序员创建对象时,GC就开始监控这个对象的地址.大小以及使用情况. 通常,GC采用有向图的方式记录和管理堆(heap)中的所有对象.通过这种方式确定哪些对象是"可达的",哪些对象是"不可达的".当GC确定一些对象为"不可达"时,GC就有责任回收这些内存空间. 2.可以.程序员可以手动执行System.gc(),通知GC运行,但是Java语言规范并不保证GC一定会执行. 二.这里我们详细了解一下System.gc()的工作原理:

SignalR主动通知订阅者示例

html代码: <script src="~/Scripts/jquery.signalR-2.2.0.min.js"></script> <script src="@Url.Content("~/signalr/hubs")" type="text/javascript"></script> <script> $(function () { var hub = $.

消息中间件介绍

消息中间件(message oriented middleware)是指支持与保障分布式应用程序之间同步/异步收发消息的中间件.消息是分布式应用之间进行数据交换的基本信息单位,分布式应用程序 之间的通信接口由消息中间件提供.其中,异步方式指消息发送方在发送消息时不必知道接收方的状态,更无需等待接收方的回复,而接收方在收到消息时也不必知 道发送方的目前状态,更无需进行同步的消息处理,它们之间的连接完全是松耦合的,通信是非阻塞的,这种异步通信方式是由消息中间件中的消息队列及其服务机 制保障的.一般地

传统的分布式应用集成技术(网摘)

1  引言 分布式计算是指各种不同的工作站通过网络互相连接,由分布式系统提供跨越网络透明地访问各种异构设备所需要的支持,使得用户可以充分利用网络上的各种计算资源来完成自己的任务[1].与网络技术的发展和日益增长的应用需求相适应,分布式计算已经成为新一代计算和应用的主流.分布式计算中所涉及的分布式系统是指组件分布在网络计算机上且通过消息传递进行通信和动作协调的系统[2].分布式系统具有以下特征:组件的并发性.缺乏全局时钟.组件故障的独立性.构造分布式系统的挑战主要是其组件的异构性.开放性(指允许增

Android之——自己主动挂断电话的实现

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/47072451 通过<Android之--AIDL小结>与<Android之--AIDL深入>两篇博文.相信大家已经对Android AIDL有了一定的了解.以下,我们就利用Android的AIDL实现自己主动挂断电话的功能,好了.不多说了,我们直接进入主题. 1.准备AIDL文件 挂断电话的AIDL文件都是Android自带的文件,我们能够从Android的源码中找

手动通知扫描SD卡主动生成缩略图

最近做项目遇到的难题,调用系统拍照获取不到缩略图,非得关机重启才会生成,所以我们要主动通知系统扫描SD卡生成缩略图, 在Android4.4之前也就是以发送一个Action为“Intent.ACTION_MEDIA_MOUNTED”的广播通知执行扫描.如下: this.sendBroadcast(new Intent(Intent.ACTION_MEDIA_MOUNTED, Uri.parse("file://" + Environment.getExternalStorageDire