基于WIFI探针的商业大数据分析系统(hadoop+spark+hbase+bootstrap+echarts)

简介

服务端主要接收探针每三秒发送一次的数据,于接收端搭建Tomcat纵向集群,有效处理1300台以上的并发请求,将数据保存到数据分析平台待用,文件系统使用HDFS分布式文件系统。数据分析平台搭建于Linux系统,采用Spark&Habse的分布式搭建模式,后台数据分析程序在3秒之内分析完实时数据,传至前台可视化,实现实时数据的展示。
作品较好的完成包括客流量、入店量、入店率、来访周期、新老顾客、顾客活跃度、驻店时长、来访周期、跳出率、深访率在内的九大基础指标的分析。并于前端页面上注重直观展示数据的变化趋势,数据分析所涉及的范围根据店铺具体情况支持自定义阕值。
对于探针功能的拓展:探针绑定短信模块,后台实现短信接口,从而对探针实现远程控制与状态监控。针对不同店铺大小推出小店铺探针,中性店铺三探针,大型店铺多探针模式,中型以上店铺支持定位,从而进一步分析呈现区域热点。
此外作品特色的添加了分析预测的功能,并基于分析预测功能结合历史数据智能的为商家提供商业决策支持,其中包括营销方案的推送,店铺排名波动的提醒功能。

wifi探针数据分析

本项目实现的主要功能

  1. 通过探针设备采集可监测范围内的手机MAC地址、与探针距离、时间、地理位置等信息:
  2. 探针采集的数据可以定时发送到服务端保存:
  3. 利用大数据技术对数据进行人流量等指标的分析。最终以合理的方式展示数据处理结果。

1、数据收集

数据收集由服务器和探针设备共同完成,探针采集数据并发送到服务器,服务器接收探针设备的数据,处理成定格式保存至分布式文件系统(HDFS)中,供数据处理使用。

1.1 术语介绍:

  • STA: (station) 工作站,指手机或者电脑等连接WiFi的设备。
  • AP: (AcessPoint)接入点,指无线路由器等产生WiFi热点的设备。
  • SSID: ( Service Set dentifer)服务集标识,就是WiFi的名字。

1.2 探针采集数据的原理:

在无线领域中STA总是不断试图寻找周边存在的AP,所以我们可以利用这种特性来发现一个未连接 AP的STA,而对于一个已经连接到AP的STA,也可以通过截狭它发出的数据帧来获取MAC、与探针之间的距离和它当前连接的SSID等信息。


2、数据清洗

探针上传的数据是一种半结构化数据,主要格式参数如下:

  • id:嗅探设备ID
  • mmac:嗅探器设备自身Wifi MAC
  • rate:发送频率
  • wssid:嗅探器设备连接的WiFi的MAC地址
  • time:时间戳,采集这些MAC的时间
  • lat:纬度 lon:经度
  • addr:地址
  • mac:采集到的手机的MAC地址
  • rssi:手机的信号强度
  • range:手机距离嗅探设备的距离
  • ts:目标ssid,手机连接的WiFi的ssid
  • tmc:手机连接的WiFi的地址
  • tc:是否与路由器连接
  • ds:手机是否睡眠
  • essidn:曾今连接的WiFi的SSID

该数据属于半结构化数据,其中包含探针设备ID,设备自身WFIMAC,发送频率,设备连接的WFi的SID设备连接的WFI的MAC地址、时间戳,采集到这些MAC的时间、纬度、经度、地址信息,以及一组被探测到的设备信息, 设备信息包括手机的MAC、信号强度、与探针之间的距离、手机连接WiFi的SID手机连接的WFI的MAC地址、手机曾经连接过的WiFi的SSID。需要在清洗过程中去除所有无用的数据,使之变成结构化的文件,到这里数据清洗的第一步就完成了。

第二步使用Spark SQL完成,在这一步中完成时间点到时间段的转化,即在处理之前每一条记录表示一个终端在某 一时间点的状态,而在结果中一条记录表示一 个终端在一段时间内的状态。

经过数据清洗,不仅大大减小了数据集的容量,也为后续的数据处理提供了极大的方便。


3、数据保存

  1. 处理后的数据直接保存为文本文件,保存在HDFS中。
  2. 处理后的数据导入关系型数据库,供后续生成图表使用,该展示系统使用PHP做后台,前端使用HTML和JS生成图表。

4、客流数据分析

4.1 数据表设计

1、HDFS中是原始数据集:(data表,通过Spark SQL得到)

原始数据data 表主要字段:

  • tanzhen_id:探针设备的id
  • mac:用户设备的MAC.(不同的mac代表不同客户)
  • time:探测到当前设备的时间
  • range:该设备与探什之间的距离

2、visit表(取出同一mac所有数据,按照time遍历,得到每一个用户的每一次访问记录)

