
Designing a Search Service

Question

How would you design a search service in Python? What are the core principles behind Elasticsearch?

Answer

Architecture

Elasticsearch Operations

search/es_client.py
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

es = AsyncElasticsearch(["http://localhost:9200"])


# Create the index. The ik_smart / ik_max_word analyzers require the IK analysis plugin.
async def create_index():
    await es.indices.create(
        index="articles",
        body={
            "settings": {
                "analysis": {
                    "analyzer": {
                        "ik_smart_analyzer": {
                            "type": "custom",
                            "tokenizer": "ik_smart",
                        }
                    }
                }
            },
            "mappings": {
                "properties": {
                    "title": {
                        "type": "text",
                        "analyzer": "ik_smart_analyzer",
                        # Completion sub-field, queried by the suggester as "title.suggest"
                        "fields": {"suggest": {"type": "completion"}},
                    },
                    "content": {"type": "text", "analyzer": "ik_max_word"},
                    "tags": {"type": "keyword"},
                    "created_at": {"type": "date"},
                    "author_id": {"type": "integer"},
                }
            },
        },
    )


# Index a single document
async def index_article(article: dict):
    await es.index(index="articles", id=article["id"], body=article)


# Bulk indexing
async def bulk_index(articles: list[dict]):
    actions = [
        {"_index": "articles", "_id": a["id"], "_source": a}
        for a in articles
    ]
    await async_bulk(es, actions)

Search Service

search/service.py
from dataclasses import dataclass

from search.es_client import es  # shared client defined in search/es_client.py


@dataclass
class SearchResult:
    total: int
    items: list[dict]
    aggregations: dict


class SearchService:
    async def search(
        self,
        query: str,
        page: int = 1,
        size: int = 20,
        tags: list[str] | None = None,
        sort_by: str = "_score",
    ) -> SearchResult:
        body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "multi_match": {
                                "query": query,
                                "fields": ["title^3", "content"],  # title is boosted 3x
                                "type": "best_fields",
                            }
                        }
                    ],
                    "filter": [],
                }
            },
            "highlight": {
                "fields": {"title": {}, "content": {"fragment_size": 150}},
                "pre_tags": ["<em>"],
                "post_tags": ["</em>"],
            },
            "aggs": {
                "tag_stats": {"terms": {"field": "tags", "size": 20}}
            },
            "from": (page - 1) * size,
            "size": size,
        }

        # Tag filter (runs in filter context, so it does not affect scoring)
        if tags:
            body["query"]["bool"]["filter"].append(
                {"terms": {"tags": tags}}
            )

        # Sort by a field other than relevance, e.g. created_at.
        # Descending order is an assumption; the original left sort_by unused.
        if sort_by != "_score":
            body["sort"] = [{sort_by: {"order": "desc"}}]

        result = await es.search(index="articles", body=body)

        # Flatten each hit: source fields plus score and highlight fragments
        items = []
        for hit in result["hits"]["hits"]:
            item = hit["_source"]
            item["score"] = hit["_score"]
            item["highlight"] = hit.get("highlight", {})
            items.append(item)

        return SearchResult(
            total=result["hits"]["total"]["value"],
            items=items,
            aggregations=result.get("aggregations", {}),
        )
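
The `from` / `size` pagination above gets expensive for deep pages, and Elasticsearch rejects requests where `from + size` exceeds `index.max_result_window` (10,000 by default). A minimal sketch of a guard that could run before the search body is built; the helper name `page_window` is hypothetical and not part of the original service:

```python
def page_window(page: int, size: int, max_window: int = 10_000) -> tuple[int, int]:
    """Translate a 1-based page number into (from, size) for a search body.

    max_window mirrors the default of index.max_result_window; adjust it
    if the index setting has been changed.
    """
    if page < 1 or size < 1:
        raise ValueError("page and size must be positive")
    offset = (page - 1) * size
    if offset + size > max_window:
        # Deep pagination is usually served with search_after instead.
        raise ValueError("requested window exceeds max_result_window")
    return offset, size
```

For example, `page_window(3, 20)` yields an offset of 40 with a size of 20, which are the values the `from` and `size` keys would carry.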

Search Suggestions (Autocomplete)

search/suggest.py
from search.es_client import es  # shared client defined in search/es_client.py


async def suggest(prefix: str, size: int = 5) -> list[str]:
    """Search suggestions based on the completion suggester.

    Requires "title.suggest" to be mapped as a field of type "completion".
    """
    result = await es.search(
        index="articles",
        body={
            "suggest": {
                "title_suggest": {
                    "prefix": prefix,
                    "completion": {
                        "field": "title.suggest",
                        "size": size,
                        "skip_duplicates": True,
                    },
                }
            }
        },
    )
    options = result["suggest"]["title_suggest"][0]["options"]
    return [opt["text"] for opt in options]
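
To make the idea of prefix completion concrete, here is a rough in-memory illustration. Elasticsearch's completion suggester is backed by a finite state transducer; the sketch below swaps in a plain sorted list with binary search, so it only shows the lookup behaviour (prefix match, size limit, duplicate skipping), not the real data structure. The function name `suggest_prefix` is hypothetical.

```python
from bisect import bisect_left


def suggest_prefix(sorted_titles: list[str], prefix: str, size: int = 5) -> list[str]:
    """Return up to `size` distinct titles starting with `prefix`.

    `sorted_titles` must already be sorted; all entries sharing a prefix
    are then contiguous, so one binary search finds where they begin.
    """
    start = bisect_left(sorted_titles, prefix)
    suggestions: list[str] = []
    for title in sorted_titles[start:]:
        if not title.startswith(prefix):
            break  # left the contiguous run of matches
        if not suggestions or suggestions[-1] != title:  # skip duplicates
            suggestions.append(title)
        if len(suggestions) == size:
            break
    return suggestions
```

The comparison is case-sensitive here; a real setup would normalise case through the analyzer configured on the completion field.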

Data Synchronization

search/sync.py
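from search.es_client import bulk_index

# db, Article, get_last_sync_time and set_last_sync_time are assumed to be
# defined elsewhere in the project (ORM session, model, and watermark storage).


async def sync_from_mysql():
    """Incremental sync based on the updated_at timestamp."""
    last_sync = get_last_sync_time()
    articles = (
        db.query(Article)
        .filter(Article.updated_at > last_sync)
        .order_by(Article.updated_at)  # oldest first, so the last row is the newest
        .all()
    )

    if articles:
        await bulk_index([a.to_dict() for a in articles])
        set_last_sync_time(articles[-1].updated_at)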
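
The watermark logic is the part that is easy to get wrong, so here is a small sketch of it in isolation, using in-memory rows instead of MySQL. `Row` and `next_batch` are hypothetical names introduced for illustration only. The key point is that the batch is ordered by `updated_at` before the new watermark is taken from its last element.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Row:  # hypothetical stand-in for an Article row
    id: int
    updated_at: datetime


def next_batch(
    rows: list[Row], last_sync: datetime, limit: int = 500
) -> tuple[list[Row], datetime]:
    """Return rows changed after last_sync (oldest first) and the new watermark."""
    changed = sorted(
        (r for r in rows if r.updated_at > last_sync),
        key=lambda r: r.updated_at,
    )[:limit]
    # With no changes the watermark stays where it was.
    watermark = changed[-1].updated_at if changed else last_sync
    return changed, watermark
```

One caveat with a strict `>` comparison: if several rows share the same timestamp and the batch limit cuts between them, the remaining rows are skipped on the next run. Tracking the last seen `id` alongside the timestamp is a common way around that.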

Common Interview Questions

Q1: How does an inverted index work?

Answer

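A forward (regular) index maps document ID → content. An inverted index maps term → list of document IDs.

"Python" → [doc1, doc3, doc7]
"async" → [doc2, doc3]

Searching for "Python async" with both terms required → intersect the two lists → [doc3]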
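
The lookup above can be sketched in a few lines of Python. This is a toy version under simplifying assumptions: it tokenises on whitespace and lowercases terms, whereas Elasticsearch runs each field through its configured analyzer (IK for Chinese text in this setup) and stores postings in a compressed on-disk format. The function names are hypothetical.

```python
from collections import defaultdict


def build_inverted_index(docs: dict[str, str]) -> dict[str, set[str]]:
    """Map each lowercased term to the set of document IDs containing it."""
    index: dict[str, set[str]] = defaultdict(set)
    for doc_id, text in docs.items():
        for term in text.lower().split():
            index[term].add(doc_id)
    return index


def search_and(index: dict[str, set[str]], query: str) -> set[str]:
    """Return IDs of documents containing every term of the query."""
    postings = [index.get(term, set()) for term in query.lower().split()]
    if not postings:
        return set()
    return set.intersection(*postings)


docs = {
    "doc1": "Python basics",
    "doc2": "async io",
    "doc3": "Python async patterns",
    "doc7": "Python packaging",
}
index = build_inverted_index(docs)
matches = search_and(index, "Python async")
```

With these four documents, the posting list for `python` holds doc1, doc3 and doc7, the one for `async` holds doc2 and doc3, and `matches` ends up containing only doc3.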

Q2: How can search relevance be improved?

Answer

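  1. Field boosting: title^3 gives matches in the title a higher weight
  2. BM25: the ES default similarity; it accounts for term frequency and document length
  3. Synonym expansion: configure a synonym dictionary
  4. Pinyin / typo tolerance: pinyin plugin, fuzzy queries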
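
To show how BM25 weighs term frequency against document length, here is a sketch of the textbook per-term score. The defaults `k1=1.2` and `b=0.75` match Elasticsearch's default BM25 settings, but Lucene's actual implementation differs in constant factors, so absolute values will not match the `_score` ES returns. The function name is hypothetical.

```python
import math


def bm25_term_score(
    tf: int,
    doc_len: int,
    avg_doc_len: float,
    n_docs: int,
    doc_freq: int,
    k1: float = 1.2,
    b: float = 0.75,
) -> float:
    """Textbook BM25 score of one query term for one document.

    tf: occurrences of the term in the document
    doc_freq: number of documents containing the term
    """
    # Rare terms get a higher inverse document frequency.
    idf = math.log(1 + (n_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    # Longer-than-average documents are penalised; b controls how strongly.
    length_norm = k1 * (1 - b + b * doc_len / avg_doc_len)
    # The tf contribution saturates: it approaches (k1 + 1) as tf grows.
    return idf * tf * (k1 + 1) / (tf + length_norm)
```

Two properties follow from the formula: repeating a term gives diminishing returns, and the same match counts for more in a short document than in a long one.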

Q3: How do you keep ES and MySQL data consistent?

Answer

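| Approach | Latency | Complexity |
|---|---|---|
| Synchronous dual write | | High (transactional consistency is hard) |
| Asynchronous messaging | Seconds | Medium (requires an MQ) |
| Scheduled sync | Minutes | |
| Binlog listening | Seconds | Medium (Canal/Debezium) |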
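
The asynchronous messaging approach can be sketched as a producer/consumer pair. In the sketch below an in-process `asyncio.Queue` stands in for a real message queue and a plain dict stands in for the Elasticsearch index; both are assumptions made so the example runs on its own, and the event shape (`op`, `id`, `doc`) is hypothetical.

```python
import asyncio


async def index_worker(queue: asyncio.Queue, index: dict[int, dict]) -> None:
    """Consume change events and apply them to the search index (a dict here)."""
    while True:
        event = await queue.get()
        try:
            if event["op"] == "delete":
                index.pop(event["id"], None)
            else:  # "upsert"
                index[event["id"]] = event["doc"]
        finally:
            queue.task_done()


async def demo() -> dict[int, dict]:
    queue: asyncio.Queue = asyncio.Queue()
    index: dict[int, dict] = {}
    worker = asyncio.create_task(index_worker(queue, index))

    # The write path commits to the primary store first, then publishes an event.
    await queue.put({"op": "upsert", "id": 1, "doc": {"title": "Python async"}})
    await queue.put({"op": "upsert", "id": 2, "doc": {"title": "Redis"}})
    await queue.put({"op": "delete", "id": 2})

    await queue.join()  # wait until the worker has applied every event
    worker.cancel()
    return index
```

Running `asyncio.run(demo())` leaves only document 1 in the index. Because upserts and deletes are idempotent, redelivered messages are harmless, which matters since most message queues deliver at least once.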

Related Links