日志分析_使用shell完整日志分析案例

一、需求分析

 1. 日志文件每天生成一份(需要将日志文件定时上传至hdfs)
 2. 分析日志文件中包含的字段:访问IP,访问时间,访问URL,访问状态,访问流量 3. 现在有"昨日"的日志文件即logclean.jar
 3. 需求指标
      a. 统计PV值
      b. 统计注册人数
      c. 统计IP数
      d. 统计跳出率
      f. 统计二跳率

二、数据分析

1. 数据采集 使用shell脚本定时上传
2. 数据清洗 过滤字段 格式化时间等字段
3. 数据分析 使用一级分区(date)
4. 数据导出 sqoop
5. 使用到的框架有: shell脚本 hdfs mapreduce hive sqoop mysql
期望结果
  pv    register  ip    jumpprob    two_jumpprob

三、实施

1. 自动上传到hdfs
     $HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_INPUT_PATH > /dev/null 2>&1
     $HADOOP_HOME/bin/hdfs dfs -mkdir -p $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
     $HADOOP_HOME/bin/hdfs dfs -put $LOG_PATH  $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
2. 数据清洗(使用mapreduce过滤脏数据与不需要的静态数据及去双引号,转换date)
     $HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_OUTPUT_PATH > /dev/null 2>&1
     $HADOOP_HOME/bin/yarn jar $JAR_PATH $ENTRANCE $HDFS_INPUT_PATH/$yesterday $HDFS_OUTPUT_PATH/date=$yesterday
3. 在Hive中创建日志数据库和分区表并将清洗后的文件加入分区
     $HIVE_HOME/bin/hive -e "create database if not exists $HIVE_DATABASE" > /dev/null 2>&1
     $HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "create external table if not exists $HIVE_TABLE(
     ip string,day string,url string) partitioned by (date string)
     row format delimited fields terminated by ‘\t‘ location ‘$HDFS_OUTPUT_PATH‘ "
     $HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "alter table $HIVE_TABLE add partition (date=‘$yesterday‘)"
4. 分析数据并使用sqoop导出至mysql
     pv:
        create table if not exists pv_tb(pv string) row format delimited fields terminated by ‘\t‘;
        insert overwrite table pv_tb select count(1) from weblog_clean where date=‘2016_11_13‘;
     register:
        create table if not exists register_tb(register string) row format delimited fields terminated by ‘\t‘;
        insert overwrite table register_tb select count(1) from weblog_clean where date=‘2016_11_13‘ and instr(url,‘member.php?mod=register‘) > 0;
     ip:
        create table if not exists ip_tb(ip string) row format delimited fields terminated by ‘\t‘;
        insert overwrite table ip_tb select count(distinct ip) from weblog_clean where date=‘2016_11_13‘;
     jumpprob:
        create table if not exists jumpprob_tb(jump double) row format delimited fields terminated by ‘\t‘;
        insert overwrite table jumpprob_tb
        select ghip.singleip/aip.ips from (select count(1) singleip from(select count(ip) ips from weblog_clean where date=‘2016_11_13‘ group by ip having ips <2) gip) ghip,
        (select count(ip) ips from weblog_clean where date=‘2016_11_13‘) aip;
     two_jumpprob:
        create table if not exists two_jumpprob_tb(jump double) row format delimited fields terminated by ‘\t‘;
        insert overwrite table two_jumpprob_tb
        select ghip.singleip/aip.ips from (select count(1) singleip from(select count(ip) ips from weblog_clean where date=‘2016_11_13‘ group by ip having ips >=2) gip) ghip,
        (select count(ip) ips from weblog_clean where date=‘2016_11_13‘) aip;
     merge table # 注意上面几个表是分开创建,效率比下面高,但存储消耗上面较高
        create table if not exists log_result(pv string,register string,ip string,jumpprob double,two_jumpprob double ) row format delimited fields terminated by ‘\t‘;
        insert overwrite table log_result
        select log_pv.pv,log_register.register,log_ip.ip,log_jumpprob.jumpprob,log_two_jumpprob.two_jumpprob from (select count(1) pv from weblog_clean where date=‘2016_11_13‘) log_pv,
        (select count(1) register from weblog_clean where date=‘2016_11_13‘ and instr(url,‘member.php?mod=register‘) > 0) log_register,
        (select count(distinct ip) ip from weblog_clean where date=‘2016_11_13‘) log_ip,
        (select ghip.singleip/aip.ips jumpprob from (select count(1) singleip from(select count(ip) ips from weblog_clean where date=‘2016_11_13‘ group by ip having ips <2) gip) ghip,
        (select count(ip) ips from weblog_clean where date=‘2016_11_13‘) aip) log_jumpprob,
        (select ghip.singleip/aip.ips two_jumpprob from (select count(1) singleip from(select count(ip) ips from weblog_clean where date=‘2016_11_13‘ group by ip having ips >=2) gip) ghip,
        (select count(ip) ips from weblog_clean where date=‘2016_11_13‘) aip) log_two_jumpprob;

