add a streaming CID set
used in https://github.com/ipfs/go-ipfs/pull/4804
This commit is contained in:
committed by
Steven Allen
parent
1543f4a136
commit
3655c1cdd4
35
set.go
35
set.go
@@ -1,5 +1,9 @@
|
||||
package cid
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// Set is a implementation of a set of Cids, that is, a structure
|
||||
// to which holds a single copy of every Cids that is added to it.
|
||||
type Set struct {
|
||||
@@ -65,3 +69,34 @@ func (s *Set) ForEach(f func(c *Cid) error) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StreamingSet is an extension of Set which allows to implement back-pressure
|
||||
// for the Visit function
|
||||
type StreamingSet struct {
|
||||
Set *Set
|
||||
New chan *Cid
|
||||
}
|
||||
|
||||
// NewStreamingSet initializes and returns new Set.
|
||||
func NewStreamingSet() *StreamingSet {
|
||||
return &StreamingSet{
|
||||
Set: NewSet(),
|
||||
New: make(chan *Cid),
|
||||
}
|
||||
}
|
||||
|
||||
// Visitor creates new visitor which adds a Cids to the set and emits them to
|
||||
// the set.New channel
|
||||
func (s *StreamingSet) Visitor(ctx context.Context) func(c *Cid) bool {
|
||||
return func(c *Cid) bool {
|
||||
if s.Set.Visit(c) {
|
||||
select {
|
||||
case s.New <- c:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user