ELK日志搭建

1. 前言

之前我们就用过Elasticsearch来进行统计工作,也搭建过ELK来收集日志,但效果并不太理想,这次深入的来整理整理这块。

我们用ELK来做什么?

  1. 定位故障

    将日志进行收集,可以帮助分析异常。这些日志可以从各业务服务的异常log来获取。

    这可用sentry也可以来做,但从Promethues配合ELK能更好的实现。

  2. 业务分析

    通过日志,分析业务指标。这些日志可以从网关的访问日志中获取。

    业务指标主要围绕组织、用户、功能:

    • 组织
      • 组织日活
      • 组织活跃用户数
      • 组织关注的功能
    • 用户
      • 用户日活
      • 不同用户关注的功能
    • 功能
      • topN功能

2. 案例

下边来看一个ELK收集nginx的案例:参考

  • 整体流程
  • nginx日志配置
  • filebeat
  • logstash
  • kibana

3. 设计

  • 日志来源

    前面已经说过目前日志有2个来源: 业务服务与网关

    • 业务服务主要用于故障的定位,将业务日志中的warning、errors追踪到;

    • 网关既可以是nginx,也可以从gateway中开始,主要用于业务的分析;

    gateway也是springboot,我们直接从springBoot中收集日志即可。

  • 流程

    案例中流程:nginx服务器 => filebeat => kafaka => logstash => es =>kibana,根据我们的需要要进行简化。

    整个流程为:springBoot日志 => logstash => es => kibana

4. SpringBoot日志

既然从SpringBoot中抽取日志,那么先来看看SpringBoot日志框架。

4.1 日志框架

SpringBoot使用SLF4J作为接口,默认使用LogBack实现,支持Log4J、Log4J2、commons-logging、jboss-logging 等。

slf4j示例

log4J作者又写了logBack来提升,log4J2又参考logBack;

springboot日志详解SpringBoot日志框架的选择

日志的功能

  • 日志级别:TRACE、DEBUG、INFO、WARN、ERROR、FATAL、OFF

  • 格式:以指定的格式输出

  • 控制台输出:输出到控制台

  • 日志文件:输出到日志文件,以及日志文件的路径、大小等控制

对与配置文件,在类路径下放上每个日志框架自己的配置文件即可,对于logback而言,日志文件通常是 logback-spring.xml(推荐)也可以是logback.xml等

4.2 整合Logstash

参考

整合Logstash2步骤即可:1. 增加logstash-logback-encoder;2.配置logback-spring.xml

先在pom.xml整增加依赖

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.3</version>
</dependency>

然后在在类路径(application.yml同级)上增加logback-spring.xml,整个配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <include resource="org/springframework/boot/logging/logback/defaults.xml" />

    <springProperty scope="context" name="appName" source="spring.application.name"/>
