lua OpenResty Canal 场景应用开发
该技术不仅仅是应用于, 广告缓存 只是常用于做广告的缓存!
广告缓存载入与读取
对于一个应用来多,每天都会有很多的用户来访问, 那么访问最多的就是首页了!
而对于首页这种,高访问,且 页面数据并不是,经常的变化!
为了减轻服务器的压力,直接将其制作成一个 静态的页面进行展示!
基本流程:
- Nginx 都学习过了:可以通过反向代理实现,网关负载均衡的 服务器! 用户发送请求,首先通过 nginx , 通过nginx代理,负载均衡,请求对应的网关…模块。
- 本次就是在用户登录/常用的操作, 请求时经过 nginx请求,在nginx中完成 调用redis数据操作,直接返回结果!
但,niginx并不具备
编程语言的特性
if else逻辑判断,访问数据库 redis; 它只是一个负载均衡器; 所以,需要通过lua 嵌入式语言
来完成
- 首先访问nginx ,我们可以采用缓存的方式,先从nginx本地缓存中获取,获取到直接响应
- 如果没有获取到,再次访问redis,我们可以从redis中获取数据,如果有则返回,并缓存到nginx中
- 如果没有获取到,再次访问mysql 我们从mysql中获取数据,再将数据存储到redis中。
- 需要注意的是:
对于数据的更改!第一次访问数据库,保存Redis / Nginx 之后,
后面对于数据的修改,因为Redis已将保存了之前的脏数据(过期数据...)
所以修改的数据, 需要进更新!Canal - 而这里面,我们都可以使用LUA脚本嵌入到程序中执行这些查询相关的业务。
Lua更新 Lua请求!
- Lua应用学习总结!
OpenResty lua 实现Demo开发!!
环境搭建这里就不介绍了, 可以观看本人的上一篇文章!
数据库:
介绍一下使用的数据库mysq5.5,56
/表:shop_content
- 脚本就不发了,本次Demo 也和数据库没有太大关系!随便建一个也没影响 [狗头]
OpenResty lua 实现请求数据!
定义Lua
- 定义一个 Lua脚本,用户第一请求时,读数据库的资源…
- 存Redis,Nginx:…
Sreader.lua
--中文转码
ngx.header.content_type="application/json;charset=utf8"
--获取Url,截取请求参数 cid
local uri_args = ngx.req.get_uri_args();
local cid = uri_args["cid"];
--获取本地缓存
local cache_ngx = ngx.shared.my_cache;
--根据content_cache_ID拼接cid ,获取key ,nginx读取本地缓存数据;
local contentCache = cache_ngx:get('content_cache_'..cid);
local str="本地缓存";
--判断Nginx数据是否 空 或 nil(null)
if contentCache == "" or contentCache == nil then
--redis模板对象,超时时间,连接,get(key); 获取数据值;
local redis = require("resty.redis");
local red = redis:new()
red:set_timeout(2000)
red:connect("127.0.0.1", 6379)
-- red:auth("ok") 登录;
local rescontent=red:get("content:"..cid);
--判断Redis数据是否 空 或 nil(null)
if ngx.null == rescontent then
--JSON(数据转JSON存Redis方便操作) Mysql模板,超时,连接数据,连接,拼sql,执行,转JSON,存Redis Nginx,输出,关闭!
local cjson = require("cjson");
local mysql = require("resty.mysql");
local db = mysql:new();
db:set_timeout(2000)
local props = {
host = "127.0.0.1",
port = 3306,
database = "shop_content", --根据数据库来定!!用户/密码 也要进行修改!
user = "root",
password = "ok"
}
local res = db:connect(props);
local select_sql = "select url,pic from tb_content where status ='1' and category_id="..cid.." order by sort_order";
res = db:query(select_sql);
local responsejson = cjson.encode(res);
str = "db查询";
red:set("content:"..cid,responsejson);
ngx.say(responsejson);
ngx.say(str)
db:close()
else
--else表示Redis有值获取: 存Nginx key,value ,超时时间 单位10*60秒
cache_ngx:set('content_cache_'..cid, rescontent, 10*60);
str = "redis查询";
--输出结果页面!
ngx.say(rescontent)
ngx.say(str)
end
--关闭Redis资源!
red:close()
else
--Nginx有值直接输出!
ngx.say(contentCache)
ngx.say(str)
end
OpenResty集成Lua 添加一个请求
:
lua.conf
location /reader{
default_type text/html;
content_by_lua_file D:/WSMwork/Sreader.lua; #文件位置!
}
测试:
- 启动OpenResty
- 启动Redis Mysql
- 连续刷新!!
OpenResty lua 实现更新数据!
定义Lua
- 数据库,数据更改,为了保证
缓存数据的同步
进行的操作!
Supdate.lua
--中文转码,根据模块获取对象;
ngx.header.content_type="application/json;charset=utf8"
local cjson = require("cjson")
local mysql = require("resty.mysql")
--获取Url,截取请求参数 cid
local uri_args = ngx.req.get_uri_args()
local cid = uri_args["cid"]
--mysql读取数据!
local db = mysql:new()
db:set_timeout(1000)
local props = {
host = "127.0.0.1",
port = 3306,
database = "shop_content",
user = "root",
password = "ok"
}
local res = db:connect(props)
local select_sql = "select url,pic from tb_content where status ='1' and category_id="..cid.." order by sort_order"
res = db:query(select_sql)
db:close()
--存redis
local redis = require("resty.redis")
local red = redis:new()
red:set_timeout(2000)
local ip ="127.0.0.1"
local port = 6379
red:connect(ip,port)
red:auth("ok")
red:set("content:"..cid,cjson.encode(res))
red:close()
--Redis,就不存Nginx了,Nginx数据10分支更新(上面设置了!),这里就不在同步了!
--页面输出结果:true 更新成功!
ngx.say("{flag:true}")
OpenResty集成Lua 添加一个请求
:
lua.conf
location /update{
default_type text/html;
content_by_lua_file D:/WSMwork/Supdate.lua; #文件位置!
}
测试:
- 启动OpenResty
- 启动Redis Mysql
此时Redis 就已将和Mysql数据库中同步了数据了…
Canal
什么是canal
canal是阿里巴巴旗下的一款开源项目, 纯Java开发。
- 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL
(也支持mariaDB
和mysql大致相同的数据库,为了防止Mysql被Oracle收购变的 收费了!
)。 - 基于日志增量订阅&消费支持的业务:
- 这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具。
上面我们写了一个Supdate.lua 主要用于同步Mysql的实时数据,而这绝对不是由开发者手动操作的!
可以通过Canal来实现!
canal能做什么
与其问canal能做什么,不如说数据同步有什么作用。
但是canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护
- 业务cache(缓存)刷新
- 带业务逻辑的增量数据处理
工作原理
mysql主备复制实现
在Mysql 集群环境下,为了保证数据的安全,需要将一些日志操作,记录备份在多个从Slave节点中
从上层来看,复制分成三步
- master将改变记录到二进制日志(binary log)中 (这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
- slave将master的binary log events拷贝到它的中继日志(relay log)
- slave重做中继日志中的事件,将改变反映它自己的数据
canal的工作原理:
原理相对比较简单
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
Canal伪装成Mysql的子节点,只要Mysql中数据发送更改!日志更新就会同步到子节点中!Canal就会接受请求做出响应!
而我们只要编写响应的代码:同步数据!
如何搭建canal
首先有一个MySQL服务器 配置!
当前的 canal 支持源端 MySQL 版本包括: 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 本人5.5.56
开启Mysql的日志:
- my.ini设置如下信息:
my.ini
#一定要在这个 下面,不能在其它地方不然出不来日志记录!
[mysqld]
# 打开binlog
log-bin= D:MySqldatamysql-bin #设置日志安装的地址!
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
ok配置好了之后,重启Mysql
- win R: cmd 管理启动
- 启动mysql:输入
net start mysql
- 停止mysql:输入
net stop mysql
- windows下不能直接重启(restart),只能先停止,再启动。
重启之后就可以在地址(D:MySqldatamysql-bin)
中` 查看:
就是配置ok 了!
安装canal
去官网下载页面进行下载:https://github.com/alibaba/canal/releases
解压 canal.deployer-1.1.5-SNAPSHOT
继续配置 conf/example/instance.properties
找的这一项!配置!
启动canal
bin: 目录下存在很多的脚本文件, 直接cmd执行!
- 9099 canal应用端口开启
Canal启动成功!!
它会实时的和你的本地数据库,进行监视, 只要日志文件发送更改…它就会捕获…
官方提供启动Demo 模块:
总结
并没有太多的东西其实
- test启动类:
ClientSample.Java
注意Boot 工程要引入Spring 容器中去! canal的依赖组件
- Boot 工程的启动类,要加入监听的启动!
pom.xml
放在子模块下,并且注意版本配置… boot
代码语言:javascript复制 <dependencies>
<!-- cenal(阿里巴巴实现binlog订阅的框架,用于redis和DB数据同步) -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<!-- boot集成canal 所需要依赖 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client-adapter</artifactId>
<version>1.1.2</version>
<type>pom</type>
</dependency>
</dependencies>
ClientSample.Java
代码语言:javascript复制import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
//boot工程需要加入的实现类
//@Component
public class ClientSample {
//运行类
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000; //获取指定数量的数据
int emptyCount = 0; //记录循环次数
//打开连接
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
//设置开启的时间 1200秒,20分支,因为这里是测试,正常项目这里直接死循环! where(true){}
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount){
Message message = connector.getWithoutAck(batchSize); //获取指定数量的数据
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
emptyCount ;
System.out.println("empty count : " emptyCount); //输出当前次数.1200次时就结束直接结束!启动!
try {
//线程休眠1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//数据库日志更改获得数据 次数归0;
emptyCount = 0;
//如果有数据,处理数据,调用静态方法↓↓↓
printEntry(message.getEntries());
}
connector.ack(batchId); //提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry( List<Entry> entrys) {
for (Entry entry : entrys) {
//开启/关闭事务的实体类型,跳过,只同步响应 增删改成功的操作!
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" entry.toString(),
e);
}
//获取操作类型:insert/update/delete类型
EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//获取RowChange对象里的每一行数据,打印出来
for (RowData rowData : rowChage.getRowDatasList()) {
//如果是删除语句
if (eventType == EventType.DELETE) {
//下面的静态方法!
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
//遍历打印输出,类信息: 列:值
private static void printColumn( List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() " : " column.getValue() " update=" column.getUpdated());
//获取变更的列,及值;
if (column.getName().equalsIgnoreCase("要检测变更的列名")) {
//输出结果值;
System.out.println("列名" column.getValue());
}
}
}
}
启动,并修改数据库任意一条数据!
- 每个一秒记录一次,程序中定义 1200结束!
- 除非中间数据库,数据修改,关闭计数重新计数!并输出对应的操作!
而我们只要在微服项目中集成这个 canalDemo
并对其进行小小的更改!
1.注册到Eureka中方便调用模块 3.0-1200计数,建议更改为 死循环。程序不停我不停!! 2.修改Test 在对应的日志更改时处理不同的操作,update时调用,请求 nginx调用 lua 更新同步实时数据!
Canal注意 Boot集成
- 上面集成的是普通,阿里提供的案例集成Boot当然要存在一些更改!
ClientSample类上的注解
@Component 使类加载至Spring中,导入对应依赖
主程序要运行改类的实例: ClientSample类中,不用设置指定监听时间,要设置true死循环,程序不停永久监听同步数据
ClientSample.Java
死循环案例
代码语言:javascript复制@Component
public class ClientSample {
@Autowired
private RestTemplate restTemplate;
public void run() throws Exception {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(message.getEntries());
}
connector.ack(batchId); //提交确认
}
} finally {
connector.disconnect();
}
}
private void printEntry(List<CanalEntry.Entry> entrys) throws Exception {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() " : " column.getValue() " update=" column.getUpdated());
}
}
}
主程序 要启动canal 监听
主程序
import com.wsm.text.ClientSample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; //Jdbc配置类
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
//本人这里直接使用了Jdbc进行添加数据,引入的配置类,如果不是直接忽略即可!(exclude = {DataSourceAutoConfiguration.class})
public class MySearchApp {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext run = SpringApplication.run(MySearchApp.class, args);
//启动监听器,不然不会进行监听!
ClientSample bean = run.getBean(ClientSample.class);
bean.run();
}
}