ELK分享.md

ChangeLog 2020-12-10

这里对ELK的使用,进行整理,分成几大部分:

  1. Elasticsearch简介
  2. Elasitcsearch搜素
  3. Elasticsearch分析
  4. ELK联用

这一部分承接上一部分,主要介绍Eleasticsearch Stack
本篇是本系列第四篇,也是最后一篇

Elasticsearch简介

Elasticsearch有3个标签:Store, Search, and Analyze,前边的博客介绍了Store与Search,这里主要介绍Analyze。
官网参考

术语对比

ES是基于NoSql做的存储,其术语对比如下:

关系型数据库Elasticsearch
TableIndex(Type)
RowDocument
ColumnFiled
SchemaMapping
SQLDSL

ES前期一个Index可以有多个type,现在已经移除,一个Index只能有一个type了

Mapping简介

Mapping等价于关系型数据库的Schema,它保存着一个document有哪些field,以及field的类型信息。

field的类型包括,如:text、keyword、date、long、double、boolean 、object、nested
还有一些特殊类型: ip、geo_point、geo_shape、completion

一个string field可以通过fields参数来指定即是text又是keyword

es默认是动态创建索引和索引类型的mapping的。这就相当于无需定义Schema,无需指定各个field的索引规则就可以创建文件。但es自动推测的type并不能一直满足需要,这是就需要自定义mapping。

es在创建index可以动态的创建mapping(Dynamic mapping),当然也支持用户显示的定义mapping( Explicit mapping).显示创建index,如:

 PUT http://localhost:9200/user
 {
 "mappings": {
     "properties": {
     "age":    { "type": "integer" },  
     "email":  { "type": "keyword"  }, 
     "name":   { "type": "text"  }     
     }
   }
 }

在已有index上增加一个已经存在的mapping:

 PUT http://localhost:9200/user/_mapping
 {
 "properties": {
     "employee-id": {
     "type": "keyword",
     "index": false
     }
   }
 }

注意:除了修改field的参数,不能改变一个index的mapping或者field type。如果需要修改,需要重新创建一个index,然后通过reindex数据。 这个reindex的说明是:Copies documents from one index to another。

下边举两个常需要制定mapping的例子

text与keyword

text字段用来创建全文索引,如一封email的内容或者一个产品的描述。这个字段的内容会传递个analyzer进行分词,然后创建索引,这样就可以通过某个关键字进行搜索。Text字段不能用于sort以及极少极少用于aggregation。 例如:

 PUT my_index
 {
 "mappings": {
     "properties": {
     "full_name": {
         "type":  "text"
     }
     }
 }
 }

keyword用来索引结构化的数据,如id、email的地址、hostname、status code。这与text是两个问题域。text是通过分词,来用term进行搜索document。keyword则是通过document来查询term进行filter、sorting、aggregation等。例如:

 PUT my_index
 {
 "mappings": {
     "properties": {
     "tags": {
         "type":  "keyword"
     }
     }
 }
 }

object与nested

  • object

    json文档自带有层级结构,如此就有object。如下

      PUT my_index/_doc/1
      { 
      "region": "US",
      "manager": { 
          "age":     30,
          "name": { 
          "first": "John",
          "last":  "Smith"
          }
      }
    }
    

    manager本身是一个对象,它下边还有一个name对象,在内部document会将manager对象拍平来存储,比如:

      {
      "region":             "US",
      "manager.age":        30,
      "manager.name.first": "John",
      "manager.name.last":  "Smith"
      }
    

    它的mapping如下。

      PUT my_index
      {
      	"mappings": {
          "properties": { 
          	"region": {
              "type": "keyword"
          	},
          	"manager": { 
              "properties": {
              	"age":  { "type": "integer" },
              	"name": { 
                  "properties": {
                  "first": { "type": "text" },
                  "last":  { "type": "text" }
                  }
              	}
              }
          	}
          }
      	}
      }
    
  • nested

    ES中并没有专门的array类型,任何field都可以包括多个值,当然这些值必须是相同的数据类型。但在存储 Array时,会有一些变化,如:

      PUT my_index/_doc/1
      {
      "group" : "fans",
      "user" : [ 
          {
          "first" : "John",
          "last" :  "Smith"
          },
          {
          "first" : "Alice",
          "last" :  "White"
          }
        ]
      }
    

    它在es中将会转换成如下结构:

     {
      "group" :        "fans",
      "user.first" : [ "Alice", "John" ],
      "user.last" :  [ "Smith", "White" ]
      }
    

    这种存储,使得Alice与White的关系不存在了,如果进行查询:

      GET my_index/_search
      {
      "query": {
          "bool": {
          "must": [
              { "match": { "user.first": "Alice" }},
              { "match": { "user.last":  "Smith" }}
          ]
          }
      }
      }
    

    数据就会查找出来
    如何解决这个问题呢,就需要nested,通过

      PUT my_index
      {
      "mappings": {
          "properties": {
          "user": {
              "type": "nested" 
          }
          }
      }
      }
    

    将user属性增加nested,来指明,这样它们的关系就存在了,查询就无法查询出来了

    GET my_index/_search
     {
      "query": {
          "nested": {
          "path": "user",
          "query": {
              "bool": {
              "must": [
                  { "match": { "user.first": "Alice" }},
                  { "match": { "user.last":  "White" }} 
              ]
              }
          }
          }
      }
      }
    

