Go短网址项目实战---下
- 添加协程
- 完整代码
- 用 JSON 持久化存储
- 分布式程序
- 使用代理缓存
- 带缓存的 ProxyStore
- 汇总
- 总结
- 项目完整源码
添加协程
如果有太多客户端同时尝试添加 URL,目前版本依旧存在性能问题。得益于锁机制,我们的 map 可以在并发访问环境下安全地更新,但每条新产生的记录都要立即写入磁盘,这种机制成为了瓶颈。写入操作可能同时发生,根据不同操作系统的特性,可能会产生数据损坏。就算不产生写入冲突,每个客户端在 Put 函数返回前,必须等待数据写入磁盘。因此,在一个 I/O 负载很高的系统中,客户端为了完成 Add 请求,将等待更长的不必要的时间。
为缓解该问题,必须对 Put 和存储进程解耦:我们将使用 Go 的并发机制。我们不再将记录直接写入磁盘,而是发送到一个通道中,它是某种形式的缓冲区,因而发送函数不必等待它完成。
保存进程会从该通道读取数据并写入磁盘。它是以 saveLoop 协程启动的独立线程。现在 main 和 saveLoop 并行地执行,不会再发生阻塞。
将 FileStore 的 file 字段替换为 record 类型的通道:save chan record。
代码语言:javascript复制type FileStore struct {
*RamStore
save chan record
}
通道和 map 一样,必须用 make 创建。我们会以此修改 NewFileStore 工厂函数,并给定缓冲区大小为1000,例如:save := make(chan record, saveQueueLength)。为解决性能问题,Put 可以发送记录 record 到带缓冲的 save 通道:
代码语言:javascript复制func (s *FileStore) Put(url string) string {
for {
key := s.genKey(s.count())
if s.set(key, url) {
s.save <- record{key, url}
return key
}
}
}
save 通道的另一端必须有一个接收者:新的 saveLoop 方法在独立的协程中运行,它接收 record 值并将它们写入到文件。saveLoop 是在 NewFileStore() 函数中用 go 关键字启动的。现在,可以移除不必要的打开文件的代码。以下是修改后的 NewFileStore():
代码语言:javascript复制func NewFileStore(filename string) *FileStore {
fileStore := &FileStore{RamStore: NewRamStore(), save: make(chan record, saveQueueLength)}
//从磁盘读取映射到内存
if err := fileStore.load(filename); err != nil {
log.Println("error loading data in fileStore: ", err)
}
//单独的持久化协程
go fileStore.saveLoop(filename)
return fileStore
}
以下是 saveLoop 方法的代码:
代码语言:javascript复制func (s *FileStore) saveLoop(filename string) {
var f, err = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
log.Fatal("FileStore:", err)
}
defer f.Close()
e := gob.NewEncoder(f)
for {
// taking a record from the channel and encoding it
r := <-s.save
if err := e.Encode(r); err != nil {
log.Println("FileStore:", err)
}
}
}
在无限循环中,记录从 save 通道读取,然后编码到文件中。
还有一个改进可以使 goto 更灵活:我们可以将文件名、监听地址和主机名定义为标志(flag),来代替在程序中硬编码或定义常量。这样当程序启动时,可以在命令行中指定它们的新值,如果没有指定,将采用 flag 的默认值。该功能来自另一个包,所以需要 import “flag”
先创建一些全局变量来保存 flag 的值:
代码语言:javascript复制var (
listenAddr = flag.String("http", ":8080", "http listen address")
dataFile = flag.String("file", "store.gob", "data store file name")
hostname = flag.String("host", "localhost:8080", "host name and port")
)
为了处理命令行参数,必须把 flag.Parse() 添加到 main 函数中,在 flag 解析后才能实例化 FileStore,一旦得知了 dataFile 的值。
代码语言:javascript复制var store *URLStore
func main() {
flag.Parse()
store = NewFileStore(*dataFile)
http.HandleFunc("/", Redirect)
http.HandleFunc("/add", Add)
http.ListenAndServe(*listenAddr, nil)
}
现在 Add 处理函数中须用 *hostname 替换 localhost:8080:
代码语言:javascript复制fmt.Fprintf(w, "http://%s/%s", *hostname, key)
完整代码
修改后的FileStore完整代码:
代码语言:javascript复制package dao
import (
"encoding/json"
"io"
"log"
"os"
)
const saveQueueLength = 1000
type FileStore struct {
*RamStore
save chan record
}
type record struct {
Key, URL string
}
func NewFileStore(filename string) *FileStore {
fileStore := &FileStore{RamStore: NewRamStore(), save: make(chan record, saveQueueLength)}
//从磁盘读取映射到内存
if err := fileStore.load(filename); err != nil {
log.Println("error loading data in fileStore: ", err)
}
//单独的持久化协程
go fileStore.saveLoop(filename)
return fileStore
}
func (s *FileStore) load(filename string) error {
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatal("FileStore:", err)
}
if _, err := file.Seek(0, 0); err != nil {
return err
}
d := json.NewDecoder(file)
for err == nil {
var r record
if err = d.Decode(&r); err == nil {
s.set(r.Key, r.URL)
}
}
if err == io.EOF {
return nil
}
return err
}
func (s *FileStore) Put(url string) string {
for {
key := s.genKey(s.count())
if s.set(key, url) {
s.save <- record{key, url}
return key
}
}
}
func (s *FileStore) saveLoop(filename string) {
var f, err = os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatal("FileStore:", err)
}
defer f.Close()
e := json.NewEncoder(f)
for {
// taking a record from the channel and encoding it
r := <-s.save
if err := e.Encode(r); err != nil {
log.Println("FileStore:", err)
}
}
}
修改后的server完整代码:
代码语言:javascript复制package server
import (
"LessUrl/dao"
"flag"
"fmt"
"net/http"
)
const AddForm = `
<form method="POST" action="/add">
URL: <input type="text" name="url">
<input type="submit" value="Add">
</form>
`
var (
listenAddr = flag.String("http", ":8080", "http listen address")
dataFile = flag.String("file", "store.gob", "data store file name")
hostname = flag.String("host", "localhost:8080", "host name and port")
)
//默认为内存存储
var store *dao.FileStore
func Start() {
flag.Parse()
store = dao.NewFileStore(*dataFile)
http.HandleFunc("/", redirect)
http.HandleFunc("/add", add)
http.ListenAndServe(*listenAddr, nil)
}
func add(w http.ResponseWriter, r *http.Request) {
url := r.FormValue("url")
if url == "" {
w.Header().Set("Content-Type", "text/html")
fmt.Fprint(w, AddForm)
return
}
key := store.Put(url)
fmt.Fprintf(w, "http://%s/%s", *hostname, key)
}
func redirect(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path[1:]
url := store.Get(key)
if url == "" {
http.NotFound(w, r)
return
}
http.Redirect(w, r, url, http.StatusFound)
}
用 JSON 持久化存储
如果你是个敏锐的测试者也许已经注意到了,当 goto 程序启动 2 次,第 2 次启动后能读取短 URL 且完美地工作。然而从第 3 次开始,会得到错误:
代码语言:javascript复制Error loading URLStore: extra data in buffer
这是由于 gob 是基于流的协议,它不支持重新开始。为补救该问题,这里我们使用 json 作为存储协议。
它以纯文本形式存储数据,因此也可以被非 Go 语言编写的进程读取。同时也显示了更换一种不同的持久化协议是多么简单,因为与存储打交道的代码被清晰地隔离在 2 个方法中,即 load 和 saveLoop。
从创建新的空文件 store.json 开始,更改 main.go 中声明文件名变量的那一行:
代码语言:javascript复制var dataFile = flag.String("file", "store.json", "data store file name")
在 store.go 中导入 json 取代 gob。然后在 saveLoop 中唯一需要被修改的行:
代码语言:javascript复制e := gob.NewEncoder(f)
更改为:
代码语言:javascript复制e := json.NewEncoder(f)
类似的,在 load 方法中:
代码语言:javascript复制d := gob.NewDecoder(f)
修改为:
代码语言:javascript复制d := json.NewDecoder(f)
这就是所有要改动的地方!编译,启动并测试,你会发现之前的错误不会再发生了。
如果是在win上编写的go代码,想要在linux运行,只需要在编译前,将GOOS环境变量设置为linux即可
分布式程序
目前为止 goto 以单线程运行,但即使用协程,在一台机器上运行的单一进程,也只能为一定数量的并发请求提供服务。一个缩短网址服务,相对于 Add(用 Put() 写入),通常 Redirect 服务(用 Get() 读取)要多得多。因此我们应该可以创建任意数量的只读的从(slave)服务器,提供服务并缓存 Get 方法调用的结果,将 Put 请求转发给主(master)服务器,类似如下架构:
对于 slave 进程,要在网络上运行 goto 应用的一个 master 节点实例,它们必须能相互通信。Go 的 rpc 包为跨越网络发起函数调用提供了便捷的途径。这里将把 FileStore 变为 RPC 服务。
slave 进程将应对 Get 请求以交付长 URL。当一个长 URL 要被转换为缩短版本(使用 Put 方法)时,它们通过 rpc 连接把任务委托给 master 进程,因此只有 master 节点会写入数据文件。
截至目前 FileStore 上基本的 Get() 和 Put() 方法具有如下签名:
代码语言:javascript复制func (s *RamStore) Get(smallUrl string) string
func (s *FileStore) Put(longUrl string) string
而 RPC 调用仅能使用如下形式的方法(t 是 T 类型的值):
代码语言:javascript复制func (t T) Name(args *ArgType, reply *ReplyType) error
要使 FileStore 成为 RPC 服务,需要修改 Put 和 Get 方法使它们符合上述函数签名。以下是修改后的签名:
代码语言:javascript复制func (s *FileStore) Put(longUrl, smallUrl *string) error
func (s *RamStore) Get(smallUrl, longUrl *string) error
Get() 代码变更为:
代码语言:javascript复制func (s *RamStore) Get(smallUrl, longUrl *string) error {
s.mu.RLock()
defer s.mu.RUnlock()
*longUrl = s.urls[*smallUrl]
return nil
}
Put() 代码做同样的改动:
代码语言:javascript复制func (s *FileStore) Put(longUrl, smallUrl *string) error {
for {
*smallUrl = s.genKey(s.count())
if s.set(*smallUrl, *longUrl) {
s.save <- record{*smallUrl, *longUrl}
return nil
}
}
}
还必须修改 HTTP 处理函数以适应 FileStore 上的更改。Redirect 处理函数现在返回 FileStore 给出错误的字符串形式:
代码语言:javascript复制func redirect(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path[1:]
var url string
if err := store.Get(&key, &url); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
http.Redirect(w, r, url, http.StatusFound)
}
Add 处理函数也以基本相同的方式修改:
代码语言:javascript复制func add(w http.ResponseWriter, r *http.Request) {
url := r.FormValue("url")
if url == "" {
w.Header().Set("Content-Type", "text/html")
fmt.Fprint(w, AddForm)
return
}
var key string
if err:=store.Put(&url,&key);err!=nil{
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "http://%s/%s", *hostname, key)
}
要使应用程序更灵活,正如之前章节所为,可以添加一个命令行标志(flag)来决定是否在 main() 函数中启用 RPC 服务器:
代码语言:javascript复制var rpcEnabled = flag.Bool("rpc", false, "enable RPC server")
要使 RPC 工作,还要用 rpc 包来注册 FileStore,并用 HandleHTTP 创建基于 HTTP 的 RPC 处理器:
代码语言:javascript复制func Start() {
flag.Parse()
store = dao.NewFileStore(*dataFile)
if *rpcEnabled {
rpc.RegisterName("FileStore", store)
rpc.HandleHTTP()
}
http.HandleFunc("/", redirect)
http.HandleFunc("/add", add)
http.ListenAndServe(*listenAddr, nil)
}
使用代理缓存
FileStore 已经成为了有效的 RPC 服务,现在可以创建另一种代表 RPC 客户端的类型,它会转发请求到 RPC 服务器,我们称它为 ProxyStore。
代码语言:javascript复制type ProxyStore struct {
client *rpc.Client
}
一个 RPC 客户端必须使用 DialHTTP() 方法连接到服务器,所以我们把这句加入 NewProxyStore 函数,它用于创建 ProxyStore 对象。
代码语言:javascript复制func NewProxyStore(addr string) *ProxyStore {
client, err := rpc.DialHTTP("tcp", addr)
if err != nil {
log.Println("Error constructing ProxyStore:", err)
}
return &ProxyStore{client: client}
}
ProxyStore 有 Get 和 Put 方法,它们利用 RPC 客户端的 Call 方法,将请求直接传递给服务器:
代码语言:javascript复制func (s *ProxyStore) Get(key, url *string) error {
return s.client.Call("Store.Get", key, url)
}
func (s *ProxyStore) Put(url, key *string) error {
return s.client.Call("Store.Put", url, key)
}
带缓存的 ProxyStore
可是,如果 slave 进程只是简单地代理所有的工作到 master 节点,不会得到任何增益!我们打算用 slave 节点来应对 Get 请求。要做到这点,它们必须有 FileStore 中 map 的一份副本(缓存)。因此我们对 ProxyStore 的定义进行扩展,将 FileStore 包含在其中:
代码语言:javascript复制type ProxyStore struct {
fileStore *FileStore
client *rpc.Client
}
NewProxyStore 也必须做修改:
代码语言:javascript复制func NewProxyStore(addr string) *ProxyStore {
client, err := rpc.DialHTTP("tcp", addr)
if err != nil {
log.Println("ProxyStore:", err)
}
return &ProxyStore{urls: NewFileStore(""), client: client}
}
还必须修改 NewFileStore 以便给出空文件名时,不会尝试从磁盘写入或读取文件:
代码语言:javascript复制func NewFileStore(filename string) *FileStore {
fileStore := &FileStore{RamStore: NewRamStore(), save: make(chan record, saveQueueLength)}
//从磁盘读取映射到内存
if filename!=""{
if err := fileStore.load(filename); err != nil {
log.Println("error loading data in fileStore: ", err)
}
//单独的持久化协程
go fileStore.saveLoop(filename)
}
return fileStore
}
ProxyStore 的 Get 方法需要扩展:它应该首先检查缓存中是否有对应的键。如果有,Get 返回已缓存的结果。否则,应该发起 RPC 调用,然后用返回结果更新其本地缓存:
代码语言:javascript复制func (s *ProxyStore) Get(key, url *string) error {
if err := s.urls.Get(key, url); err == nil { // url found in local map
return nil
}
// url not found in local map, make rpc-call:
if err := s.client.Call("Store.Get", key, url); err != nil {
return err
}
s.urls.Set(key, url)
return nil
}
同样地,Put 方法仅当成功完成了远程 RPC Put 调用,才更新本地缓存:
代码语言:javascript复制func (s *ProxyStore) Put(url, key *string) error {
if err := s.client.Call("Store.Put", url, key); err != nil {
return err
}
s.urls.Set(key, url)
return nil
}
汇总
slave 节点使用 ProxyStore,只有 master 使用 FileStore。有鉴于创造它们的方式,它们看上去十分一致:两者都实现了相同签名的 Get 和 Put 方法,因此我们可以指定一个 Store 接口来概括它们的行为:
代码语言:javascript复制type Store interface {
Put(url, key *string) error
Get(key, url *string) error
}
现在全局变量 store 可以成为 Store 类型:
代码语言:javascript复制var store Store
最后,我们改写 main() 函数以便程序只作为 master 或 slave 启动(我们只能这么做,因为现在 store 是 Store 接口类型!)。
为此我们添加一个没有默认值的新命令行标志 masterAddr。
代码语言:javascript复制var masterAddr = flag.String("master", "", "RPC master address")
如果给出 master 地址,就启动一个 slave 进程并创建新的 ProxyStore;否则启动 master 进程并创建新的 FileStore:
代码语言:javascript复制func main() {
flag.Parse()
if *masterAddr != "" { // we are a slave
store = NewProxyStore(*masterAddr)
} else { // we are the master
store = NewFileStore(*dataFile)
}
...
}
这样,我们已启用了 ProxyStore 作为 web 前端,以代替 FileStore。
其余的前端代码继续和之前一样地工作,它们不必在意 Store 接口。只有 master 进程会写数据文件。
现在可以加载一个 master 节点和数个 slave 节点,对 slave 进行压力测试。
总结
通过逐步构建 goto 应用程序,我们遇到了几乎所有的 Go 语言特性。
虽然这个程序按照我们的目标行事,仍然有一些可改进的途径:
- 审美:用户界面可以(极大地)美化。为此可以使用 Go 的 template 包。
- 可靠性:master/slave 之间的 RPC 连接应该可以更可靠:如果客户端到服务器之间的连接中断,客户端应该尝试重连。用一个“dialer” 协程可以达成。
- 资源减负:由于 URL 数据库大小不断增长,内存占用可能会成为一个问题。可以通过多台 master 服务器按照键分片来解决。
- 删除:要支持删除短 URL,master 和 slave 之间的交互将变得更复杂。
项目完整源码
- 顶层Store接口
type Store interface {
//Get 通过短URL得到长URL---用于重定向
Get(smallUrl, longUrl *string) error
//Put 传入长URL生成短URL
Put(url, key *string) error
}
- 实现类一FileStore
package dao
import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
)
//缓冲通道的最大长度
const saveQueueLength = 1000
type FileStore struct {
//存储映射关系的集合
urls map[string]string
//锁---不需要额外声明初始化
mu sync.RWMutex
//持久化通道
save chan record
}
//描述映射关系的对象
type record struct {
Key, URL string
}
func NewFileStore(filename string) *FileStore {
fileStore := &FileStore{urls: make(map[string]string), save: make(chan record, saveQueueLength)}
//通过文件名是否为空,判断是否是主线程,主线程才负责持久化
if filename != "" {
//从磁盘读取映射到内存
if err := fileStore.load(filename); err != nil {
log.Println("error loading data in fileStore: ", err)
}
//单独的持久化协程
go fileStore.saveLoop(filename)
}
return fileStore
}
func (s *FileStore) Get(key, url *string) error {
s.mu.RLock()
defer s.mu.RUnlock()
*url = s.urls[*key]
fmt.Printf("根据 key=%s ,查询到的url=%sn", *key, *url)
return nil
}
//保存映射关系
func (s *FileStore) Put(url, key *string) error {
for {
*key = s.genKey(s.count())
fmt.Printf("保存映射关系: key= %s , url= %s n", *key, *url)
if s.Set(*key, *url) {
s.save <- record{*key, *url}
return nil
}
}
}
//此磁盘加载映射数据到内存
func (s *FileStore) load(filename string) error {
fmt.Printf("从[%s]文件加载映射数据n", filename)
file, err := openFile(filename)
//文件读指针置位
if _, err := file.Seek(0, 0); err != nil {
return err
}
//使用JSON解码器进行读取
d := json.NewDecoder(file)
//读取文件,直到读取完毕
for err == nil {
var r record
if err = d.Decode(&r); err == nil {
s.Set(r.Key, r.URL)
}
}
if err == io.EOF {
return nil
}
fmt.Println("数据加载完毕")
return err
}
func openFile(filename string) (*os.File, error) {
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatal("FileStore:", err)
}
return file, err
}
func (s *FileStore) saveLoop(filename string) {
f, _ := openFile(filename)
defer f.Close()
e := json.NewEncoder(f)
for {
// taking a record from the channel and encoding it
r := <-s.save
fmt.Printf("持久化映射中, key=%s,url=%sn", r.Key, r.URL)
if err := e.Encode(r); err != nil {
log.Println("FileStore:", err)
}
}
}
func (s *FileStore) Set(smallUrl, longUrl string) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, present := s.urls[smallUrl]
if present {
s.mu.Unlock()
return false
}
s.urls[smallUrl] = longUrl
return true
}
func (s *FileStore) count() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.urls)
}
func (s *FileStore) genKey(key int) string {
return strconv.Itoa(key)
}
- 实现类二 ProxyStore
package dao
import (
"errors"
"log"
"net/rpc"
)
type ProxyStore struct {
fileStore *FileStore
client *rpc.Client
}
func NewProxyStore(addr string) *ProxyStore {
client, err := rpc.DialHTTP("tcp", addr)
if err != nil {
log.Println("Error constructing ProxyStore:", err)
}
return &ProxyStore{fileStore: NewFileStore(""), client: client}
}
func (s *ProxyStore) Get(key, url *string) error {
//本地缓存有,直接返回
if err := s.fileStore.Get(key, url); err == nil {
return nil
}
//远程调用,尝试获取
s.client.Call("Store.Get", key, url)
if url == nil {
return errors.New("url not found")
}
s.fileStore.Set(*key, *url)
return nil
}
func (s *ProxyStore) Put(url, key *string) error {
//交给master节点进行文件缓存
if err := s.client.Call("Store.Put", url, key); err != nil {
return err
}
//本地缓存
s.fileStore.Set(*key, *url)
return nil
}
- server.go
package server
import (
"LessUrl/dao"
"flag"
"fmt"
"net/http"
"net/rpc"
)
const AddForm = `
<form method="POST" action="/add">
URL: <input type="text" name="url">
<input type="submit" value="Add">
</form>
`
var (
listenAddr = flag.String("http", ":8080", "http listen address")
dataFile = flag.String("file", "store.json", "data store file name")
hostname = flag.String("host", "110.40.155.17:8080", "host name and port")
rpcEnabled = flag.Bool("rpc", false, "enable RPC server")
masterAddr = flag.String("master", "", "RPC master address")
)
//注意接口已经是指针了,这一点和结构体不同
var store dao.Store
func Start() {
flag.Parse()
if *masterAddr != "" { // we are a slave
store = dao.NewProxyStore(*masterAddr)
} else { // we are the master
store = dao.NewFileStore(*dataFile)
}
openRpc()
http.HandleFunc("/", redirect)
http.HandleFunc("/add", add)
http.ListenAndServe(*listenAddr, nil)
}
func openRpc() {
if *rpcEnabled {
fmt.Println("开启RPC远程调用")
rpc.RegisterName("FileStore", store)
rpc.HandleHTTP()
}
}
func add(w http.ResponseWriter, r *http.Request) {
url := r.FormValue("url")
if url == "" {
w.Header().Set("Content-Type", "text/html")
fmt.Fprint(w, AddForm)
return
}
var key string
if err := store.Put(&url, &key); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "http://%s/%s", *hostname, key)
}
func redirect(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path[1:]
var url string
if err := store.Get(&key, &url); err != nil || url == "" {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
http.Redirect(w, r, url, http.StatusFound)
}
- main.go
package main
import "LessUrl/server"
func main() {
server.Start()
}
Gitee仓库链接如下:
https://gitee.com/DaHuYuXiXi/go-to/