查询引擎之二

这一部分承接上一部分,主要介绍Eleasticsearch Stack
本篇是本系列第四篇,也是最后一篇

Elasticsearch

前一段时间,在接触spring-boot时有过接触,当时的博客
这里先上一张图:
elasticsearch数据结构.png

再看看ES的术语:

关系型数据库Elasticsearch
TableIndex(Type)
RowDocument
ColumnFiled
SchemaMapping
SQLDSL

Elasticsearch有3个标签:Store, Search, and Analyze,前边的博客介绍了Store与Search,这里主要介绍Analyze。
官网参考

ES前期一个Index可以有多个type,现在已经移除,一个Index只能有一个type了
ES默认是动态创建索引和索引类型的mapping的。这就相当于无需定义Solr中的Schema,无需指定各个field的索引规则就可以索引文件,很方便。但有时方便就代表着不灵活。比如,ES默认一个field是要做分词的,但我们有时要搜索匹配整个field却不行。如有统计工作要记录每个城市出现的次数。对于NAMEfield,若记录「new york」文本,ES可能会把它拆分成「new」和「york」这两个词,分别计算这个两个单词的次数,而不是我们期望的「new york 」。这时,就需要我们在创建索引时定义mapping。mapping文件如下:出处

{ "index_type":
    { "properties":
        { 
            "ID":{ 
                "type":"string", "index":"not_analyzed" }, 
            "NAME":{ "
                type":"string", "fields":{ "NAME":{ "type":"string" }, "raw":{ "type":"string", "index":"not_analyzed" } } }
         } 
    } 
}

聚合简介

ES提供了4种类型的聚合,包括:Bucekting、Metric、Matrix、Pipeline,主要是前2个
Metrix: 在一组document之上的计算指标,max、min、sum、avg等
Bucketing: 生成Bucketing的一组聚合,其中每个Bucketing都与一个键和一个Document的条件相关联。执行汇总时,将对上下文中的每个document评估所有bucketing条件,并且当条件匹配时,该文档将被视为“落入”相关buckeing内。对应SQL的groupBy
Matrix:一类聚合,可在多个字段上进行操作,并根据从请求的文档字段中提取的值生成矩阵结果。ES对它的支持还不完善,还不支持脚本
Pipeline: 汇总其他汇总及其相关指标的输出的汇总

聚合的结构:

"aggregations" : {
    "<aggregation_name>" : {
        "<aggregation_type>" : {
            <aggregation_body>
        }
        [,"meta" : {  [<meta_data_body>] } ]?
        [,"aggregations" : { [<sub_aggregation>]+ } ]?
    }
    [,"<aggregation_name_2>" : { ... } ]*
}

说明:aggregations可以写成aggs

Metrix

Avg

比如一个学生分数的document,计算平均分:

POST /exams/_search?size=0
{
    "aggs" : {
        "avg_grade" : { "avg" : { "field" : "grade" } }
    }
}

这里的aggregation_name是"avg_grade",自定义的。aggregation_type是"avg",类似于算子,ES提供的,filed指明avg在哪个field上执行,算子的参数。结果:

{
    ...
    "aggregations": {
        "avg_grade": {
            "value": 75.0
        }
    }
}

avg可以基于脚本来编写:

POST /exams/_search?size=0
{
    "aggs" : {
        "avg_grade" : {
            "avg" : {
                "script" : {
                    "id": "my_script",
                    "params": {
                        "field": "grade"
                    }
                }
            }
        }
    }
}

这会将脚本参数解释为具有painless语言且没有脚本参数的嵌入式脚本。更进一步可以写为:

POST /exams/_search?size=0
{
    "aggs" : {
        "avg_corrected_grade" : {
            "avg" : {
                "field" : "grade",
                "script" : {
                    "lang": "painless",
                    "source": "_value * params.correction",
                    "params" : {
                        "correction" : 1.2
                    },
                    "missing":  0
                }
            }
        }
    }
}

在原有的分数上做一个修正。missing参数指定缺省值。
与Avg类似的还有个加权平均:Weighted Avg,比Avg增加了weight参数,这个不多介绍了。

POST /exams/_search
{
    "size": 0,
    "aggs" : {
        "weighted_grade": {
            "weighted_avg": {
                "value": {
                    "field": "grade"
                },
                "weight": {
                    "field": "weight"
                }
            }
        }
    }
}

extended_stats

cardinality等价于distinct,查询某个field不同值的个数,max、min、sum都类似于sql,不多介绍,简单介绍一个stats与extended_stats。stats是将某个field的count、min、max、avg、sum都计算出来,extended_stats比stats增加了平方和、方差、标准差等值

GET /exams/_search
{
    "size": 0,
    "aggs" : {
        "grades_stats" : { "extended_stats" : { "field" : "grade" } }
    }
}

结果如下:

{
    ...

    "aggregations": {
        "grades_stats": {
           "count": 2,
           "min": 50.0,
           "max": 100.0,
           "avg": 75.0,
           "sum": 150.0,
           "sum_of_squares": 12500.0,
           "variance": 625.0,
           "std_deviation": 25.0,
           "std_deviation_bounds": {
            "upper": 125.0,
            "lower": 25.0
           }
        }
    }
}

这一块用的是Get,有点疑惑,另外std_deviation_bounds这个值是:与测量值的正负两个标准差的间隔,正态分布的话,应该是95.4%的区间

percentiles

还有一个比较有趣的是percentiles,这个给出的是相应概率发生的位置,比如对加响应时间的分析:

GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_outlier" : {
            "percentiles" : {
                "field" : "load_time" 
            }
        }
    }
}