Elasticsearch查询

这里先上一张图:
elasticsearch数据结构.png

filter与match

es的查询有分成2种:query上下文(query context)与 filter上下文(filter context)

query上下文:document与查询的条目(clause)匹配有多好,它会算一个score,用来表示匹配度。

​ 只要将查询的条目传递给query参数,query上下文就能生效。

filter上下午:document与查询的条目(clause)是否匹配,用于过滤。

​ 只要将查询的条目查询各filter参数,filter上下文就能生效,常用的在bool查询中或者filter聚合中。

实例如下:

GET /_search
{
  "query": { 
    "bool": { 
      "must": [
        { "match": { "title":   "Search"        }},
        { "match": { "content": "Elasticsearch" }}
      ],
      "filter": [ 
        { "term":  { "status": "published" }},
        { "range": { "publish_date": { "gte": "2015-01-01" }}}
      ]
    }
  }
}

最外层的query预示着query上下午;

bool与2各match是在此上下文中,用于给每个document打分。bool意味着越多的match分值越高。里边除了must还可以用must_not,should等

filter预示着filter上下文,term与range用于过滤不匹配的项。

分页数据

es默认返回前10条匹配数据,为了分页返回一个大的结果,用from、size两个参数。from是跳过的个数,size是获取的个数。

GET /_search
{
  "from": 5,
  "size": 20,
  "query": {
    "match": {
      "user.id": "kimchy"
    }
  }
}

避免一次性获取太多的数据,这样内存与cpu损耗过大。

仅获取制定field

默认情况下,es查询会返回所有的字段,为了查询指定的字段,需要使用fields参数,如下:

POST my-index-000001/_search
{
  "query": {
    "match": {
      "message": "foo"
    }
  },
  "fields": ["user.id", "@timestamp"],
  "_source": false
}

带有fields的参数的查询,除了直接查询数据,还会查询该index的mapping。

sort结果

用sort来对结果进行排序,sort是在每个字段上定义的,其中特殊字段名供 score 按分数排序,而_doc按索引顺序排序。

PUT /my-index-000001
{
  "mappings": {
    "properties": {
      "post_date": { "type": "date" },
      "user": {
        "type": "keyword"
      },
      "name": {
        "type": "keyword"
      },
      "age": { "type": "integer" }
    }
  }
}
GET /my-index-000001/_search
{
  "sort" : [
    { "post_date" : {"order" : "asc"}},
    "user",
    { "name" : "desc" },
    { "age" : "desc" },
    "_score"
  ],
  "query" : {
    "term" : { "user" : "kimchy" }
  }
}

获取多个indices的数据

es可以从多个索引中获取数据,如sql相比,更像是union的用法。

方法1:在路径中用逗号隔开

GET /my-index-000001,my-index-000002/_search
{
  "query": {
    "match": {
      "user.id": "kimchy"
    }
  }
}

方法2:使用通配符

GET /my-index-*/_search
{
  "query": {
    "match": {
      "user.id": "kimchy"
    }
  }
}

方法3:对全部索引查询使用_all 或者 *

GET /_all/_search
{
  "query": {
    "match": {
      "user.id": "kimchy"
    }
  }
}

