lua + OpenResty + Canal 场景应用开发

2024-08-06 13:50:15 浏览数 (2)

lua OpenResty Canal 场景应用开发

该技术不仅仅是应用于, 广告缓存 只是常用于做广告的缓存!

广告缓存载入与读取

对于一个应用来多,每天都会有很多的用户来访问, 那么访问最多的就是首页了!

而对于首页这种,高访问,且 页面数据并不是,经常的变化! 为了减轻服务器的压力,直接将其制作成一个 静态的页面进行展示!

基本流程:

  • Nginx 都学习过了:可以通过反向代理实现,网关负载均衡的 服务器! 用户发送请求,首先通过 nginx , 通过nginx代理,负载均衡,请求对应的网关…模块。
  • 本次就是在用户登录/常用的操作, 请求时经过 nginx请求,在nginx中完成 调用redis数据操作,直接返回结果! 但,niginx并不具备 编程语言的特性 if else逻辑判断,访问数据库 redis; 它只是一个负载均衡器; 所以,需要通过lua 嵌入式语言 来完成
  • 首先访问nginx ,我们可以采用缓存的方式,先从nginx本地缓存中获取,获取到直接响应
  • 如果没有获取到,再次访问redis,我们可以从redis中获取数据,如果有则返回,并缓存到nginx中
  • 如果没有获取到,再次访问mysql 我们从mysql中获取数据,再将数据存储到redis中。
  • 需要注意的是: 对于数据的更改!第一次访问数据库,保存Redis / Nginx 之后, 后面对于数据的修改,因为Redis已将保存了之前的 脏数据(过期数据...) 所以修改的数据, 需要进更新!Canal
  • 而这里面,我们都可以使用LUA脚本嵌入到程序中执行这些查询相关的业务。 Lua更新 Lua请求!
  • Lua应用学习总结!

OpenResty lua 实现Demo开发!!

环境搭建这里就不介绍了, 可以观看本人的上一篇文章!

数据库:

介绍一下使用的数据库mysq5.5,56/表:shop_content

  • 脚本就不发了,本次Demo 也和数据库没有太大关系!随便建一个也没影响 [狗头]

OpenResty lua 实现请求数据!

定义Lua
  • 定义一个 Lua脚本,用户第一请求时,读数据库的资源…
  • 存Redis,Nginx:…

Sreader.lua

代码语言:javascript复制
--中文转码
ngx.header.content_type="application/json;charset=utf8"
--获取Url,截取请求参数 cid
local uri_args = ngx.req.get_uri_args();
local cid = uri_args["cid"];
--获取本地缓存
local cache_ngx = ngx.shared.my_cache;
--根据content_cache_ID拼接cid ,获取key ,nginx读取本地缓存数据;
local contentCache = cache_ngx:get('content_cache_'..cid);
local str="本地缓存";

--判断Nginx数据是否 空 或 nil(null) 
if contentCache == "" or contentCache == nil then
	--redis模板对象,超时时间,连接,get(key); 获取数据值;
    local redis = require("resty.redis");
    local red = redis:new()
    red:set_timeout(2000)
    red:connect("127.0.0.1", 6379)
--	red:auth("ok")		登录;
    local rescontent=red:get("content:"..cid);
    
    --判断Redis数据是否 空 或 nil(null) 
    if ngx.null == rescontent then
    	--JSON(数据转JSON存Redis方便操作) Mysql模板,超时,连接数据,连接,拼sql,执行,转JSON,存Redis Nginx,输出,关闭!
        local cjson = require("cjson");
        local mysql = require("resty.mysql");
        local db = mysql:new();
        db:set_timeout(2000)
        local props = {
            host = "127.0.0.1",
            port = 3306,
            database = "shop_content",		--根据数据库来定!!用户/密码 也要进行修改!
            user = "root",
            password = "ok"
        }
        
        local res = db:connect(props);
        local select_sql = "select url,pic from tb_content where status ='1' and category_id="..cid.." order by sort_order";
        res = db:query(select_sql);
        local responsejson = cjson.encode(res);
		str = "db查询";
		
        red:set("content:"..cid,responsejson);
        ngx.say(responsejson);
		ngx.say(str)
        db:close()
    else
    	--else表示Redis有值获取: 存Nginx key,value ,超时时间 单位10*60秒
        cache_ngx:set('content_cache_'..cid, rescontent, 10*60);
		str = "redis查询";
		--输出结果页面!
        ngx.say(rescontent)
		ngx.say(str)
    end
    --关闭Redis资源!
    red:close()