结果是:

{
    ...

   "aggregations": {
      "load_time_outlier": {
         "values" : {
            "1.0": 5.0,
            "5.0": 25.0,
            "25.0": 165.0,
            "50.0": 445.0,
            "75.0": 725.0,
            "95.0": 945.0,
            "99.0": 985.0
         }
      }
   }
}

Bucket

filter

介绍一些filter,类似于where的用法,比如计算t-shirt的平均价格:

POST /sales/_search?size=0
{
    "aggs" : {
        "t_shirts" : {
            "filter" : { "term": { "type": "t-shirt" } },
            "aggs" : {
                "avg_price" : { "avg" : { "field" : "price" } }
            }
        }
    }
}

结果:

{
    ...
    "aggregations" : {
        "t_shirts" : {
            "doc_count" : 3,
            "avg_price" : { "value" : 128.33333333333334 }
        }
    }
}

还有一种写法

POST /sales/_search?size=0
{
    "query" : {
        "constant_score" : {
            "filter" : {
                "match" : { "type" : "hat" }
            }
        }
    },
    "aggs" : {
        "hat_prices" : { "sum" : { "field" : "price" } }
    }
}

term

term有点类似与group by,分组统计个数,例如,汇总某种类型(genre)的document数量

GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : { "field" : "genre" } 
        }
    }
}

结果:

{
    ...
    "aggregations" : {
        "genres" : {
            "doc_count_error_upper_bound": 0, 
            "sum_other_doc_count": 0, 
            "buckets" : [ 
                {
                    "key" : "electronic",
                    "doc_count" : 6
                },
                {
                    "key" : "rock",
                    "doc_count" : 3
                },
                {
                    "key" : "jazz",
                    "doc_count" : 2
                }
            ]
        }
    }
}

官方对这里有个说明:terms聚合应该是关键字类型的字段或适用于bucket聚合的任何其他数据类型。为了与text一起使用,您将需要启用fielddata。

默认情况下,terms汇总将返回doc_count排序的前十的 数据,可以通过设置size参数来更改此默认行为。另外还有一个
shard_size来对size过大时出现的性能问题进行调优。如果要分页来获取,需要使用Composite,这个后边介绍。

查询时可以通过order来对结果进行排序,如下:

GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "field" : "genre",
                "order" : { "_count" : "asc" }
            }
        }
    }
}

分组最大并排序:

GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "field" : "genre",
                "order" : { "max_play_count" : "desc" }
            },
            "aggs" : {
                "max_play_count" : { "max" : { "field" : "play_count" } }
            }
        }
    }
}

在嵌套的情况下,有时会应用聚合的结果,这时通过">"来指明层级,而通过"."来引用某个metric

