logstash+mysql同步

logstash简介

简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。

1、安装logstash

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

在/etc/yum.repos.d/目录下 新创建文件logstash.repo

vim /etc/yum.repos.d/logstash.repo

写入如下内容:

[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

执行安装命令:

yum install logstash
rpm -ql logstash #查看logstash的安装目录
whereis logstash
ln -s /usr/share/logstash/bin/logstash /bin/ #创建软链接

测试安装是否成功

logstash -e 'input { stdin { } } output { stdout {} }' 

Ctl+c 结束

2、上传mysql-connector-java-8.0.17.jar mysql驱动

mkdir -p /data/logstash

将mysql-connector-java-8.0.17.jar 放到/data/logstash目录下

3、编写logstash-input-jdbc.conf配置文件

vim /data/logstash/logstash-input-jdbc.conf

内容如下(全量、增量都包括):

input {
  jdbc {
    # mysql相关jdbc配置
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "123456"

    # jdbc连接mysql驱动的文件目录可去官网下载:https://dev.mysql.com/downloads/connector/j/
    jdbc_driver_library => "/data/logstash/mysql-connector-java-8.0.17.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"

    jdbc_default_timezone =>"Asia/Shanghai"

    # mysql文件, 也可以直接写SQL语句在此处后面一定跟上排序否则有可能遗漏数据或者一直处理同一批数据如下
     statement => "select * from user where id > :sql_last_value order by id asc"
     #statement_filepath => "./config/jdbc.sql"

    #启用追踪则需要指定tracking_column默认是timestamp()
    use_column_value => true

    # 如果 use_column_value 为真,需配置此参数. track 的数据库 column , column 必须是递增的. 一般是mysql主键
    tracking_column => "id"
    #追踪字段的类型目前只有数字(numeric)和时间类型(timestamp)默认是数字类型()
    tracking_column_type => "numeric"
    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    record_last_run => true
    last_run_metadata_path => "/data/logstash/logstash_capital_last_id"

    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false

    #是否将 字段(column) 名称转小写
    lowercase_column_names => false
     # 这里类似crontab,可以定制定时操作比如每分钟执行一次同步(    )
    schedule => "* * * * *"
    #如果配置多个数据源需要用type来区分
    type => "cms"
  }
}
filter {}
output {
  #使用if语句判断type来指定输出的块()
  if[type]=="cms"{
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "cms"
        document_id => "%{id}"
        template_overwrite => true
      }
  }

  # 这里输出调试正式运行时可以注释掉
  stdout {
      codec => json_lines
  }
}

4、手动运行logstash

logstash -f /data/logstash/logstash-input-jdbc.conf

5、通过配置文件后台启动 logstash

nohup  logstash -f /data/logstash/logstash-input-jdbc.conf  &

6、通过systemd方式设置开机启动

将/data/logstash/logstash-input-jdbc.conf 复制到 /etc/logstash/conf.d/ 目录下

cp /data/logstash/logstash-input-jdbc.conf /etc/logstash/conf.d/
systemctl enable logstash.service #设置开机启动
systemctl start logstash.service #手动开启
systemctl restart logstash.service # 重启
systemctl stop logstash.service #停止
systemctl status logstash.service #查看状态

7、查看运行日志

tail -f /var/log/logstash/logstash-plain.log #查看运行日志

《centos通过yum安装elasticsearch8(flask网站全文检索方案一)》

《cento中安装logstash并同步mysql数据到elasticsearch(flask网站全文检索方案二)》

《python集成elasticsearch 通用类(flask网站全文检索方案三)》