.NET + SQL
#region Build the original-date ranges passed to the procedure as a table-valued parameter

// Daily statistics look back over 30 earlier periods (30-day retention);
// every other period type looks back over 6 earlier periods.
const int DailyLookbackPeriods = 30;
const int DefaultLookbackPeriods = 6;

// Converts a date to its yyyyMMdd integer key using the Gregorian Year/Month/Day
// properties. The previous ToString("yyyyMMdd") + int.Parse round trip depended on
// the thread's current culture and calendar, so a non-Gregorian default calendar
// produced a different year in the key.
Func<DateTime, int> toDateKey = d => d.Year * 10000 + d.Month * 100 + d.Day;

DataTable originalDates = new DataTable();
DataColumn col = new DataColumn("OriginalStartDate", typeof(Int32));
originalDates.Columns.Add(col);
col = new DataColumn("OriginalEndDate", typeof(Int32));
originalDates.Columns.Add(col);

// NOTE(review): 'step' is not read anywhere in this excerpt; it is kept in case
// code following the excerpt uses it.
int step = endDate.Subtract(startDate).Days + 1;

int lookbackPeriods = period == PeriodOptions.Daily
    ? DailyLookbackPeriods
    : DefaultLookbackPeriods;

for (int i = 1; i <= lookbackPeriods; i++)
{
    // Ask the utility for the period that lies i periods before the current one.
    DateTime pStartDate;
    DateTime pEndDate = Utility.GetNextStatDate(period, startDate, endDate, -i, out pStartDate);

    DataRow newRow = originalDates.NewRow();
    newRow["OriginalStartDate"] = toDateKey(pStartDate);
    newRow["OriginalEndDate"] = toDateKey(pEndDate);
    originalDates.Rows.Add(newRow);
}

#endregion

// Compute the aggregate.
// @StartDate / @EndDate are declared as SqlDbType.Int, so they are given integer
// values instead of strings that the provider would have to convert.
SqlParameter[] paramters = new SqlParameter[]
{
    SqlParamHelper.MakeInParam("@dt", SqlDbType.Structured),
    SqlParamHelper.MakeInParam("@StartDate", SqlDbType.Int, 4, toDateKey(startDate)),
    SqlParamHelper.MakeInParam("@EndDate", SqlDbType.Int, 4, toDateKey(endDate)),
    SqlParamHelper.MakeInParam("@Period", SqlDbType.TinyInt, 1, (int)period)
};

// The structured parameter carries the DataTable built above as dbo.OriginalDatesType.
paramters[0].TypeName = "dbo.OriginalDatesType";
paramters[0].Value = originalDates;

