程序员过关斩将--自定义线程池来实现文档转码

背景

我司在很久之前,一位很久之前的同事写过一个文档转图片的服务,具体业务如下:

  1. 用户在客户端上传文档,可以是ppt,word,pdf 等格式,用户上传完成可以在客户端预览上传的文档,预览的时候采用的是图片形式(不要和我说用别的方式预览,现在已经来不及了)
  2. 当用户把文档上传到云端之后(阿里云),把文档相关的信息记录在数据库,然后等待转码完成
  3. 服务器有一个转码服务(其实就是一个windows service)不停的在轮训待转码的数据,如果有待转码的数据,则从数据库取出来,然后根据文档的网络地址下载到本地进行转码(转成多张图片)
  4. 当文档转码完毕,把转码出来的图片上传到云端,并把云端图片的信息记录到数据库
  5. 客户端有预览需求的时候,根据数据库来判断有没有转码成功,如果成功,则获取数据来显示。

文档预览的整体过程如以上所说,老的转码服务现在什么问题呢?

  1. 由于一个文档同时只能被一个线程进行转码操作,所以老的服务采用了把待转码数据划分管道的思想,一共有六个管道,映射到数据库大体就是 Id=》管道ID 这个样子。
  2. 一个控制台程序,根据配置文件信息,读取某一个管道待转码的文档,然后单线程进行转码操作
  3. 一共有六个管道,所以服务器上起了六个cmd的黑窗口……
  4. 有的时候个别文档由于格式问题或者其他问题 转码过程中会卡住,具体的表现为:停止了转码操作。
  5. 如果程序卡住了,需要运维人员重新启动转码cmd窗口(这种维护比较蛋疼)

后来机缘巧合,这个程序的维护落到的菜菜头上,维护了一周左右,大约重启了10多次,终于忍受不了了,重新搞一个吧。仔细分析过后,刨除实际文档转码的核心操作之外,整个转码流程其实还有很多注意点

  1. 需要保证转码服务不被卡住,如果和以前一样就没有必要重新设计了
  2. 尽量避免开多个进程的方式,其实在这个业务场景下,多个进程和多个线程作用是一致的。
  3. 每个文档只能被转码一次,如果一个文档被转码多次,不仅浪费了服务器资源,而且还有可能会有数据不一致的情况发生
  4. 转码失败的文档需要有一定次数的重试,因为一次失败不代表第二次失败,所以一定要给失败的文档再次被操作的机会
  5. 因为程序不停的把文档转码成本地图片,所以需要保证这些文件在转码完成在服务器上删除,不然的话,时间长了会生成很多无用的文件

说了这么多,其实需要注意的点还是很多的。以整个的转码流程来说,本质上是一个任务池的生产和消费问题,任务池中的任务就是待转码的文档,生产者不停的把待转码文档丢进任务池,消费者不停的把任务池中文档转码完成。

线程池