四、结果展示

mysql> select * from weblog_result;
       +--------+----------+-------+----------+--------------+
       | pv     | register | ip    | jumpprob | two_jumpprob |
       +--------+----------+-------+----------+--------------+
       | 169857 | 28       | 10411 |     0.02 |         0.04 |
       +--------+----------+-------+----------+--------------+
       1 row in set (0.00 sec)

五、logclean.jar(过滤日志字段:日期转换,去除双引号,过去根url)

package org.apache.hadoop.log.project;

import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogClean extends Configured implements Tool {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int res = ToolRunner.run(conf, new LogClean(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
    	Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "logclean");
        // 设置为可以打包运行

        job.setJarByClass(LogClean.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 清理已存在的输出文件
        FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        boolean success = job.waitForCompletion(true);
        if(success){
            System.out.println("Clean process success!");
        }
        else{
            System.out.println("Clean process failed!");
        }
        return 0;
    }

    static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text outputValue = new Text();

        protected void map(
                LongWritable key,
                Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            final String[] parsed = logParser.parse(value.toString());

            // step1.过滤掉静态资源访问请求
            if (parsed[2].startsWith("GET /static/")
                    || parsed[2].startsWith("GET /uc_server")) {
                return;
            }
            // step2.过滤掉开头的指定字符串
            if (parsed[2].startsWith("GET /")) {
                parsed[2] = parsed[2].substring("GET /".length());
            } else if (parsed[2].startsWith("POST /")) {
                parsed[2] = parsed[2].substring("POST /".length());
            }
            // step3.过滤掉结尾的特定字符串
            if (parsed[2].endsWith(" HTTP/1.1")) {
                parsed[2] = parsed[2].substring(0, parsed[2].length()
                        - " HTTP/1.1".length());
            }
            // step4.只写入前三个记录类型项
            outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
            context.write(key, outputValue);
        }
    }

    static class MyReducer extends
            Reducer<LongWritable, Text, Text, NullWritable> {
        protected void reduce(
                LongWritable k2,
                java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        };
    }

    /*
     * 日志解析类
     */
    static class LogParser {
        public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
                "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
                "yyyyMMddHHmmss");

        public static void main(String[] args) throws ParseException {
            final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
            LogParser parser = new LogParser();
            final String[] array = parser.parse(S1);
            System.out.println("样例数据: " + S1);
            System.out.format(
                    "解析结果:  ip=%s, time=%s, url=%s, status=%s, traffic=%s",
                    array[0], array[1], array[2], array[3], array[4]);
        }

        /**
         * 解析英文时间字符串
         *
         * @param string
         * @return
         * @throws ParseException
         */
        private Date parseDateFormat(String string) {
            Date parse = null;
            try {
                parse = FORMAT.parse(string);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return parse;
        }

        /**
         * 解析日志的行记录
         *
         * @param line
         * @return 数组含有5个元素,分别是ip、时间、url、状态、流量
         */
        public String[] parse(String line) {
            String ip = parseIP(line);
            String time = parseTime(line);
            String url = parseURL(line);
            String status = parseStatus(line);
            String traffic = parseTraffic(line);

            return new String[] { ip, time, url, status, traffic };
        }

        private String parseTraffic(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String traffic = trim.split(" ")[1];
            return traffic;
        }

        private String parseStatus(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String status = trim.split(" ")[0];
            return status;
        }

        private String parseURL(String line) {
            final int first = line.indexOf("\"");
            final int last = line.lastIndexOf("\"");
            String url = line.substring(first + 1, last);
            return url;
        }

        private String parseTime(String line) {
            final int first = line.indexOf("[");
            final int last = line.indexOf("+0800]");
            String time = line.substring(first + 1, last).trim();
            Date date = parseDateFormat(time);
            return dateformat1.format(date);
        }

        private String parseIP(String line) {
            String ip = line.split("- -")[0].trim();
            return ip;
        }
    }
}

