天天看点

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

改进 RxGo 包——添加filtering操作

任务

阅读 ReactiveX 文档。请在 pmlpml/RxGo 基础上,添加一组新的操作filtering

一、阅读ReactiveX 文档

Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

二、查看pmlpml/RxGo

该库给出了基础数据类型的定义和框架的实现并实现了部分函数操作。通过查看源码并结合readme中的使用示例,可以看到rxgo中可以存在一系列可观察对象(Observable),通过订阅(Subscribe)操作,订阅之前的操作会发送数据给观察者。

查看transforms.go中定义的操作,发现其定义了一个transOperater结构,该结构中定义了一个函数,通过实现该函数可以实现不同的操作,例如过滤数据、变换数据等。例如在Filter操作中的o.operator实现是将条件为假的数据过滤掉,而将条件为真的数据发射。

if !end {
		if b, ok := item.(bool); ok && b {
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
	}
           

而对比Map操作,由于其不需要过滤数据,因此只需将数据发射即可

if !end {
		end = o.sendToFlow(ctx, item, out)
	}
           

有了这一个最浅显的理解,下面开始试着动手操作

三、实现filtering中的部分操作

  • TakeLast

    发射Observable发射的最后N项数据

    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

    由于需要发射Observable发射的最后N项数据,声明一个数组将先前Observable发射的所有数据保留起来,最后再将最后N个数据发射出去,因此,这会延迟原始Observable发射的任何数据项,直到它全部完成。

    为了完成这一操作,定义了一个新的结构体myOperater

type myOperater struct {
	num interface{}            //可以表示前N项或后N项
	mode int                   //表示哪一个操作
	opFunc func(ctx context.Context, o *Observable, item reflect.Value, out chan interface{}) (end bool)
}
           

并且重写了op函数,目的是先将先前Observable发射的所有数据存在数组中,通过相应操作对应的不同过滤条件对数据选择发射或不发射。

对于TakeLast操作,只需将最后N个数据发射即可,其它数据过滤掉。相关代码如下:

in := o.pred.outflow
for y := range in {
	arr = append(arr,y)
}
for i,x := range arr {
	if i < len(arr) - int(reflect.ValueOf(tsop.num).Int()) {     //TakeLast  or Last
		continue
	}else{
		//发射
	}
}
           
func (parent *Observable) TakeLast(f int) (o *Observable) {
	p := reflect.ValueOf(&f)
	fv := p.Elem()
	o = parent.newTransformObservable("takeLast")
	o.flip = fv.Interface()
	o.operator = myOperater{f, 0,func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
		if !end {
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
		return
	}}
	o.flip_sup_ctx = true
	return o
}
           
  • Take

    只发射前面的N项数据

    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作
    与TakeLast类似,只不过这次需要发射的数据是前N个数据。过滤条件如下
if i >= int(reflect.ValueOf(tsop.num).Int()) {
		break
	}
           
  • Last

    只发射最后的一项数据

    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作
    只需将TakeLast的参数N固定为1即可。
  • First

    只发射第一项数据

    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作
    只需将Take的参数N固定为1即可。
  • Skip

    抑制Observable发射的前N项数据

    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作
    过滤条件为
if i < int(reflect.ValueOf(tsop.num).Int()) {
	continue
}
           
  • SkipLast

    抑制Observable发射的后N项数据

    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作
    过滤条件为
if i >= len(arr) - int(reflect.ValueOf(tsop.num).Int()) {
	break
}
           
  • Distinct

    抑制(过滤掉)重复的数据项

    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作
    为了过滤掉重复的数据项,建立一个map,map中的键为任意类型,值为bool类型,若map中已经存在相应的键,则其值为true,否则为false,若为false时则将对应的键放到一个新的数组里。这样就会得到一个无重复数据的数组。
for _,x := range arr {
	if mapv[x] == false {
		mapv[x] = true
		array = append(array,x)
	}
}
arr = array
           

将该数组中的所有数据发射即可。

  • 只发射指定类型的数据
    改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作
    为了过滤指定类型的数据,需要对数据进行判断,利用反射机制对interface的类型进行判断从而进行比较。
func (parent *Observable) OfType(tp interface{}) (o *Observable) {
	o = parent.newTransformObservable("ofType")
	o.operator = transOperater{func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
		if !end && reflect.TypeOf(x.Interface()) == reflect.TypeOf(tp) {
			end = o.sendToFlow(ctx, x.Interface(), out)
		}
		return
	}}
	o.flip_sup_ctx = true
	return o
}
           

