天天看點

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

改進 RxGo 包——添加filtering操作

任務

閱讀 ReactiveX 文檔。請在 pmlpml/RxGo 基礎上,添加一組新的操作filtering

一、閱讀ReactiveX 文檔

Rx是一個函數庫,讓開發者可以利用可觀察序列和LINQ風格查詢操作符來編寫異步和基于事件的程式,使用Rx,開發者可以用Observables表示異步資料流,用LINQ操作符查詢異步資料流, 用Schedulers參數化異步資料流的并發處理,Rx可以這樣定義:Rx = Observables + LINQ + Schedulers。

ReactiveX.io給的定義是,Rx是一個使用可觀察資料流進行異步程式設計的程式設計接口,ReactiveX結合了觀察者模式、疊代器模式和函數式程式設計的精華。

二、檢視pmlpml/RxGo

該庫給出了基礎資料類型的定義和架構的實作并實作了部分函數操作。通過檢視源碼并結合readme中的使用示例,可以看到rxgo中可以存在一系列可觀察對象(Observable),通過訂閱(Subscribe)操作,訂閱之前的操作會發送資料給觀察者。

檢視transforms.go中定義的操作,發現其定義了一個transOperater結構,該結構中定義了一個函數,通過實作該函數可以實作不同的操作,例如過濾資料、變換資料等。例如在Filter操作中的o.operator實作是将條件為假的資料過濾掉,而将條件為真的資料發射。

if !end {
		if b, ok := item.(bool); ok && b {
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
	}
           

而對比Map操作,由于其不需要過濾資料,是以隻需将資料發射即可

if !end {
		end = o.sendToFlow(ctx, item, out)
	}
           

有了這一個最淺顯的了解,下面開始試着動手操作

三、實作filtering中的部分操作

  • TakeLast

    發射Observable發射的最後N項資料

    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

    由于需要發射Observable發射的最後N項資料,聲明一個數組将先前Observable發射的所有資料保留起來,最後再将最後N個資料發射出去,是以,這會延遲原始Observable發射的任何資料項,直到它全部完成。

    為了完成這一操作,定義了一個新的結構體myOperater

type myOperater struct {
	num interface{}            //可以表示前N項或後N項
	mode int                   //表示哪一個操作
	opFunc func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool)
}
           

并且重寫了op函數,目的是先将先前Observable發射的所有資料存在數組中,通過相應操作對應的不同過濾條件對資料選擇發射或不發射。

對于TakeLast操作,隻需将最後N個資料發射即可,其它資料過濾掉。相關代碼如下:

in := o.pred.outflow
for y := range in {
	arr = append(arr,y)
}
for i,x := range arr {
	if i < len(arr) - int(reflect.ValueOf(tsop.num).Int()) {     //TakeLast  or Last
		continue
	}else{
		//發射
	}
}
           
func (parent *Observable) TakeLast(f int) (o *Observable) {
	p := reflect.ValueOf(&f)
	fv := p.Elem()
	o = parent.newTransformObservable("takeLast")
	o.flip = fv.Interface()
	o.operator = myOperater{f, 0,func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
		if !end {
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
		return
	}}
	o.flip_sup_ctx = true
	return o
}
           
  • Take

    隻發射前面的N項資料

    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作
    與TakeLast類似,隻不過這次需要發射的資料是前N個資料。過濾條件如下
if i >= int(reflect.ValueOf(tsop.num).Int()) {
		break
	}
           
  • Last

    隻發射最後的一項資料

    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作
    隻需将TakeLast的參數N固定為1即可。
  • First

    隻發射第一項資料

    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作
    隻需将Take的參數N固定為1即可。
  • Skip

    抑制Observable發射的前N項資料

    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作
    過濾條件為
if i < int(reflect.ValueOf(tsop.num).Int()) {
	continue
}
           
  • SkipLast

    抑制Observable發射的後N項資料

    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作
    過濾條件為
if i >= len(arr) - int(reflect.ValueOf(tsop.num).Int()) {
	break
}
           
  • Distinct

    抑制(過濾掉)重複的資料項

    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作
    為了過濾掉重複的資料項,建立一個map,map中的鍵為任意類型,值為bool類型,若map中已經存在相應的鍵,則其值為true,否則為false,若為false時則将對應的鍵放到一個新的數組裡。這樣就會得到一個無重複資料的數組。