else
	--Nginx有值直接输出!
    ngx.say(contentCache)
	ngx.say(str)
end
OpenResty集成Lua 添加一个请求

lua.conf

代码语言:javascript复制
	location /reader{
		default_type text/html;
		content_by_lua_file D:/WSMwork/Sreader.lua;  #文件位置!
	}
测试:
  • 启动OpenResty
  • 启动Redis Mysql
  • 连续刷新!!

OpenResty lua 实现更新数据!

定义Lua
  • 数据库,数据更改,为了保证 缓存数据的同步 进行的操作!

Supdate.lua

代码语言:javascript复制
--中文转码,根据模块获取对象;
ngx.header.content_type="application/json;charset=utf8"
local cjson = require("cjson")
local mysql = require("resty.mysql")
--获取Url,截取请求参数 cid
local uri_args = ngx.req.get_uri_args()
local cid = uri_args["cid"]
--mysql读取数据!
local db = mysql:new()
db:set_timeout(1000)
local props = {
    host = "127.0.0.1",
    port = 3306,
    database = "shop_content",
    user = "root",
    password = "ok"
}
local res = db:connect(props)
local select_sql = "select url,pic from tb_content where status ='1' and category_id="..cid.." order by sort_order"
res = db:query(select_sql)
db:close()
--存redis
local redis = require("resty.redis")
local red = redis:new()
red:set_timeout(2000)
local ip ="127.0.0.1"
local port = 6379
red:connect(ip,port)
red:auth("ok")
red:set("content:"..cid,cjson.encode(res))
red:close()
--Redis,就不存Nginx了,Nginx数据10分支更新(上面设置了!),这里就不在同步了!

--页面输出结果:true 更新成功!
ngx.say("{flag:true}")
OpenResty集成Lua 添加一个请求

lua.conf

代码语言:javascript复制
	location /update{
		default_type text/html;
		content_by_lua_file D:/WSMwork/Supdate.lua;	#文件位置!
	}
测试:
  • 启动OpenResty
  • 启动Redis Mysql

此时Redis 就已将和Mysql数据库中同步了数据了…

Canal

什么是canal

canal是阿里巴巴旗下的一款开源项目, 纯Java开发。

  • 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL (也支持mariaDB和mysql大致相同的数据库,为了防止Mysql被Oracle收购变的 收费了!
  • 基于日志增量订阅&消费支持的业务:
  • 这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具。 上面我们写了一个Supdate.lua 主要用于同步Mysql的实时数据,而这绝对不是由开发者手动操作的! 可以通过Canal来实现!

canal能做什么

与其问canal能做什么,不如说数据同步有什么作用。

但是canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护
  • 业务cache(缓存)刷新
  • 带业务逻辑的增量数据处理

工作原理

mysql主备复制实现

在Mysql 集群环境下,为了保证数据的安全,需要将一些日志操作,记录备份在多个从Slave节点中

从上层来看,复制分成三步

  • master将改变记录到二进制日志(binary log)中 (这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log)
  • slave重做中继日志中的事件,将改变反映它自己的数据
canal的工作原理:

原理相对比较简单

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流) Canal伪装成Mysql的子节点,只要Mysql中数据发送更改!日志更新就会同步到子节点中!Canal就会接受请求做出响应! 而我们只要编写响应的代码: 同步数据!

如何搭建canal

首先有一个MySQL服务器 配置!

当前的 canal 支持源端 MySQL 版本包括: 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 本人5.5.56

开启Mysql的日志:

  • my.ini设置如下信息:

my.ini

代码语言:javascript复制
#一定要在这个 下面,不能在其它地方不然出不来日志记录!
[mysqld]    
# 打开binlog
log-bin= D:MySqldatamysql-bin  #设置日志安装的地址!
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

ok配置好了之后,重启Mysql

  • win R: cmd 管理启动
  • 启动mysql:输入 net start mysql
  • 停止mysql:输入 net stop mysql
  • windows下不能直接重启(restart),只能先停止,再启动。