DataSet ds = SqlHelper.ExecuteDataset(ComputingDB_ConnString, CommandType.StoredProcedure, "PR_StatRetainedUsers", paramters);
-- Computes retained-user counts for the statistic period [@StartDate, @EndDate].
--
-- Parameters:
--   @dt        table of (OriginalStartDate, OriginalEndDate) ranges; a login row is
--              counted for a range when its FirstLoginDate falls inside it.
--   @StartDate first day of the statistic period, as an int (divided by 10000 below
--              to obtain the year suffix of the source table).
--   @EndDate   last day of the statistic period, same encoding.
--   @Period    period type; the value 1 takes the single-statement branch.
--
-- Result sets:
--   1. totals per SoftID / Platform / OriginalStatDate (ID2 = -1)
--   2. totals per SoftID / Platform / ChannelID / OriginalStatDate (ID2 = ChannelID)
--
-- NOTE(review): this text was recovered from a damaged paste.
--   * String delimiters were typographic quotes and have been restored to '.
--   * The predicates shown as "[email protected]" were mangled by e-mail obfuscation
--     and MUST be restored from the real source before deployment. In the
--     @Period = 1 branch the only scalar parameter handed to sp_executesql is
--     @StartDate, and in the other branches @part is not referenced anywhere else
--     in the text, so the lost predicates presumably compared a column to
--     @StartDate and to @part respectively -- confirm against the original.
--   * The table-name prefixes shown as "..." / "...." were elided in the paste.
ALTER PROCEDURE [dbo].[PR_StatRetainedUsers]
(
    @dt OriginalDatesType readonly,
    @StartDate int,
    @EndDate int,
    @Period tinyint
)
AS
begin
    create table #RetainedUsers(SoftID int,Platform tinyint,ChannelID int,OriginalStatDate int,RetainedUserCount int)

    declare @sql nvarchar(max);

    if (@Period = 1)
    begin
        -- Single statement against the table of @StartDate's year.
        set @sql =
              N'insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount) '
            + N'select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI) '
            + N'from ...' + CAST((@StartDate / 10000) as nvarchar(10))
            + N' A with(nolock) inner join @dt B on [email protected] '
            + N'and A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate '
            + N'and (FromCache=0 or FromCache is null) '
            + N'group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;'

        exec sp_executesql @sql, N'@StartDate int,@dt OriginalDatesType readonly', @StartDate, @dt
    end
    else
    begin
        if (@StartDate / 10000 = @EndDate / 10000)
        begin
            -- The whole period lies in one year: read a single yearly table.
            set @sql =
                  N'insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount) '
                + N'select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI) '
                + N'from ....' + CAST((@StartDate / 10000) as nvarchar(10))
                + N' A with(nolock) inner join @dt B on [email protected] '
                + N'and A.LoginDate between @StartDate and @EndDate '
                + N'and A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate '
                + N'and (FromCache=0 or FromCache is null) '
                + N'group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;'
        end
        else
        begin
            -- The period spans two years: union the two yearly tables.
            -- The year suffix is appended directly to the table-name prefix in both
            -- halves; the pasted text had a stray space and line break before the
            -- second suffix, which would have split the table name.
            set @sql =
                  N'insert into #RetainedUsers(SoftID,Platform,ChannelID,OriginalStatDate,RetainedUserCount) '
                + N'select A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate,COUNT(distinct A.IMEI) '
                + N'from ( select * from ....' + CAST((@StartDate / 10000) as nvarchar(10))
                + N' with(nolock) where [email protected] and LoginDate between @StartDate and @EndDate and (FromCache=0 or FromCache is null) '
                + N'union all select * from ....' + CAST((@EndDate / 10000) as nvarchar(10))
                + N' with(nolock) where [email protected] and LoginDate between @StartDate and @EndDate and (FromCache=0 or FromCache is null)) A '
                + N'inner join @dt B on A.FirstLoginDate between B.OriginalStartDate and B.OriginalEndDate '
                + N'group by A.SoftID,A.PLATFORM,A.FirstChannelID,B.OriginalEndDate;'
        end

        -- Run the statement once per @part value 0..127; each run appends its own
        -- rows to #RetainedUsers and the final selects sum them.
        declare @part tinyint = 0;
        while @part < 128
        begin
            -- Disabled condition kept from the original:
            --   if (@Period <> 12 or @part = 0) begin ... end
            -- The variable is referenced as @part (not @Part) so the call also
            -- works on a case-sensitive collation.
            exec sp_executesql @sql,
                N'@part tinyint,@StartDate int,@EndDate int,@dt OriginalDatesType readonly',
                @part, @StartDate, @EndDate, @dt;

            set @part = @part + 1
        end
    end

    -- Result set 1: all channels together (ID2 = -1).
    select @Period Period,OriginalStatDate,@EndDate StatDate,SoftID,Platform,-1 ID2,-1 ID1,0 OriginalNewUserCount,SUM(RetainedUserCount) RetainedUserCount
    from #RetainedUsers
    group by SoftID,Platform,OriginalStatDate

    -- Result set 2: per channel (ID2 = ChannelID).
    select @Period Period,OriginalStatDate,@EndDate StatDate,SoftID,Platform,ChannelID ID2,-1 ID1,0 OriginalNewUserCount,SUM(RetainedUserCount) RetainedUserCount
    from #RetainedUsers
    group by SoftID,Platform,ChannelID,OriginalStatDate

    drop table #RetainedUsers