histogram

POST /sales/_search?size=0
{
    "aggs" : {
        "prices" : {
            "histogram" : {
                "field" : "price",
                "interval" : 50,
                "min_doc_count" : 1
            }
        }
    }
}

interval来指明间隔、min_doc_count指明最少的个数,如果小于该值,则过滤掉该区间
结果:

{
    ...
    "aggregations": {
        "prices" : {
            "buckets": [
                {
                    "key": 0.0,
                    "doc_count": 1
                },
                {
                    "key": 50.0,
                    "doc_count": 1
                },
                {
                    "key": 150.0,
                    "doc_count": 2
                },
                {
                    "key": 200.0,
                    "doc_count": 3
                }
            ]
        }
    }
}

date_histogram

最常用的按时间汇总,比如按月统计

POST /sales/_search?size=0
{
    "aggs" : {
        "sales_over_time" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month",
                "format" : "yyyy-MM-dd" 
            }
        }
    }
}

这里的calendar_interval只能是单位时间,如:1d、1M等,key是时间戳,如果要显示文本,可以增加format,结果如下:

{
    ...
    "aggregations": {
        "sales_over_time": {
            "buckets": [
                {
                    "key_as_string": "2015-01-01",
                    "key": 1420070400000,
                    "doc_count": 3
                },
                {
                    "key_as_string": "2015-02-01",
                    "key": 1422748800000,
                    "doc_count": 2
                },
                {
                    "key_as_string": "2015-03-01",
                    "key": 1425168000000,
                    "doc_count": 2
                }
            ]
        }
    }
}

如果是想自定义时间区间,可以用date_range来查询,这里不多说了。

composite

  • source
    source用来指明构建composite bucket的源,source的顺序很重要,它控制返回key的顺序。
    composite有3种source: term、histogram、Date histogram。可以但source,但感觉单source跟原意差别不大。下例组合date_histogram与 term,感觉像是两种source的笛卡尔积,这个得试一下,官网没给出

    GET /_search
      {
          "size": 0,
          "aggs" : {
              "my_buckets": {
                  "composite" : {
                      "sources" : [
                          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
                          { "product": { "terms": {"field": "product" } } }
                      ]
                  }
              }
          }
      }
    
  • pagination
    通过size与after来制定分页

    第一次查询:

    GET /_search
    {
        "size": 0,
        "aggs" : {
            "my_buckets": {
                "composite" : {
                    "size": 2,
                    "sources" : [
                        { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
                        { "product": { "terms": {"field": "product" } } }
                    ]
                }
            }
        }
    }
    

    返回:

    {
          ...
          "aggregations": {
              "my_buckets": {
                  "after_key": {
                      "date": 1494288000000,
                      "product": "mad max"
                  },
                  "buckets": [
                      {
                          "key": {
                              "date": 1494201600000,
                              "product": "rocky"
                          },
                          "doc_count": 1
                      },
                      {
                          "key": {
                              "date": 1494288000000,
                              "product": "mad max"
                          },
                          "doc_count": 2
                      }
                  ]
              }
          }
    }
    

    第二次查询:

      GET /_search
      {
          "size": 0,
          "aggs" : {
              "my_buckets": {
                  "composite" : {
                      "size": 2,
                      "sources" : [
                          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } },
                          { "product": { "terms": {"field": "product", "order": "asc" } } }
                      ],
                      "after": { "date": 1494288000000, "product": "mad max" } 
                  }
              }
          }
      }
    

top_hits

在官网中,top_hits当成metric aggregator,但我觉得它主要与bucket连用。它追踪之前命中(hit)的document,这样可以在子聚合上对这些document进行某些操作。
top_hits可以有效的通过bucket aggregator来对某些指定field进行分组、排序等

它有几个参数:

  • from: 偏移量
  • size: 与其他size一样,指定个数
  • sort: 对命中的document进行排序
  • _source: 指定字段,如果不指定,则全部返回

比如通过某个字段来分桶,然后再通过metics进行聚合,并且想返回多个字段。

