基于云计算和大数据的模拟车辆行车监控系统

一、 系统架构

这是基于云计算和大数据的模拟车辆行车监控系统,可模拟实现在线远程对车辆行车的信息记录以及数据处理。其中,记录信息其中包括车辆的id、经过的地点(经纬度)、时间,数据处理包括对数据的排序、错误数据的排查、通过时间以及地点在地图上获得车辆行驶的轨迹、车辆相遇次数。

系统包括数据产生模块、数据接受与处理模块、数据库模块、客户端模块。其中kafka进行数据的接收,并进行数据过滤,将过滤后的数据传递给Redis,Redis再将数据存入Hbase数据库,Spark从Hbase中获得数据,将处理后的数据再传递回Hbase,客户端从Hbase中获得数据并将其展示在前端。

逻辑架构如下:
这里写图片描述
物理架构图如下:
img

二、数据流程分析

1. 数据采集过程分析

数据采集过程包括Kafka数据采集、Redis数据过滤、Hbase数据入库三部分,其中包括三个实体:Kafka生产者、Kafka消费者兼Redis发布者、Redis订阅者。

数据采集
各自的作用如下:
Kafka生产者:负责从json文件中以行为单位读取数据源,通过Kafka生产者代码编写生产消息,将json读取的消息发布在topic上。

Kafka消费者兼Redis发布者:负责从topic上消费Kafka生产者生产的消息,将消息通过Redis发布订阅功能发布到一个信道,等待订阅者接受消息。

Redis订阅者:负责订阅发布者相应的信道,接受发布者的消息,将消息存入Hbase数据库。
数据过滤过程使用了Kafka streams对原始数据进行过滤,本小组采用HIGH-LEVEL STREAMS DSL进行处理。Kafka创建一个Filter流,流的源绑定filter-before topic,同时Kafka生成者将消息生产在这个topic上;流的出口绑定filter-after topic,Kafka消费者绑定这个topic消费消息。过滤器消息选择条件过滤掉不正确的经纬度数据,并将这部分数据存放在Redis filter 键里,合格的数据传送到filter-after topic上。

Redis的缓冲作用
在Redis订阅者上,由于生产者生产消息过快,如果选择一条一条的存入数据库,会出现存取数据过慢,导致生产者的消息经过规定的时间(本小组设置的时间是90秒)没有被消费,报出Timeout错误。为避免这样的问题,选择每1000条数据存入数据库一次,这样的方式优点在于每1000条数据才请求连接数据库一次。请求连接数据库是较耗时的一个步骤,频繁的请求连接数据库会拖慢程序的运行时长。在基础项时,选择将所有数据存入list,然后一次请求数据库连接,将所有数据存入数据库,请求数据库连接的时间占比很小。

出现的问题以及解决方案
1000条数据一次存入无法达到实时的记录,这是本小组项目的一个缺点,但同时,这个问题可以通过选择storm 流式框架数据处理来解决,直接在Kafka消费阶段对数据进行流式处理能达到实时效果。

2.数据查询和离线处理分析

数据查询:数据采集完成,所有数据存入Hbase数据库的‘Record’表中,行键设计为eid、placeid、time组合键,在数据查询时,需要将行键截取,获取对应的数据,与查询条件比较,返回满足条件的数据。

spark处理:spark分析过程包含三个阶段——程序源码发布到master节点、master将map程序分配给map节点进行map操作、master将reduce程序分配给reduce节点进行reduce操作。数据流向是map节点从master节点获取Hbase数据索引,进而获取数据,接着运行map程序将数据分散处理。Map程序处理完的数据流入reduce进行聚合处理,最后将reduce结果存入Hbase数据库中。
问题:在进行spark分析时,限于物理机,整个集群仅有一个master节点、一个map worker节点、一个reduce worker节点,在数据分析时出现的情况是map worker节点的工作任务量远远大于reduce worker的工作任务量。在任务启动时,集群中各个节点使用top命令查看当前节点的CPU占比,发现在整个任务中map worker 节点长时间高CPU占比工作,而reduce worker节点在map worker节点处理完成后有10秒左右的高CPU占比工作期,然后整个数据分析完成。鉴于上述的问题,考虑在主机充足的情况下,选择为map任务分配多台主机。使得任务执行量较均匀分布。

三、软件功能分析

1、完成基本搭建系统,完成过车统计功能

系统可根据输入的地点ID进行检索,显示通过该地点的车辆ID、时间、地点以及经纬度;或者根据输入的车辆ID,显示出该车辆经过的地点、经过时间以及对应地点的经纬度。
结果展示:
这里写图片描述

2.系统附加功能分析

(1)原始信息过滤

原始数据包含若干条错误记录,如经纬度不合法等,需要实时对kafka中接收到的数据进行过滤处理,将处理后的数据传递给Redis。
这里写图片描述

(2)车辆行驶轨迹重现

实现方式:我们想出了两种方法实现其轨迹重新。
A、hbase方式
建立一张新表,重新编排行键。
在hbaseTest类中完成具体操作。首先使用HBaseConf类中的getConnection()方法与HBase数据库进行连接。然后利用HBaseConf类中getTableByName()方法得到对表“Record”表的操作句柄。同时使用相同的方法得到对Trace表的操作句柄。之后,使用Table类中的getScanner()方法得到Record表中的所有数据,并记录中“result”中。因为重现轨迹的时候只需要车辆的标识信息(Eid)和车辆经过的时间(time)和经过地方的经纬度(latitude,longitude)所以我们只需要在“Trace”表中存入这些数据即可。
现在我们已经将得到的所有的“Record”表中的数据都存在了“result”中。然后将result中的所有数据扫描一遍,同时将每条记录中的“Eid,time,latitude,longitude”信息记录下来,同时将每一条记录的这些信息作为新的一条记录,以“Eid”为rowKey且以“time”为列族的第一列放在Put类的对象中,最后通过Table类的put()方法将新的记录存在“Trace”表中。这样得到的“Trace”表中的数据即会以“time”自动排序。
当所有数据被读取并被重新放入“Trace”表中后,关闭与数据库的连接,所有的信息即被重新规划好。

B、spark的MapReduce方式
展示结果:
输入要查询的车辆的ID,显示其行驶轨迹。鼠标点击地点,可显示其经纬度。

轨迹展示结果:
这里写图片描述

(3)车辆相遇次数统计

我们定义相遇为“两车之间出现在同一地点的时间间隔小于一分钟”。
首先,通过Spark从Hbase表中读取数据,自身以地点为键进行join操作,计算除自身外的车辆是否相遇;再以地点为键进行分组,同一组内的数据按照时间进行排序,遍历整个列表,找出满足小于一分钟的数据。

结果展示
输入要查询的车辆Id,查询结果显示与之相遇过的车辆的ID以及次数。
这里写图片描述

0%