合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
[TOC] # filter ## 1. filter作用 > Logstash过滤器根据规则提取数据 ## 2. 过滤器种类 * [ ] date过滤器:获得时间格式的数据 * [ ] extractnumbers过滤器:从字符串中提取数字 * [ ] grok过滤器:使用grok格式提取数据中的指定内容,比如从一长串的日志中,提取出ip地址 ### 2.1 date过滤器 > 用于解析事件中的时间数据,日期过滤器用于解析字段中的日期,然后使用该日期或时间戳作为事件的logstash时间戳。 > 默认情况下如果不加date插件的话,在kibana上创建索引模式的时候,选择的时间戳字段是logstash处理日志的时间,如果日志输出不是实时的,显示日志时间与对应的事件是不符合的(即es存储日志的时间字段是当前时间,不是log实际发生的时间) ~~~dart match => ["timestamp", "ISO8601", "dd-MMM-yyyy HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss"] 默认target指向的就是@timestamp,意思就是如果timestamp匹配上后面自定义的日志格式,就会替换对应的@times ~~~ 可配置的参数: | 参数 | 作用 | 参数类型 | | --- | ---| --- | | locale | 指定用于日期解析的区域设置 |string | | match| 如何匹配时间格式 |array | | tag_on_failure | 匹配失败后追加的内容 | array | | target | 匹配成功后的内容需要设置的目标字段 | string | | timezone | 指定用于日期解析的时区规范ID |string | 配置 ``` input { redis { key => "logstash-date" host => "localhost" password => "dailearn" port => 6379 db => "0" data_type => "list" type => "date" } } filter { date { match => [ "message", "yyyy-MM-dd HH:mm:ss" ] locale => "Asia/Shanghai" timezone => "Europe/Paris" target => "messageDate" } } output { stdout { codec => rubydebug } } ``` 测试数据 2020-05-07 23:59:59 控制台输出 ``` { "@timestamp" => 2020-05-12T13:47:26.094Z, "type" => "date", "tags" => [ [0] "_jsonparsefailure" ], "@version" => "1", "message" => "2020-05-07 23:59:59", "messageDate" => 2020-05-07T21:59:59.000Z } ``` 需要注意我是在2020-05-12 21:47:26发送的消息,而服务器使用的时区为UTC/GMT 0。所以这个时候我插入了2020-05-07 23:59:59的时间,被指定为Europe/Paris时区,所以最后被转为2020-05-07T21:59:59.000Z ### 2.2 extractnumbers过滤器 注意默认情况下次过滤器为捆绑需要执行bin/logstash-plugin install logstash-filter-extractnumbers操作安装插件 此过滤器自动提取字符串中找到的所有数字 可配置的参数 ![](https://img.kancloud.cn/af/72/af728770dafcb72b8e461e04898733e3_964x82.png) 配置 ``` input { redis { key => "logstash-extractnumbers" host => "localhost" password => "dailearn" port => 6379 db => "0" data_type => "list" type => "extractnumbers" } } filter { extractnumbers { source => "message" target => "message2" } } ``` ``` output { stdout { codec => rubydebug } } ``` 测试数据 zhangSan5,age16 + 0.5 456 789 控制台输出 { "float1" => 0.5, "int2" => 789, "int1" => 456, "@timestamp" => 2020-05-17T03:51:23.695Z, "@version" => "1", "type" => "extractnumbers", "message" => "zhangSan5,age16 + 0.5 456 789", "tags" => [ [0] "_jsonparsefailure" ] } 虽然文档中介绍,此过滤器会尝试提取出字符串中所有的字段,但是实际中部分和字母结合的字符串结构并没有被提取出来,而那些被字母和其他符号被包裹起来的数字没有被完整的提取出来,比如之前测试的时候使用的zhangSan5,age16 + 0.5,456[789],888这样的数据就没有任何内容被提取出来。 ### 2.3 grok过滤器 > Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。 下面针对Apache日志来分割处理 ![](https://img.kancloud.cn/59/b1/59b1b84967304c71fffbf14cd8ca475e_712x268.png) 下面是apache日志 ![](https://img.kancloud.cn/0e/3a/0e3a07461bfed61b77d240fcc812bf71_710x64.png) 日志中每个字段之间空格隔开,分别对应message中的字段。 如:%{IPORHOST:addre}  --> 192.168.10.197   但问题是IPORHOST又不是正则表达式,怎么能匹配IP地址呢? 因为IPPRHOST是grok表达式,它代表的正则表达式如下: ![](https://img.kancloud.cn/8d/d2/8dd2e54a5ac6864329a0805cf1516358_725x382.png) IPORHOST代表的是ipv4或者ipv6或者HOSTNAME所匹配的grok表达式。 上面的IPORHOST有点复杂,我们来看看简单点的,如USER ``` USERNAME \[a-zA-Z0-9.\_-\]+      ``` #USERNAME是匹配由字母,数字,“.”, "\_", "-"组成的任意字符 USER %{USERNAME} #USER代表USERNAME的正则表达式 第一行,用普通的正则表达式来定义一个 grok 表达式; 第二行,通过打印赋值格式,用前面定义好的 grok 表达式来定义另一个 grok 表达式。 **grok的语法:** %{syntax:semantic} syntax代表的是正则表达式替代字段,semantic是代表这个表达式对应的字段名,你可以自由命名。这个命名尽量能简单易懂的表达出这个字段代表的意思。 logstash安装时就带有已经写好的正则表达式。路径如下: /usr/local/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns 或者直接访问https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns 上面IPORHOST,USER等都是在里面已经定义好的!当然还有其他的,基本能满足我们的需求 **日志匹配** 当我们拿到一段日志,按照上面的grok表达式一个个去匹配时,我们如何确定我们匹配的是否正确呢? http://grokdebug.herokuapp.com/ 这个地址可以满足我们的测试需求。就拿上面apache的日志测试。 ![](https://img.kancloud.cn/8c/fc/8cfc03fd858727ace5bdf38bb4669ede_942x290.png) 点击后就出现如下数据,你写的每个grok表达式都获取到值了。为了测试准确,可以多测试几条日志。 ~~~ {   "addre": [     [       "192.168.10.97"     ]   ],   "HOSTNAME": [     [       "192.168.10.97",       "192.168.10.175"     ] ...........中间省略多行...........   "http_referer": [     [       "http://192.168.10.175/"     ]   ],   "URIPROTO": [     [       "http"     ]   ],   "URIHOST": [     [       "192.168.10.175"     ]   ],   "IPORHOST": [     [       "192.168.10.175"     ]   ],   "User_Agent": [     [       "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36"     ]   ] } ~~~ 每条日志总有些字段是没有数据显示,然后以“-”代替的。所有我们在匹配日志的时候也要判断。 如:(?:%{NUMBER:bytes}|-)   但是有些字符串是在太长,如:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36 我们可以使用%{GREEDYDATA browser}.          对应的grok表达式: GREEDYDATA  .\*    #GREEDYDATA表达式的意思能匹配任意字符串 **自定义grok表达式** 如果你感觉logstash自带的grok表达式不能满足需要,你也可以自己定义 如: ![](https://img.kancloud.cn/49/df/49dfd520d0fa2f833bbde64d5725ee5c_705x261.png) patterns\_dir为自定义的grok表达式的路径。 自定义的patterns中按照logstash自带的格式书写。 APACHE\_LOG %{IPORHOST:addre} %{USER:ident} %{USER:auth} \\\[%{HTTPDATE:timestamp}\\\] \\"%{WORD:http\_method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\\" %{NUMBER:status} (?:%{NUMBER:bytes}|-) \\"(?:%{URI:http\_referer}|-)\\" \\"%{GREEDYDATA:User\_Agent}\\" 我只是把apache日志匹配的grok表达式写入自定义文件中,简化conf文件。单个字段的正则表达式匹配你可以自己书写测试。 ### 2.3 提取Java日志字段 一般来说,我们从filebeat或者其他地方拿到下面的日志: ~~~text 2018-04-13 16:03:49.822 INFO o.n.p.j.c.XXXXX - Star Calculator ~~~ 然后想把它其中的一些信息提取出来再扔到es中存储,我们就需要grok match把日志信息切分成[索引数据](https://www.zhihu.com/search?q=%E7%B4%A2%E5%BC%95%E6%95%B0%E6%8D%AE&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D)(match本质是一个正则匹配) [grok match](https://www.zhihu.com/search?q=grok+match&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D): ~~~json match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{GREEDYDATA:log_content}"} ~~~ 切出来的数据 ~~~json { "log_date": [ [ "2018-04-13" ] ], "log_localtime": [ [ "16:03:49.822" ] ], "HOUR": [ [ "16" ] ], "MINUTE": [ [ "03" ] ], "SECOND": [ [ "49.822" ] ], "log_type": [ [ "INFO" ] ], "log_file": [ [ "o.n.p.j.c.XXXX" ] ], "log_content": [ [ "Star Calculator" ] ] } ~~~ 上面所有切出来的field都是es中[mapping index](https://www.zhihu.com/search?q=mapping+index&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D),都可以在用来做条件查询. [grokdebug.herokuapp.com](https://link.zhihu.com/?target=http%3A//grokdebug.herokuapp.com/)里面可以做测试. [grokdebug.herokuapp.com/patterns](https://link.zhihu.com/?target=http%3A//grokdebug.herokuapp.com/patterns)所有可用的patterns都可以在这里查到. 现在我们在用的配置见/logstash/logstash-k8s.conf Q: 需要指定mapping index的数据类型怎么办? A: grok match本质是一个正则匹配,默认出来的数据都是String.有些时候我们知道某个值其实是个数据类型,这时候可以直接指定数据类型. 不过match中仅支持直接转换成int ,float,语法是 %{NUMBER:response\_time:int} 完整配置: ~~~json match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"} ~~~ Q:[索引文件](https://www.zhihu.com/search?q=%E7%B4%A2%E5%BC%95%E6%96%87%E4%BB%B6&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D)想需要按日期分别存放,怎么办? A: out中指定index格式,如 index=> "k8s-%{+YYYY.MM.dd}" 完整out如下: ~~~json output { elasticsearch { hosts => "${ES_URL}" manage_template => false index => "k8s-%{+YYYY.MM.dd}" } } ~~~ 完整[logstash.conf](https://www.zhihu.com/search?q=logstash.conf&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D) ~~~text input { beats { host => "0.0.0.0" port => 5043 } } filter { if [type] == "kube-logs" { mutate { rename => ["log", "message"] } date { match => ["time", "ISO8601"] remove_field => ["time"] } grok { match => { "source" => "/var/log/containers/%{DATA:pod_name}_%{DATA:namespace}_%{GREEDYDATA:container_name}-%{DATA:container_id}.log"} match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"} remove_field => ["source"] break_on_match => false } } } output { elasticsearch { hosts => "${ES_URL}" manage_template => false index => "k8s-%{+YYYY.MM.dd}" } } ~~~