六、完整shell,注意准备logclean.jar(用于日志过滤MR程序),与"昨日"的日志文件和文件位置

#!/bin/bash

echo -ne | cat <<eot
#############################################################################
##########################   普   度   众   生    ###########################
                                  _oo0oo_
                                 088888880
                                 88" . "88
                                 (| -_- |)
                                  0\ = /0
                               ___/‘---‘\___
                             .‘ \\\\|     |// ‘.
                            / \\\\|||  :  |||// \\
                           /_ ||||| -:- |||||- \\
                          |   | \\\\\\  -  /// |   |
                          | \_|  ‘‘\---/‘‘  |_/ |
                          \  .-\__  ‘-‘  __/-.  /
                        ___‘. .‘  /--.--\  ‘. .‘___
                     ."" ‘<  ‘.___\_<|>_/___.‘ >‘  "".
                    | | : ‘-  \‘.;‘\ _ /‘;.‘/ - ‘ : | |
                    \  \ ‘_.   \_ __\ /__ _/   .-‘ /  /
                =====‘-.____‘.___ \_____/___.-‘____.-‘=====
                                  ‘=---=‘                                    

              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                        佛祖保佑    iii    永不出错
eot
##get yesterday date
yesterday=`date -d ‘-1 day‘ +‘%Y_%m_%d‘`
echo $yesterday
############
## define ##
############
HADOOP_HOME=/opt/cdh-5.6.3/hadoop-2.5.0-cdh5.3.6
HIVE_HOME=/opt/cdh-5.6.3/hive-0.13.1-cdh5.3.6
SQOOP_HOME=/opt/cdh-5.6.3/sqoop-1.4.5-cdh5.2.6
HIVE_DATABASE=weblog
HIVE_TABLE=weblog_clean
HIVE_RSTABLE=weblog_result
MYSQL_USERNAME=root
MYSQL_PASSWORD=root
EXPORT_DIR=/user/hive/warehouse/weblog.db/weblog_result
NUM_MAPPERS=1
#########################
##  get logfile path   ##
#########################
LOG_PATH=/home/liuwl/opt/datas/weblog/access_$yesterday.log
JAR_PATH=/home/liuwl/opt/datas/logclean.jar
ENTRANCE=org.apache.hadoop.log.project.LogClean
HDFS_INPUT_PATH=/weblog/source
HDFS_OUTPUT_PATH=/weblog/clean
SQOOP_JDBC=jdbc:mysql://hadoop09-linux-01.ibeifeng.com:3306/$HIVE_DATABASE
############################
## upload logfile to hdfs ##
############################
echo "start to upload logfile"
#$HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_INPUT_PATH > /dev/null 2>&1
HSFiles=`$HADOOP_HOME/bin/hdfs dfs -ls $HDFS_INPUT_PATH/$yesterday`
if [ -z "$HSFiles" ]; then
$HADOOP_HOME/bin/hdfs dfs -mkdir -p $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
$HADOOP_HOME/bin/hdfs dfs -put $LOG_PATH  $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
echo "upload ok"
else
echo "exists"
fi
###########################
## clean the source file ##
###########################
echo "start to clean logfile"
HCFiles=`$HADOOP_HOME/bin/hdfs dfs -ls $HDFS_OUTPUT_PATH`
if [ -z "$HCFiles" ]; then
$HADOOP_HOME/bin/yarn jar $JAR_PATH $ENTRANCE $HDFS_INPUT_PATH/$yesterday $HDFS_OUTPUT_PATH/date=$yesterday
echo "clean ok"
fi
###########################
## create the hive table ##
###########################
echo "start to create the hive table"
$HIVE_HOME/bin/hive -e "create database if not exists $HIVE_DATABASE" > /dev/null 2>&1
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "create external table if not exists $HIVE_TABLE(ip string,day string,url string) partitioned by (date string) row format delimited fields terminated by ‘\t‘ location ‘$HDFS_OUTPUT_PATH‘ "
echo "add patition to hive table"
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "alter table $HIVE_TABLE add partition (date=‘$yesterday‘)"
##################################
## create the hive reslut table ##
##################################
echo "start to create the hive reslut table"
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "create table if not exists $HIVE_RSTABLE(pv string,register string,ip string,jumpprob double,two_jumpprob double ) row format delimited fields terminated by ‘\t‘;"
#################
## insert data ##
#################
echo "start to insert data"
HTFiles=`$HADOOP_HOME/bin/hdfs dfs -ls $EXPORT_DIR`
if [ -z "$HTFiles" ]; then
$HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "insert overwrite table $HIVE_RSTABLE select log_pv.pv,log_register.register,log_ip.ip,log_jumpprob.jumpprob,log_two_jumpprob.two_jumpprob from (select count(1) pv from $HIVE_TABLE where date=‘$yesterday‘) log_pv,(select count(1) register from $HIVE_TABLE where date=‘$yesterday‘ and instr(url,‘member.php?mod=register‘) > 0) log_register,(select count(distinct ip) ip from $HIVE_TABLE where date=‘$yesterday‘) log_ip,(select ghip.singleip/aip.ips jumpprob from (select count(1) singleip from(select count(ip) ips from $HIVE_TABLE where date=‘$yesterday‘ group by ip having ips <2) gip) ghip,(select count(ip) ips from $HIVE_TABLE where date=‘$yesterday‘) aip) log_jumpprob,(select ghip.singleip/aip.ips two_jumpprob from (select count(1) singleip from(select count(ip) ips from $HIVE_TABLE where date=‘$yesterday‘ group by ip having ips >=2) gip) ghip,(select count(ip) ips from $HIVE_TABLE where date=‘$yesterday‘) aip) log_two_jumpprob"
fi
###################################
## create the mysql reslut table ##
###################################
mysql -u$MYSQL_USERNAME -p$MYSQL_PASSWORD -e "
create database if not exists $HIVE_DATABASE DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
use $HIVE_DATABASE;
create table if not exists $HIVE_RSTABLE(pv varchar(20) not null,register varchar(20) not null,ip varchar(20) not null,jumpprob double(6,2) not null,two_jumpprob double(6,2) not null) DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
truncate table if exists $HIVE_RSTABLE;
quit"
#######################################
## export hive result table to mysql ##
#######################################
echo "start to export hive result table to mysql"
$SQOOP_HOME/bin/sqoop export --connect $SQOOP_JDBC --username $MYSQL_USERNAME --password $MYSQL_PASSWORD --table $HIVE_RSTABLE --export-dir $EXPORT_DIR --num-mappers $NUM_MAPPERS --input-fields-terminated-by ‘\t‘
echo "shell finished"
时间: 2024-11-07 07:37:04