POST /my_index/_search
  {
    "size": 0,
    "aggs" : {
        "metaQunatity" : {
            "terms" : {
                "field" : "metaQuantityId"
            },
            "aggs" : {
                "quantitySumPrice" : { "sum" : { "field" : "quantityTotalPrice" } },
                "materialSumPrice" : { "sum" : { "field" : "materiaTotalPrice" } },
                "aggs": {
                        "top_hits": {
                            "size": 1,
                            "_source":{
                                "includes": ["metaQuantityId", "metaQuantityName"]
                            }

                        }
                }

            }
        }
    }
}

Pipeline

这个只介绍一个:sum_bucket,可以对兄弟聚合的结果进行再次聚合

例如:

POST /sales/_search
{
    "size": 0,
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                }
            }
        },
        "sum_monthly_sales": {
            "sum_bucket": {
                "buckets_path": "sales_per_month>sales" 
            }
        }
    }
}

sum_bucket与buckets_path是语法,sales_per_month>sales是指明对兄弟聚合的哪个field进行聚合,结果:

{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               }
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               }
            },
            {
               "key_as_string": "2015/03/01 00:00:00",
               "key": 1425168000000,
               "doc_count": 2,
               "sales": {
                  "value": 375.0
               }
            }
         ]
      },
      "sum_monthly_sales": {
          "value": 985.0
      }
   }
}

这个例子也给出一般的写法,用bucket来分组,用metrix来计算,最后可以通过pipeline对计算结果再次汇总。

Mapping

