接着我们看下writer的实现,writer的核心源码位于writer/single.go,writer的注册方式和存储的注册类似,它注册了一个single的writer
代码语言:javascript复制func init() {
graph.RegisterWriter("single", NewSingleReplication)
代码语言:javascript复制type Single struct {
qs graph.QuadStore
ignoreOpts graph.IgnoreOpts
}
代码语言:javascript复制func NewSingle(qs graph.QuadStore, opts graph.IgnoreOpts) (graph.QuadWriter, error) {
代码语言:javascript复制func NewSingleReplication(qs graph.QuadStore, opts graph.Options) (graph.QuadWriter, error) {
得到writer后就可以往里面添加四元祖,它内部是调用了quad的
ApplyDeltas
代码语言:javascript复制 func (s *Single) AddQuad(q quad.Quad) error {
return s.qs.ApplyDeltas(deltas, s.ignoreOpts)
代码语言:javascript复制func (s *Single) AddQuadSet(set []quad.Quad) error {
tx := graph.NewTransactionN(len(set))
return s.qs.ApplyDeltas(tx.Deltas, s.ignoreOpts)
删除过程类似
代码语言:javascript复制func (s *Single) RemoveQuad(q quad.Quad) error {
代码语言:javascript复制func (s *Single) RemoveNode(v quad.Value) error {
for _, d := range []quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label} {
r := graph.NewResultReader(s.qs, s.qs.QuadIterator(d, gv))
n, err := quad.Copy(del, r)
代码语言:javascript复制func (s *Single) ApplyTransaction(t *graph.Transaction) error {
return s.qs.ApplyDeltas(t.Deltas, s.ignoreOpts)
graph/quadwriter.go主要包含了插入和删除两个操作
代码语言:javascript复制type Procedure int8
代码语言:javascript复制const (
Add Procedure = 1
Delete Procedure = -1
)
一个增量操作包含四元祖和具体动作两个部分
代码语言:javascript复制type Delta struct {
Quad quad.Quad
Action Procedure
}
代码语言:javascript复制func Unwrap(qs QuadStore) QuadStore {
我们创建一个图数据库*Handle的时候,就是初始化了QuadStore和Writer两个属性
代码语言:javascript复制type Handle struct {
QuadStore
QuadWriter
}
代码语言:javascript复制type IgnoreOpts struct {
IgnoreDup, IgnoreMissing bool
}
writer约定了一个接口,具体如下:
代码语言:javascript复制type QuadWriter interface {
// AddQuad adds a quad to the store.
AddQuad(quad.Quad) error
// TODO(barakmich): Deprecate in favor of transaction.
// AddQuadSet adds a set of quads to the store, atomically if possible.
AddQuadSet([]quad.Quad) error
// RemoveQuad removes a quad matching the given one from the database,
// if it exists. Does nothing otherwise.
RemoveQuad(quad.Quad) error
// ApplyTransaction applies a set of quad changes.
ApplyTransaction(*Transaction) error
// RemoveNode removes all quads which have the given node as subject, predicate, object, or label.
//
// It returns ErrNodeNotExists if node is missing.
RemoveNode(quad.Value) error
// Close cleans up replication and closes the writing aspect of the database.
Close() error
}
代码语言:javascript复制type NewQuadWriterFunc func(QuadStore, Options) (QuadWriter, error)
代码语言:javascript复制var writerRegistry = make(map[string]NewQuadWriterFunc)
注册函数定义如下:
代码语言:javascript复制func RegisterWriter(name string, newFunc NewQuadWriterFunc) {
if _, found := writerRegistry[name]; found {
panic("already registered QuadWriter " name)
}
writerRegistry[name] = newFunc
}
代码语言:javascript复制func NewQuadWriter(name string, qs QuadStore, opts Options) (QuadWriter, error) {
newFunc, hasNew := writerRegistry[name]
if !hasNew {
return nil, errors.New("replication: name '" name "' is not registered")
}
return newFunc(qs, opts)
}
代码语言:javascript复制func WriterMethods() []string {
代码语言:javascript复制type BatchWriter interface {
quad.WriteCloser
Flush() error
}
代码语言:javascript复制func NewWriter(qs QuadWriter) BatchWriter {
代码语言:javascript复制type batchWriter struct {
qs QuadWriter
buf []quad.Quad
}
代码语言:javascript复制func (w *batchWriter) flushBuffer(force bool) error {
func (w *batchWriter) WriteQuad(q quad.Quad) error {
代码语言:javascript复制func NewTxWriter(tx *Transaction, p Procedure) quad.Writer {
代码语言:javascript复制type txWriter struct {
tx *Transaction
p Procedure
}
代码语言:javascript复制func NewRemover(qs QuadWriter) BatchWriter {
代码语言:javascript复制type removeWriter struct {
qs QuadWriter
}
代码语言:javascript复制func (w *removeWriter) WriteQuad(q quad.Quad) error {
return w.qs.RemoveQuad(q)
代码语言:javascript复制func NewQuadStoreReader(qs QuadStore) quad.ReadSkipCloser {
代码语言:javascript复制func NewResultReader(qs QuadStore, it Iterator) quad.ReadSkipCloser {
和Writer对应,Reader包含了QuadStore和迭代器
代码语言:javascript复制type quadReader struct {
qs QuadStore
it Iterator
}
代码语言:javascript复制func (r *quadReader) ReadQuad() (quad.Quad, error) {
if r.it.Next(context.TODO()) {
return r.qs.Quad(r.it.Result()), nil
imports.go
代码语言:javascript复制var (
StartMorphism = path.StartMorphism
StartPath = path.StartPath
NewTransaction = graph.NewTransaction
)
笔记哦核心的几个变量类型
代码语言:javascript复制type Iterator = graph.Iterator
type QuadStore = graph.QuadStore
type QuadWriter = graph.QuadWriter
代码语言:javascript复制type Path = path.Path
代码语言:javascript复制type Handle struct {
graph.QuadStore
graph.QuadWriter
}
三元祖其实就是四元祖去掉label
代码语言:javascript复制func Triple(subject, predicate, object interface{}) quad.Quad {
return Quad(subject, predicate, object, nil)
代码语言:javascript复制func Quad(subject, predicate, object, label interface{}) quad.Quad {
return quad.Make(subject, predicate, object, label)
初始化图数据库的时候,指定的writer就single
代码语言:javascript复制func NewGraph(name, dbpath string, opts graph.Options) (*Handle, error) {
qs, err := graph.NewQuadStore(name, dbpath, opts)
qw, err := graph.NewQuadWriter("single", qs, nil)
代码语言:javascript复制func NewMemoryGraph() (*Handle, error) {
return NewGraph("memstore", "", nil)
server/http/common.go里面提供了基于http服务的操作
代码语言:javascript复制func jsonResponse(w http.ResponseWriter, code int, err interface{}) {
data, _ := json.Marshal(s)
w.Write(data)
代码语言:javascript复制func HandleForRequest(h *graph.Handle, wtyp string, wopt graph.Options, r *http.Request) (*graph.Handle, error) {
qs, err := g.ForRequest(r)
qw, err := graph.NewQuadWriter(wtyp, qs, wopt)
return &graph.Handle{QuadStore: qs, QuadWriter: qw}, nil
appengine/appengine.go
代码语言:javascript复制func open(cfg *Config) (*graph.Handle, error) {
qs, err := graph.NewQuadStore(cfg.DatabaseType, cfg.DatabasePath, cfg.DatabaseOptions)
qw, err := graph.NewQuadWriter(cfg.ReplicationType, qs, cfg.ReplicationOptions)
return &graph.Handle{QuadStore: qs, QuadWriter: qw}, nil
graph/registry.go
代码语言:javascript复制func NewQuadStore(name string, dbpath string, opts Options) (QuadStore, error) {
r, registered := storeRegistry[name]
return r.NewFunc(dbpath, opts)