Flask+elasticsearch

安装elasticsearch模块

pip install elasticsearch

esutil.py工具类

from elasticsearch import Elasticsearch


class MyElasticsearch:
    """Small convenience wrapper around an ``Elasticsearch`` client bound to one index.

    Every method operates on the index named by ``index_name``. The client is
    created with ``ignore_status=[400, 404]``, so those HTTP statuses are
    configured as ignored for all calls made through ``self.es``.
    """

    def __init__(self, index_name: str, index_type: str, ip="http://127.0.0.1:9200"):
        """Create the client and remember which index to work on.

        Args:
            index_name: Name of the index used by every method of this class.
            index_type: Stored as ``self.index_type``; no method of this class
                reads it.
            ip: URL of the Elasticsearch node to connect to.
        """
        # Alternative construction with basic auth, kept from the original:
        # self.es = Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200)
        self.es = Elasticsearch(ip).options(
            request_timeout=20,
            retry_on_timeout=True,
            ignore_status=[400, 404],
        )
        self.index_type = index_type
        self.index_name = index_name

    def create_index(self):
        """Create the index from scratch, deleting an existing one first."""
        # ``indices.exists`` returns a response object (truthy/falsy), not the
        # literal ``True``; an ``is True`` identity test would never pass and
        # the old index would silently survive. Test truthiness instead.
        if self.es.indices.exists(index=self.index_name):
            self.es.indices.delete(index=self.index_name)
        # No per-call ``ignore=400``: status 400 is already part of the
        # client's ``ignore_status`` configured in ``__init__``.
        self.es.indices.create(index=self.index_name)

    def delete_index(self):
        """Delete the index on a best-effort basis.

        Failures are logged and swallowed so callers can use this for cleanup.
        ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted.
        """
        try:
            self.es.indices.delete(index=self.index_name)
        except Exception:
            # Local import keeps this class self-contained in the module.
            import logging

            logging.getLogger(__name__).warning(
                "Failed to delete index %r", self.index_name, exc_info=True
            )

    def get_doc(self, uid):
        """Fetch a single document by its id.

        Args:
            uid: Document id to look up.

        Returns:
            Whatever ``self.es.get`` returns for this index and id.
        """
        return self.es.get(index=self.index_name, id=uid)

    def insert_one(self, doc: dict, id=None):
        """Index a single document.

        Args:
            doc: Document body to store.
            id: Optional document id, forwarded unchanged to the client
                (``None`` by default).
        """
        self.es.index(index=self.index_name, id=id, body=doc)

    def insert_array(self, docs: list):
        """Index several documents, one request per document, without ids.

        Args:
            docs: Document bodies to store; an empty list is a no-op.
        """
        for doc in docs:
            self.es.index(index=self.index_name, body=doc)

    def search(self, keyword, count: int = 30):
        """Search the ``content`` and ``title`` fields for ``keyword``.

        Args:
            keyword: Text to match.
            count: Passed to the client as ``size`` (defaults to 30).

        Returns:
            The raw response returned by ``self.es.search``.
        """
        q = {
            "query": {
                "bool": {
                    "should": [
                        {
                            "multi_match": {
                                "query": keyword,
                                "fields": ["content", "title"],
                            }
                        }
                    ]
                }
            }
        }
        match_data = self.es.search(
            index=self.index_name, body=q, size=count)
        return match_data

简单测试代码

"""
Install the client first: pip install elasticsearch
"""
from esutil import MyElasticsearch

es = MyElasticsearch(index_name='cms', index_type='article', ip='http://es.youhuild.com:80')

# (Re)create the index
es.create_index()

# Insert one document
a = {
    'id': 1,
    'ctime': '2021-07-23 16:28:00',
    'title': 'elasticsearch 测试标题',
    'content': '这个内容是文章内容'
}
es.insert_one(a)

# Newly indexed documents only become searchable after a refresh; force one
# so the search below can see the document that was just inserted.
es.es.indices.refresh(index=es.index_name)

# Full-text search
result = es.search(keyword='文')

print(result)

# To print only the matching documents:
# print(result['hits']['hits'])

《centos通过yum安装elasticsearch8(flask网站全文检索方案一)》

《centos中安装logstash并同步mysql数据到elasticsearch(flask网站全文检索方案二)》

《python集成elasticsearch 通用类(flask网站全文检索方案三)》