四、功能测试

测试代码

RxGo.Just("1","2","3","4","5").TakeLast(3).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

测试代码

RxGo.Just("1","2","3","4","5").Take(3).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

测试代码

RxGo.Just("1","2","3","4","5").Last().Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

测试代码

RxGo.Just("1","2","3","4","5").First().Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

测试代码

RxGo.Just("1","2","3","4","5").Skip(2).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

测试代码

RxGo.Just("1","2","3","4","5").SkipLast(2).Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

测试代码

RxGo.Just(1, 2, 2, 3, 4, 1, 5, 6, 3, 7, 8, 7, 9, 9, 10).Distinct().Subscribe(func(x interface{}) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

测试代码

var t_int int
	RxGo.Just(1, "2", true, 3, "4", false).OfType(t_int).Subscribe(func(x int) {
		fmt.Println(x)
	})
           

输出结果:

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

五、单元测试

编写测试文件如下:

package rxgo_test

import (
	"testing"
	"reflect"
	rxgo "github.com/github-user/rxgo"
)

func TestTakeLast(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).TakeLast(5).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{6, 7, 8, 9, 10}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("TakeLast Test Error!")
	}
}

func TestTake(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Take(5).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{1, 2, 3, 4, 5}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("Take Test Error!")
	}
}

func TestLast(t *testing.T){
	var res int
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Last().Subscribe(func(x int) {
		res = x
	})
	if res != 10 {
		t.Errorf("Last Test Error!")
	}
}

func TestFirst(t *testing.T){
	var res int
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).First().Subscribe(func(x int) {
		res = x
	})
	if res != 1{
		t.Errorf("First Test Error!")
	}
}

func TestSkip(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Skip(3).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{4, 5, 6, 7, 8, 9, 10}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("Skip Test Error!")
	}
}

func TestSkipLast(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).SkipLast(3).Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{1, 2, 3, 4, 5, 6, 7}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("SkipLast Test Error!")
	}
}

func TestDistinct(t *testing.T){
	res := []int{}
	rxgo.Just(1, 2, 2, 3, 4, 1, 5, 6, 3, 7, 8, 7, 9, 9, 10).Distinct().Subscribe(func(x int) {
		res = append(res, x)
	})
	expect := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	if !reflect.DeepEqual(res, expect){
		t.Errorf("Distinct Test Error!")
	}
}

func TestOfType(t *testing.T){
	res_int := []int{}
	res_string := []string{}
	res_bool := []bool{}
	var t_int int
	var t_string string
	var t_bool bool
	rxgo.Just(1, "2", true, 3, "4", false).OfType(t_int).Subscribe(func(x int) {
		res_int = append(res_int, x)
	})
	rxgo.Just(1, "2", true, 3, "4", false).OfType(t_string).Subscribe(func(x string) {
		res_string = append(res_string, x)
	})
	rxgo.Just(1, "2", true, 3, "4", false).OfType(t_bool).Subscribe(func(x bool) {
		res_bool = append(res_bool, x)
	})
	expect_int := []int{1, 3}
	if !reflect.DeepEqual(res_int, expect_int){
		t.Errorf("OfType Test Error!")
	}
	expect_string := []string{"2", "4"}
	if !reflect.DeepEqual(res_string, expect_string){
		t.Errorf("OfType Test Error!")
	}
	expect_bool := []bool{true, false}
	if !reflect.DeepEqual(res_bool, expect_bool){
		t.Errorf("OfType Test Error!")
	}
}
           

执行测试

改进 RxGo 包——添加filtering操作改进 RxGo 包——添加filtering操作

项目地址

gitee地址

继续阅读