22: Configure the FineBI Dataset
Goal: configure FineBI to access the MySQL result dataset
Implementation
Install FineBI
- Install FineBI by following the manual 《FineBI Windows版本安装手册.docx》
Configure the data connection

```text
Data connection name: Momo
Username: root
Password: your own MySQL password
Data connection URL: jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
```
Prepare the dataset

```sql
SELECT
  id, momo_totalcount, momo_province, momo_username, momo_msgcount,
  CASE momo_grouptype
    WHEN '1' THEN '总消息量'
    WHEN '2' THEN '各省份发送量'
    WHEN '3' THEN '各省份接收量'
    WHEN '4' THEN '各用户发送量'
    WHEN '5' THEN '各用户接收量'
  END AS momo_grouptype
FROM momo_count;
```
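Before pointing FineBI at the dataset, it can be worth confirming that the connection URL and the prepared query work outside of FineBI. Below is a minimal JDBC sketch, assuming the same node1 host, momo database, and momo_count table as above; the class name MomoDatasetCheck and the password placeholder are hypothetical, and the driver comes from the mysql-connector-java dependency in Appendix 1.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Hypothetical helper: verify the FineBI connection URL and the prepared query by hand.
public class MomoDatasetCheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8";
        String sql = "SELECT id, momo_totalcount, momo_province, momo_username, momo_msgcount, "
                + "CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量' "
                + "WHEN '3' THEN '各省份接收量' WHEN '4' THEN '各用户发送量' "
                + "WHEN '5' THEN '各用户接收量' END AS momo_grouptype "
                + "FROM momo_count";
        // use your own MySQL password, the same one configured in FineBI
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             PreparedStatement stmt = conn.prepareStatement(sql);
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                System.out.println(rs.getString("momo_grouptype") + "\t"
                        + rs.getString("momo_username") + "\t"
                        + rs.getString("momo_msgcount"));
            }
        }
    }
}
```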
Summary
- Configured FineBI to access the MySQL result dataset
23: Build Reports in FineBI
- Goal: build the real-time report in FineBI
- Steps
  - step1: build the real-time report
  - step2: configure the real-time report
  - step3: test the real-time refresh
- Implementation
  - Build the real-time report
    - Create a new dashboard
    - Add a title
    - Real-time total message count
    - Top 10 users by messages sent
    - Top 10 users by messages received
    - Top 10 provinces by messages sent
    - Top 10 provinces by messages received
    - Total message volume per province
- Summary
  - Built the real-time report in FineBI
24: FineBI Real-Time Configuration Test
Goal: test the real-time report
Implementation
Configure real-time refresh
Official documentation: https://help.fanruan.com/finebi/doc-view-363.html
Add the jar package: place the jar in the webapps/webroot/WEB-INF/lib directory under the FineBI installation directory
- Note: if prompted that the file already exists, choose to overwrite it
Add the JS file
Create a JS file named refresh.js:

```javascript
setTimeout(function () {
    var b = document.title;
    var a = BI.designConfigure.reportId; // get the dashboard id
    // replace this with the id of your own dashboard
    if (a == "d574631848bd4e33acae54f986d34e69") {
        setInterval(function () {
            BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            //Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            BI.Utils.broadcastAllWidgets2Refresh(true);
        }, 3000); // refresh interval in ms (3000 = refresh every 3 seconds)
    }
}, 2000)
```
Place the refresh.js file under %FineBI%/webapps/webroot in the FineBI installation directory.
Turn off the FineBI cache, then shut down FineBI.
Modify the jar package to load the JS file:

```html
<!-- add the refresh feature -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
```

Restart FineBI.
Test real-time refresh

Clear the MySQL result table.

Start the Flink job: run MoMoFlinkCount (a simplified sketch of such a job appears after this procedure).

Start the Flume agent:

```shell
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
```

Start the data simulator:

```shell
java -jar /export/data/momo_init/MoMo_DataGen.jar \
  /export/data/momo_init/MoMo_Data.xlsx \
  /export/data/momo_data/ \
  10
```

Observe the report.
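The MoMoFlinkCount program itself is not listed in this section. Purely for orientation, here is a hypothetical, heavily simplified sketch of a job of that kind, using the Flink and Kafka dependencies from Appendix 1: it consumes MOMO_MSG from Kafka, keeps a running total of all messages, and upserts the result into momo_count as the grouptype '1' row. The class name, consumer group id, fixed row id, and REPLACE-based upsert are assumptions, not the course's actual implementation.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;

// Hypothetical sketch of a real-time count job; not the course's actual MoMoFlinkCount.
public class MoMoFlinkCountSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // consume the raw messages from Kafka
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id", "momo_flink_sketch");
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("MOMO_MSG", new SimpleStringSchema(), props));

        // map every message to ("total", 1) and keep a running sum under a single key
        stream.map(msg -> Tuple2.of("total", 1L))
              .returns(Types.TUPLE(Types.STRING, Types.LONG))
              .keyBy(0)
              .sum(1)
              .addSink(new TotalCountSink());

        env.execute("MoMoFlinkCountSketch");
    }

    // writes the running total into momo_count as the grouptype '1' row
    private static class TotalCountSink extends RichSinkFunction<Tuple2<String, Long>> {
        private transient Connection conn;
        private transient PreparedStatement stmt;

        @Override
        public void open(Configuration parameters) throws Exception {
            // use your own MySQL password here
            conn = DriverManager.getConnection(
                    "jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8",
                    "root", "123456");
            // assumes id 1 is reserved for the total-count row; adjust to the real table layout
            stmt = conn.prepareStatement(
                    "REPLACE INTO momo_count (id, momo_totalcount, momo_grouptype) VALUES (1, ?, '1')");
        }

        @Override
        public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
            stmt.setLong(1, value.f1);
            stmt.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (stmt != null) stmt.close();
            if (conn != null) conn.close();
        }
    }
}
```

The per-province and per-user send/receive counts (grouptypes '2' to '5') shown in the dataset query would follow the same keyBy/sum/upsert pattern, keyed by province or username instead of a constant key.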
- Summary
  - Completed the FineBI real-time refresh test
## Appendix 1: Maven Dependencies
```xml
<!-- remote repository -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases><enabled>true</enabled></releases>
<snapshots>
<enabled>false</enabled>
<updatePolicy>never</updatePolicy>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- HBase client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Kafka client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- JSON parsing library -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<!-- required for Flink to access HDFS, Kafka, MySQL, and Redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!-- HTTP client dependency -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.4</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
```
## Appendix 2: Complete Offline Consumer Code

```java
package bigdata.itcast.cn.momo.offline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
/**
 * @ClassName MomoKafkaToHbase
 * @Description TODO offline scenario: consume data from Kafka and write it to HBase
 * @Create By Maynor
 */
public class MomoKafkaToHbase {

    private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); // table name
    private static byte[] family = Bytes.toBytes("C1");                           // column family

    //todo: 2 - build the HBase connection
    // static block: runs once when the class is loaded, so only one connection is built (creating several would hurt performance)
    static {
        try {
            // build the configuration object
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
            // build the connection
            conn = ConnectionFactory.createConnection(conf);
            // get the table object
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        //todo: 1 - build the consumer and fetch the data
        consumerKafkaToHbase();
        // String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
        // System.out.println(momoRowkey);
    }

    /**
     * Consumes data from Kafka and writes valid records to HBase
     */
    private static void consumerKafkaToHbase() throws Exception {
        // build the configuration object
        Properties props = new Properties();
        // Kafka broker addresses
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // consumer group id
        props.setProperty("group.id", "momo1");
        // disable auto commit
        props.setProperty("enable.auto.commit", "false");
        // key and value deserializer classes
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the topics to consume
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        // keep polling for data
        while (true) {
            // Ask Kafka for data and wait up to 100 ms for a response; if nothing arrives within 100 ms, issue the next poll
            // All fetched records are returned in a ConsumerRecords object, which behaves like a collection of KV pairs
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //todo: 3 - process the fetched data: print it
            // handle the data partition by partition
            Set<TopicPartition> partitions = records.partitions(); // all partitions in this batch
            // process each partition
            for (TopicPartition partition : partitions) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition); // all records of this partition
                // process the records of this partition
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    // topic
                    String topic = record.topic();
                    // partition
                    int part = record.partition();
                    // offset
                    offset = record.offset();
                    // key
                    String key = record.key();
                    // value
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    // write the value to HBase
                    if (value != null && !"".equals(value) && value.split("
```
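The source listing breaks off at this point, in the middle of the validity check. Purely as orientation, the sketch below shows how the remainder typically continues, consistent with the imports at the top (Put, MD5Hash, OffsetAndMetadata): filter well-formed messages, write them to HBase with an MD5-prefixed rowkey, and manually commit each partition's offset. The "\001" delimiter, the field positions, and the exact rowkey layout are assumptions rather than the course's exact code.

```java
// Hypothetical continuation of the truncated listing above (not the original code).
// Assumptions: fields are separated by "\001", field 0 is the message time,
// field 2 the sender account, field 11 the receiver account.

                    if (value != null && !"".equals(value) && value.split("\001").length >= 12) {
                        writeToHbase(value);
                    }
                }
                //todo: 4 - commit this partition's offset manually once its records are written
                Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
                commits.put(partition, new OffsetAndMetadata(offset + 1));
                consumer.commitSync(commits);
            }
        }
    }

    // rowkey = first 8 chars of MD5(sender + "_" + receiver) + "_" + message timestamp
    private static String getMomoRowkey(String time, String sender, String receiver) throws ParseException {
        long timestamp = format.parse(time).getTime();
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(sender + "_" + receiver)).substring(0, 8);
        return prefix + "_" + timestamp;
    }

    // write one message into MOMO_CHAT:MOMO_MSG, column family C1
    private static void writeToHbase(String value) throws Exception {
        String[] fields = value.split("\001");                           // delimiter assumed
        String rowkey = getMomoRowkey(fields[0], fields[2], fields[11]); // positions assumed
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(family, Bytes.toBytes("msg_time"), Bytes.toBytes(fields[0]));
        put.addColumn(family, Bytes.toBytes("sender_account"), Bytes.toBytes(fields[2]));
        put.addColumn(family, Bytes.toBytes("receiver_account"), Bytes.toBytes(fields[11]));
        table.put(put);
    }
}
```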