elasticSearch空间坐标的设置和查询

云计算 waitig 594℃ 百度已收录 0评论

原文网址:http://blog.csdn.net/yangwenbo214/article/details/54411004

一、数据准备

为了方便起见,我模拟臆造了json格式的数据

{"timestamp":"2017-01-13T13:13:32.2516955+08:00","deviceId":"myFirstDevice","windSpeed":17,"haze":284,"city":"Beijing","lat":33.9402,"lon":116.40739}
  • 1

模拟数据我用的是c#,大概如下:

 static void SendingRandomMessages()
        {
            //var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
            int len = 4;
            string[] citys = { "Beijing", "Shangjhai", "Guangzhou", "Shenzhen" };
            int[] avgWindSpeed = { 10, 16, 5, 7 };
            int[] avgWindSpeed1 = { 10, 16, 5, 7 };
            int[] avgHaze1 = { 200, 100, 50, 49 };
            int[] avgHaze = { 200, 100, 50, 49 };
            double[] latitude = { 39.3402, 31.23042, 23.13369, 22.54310 };
            double[] longitude = { 116.40739, 121.47370, 113.28880, 114.057860 };
            Random rand = new Random();
            while (true)
            {
                try
                {
                    for (int i = 0; i < len; i++)
                    {
                        avgWindSpeed[i] = avgWindSpeed1[i] + rand.Next(1, 11);
                        avgHaze[i] = avgHaze1[i] + rand.Next(10, 100);
                        var telemetryDataPoint = new
                        {
                            timestamp = DateTime.Now,
                            deviceId = "myFirstDevice",
                            windSpeed = avgWindSpeed[i],
                            haze = avgHaze[i],
                            city = citys[i],
                            lat = latitude[i],
                            lon = longitude[i]
                        };
                        var message = JsonConvert.SerializeObject(telemetryDataPoint);
                        //eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
                        Console.WriteLine("{0} > Get message: {1}", "eventHubName", message);
                    }
                }
                catch (Exception exception)
                {
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                    Console.ResetColor();
                }

                Thread.Sleep(200);
            }
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

此处我是作为消息发到一个eventhub中,你正确的做法可以将json写到文本文件中,再通过logstash读取即可

我的目的有两个:

  1. 获取数据中的lat、lon经纬度数据在kibana Map中进行绘图
  2. 获取数据中的timestamp作为我在kibana中的搜索时间,默认情况下是@timestamp

二、解决问题的整体思路

  1. lat、lon本质上是float类型,此处需要设计一个mapping
  2. 日志内的时间,本质上应该是个字符串。我们得先卡出这个字段,然后用date match进行转换

三、解决实例

1. mapping的设计,我给出一个template

{
  "template": "geo-*",
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "_all": {"enabled": true, "omit_norms": true},
      "dynamic_templates": [ {
        "message_field": {
          "match": "message",
          "match_mapping_type": "string",
          "mapping": {
            "type": "string", "index": "analyzed", "omit_norms": true
          }
        }
      }, {
        "string_fields": {
          "match": "*",
          "match_mapping_type": "string",
          "mapping": {
            "type": "string", "index": "analyzed", "omit_norms": true,
            "fields": {
              "raw": {"type": "string", "index": "not_analyzed", "ignore_above": 256}
            }
          }
        }
      } ],
      "properties": {
        "@version": { "type": "string", "index": "not_analyzed" },
        "lonlat": { "type": "geo_point" }
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

大概解释如下:

  • “template”: “geo-*”,所有geo开头的索引,都将会套用这个template配置

  • “lonlat”: { “type”: “geo_point” } 这个定义了lonlat为geo_point类型,为以后Map绘制奠定基础,这个是关键。

NOTE: 这个lonlat名字不能取成特定的关键名字?,我取成location一直报错。

更加详细的介绍你可以查看官网文档

2. 给出logstash的配置文件

input {
    file {
        path => "/opt/logstash/1.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
  json {
    source => "message"
  }
  mutate {
   add_field => [ "[lonlat]", "%{lon}" ]
   add_field => [ "[lonlat]", "%{lat}" ]
 }
  date{
    match=>["timestamp","ISO8601"]
         timezone => "Asia/Shanghai"
         "target" => "logdate" }
}
output {
  stdout { codec => rubydebug }
  elasticsearch
        {
           hosts =>"wb-elk"
           index =>  "geo-%{+YYYY.MM.dd}"
#          template => "/opt/logstash/monster.json"
#          template_overwrite => true
        }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

大概解释一下:

  • json {source => “message”}这个能将json数据格式分解出一个个字段

  • mutate 这个是向geo_point中加入经纬度数据

  • date {match=>}这个是将匹配json数据分解出来的timestamp,并以时间格式赋值给logdate

  • output中注释掉的是template文件。我采用的是直接put template的方式,因此注释掉了。两个方法都可行

补充:经纬度信息格式确定和重命名:

      mutate {
          convert => { "lon" => "float" }
          convert => { "lat" => "float" }
      }

      mutate {
          rename => {
              "lon" => "[lonlat][lon]"
              "lat" => "[lonlat][lat]"
          }
      }

3.运行过程

这里写图片描述

这里写图片描述


本文由【waitig】发表在等英博客
本文固定链接:elasticSearch空间坐标的设置和查询
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)