-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathocr_parallel_find.go
126 lines (112 loc) · 2.65 KB
/
ocr_parallel_find.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package lookup
import (
"image"
"sync"
)
// Search for all symbols in the image in parallel. Uses a Fan-out/fan-in approach.
func findAllInParallel(numWorkers int, symbols []*fontSymbol, img *imageBinary, threshold float64, rect image.Rectangle) ([]*fontSymbolLookup, error) {
f := ¶llelFinder{
numWorkers: max(numWorkers, 1),
symbols: symbols,
img: img,
threshold: threshold,
rect: rect,
}
return f.lookupAll()
}
// parallelFinder bundles the inputs shared by every stage of one parallel
// symbol search. A single instance is read by all pipeline goroutines.
type parallelFinder struct {
	// img is the image handed to every per-symbol lookup.
	img *imageBinary
	// threshold is forwarded unchanged to every per-symbol lookup.
	threshold float64
	// numWorkers is how many worker goroutines lookupAll starts.
	numWorkers int
	// symbols are the font symbols fed to the workers.
	symbols []*fontSymbol
	// rect supplies the Min/Max coordinates passed to every lookup.
	rect image.Rectangle
}
// lookupResult is the unit of data that travels through the worker output
// channels: either one located symbol or the error that a lookup produced.
type lookupResult struct {
	// l is the symbol match; it is nil when err is set.
	l *fontSymbolLookup
	// err is the lookup failure, or nil for a successful match.
	err error
}
// prepare is the source stage of the pipeline. It starts a goroutine that
// emits every entry of f.symbols, in order, on the returned unbuffered
// channel. The channel is closed when all symbols have been delivered or as
// soon as done is closed, whichever happens first.
func (f *parallelFinder) prepare(done <-chan struct{}) <-chan *fontSymbol {
	feed := make(chan *fontSymbol)

	go func() {
		defer close(feed)

		// Snapshot the slice header once, as a range loop would.
		pending := f.symbols
		for i := 0; i < len(pending); i++ {
			select {
			case <-done:
				// Cancelled: stop feeding so the workers can drain and exit.
				return
			case feed <- pending[i]:
			}
		}
	}()

	return feed
}
// addWorker starts one worker goroutine of the fan-out stage and returns the
// channel on which it reports.
//
// The worker receives symbols from in until in is closed. For each symbol it
// calls lookupAll with f.img, the corner coordinates of f.rect, the symbol's
// image and f.threshold. Every point returned is wrapped by
// newFontSymbolLookup and sent as a lookupResult; a lookup error is sent as a
// lookupResult carrying only the error, after which the worker moves on to
// the next symbol.
//
// Every send is abandoned as soon as done is closed, and the worker then
// exits. The returned channel is closed when the worker exits.
func (f *parallelFinder) addWorker(done <-chan struct{}, in <-chan *fontSymbol) <-chan lookupResult {
	out := make(chan lookupResult)
	go func() {
		defer close(out)
		for symbol := range in {
			pp, err := lookupAll(f.img, f.rect.Min.X, f.rect.Min.Y, f.rect.Max.X, f.rect.Max.Y, symbol.image, f.threshold)
			if err != nil {
				// The error send must honour done as well: once the consumer
				// has stopped reading, an unguarded send would block forever
				// and leak this goroutine.
				select {
				case out <- lookupResult{err: err}:
				case <-done:
					return
				}
				continue
			}
			// Ranging over an empty or nil result is a no-op, so no explicit
			// nil check is needed.
			for _, p := range pp {
				l := newFontSymbolLookup(symbol, p.X, p.Y, p.G)
				select {
				case out <- lookupResult{l: l}:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}
// merge is the fan-in stage: it multiplexes every channel in cs onto a single
// returned channel.
//
// One forwarding goroutine is started per input channel. Each forwarder
// copies results until its input is closed or done is closed. After a
// forwarder has successfully delivered a result that carries an error, it
// closes done to cancel the rest of the pipeline. The returned channel is
// closed once every forwarder has finished.
func (f *parallelFinder) merge(done chan struct{}, cs []<-chan lookupResult) <-chan lookupResult {
	merged := make(chan lookupResult)

	// Register all forwarders before any goroutine can call Wait.
	var forwarders sync.WaitGroup
	forwarders.Add(len(cs))

	for _, c := range cs {
		go func(src <-chan lookupResult) {
			defer forwarders.Done()
			for res := range src {
				select {
				case <-done:
					return
				case merged <- res:
				}
				// The error has reached the consumer; signal everyone to stop.
				if res.err != nil {
					close(done)
				}
			}
		}(c)
	}

	// Close the merged channel only after the last forwarder has returned.
	go func() {
		forwarders.Wait()
		close(merged)
	}()

	return merged
}
// lookupAll wires up and runs the whole pipeline: one symbol source
// (prepare), f.numWorkers workers that all read from that source (addWorker),
// and a fan-in stage (merge) whose output is collected here.
//
// It returns every match received, in arrival order, or nil together with
// the first error received.
func (f *parallelFinder) lookupAll() ([]*fontSymbolLookup, error) {
	done := make(chan struct{})
	symbols := f.prepare(done)

	// Fan out: every worker competes for symbols on the same channel.
	outputs := make([]<-chan lookupResult, f.numWorkers)
	for i := range outputs {
		outputs[i] = f.addWorker(done, symbols)
	}

	var found []*fontSymbolLookup
	for res := range f.merge(done, outputs) {
		if res.err == nil {
			found = append(found, res.l)
			continue
		}
		// merge closes done itself after forwarding an error, so it must not
		// be closed a second time on this path.
		return nil, res.err
	}

	close(done)
	return found, nil
}