.net推送数据之Kafka

.NET VS工具添加程序包源 在NuGet包管理中选择程序包源为上面添加的私有仓库。 搜索Data.Pipelines并安装。 在app.congif或者web.config中添加Kafka配置

  <appSettings>
    <add key="kafka.ip" value="172.20.105.205"/>
    <add key="kafka.prot" value="9092"/>
  </appSettings>
        /// <summary>
        /// 实时数据综合推送
        /// </summary>
        /// <param name="listTemp"></param>
        public void RealTimeDataPush(List<T_SJZX_SZZDJC_CYXXInfo> listTemp, List<T_SJZX_SZZDJC_JCJGInfo> listTempDetail)
        {
            var listData = new List<Dictionary<string, object>>();

            foreach (var item in listTemp)
            {
                var d = new Dictionary<string, object>();
                d.Add("XH", item.XH);//序号
                d.Add("DXMC", item.DXMC);//对象名称
                d.Add("DXBH", item.DXBH);//对象编号
                d.Add("CDBH", item.CDBH);//测点编号
                d.Add("CDMC", item.CDMC);//测点名称
                d.Add("XZQDM", item.XZQDM);//行政区代码
                d.Add("SSXZQ", item.SSXZQ);//所属行政区
                d.Add("JCSJ", item.JCSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//监测时间
                                                                               //d.Add("JCN", item.JCN);//监测年
                                                                               //d.Add("JCY", item.JCY);//监测月
                                                                               //d.Add("JCR", item.JCR);//监测日
                                                                               //d.Add("JCXS", item.JCXS);//监测小时
                d.Add("SFZX", item.SFZX);//是否在线
                if (!string.IsNullOrEmpty(item.ZXBZBH))
                    d.Add("ZXBZBH", item.ZXBZBH);//执行标准编号
                else
                    d.Add("ZXBZBH", "");

                if (!string.IsNullOrEmpty(item.ZXBZMC))
                    d.Add("ZXBZMC", item.ZXBZMC);//执行标准名称
                else
                    d.Add("ZXBZMC", "");

                if (!string.IsNullOrEmpty(item.SZLB))
                    d.Add("SZLB", item.SZLB);//水质类别
                else
                    d.Add("SZLB", "");

                if (!string.IsNullOrEmpty(item.SFCB))
                    d.Add("SFCB", item.SFCB);//是否超标
                else
                    d.Add("SFCB", "");

                if (!string.IsNullOrEmpty(item.CBXM))
                    d.Add("CBXM", item.CBXM);//超标项目
                else
                    d.Add("CBXM", "");

                //d.Add("YQBM", item.YQBM);//仪器编码
                //d.Add("YQMC", item.YQMC);//仪器名称
                //d.Add("YQXH", item.YQXH);//仪器型号
                //d.Add("YQZT", item.YQZT);//仪器状态

                d.Add("ORGID", item.ORGID);//机构代码
                d.Add("CJR", item.CJR);//创建人
                d.Add("CJSJ", item.CJSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//创建时间
                d.Add("GXR", item.GXR);//更新人
                d.Add("SJLY", item.SJLY);//数据来源
                d.Add("SJZT", item.SJZT);//数据状态
                d.Add("GXSJ", item.GXSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//更新时间

                #region 明细

                var listData2 = new List<Dictionary<string, object>>();
                var listTempDetail2 = listTempDetail.Where(p => p.CYXH == item.XH);

                foreach (var item2 in listTempDetail2)
                {
                    var d2 = new Dictionary<string, object>();
                    d2.Add("XH", item2.XH);//序号
                    d2.Add("CYXH", item2.CYXH);//采样序号

                    if (!string.IsNullOrEmpty(item2.FXXMDM))
                        d2.Add("FXXMDM", item2.FXXMDM);//分析项目编号
                    else
                        d2.Add("FXXMDM", "");

                    if (!string.IsNullOrEmpty(item2.FXXMMC))
                        d2.Add("FXXMMC", item2.FXXMMC);//分析项目名称
                    else
                        d2.Add("FXXMMC", "");

                    d2.Add("BCJG", item2.BCJG);//报出结果
                    d2.Add("BCJGBS", item2.BCJGBS);//报出结果表示
                    d2.Add("BCJGDW", item2.BCJGDW);//报出结果单位

                    if (!string.IsNullOrEmpty(item2.BZSX))
                        d2.Add("BZSX", item2.BZSX);//标准上限
                    else
                        d2.Add("BZSX", "");

                    if (!string.IsNullOrEmpty(item2.BZXX))
                        d2.Add("BZXX", item2.BZXX);//标准下限
                    else
                        d2.Add("BZXX", "");

                    if (!string.IsNullOrEmpty(item2.SFCB))
                        d2.Add("SFCB", item2.SFCB);//是否超标
                    else
                        d2.Add("SFCB", "");

                    if (!string.IsNullOrEmpty(item2.CBBS))
                        d2.Add("CBBS", item2.CBBS);//超标倍数
                    else
                        d2.Add("CBBS", "");

                    if (!string.IsNullOrEmpty(item2.SZLB))
                        d2.Add("SZLB", item2.SZLB);//水质类别
                    else
                        d2.Add("SZLB", "");

                    if (!string.IsNullOrEmpty(item2.SZLB))
                        d2.Add("YQSBXH", item2.YQSBXH);//仪器编码
                    else
                        d2.Add("YQSBXH", "");

                    d2.Add("ORGID", item2.ORGID);//机构代码
                    d2.Add("CJR", item2.CJR);//创建人
                    d2.Add("CJSJ", item2.CJSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//创建时间
                    d2.Add("GXR", item2.GXR);//更新人
                    d2.Add("SJLY", item2.SJLY);//数据来源
                    d2.Add("SJZT", item2.SJZT);//数据状态
                    d2.Add("GXSJ", item2.GXSJ.Value.ToString("yyyy-MM-dd HH:mm:ss"));//更新时间

                    listData2.Add(d2);
                }
                #endregion

                d.Add("JCJG", listData2);
                listData.Add(d);
            }

            //SZZDZ_CYXX
            var product = new DataProducer("dfb426e321fc417981858b0927c21016", "XH");

            //分页推送
            int PageSize = 100;
            int PageTotal = (int)Math.Ceiling((decimal)listData.Count / PageSize);
            for (int PageIndex = 0; PageIndex < PageTotal; PageIndex++)
            {
                var data = listData.Skip(PageIndex * PageSize).Take(PageSize).ToList();

                product.Send(data, r => Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}"));
            }

            Assert.IsTrue(true);

            //Thread.Sleep(500);
        }
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Confluent.Kafka;
//using Newtonsoft.Json;
using System.Configuration;
using Newtonsoft.Json;

namespace Common
{
    class DataProducer
    {
        private readonly string _key;
        private readonly string _dataKeyField;

        // 从配置读取kafka配置
        static string ip = ConfigurationManager.AppSettings["kafka.ip"];
        static string port = ConfigurationManager.AppSettings["kafka.port"];

        /// <summary>
        /// 构造方法
        /// </summary>
        /// <param name="key">业务数据队列key</param>
        /// <param name="dataKeyField">数据主键</param>
        public DataProducer(string key, string dataKeyField)
        {
            this._key = key;
            this._dataKeyField = dataKeyField;
        }

        /// <summary>
        /// 发送数据
        /// </summary>
        /// <param name="data">数据集</param>
        /// <param name="callback">发送成功后回调,在发送成功前,不要杀死相关线程,否则可能导致数据丢失</param>
        public void Send(List<Dictionary<string, object>> data, Action<DeliveryReport<string, string>> callback)
        {
            foreach (var datum in data)
            {
                var keys = new List<string>(datum.Count);
                // 找出时候类型的字段
                foreach (var entry in datum)
                {
                    if (entry.Value is DateTime)
                    {
                        keys.Add(entry.Key);
                    }
                }

                // 把时间转成long,与Java的getTime()兼容
                foreach (var key in keys)
                {
                    var eeee = (DateTime)datum[key];
                    // TODO 日期转换
                    datum[key] = "日期字符串";
                }

            }

            var topic = this._key;

            var d = new Dictionary<string, object> { { "data", data }, { "keyField", this._dataKeyField } };
            var jsonString = JsonConvert.SerializeObject(d, new JsonSerializerSettings { NullValueHandling = NullValueHandling.Ignore });

            // TODO 生成key
            Send(topic, jsonString.GetHashCode().ToString(), jsonString, callback);
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="topic">KafkaTopic</param>
        /// <param name="key">消息key</param>
        /// <param name="value">数据</param>
        /// <param name="callback">发送完成回调</param>
        private void Send(string topic, string key, string value, Action<DeliveryReport<string, string>> callback)
        {
            if (string.IsNullOrEmpty(ip))
            {
                ip = "172.20.105.205";
            }
            if (string.IsNullOrEmpty(port))
            {
                port = "9092";
            }

            var conf = new ProducerConfig { BootstrapServers = $"{ip}:{port}" };

            using (var p = new ProducerBuilder<string, string>(conf).Build())
            {
                // TODO 异常处理,添加日志
                p.Produce(topic, new Message<string, string> { Key = key, Value = value }, callback);
                //Console.WriteLine($"topic: {topic}. key: k, value: " + value);

                // TODO 异常处理,添加日志
                // wait for up to 1 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(1));
            }
        }
    }
}

原文地址:https://www.cnblogs.com/elves/p/12274823.html

时间: 2024-10-07 18:18:41

.net推送数据之Kafka的相关文章

第87课:Flume推送数据到SparkStreaming案例实战和内幕源码解密

本期内容: 1. Flume on HDFS案例回顾 2. Flume推送数据到Spark Streaming实战 3. 原理绘图剖析 1. Flume on HDFS案例回顾 上节课要求大家自己安装配置Flume,并且测试数据的传输.我昨天是要求传送的HDFS上. 文件配置: ~/.bashrc: export FLUME_HOME=/usr/local/flume/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf PA

第87讲:Flume推送数据到SparkStreaming案例实战和内幕源码解密

本期内容: 1. Flume on HDFS案例回顾 2. Flume推送数据到Spark Streaming实战 3. 原理绘图剖析 1. Flume on HDFS案例回顾 上节课要求大家自己安装配置Flume,并且测试数据的传输.我昨天是要求传送的HDFS上. 文件配置: ~/.bashrc: export FLUME_HOME=/usr/local/flume/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf PA

SQL Server 2000向SQL Server 2008 R2推送数据

[文章摘要]最近做的一个项目要获取存在于其他服务器的一些数据,为了安全起见,采用由其他“服务器”向我们服务器推送的方式实现.我们服务器使用的是SQL Server 2008 R2,其他“服务器”使用的都是SQL Server 2000,还都是运行在Windows XP上的,整个过程遇到了一些问题,也参考了一些文档,最终费了好多事才算搞定. [文章索引] 配置分发服务器 配置发布数据库 配置订阅 [一.配置分发服务器] SQLServer 2000的复制服务包括三个角色:发布服务器.分发服务器和订

WebService推送数据,数据结构应该怎样定义?

存放在Session有一些弊端,不能实时更新.server压力增大等... 要求:将从BO拿回来的数据存放在UI Cache里面,数据库更新了就通过RemoveCallback "告诉"UI Cache.实现更新. 环境:BO 提供一个WebService给UI取数据.UI也有一个WebService,提供给BO 通知UI更新数据.数据结构的原生类始终在BO层. 本来是想在数据库Update 后,在BO将Cache的数据推送至UI Cache,但当中遇到了自己解决不了的问题: 数据结构

ssh整合dwr,推送数据

1.web.xml中添加如下代码: <!-- 配置DWR前端控制器 -->     <servlet>         <servlet-name>dwrServlet</servlet-name>         <servlet-class>org.directwebremoting.servlet.DwrServlet</servlet-class>         <!-- 指定配置文件 -->         &

Spring MVC 实现web Socket向前端实时推送数据

最近项目中用到了webSocket服务,由后台实时向所有的前端推送消息,前端暂时是不可以发消息给后端的,数据的来源是由具体的设备数据收集器收集起来,然后通过socket推送给后端,后端收到数据后,再将这些数据推送给前端. 听起来业务逻辑有点复杂.其实单独的实现socket或websocket都比较简单,但是二者之间的数据传输问题,困扰了我很久.也想过用redis做一个消息队列,将socket接收到的数据处理后丢进去,然后再用websocket从redis里取出数据,再推送给前端. 但是.问题来了

推送数据到mysql乱码,对方是lantin

show variables like 'char%';  charracter _set_database 是数据库安装的编码: 这时候,用jdbc用该在连接时候,.getConnection(config.getDBUrl()+"?useUnicode=true&characterEncoding=gbk", config.getDBUser(), config.getDBPassword());追加编码方式. 无需在再带里面在set names gbk了 cmd :ins

当对服务器端返回的极光推送数据请求时,AFN 的 GET 请求失败如何解决

代码段 控制台 只需在 manager 那里添加一行代码即可 //传入json格式数据,不写则普通post manager.requestSerializer = [AFJSONRequestSerializer serializer];//默认返回JSON类型(此句可以不写) manager.responseSerializer.acceptableContentTypes = [NSSet setWithObject:@"text/html"];//设置相应内容类型

SignalR系列教程:服务器广播与主动数据推送

本篇是本系列入门篇的最后一遍,由于工作关系,接触SignalR的时间不是很多.等下次有空的话我会写一个利用“SignalR”开发一个在线聊天室的系列博文.近期的话我更偏向于更新框架设计相关的文章,到时候我会在文章中分享我在工作中开发的“日志框架”.“缓存框架”,“分布式下载框架”等.有兴趣的朋友可以关注我,一起交流. 本篇博文参考:https://www.asp.net/signalr/overview/getting-started/tutorial-server-broadcast-with