ES(二)车牌搜索

目标

使用ES对亿级别的过车信息进行检索,车牌支持模糊搜索,其他的属性如车牌颜色、车身颜色等为确定的几个值,可用数字表示,精确匹配过滤。数据量1亿,争取找到机器的配置,1秒内返回搜索结果。

使用聚合、游标实现其他功能如车辆统计、昼伏夜出、区域碰撞等功能,参考海康的实战平台介绍

安装

下载 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.2.tar.gz 直接解压后,运行前需要添加专有的用户,因为es能运行输入的脚本,为了安全,需要新增用户。增加用户的过程可以参考这里说明。 其他的文件句柄、虚拟内存大小的问题,可以参考ES安装问题

文件句柄的问题,临时有效切换到root下使用ulimit -n 65536,或者修改/etc/security/limits.conf下的文件,增加:

* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096

最后,要关闭防火墙(不是生产环境)。使用的centos在关闭防火墙的这步骤浪费了写时间,主要是linux的防火墙类型有点多,其实有的不是严格意义上的防火墙,SELinux、firewalld、iptables。具体区别以后再研究下,这里只大概了解下:

SELinux主要用于对文件(file), 文件夹 (directory), 过程(process)的限制。但是也会对端口访问有一定限制。是“最小权限原则”,也就是说如果开启不做配置的话,很多访问都是禁止的。

firewalld自身并不具备防火墙的功能,而是和iptables一样需要通过内核的netfilter来实现,也就是说firewalld和 iptables一样,他们的作用都是用于维护规则,而真正使用规则干活的是内核的netfilter,只不过firewalld和iptables的结构以及使用方法不一样罢了。

而且这firewalld也有点最小权限原则,默认很多是不能用的。

下面是如何进行临时关闭和永久关闭的命令:

systemctl stop firewalld.service
systemctl disable firewalld.service

service iptables stop
chkconfig iptables off

setenforce 0  
修改/etc/selinux/config 文件将SELINUX=enforcing改为SELINUX=disabled

如果配置完成,进入bin可以直接运行elasticsearch。

简单修改配置
这里只是初步修改,后面优化查询速度会再次调优,编辑config下的elasticsearch.yml文件,修改集群的名字,这个名字是必要修改的,es的节点会根据此名字自动发现组网,如果使用默认的名字,那么如果有两个es集群在一个网内,会造成混乱。然后就是修改es的日志和数据文件保存的地址,要找个大的硬盘放这两个文件。最后要修改下服务的ip地址,默认是只能从本级访问。配置文件修改如下:

cluster.name: wzy_es
path.data: /home/es/data
path.logs: /home/es/logs
network.host: 0.0.0.0

然后前台启动,如果没报异常,发个get命令(我这里使用的postman工具)http://10.45.157.55:9200(主机地址是157.55),看到如下回复就说明es正常了。

{
    "name": "tmJDWWD",
    "cluster_name": "wzy_es",
    "cluster_uuid": "YyM4OcrfQTal3VbnItLX8w",
    "version": {
        "number": "5.6.1",
        "build_hash": "667b497",
        "build_date": "2017-09-14T19:22:05.189Z",
        "build_snapshot": false,
        "lucene_version": "6.6.1"
    },
    "tagline": "You Know, for Search"
}

设置索引

搜索引擎速度快主要是倒排索引,所以需要对文档进行分词,车牌要支持模糊搜索,一般分词是使用NGram Tokenizer分词器。可以直接参考官网上的例子进行设置。例如车牌号”苏A12345 “会分成苏、苏A、苏A1、苏A12、苏A123、苏A1234、苏A12345、A、A1、A12等等这样子,这样在模糊搜索的时候就会匹配到。

其他的字段都是不用进行分析的字段,使用整型即可,不用在特殊设置,es中整型的字段不会在分析。

建立myindex 索引,并设置map参数,定义车牌分词使用的分词器。

