User: demonwav Date: 25 Feb 19 04:07 Revision: d030f489ce1dd4d5565cc426cbae5668f0bc42a3 Summary: Simplify constructor by merging chan and slice sources TeamCity URL: https://ci.demonwav.com/viewModification.html?tab=vcsModificationFiles&modId=5321&personal=false Index: readme.md =================================================================== --- readme.md (revision 4a1f95ed6dd11d7857ceeda64ad8dc6865b038f4) +++ readme.md (revision d030f489ce1dd4d5565cc426cbae5668f0bc42a3) @@ -43,10 +43,10 @@ seen := make([]int, 0) - streams.NewChanStream(c). + streams.NewStream(c). WithCancel(cancel). Filter(func(i int) bool { - return streams.NewSliceStream(seen).None(func(n int) bool { + return streams.NewStream(seen).None(func(n int) bool { return i%n == 0 }) }). Index: stream.go =================================================================== --- stream.go (revision 4a1f95ed6dd11d7857ceeda64ad8dc6865b038f4) +++ stream.go (revision d030f489ce1dd4d5565cc426cbae5668f0bc42a3) @@ -212,21 +212,37 @@ cancel *[]chan<- bool } -// NewChanStream creates a new Stream object that uses the provided channel as the source. The first argument to this -// function must be a <-chan R where R is some type. The implicit type of the returned Stream will be R. +// NewStream creates a new Stream object that uses the provided channel or slice as the source. The first argument to +// this function must be either a <-chan R, or []R, where R is some type. The implicit type of the returned Stream will +// be R. // -// The provided channel may be an infinite value generator. In this case, you must make sure to use limiting functions -// like Take or First to prevent the Stream from processing forever and crashing. +// If using a channel, the provided channel may be an infinite value generator. In this case, you must make sure to use +// limiting functions like Take or First to prevent the Stream from processing forever and crashing. // // The generic type signature of this function would be: // -// func NewChanStream(channel <-chan T, channel ...chan<- bool) *Stream +// func NewStream(source S, channel ...chan<- bool) *Stream // -// Any arguments provided after the stream are channels which should be used to stop any running goroutine which needs +// Which is to say there is some type S which is either a slice of T ([]T) or a receiving channel of T (<-chan T), which +// would make this return a pointer to a Stream of T's (*Stream). +// +// Any arguments provided after the source are channels which should be used to stop any running goroutine which needs // to be stopped when processing of the Stream completes. A single 'true' value will be sent to each channel given. The // send operation will not wait or block, so either define each channel as a buffered channel, or make sure you're // always listening to it. -func NewChanStream(channel AnyChannel, cancel ...chan<- bool) *Stream { +func NewStream(source AnyType, cancel ...chan<- bool) *Stream { + t := reflect.TypeOf(source) + switch t.Kind() { + case reflect.Slice: + return newSliceStream(source, cancel...) + case reflect.Chan: + return newChanStream(source, cancel...) + default: + panic("provided source is not a slice or channel") + } +} + +func newChanStream(channel AnyChannel, cancel ...chan<- bool) *Stream { return &Stream{func() (AnyType, bool) { item, ok := chanRecv(channel) if ok { @@ -237,13 +253,7 @@ }, &cancel} } -// NewSliceStream creates a new Stream object that uses the provided slice as the source. Teh first argument to this -// function must be a []R where R is some type. The implicit type of the returned Stream will be R. -// -// The generic type signature for this function would be: -// -// func NewSliceStream(slice []T) *Stream -func NewSliceStream(slice AnySlice) *Stream { +func newSliceStream(slice AnySlice, cancel ...chan<- bool) *Stream { index := 0 return &Stream{func() (AnyType, bool) { if index < sliceLen(slice) { @@ -252,7 +262,7 @@ return item, true } return nil, false - }, &[]chan<- bool{}} + }, &cancel} } func callFunc(f interface{}, args ...reflect.Value) []reflect.Value { Index: stream_test.go =================================================================== --- stream_test.go (revision 4a1f95ed6dd11d7857ceeda64ad8dc6865b038f4) +++ stream_test.go (revision d030f489ce1dd4d5565cc426cbae5668f0bc42a3) @@ -17,7 +17,7 @@ s := []string{"hello", "world"} - res := streams.NewSliceStream(s). + res := streams.NewStream(s). Filter(func(word string) bool { return strings.Contains(word, "or") }). @@ -46,7 +46,7 @@ close(c) }() - res := streams.NewChanStream(c). + res := streams.NewStream(c). Filter(func(word string) bool { return strings.Contains(word, "el") }). @@ -87,7 +87,7 @@ } }() - sum := streams.NewChanStream(c). + sum := streams.NewStream(c). WithCancel(cancel). Map(func(i int) int { return i * 2 @@ -120,7 +120,7 @@ } }() - sum := streams.NewChanStream(c, cancel). + sum := streams.NewStream(c, cancel). Map(func(i int) int { return i * 2 }). @@ -151,7 +151,7 @@ }() var res []int - streams.NewChanStream(c, cancel). + streams.NewStream(c, cancel). Take(10000). Sort(func(left, right int) bool { return left < right @@ -165,7 +165,7 @@ defer goleak.VerifyNoLeaks(t) data := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - avg := streams.NewSliceStream(data). + avg := streams.NewStream(data). AvgInt(func(f int64) int64 { return f }) @@ -177,7 +177,7 @@ defer goleak.VerifyNoLeaks(t) data := []int{0, 1, 2} - count := streams.NewSliceStream(data). + count := streams.NewStream(data). Filter(func(i int) bool { return i != 1 }). @@ -192,7 +192,7 @@ data := []int{0, 1, 2} var output []int - streams.NewSliceStream(data). + streams.NewStream(data). ForEach(func(i int) { output = append(output, i) }) @@ -223,7 +223,7 @@ } }() - streams.NewSliceStream(data). + streams.NewStream(data). WithCancel(q). ToChan(c) @@ -237,7 +237,7 @@ expected := []int{2} var output []int - streams.NewSliceStream(data). + streams.NewStream(data). Skip(2). ToSlice(&output) @@ -251,7 +251,7 @@ var output1 []int var output2 []int - streams.NewSliceStream(data). + streams.NewStream(data). OnEach(func(i int) { output2 = append(output2, i) }). @@ -270,7 +270,7 @@ var val int - streams.NewSliceStream(data). + streams.NewStream(data). Min(&val, func(left, right int) bool { return left < right }) @@ -285,7 +285,7 @@ var val int - streams.NewSliceStream(data). + streams.NewStream(data). Max(&val, func(left, right int) bool { return left < right }) @@ -300,7 +300,7 @@ var res map[rune]int - streams.NewSliceStream([]string{data}). + streams.NewStream([]string{data}). SliceFlatMap(func(line string) []rune { return []rune(line) }). @@ -335,12 +335,12 @@ var valMin int var valReduce int - streams.NewSliceStream(data). + streams.NewStream(data). Min(&valMin, func(left, right int) bool { return left < right }) - streams.NewSliceStream(data). + streams.NewStream(data). Reduce(&valReduce, int(math.MaxInt32), func(item, out int) int { if item < out { return item @@ -359,9 +359,9 @@ var out []int - streams.NewSliceStream(data). - Concat(streams.NewSliceStream(data)). - Concat(streams.NewSliceStream(data)). + streams.NewStream(data). + Concat(streams.NewStream(data)). + Concat(streams.NewStream(data)). ToSlice(&out) var expected []int @@ -379,8 +379,8 @@ var out []int - streams.NewSliceStream(data). - Zip(streams.NewSliceStream(data), 0, func(left, right int) int { + streams.NewStream(data). + Zip(streams.NewStream(data), 0, func(left, right int) int { return left + right }). ToSlice(&out) @@ -400,9 +400,9 @@ var out []int - streams.NewSliceStream(data). - Concat(streams.NewSliceStream(data)). - Zip(streams.NewSliceStream(data), 0, func(left, right int) int { + streams.NewStream(data). + Concat(streams.NewStream(data)). + Zip(streams.NewStream(data), 0, func(left, right int) int { return left + right }). ToSlice(&out)