ES(二)车牌搜索
目标
使用ES对亿级别的过车信息进行检索,车牌支持模糊搜索,其他的属性如车牌颜色、车身颜色等为确定的几个值,可用数字表示,精确匹配过滤。数据量1亿,争取找到机器的配置,1秒内返回搜索结果。
使用聚合、游标实现其他功能如车辆统计、昼伏夜出、区域碰撞等功能,参考海康的实战平台介绍。
安装
下载 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.2.tar.gz 直接解压后,运行前需要添加专有的用户,因为es能运行输入的脚本,为了安全,需要新增用户。增加用户的过程可以参考这里说明。 其他的文件句柄、虚拟内存大小的问题,可以参考ES安装问题。
文件句柄的问题,临时有效切换到root下使用ulimit -n 65536,或者修改/etc/security/limits.conf下的文件,增加:
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
最后,要关闭防火墙(不是生产环境)。使用的centos在关闭防火墙的这步骤浪费了写时间,主要是linux的防火墙类型有点多,其实有的不是严格意义上的防火墙,SELinux、firewalld、iptables。具体区别以后再研究下,这里只大概了解下:
SELinux主要用于对文件(file), 文件夹 (directory), 过程(process)的限制。但是也会对端口访问有一定限制。是“最小权限原则”,也就是说如果开启不做配置的话,很多访问都是禁止的。
firewalld自身并不具备防火墙的功能,而是和iptables一样需要通过内核的netfilter来实现,也就是说firewalld和 iptables一样,他们的作用都是用于维护规则,而真正使用规则干活的是内核的netfilter,只不过firewalld和iptables的结构以及使用方法不一样罢了。
而且这firewalld也有点最小权限原则,默认很多是不能用的。
下面是如何进行临时关闭和永久关闭的命令:
systemctl stop firewalld.service
systemctl disable firewalld.service
service iptables stop
chkconfig iptables off
setenforce 0
修改/etc/selinux/config 文件将SELINUX=enforcing改为SELINUX=disabled
如果配置完成,进入bin可以直接运行elasticsearch。
简单修改配置
这里只是初步修改,后面优化查询速度会再次调优,编辑config下的elasticsearch.yml文件,修改集群的名字,这个名字是必要修改的,es的节点会根据此名字自动发现组网,如果使用默认的名字,那么如果有两个es集群在一个网内,会造成混乱。然后就是修改es的日志和数据文件保存的地址,要找个大的硬盘放这两个文件。最后要修改下服务的ip地址,默认是只能从本级访问。配置文件修改如下:
cluster.name: wzy_es
path.data: /home/es/data
path.logs: /home/es/logs
network.host: 0.0.0.0
然后前台启动,如果没报异常,发个get命令(我这里使用的postman工具)http://10.45.157.55:9200(主机地址是157.55),看到如下回复就说明es正常了。
{
"name": "tmJDWWD",
"cluster_name": "wzy_es",
"cluster_uuid": "YyM4OcrfQTal3VbnItLX8w",
"version": {
"number": "5.6.1",
"build_hash": "667b497",
"build_date": "2017-09-14T19:22:05.189Z",
"build_snapshot": false,
"lucene_version": "6.6.1"
},
"tagline": "You Know, for Search"
}
设置索引
搜索引擎速度快主要是倒排索引,所以需要对文档进行分词,车牌要支持模糊搜索,一般分词是使用NGram Tokenizer分词器。可以直接参考官网上的例子进行设置。例如车牌号”苏A12345 “会分成苏、苏A、苏A1、苏A12、苏A123、苏A1234、苏A12345、A、A1、A12等等这样子,这样在模糊搜索的时候就会匹配到。
其他的字段都是不用进行分析的字段,使用整型即可,不用在特殊设置,es中整型的字段不会在分析。
建立myindex 索引,并设置map参数,定义车牌分词使用的分词器。
PUT http://10.45.157.55:9200/myindex
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "my_tokenizer"
}
},
"tokenizer": {
"my_tokenizer": {
"type": "ngram",
"min_gram": 1,
"max_gram": 8,
"token_chars": [
"letter",
"digit"
]
}
}
}
}
}
然后可以测试下分词器:
POST http://10.45.157.55:9200/myindex/_analyze
{
"analyzer": "my_analyzer",
"text": "苏A12345"
}
能看到分词的结果有28条。
然后需要把自定义的分析器设置到需要分析的字段,更新map
POST: http://10.45.157.55:9200/myindex/_mapping/passinfo
{
"properties": {
"event_id": {
"type": "keyword"
},
"event_type": {
"type": "long"
},
"land_id": {
"type": "keyword"
},
"land_picture": {
"type": "text",
"index": false
},
"pass_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"plate": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
},
"analyzer": "my_analyzer"
},
"plate_category": {
"type": "integer"
},
"plate_color": {
"type": "long"
},
"plate_picture": {
"type": "text",
"index": false
},
"speed": {
"type": "long"
},
"vehicle_category": {
"type": "long"
},
"vehicle_color": {
"type": "long"
},
"vehicle_length": {
"type": "long"
}
}
}
这里只做功能验证,字段少些:
POST: http://10.45.157.55:9200/myindex/_mapping/passinfo
{
"properties" : {
"plate" : {
"type" : "string",
"analyzer" : "my_analyzer",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"plate_category": {
"type": "integer"
},
"land_id": {
"type": "keyword"
},
"pass_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
plate为车牌,需要使用ngram分析器分析,plate_category为车牌类型,整型,land_id是抓拍到过车信息的卡口id,不需要分析,类型为keyword,time为抓拍的时间。
同样,可以使用
GET: http://10.45.157.55:9200/myindex/_mapping/passinfo
验证查看下map
构建数据
这里先构造少量的,能够验证功能的数据。
plate | plate_category | land_id | pass_time | id |
---|---|---|---|---|
苏A12345 | 1 | land001 | 2017/1/1 11:22:07 | 1 |
苏A12345 | 1 | land002 | 2017/1/1 12:22:07 | 2 |
苏A12345 | 1 | land003 | 2017/1/1 13:22:07 | 3 |
苏A66666 | 2 | land001 | 2017/1/1 11:22:09 | 4 |
苏A66666 | 2 | land002 | 2017/1/1 12:22:09 | 5 |
苏A66666 | 2 | land003 | 2017/1/1 13:22:09 | 6 |
苏A12345 | 1 | land001 | 2017/1/2 04:22:07 | 7 |
苏A12345 | 1 | land002 | 2017/1/3 11:22:07 | 8 |
苏A66666 | 2 | land004 | 2017/1/1 01:22:07 | 9 |
苏A77777 | 1 | land001 | 2017/1/1 11:22:07 | 10 |
苏A88888 | 1 | land001 | 2017/1/1 11:22:07 | 11 |
增加id记录为1的数据使用Post方法:
POST : http://10.45.157.55:9200/myindex/passinfo/1
{
"plate" : "苏A12345",
"plate_category" : 1 ,
"land_id" : "land001",
"time" : "2017-01-01 11:22:07"
}
依次增加上面11条数据。增加完成后,可以用
GET http://10.45.157.55:9200/myindex/passinfo/_search
查看增加的信息。
业务实现
车辆查询
车牌模糊搜索
如查询“苏A”的车牌
POST http://10.45.157.55:9200/myindex/passinfo/_search
{
"query" : {
"match" : {
"plate" : "苏A"
}
}
}
其他的,可以查询包含“234”,组合的“苏A 23”等等。
车牌 + 其他条件匹配 查询,车牌有“苏A”,并且车牌类型为2的车辆,需要用到term过滤。使用bool过滤。虽然match也可以作用于没有分析的字段上,但是还是用term过滤比较好,term过滤能够缓存。
POST http://10.45.157.55:9200/myindex/passinfo/_search
{
"query": {
"bool": {
"must": {
"match": { "plate": "苏A" }
},
"filter": {
"term": { "plate_category" : 2 }
}
}
}
}
再加上时间限制,可以这样写:
{
"query": {
"bool": {
"must": [
{"match": { "plate": "苏A" }},
{"term": { "plate_category" : 2 }}
],
"filter": {
"range": {
"pass_time" : {"gte" : "2017-09-01 00:00:00", "lte" : "2017-10-01 00:00:00"}
}
}
}
}
}
行车轨迹查询
给定时间段,按照车牌查询并按照时间升序排列。
{
"query": {
"bool": {
"must": {
"term": { "plate": "苏A12345" }
},
"filter": {
"range": {
"pass_time" : {
"gte" : "2017-1-1 00:00:00",
"lt" : "2017-01-03 11:22:07"
}
}
}
}
},
"sort": { "time": { "order": "desc" }}
}
注意,这里虽然plate是分析的,但是也可以用term过滤,能缓存。
车流量统计
典型的es的聚合的应用,如统计一个月内的各个卡口的车流量: 按照卡口id,统计所有的卡口的过车数目:
{
"size" : 0,
"aggs" : {
"count" : {
"terms" : {
"field" : "land_id"
}
}
}
}
按照天为单位,统计每天的过车数目:
{
"size" : 0,
"aggs": {
"count": {
"date_histogram": {
"field": "time",
"interval": "day",
"format": "yyyy-MM-dd"
}
}
}
}
统计每个卡口每天的过车数目,嵌套聚合(这功能很强):
{
"size" : 0,
"aggs" : {
"count" : {
"terms" : {
"field" : "land_id"
},
"aggs": {
"count": {
"date_histogram": {
"field": "time",
"interval": "day",
"format": "yyyy-MM-dd"
}
}
}
}
}
}
落脚点分析
同样是聚合,按照车牌号查询出来后,然后按照卡口id聚合。
{
"size" : 0,
"query" : {
"term" : {
"plate" : "苏A12345"
}
},
"aggs" : {
"count" : {
"terms" : {
"field" : "land_id"
}
}
}
}
初次入城分析
这个需求理解起来有些绕,首先是要设定哪几个卡口为初次入城的关键卡口,这些卡口上的每次过车时,需要进行城内所有卡口检索,看车辆以前是否出现过。
虽然可以用term过滤,速度快,虽然可以es也很擅长,但是并发量大的时候,用es可能会有压力。鉴于这个业务场景,多数查询并不是初次入城的车辆,可以在查询es前加层缓存,如果缓存不命中在查询es。有点类似于关系数据库前的缓存。
所以,这个功能也是实时分析的,后面的查询只是在查历史数据。而且后面的查询的时候不是任意卡口都能选择的,只能选择前面指定的初次入城的卡口。
区域碰撞分析
查询两个卡口中不同时间段有相同车牌号的车辆,卡口1和卡口2组成区域1,时间段为1月份,卡口3组成区域2,时间段为1月1号到1月10号。要查找在两个区域中不同时间段都有的过车信息。
这个处理有点复杂,需要取数据,统计,根据都有的车牌过滤。
可以使用es取数据,然后再内存中进行处理,可以使用es取两次数据,第一次取区域1内筛选出的数据,统计分析后到内存,为了防止内存过大,只统计车牌和出现次数,甚至如果不关心次数,直接放个车牌也行,然后取第二个区域内的数据,看是否车牌重复。
去数据这块,为了防止数据过大,提高效率,可以使用es的游标查询。 例如,对于第一个区域查询:
{
"query": {
"bool": {
"must": {
"terms": { "land_id": [ "land001", "land002"] }
},
"filter": {
"range": {
"pass_time" : {
"gte" : "2017-1-1 00:00:00",
"lt" : "2017-01-31 23:59:59"
}
}
}
}
}
}
返回结果是有8条记录的,假设结果集合非常大,有上百万条,那么可以处理点数据再次取点数据。这里一次取两条数据,游标有效时间是1分钟:
POST: http://10.45.157.55:9200/myindex/passinfo/_search?scroll=1m
{
"query": {
"bool": {
"must": {
"terms": { "land_id": [ "land001", "land002"] }
},
"filter": {
"range": {
"pass_time" : {
"gte" : "2017-1-1 00:00:00",
"lt" : "2017-01-31 23:59:59"
}
}
}
}
},
"sort" : ["_doc"],
"size": 2
}
返回的结果:
{
"_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAABbFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAXBZ0bUpEV1dEclJvNjE3RDFYcE02bVN3AAAAAAAAAF0WdG1KRFdXRHJSbzYxN0QxWHBNNm1TdwAAAAAAAABeFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAXxZ0bUpEV1dEclJvNjE3RDFYcE02bVN3",
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 8,
"max_score": null,
"hits": [
{
"_index": "myindex",
"_type": "passinfo",
"_id": "4",
"_score": null,
"_source": {
"plate": "苏A66666",
"plate_category": 2,
"land_id": "land001",
"pass_time": "2017-01-01 11:22:07"
},
"sort": [
1483269727000
]
},
{
"_index": "myindex",
"_type": "passinfo",
"_id": "1",
"_score": null,
"_source": {
"plate": "苏A12345",
"plate_category": 1,
"land_id": "land001",
"pass_time": "2017-01-01 11:22:07"
},
"sort": [
1483269727000
]
}
]
}
}
有个”_scroll_id”,这个id是下次取数据需要传递的参数,再次取数据:
POST http://10.45.157.55:9200/_search/scroll
{
"scroll": "1m",
"scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAABkFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAYBZ0bUpEV1dEclJvNjE3RDFYcE02bVN3AAAAAAAAAGEWdG1KRFdXRHJSbzYxN0QxWHBNNm1TdwAAAAAAAABiFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAYxZ0bUpEV1dEclJvNjE3RDFYcE02bVN3"
}
注意,url中没有索引和文档类型了,这个scrool_id是全局的,不用那么麻烦了。如果只对查询结果感兴趣而不关心结果的顺序,第一次的参数中”sort” : [“_doc”],意思是按照文档插入的顺序返回,意思其实是不进行排序,这样效率最高。如果有排序的需求,可以指定字段如”time”。
同行车分析
算法:
- 查询时间段内指定的被跟车的经过的卡口。
- 选取用户指定的时间段内的,所有卡口的过车记录。
- 对上述过车记录按照车牌分组,按照时间排序。
- 依次计算所有的结果车与需要的匹配的程度。
可以利用ES查询卡口的过车信息,后面的处理在Redis中实现,由于有时间段的限制,100个卡口,再加上跟车时间(时间较短)的排除,查出的数据量并不大。如果数据较大可以考虑游标批次处理的方法。
轨迹查询
与同行车分析的很像,只不过算法中的第一步就是给定的条件而已,如果是轨迹查询,数据量上可能比同行车分析数据量大,很可能需要用到游标分析,获取数据的时候,需要按照车牌号排序,这样可以一边获取,一边分析,丢弃掉分析时候相似度较低的数据,节约内存。
另外,如果是精确匹配,或者知道过滤条件,例如轨迹查询,如果选择8个卡口,需要至少7个匹配的轨迹,那么可以根据车牌聚合,聚合后根据统计的数目过滤下,返回一个较小的满足条件的集合列表,然后用户选择特定车牌的时候,在二次查询。
这里的根据车牌聚合,有点问题。聚合的时候因为plate是text类型,按照这聚合的时候,es提示:
Fielddata is disabled on text fields by default.
具体原因见官网解释,简单来说,对于一个text进行聚合,聚合的结果也不是你想要的,会根据所有的分词结果分别进行聚合,官网举例:
A text field is analyzed before indexing so that a value like New York can be found by searching for new or for york. A terms aggregation on this field will return a new bucket and a york bucket, when you probably want a single bucket called New York.
正确的做法是给text加个额外的doc_value字段
Instead, you should have a text field for full text searches, and an unanalyzed keyword field with doc_values enabled for aggregations
这个字段增加可以使用PUT
PUT:http://10.45.157.55:9200/myindex/_mapping/passinfo
{
"properties": {
"plate": {
"type": "text",
"analyzer": "my_analyzer",
"fields": {
"keyword": { "type": "keyword" }
}
}
}
}
然后再聚合的时候,可以用plate.keyword的方式进行聚合
POST : http://10.45.157.55:9200/myindex/passinfo/_search
{
"size" : 0,
"aggs" : {
"count" : {
"terms" : {
"field" : "plate.keyword"
}
}
}
}
不过这里需要注意,es不会对以前的数据在进行分析的,map中新增加的,只会对以后增加的文档生效,所以,这个fields要提前做好。
然后,要加上上面说的对聚合后的数据进行二次过滤,用到es的Bucket Script Aggregation特性:
POST : http://10.45.157.55:9200/myindex/passinfo/_search
{
"size" : 0,
"aggs" : {
"count" : {
"terms" : {
"field" : "plate.keyword"
},
"aggs": {
"count": { # 1
"sum": {
"field": "null" # 2
}
},
"having":{
"bucket_selector": {
"buckets_path": {
"totalCount": "_count" # 3
},
"script": "params.totalCount > 1"
}
}
}
}
}
}
这个调试了挺长时间,#1的第二个聚合其实是没有什么用的,只是为了使用这个”_count”,解释如下:
The buckets_path is relative to the parent aggregation and _count and _key are both special keywords to indicate the document count of the bucket and the key of the bucket respectively.
默认这bucket_path是aggs的路径,这也隐含了一个条件,如果想使用bucket_selector,是一种pipeline aggregation,是必须在子aggs内使用的。但是就没有方法对一层aggs后的进行过滤了,查看了下github上的解释,说一层的情况应该是在客户端去过滤,但是像车牌这种聚合,结果量会很大,如果客户端过滤可能要读很多不必要的数据,所以,就用上面这种比较丑陋的方式实现了。
在#1这个地方,我看网上很多demo中的aggs是没有aggs的具体实现的,直接上来就是bucket_selector的部分,但是可能是因为版本问题,这es的版本5.6.1是必须有这aggs实现的,否则会报错误”Unknown BaseAggregationBuilder”,把”having”当作aggs名,把“bucket_selector”当作aggs的builder。
而#2这里,把field的值写成null,是为了不增加额外的计算工作,不知道写成null有没有什么查询不命中的消耗,es也没有报错,这里待推敲,自己创造的写法。
昼伏夜出分析
es也不好处理,因为要用车牌号取交集,可以每天用游标读取,昼伏的取一次,夜出的取一次,每天处理,记录结果。
参考
Elasticsearch权威指南(中文版)
Elasticsearch权威指南(中文官方web版)
Mastering Elasticsearch(中文版)
Day19 ES内存那点事
实时搜索引擎Elasticsearch(4)——Aggregations (聚合)API的使用 使用scroll实现Elasticsearch数据遍历和深度分页 游标查询 Scroll