GET /*/_search
{
  "query": {
    "match": {
      "user.id": "kimchy"
    }
  }
}

Elasticsearch分析

ES提供了3种类型的聚合,包括:Bucekting、Metrics、Pipeline,主要是前2个

  • Metrics: 在一组document之上的计算指标,max、min、sum、avg等

  • Bucketing: 生成Bucketing的一组聚合,其中每个Bucketing都与一个键和一个Document的条件相关联。执行汇总时,将对上下文中的每个document评估所有bucketing条件,并且当条件匹配时,该文档将被视为“落入”相关buckeing内。对应SQL的groupBy

  • Pipeline: 汇总其他汇总及其相关指标的输出的汇总

    聚合的结构:

    "aggregations" : {
        "<aggregation_name>" : {
            "<aggregation_type>" : {
                <aggregation_body>
            }
            [,"meta" : {  [<meta_data_body>] } ]?
            [,"aggregations" : { [<sub_aggregation>]+ } ]?
        }
        [,"<aggregation_name_2>" : { ... } ]*
    }
    

说明:aggregations可以写成aggs

Metrics

Avg

比如一个学生分数的document,计算平均分:

POST /exams/_search?size=0
{
    "aggs" : {
        "avg_grade" : { "avg" : { "field" : "grade" } }
    }
}

这里的aggregation_name是"avg_grade",自定义的。aggregation_type是"avg",类似于算子,ES提供的,filed指明avg在哪个field上执行,算子的参数。结果:

{
    ...
    "aggregations": {
        "avg_grade": {
            "value": 75.0
        }
    }
}

avg可以基于脚本来编写:

POST /exams/_search?size=0
{
    "aggs" : {
        "avg_grade" : {
            "avg" : {
                "script" : {
                    "id": "my_script",
                    "params": {
                        "field": "grade"
                    }
                }
            }
        }
    }
}

这会将脚本参数解释为具有painless语言且没有脚本参数的嵌入式脚本。更进一步可以写为:

POST /exams/_search?size=0
{
    "aggs" : {
        "avg_corrected_grade" : {
            "avg" : {
                "field" : "grade",
                "script" : {
                    "lang": "painless",
                    "source": "_value * params.correction",
                    "params" : {
                        "correction" : 1.2
                    },
                    "missing":  0
                }
            }
        }
    }
}

在原有的分数上做一个修正。missing参数指定缺省值。
与Avg类似的还有个加权平均:Weighted Avg,比Avg增加了weight参数,这个不多介绍了。

POST /exams/_search
{
    "size": 0,
    "aggs" : {
        "weighted_grade": {
            "weighted_avg": {
                "value": {
                    "field": "grade"
                },
                "weight": {
                    "field": "weight"
                }
            }
        }
    }
}

extended_stats

cardinality等价于distinct,查询某个field不同值的个数,max、min、sum都类似于sql,不多介绍,简单介绍一个stats与extended_stats。stats是将某个field的count、min、max、avg、sum都计算出来,extended_stats比stats增加了平方和、方差、标准差等值

GET /exams/_search
{
    "size": 0,
    "aggs" : {
        "grades_stats" : { "extended_stats" : { "field" : "grade" } }
    }
}

结果如下:

{
    ...

    "aggregations": {
        "grades_stats": {
           "count": 2,
           "min": 50.0,
           "max": 100.0,
           "avg": 75.0,
           "sum": 150.0,
           "sum_of_squares": 12500.0,
           "variance": 625.0,
           "std_deviation": 25.0,
           "std_deviation_bounds": {
            "upper": 125.0,
            "lower": 25.0
           }
        }
    }
}

这一块用的是Get,有点疑惑,另外std_deviation_bounds这个值是:与测量值的正负两个标准差的间隔,正态分布的话,应该是95.4%的区间

percentiles

还有一个比较有趣的是percentiles,这个给出的是相应概率发生的位置,比如对加响应时间的分析:

GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_outlier" : {
            "percentiles" : {
                "field" : "load_time" 
            }
        }
    }
}

结果是:

{
    ...

   "aggregations": {
      "load_time_outlier": {
         "values" : {
            "1.0": 5.0,
            "5.0": 25.0,
            "25.0": 165.0,
            "50.0": 445.0,
            "75.0": 725.0,
            "95.0": 945.0,
            "99.0": 985.0
         }
      }
   }
}

Bucket

filter

介绍一些filter,类似于where的用法,比如计算t-shirt的平均价格:

POST /sales/_search?size=0
{
    "aggs" : {
        "t_shirts" : {
            "filter" : { "term": { "type": "t-shirt" } },
            "aggs" : {
                "avg_price" : { "avg" : { "field" : "price" } }
            }
        }
    }
}

结果:

{
    ...
    "aggregations" : {
        "t_shirts" : {
            "doc_count" : 3,
            "avg_price" : { "value" : 128.33333333333334 }
        }
    }
}

还有一种写法

POST /sales/_search?size=0
{
    "query" : {
        "constant_score" : {
            "filter" : {
                "match" : { "type" : "hat" }
            }
        }
    },
    "aggs" : {
        "hat_prices" : { "sum" : { "field" : "price" } }
    }
}

terms

term有点类似与group by,分组统计个数,例如,汇总某种类型(genre)的document数量

GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : { "field" : "genre" } 
        }
    }
}

结果:

{
    ...
    "aggregations" : {
        "genres" : {
            "doc_count_error_upper_bound": 0, 
            "sum_other_doc_count": 0, 
            "buckets" : [ 
                {
                    "key" : "electronic",
                    "doc_count" : 6
                },
                {
                    "key" : "rock",
                    "doc_count" : 3
                },
                {
                    "key" : "jazz",
                    "doc_count" : 2
                }
            ]
        }
    }
}

官方对这里有个说明:terms聚合应该是关键字类型的字段或适用于bucket聚合的任何其他数据类型。为了与text一起使用,您将需要启用fielddata。

默认情况下,terms汇总将返回doc_count排序的前十的 数据,可以通过设置size参数来更改此默认行为。另外还有一个
shard_size来对size过大时出现的性能问题进行调优。如果要分页来获取,需要使用Composite,这个后边介绍。

查询时可以通过order来对结果进行排序,如下:

GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "field" : "genre",
                "order" : { "_count" : "asc" }
            }
        }
    }
}

分组最大并排序:

GET /_search
{
    "aggs" : {
        "genres" : {
            "terms" : {
                "field" : "genre",
                "order" : { "max_play_count" : "desc" }
            },
            "aggs" : {
                "max_play_count" : { "max" : { "field" : "play_count" } }
            }
        }
    }
}

在嵌套的情况下,有时会应用聚合的结果,这时通过">"来指明层级,而通过"."来引用某个metric

histogram

POST /sales/_search?size=0
{
    "aggs" : {
        "prices" : {
            "histogram" : {
                "field" : "price",
                "interval" : 50,
                "min_doc_count" : 1
            }
        }
    }
}

interval来指明间隔、min_doc_count指明最少的个数,如果小于该值,则过滤掉该区间
结果:

{
    ...
    "aggregations": {
        "prices" : {
            "buckets": [
                {
                    "key": 0.0,
                    "doc_count": 1
                },
                {
                    "key": 50.0,
                    "doc_count": 1
                },
                {
                    "key": 150.0,
                    "doc_count": 2
                },
                {
                    "key": 200.0,
                    "doc_count": 3
                }
            ]
        }
    }
}

date_histogram

最常用的按时间汇总,比如按月统计

POST /sales/_search?size=0
{
    "aggs" : {
        "sales_over_time" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month",
                "format" : "yyyy-MM-dd" 
            }
        }
    }
}

这里的calendar_interval只能是单位时间,如:1d、1M等,key是时间戳,如果要显示文本,可以增加format,结果如下:

{
    ...
    "aggregations": {
        "sales_over_time": {
            "buckets": [
                {
                    "key_as_string": "2015-01-01",
                    "key": 1420070400000,
                    "doc_count": 3
                },
                {
                    "key_as_string": "2015-02-01",
                    "key": 1422748800000,
                    "doc_count": 2
                },
                {
                    "key_as_string": "2015-03-01",
                    "key": 1425168000000,
                    "doc_count": 2
                }
            ]
        }
    }
}

如果是想自定义时间区间,可以用date_range来查询,这里不多说了。

composite

  • source
    source用来指明构建composite bucket的源,source的顺序很重要,它控制返回key的顺序。
    composite有3种source: term、histogram、Date histogram。可以但source,但感觉单source跟原意差别不大。下例组合date_histogram与 term,感觉像是两种source的笛卡尔积,这个得试一下,官网没给出

    GET /_search
      {
          "size": 0,
          "aggs" : {
              "my_buckets": {
                  "composite" : {
                      "sources" : [
                          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
                          { "product": { "terms": {"field": "product" } } }
                      ]
                  }
              }
          }
      }
    
  • pagination
    通过size与after来制定分页

    第一次查询:

    GET /_search
    {
        "size": 0,
        "aggs" : {
            "my_buckets": {
                "composite" : {
                    "size": 2,
                    "sources" : [
                        { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
                        { "product": { "terms": {"field": "product" } } }
                    ]
                }
            }
        }
    }
    

    返回:

    {
          ...
          "aggregations": {
              "my_buckets": {
                  "after_key": {
                      "date": 1494288000000,
                      "product": "mad max"
                  },
                  "buckets": [
                      {
                          "key": {
                              "date": 1494201600000,
                              "product": "rocky"
                          },
                          "doc_count": 1
                      },
                      {
                          "key": {
                              "date": 1494288000000,
                              "product": "mad max"
                          },
                          "doc_count": 2
                      }
                  ]
              }
          }
    }
    

    第二次查询:

      GET /_search
      {
          "size": 0,
          "aggs" : {
              "my_buckets": {
                  "composite" : {
                      "size": 2,
                      "sources" : [
                          { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } },
                          { "product": { "terms": {"field": "product", "order": "asc" } } }
                      ],
                      "after": { "date": 1494288000000, "product": "mad max" } 
                  }
              }
          }
      }
    

top_hits

在官网中,top_hits当成metric aggregator,但我觉得它主要与bucket连用。它追踪之前命中(hit)的document,这样可以在子聚合上对这些document进行某些操作。
top_hits可以有效的通过bucket aggregator来对某些指定field进行分组、排序等

它有几个参数:

  • from: 偏移量
  • size: 与其他size一样,指定个数
  • sort: 对命中的document进行排序
  • _source: 指定字段,如果不指定,则全部返回

比如通过某个字段来分桶,然后再通过metics进行聚合,并且想返回多个字段。

POST /my_index/_search
  {
    "size": 0,
    "aggs" : {
        "metaQunatity" : {
            "terms" : {
                "field" : "metaQuantityId"
            },
            "aggs" : {
                "quantitySumPrice" : { "sum" : { "field" : "quantityTotalPrice" } },
                "materialSumPrice" : { "sum" : { "field" : "materiaTotalPrice" } },
                "aggs": {
                        "top_hits": {
                            "size": 1,
                            "_source":{
                                "includes": ["metaQuantityId", "metaQuantityName"]
                            }

                        }
                }

            }
        }
    }
}

Pipeline

这个只介绍一个:sum_bucket,可以对兄弟聚合的结果进行再次聚合

例如:

POST /sales/_search
{
    "size": 0,
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "calendar_interval" : "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                }
            }
        },
        "sum_monthly_sales": {
            "sum_bucket": {
                "buckets_path": "sales_per_month>sales" 
            }
        }
    }
}

sum_bucket与buckets_path是语法,sales_per_month>sales是指明对兄弟聚合的哪个field进行聚合,结果:

{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               }
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               }
            },
            {
               "key_as_string": "2015/03/01 00:00:00",
               "key": 1425168000000,
               "doc_count": 2,
               "sales": {
                  "value": 375.0
               }
            }
         ]
      },
      "sum_monthly_sales": {
          "value": 985.0
      }
   }
}

这个例子也给出一般的写法,用bucket来分组,用metrix来计算,最后可以通过pipeline对计算结果再次汇总。

ELK技术栈联用

Logstash

Logstash只做简单的介绍
Logstash主要用于收集数据,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

官网简介参考

logstash结构示意图.png
Logstash事件处理管道包括3个阶段:inputs 、filters、outputs。

  • inputs
    input产生事件,源可以包括:file、syslog、redis、beats

  • filters
    filter转换数据,包括:grok结构化数据;对数据进行修改、替换、删除数据,比如修改日期格式;geoip对ip增加地理信息等

  • outputs
    输出,一般就输出到es中

kibana

官网
kibana不介绍了,官网就比较清楚啦,从初步了解上看,它做了2个事情:将页面的配置转换成ES的查询;将查询的结果用图形显示出来。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×