延迟添加队列

using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace AnfleCrawler.Common
{
    /// <summary>
    /// 延迟添加队列
    /// </summary>
    internal class LagPattern : Disposable
    {
        private static readonly List<LagPattern> Set = new List<LagPattern>();
        private const double PerCount = 50d;
        private WeakReference _weakRef;
        private int _depth;
        private object _state;
        private JobTimer _job;

        private PageCrawler Weak
        {
            get
            {
                if (!_weakRef.IsAlive)
                {
                    throw new InvalidOperationException("Wrap Dead");
                }
                return (PageCrawler)_weakRef.Target;
            }
        }

        public LagPattern(PageCrawler wrap, StringPatternGenerator urlGen, int depth, object state)
        {
            Contract.Requires(wrap != null && urlGen != null);

            _weakRef = new WeakReference(wrap);
            _depth = depth;
            _state = state;

            var tor = urlGen.GetEnumerator();
            int totalCount = urlGen.Count();
            if (totalCount < PerCount)
            {
                Proc(tor);
                return;
            }

            double span = Math.Ceiling(360d / (totalCount / PerCount));
            //double span = 16d;
            App.LogInfo("LagPattern Span:{0}", span);
            _job = new JobTimer(Proc, TimeSpan.FromSeconds(span));
            _job.Start(tor);
            //App.DisposeService.Register(this.GetType(), this);
            lock (Set)
            {
                Set.Add(this);
            }
        }

        protected override void DisposeInternal(bool disposing)
        {
            if (disposing)
            {
                if (_job != null)
                {
                    _job.Dispose();
                }
                //App.DisposeService.Release(this.GetType(), this);
                lock (Set)
                {
                    Set.Remove(this);
                }
            }
        }

        private void Proc(object state)
        {
            var wrap = this.Weak;
            var urlGen = (IEnumerator<string>)state;
            for (int i = 0; i < PerCount; i++)
            {
                if (!urlGen.MoveNext())
                {
                    this.Dispose();
                    return;
                }
                wrap.PushUrl(new Uri(urlGen.Current), _depth, _state);
            }
        }
    }
}
时间: 2024-08-10 21:29:24

延迟添加队列的相关文章

基于redis的延迟消息队列设计

需求背景 用户下订单成功之后隔20分钟给用户发送上门服务通知短信 订单完成一个小时之后通知用户对上门服务进行评价 业务执行失败之后隔10分钟重试一次 类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理. 队列设计 目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件. 开发前需要考虑的问题? 及时性 消费端能按时收到 同一时间消息的消费权重 可靠性 消息

Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列

本文主要摘录自:详细介绍Spring Boot + RabbitMQ实现延迟队列 并增加了自己的一些理解,记录下来,以便日后查阅. 项目源码: spring-boot-rabbitmq-delay-queue 实现 stream-rabbitmq-delay-queue 实现 背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟队列能做什么?延迟队列多用于需要延迟工作的场景.最常见的是以下两种场景: 延迟消费

Java操作RabbitMQ添加队列、消费队列和三个交换机

一.发送消息到队列(生产者) 新建一个maven项目,在pom.xml文件加入以下依赖 <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>

DelayQueue延迟队列-实现缓存

延迟阻塞队列DelayQueue DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素, 同时元素必须实现 Delayed 接口:在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素. 使用场景 缓存系统:当能够从延迟队列DelayQueue中获取到元素时,说明缓存已经过期 定时任务调度:一分钟后发送短信 基于延迟队列,实现一个缓存系统 延迟队列中添加的元素,实现了Delayed接口 public c

灵感来袭,基于Redis的分布式延迟队列

延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. Redisson Delayed Queue 如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单. 基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能.该功能可以用来实现消息传送延迟按几何增长或几何衰减的发

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制. 例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的

Spring Boot(十四)RabbitMQ延迟队列

一.前言 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单:2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度:3.过1分钟给新注册会员的用户,发送注册邮件等. 实现延迟队列的方式有两种: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能: 使用rabbitmq-delayed-message-exchange插件实现延迟功能: 注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上

消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?

形象说明: 比喻: RabbitMQ提供的消息投递服务类似于现实生活中的快递公司,双11我们可能会买很多东西,自然会陆续收到很多寄自淘宝店主由快递公司发来的快件,但是可能很多时候买回来的东西并不合心意,自然会陆续通过快递公司退回快件,所以回归到架构,这里的快件就相当于消息,我们相当于应用程序,淘宝店主相当于服务器,而快递公司相当于路由器,应用程序可以发送和接收消息,服务器也可以发送和接收消息,所以当应用程序连接到RabbitMQ时,就必须做一个决定:我是发送还是接收哪? 现实: 生产者(Prod

java B2B2C 仿淘宝电子商城系统-基于Rabbitmq实现延迟消息

1. 预备知识 1.1 消息传递 首先我们知道消费者是从队列中获取消息的,那么消息是如何到达队列的? 当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing key)将会确定消息投递到那个队列(queue). 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六 带着这几个关键字:交换器.路由键和队列. 1.2 交换器类型 如之前所说,交换器根据规则决定消息的路由方向.因此,rabbitmq