<!--    <springProperty scope="context" name="logPath" source="app.logPath"/>-->
    <property name="logPath" value="/home/sun/logs/tomcat"/>

    <!-- 控制台输出 -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr([%15.15t]){faint} %clr(---){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- 日志文件输出 -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                {
                "@timestamp": "%d{yyyy-MM-dd HH:mm:ss.SSS}",
                "logLevel": "%level",
                "appName": "${appName:-}",
                "pid": "${PID:-}",
                "thread": "%thread",
                "logger": "%logger{40}",
                "message": "%message"
                }%n
            </pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- 在日滚动文件中,强制只保存错误INFO级别以上的信息 -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <!--文件夹配置-->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--日志文件输出的文件名-->
            <FileNamePattern>${logPath}/${appName}-%d{yyyy-MM-dd}.log</FileNamePattern>
            <!--日志文件保留天数-->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <!--日志文件最大的大小-->
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <MaxFileSize>10MB</MaxFileSize>
        </triggeringPolicy>
    </appender>

    <!--logstash配置-->
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:5000</destination>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>WARN</level>
        </filter>
        <!-- 日志输出编码 -->
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                            "logLevel": "%level",
                            "appName": "${appName:-}",
                            "pid": "${PID:-}",
                            "thread": "%thread",
                            "logger": "%logger{40}",
                            "message": "%message"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>

    <!--指定最基础的日志输出级别-->
    <root level="INFO">
        <!--appender将会添加到这个loger-->
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="FILE"/>
        <appender-ref ref="LOGSTASH"/>
    </root>

</configuration>

3个输出:console、file、logstash

分别配置不同的级别,对于业务服务logstash中保存到warn级别,对于网关服务可以保存到Info

4.3 gateway日志

gateway日志收集

参考nginx的配置的一些命名方式,定义了如下字段:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GatewayLog {
    // 该字段用于logstash对message进行filter
    private String messageType = "JSON_LOG";

    /**谁**/
    // 客户端ip
    private String clientIp;

    // 客户端浏览器
    private String userAgent;

    /**请求**/
    // 请求地址,浏览器中输入的地址
    private String host;

    // 来源页面,从哪个页面请求过来的
    private String referer;

    // 请求方法
    private String method;

    // 请求路由
    private String path;

    // 请求参数
    private String args;

    // 完整的uri
    private String uri;

    /**谁**/
    // 服务
    private String server;

    /**响应**/
    // 整个响应时间
    private Long startTime;
    private Long endTime;
    private Long requestTime;

    // 响应的状态
    private Integer status;

    // 响应的字节数
    private Integer byteToSent;
}

再增加GlobalFilter进行日志的记录,参考:参考1参考2

5. elk安装

这里采用了docker-compose.yml的方式,如下:

version: '3.4'

services:
  diego-elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    hostname: elasticsearch
    volumes:
      -  /home/sun/database/elasticsearch_data:/usr/share/elasticsearch/data
    configs:
      - source: elastic_config
        target: /usr/share/elasticsearch/config/elasticsearch.yml
    environment:
      discovery.type: single-node
      bootstrap.memory_lock: 'true'
      ES_JAVA_OPTS: "-Xmx512m -Xms512m"
      ELASTIC_PASSWORD: ELK_91801
    networks:
      - diego-network
    ports: 
      - "9200:9200"
      - "9300:9300"
    deploy:
      replicas: 1

  diego-logstash:
    image: docker.elastic.co/logstash/logstash:7.15.2
    hostname: diego-logstash
    configs:
      - source: logstash_config
        target: /usr/share/logstash/config/logstash.yml
      - source: logstash_pipeline
        target: /usr/share/logstash/pipeline/logstash.conf
    ports:
      - "9600:9600"
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
    environment:
      LS_JAVA_OPTS: "-Xmx256m -Xms256m"
    networks:
      - diego-network
    depends_on:
      - diego-elasticsearch
    deploy:
      replicas: 1

  diego-kibana:
    image: docker.elastic.co/kibana/kibana:7.15.2
    hostname: diego-kibana
    configs:
      - source: kibana_config
        target: /usr/share/kibana/config/kibana.yml
    ports: 
      - "5601:5601"
    networks:
      - diego-network
    depends_on:
      - diego-elasticsearch
    deploy:
      replicas: 1

configs:
  elastic_config:
    file: /home/sun/deploy/elk/configs/elasticsearch.yml
  logstash_config:
    file: /home/sun/deploy/elk/configs/logstash.yml
  logstash_pipeline:
    file: /home/sun/deploy/elk/configs/logstash.conf
  kibana_config:
    file: /home/sun/deploy/elk/configs/kibana.yml  

networks:
  diego-network: 
    external: true

使用docker stack deploy -c docker-compose.yml es即可。

elk的几个配置文件见配置文件,篇幅问题,这里不全放了,logstash的配置见下文

6. logstash配置

这里要对gateway的日志进一步说明一下。

gateway日志与业务服务日志目的不同,考虑将它们分开:

这里有3种方法:

  • 其一:1个pipeline多个input
  • 其二:1个pipilne多个outpout
  • 其三: 2个pipeline

这里查看应了第2种方式,整个logstash.config如下:

# 输入
input {
	tcp {
	   mode => "server"
        host => "0.0.0.0"
        port => 5000
	   codec => json {
		   charset => "UTF-8"
	   }
	#    add_field => {"[@metadata][myid]" => "gateway_log" }
	}
}

filter {
	mutate {
		gsub => ["message", "\\x", "\\\x"]
	}

	if ( 'method":"HEAD' in [message] ){
		drop {}
	}

	# 这里针对message是JSON的情况,进行解构,JSON_LOG在业务中定义
	if ( 'JSON_LOG' in [message] ) {
		json {
			source => "message"
			remove_field => "message"
		}
	}
}

# 输出
output {
	if  [appName] == "gateway" {
		elasticsearch {
			hosts => "diego-elasticsearch:9200"
			index => "gateway-log-%{+YYYY.MM.dd}"
			user => "elastic"
			password => "ELK_91801"
			ecs_compatibility => disabled
		}
	} else {
		elasticsearch {
			hosts => "diego-elasticsearch:9200"
			index => "server-log-%{+YYYY.MM.dd}"
			user => "elastic"
			password => "ELK_91801"
			ecs_compatibility => disabled
		}
	}	
	#  打开这里可以发到标准输出,查看传输的内容
	stdout {
		codec => "rubydebug"
	}
}

相关参考

  1. https://www.elastic.co/guide/en/logstash/current/index.html

  2. https://cloud.tencent.com/developer/article/1671480?from=10680

7. kibana使用

通过http://127.0.0.1:5601访问kibana

  • 先到stack Management(导航栏下部) 中去创建索引模式(索引的模糊匹配)

  • 再到Discover中查看数据

  • 再下一步创建业务指标的仪表板

    这块等后边再完善

8. 结语

本文主要总结了ELK日志的搭建过程,其主要的流程为:

springBoot日志 => logstash => es => kibana

# 微服务 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×