日志分析_使用shell完整日志分析案例的相关文章

SQL Server中的事务日志管理(6/9):大容量日志恢复模式里的日志管理

当一切正常时,没有必要特别留意什么是事务日志,它是如何工作的.你只要确保每个数据库都有正确的备份.当出现问题时,事务日志的理解对于采取修正操作是重要的,尤其在需要紧急恢复数据库到指定点时.这系列文章会告诉你每个DBA应该知道的具体细节. 这个标题有点用词不当,因为运行在大容量日志恢复模式里的数据库,我们通常是长期不管理日志.但是,DBA会考虑在大容量加载时,短期切换到大容量恢复模式.当数据库在大容量模式里运行时,一些其他例如索引重建的操作会最小化日志(minimally logged),因此在日

基于mariadb的日志服务器及用loganalyzer实现日志的管理分析

日志文件和相关服务进程 日志文件用来记录系统,服务等在运行过程中发生的事件,事件发生的时间及事件的关键性程序.这些记录的信息在服务器运行出现问题时用来查看分析,以便解决问题.在Linux上,日志的记录一般有两种方式,一种是由软件自身完成自身运行状态的记录,例如httpd:另一种是由Linux上提供的日志文件管理系统来统一管理.运行的软件只需要调用这个管理系统中的相关服务,即可完成日志的记录.rsyslog就是这样的一个日志文件管理系统. rsyslog内置了很多facility,这个可以理解为服

