ElasticSearch的搭建与数据统计

 平台内的产品有一个数据分析,统计平台内某个商户某个时间段内(今天、昨天、7天内、30天内……)的各种数据分析,这种分析显然用MySql的count、sum、GroupBy之类的去查询是很不靠谱的,尤其是在数据量很大的情况下效率就不言而喻了,本来想着用HBase的MR来做,或者直接把各纬度的数据通过HADOOP的MR处理完存到HBase里面,后来与朋友聊天后被朋友严重鄙视了一顿,鄙视的内容基本是嫌弃我们的数据量太小根本用不到HBase更用不到MR,在朋友的极力蛊惑之下决定用ElasticSearch来实现以下简称ES,好吧,那我们还是从传统的搭建-采坑-填坑-再采坑-再填坑开始。

1、下载并安装配置ElasticSearch

 ElasticSearch的官网http://www.elastic.co/products/elasticsearch找到需要的版本,我这里选择的是5.6.9的版本,不要问我为什么,因为最新版在我的未知领域有更多的坑!直接下载5.6.9是我目前用着最舒服的一个版本,ES依赖最低JDK1.8版本,所以环境变量一定要配置好

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.9.tar.gz
tar -zxvf elasticsearch-5.6.9.tar.gz -C /usr/local/
cd /usr/local/elasticsearch-5.6.9

 ES的配置文件全部在config目录下,其中elasticsearch.yml是主配置文件,进去后主要修改几个地方,其他的地方根据业务需求自行修改:

vim config/其中elasticsearch.yml
1
2
3
4
5
6
cluster.name=tsk-es
node.name=tsk1
path.data: /opt/data/elastic/data
path.logs: /opt/data/elastic/log
network.host: 0.0.0.0
http.port: 9200

 不要以为把elasticsearch.yml修改完就可以直接执行bin目录下的elasticsearch,会报一堆错误给你的!第一个错误就是告诉你不能用root用户启动ES,所以你要先创建一个用户,我这里创建一个叫elastic的用户然后记得给用户授文件夹的权限,然后su进入用户启动,但是先别急着进elastic用户,还有些东西需要改一下:

 1、修改/etc/security/limits.conf文件,否则会报max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536]错误

vim /etc/security/limits.conf
1
2
3
4
* soft nofile 65536
* hard nofile 65536
* soft nproc 2048
* hard nproc 4096

 2、修改/etc/sysctl.conf 文件否则会报max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]错误

vim /etc/sysctl.conf
1
vm.max_map_count=262144

 全部修改完成之后就可以进入elastic用户启动ES了

su elastic
bin/elasticsrarch

 如果不出意外的话你的ES应该正常运行了,这时用浏览器访问192.168.0.1:9200你就会看到一串JSON字符串,证明你的ES已经启动成功,如果想要后台运行ES直接执行

nohup bin/elasticsearch > /opt/data/elastic/elastic.log 2>&1 &

2、ES操作初了解

 ES的操作都是通过HTTP请求进行的,不同的请求方式和参数针对不同的操作,比如创建一个索引就是PUT,删除一个索引用的是DELETE,查询如果没参数直接用GET就好,如果有参数或者是提交数据的话用POST,那么我们第一步肯定是先创建索引开始:

PUT:http://192.168.0.1:9200/shopsinfo
{
"mappings":{
    "shopsOrder":{
        "properties":{
            "shopid":{
                "type":"string",
                "index": "not_analyzed"
            },
            "createdate":{
                "type":"string",
                "index": "not_analyzed"
            },
            "timestamp":{
                "type":"long"    
            },
            "paymentType":{
                "type":"string"    ,
                "index": "not_analyzed"
            },
            "amount":{
                "type":"long"
            }
        }
    }
}
}

 上面的意思是创建一个名叫shopsinfo的索引,里面有一个叫shopsOrder的mapping,里面有shopid,createdata,timestamp,paymentType,amount几个字段,以及分别对应的type

 插入数据比较简单,就是POST就好参数就是一个JSON

POST:http://192.168.0.1:9200/shopsinfo/shopsOrder
{
    "shopid": "96119",
    "createdate": "20180410",
    "timestamp": 1523289600000,
    "paymentType": "alipay",
    "amount": 6917
}

 删除数据

POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_delete_by_query

 查询
GET/POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search

 关于ES的操作和其他不再过多赘述,官网有中文版,度娘上也一大堆,重中之重是ES的统计查询这是ES的关键

3、重点之查询

 ES是属于倒排索引,查询的效率特别的高,但是ES的查询语句就很麻烦了,ES不管是查询、统计都是用POST的BODY以JSON的形式进行的,比如我要查询时间戳>某个时间并且shopId为100000002和100000006的在SQL中是这样的:

select * from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")

 在ES中就得这么查:

POST:http://192.168.0.1:9200/shopsinfo/shopsOrder/_search
{
    "size":20,
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 这里面我传递了size参数,如果不传,ES默认给你返回10条数据,查询结果ES也会给你返回JSON,其中hits字段中会有total就是你查询的结果的总数hits会返回给你结果内容。

 以上是简单的查询,统计的话ES是以aggs作为参数,全称应该叫做Aggregation,比如接着刚才的查询我想计算出结果的amount总额是多少就是类似SQL中的

select sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")

 在ES中就得这么查

{
    "aggs":{
        "query_amount":{
            "sum":{
                "field":"amount"
            }
        }
    },
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 统计的结果在返回值的aggregations参数里的query_amount下类似这样的:

......
"aggregations": {
    "query_amount": {
        "value": 684854
    }
}
......

 接下来再复杂一点点,按天分组进行统计查询SQL中的提现是这样的:

select createdate,sum(amount)query_amount from shopsOrder where timestamp>1523671189000 and shopid in ("100000002","100000006")
group by createdate order by createdate

 在ES中是这样的:

{
    "size":0,
    "aggs":{
        "orderDate":{
            "terms":{
                "field":"createdate",
                "order":{
                    "_term":"asc"
                }
            },
            "aggs":{
                "query_amount":{
                    "sum":{
                        "field":"amount"
                    }
                }
            }
        }
    },
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 查询结果为

......
"aggregations": {
    "orderDate": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 99,
        "buckets": [
            ......
            {
                "key": "20180415",
                "doc_count": 8,
                "query_amount": {
                    "value": 31632
                }
            },
            {
                "key": "20180417",
                "doc_count": 3,
                "query_amount": {
                    "value": 21401
                }
            },
            {
                "key": "20180418",
                "doc_count": 2,
                "query_amount": {
                    "value": 2333
                }
            }
            ......
        ]
    }
}

 buckets中就是查询的结果,key为按我createdate分组后的值,doc_count类似count,query_amount为sum后的值。至于我的参数里面有一个size:0是因为我不需要具体的记录就是hits,所以这里传0

 最后我们来个更复杂的1、统计所有的总额;2、先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额

    {
    "size":0,
    "aggs":{
        "amount":{
            "sum":{
                "field":"amount"
            }
        },
        "paymenttype":{
            "terms":{
                "field":"paymentType"
            },
            "aggs":{
                "query_amount":{
                    "sum":{
                        "field":"amount"
                    }
                },
                "payment_date":{
                    "terms":{
                        "field":"createdate"
                    },
                    "aggs":{
                        "query_amount":{
                            "sum":{
                                "field":"amount"
                            }
                        }
                    }
                }
            }
        }
    },
    "query":{
        "bool":{
            "must":[
                {
                    "range":{
                        "timestamp":{
                            "gte":1523671189000
                        }

                    }
                },
                {
                    "terms":{
                        "shopid":["100000002","100000006"]
                    }
                }
            ]
        }
    }
}

 查询结果为:

......
"amount": {
    "value": 684854
},
"paymenttype":{
     ......
    "buckets": [
        {
            "key": "wechatpay",
            "doc_count": 73,
            "amount": {
                "value": 351142
            },
            "payment_date": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 25,
                "buckets": [
                    ......
                    {
                        "key": "20180415",
                        "doc_count": 6,
                        "amount": {
                            "value": 29032
                        }
                    },
                    {
                        "key": "20180425",
                        "doc_count": 6,
                        "amount": {
                            "value": 21592
                        }
                    }
                    ......
                ]
            }
        },
        {
            "key": "alipay",
            "doc_count": 67,
            "amount": {
                "value": 333712
            },
            "payment_date": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 23,
                "buckets": [
                    ......
                    {
                        "key": "20180506",
                        "doc_count": 8,
                        "amount": {
                            "value": 38280
                        }
                    },
                    {
                        "key": "20180426",
                        "doc_count": 6,
                        "amount": {
                            "value": 41052
                        }
                    }
                    ......
                ]
            }
        }
    ]
}