visit表主要字段:

  • mac:标识不同用户
  • start_time:用户入店时间
  • leave_time:用户离店时间
  • stay_time:用户停留时间

思路:data表首先抽取每一个用户(MAC)的数据,对每个用户数据进行遍历,得到每个用户每一次的访问记录。通过visit表得到:客流量、入店率,来访周期,新老顾客,顾客活跃度等等。

原始数据表是数据接收服务器最终存储到HDFS中的数据,中间结果表是经过第2次收据清洗后的输出结果。

上传数据到HDFS(因为是用python等语言处理过的,所以传的数据格式为只提取有用的数据)

timeArray有四个time是因为时间格式划分为四块。

4.2 指标说明

上面我们完成了数据的初步处理,我们将得到以下指标:

  • 客流量:店铺或区域整体客流及趋势。
  • 入店量:进入店铺或区域的客流及趋势。
  • 入店率:进入店铺或区域的客流占全部客流的比例及趋势。来访周期:进入店铺或区域的顾客距离上次来店的间隔。
  • 来访周期 : 进入店铺或区域的顾客距离上次来店的问隔。
  • 新老顾客:一定时间段内首次/两次以上进入店铺的顾客。
  • 顾客活跃度:按顾客距离上次来访间隔划分为不同活跃度(高活跃度、中活跃度、低活跃度、沉睡活跃度) .
  • 驻店时长:进店铺的顾客在店内的停留时长,
  • 跳出率:进店铺后很快离店的顾客及占比(占总体客流) .
  • 深访率:进店铺深度访问的顾客及占比(占总体客流,可以根据定位轨迹或者停留时长判定).

店铺外人流/客流量:在实时接收探针数据过程中根据range字段(范围)以及数据条数实时得到。

入店量/离店量:是对visit表分别按start_time、 leave_time字段从小到大遍历统计规定时间段内的记录条数。

跳出率/深访率/驻店时长:对visit 表按time字段从小到大遍历统计规定时间段内记录条数stay_time小于三分钟大于20分钟的记录条数以及stay_time均值

新老顾客数/顺客活跃度:对visit表按time字段排序,按一定定时间段遍历,新顾客收等于该时间段结束时刻之前所有的顺客数减去该时间段开始时刻之前所有的顾客数,老顾客数等于该时间段内顾客数减去新顾客数。

4.3 spark源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.victor.spark.WiFiData
package com.victor.spark.WifiProject

import org.apache.spark.sql.SparkSession
import scala.util.control.Breaks

object new_customer_extract { // 实例化类

def main(args: Array[String]): Unit = { // 主方法--入口(Unit无返回值)

val spark = SparkSession.builder().appName("customer_extract").config(
"spark.some.config.option","some-values").getOrCreate() // 变量1(不可改变)

import java.io._
val writer = new PrintWriter(new File("/spark/data/re.txt")) //for the storage of result

import spark.implicits._
val df = spark.read.json("/spark/data/log.json") //read the data
//create the view data for df
df.createOrReplaceTempView("data")
spark.sql("cache table data")

//get all MAC of all users
val macArray = spark.sql("SELECT DISTINCT mac FROM data").collect()

var i =0
val inner = new Breaks
val lenth = macArray.length
//loop for each user
while(i<lenth){
var resultString = ""
var mac = macArray(i)(0)
val sql = "SELECT 'time' from data where mac='"+mac+"'order by 'time'"
val timeArray = spark.sql(sql).collect()

//to get timeList from timeArray

import scala.collection.mutable.ListBuffer
var timeList = new ListBuffer[Int]
var list_length = timeArray.length
var j = 0

while (j < list_length){
timeList += timeArray(i)(0).toString.toInt
j = j+1
}

var k = 0
var oldTime = 0
var newTime = 0

var maxVisitTimeInterval = 300
var startTime = 0
var leaveTime = 0

while (k < list_length){
if(k == 0){
oldTime = timeList(0)
newTime = timeList(0)
startTime = timeList(0)
}

else if(k == (list_length - 1)){
leaveTime = timeList(k)
var stayTime = leaveTime - startTime
resultString += """{"mac":"""" + mac + """,""" +""""in_time":"""+startTime+","+""""out_time":"""
+leaveTime+","+""""stay_Time":"""+stayTime+"}\n"
}else{
newTime = timeList(k)

if ((newTime - oldTime) > maxVisitTimeInterval){
leaveTime = oldTime
var stayTime = leaveTime-startTime
resultString += """{"mac":"""" + mac + """,""" +""""in_time":"""+startTime+","+""""out_time":"""
+leaveTime+","+""""stay_Time":"""+stayTime+"}\n"

startTime = newTime
oldTime = newTime
}else{
oldTime = newTime
}
}
k = k +1
}

writer.write(resultString)

i = i+1

}
writer.close()
spark.sql("uncache table data")
}
}

———————

项目地址

原文地址:https://blog.csdn.net/rainmaple20186/article/details/80340140

0%