for _,x := range arr {
	if mapv[x] == false {
		mapv[x] = true
		array = append(array,x)
	}
}
arr = array
           

将該數組中的所有資料發射即可。

  • 隻發射指定類型的資料
    改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作
    為了過濾指定類型的資料,需要對資料進行判斷,利用反射機制對interface的類型進行判斷進而進行比較。
func (parent *Observable) OfType(tp interface{}) (o *Observable) {
	o = parent.newTransformObservable("ofType")
	o.operator = transOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
		if !end && reflect.TypeOf(x.Interface()) == reflect.TypeOf(tp) {
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
		return
	}}
	o.flip_sup_ctx = true
	return o
}
           

四、功能測試

測試代碼

RxGo.Just("1","2","3","4","5").TakeLast(3).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

測試代碼

RxGo.Just("1","2","3","4","5").Take(3).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

測試代碼

RxGo.Just("1","2","3","4","5").Last().Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

測試代碼

RxGo.Just("1","2","3","4","5").First().Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

測試代碼

RxGo.Just("1","2","3","4","5").Skip(2).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

測試代碼

RxGo.Just("1","2","3","4","5").SkipLast(2).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

測試代碼

RxGo.Just(1, 2, 2, 3, 4, 1, 5, 6, 3, 7, 8, 7, 9, 9, 10).Distinct().Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

測試代碼

var t_int int
	RxGo.Just(1, "2", true, 3, "4", false).OfType(t_int).Subscribe(func(x int) {
		fmt.Println(x)
	})
           

輸出結果:

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

五、單元測試

編寫測試檔案如下:

package rxgo_test

import (
	"testing"
	"reflect"
	rxgo "github.com/github-user/rxgo"
)

func TestTakeLast(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).TakeLast(5).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{6, 7, 8, 9, 10}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("TakeLast Test Error!")
	}
}

func TestTake(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Take(5).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{1, 2, 3, 4, 5}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("Take Test Error!")
	}
}

func TestLast(t *testing.T){
	var res int
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Last().Subscribe(func(x int) {
		res = x
	})
	if res != 10 {
		t.Errorf("Last Test Error!")
	}
}

func TestFirst(t *testing.T){
	var res int
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).First().Subscribe(func(x int) {
		res = x
	})
	if res != 1{
		t.Errorf("First Test Error!")
	}
}

func TestSkip(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Skip(3).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{4, 5, 6, 7, 8, 9, 10}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("Skip Test Error!")
	}
}

func TestSkipLast(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).SkipLast(3).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{1, 2, 3, 4, 5, 6, 7}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("SkipLast Test Error!")
	}
}

func TestDistinct(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 2, 3, 4, 1, 5, 6, 3, 7, 8, 7, 9, 9, 10).Distinct().Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("Distinct Test Error!")
	}
}

func TestOfType(t *testing.T){
	res_int := []int{}
	res_string := []string{}
	res_bool := []bool{}
	var t_int int
	var t_string string
	var t_bool bool
	rxgo.Just(1, "2", true, 3, "4", false).OfType(t_int).Subscribe(func(x int) {
		res_int = append(res_int, x)
	})
	rxgo.Just(1, "2", true, 3, "4", false).OfType(t_string).Subscribe(func(x string) {
		res_string = append(res_string, x)
	})
	rxgo.Just(1, "2", true, 3, "4", false).OfType(t_bool).Subscribe(func(x bool) {
		res_bool = append(res_bool, x)
	})
	expect_int := []int{1, 3}
	if !reflect.DeepEqual(res_int, expect_int){
		t.Errorf("OfType Test Error!")
	}
	expect_string := []string{"2", "4"}
	if !reflect.DeepEqual(res_string, expect_string){
		t.Errorf("OfType Test Error!")
	}
	expect_bool := []bool{true, false}
	if !reflect.DeepEqual(res_bool, expect_bool){
		t.Errorf("OfType Test Error!")
	}
}
           

執行測試

改進 RxGo 包——添加filtering操作改進 RxGo 包——添加filtering操作

項目位址

gitee位址

繼續閱讀