4、JAVA操作ES

 根据自己下载的ES版本下载对应版本的JAR包,我安装的是5.6.9所以我的JAR包版本也应该是5.6.9

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.6.9</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.6.9</version>
</dependency>

 创建一个helper操作ES,由于我的ES项目是基于SpringBoot的所以我的helper决定交由spring去管理,其实写一个单例也是可以的,先创建client连接

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.net.InetAddress;

Settings settings = Settings.builder().put("cluster.name", "tsk-es").put("client.transport.sniff", true)
                    .build();
TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddresses(new InetSocketTransportAddress(InetAddress.getByName(HOST), PORT));

 插入数据比较简单你可以直接插入JSON字符串,也可以传递JAVA BEAN

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.XContentType;

IndexResponse response = client.prepareIndex(index, mapping).setSource(jsonStr, XContentType.JSON)
                .get();

 查询就比较麻烦了,已上面最后一个查询先按paymentType支付方式分组统计amount总额,并且每个支付方式中再按天分组统计每天的amount总额为例:

public void getAmountData(Long startTimestamp, String... shopIds) {
    // 先初始化一个SearchRequestBuilder,指向shopsInfo/shopsOrder
    SearchRequestBuilder sbuilder = client.prepareSearch("shopsinfo").setTypes("shopsOrder");
    // 条件一
    TermsQueryBuilder mpq = QueryBuilders.termsQuery("shopid", shopIds);
    // 条件二
    RangeQueryBuilder mpq2 = QueryBuilders.rangeQuery("timestamp").gte(startTimestamp);
    // 初始化QueryBuilder
    QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(mpq).must(mpq2);
    // 将QueryBuilder放入SearchRequestBuilder
    sbuilder.setQuery(queryBuilder);
    sbuilder.setSize(0);

    // sum,这里创建一个实例全部用这个实例就行
    SumAggregationBuilder salaryAgg = AggregationBuilders.sum("query_amount").field("amount");

    TermsAggregationBuilder paymentAgg = AggregationBuilders.terms("paymentType").field("paymentType");
    paymentAgg.size(100);
    paymentAgg.subAggregation(salaryAgg);
    TermsAggregationBuilder groupDateAff = AggregationBuilders.terms("payment_date").field("createdate")
            .order(Order.term(true));
    groupDateAff.size(100);
    groupDateAff.subAggregation(salaryAgg);
    paymentAgg.subAggregation(groupDateAff);
    // 将统计查询放入SearchRequestBuilder
    sbuilder.addAggregation(salaryAgg).addAggregation(paymentAgg);
    SearchResponse response = sbuilder.execute().actionGet();
    Map<String, Aggregation> aggMap = response.getAggregations().asMap();
    // 获取全部的总额
    InternalSum shopGroupAllAmount = (InternalSum) aggMap.get("amount");
    Double amount = shopGroupAllAmount.getValue();
    ......
}

 SearchResponse中就已经可以获取到全部的信息,至于后续怎么解析数据那就看具体业务需求以及每个人的习惯。ES的各种操作说简单也简单说复杂也复杂,按照朋友的话说就是用熟了自然就简单,确实也是这样,不管用啥都得深入了解一下,要不然自己就是个坑!比如我在做统计查询的时候返回的结果总是比应该的结果要少很多,而少的数量总是出现在sum_other_doc_count这个字段里,研究了半天才发现原来统计的结果也需要传递size参数,否则一样默认10条!

 最后,还是要感谢一下我那朋友的,虽然他对我想构建各种大数据平台的事情嗤之以鼻(因为我们数据量确实不大),但依然输出了一下他所能想到的最优解。

坚持原创技术分享,您的支持将鼓励我继续创作!