使用 Elastic Stack 分析地理空间数据 (二)

2022-11-06,,,,

文章转载自:https://blog.csdn.net/UbuntuTouch/article/details/106546064

在之前的文章 “Observability:使用 Elastic Stack 分析地理空间数据 (一)”,我详述了如何从 OpenSky Network API 接口把数据导入到 Elasticsearch,并对这些数据进行可视化分析。也许针对很对的情况这个已经很满足了,因为它确实可以帮我们从很多实时数据中提取很多有用用用的东西。

在今天的文章中,我们将参考之前的文章 “如何使用 Elasticsearch ingest 节点来丰富日志和指标” 。我们可以利用 Elasticsearch ingest 节点来更加丰富我们的数据,并对这些数据做更进一步的的分析。

为了达到这个目的,我们必须首先了解在之前索引中的 icao 字段。这个字段的意思是:

ICAO 机场代码或位置指示器是由四个字母组成的代码,用于指定世界各地的机场。 这些代码由国际民用航空组织定义并发布在国际民航组织7910号文件:位置指示器中,供空中交通管制和航空公司运营(例如飞行计划)使用。

我们之前的每个文档是这样的:

{
"velocity" : 0.0,
"icao" : "ad0851",
"true_track" : 264.38,
"time_position" : 1591190152,
"callsign" : "AAL2535",
"origin_country" : "United States",
"position_source" : "ADS-B",
"spi" : false,
"request_time" : 1591190160,
"last_contact" : 1591190152,
"@timestamp" : "2020-06-03T13:16:03.723Z",
"on_ground" : true,
"location" : "32.7334,-117.2035"
}

另外,我们可以在地址 https://opensky-network.org/datasets/metadata/ 找到一个如下文件:

在这里,我们可以找到一个叫做 aircraftDatabase.csv 的文件。它里面的内容如下:

在上面的表格中,我们发现有一个叫做 icao24 的字段。这个字段和我们之前的文档可以进行关联,从而我们可以得到更多关于某个航班的更多信息。

创建 enrich index

由于下载的文档时一个是一个 csv 的文件。我们可以使用 data visualizer 来导入。

点击上面的 Override settings 链接:

点击 Apply 按钮:

点击上面的 Import 按钮:

我们把这个索引的名字称作为 aircraft。点击 Advaned:

再次确认 mapping,如果没有问题的话,点击 Import 按钮:

由于这个文件比较大,所以需要一点时间来进行导入:

等完成后,我们可以在 Elasticsearch 中找到一个叫做 aircraft 的索引:

GET _cat/indices

上面显示有一个新的 aircraft 的索引生成了。

创建 Enrich policy

接下来,我们来创建 enrich policy。它告诉我们如何丰富数据。在 Kibana 中打入如下的命令:

PUT /_enrich/policy/flights_policy
{
"match": {
"enrich_fields": [
"acars",
"adsb",
"built",
"category_description",
"engines",
"first_flight_date",
"icao_aircraft_type",
"line_number",
"manufacturer_icao",
"manufacturer_name",
"model",
"modes",
"notes",
"operator",
"operator_callsign",
"operator_iata",
"operator_icao",
"owner",
"reg_until",
"registered",
"registration",
"seat_configuration",
"serial_number",
"status",
"test_reg",
"type_code"
],
"indices": [
"aircraft"
],
"match_field": "icao"
}
}

我们使用 execute enrich policy API 为该策略创建enrich索引:

POST /_enrich/policy/flights_policy/_execute

接着,我们创建一个叫做 flights_aircraft_enrichment 的 pipeline:

PUT /_ingest/pipeline/flights_aircraft_enrichment
{
"description": "joins incoming ADSB state info with richer aircraft metadata",
"processors": [
{
"enrich": {
"field": "icao",
"policy_name": "flights_policy",
"target_field": "aircraft"
}
}
]
}

到此为止,我们已经成功地创建了 丰富策略。接下来,我们将展示如何使用这个 pipeline 来丰富我们的数据。

丰富数据

为了能够使用我们上面定义好的 pipeline,我们重参考之前的文章 “Observability:使用 Elastic Stack 分析地理空间数据 (一)”里的 fligths_logstash.conf 文件,并修改如下的 output 部分:

output {
stdout {
codec => rubydebug
} elasticsearch {
manage_template => "false"
index => "flights"
# pipeline => "flights_aircraft_enrichment"
hosts => "localhost:9200"
}
}

我们把上面的这一行的注释拿掉:

pipeline => "flights_aircraft_enrichment"

这样变成了:

output {
stdout {
codec => rubydebug
} elasticsearch {
manage_template => "false"
index => "flights"
pipeline => "flights_aircraft_enrichment"
hosts => "localhost:9200"
}
}

在启动 Logstash 之前,我们可以先删除之前的 flights 索引:

DELETE flights

再接着执行如下的命令:

PUT flights
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"baro_altitude": {
"type": "float"
},
"callsign": {
"type": "keyword"
},
"geo_altitude": {
"type": "float"
},
"icao": {
"type": "keyword"
},
"last_contact": {
"type": "long"
},
"location": {
"type": "geo_point"
},
"on_ground": {
"type": "boolean"
},
"origin_country": {
"type": "keyword"
},
"position_source": {
"type": "keyword"
},
"request_time": {
"type": "long"
},
"spi": {
"type": "boolean"
},
"squawk": {
"type": "long"
},
"time_position": {
"type": "long"
},
"true_track": {
"type": "float"
},
"velocity": {
"type": "float"
},
"vertical_rate": {
"type": "float"
}
}
}
}

重新运行 Logstash:

sudo ./bin/logstash -f fligths_logstash.conf

我们在 Kibana 中检查 flights 的 mapping:

GET flights/_mapping

我们可以看到一些新增加的各个新字段:

我们可以通过 search:

        "_source" : {
"aircraft" : {
"owner" : "Wells Fargo Trust Co Na Trustee",
"reg_until" : "2021-04-30",
"modes" : false,
"built" : "1984-01-01",
"acars" : false,
"manufacturer_icao" : "BOEING",
"serial_number" : "23018",
"manufacturer_name" : "Boeing",
"icao_aircraft_type" : "L2J",
"operator_callsign" : "GIANT",
"operator_icao" : "GTI",
"engines" : "GE CF6-80 SERIES",
"icao" : "a8a763",
"registration" : "N657GT",
"model" : "767-281",
"type_code" : "B762",
"adsb" : false
},
"true_track" : 272.81,
"velocity" : 5.14,
"spi" : false,
"origin_country" : "United States",
"@timestamp" : "2020-06-04T10:41:00.558Z",
"request_time" : 1591267250,
"time_position" : 1591267168,
"last_contact" : 1591267168,
"callsign" : "GTI165",
"icao" : "a8a763",
"location" : "39.0446,-84.6505",
"on_ground" : true,
"position_source" : "ADS-B"
}
}

我们可看到一个叫做 aircraft 的字段,它含有这个飞机所有被丰富的信息。

运用 Kibana 分析数据

找出前10的飞机型号

因为有新的字段进来,所以我们必须重新创建新的 inde pattern:

我们可以看到最多的是 PC-12/47E 这个机型。

找出飞机制造商的分布

我们看到 BOING 公司的市场份额是最大的。AIRBUS 处于第二的位置。

飞机机龄分布

我们可以看出来最多的飞机是2019年生产的。

飞机机型和飞行高度的关系

可以看出来 A320-214 飞机飞的是最高的。

Graph

运用 Graph 来找出数据直接的关系。如果你对 Graph 还不是很了解的话,请参阅我之前的教程 “Elastic Graph 介绍”。

点击 Create graph:

点击 Select a data source:

选择 flights* :

点击 Add fields:

添加 fields:

我们需要保持这个 graph。然后进行搜索:

从上面,我们可看出来 BOING 和我们想要的各个字段之间的关系。

我们从收集的数据可以有更多的其它的分析。在这里,我就不一一枚举了。你们可以做任何你想要的分析。

使用 Elastic Stack 分析地理空间数据 (二)的相关教程结束。

《使用 Elastic Stack 分析地理空间数据 (二).doc》

下载本文的Word格式文档,以方便收藏与打印。