重启之后就可以在地址(D:MySqldatamysql-bin)中` 查看:

就是配置ok 了!

安装canal

去官网下载页面进行下载:https://github.com/alibaba/canal/releases

解压 canal.deployer-1.1.5-SNAPSHOT 继续配置 conf/example/instance.properties

找的这一项!配置!

启动canal

bin: 目录下存在很多的脚本文件, 直接cmd执行!

  • 9099 canal应用端口开启 Canal启动成功!! 它会实时的和你的本地数据库,进行监视, 只要日志文件发送更改…它就会捕获…

官方提供启动Demo 模块:

总结并没有太多的东西其实

  • test启动类:ClientSample.Java 注意Boot 工程要引入Spring 容器中去!
  • canal的依赖组件
  • Boot 工程的启动类,要加入监听的启动!
pom.xml

放在子模块下,并且注意版本配置… boot

代码语言:javascript复制
  <dependencies>
        <!-- cenal(阿里巴巴实现binlog订阅的框架,用于redis和DB数据同步) -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- boot集成canal 所需要依赖 -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client-adapter</artifactId>
            <version>1.1.2</version>
            <type>pom</type>
        </dependency>
    </dependencies>
ClientSample.Java
代码语言:javascript复制
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

//boot工程需要加入的实现类
//@Component  
public class ClientSample {
    //运行类
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");

        int batchSize = 1000;   //获取指定数量的数据
        int emptyCount = 0;     //记录循环次数

        //打开连接
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();

            //设置开启的时间 1200秒,20分支,因为这里是测试,正常项目这里直接死循环! where(true){}
            int totalEmtryCount = 1200;
            while (emptyCount < totalEmtryCount){
                Message message = connector.getWithoutAck(batchSize); //获取指定数量的数据
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    emptyCount  ;
                    System.out.println("empty count : "   emptyCount);  //输出当前次数.1200次时就结束直接结束!启动!

                    try {
                        //线程休眠1秒
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                } else {
                    //数据库日志更改获得数据 次数归0;
                    emptyCount = 0;
                    //如果有数据,处理数据,调用静态方法↓↓↓
                    printEntry(message.getEntries());
                }

                connector.ack(batchId);     //提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }


    private static void printEntry( List<Entry> entrys) {
        for (Entry entry : entrys) {
            //开启/关闭事务的实体类型,跳过,只同步响应 增删改成功的操作!
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"   entry.toString(),
                        e);
            }
            //获取操作类型:insert/update/delete类型
            EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //获取RowChange对象里的每一行数据,打印出来
            for (RowData rowData : rowChage.getRowDatasList()) {
                //如果是删除语句
                if (eventType == EventType.DELETE) {
                    //下面的静态方法!
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增语句
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的语句
                } else {
                    //变更前的数据
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    //遍历打印输出,类信息: 列:值
    private static void printColumn( List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName()   " : "   column.getValue()   "  update="   column.getUpdated());
            //获取变更的列,及值;
            if (column.getName().equalsIgnoreCase("要检测变更的列名")) {
            	//输出结果值;
               System.out.println("列名" column.getValue());
            }
        }
    }
}
启动,并修改数据库任意一条数据!
  • 每个一秒记录一次,程序中定义 1200结束!
  • 除非中间数据库,数据修改,关闭计数重新计数!并输出对应的操作! 而我们只要在微服项目中集成这个 canalDemo 并对其进行小小的更改! 1.注册到Eureka中方便调用模块 3.0-1200计数,建议更改为 死循环。程序不停我不停!! 2.修改Test 在对应的日志更改时处理不同的操作,update时调用,请求 nginx调用 lua 更新同步实时数据!

Canal注意 Boot集成

  • 上面集成的是普通,阿里提供的案例集成Boot当然要存在一些更改! ClientSample类上的注解@Component 使类加载至Spring中,导入对应依赖 主程序要运行改类的实例: ClientSample类中,不用设置指定监听时间,要设置true死循环,程序不停永久监听同步数据
ClientSample.Java 死循环案例
代码语言:javascript复制
@Component
public class ClientSample {

    @Autowired
    private RestTemplate restTemplate;

    public void run() throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); 				//提交确认
            }
        } finally {
            connector.disconnect();
        }
    }

    private void printEntry(List<CanalEntry.Entry> entrys) throws Exception {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"   entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName()   " : "   column.getValue()   "  update="   column.getUpdated());
        }
    }
}
主程序 要启动canal 监听

主程序

代码语言:javascript复制
import com.wsm.text.ClientSample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;		//Jdbc配置类
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
//本人这里直接使用了Jdbc进行添加数据,引入的配置类,如果不是直接忽略即可!(exclude = {DataSourceAutoConfiguration.class})	
public class MySearchApp {
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext run = SpringApplication.run(MySearchApp.class, args);
        //启动监听器,不然不会进行监听!
        ClientSample bean = run.getBean(ClientSample.class);
        bean.run();
    }
}

0 人点赞