Logstash

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。详细说明可以看官方文档。

下载地址:https://www.elastic.co/cn/logstash

实践

功能:数据同步,把mysql里面的数据同步到elasticsearch中

logstash有两种同步方式一种是以id作为同步边界,一种是以update_time作为同步边界。logstash有一个定时任务,定时任务到期后就会进行同步。

如果是以id为同步边界的话只会同步新增的数据,无法同步修改的数据。以update_time作为同步边界,初次同步的话会记录下更新的时间,下次同步的话就会把新增的数据以及修改过的数据进行同步。

这里采用的是update_time方式进行同步。

准备工作

把下载好的包上传到服务器(logstash-6.4.3.tar.gz),与elasticsearch的版本一致,解压到/usr/local/目录下

由于要连接mysql所以需要用到mysql驱动包,用的版本是mysql-connector-java-5.1.41.jar

创建一个foodie-items索引

创建目录

到时候需要把驱动包,Logstash同步配置文件,还有同步的数据库语句放到这个目录下。

cd /usr/local/logstash-6.4.3

mkdir sync

把驱动包放到sync目录下

创建需要同步的数据库语句

vim foodie-items.sql

需要注意的是表中的updated_time时间要比当前的时间还要大,才可以同步过去

SELECT

i.id AS itemId,

i.item_name AS itemName,

i.sell_counts AS sellCounts,

ii.url AS imgUrl,

tempSpec.price_discount AS price,

i.updated_time AS updated_time

FROM

items i

LEFT JOIN items_img ii ON i.id = ii.item_id

LEFT JOIN ( SELECT item_id, MIN( price_discount ) AS price_discount FROM items_spec GROUP BY item_id ) tempSpec ON i.id = tempSpec.item_id

WHERE

ii.is_main = 1

AND

i.updated_time >= :sql_last_value

sql_last_value是Logstash每次在同步完以后的一个边界值,logstash会自动加上

logstash同步配置文件

一个输入,一个输出

input {

jdbc {

# 设置 MySql/MariaDB 数据库url以及数据库名称

jdbc_connection_string => "jdbc:mysql://192.168.1.14:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true"

# 用户名和密码

jdbc_user => "root"

jdbc_password => "671354"

# 数据库驱动所在位置,可以是绝对路径或者相对路径

jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5.1.41.jar"

# 驱动类名

jdbc_driver_class => "com.mysql.jdbc.Driver"

# 开启分页

jdbc_paging_enabled => "true"

# 分页每页数量,可以自定义

jdbc_page_size => "1000"

# 执行的sql文件路径

statement_filepath => "/usr/local/logstash-6.4.3/sync/foodie-items.sql"

# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务

schedule => "* * * * *"

# 索引类型

type => "_doc"

# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件

use_column_value => true

# 记录上一次追踪的结果值(不需要手动创建)

last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time"

# 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间

tracking_column => "updated_time"

# tracking_column 对应字段的类型

tracking_column_type => "timestamp"

# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录

clean_run => false

# 数据库字段名称大写转小写

lowercase_column_names => false

}

}

output {

elasticsearch {

# es地址

hosts => ["192.168.233.132:9200"]

# 同步的索引名

index => "foodie-items"

# 设置_docID和数据相同

# document_id => "%{id}"

document_id => "%{itemId}"

}

# 日志输出

stdout {

codec => json_lines

}

}

同步启动

./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf

第一次没有track_time这个文件,所以系统会有一个默认的时间。

后面就会生成一个track_time的文件,里面记录着这次同步的时间。

@timestamp是logstash自己记录的时间,后面同步的话只有数据库的update_time时间大于@timestamp才会同步过来。

@timestamp表示的是本次同步的时间,如果下次同步数据的时候就会把update和该列的时间进行比较不一样就会进行同步

并且会在sync目录下记录本次同步的时间,下次只有update_time大于该时间的才会进行同步

效果

自定义模版配置中文分词器

logstash默认的模版是没有配置中文分词的,可以自己定义一个。

查看默认模版

http://192.168.233.132:9200/_template/logstash/

自定义模版

myik为自定义模版的名称

http://192.168.233.132:9200/_template/myik/

{

"order": 0,

"version": 1,

"index_patterns": [

"*"

],

"settings": {

"index": {

"refresh_interval": "5s"

}

},

"mappings": {

"_default_": {

"dynamic_templates": [

{

"message_field": {

"path_match": "message",

"match_mapping_type": "string",

"mapping": {

"type": "text",

"norms": false

}

}

},

{

"string_fields": {

"match": "*",

"match_mapping_type": "string",

"mapping": {

"type": "text",

"norms": false,

"analyzer": "ik_max_word",

"fields": {

"keyword": {

"type": "keyword",

"ignore_above": 256

}

}

}

}

}

],

"properties": {

"@timestamp": {

"type": "date"

},

"@version": {

"type": "keyword"

},

"geoip": {

"dynamic": true,

"properties": {

"ip": {

"type": "ip"

},

"location": {

"type": "geo_point"

},

"latitude": {

"type": "half_float"

},

"longitude": {

"type": "half_float"

}

}

}

}

}

},

"aliases": {}

}

把上面自定义的模版进行提交,同步数据就会使用这个模版了

创建一个新的索引

创建一个foodie-items-ik索引,并修改logstash-db-sync.conf配置文件,输出的索引需要改下index => “foodie-items-ik”

重新进行同步

./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf

只要是text类型的都使用上了中文分词器