Skip to content

Commit c28b864

Browse files
committed
fixed map
1 parent fc9f6c7 commit c28b864

File tree

3 files changed

+71
-8
lines changed

3 files changed

+71
-8
lines changed

Promise.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,17 @@ func (obj *Promise) Catch(callback Callback) *Promise {
173173
//Finally ... a synchronous call to finally do something at the end of Promise Chain
174174
func (obj *Promise) Finally(callback func(interface{})) interface{} {
175175
obj.valueWaitLock.Wait()
176+
value := obj.value
177+
vp, ok := value.(*Promise)
178+
if ok {
179+
vp.valueWaitLock.Wait()
180+
value = vp.value
181+
}
176182
if callback != nil {
177-
callback(obj.value)
183+
callback(value)
178184
}
179-
return obj.value
185+
186+
return value
180187
}
181188

182189
/**
@@ -200,12 +207,21 @@ func Map(promises []*Promise, cb Callback) []*Promise {
200207
}
201208

202209
//Reduce ... asynchronous reducer , does not waits for all promise to be resolve , it launches reduce callback as soon as first result is available
203-
// func Reduce(promises []*Promise, reducer func(acc interface{}, value interface{}), start interface{}) *Promise {
204-
205-
// return Create(func(resolve Callback, reject Callback) {
206-
207-
// })
208-
// }
210+
// It will be used to process both errors and values. So reduces should account for that.
211+
func Reduce(promises []*Promise, reducer func(index int, acc interface{}, value interface{}) interface{}, acc interface{}) *Promise {
212+
return Create(func(resolve Callback, reject Callback) {
213+
promise := Resolve(acc)
214+
count := 0
215+
for asyncValue := range AsyncGenerator(promises) {
216+
index := count
217+
promise = promise.Then(func(acc interface{}) (interface{}, error) {
218+
return reducer(index, acc, asyncValue), nil
219+
})
220+
count++
221+
}
222+
resolve(promise)
223+
})
224+
}
209225

210226
//// FUNCTIONAL DONE /////////////////////
211227

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,32 @@ for value := range Promise.AsyncGenerator(promises) {
257257

258258
```
259259

260+
*My Map is all parallel , can I have a reduce, that pipelines with request delays ? 👩‍⚖️
261+
Yes the Reduce provided here internally uses AsyncGenerator to grab data and quicly
262+
launch promise task to to luanch reducer for each data. Effectively pipelining with
263+
the promises passsed to it.
264+
```golang
265+
266+
promises := make([]*Promise.Promise, 3)
267+
for i := 0; i < len(promises); i++ {
268+
index := i
269+
promises[i] = Promise.Create(func(resolve Promise.Callback, reject Promise.Callback) {
270+
time.Sleep(time.Duration(index)*time.Second + time.Duration(10))
271+
resolve(index + 1)
272+
})
273+
}
274+
275+
value := Promise.Reduce(promises, func(index int, acci interface{}, valuei interface{}) interface{} {
276+
acc, _ := acci.(int)
277+
value, _ := valuei.(int)
278+
acc = value + acc
279+
return acc
280+
}, 0).Finally(nil)
281+
282+
```
283+
284+
285+
260286

261287
### Development
262288

test/promise_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,3 +461,24 @@ func TestAsyncGenerator(t *testing.T) {
461461
t.Errorf("Failed async generator")
462462
}
463463
}
464+
465+
func TestReducer(t *testing.T) {
466+
promises := make([]*Promise.Promise, 3)
467+
for i := 0; i < len(promises); i++ {
468+
index := i
469+
promises[i] = Promise.Create(func(resolve Promise.Callback, reject Promise.Callback) {
470+
time.Sleep(time.Duration(index)*time.Second + time.Duration(10))
471+
resolve(index + 1)
472+
})
473+
}
474+
475+
value := Promise.Reduce(promises, func(index int, acci interface{}, valuei interface{}) interface{} {
476+
acc, _ := acci.(int)
477+
value, _ := valuei.(int)
478+
acc = value + acc
479+
return acc
480+
}, 0).Finally(nil)
481+
if value != 6 {
482+
t.Errorf("Reduce did not work")
483+
}
484+
}

0 commit comments

Comments
 (0)