forked from reugn/go-streams
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fs.go
108 lines (93 loc) · 2.28 KB
/
fs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package extension
import (
"bufio"
"log"
"os"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
"github.com/reugn/go-streams/internal/ospkg"
)
// FileSource represents an inbound connector that creates a stream of
// elements from a file. The streaming element is a new line in the file.
type FileSource struct {
fileName string
in chan any
}
var _ streams.Source = (*FileSource)(nil)
// NewFileSource returns a new FileSource connector.
func NewFileSource(fileName string) *FileSource {
source := &FileSource{
fileName: fileName,
in: make(chan any),
}
source.init()
return source
}
func (fs *FileSource) init() {
go func() {
file, err := os.Open(fs.fileName)
if err != nil {
log.Fatalf("FileSource failed to open the file %s", fs.fileName)
}
defer file.Close()
reader := bufio.NewReader(file)
for {
lineBytes, isPrefix, err := reader.ReadLine()
if err != nil {
close(fs.in)
break
}
var element string
if isPrefix {
element = string(lineBytes)
} else {
element = string(lineBytes) + ospkg.NewLine
}
fs.in <- element
}
}()
}
// Via streams data to a specified operator and returns it.
func (fs *FileSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(fs, operator)
return operator
}
// Out returns the output channel of the FileSource connector.
func (fs *FileSource) Out() <-chan any {
return fs.in
}
// FileSink represents an outbound connector that writes streaming data
// to a file.
type FileSink struct {
fileName string
in chan any
}
var _ streams.Sink = (*FileSink)(nil)
// NewFileSink returns a new FileSink connector.
func NewFileSink(fileName string) *FileSink {
sink := &FileSink{
fileName: fileName,
in: make(chan any),
}
sink.init()
return sink
}
func (fs *FileSink) init() {
go func() {
file, err := os.OpenFile(fs.fileName, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
log.Fatalf("FileSink failed to open the file %s", fs.fileName)
}
defer file.Close()
for element := range fs.in {
_, err = file.WriteString(element.(string))
if err != nil {
log.Fatalf("FileSink failed to write to the file %s", fs.fileName)
}
}
}()
}
// In returns the input channel of the FileSink connector.
func (fs *FileSink) In() chan<- any {
return fs.in
}