日志和告警数据挖掘经验谈——利用日志相似度进行聚类,利用时间进行关联分析

摘自:http://www.36dsj.com/archives/75208 最近参与了了一个日志和告警的数据挖掘项目,里面用到的一些思路在这里和大家做一个分享. 项目的需求是收集的客户系统一个月300G左右的的日志和告警数据做一个整理,主要是归类(Grouping)和关联(Correlation),从而得到告警和日志的一些统计关系,这些统计结果可以给一线支持人员参考. 得到的数据主要分为两部分,一部分是告警的历史数据,这部分数据很少,只有50M左右,剩下的全部都是日志数据.日志数据大概有50多

如何使用Logminer来分析具体的DML操作日志

如何使用Logminer来分析具体的DML操作日志在Oracle数据库维护中,常常需要分析原来数据库都做了哪些删除.更新.增加数据的操作,所以一般需要用到Logminer这工具来分析归档日志.环境:AIX5.3+Oracle10.2.0.1   使用IBM的Tivoli Storage Manager把数据库数据.归档日志备份到带库中 1.确定具体时间的DML操作,把相应的归档日志从带库恢复到数据库中2.用Logminer来分析相应的归档日志 一.在sqlplus用sys超级用户登陆数据库中,然

如何通过友盟分析发布后App崩溃日志-b

要分析崩溃日志,首先需要保留发布时的编译出来的.xcarchive文件.这个文件包含了.DSYM文件. 我一般的做法是,发布成功后,把这个文件.xcarchive直接提交到代码版本库对应的版本分支里,这样就不会搞丢了. 这个文件在哪呢?打开XCode->菜单Window->Organizer,在编译成功的文件上右键,就能打开了. 两种比较麻烦的方法. 第一种方法: 使用dwarfdump命令 dwarfdump --uuid xx.app.dSYM     用来得到app的UUID.dwarf

elk分析nginx日志和tomcat日志

一.介绍 Elasticsearch + Logstash + Kibana(ELK)是一套开源的日志管理方案. Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等. Logstash是一个完全开源的工具,它可以对你的日志进行收集.分析,并将其存储供以后使用 kibana 是一个开源和免费的工具,它可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web

因为diagwait未配置导致RAC脑裂日志记录不完整的分析案例

1.故障现象 一个RAC,CRS版本为10.2.0.4,在第二节点DOWN机后,第一节点也相继DOWN机. 2.CRS日志分析 2.1 二节点日志情况 CRS_LOG [cssd(8796)]CRS-1611:node XXdb1 (1) at 75% heartbeat fatal, eviction in 14.118 seconds 2014-07-04 22:49:38.556 [cssd(8796)]CRS-1611:node XXdb1 (1) at 75% heartbeat fa

日志处理中一些shell命令技巧

日志处理中一些shell命令技巧 阴差阳错的做的日志分析,前途未卜的这段日子,唯一还有点意思的可能就是手动的处理大量日志.总结一下. 日志文件的输入是动则几个G的文本.从N个这样的文件中得到一个列表,一个数字,一个比例.在什么工具都没有情况下,用shell命令不仅是验证系统数据的准确性的方法,也是一个很好的学习过程. 使用cut命令切割日志行 下面的一行典型的apache访问日志: 120.51.133.125 - - [26/Apr/2013:12:20:06 +0800] "GET /ski

ArcGIS for Desktop入门教程_第七章_使用ArcGIS进行空间分析 - ArcGIS知乎-新一代ArcGIS问答社区

原文:ArcGIS for Desktop入门教程_第七章_使用ArcGIS进行空间分析 - ArcGIS知乎-新一代ArcGIS问答社区 1 使用ArcGIS进行空间分析 1.1 GIS分析基础 GIS的六大功能是数据获取.存储.查询.分析.表达.输出.在前面的内容里已经介绍了使用ArcGIS进行数据获取.存储.查询.表达和输出的过程,本章将介绍如何在ArcGIS中进行地理分析.分析是GIS的核心和灵魂,是GIS区别于一般的信息系统.CAD或者电子地图系统的主要标志之一. GIS分析,就是研究