追加一节的Mapping

  • 简介
    首先Mapping是什么?前边说过Mapping等价于关系型数据库的Schema,它保存着一个document有哪些field,以及field的类型信息。用mapping可以定义:
    哪些string field应该用于全文检索
    哪些field包含number、date或者地理坐标
    date值的formate
    自定义规则来控制mapping动态增加field

    每个field都有一个数据type,可以是:
    一个简单的type,如:text、keyword、date、long、double、boolean 或者ip
    json类型,用object或者nested
    特殊类型: geo_point、geo_shape、completion

    一个string field可以通过fields参数来指定即是text又是keyword

    es在创建index可以动态的创建mapping(Dynamic mapping),当然也支持用户显示的定义mapping( Explicit mapping).显示创建index,如:

     PUT http://localhost:9200/user
     {
     "mappings": {
         "properties": {
         "age":    { "type": "integer" },  
         "email":  { "type": "keyword"  }, 
         "name":   { "type": "text"  }     
         }
       }
     }
    

    在已有index上增加一个已经存在的mapping:

     PUT http://localhost:9200/user/_mapping
     {
     "properties": {
         "employee-id": {
         "type": "keyword",
         "index": false
         }
       }
     }
    

    注意:除了修改field的参数,不能改变一个index的mapping或者field type。如果需要修改,需要重新创建一个index,然后通过reindex数据。 这个reindex的说明是:Copies documents from one index to another。

     {
     "source": {
         "index": "old_index"
     },
     "dest": {
         "index": "new_index"
     }
     }
    

    另外也不能修改一个field的名,但可以通过alias来起个别名。

    ps: 这一部分像极了create table

    下边介绍几个类型,主要是:text、keyword、nested、object以及reindex

  • text
    这个字段用来创建全文索引,如一封email的内容或者一个产品的描述。这个字段的内容会传递个analyzer进行分词,然后创建索引,这样就可以通过某个关键字进行搜索。Text字段不能用于sort以及极少极少用于aggregation。 例如:

     PUT my_index
     {
     "mappings": {
         "properties": {
         "full_name": {
             "type":  "text"
         }
         }
     }
     }
    

    前边说了极少极少,也就说其实是可以的,如果想进行聚合,那就需要用fielddata参数设成true。这个参数默认是false的,因为在第一使用它是,它是通过从磁盘读取每个段的整个反向索引而构建的,通过反转document与term的关系来构建通过document查询term的功能,并将结果存储在内存中。这个绝对是个无底洞,所以也就是极少极少啦。

    PUT my_index/_mapping
     {
     "properties": {
         "my_field": { 
         "type":     "text",
         "fielddata": true
         }
     }
     }
    

    text可以通过fields字段来指定兼容keyword。

    {
         "my_index": {
             "mappings": {
                 "properties": {
                     "group": {
                         "type": "text",
                         "fields": {
                             "keyword": {
                                 "type": "keyword",
                                 "ignore_above": 256
                             }
                         }
                     }
                 }
             }
         }
     }
    
  • keyword
    这个字段用来索引结构化的数据,如id、email的地址、hostname、status code。这与text是两个问题域。text是通过分词,来用term进行搜索document。keyword则是通过document来查询term进行filter、sorting、aggregation等。例如:

     PUT my_index
     {
     "mappings": {
         "properties": {
         "tags": {
             "type":  "keyword"
         }
         }
     }
     }
    

    keyword有个默认开启的属性:doc_value,这个属性在很多的field上都有。它就是。doc_values是在document index时建立的磁盘上数据结构,这使通过document查询term这种数据访问模式成为可能。默认开启就能通过手动关闭,这里不做介绍了。默认的就是最好的。

  • object与nested

    json文档天热带有层级结构,如此就有object。如图:

      PUT my_index/_doc/1
      { 
      "region": "US",
      "manager": { 
          "age":     30,
          "name": { 
          "first": "John",
          "last":  "Smith"
          }
      }
    }
    

    manager本身是一个对象,它下边还有一个name对象,在内部document会将manager对象拍平来存储,比如:

      {
      "region":             "US",
      "manager.age":        30,
      "manager.name.first": "John",
      "manager.name.last":  "Smith"
      }
    

    它的mapping如下。

    PUT my_index
    {
    "mappings": {
        "properties": { 
        "region": {
            "type": "keyword"
        },
        "manager": { 
            "properties": {
            "age":  { "type": "integer" },
            "name": { 
                "properties": {
                "first": { "type": "text" },
                "last":  { "type": "text" }
                }
            }
            }
        }
        }
    }
    }
    

    这里要多说一句,ES中并没有专门的array类型,任何field都可以包括多个值,当然这些值必须是相同的数据类型。但在存储 Array时,会有一些变化,如:

    PUT my_index/_doc/1
    {
    "group" : "fans",
    "user" : [ 
        {
        "first" : "John",
        "last" :  "Smith"
        },
        {
        "first" : "Alice",
        "last" :  "White"
        }
      ]
    }
    

    它在es中将会转换成如下结构:

    {
    "group" :        "fans",
    "user.first" : [ "alice", "john" ],
    "user.last" :  [ "smith", "white" ]
    }
    

    这种存储,使得Alice与White的关系不存在了,如果进行查询:

    GET my_index/_search
    {
    "query": {
        "bool": {
        "must": [
            { "match": { "user.first": "Alice" }},
            { "match": { "user.last":  "Smith" }}
        ]
        }
    }
    }
    

    数据就会查找出来
    如何解决这个问题呢,就需要nested,通过

    PUT my_index
    {
    "mappings": {
        "properties": {
        "user": {
            "type": "nested" 
        }
        }
    }
    }
    

    将user属性增加nested,来指明,这样它们的关系就存在了,查询就无法查询出来了

    GET my_index/_search
    {
     "query": {
         "nested": {
         "path": "user",
         "query": {
             "bool": {
             "must": [
                 { "match": { "user.first": "Alice" }},
                 { "match": { "user.last":  "White" }} 
             ]
             }
         }
         }
     }
     }
    

    这时候,从mapping中看,user的类型就是nested.

Logstash

Logstash只做简单的介绍
Logstash主要用于收集数据,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

官网简介参考

logstash结构示意图.png
Logstash事件处理管道包括3个阶段:inputs 、filters、outputs。

  • inputs
    input产生事件,源可以包括:file、syslog、redis、beats

  • filters
    filter转换数据,包括:grok结构化数据;对数据进行修改、替换、删除数据,比如修改日期格式;geoip对ip增加地理信息等

  • outputs
    输出,一般就输出到es中

kibana

官网
kibana不介绍了,官网就比较清楚啦,从初步了解上看,它做了2个事情:将页面的配置转换成ES的查询;将查询的结果用图形显示出来。

# 大数据 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×