这很显然和线程池很类似,菜菜之前就写过一个线程池的文章,有兴趣的同学可以去翻翻历史。今天我们就以这个线程池来解决这个转码问题。线程池的本质是初始化一定数目的线程,不停的执行任务。

 //线程池定义     public class LXThreadPool:IDisposable    {        bool PoolEnable = true; //线程池是否可用         List<Thread> ThreadContainer = null; //线程的容器        ConcurrentQueue<ActionData> JobContainer = null; //任务的容器        int _maxJobNumber; //线程池最大job容量

        ConcurrentDictionary<string, DateTime> JobIdList = new ConcurrentDictionary<string, DateTime>(); //job的副本,用于排除某个job 是否在运行中

        public LXThreadPool(int threadNumber,int maxJobNumber=1000)        {            if(threadNumber<=0 || maxJobNumber <= 0)            {                throw new Exception("线程池初始化失败");            }            _maxJobNumber = maxJobNumber;            ThreadContainer = new List<Thread>(threadNumber);            JobContainer = new ConcurrentQueue<ActionData>();            for (int i = 0; i < threadNumber; i++)            {                var t = new Thread(RunJob);                t.Name = $"转码线程{i}";                ThreadContainer.Add(t);                t.Start();            }            //清除超时任务的线程            var tTimeOutJob = new Thread(CheckTimeOutJob);            tTimeOutJob.Name = $"清理超时任务线程";            tTimeOutJob.Start();        }

        //往线程池添加一个线程,返回线程池的新线程数        public int AddThread(int number=1)        {            if(!PoolEnable || ThreadContainer==null || !ThreadContainer.Any() || JobContainer==null|| !JobContainer.Any())            {                return 0;            }            while (number <= 0)            {                var t = new Thread(RunJob);                ThreadContainer.Add(t);                t.Start();                number -= number;            }            return ThreadContainer?.Count ?? 0;        }

        //向线程池添加一个任务,返回0:添加任务失败   1:成功        public int AddTask(Action<object> job, object obj,string actionId, Action<Exception> errorCallBack = null)        {            if (JobContainer != null)            {                if(JobContainer.Count>= _maxJobNumber)                {                    return 0;                }                //首先排除10分钟还没转完的                var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);                if(timeoOutJobList!=null&& timeoOutJobList.Any())                {                    foreach (var timeoutJob in timeoOutJobList)                    {                        JobIdList.TryRemove(timeoutJob.Key,out DateTime v);                    }                }

                if (!JobIdList.Any(s => s.Key == actionId))                {                    if(JobIdList.TryAdd(actionId, DateTime.Now))                    {                        JobContainer.Enqueue(new ActionData { Job = job, Data = obj, ActionId = actionId, ErrorCallBack = errorCallBack });                        return 1;                    }                    else                    {                        return 101;                    }                }                else                {                    return 100;                }                        }            return 0;        }  

        private void RunJob()        {            while (JobContainer != null  && PoolEnable)            {

                //任务列表取任务                ActionData job = null;                JobContainer?.TryDequeue(out job);                if (job == null)                {                    //如果没有任务则休眠                    Thread.Sleep(20);                    continue;                }                try                {                    //执行任务                    job.Job.Invoke(job.Data);                }                catch (Exception error)                {                    //异常回调                    if (job != null&& job.ErrorCallBack!=null)                    {                        job?.ErrorCallBack(error);                    }

                }                finally                {                    if (!JobIdList.TryRemove(job.ActionId,out DateTime v))                    {

                    }                }            }        }

        //终止线程池        public void Dispose()        {            PoolEnable = false;            JobContainer = null;            if (ThreadContainer != null)            {                foreach (var t in ThreadContainer)                {                    //强制线程退出并不好,会有异常                    t.Join();                }                ThreadContainer = null;            }        }

        //清理超时的任务        private void CheckTimeOutJob()        {            //首先排除10分钟还没转完的            var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);            if (timeoOutJobList != null && timeoOutJobList.Any())            {                foreach (var timeoutJob in timeoOutJobList)                {                    JobIdList.TryRemove(timeoutJob.Key, out DateTime v);                }            }            System.Threading.Thread.Sleep(60000);        }    }    public class ActionData    {        //任务的id,用于排重        public string ActionId { get; set; }        //执行任务的参数        public object Data { get; set; }        //执行的任务        public Action<object> Job { get; set; }        //发生异常时候的回调方法        public Action<Exception> ErrorCallBack { get; set; }    }

以上就是一个线程池的具体实现,和具体的业务无关,完全可以用于任何适用于线程池的场景,其中有一个注意点,我新加了任务的标示,主要用于排除重复的任务被投放多次(只排除正在运行中的任务)。当然代码不是最优的,有需要的同学可以自己去优化

使用线程池