PUT http://10.45.157.55:9200/myindex
{
  "settings": {
      "analysis": {
      "analyzer": {
        "my_analyzer": {
          "tokenizer": "my_tokenizer"
        }
      },
      "tokenizer": {
        "my_tokenizer": {
          "type": "ngram",
          "min_gram": 1,
          "max_gram": 8,
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  }
}

然后可以测试下分词器:

POST http://10.45.157.55:9200/myindex/_analyze  
{
  "analyzer": "my_analyzer",
  "text": "苏A12345"
}

能看到分词的结果有28条。

然后需要把自定义的分析器设置到需要分析的字段,更新map

POST: http://10.45.157.55:9200/myindex/_mapping/passinfo  
{
	"properties": {
		"event_id": {
			"type": "keyword"
		},
		"event_type": {
			"type": "long"
		},
		"land_id": {
			"type": "keyword"
		},
		"land_picture": {
			"type": "text",
			"index": false
		},
		"pass_time": {
			"type": "date",
			"format": "yyyy-MM-dd HH:mm:ss"
		},
		"plate": {
			"type": "text",
			"fields": {
				"keyword": {
					"type": "keyword"
				}
			},
			"analyzer": "my_analyzer"
		},
		"plate_category": {
			"type": "integer"
		},
		"plate_color": {
			"type": "long"
		},
		"plate_picture": {
			"type": "text",
			"index": false
		},
		"speed": {
			"type": "long"
		},
		"vehicle_category": {
			"type": "long"
		},
		"vehicle_color": {
			"type": "long"
		},
		"vehicle_length": {
			"type": "long"
		}
	}
}

这里只做功能验证,字段少些:

POST: http://10.45.157.55:9200/myindex/_mapping/passinfo  
{
  "properties" : {
    "plate" : {
      "type" :    "string",
      "analyzer" : "my_analyzer",
      "fields": {
            "keyword": {
                "type":  "keyword"
            }
        }
    },
    "plate_category": {
       "type": "integer"
    },
    "land_id": {
       "type": "keyword"
    },
    "pass_time": {
       "type": "date",
       "format": "yyyy-MM-dd HH:mm:ss"
    }
  }
}

plate为车牌,需要使用ngram分析器分析,plate_category为车牌类型,整型,land_id是抓拍到过车信息的卡口id,不需要分析,类型为keyword,time为抓拍的时间。

同样,可以使用

GET: http://10.45.157.55:9200/myindex/_mapping/passinfo  

验证查看下map

构建数据

这里先构造少量的,能够验证功能的数据。

plate plate_category land_id pass_time id
苏A12345 1 land001 2017/1/1 11:22:07 1
苏A12345 1 land002 2017/1/1 12:22:07 2
苏A12345 1 land003 2017/1/1 13:22:07 3
苏A66666 2 land001 2017/1/1 11:22:09 4
苏A66666 2 land002 2017/1/1 12:22:09 5
苏A66666 2 land003 2017/1/1 13:22:09 6
苏A12345 1 land001 2017/1/2 04:22:07 7
苏A12345 1 land002 2017/1/3 11:22:07 8
苏A66666 2 land004 2017/1/1 01:22:07 9
苏A77777 1 land001 2017/1/1 11:22:07 10
苏A88888 1 land001 2017/1/1 11:22:07 11

增加id记录为1的数据使用Post方法:

POST : http://10.45.157.55:9200/myindex/passinfo/1  
{
    "plate" : "苏A12345",
    "plate_category" : 1 ,
    "land_id" : "land001",
    "time" : "2017-01-01 11:22:07"
}

依次增加上面11条数据。增加完成后,可以用
GET http://10.45.157.55:9200/myindex/passinfo/_search
查看增加的信息。

业务实现

车辆查询

车牌模糊搜索
如查询“苏A”的车牌

POST http://10.45.157.55:9200/myindex/passinfo/_search
{
    "query" : {
        "match" : {
            "plate" : "苏A"
        }
    }
}

其他的,可以查询包含“234”,组合的“苏A 23”等等。

车牌 + 其他条件匹配 查询,车牌有“苏A”,并且车牌类型为2的车辆,需要用到term过滤。使用bool过滤。虽然match也可以作用于没有分析的字段上,但是还是用term过滤比较好,term过滤能够缓存。

POST http://10.45.157.55:9200/myindex/passinfo/_search  
{
    "query": {
        "bool": {
            "must": {
                "match": { "plate": "苏A" }
            },
            "filter": {
                "term": { "plate_category" : 2 }
            }
        }
    }
}

再加上时间限制,可以这样写:

{
    "query": {
        "bool": {
            "must": [
                {"match": { "plate": "苏A" }},
                {"term": { "plate_category" : 2 }}
            ],
            "filter": {
                "range": {
					"pass_time" : {"gte" : "2017-09-01 00:00:00", "lte" : "2017-10-01 00:00:00"}
				}
            }
        }
    }
}

行车轨迹查询

给定时间段,按照车牌查询并按照时间升序排列。

{
    "query": {
        "bool": {
            "must": {
                "term": { "plate": "苏A12345" }
            },
            "filter": {
                "range": {
                    "pass_time" : { 
                        "gte" : "2017-1-1 00:00:00", 
                        "lt" : "2017-01-03 11:22:07" 
                    }
                }
            }
        }
    },
    "sort": { "time": { "order": "desc" }}
}

注意,这里虽然plate是分析的,但是也可以用term过滤,能缓存。

车流量统计

典型的es的聚合的应用,如统计一个月内的各个卡口的车流量: 按照卡口id,统计所有的卡口的过车数目:

{
    "size" : 0,
    "aggs" : { 
        "count" : { 
            "terms" : { 
              "field" : "land_id"
            }
        }
    }
}

按照天为单位,统计每天的过车数目:

{
   "size" : 0,
   "aggs": {
      "count": {
         "date_histogram": {
            "field": "time",
            "interval": "day", 
            "format": "yyyy-MM-dd" 
         }
      }
   }
}

统计每个卡口每天的过车数目,嵌套聚合(这功能很强):

{
    "size" : 0,
    "aggs" : { 
        "count" : { 
            "terms" : { 
              "field" : "land_id"
            },
            "aggs": {
              "count": {
                 "date_histogram": {
                    "field": "time",
                    "interval": "day", 
                    "format": "yyyy-MM-dd" 
                 }
              }
            }
        }            
    }
}

落脚点分析

同样是聚合,按照车牌号查询出来后,然后按照卡口id聚合。

{
    "size" : 0,
    "query" : {
        "term" : {
            "plate" : "苏A12345"
        }
    },
    "aggs" : { 
    "count" : { 
        "terms" : { 
          "field" : "land_id"
        }
      }
    }
}

初次入城分析

这个需求理解起来有些绕,首先是要设定哪几个卡口为初次入城的关键卡口,这些卡口上的每次过车时,需要进行城内所有卡口检索,看车辆以前是否出现过。

虽然可以用term过滤,速度快,虽然可以es也很擅长,但是并发量大的时候,用es可能会有压力。鉴于这个业务场景,多数查询并不是初次入城的车辆,可以在查询es前加层缓存,如果缓存不命中在查询es。有点类似于关系数据库前的缓存。

所以,这个功能也是实时分析的,后面的查询只是在查历史数据。而且后面的查询的时候不是任意卡口都能选择的,只能选择前面指定的初次入城的卡口。

区域碰撞分析

查询两个卡口中不同时间段有相同车牌号的车辆,卡口1和卡口2组成区域1,时间段为1月份,卡口3组成区域2,时间段为1月1号到1月10号。要查找在两个区域中不同时间段都有的过车信息。

这个处理有点复杂,需要取数据,统计,根据都有的车牌过滤。

可以使用es取数据,然后再内存中进行处理,可以使用es取两次数据,第一次取区域1内筛选出的数据,统计分析后到内存,为了防止内存过大,只统计车牌和出现次数,甚至如果不关心次数,直接放个车牌也行,然后取第二个区域内的数据,看是否车牌重复。

去数据这块,为了防止数据过大,提高效率,可以使用es的游标查询。 例如,对于第一个区域查询:

{    
    "query": {    
        "bool": {
            "must": {
                "terms": { "land_id": [ "land001", "land002"] }
            },
            "filter": {
                "range": {
                    "pass_time" : { 
                        "gte" : "2017-1-1 00:00:00", 
                        "lt" : "2017-01-31 23:59:59" 
                    }
                }
            }
        }
    }
}

返回结果是有8条记录的,假设结果集合非常大,有上百万条,那么可以处理点数据再次取点数据。这里一次取两条数据,游标有效时间是1分钟:

POST: http://10.45.157.55:9200/myindex/passinfo/_search?scroll=1m
{    
    "query": {    
        "bool": {
            "must": {
                "terms": { "land_id": [ "land001", "land002"] }
            },
            "filter": {
                "range": {
                    "pass_time" : { 
                        "gte" : "2017-1-1 00:00:00", 
                        "lt" : "2017-01-31 23:59:59" 
                    }
                }
            }
        }
    },
    "sort" : ["_doc"], 
    "size":  2
}

返回的结果:

{
    "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAABbFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAXBZ0bUpEV1dEclJvNjE3RDFYcE02bVN3AAAAAAAAAF0WdG1KRFdXRHJSbzYxN0QxWHBNNm1TdwAAAAAAAABeFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAXxZ0bUpEV1dEclJvNjE3RDFYcE02bVN3",
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 8,
        "max_score": null,
        "hits": [
            {
                "_index": "myindex",
                "_type": "passinfo",
                "_id": "4",
                "_score": null,
                "_source": {
                    "plate": "苏A66666",
                    "plate_category": 2,
                    "land_id": "land001",
                    "pass_time": "2017-01-01 11:22:07"
                },
                "sort": [
                    1483269727000
                ]
            },
            {
                "_index": "myindex",
                "_type": "passinfo",
                "_id": "1",
                "_score": null,
                "_source": {
                    "plate": "苏A12345",
                    "plate_category": 1,
                    "land_id": "land001",
                    "pass_time": "2017-01-01 11:22:07"
                },
                "sort": [
                    1483269727000
                ]
            }
        ]
    }
}

有个”_scroll_id”,这个id是下次取数据需要传递的参数,再次取数据:

POST http://10.45.157.55:9200/_search/scroll  
{
    "scroll": "1m", 
    "scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAAABkFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAYBZ0bUpEV1dEclJvNjE3RDFYcE02bVN3AAAAAAAAAGEWdG1KRFdXRHJSbzYxN0QxWHBNNm1TdwAAAAAAAABiFnRtSkRXV0RyUm82MTdEMVhwTTZtU3cAAAAAAAAAYxZ0bUpEV1dEclJvNjE3RDFYcE02bVN3"
}

注意,url中没有索引和文档类型了,这个scrool_id是全局的,不用那么麻烦了。如果只对查询结果感兴趣而不关心结果的顺序,第一次的参数中”sort” : [“_doc”],意思是按照文档插入的顺序返回,意思其实是不进行排序,这样效率最高。如果有排序的需求,可以指定字段如”time”。

同行车分析

算法:

  1. 查询时间段内指定的被跟车的经过的卡口。
  2. 选取用户指定的时间段内的,所有卡口的过车记录。
  3. 对上述过车记录按照车牌分组,按照时间排序。
  4. 依次计算所有的结果车与需要的匹配的程度。

算法见: 一种基于轨迹碰撞的嫌疑车尾随车辆分析方法及装置

可以利用ES查询卡口的过车信息,后面的处理在Redis中实现,由于有时间段的限制,100个卡口,再加上跟车时间(时间较短)的排除,查出的数据量并不大。如果数据较大可以考虑游标批次处理的方法。

轨迹查询

与同行车分析的很像,只不过算法中的第一步就是给定的条件而已,如果是轨迹查询,数据量上可能比同行车分析数据量大,很可能需要用到游标分析,获取数据的时候,需要按照车牌号排序,这样可以一边获取,一边分析,丢弃掉分析时候相似度较低的数据,节约内存。

另外,如果是精确匹配,或者知道过滤条件,例如轨迹查询,如果选择8个卡口,需要至少7个匹配的轨迹,那么可以根据车牌聚合,聚合后根据统计的数目过滤下,返回一个较小的满足条件的集合列表,然后用户选择特定车牌的时候,在二次查询。

这里的根据车牌聚合,有点问题。聚合的时候因为plate是text类型,按照这聚合的时候,es提示:

Fielddata is disabled on text fields by default.

具体原因见官网解释,简单来说,对于一个text进行聚合,聚合的结果也不是你想要的,会根据所有的分词结果分别进行聚合,官网举例:

A text field is analyzed before indexing so that a value like New York can be found by searching for new or for york. A terms aggregation on this field will return a new bucket and a york bucket, when you probably want a single bucket called New York.

正确的做法是给text加个额外的doc_value字段

Instead, you should have a text field for full text searches, and an unanalyzed keyword field with doc_values enabled for aggregations

这个字段增加可以使用PUT

PUT:http://10.45.157.55:9200/myindex/_mapping/passinfo  
{
    "properties": {
        "plate": {
            "type": "text",
            "analyzer": "my_analyzer",
            "fields": {
            "keyword": { "type":  "keyword" }
           }
        }
    }
}

然后再聚合的时候,可以用plate.keyword的方式进行聚合

POST : http://10.45.157.55:9200/myindex/passinfo/_search
{
    "size" : 0,
    "aggs" : { 
    "count" : { 
        "terms" : { 
          "field" : "plate.keyword"
        }
      }
    }
}

不过这里需要注意,es不会对以前的数据在进行分析的,map中新增加的,只会对以后增加的文档生效,所以,这个fields要提前做好。

然后,要加上上面说的对聚合后的数据进行二次过滤,用到es的Bucket Script Aggregation特性:

POST : http://10.45.157.55:9200/myindex/passinfo/_search  
{
    "size" : 0,
    "aggs" : { 
        "count" : { 
            "terms" : { 
              "field" : "plate.keyword"
            },
            "aggs": {
              "count": { # 1
                 "sum": {
                    "field": "null" # 2
                 }
              },
              "having":{
                "bucket_selector": {
                  "buckets_path": {
                  "totalCount": "_count" # 3
                  },
                "script": "params.totalCount > 1"
                }
              }
           }
        }            
    }
}

这个调试了挺长时间,#1的第二个聚合其实是没有什么用的,只是为了使用这个”_count”,解释如下:

The buckets_path is relative to the parent aggregation and _count and _key are both special keywords to indicate the document count of the bucket and the key of the bucket respectively.

默认这bucket_path是aggs的路径,这也隐含了一个条件,如果想使用bucket_selector,是一种pipeline aggregation,是必须在子aggs内使用的。但是就没有方法对一层aggs后的进行过滤了,查看了下github上的解释,说一层的情况应该是在客户端去过滤,但是像车牌这种聚合,结果量会很大,如果客户端过滤可能要读很多不必要的数据,所以,就用上面这种比较丑陋的方式实现了。

在#1这个地方,我看网上很多demo中的aggs是没有aggs的具体实现的,直接上来就是bucket_selector的部分,但是可能是因为版本问题,这es的版本5.6.1是必须有这aggs实现的,否则会报错误”Unknown BaseAggregationBuilder”,把”having”当作aggs名,把“bucket_selector”当作aggs的builder。

而#2这里,把field的值写成null,是为了不增加额外的计算工作,不知道写成null有没有什么查询不命中的消耗,es也没有报错,这里待推敲,自己创造的写法。

昼伏夜出分析

es也不好处理,因为要用车牌号取交集,可以每天用游标读取,昼伏的取一次,夜出的取一次,每天处理,记录结果。

参考

Elasticsearch权威指南(中文版)
Elasticsearch权威指南(中文官方web版)
Mastering Elasticsearch(中文版)
Day19 ES内存那点事
实时搜索引擎Elasticsearch(4)——Aggregations (聚合)API的使用 使用scroll实现Elasticsearch数据遍历和深度分页 游标查询 Scroll

Table of Contents