这一部分承接上一部分,主要介绍Eleasticsearch Stack
本篇是本系列第四篇,也是最后一篇
Elasticsearch
前一段时间,在接触spring-boot时有过接触,当时的博客
这里先上一张图:

再看看ES的术语:
关系型数据库 | Elasticsearch |
Table | Index(Type) |
Row | Document |
Column | Filed |
Schema | Mapping |
SQL | DSL |
Elasticsearch有3个标签:Store, Search, and Analyze,前边的博客介绍了Store与Search,这里主要介绍Analyze。
官网、参考
ES前期一个Index可以有多个type,现在已经移除,一个Index只能有一个type了
ES默认是动态创建索引和索引类型的mapping的。这就相当于无需定义Solr中的Schema,无需指定各个field的索引规则就可以索引文件,很方便。但有时方便就代表着不灵活。比如,ES默认一个field是要做分词的,但我们有时要搜索匹配整个field却不行。如有统计工作要记录每个城市出现的次数。对于NAMEfield,若记录「new york」文本,ES可能会把它拆分成「new」和「york」这两个词,分别计算这个两个单词的次数,而不是我们期望的「new york 」。这时,就需要我们在创建索引时定义mapping。mapping文件如下:出处
{ "index_type":
{ "properties":
{
"ID":{
"type":"string", "index":"not_analyzed" },
"NAME":{ "
type":"string", "fields":{ "NAME":{ "type":"string" }, "raw":{ "type":"string", "index":"not_analyzed" } } }
}
}
}
聚合简介
ES提供了4种类型的聚合,包括:Bucekting、Metric、Matrix、Pipeline,主要是前2个
Metrix: 在一组document之上的计算指标,max、min、sum、avg等
Bucketing: 生成Bucketing的一组聚合,其中每个Bucketing都与一个键和一个Document的条件相关联。执行汇总时,将对上下文中的每个document评估所有bucketing条件,并且当条件匹配时,该文档将被视为“落入”相关buckeing内。对应SQL的groupBy
Matrix:一类聚合,可在多个字段上进行操作,并根据从请求的文档字段中提取的值生成矩阵结果。ES对它的支持还不完善,还不支持脚本
Pipeline: 汇总其他汇总及其相关指标的输出的汇总
聚合的结构:
"aggregations" : {
"<aggregation_name>" : {
"<aggregation_type>" : {
<aggregation_body>
}
[,"meta" : { [<meta_data_body>] } ]?
[,"aggregations" : { [<sub_aggregation>]+ } ]?
}
[,"<aggregation_name_2>" : { ... } ]*
}
说明:aggregations可以写成aggs
Metrix
Avg
比如一个学生分数的document,计算平均分:
POST /exams/_search?size=0
{
"aggs" : {
"avg_grade" : { "avg" : { "field" : "grade" } }
}
}
这里的aggregation_name是"avg_grade",自定义的。aggregation_type是"avg",类似于算子,ES提供的,filed指明avg在哪个field上执行,算子的参数。结果:
{
...
"aggregations": {
"avg_grade": {
"value": 75.0
}
}
}
avg可以基于脚本来编写:
POST /exams/_search?size=0
{
"aggs" : {
"avg_grade" : {
"avg" : {
"script" : {
"id": "my_script",
"params": {
"field": "grade"
}
}
}
}
}
}
这会将脚本参数解释为具有painless语言且没有脚本参数的嵌入式脚本。更进一步可以写为:
POST /exams/_search?size=0
{
"aggs" : {
"avg_corrected_grade" : {
"avg" : {
"field" : "grade",
"script" : {
"lang": "painless",
"source": "_value * params.correction",
"params" : {
"correction" : 1.2
},
"missing": 0
}
}
}
}
}
在原有的分数上做一个修正。missing参数指定缺省值。
与Avg类似的还有个加权平均:Weighted Avg,比Avg增加了weight参数,这个不多介绍了。
POST /exams/_search
{
"size": 0,
"aggs" : {
"weighted_grade": {
"weighted_avg": {
"value": {
"field": "grade"
},
"weight": {
"field": "weight"
}
}
}
}
}
extended_stats
cardinality等价于distinct,查询某个field不同值的个数,max、min、sum都类似于sql,不多介绍,简单介绍一个stats与extended_stats。stats是将某个field的count、min、max、avg、sum都计算出来,extended_stats比stats增加了平方和、方差、标准差等值
GET /exams/_search
{
"size": 0,
"aggs" : {
"grades_stats" : { "extended_stats" : { "field" : "grade" } }
}
}
结果如下:
{
...
"aggregations": {
"grades_stats": {
"count": 2,
"min": 50.0,
"max": 100.0,
"avg": 75.0,
"sum": 150.0,
"sum_of_squares": 12500.0,
"variance": 625.0,
"std_deviation": 25.0,
"std_deviation_bounds": {
"upper": 125.0,
"lower": 25.0
}
}
}
}
这一块用的是Get,有点疑惑,另外std_deviation_bounds这个值是:与测量值的正负两个标准差的间隔,正态分布的话,应该是95.4%的区间
percentiles
还有一个比较有趣的是percentiles,这个给出的是相应概率发生的位置,比如对加响应时间的分析:
GET latency/_search
{
"size": 0,
"aggs" : {
"load_time_outlier" : {
"percentiles" : {
"field" : "load_time"
}
}
}
}
结果是:
{
...
"aggregations": {
"load_time_outlier": {
"values" : {
"1.0": 5.0,
"5.0": 25.0,
"25.0": 165.0,
"50.0": 445.0,
"75.0": 725.0,
"95.0": 945.0,
"99.0": 985.0
}
}
}
}
Bucket
filter
介绍一些filter,类似于where的用法,比如计算t-shirt的平均价格:
POST /sales/_search?size=0
{
"aggs" : {
"t_shirts" : {
"filter" : { "term": { "type": "t-shirt" } },
"aggs" : {
"avg_price" : { "avg" : { "field" : "price" } }
}
}
}
}
结果:
{
...
"aggregations" : {
"t_shirts" : {
"doc_count" : 3,
"avg_price" : { "value" : 128.33333333333334 }
}
}
}
还有一种写法
POST /sales/_search?size=0
{
"query" : {
"constant_score" : {
"filter" : {
"match" : { "type" : "hat" }
}
}
},
"aggs" : {
"hat_prices" : { "sum" : { "field" : "price" } }
}
}
term
term有点类似与group by,分组统计个数,例如,汇总某种类型(genre)的document数量
GET /_search
{
"aggs" : {
"genres" : {
"terms" : { "field" : "genre" }
}
}
}
结果:
{
...
"aggregations" : {
"genres" : {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets" : [
{
"key" : "electronic",
"doc_count" : 6
},
{
"key" : "rock",
"doc_count" : 3
},
{
"key" : "jazz",
"doc_count" : 2
}
]
}
}
}
官方对这里有个说明:terms聚合应该是关键字类型的字段或适用于bucket聚合的任何其他数据类型。为了与text一起使用,您将需要启用fielddata。
默认情况下,terms汇总将返回doc_count排序的前十的 数据,可以通过设置size参数来更改此默认行为。另外还有一个
shard_size来对size过大时出现的性能问题进行调优。如果要分页来获取,需要使用Composite,这个后边介绍。
查询时可以通过order来对结果进行排序,如下:
GET /_search
{
"aggs" : {
"genres" : {
"terms" : {
"field" : "genre",
"order" : { "_count" : "asc" }
}
}
}
}
分组最大并排序:
GET /_search
{
"aggs" : {
"genres" : {
"terms" : {
"field" : "genre",
"order" : { "max_play_count" : "desc" }
},
"aggs" : {
"max_play_count" : { "max" : { "field" : "play_count" } }
}
}
}
}
在嵌套的情况下,有时会应用聚合的结果,这时通过">"来指明层级,而通过"."来引用某个metric
histogram
POST /sales/_search?size=0
{
"aggs" : {
"prices" : {
"histogram" : {
"field" : "price",
"interval" : 50,
"min_doc_count" : 1
}
}
}
}
interval来指明间隔、min_doc_count指明最少的个数,如果小于该值,则过滤掉该区间
结果:
{
...
"aggregations": {
"prices" : {
"buckets": [
{
"key": 0.0,
"doc_count": 1
},
{
"key": 50.0,
"doc_count": 1
},
{
"key": 150.0,
"doc_count": 2
},
{
"key": 200.0,
"doc_count": 3
}
]
}
}
}
date_histogram
最常用的按时间汇总,比如按月统计
POST /sales/_search?size=0
{
"aggs" : {
"sales_over_time" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month",
"format" : "yyyy-MM-dd"
}
}
}
}
这里的calendar_interval只能是单位时间,如:1d、1M等,key是时间戳,如果要显示文本,可以增加format,结果如下:
{
...
"aggregations": {
"sales_over_time": {
"buckets": [
{
"key_as_string": "2015-01-01",
"key": 1420070400000,
"doc_count": 3
},
{
"key_as_string": "2015-02-01",
"key": 1422748800000,
"doc_count": 2
},
{
"key_as_string": "2015-03-01",
"key": 1425168000000,
"doc_count": 2
}
]
}
}
}
如果是想自定义时间区间,可以用date_range来查询,这里不多说了。
composite
-
source
source用来指明构建composite bucket的源,source的顺序很重要,它控制返回key的顺序。
composite有3种source: term、histogram、Date histogram。可以但source,但感觉单source跟原意差别不大。下例组合date_histogram与 term,感觉像是两种source的笛卡尔积,这个得试一下,官网没给出
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
{ "product": { "terms": {"field": "product" } } }
]
}
}
}
}
-
pagination
通过size与after来制定分页
第一次查询:
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
{ "product": { "terms": {"field": "product" } } }
]
}
}
}
}
返回:
{
...
"aggregations": {
"my_buckets": {
"after_key": {
"date": 1494288000000,
"product": "mad max"
},
"buckets": [
{
"key": {
"date": 1494201600000,
"product": "rocky"
},
"doc_count": 1
},
{
"key": {
"date": 1494288000000,
"product": "mad max"
},
"doc_count": 2
}
]
}
}
}
第二次查询:
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } },
{ "product": { "terms": {"field": "product", "order": "asc" } } }
],
"after": { "date": 1494288000000, "product": "mad max" }
}
}
}
}
top_hits
在官网中,top_hits当成metric aggregator,但我觉得它主要与bucket连用。它追踪之前命中(hit)的document,这样可以在子聚合上对这些document进行某些操作。
top_hits可以有效的通过bucket aggregator来对某些指定field进行分组、排序等
它有几个参数:
- from: 偏移量
- size: 与其他size一样,指定个数
- sort: 对命中的document进行排序
- _source: 指定字段,如果不指定,则全部返回
比如通过某个字段来分桶,然后再通过metics进行聚合,并且想返回多个字段。
POST /my_index/_search
{
"size": 0,
"aggs" : {
"metaQunatity" : {
"terms" : {
"field" : "metaQuantityId"
},
"aggs" : {
"quantitySumPrice" : { "sum" : { "field" : "quantityTotalPrice" } },
"materialSumPrice" : { "sum" : { "field" : "materiaTotalPrice" } },
"aggs": {
"top_hits": {
"size": 1,
"_source":{
"includes": ["metaQuantityId", "metaQuantityName"]
}
}
}
}
}
}
}
Pipeline
这个只介绍一个:sum_bucket,可以对兄弟聚合的结果进行再次聚合
例如:
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"sum_monthly_sales": {
"sum_bucket": {
"buckets_path": "sales_per_month>sales"
}
}
}
}
sum_bucket与buckets_path是语法,sales_per_month>sales是指明对兄弟聚合的哪个field进行聚合,结果:
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
}
}
]
},
"sum_monthly_sales": {
"value": 985.0
}
}
}
这个例子也给出一般的写法,用bucket来分组,用metrix来计算,最后可以通过pipeline对计算结果再次汇总。
Mapping
追加一节的Mapping
-
简介
首先Mapping是什么?前边说过Mapping等价于关系型数据库的Schema,它保存着一个document有哪些field,以及field的类型信息。用mapping可以定义:
哪些string field应该用于全文检索
哪些field包含number、date或者地理坐标
date值的formate
自定义规则来控制mapping动态增加field
每个field都有一个数据type,可以是:
一个简单的type,如:text、keyword、date、long、double、boolean 或者ip
json类型,用object或者nested
特殊类型: geo_point、geo_shape、completion
一个string field可以通过fields参数来指定即是text又是keyword
es在创建index可以动态的创建mapping(Dynamic mapping),当然也支持用户显示的定义mapping( Explicit mapping).显示创建index,如:
PUT http://localhost:9200/user
{
"mappings": {
"properties": {
"age": { "type": "integer" },
"email": { "type": "keyword" },
"name": { "type": "text" }
}
}
}
在已有index上增加一个已经存在的mapping:
PUT http://localhost:9200/user/_mapping
{
"properties": {
"employee-id": {
"type": "keyword",
"index": false
}
}
}
注意:除了修改field的参数,不能改变一个index的mapping或者field type。如果需要修改,需要重新创建一个index,然后通过reindex数据。 这个reindex的说明是:Copies documents from one index to another。
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index"
}
}
另外也不能修改一个field的名,但可以通过alias来起个别名。
ps: 这一部分像极了create table
下边介绍几个类型,主要是:text、keyword、nested、object以及reindex
-
text
这个字段用来创建全文索引,如一封email的内容或者一个产品的描述。这个字段的内容会传递个analyzer进行分词,然后创建索引,这样就可以通过某个关键字进行搜索。Text字段不能用于sort以及极少极少用于aggregation。 例如:
PUT my_index
{
"mappings": {
"properties": {
"full_name": {
"type": "text"
}
}
}
}
前边说了极少极少,也就说其实是可以的,如果想进行聚合,那就需要用fielddata参数设成true。这个参数默认是false的,因为在第一使用它是,它是通过从磁盘读取每个段的整个反向索引而构建的,通过反转document与term的关系来构建通过document查询term的功能,并将结果存储在内存中。这个绝对是个无底洞,所以也就是极少极少啦。
PUT my_index/_mapping
{
"properties": {
"my_field": {
"type": "text",
"fielddata": true
}
}
}
text可以通过fields字段来指定兼容keyword。
{
"my_index": {
"mappings": {
"properties": {
"group": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
-
keyword
这个字段用来索引结构化的数据,如id、email的地址、hostname、status code。这与text是两个问题域。text是通过分词,来用term进行搜索document。keyword则是通过document来查询term进行filter、sorting、aggregation等。例如:
PUT my_index
{
"mappings": {
"properties": {
"tags": {
"type": "keyword"
}
}
}
}
keyword有个默认开启的属性:doc_value,这个属性在很多的field上都有。它就是。doc_values是在document index时建立的磁盘上数据结构,这使通过document查询term这种数据访问模式成为可能。默认开启就能通过手动关闭,这里不做介绍了。默认的就是最好的。
-
object与nested
json文档天热带有层级结构,如此就有object。如图:
PUT my_index/_doc/1
{
"region": "US",
"manager": {
"age": 30,
"name": {
"first": "John",
"last": "Smith"
}
}
}
manager本身是一个对象,它下边还有一个name对象,在内部document会将manager对象拍平来存储,比如:
{
"region": "US",
"manager.age": 30,
"manager.name.first": "John",
"manager.name.last": "Smith"
}
它的mapping如下。
PUT my_index
{
"mappings": {
"properties": {
"region": {
"type": "keyword"
},
"manager": {
"properties": {
"age": { "type": "integer" },
"name": {
"properties": {
"first": { "type": "text" },
"last": { "type": "text" }
}
}
}
}
}
}
}
这里要多说一句,ES中并没有专门的array类型,任何field都可以包括多个值,当然这些值必须是相同的数据类型。但在存储 Array时,会有一些变化,如:
PUT my_index/_doc/1
{
"group" : "fans",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
它在es中将会转换成如下结构:
{
"group" : "fans",
"user.first" : [ "alice", "john" ],
"user.last" : [ "smith", "white" ]
}
这种存储,使得Alice与White的关系不存在了,如果进行查询:
GET my_index/_search
{
"query": {
"bool": {
"must": [
{ "match": { "user.first": "Alice" }},
{ "match": { "user.last": "Smith" }}
]
}
}
}
数据就会查找出来
如何解决这个问题呢,就需要nested
,通过
PUT my_index
{
"mappings": {
"properties": {
"user": {
"type": "nested"
}
}
}
}
将user属性增加nested,来指明,这样它们的关系就存在了,查询就无法查询出来了
GET my_index/_search
{
"query": {
"nested": {
"path": "user",
"query": {
"bool": {
"must": [
{ "match": { "user.first": "Alice" }},
{ "match": { "user.last": "White" }}
]
}
}
}
}
}
这时候,从mapping中看,user的类型就是nested.
Logstash
Logstash只做简单的介绍
Logstash主要用于收集数据,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。
官网、简介、参考

Logstash事件处理管道包括3个阶段:inputs 、filters、outputs。
kibana
官网
kibana不介绍了,官网就比较清楚啦,从初步了解上看,它做了2个事情:将页面的配置转换成ES的查询;将查询的结果用图形显示出来。