接下来,我们利用以上的线程池来完成我们的文档转码任务,首先我们启动的时候初始化一个线程池,并启动一个独立线程来不停的往线程池来输送任务,顺便起了一个监控线程去监视发送任务的线程

       string lastResId = null;        string lastErrorResId = null;

        Dictionary<string, int> ResErrNumber = new Dictionary<string, int>(); //转码失败的资源重试次数        int MaxErrNumber = 5;//最多转码错误的资源10次        Thread tPutJoj = null;        LXThreadPool pool = new LXThreadPool(4,100);        public void OnStart()        {            //初始化一个线程发送转码任务            tPutJoj = new Thread(PutJob);            tPutJoj.IsBackground = true;            tPutJoj.Start();

            //初始化 监控线程            var tMonitor = new Thread(MonitorPutJob);            tMonitor.IsBackground = true;            tMonitor.Start();        }       //监视发放job的线程        private void MonitorPutJob()        {            while (true)            {                if(tPutJoj == null|| !tPutJoj.IsAlive)                {                    Log.Error($"发送转码任务线程停止==========");                    tPutJoj = new Thread(PutJob);                    tPutJoj.Start();                    Log.Error($"发送转码任务线程重新初始化并启动==========");                }                System.Threading.Thread.Sleep(5000);            }

        }

        private void PutJob()        {                       while (true)            {                try                {                    //先搜索等待转码的                    var fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Wait }, 30, lastResId);                    Log.Error($"拉取待转码记录===总数:lastResId:{lastResId},结果:{fileList?.Count() ?? 0}");                    if (fileList == null || !fileList.Any())                    {                        lastResId = null;                        Log.Error($"待转码数量为0,开始拉取转码失败记录,重新转码==========");                        //如果无待转,则把出错的 尝试                        fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Error, (int)FileToImgStateEnum.TimeOut, (int)FileToImgStateEnum.Fail }, 1, lastErrorResId);                        if (fileList == null || !fileList.Any())                        {                            lastErrorResId = null;                        }                        else                        {                            // Log.Error($"开始转码失败记录:{JsonConvert.SerializeObject(fileList)}");                            List<DocResourceRegister> errFilter = new List<DocResourceRegister>();                            foreach (var errRes in fileList)                            {                                if (ResErrNumber.TryGetValue(errRes.res_id, out int number))                                {                                    if (number > MaxErrNumber)                                    {                                        Log.Error($"资源:{errRes.res_id} 转了{MaxErrNumber}次不成功,放弃===========");                                        continue;                                    }                                    else                                    {                                        errFilter.Add(errRes);                                        ResErrNumber[errRes.res_id] = number + 1;                                    }                                }                                else                                {                                    ResErrNumber.Add(errRes.res_id, 1);                                    errFilter.Add(errRes);                                }                            }                            fileList = errFilter;                            if (fileList.Any())                            {                                lastErrorResId = fileList.Select(s => s.res_id).Max();                            }                        }                    }                    else                    {                        lastResId = fileList.Select(s => s.res_id).Max();                    }

                    if (fileList != null && fileList.Any())                    {                        foreach (var file in fileList)                        {                            //如果 任务投放线程池失败,则等待一面继续投放                            int poolRet = 0;                            while (poolRet <= 0)                            {                                poolRet = pool.AddTask(s => {                                    AliFileService.ConvertToImg(file.res_id + $".{file.res_ext}", FileToImgFac.Instance(file.res_ext));                                }, file, file.res_id);                                if (poolRet <= 0 || poolRet > 1)                                {                                    Log.Error($"发放转码任务失败==========线程池返回结果:{poolRet}");                                    System.Threading.Thread.Sleep(1000);                                }                            }                        }                    }                    //每一秒去数据库取一次数据                    System.Threading.Thread.Sleep(3000);                }                catch                {                    continue;                }

            }        }

以上就是发放任务,线程池执行任务的所有代码,由于具体的转码代码涉及到隐私,这里不在提供,如果有需要可以私下找菜菜索要,虽然我深知还有更优的方式,但是我觉得线程池这样的思想可能会对部分人有帮助,其中任务超时的核心代码如下(采用了polly插件):

 var policy= Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut), onTimeout: (context, timespan, task) =>                {                    ret.State=Enum.FileToImgStateEnum.TimeOut;                                   });                policy.Execute(s=>{                    .....                });

把你的更优方案写在留言区吧,2020年大家越来越好

原文地址:https://www.cnblogs.com/zhanlang/p/12178914.html

时间: 2024-10-18 10:47:23

程序员过关斩将--自定义线程池来实现文档转码的相关文章

程序员带你学习安卓开发-XML文档的创建与解析

这是程序员带你学习安卓开发系列教程.本文章致力于面向对象程序员可以快速学习开发安卓技术. 上篇文章:程序员带你学习安卓开发系列-Android文件存储 因知识连贯性推荐关注头条号:做全栈攻城狮.从头开始学习. 链接:http://www.toutiao.com/m5443584213/ 项目概述: 学生信息管理系统.添加学生信息,到XML文件库. 显示所有添加的学生列表. 界面: 前台界面代码: XML生成: 生成的xml: XML解析: 更多教程,欢迎大家关注今日头条-做全栈攻城狮.一起交流探

Dash——程序员的的好帮手:API文档浏览器+代码片段管理工具

作为一名死coder,每天最常见的动作就是查看各种API文档,你一定也有过同时打开N个窗口(HTML.PDF.CHM),不停的在编辑器与文档之间切换的感受吧?怎么说呢,其实我很讨厌这种枯燥无味的动作,那么如何才能提高效率,减少无用功呢?下面就给大家介绍一款非常好用的Mac小工具:Dash,相比这个英文名,我跟喜欢叫它“叮当猫”,嘿嘿. 点我直达AppStore介绍页面 功能简介 官方用一句话就概括了它的用途:Dash是一个API文档浏览器( API Documentation Browser),

程序员不得的不会的接口文档

一.传统方式 众所周知,我们Java程序员在写完数据接口之后,想要前端或者App工程师调用的,需要写出接口文档,方便描述每一个接口都是干什么的,需要什么,怎么请求,返回的结果又是什么?可是现在的你是否还在手写接口文档呢?在手写接口文档中,有没有遇到,文档刚写好,测试反馈接口有问题,又不得不改写接口,结果接口改完之后,发送文档对不上了,怎么办? 我在工作中,是如何编写接口文档的呢?接下来给大家聊一神器,惊喜在后面. 首先,我新建一个项目,基于Spring Boot,开发几个接口,发布运行. 编写代

Android 自定义线程池的实战

前言:在上一篇文章中我们讲到了AsyncTask的基本使用.AsyncTask的封装.AsyncTask 的串行/并行线程队列.自定义线程池.线程池的快速创建方式. 对线程池不了解的同学可以先看 Android AsyncTask 深度理解.简单封装.任务队列分析.自定义线程池 ------------------------------------------------------------------------------------------------------- 1.Exec

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

程序员过关斩将--来自于静态方法和实例方法的联想翩翩

这两周没有妹子来找我问问题,有点小伤感,所以耽误更新了.哈哈,别当真,因为菜菜这两周周末都有事(你可以认为去公司加班了),实在是没有精力,忘各位见谅!! 以下为菜菜自己观点,不代表任何妹子的观点,请轻喷 ◆◆ 面向对象 ◆◆ 作为一个久经考验并得到业界肯定的编程思想莫过于面向对象编程思想了. 面向对象(Object Oriented,OO)是软件开发方法.面向对象的概念和应用已超越了程序设计和软件开发,扩展到如数据库系统.交互式界面.应用结构.应用平台.分布式系统.网络管理结构.CAD技术.人工

池化技术——自定义线程池

目录 池化技术--自定义线程池 1.为什么要使用线程池? 1.1.池化技术的特点: 1.2.线程池的好处: 1.3.如何自定义一个线程池 2.三大方法 2.1.单个线程的线程池方法 2.2.固定的线程池的大小的方法 2.3.可伸缩的线程池的方法 2.4.完整的测试代码为: 3.为什么要自定义线程池?三大方法创建线程池的弊端分析 4.七大参数 5.如何手动的去创建一个线程池 6.四种拒绝策略 6.1.会抛出异常的拒绝策略 6.2.哪来的去哪里拒绝策略 6.3.丢掉任务拒绝策略 6.4.尝试竞争拒绝

java多线程(四)-自定义线程池

当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的. 1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new Synchro

c#网络通信框架networkcomms内核解析之十 支持优先级的自定义线程池

本例基于networkcomms2.3.1开源版本  gplv3协议 如果networkcomms是一顶皇冠,那么CommsThreadPool(自定义线程池)就是皇冠上的明珠了,这样说应该不夸张的,她那么优美,简洁,高效. 在 <c#网络通信框架networkcomms内核解析之六 处理接收到的二进制数据>中我们曾经提到,服务器收到数据后,如果是系统内部保留类型数据或者是最高优先级数据,系统会在主线程中处理,其他的会交给自定义线程池进行处理. 作为服务器,处理成千上万的连接及数据,单线程性能