From 2a939ef85fc1938b0963c5a21514c5ae7af9ada3 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Tue, 5 Nov 2013 18:26:07 -0500 Subject: [PATCH 1/3] lock around read operations in graph Writes and reads will fail with ErrBusy if there's concurrent reads or writes, respectively. It is not sufficient to only lock around writes. Upstream-commit: 1dc34e2b965253a62d52f84a0f548334d4d6aa9d Component: engine --- components/engine/gograph/gograph.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/components/engine/gograph/gograph.go b/components/engine/gograph/gograph.go index 32b3a491bb..2b25464344 100644 --- a/components/engine/gograph/gograph.go +++ b/components/engine/gograph/gograph.go @@ -138,7 +138,14 @@ func (db *Database) Set(fullPath, id string) (*Entity, error) { // Return true if a name already exists in the database func (db *Database) Exists(name string) bool { - return db.Get(name) != nil + db.mux.Lock() + defer db.mux.Unlock() + + e, err := db.get(name) + if err != nil { + return false + } + return e != nil } func (db *Database) setEdge(parentPath, name string, e *Entity) error { @@ -165,6 +172,9 @@ func (db *Database) RootEntity() *Entity { // Return the entity for a given path func (db *Database) Get(name string) *Entity { + db.mux.Lock() + defer db.mux.Unlock() + e, err := db.get(name) if err != nil { return nil @@ -200,6 +210,9 @@ func (db *Database) get(name string) (*Entity, error) { // List all entities by from the name // The key will be the full path of the entity func (db *Database) List(name string, depth int) Entities { + db.mux.Lock() + defer db.mux.Unlock() + out := Entities{} e, err := db.get(name) if err != nil { @@ -212,6 +225,9 @@ func (db *Database) List(name string, depth int) Entities { } func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { + db.mux.Lock() + defer db.mux.Unlock() + e, err := db.get(name) if err != nil { return err @@ -226,6 +242,9 @@ func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { // Return the refrence count for a specified id func (db *Database) Refs(id string) int { + db.mux.Lock() + defer db.mux.Unlock() + var count int if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil { return 0 @@ -235,6 +254,9 @@ func (db *Database) Refs(id string) int { // Return all the id's path references func (db *Database) RefPaths(id string) Edges { + db.mux.Lock() + defer db.mux.Unlock() + refs := Edges{} rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id) From b581ea208c28d7b2b8766c7d010661d78dd69e76 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Tue, 5 Nov 2013 22:07:14 -0500 Subject: [PATCH 2/3] gograph: Use RWMutex to allow concurrent readers Upstream-commit: 04aca7c9e3edddc57a1fbc11e57e6c62e6847126 Component: engine --- components/engine/gograph/gograph.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/components/engine/gograph/gograph.go b/components/engine/gograph/gograph.go index 2b25464344..626bf53ed8 100644 --- a/components/engine/gograph/gograph.go +++ b/components/engine/gograph/gograph.go @@ -48,7 +48,7 @@ type WalkFunc func(fullPath string, entity *Entity) error // Graph database for storing entities and their relationships type Database struct { conn *sql.DB - mux sync.Mutex + mux sync.RWMutex } // Create a new graph database initialized with a root entity @@ -138,8 +138,8 @@ func (db *Database) Set(fullPath, id string) (*Entity, error) { // Return true if a name already exists in the database func (db *Database) Exists(name string) bool { - db.mux.Lock() - defer db.mux.Unlock() + db.mux.RLock() + defer db.mux.RUnlock() e, err := db.get(name) if err != nil { @@ -172,8 +172,8 @@ func (db *Database) RootEntity() *Entity { // Return the entity for a given path func (db *Database) Get(name string) *Entity { - db.mux.Lock() - defer db.mux.Unlock() + db.mux.RLock() + defer db.mux.RUnlock() e, err := db.get(name) if err != nil { @@ -210,8 +210,8 @@ func (db *Database) get(name string) (*Entity, error) { // List all entities by from the name // The key will be the full path of the entity func (db *Database) List(name string, depth int) Entities { - db.mux.Lock() - defer db.mux.Unlock() + db.mux.RLock() + defer db.mux.RUnlock() out := Entities{} e, err := db.get(name) @@ -225,8 +225,8 @@ func (db *Database) List(name string, depth int) Entities { } func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { - db.mux.Lock() - defer db.mux.Unlock() + db.mux.RLock() + defer db.mux.RUnlock() e, err := db.get(name) if err != nil { @@ -242,8 +242,8 @@ func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { // Return the refrence count for a specified id func (db *Database) Refs(id string) int { - db.mux.Lock() - defer db.mux.Unlock() + db.mux.RLock() + defer db.mux.RUnlock() var count int if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil { @@ -254,8 +254,8 @@ func (db *Database) Refs(id string) int { // Return all the id's path references func (db *Database) RefPaths(id string) Edges { - db.mux.Lock() - defer db.mux.Unlock() + db.mux.RLock() + defer db.mux.RUnlock() refs := Edges{} From f530e76f7fb6c14225e6d2c97c2a7835055247a6 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Tue, 5 Nov 2013 22:15:41 -0500 Subject: [PATCH 3/3] gograph: allow Walk() reentrance Hold the read lock while reading the child graph, then walk over the children without any lock, in order to avoid deadlock. Upstream-commit: 20881f1f787e093a6f3e9360624d8b436b9f7379 Component: engine --- components/engine/gograph/gograph.go | 101 +++++++++++++++------------ 1 file changed, 58 insertions(+), 43 deletions(-) diff --git a/components/engine/gograph/gograph.go b/components/engine/gograph/gograph.go index 626bf53ed8..aa6a4126a0 100644 --- a/components/engine/gograph/gograph.go +++ b/components/engine/gograph/gograph.go @@ -218,21 +218,28 @@ func (db *Database) List(name string, depth int) Entities { if err != nil { return out } - for c := range db.children(e, name, depth) { + + children, err := db.children(e, name, depth, nil) + if err != nil { + return out + } + + for _, c := range children { out[c.FullPath] = c.Entity } return out } +// Walk through the child graph of an entity, calling walkFunc for each child entity. +// It is safe for walkFunc to call graph functions. func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { - db.mux.RLock() - defer db.mux.RUnlock() - - e, err := db.get(name) + children, err := db.Children(name, depth) if err != nil { return err } - for c := range db.children(e, name, depth) { + + // Note: the database lock must not be held while calling walkFunc + for _, c := range children { if err := walkFunc(c.FullPath, c.Entity); err != nil { return err } @@ -240,6 +247,19 @@ func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { return nil } +// Return the children of the specified entity +func (db *Database) Children(name string, depth int) ([]WalkMeta, error) { + db.mux.RLock() + defer db.mux.RUnlock() + + e, err := db.get(name) + if err != nil { + return nil, err + } + + return db.children(e, name, depth, nil) +} + // Return the refrence count for a specified id func (db *Database) Refs(id string) int { db.mux.RLock() @@ -378,56 +398,51 @@ type WalkMeta struct { Edge *Edge } -func (db *Database) children(e *Entity, name string, depth int) <-chan WalkMeta { - out := make(chan WalkMeta) +func (db *Database) children(e *Entity, name string, depth int, entities []WalkMeta) ([]WalkMeta, error) { if e == nil { - close(out) - return out + return entities, nil } - go func() { - rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id) - if err != nil { - close(out) + rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var entityId, entityName string + if err := rows.Scan(&entityId, &entityName); err != nil { + return nil, err + } + child := &Entity{entityId} + edge := &Edge{ + ParentID: e.id, + Name: entityName, + EntityID: child.id, } - defer rows.Close() - for rows.Next() { - var entityId, entityName string - if err := rows.Scan(&entityId, &entityName); err != nil { - // Log error - continue - } - child := &Entity{entityId} - edge := &Edge{ - ParentID: e.id, - Name: entityName, - EntityID: child.id, - } + meta := WalkMeta{ + Parent: e, + Entity: child, + FullPath: path.Join(name, edge.Name), + Edge: edge, + } - meta := WalkMeta{ - Parent: e, - Entity: child, - FullPath: path.Join(name, edge.Name), - Edge: edge, - } + entities = append(entities, meta) - out <- meta - if depth == 0 { - continue - } + if depth != 0 { nDepth := depth if depth != -1 { nDepth -= 1 } - sc := db.children(child, meta.FullPath, nDepth) - for c := range sc { - out <- c + entities, err = db.children(child, meta.FullPath, nDepth, entities) + if err != nil { + return nil, err } } - close(out) - }() - return out + } + + return entities, nil } // Return the entity based on the parent path and name