end
Hadoop
@MapConfig public static class MapTask extends Mapper<LongWritable, Text, Text, Text> { private Text mKey = new Text(); private Text mValue = new Text(); private StringBuilder sb = new StringBuilder(); private Map<String,Integer> map=new HashMap<String,Integer>(); @Override protected void setup(Context context) throws IOException, InterruptedException { String enddate =context.getConfiguration().get("key_enddate"); int period =Integer.parseInt(context.getConfiguration().get("period")); int step=Integer.parseInt(context.getConfiguration().get("step")); DateTime curstatdate=DateTime.parseToDateTime(enddate,"yyyyMMdd"); //设置要计算留存的时间 if (period==PeriodOptions.GetValueByEnum(PeriodOptions.Daily)){ for (DateTime startdate=curstatdate.addDays(-30);new Double(DateTime.minusDay(curstatdate,startdate)).intValue()>0;startdate=startdate.addDays(1)){ Integer tmp=Integer.parseInt(startdate.toString("yyyyMMdd")); map.put(startdate.toString("yyyyMMdd"),tmp); } }else if(period==PeriodOptions.GetValueByEnum(PeriodOptions.Weekly)){ for (DateTime startdate=curstatdate.addDays(-48);DateTime.minusDay(curstatdate,startdate)>0;startdate=startdate.addDays(1)){ if (WeekOptions.GetEnumByValue(startdate.getDayOfWeek())== WeekOptions.SUNDAY){ Integer tmp=Integer.parseInt(startdate.toString("yyyyMMdd")); for (DateTime substartdate=startdate.addDays(-step);new Double(DateTime.minusDay(startdate,substartdate)).intValue()>=0;substartdate=substartdate.addDays(1)){ map.put(substartdate.toString("yyyyMMdd"),tmp); } } } }else if(period==PeriodOptions.GetValueByEnum(PeriodOptions.NaturalMonth)){ for (DateTime startdate=curstatdate.addMonths(-7).addDays(1);DateTime.minusDay(curstatdate,startdate)>0;startdate=startdate.addDays(1)){ if (startdate.addDays(1).day()==1){ Integer tmp=Integer.parseInt(startdate.toString("yyyyMMdd")); for (DateTime substartdate=startdate.addMonths(-1).addDays(1);new Double(DateTime.minusDay(startdate,substartdate)).intValue()>0;substartdate=substartdate.addDays(1)){ 
map.put(substartdate.toString("yyyyMMdd"),tmp); } } } } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String str = value.toString(); String[] params = StringUtils.splitByWholeSeparatorPreserveAllTokens(str, "\t"); if (!map.containsKey(params[23]) || (Integer.parseInt(params[15])&1)!=0 ){ return; } int firstlogintime=map.get(params[23]); String enddate =context.getConfiguration().get("key_enddate"); sb.delete(0, sb.length()); //key<0:softid,2:platform,22 firstchannelid> //value<23 firstlogindate,4:logindate,3:imei> sb.append(params[0]).append("\t") .append(params[2]).append("\t") .append(params[22]); mKey.set(sb.toString()); mValue.set(firstlogintime + "\t" + enddate + "\t" + params[3]); context.write(mKey, mValue); sb.delete(0, sb.length()); sb.append(params[0]).append("\t") .append(params[2]).append("\t") .append(-1); mKey.set(sb.toString()); mValue.set(firstlogintime + "\t" + enddate + "\t" + params[3]); context.write(mKey, mValue); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { map.clear(); } } //key<0:softid,2:platform,22 firstchannelid> //value<23 firstlogindate,4:logindate,3:imei> @CombineConfig public static class CombineTask extends Reducer<Text, Text, Text, Text> { Text mvalue=new Text(); //留存用户 private Multiset<String> multiset=HashMultiset.create(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { multiset.clear(); for (Text item:values){ multiset.add(item.toString()); } for (String item:multiset.elementSet()){ mvalue.set(item); context.write(key,mvalue); } } } //key<0:softid,2:platform,22 firstchannelid> //value<23 firstlogindate,4:logindate,3:imei> @ReduceConfig public static class ReduceTask extends Reducer<Text, Text, Text, Text> { private Text mValue = new Text(); //留存用户 private Map<String, Multiset<Object>> MRetained = new HashMap<>(); @Override 
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Map.Entry<String, Multiset<Object>> map : MRetained.entrySet()) { MRetained.get(map.getKey()).clear(); } MRetained.clear(); for (Text item : values) { String[] params = StringUtils.splitByWholeSeparatorPreserveAllTokens(item.toString(), "\t"); String mapkey = params[0] + "\t" + params[1]; if (!params[0].equals(params[1])) { if (!MRetained.containsKey(mapkey)) { MRetained.put(mapkey, HashMultiset.create()); } MRetained.get(mapkey).add(params[2]); } } String period = context.getConfiguration().get("period"); for (Map.Entry<String, Multiset<Object>> map : MRetained.entrySet()) { if (map.getKey().split("\t") == null ) { continue; } mValue.set(period + "\t" + map.getKey() + "\t" + 0 + "\t" + map.getValue().elementSet().size()); //softid,platform,channelid,period,originaldate,statdate,OriginalNewUserCount,RetainedUserCount context.write(key, mValue); } } }
时间: 2024-11-02 14:40:38