Reactive Programming

alt text

响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。 https://zh.wikipedia.org/wiki/%E5%93%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B

在这个基础上,你还有令人惊艳的函数去组合、创建、过滤这些 Streams,这就是函数式魔法的用武之地。Stream 能接受一个,甚至多个 Stream 为输入,你可以融合两个 Stream,也可以从一个 Stream 中过滤出你感兴趣的 Events 以生成一个新的 Stream,还可以把一个 Stream 中的数据值 映射到一个新的 Stream 中。

为什么我要使用响应式编程(RP)? 响应式编程提高了代码的抽象层级,所以你可以只关注定义了业务逻辑的那些相互依赖的事件,而非纠缠于大量的实现细节。RP 的代码往往会更加简明。

特别是在开发现在这些有着大量与数据事件相关的 UI events 的高互动性 Webapps、手机 apps 的时候,RP 的优势就更加明显。10年前,网页的交互就只是提交一个很长的表单到后端,而在前端只产生简单的渲染。Apps 就表现得更加的实时了:修改一个表单域就能自动地把修改后的值保存到后端,为一些内容"点赞"时,会实时的反应到其它在线用户那里等等。

现在的 Apps 有着大量各种各样的实时 Events,以给用户提供一个交互性较高的体验。我们需要工具去应对这个变化,而响应式编程就是一个答案。

请求和响应

在 Rx 中你该怎么处理这个问题呢? 好吧,首先,(几乎) 所有的东西都可以转为一个 Observable 。这就是Rx的咒语

Observable的本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。而其对象存在一个 subscribe 方法,调用该方法后,才会启动这个流(也就是数据才会开始产生),这里需要注意的是多次启动的每一个流都是独立的,互不干扰。

它在使用方式上,跟Promise有点像,但在能力上比Promise强大多了,不仅仅能够以流的形式对数据进行控制,还内置许许多多的内置工具方法让我们能十分方便的处理各种数据层面的操作,让我们的代码如丝一般顺滑。

优势:

声明式的编程风格 代码量的大幅度减少 代码可读性的提高 很好的处理异步 事件管理、调度引擎 十分丰富的操作符

例如

1
2
3
4
5
6
7
fs.readFile('a.txt', 'utf-8', function(err, data) {
    fs.readFile('b.txt', 'utf-8', function(err, data1) {
        fs.readFile('c.txt', 'utf-8', function(err, data2) {
            // ......
        })
    })
})

用 RX 的方式代码可读性会更高

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
function readData(filePath) {
    return new Observable((observer) => {
        fs.readFile(filePath, 'utf-8', (err, data) => {
            if (err) observer.error(err);
            observer.next(data);
        })
    });
}

Rx.Observable
.forkJoin(readData('a.txt'), readData('b.txt'), readData('c.txt'))
.subscribe(data => console.log(data));

反应式编程是一种编程范式,它和具体的语言无关(RxJava,RxJs,RxGo,Vert.x等都是反应式库),只要符合反应式流规范即可。

当然反应式编程也有一定的缺点:

学习路线陡峭,和常用的思维方式不同。 异常堆栈难以进行跟踪。 相比于命令式编程更容易导致内存泄漏。

主动和被动的区别

响应式编程(Reactive Programming),它是一种基于事件的模型。在上面的异步编程模式中,我们描述了两种获得上一个任务执行结果的方式,一个就是主动轮训,我们把它称为 Proactive 方式。 另一个就是被动接收反馈,我们称为 Reactive。 简单来说,在 Reactive 方式中,上一个任务的结果的反馈就是一个事件,这个事件的到来将会触发下一个任务的执行。

rxgo 使用

RxGo 原理 RxGo 是一个用于处理异步数据流和事件的库,基于 ReactiveX 的理念。它提供了丰富的操作符来处理数据流,比如创建、转换、过滤、组合和错误处理等。RxGo 的核心概念包括 Observable(可观察对象)、Observer(观察者)和 Scheduler(调度器)。 是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。 示例:bufferedCountTime bufferedCountTime 是一个操作符,用于将数据流按照指定的数量和时间进行缓冲。它会在缓冲区达到指定数量或时间间隔时,将缓冲区中的数据发送出去。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
	// "context"
	"fmt"
	"time"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	// 创建一个 Observable
	observable := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()

	// 使用 bufferedCountTime 操作符
	<-observable.
		BufferWithTime(rxgo.WithDuration(1*time.Second)).
		ForEach(func(v any) {
			fmt.Printf("Buffered items: %v %T \n", v, v.([]any)[0])
		}, func(err error) {
			fmt.Println("Error:", err)
		}, func() {
			fmt.Println("Completed")
		})
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
	"context"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func TheBasic() rxgo.Disposed {
	obs := rxgo.Just(1, 2, 3)()
	return obs.Map(func(_ context.Context, i any) (any, error) {
		return i.(int) * 10, nil
	}).ForEach(func(v any) {
		fmt.Println(v)
	}, func(err error) {
		fmt.Println("error>", err)
	}, func() {
		fmt.Println("end>")
	})
}
func main() {
	<-TheBasic()
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/reactivex/rxgo/v2"
)

func TheBasic() rxgo.Disposed {
	obs := rxgo.Just(1, 2, 3, 4, 5, 65)()
	return obs.Map(func(_ context.Context, i any) (any, error) {
		return i.(int) * 10, nil
	}).GroupByDynamic(func(item rxgo.Item) string {
		return strconv.Itoa(item.V.(int) % 2)
	}, rxgo.WithBufferedChannel(3)).ForEach(func(v any) {
		stream := v.(rxgo.GroupedObservable)
		fmt.Println("key>", stream.Key)
		<-stream.ForEach(func(v any) {
			fmt.Println(v)
		}, func(err error) {}, func() {})

	}, func(err error) {
		fmt.Println("error>", err)
	}, func() {
		fmt.Println("end>")
	})
}
func main() {
	<-TheBasic()
}

/**
key> 0
10
20
30
40
50
650
end>

*/
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/reactivex/rxgo/v2"
)

func TheBasic() rxgo.Disposed {
	obs := rxgo.Just(1, 2, 3, 4, 5, 65)()
	return obs.Map(func(_ context.Context, i any) (any, error) {
		return i.(int) * 10, nil
	}).GroupByDynamic(func(item rxgo.Item) string {
		return strconv.Itoa(item.V.(int) % 2)
	}, rxgo.WithBufferedChannel(3)).FlatMap(func(i rxgo.Item) rxgo.Observable {
		return i.V.(rxgo.GroupedObservable).Map(func(_ context.Context, v any) (any, error) {
			return v.(int) / 10, nil
		})

	}).ForEach(func(v any) {
		stream := v.(int)
		fmt.Println("key>", stream)
		// <-stream.ForEach(func(v any) {
		// 	fmt.Println(v)
		// }, func(err error) {}, func() {})

	}, func(err error) {
		fmt.Println("error>", err)
	}, func() {
		fmt.Println("end>")
	})
}
func main() {
	<-TheBasic()
}

Rxgo 异常重试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	// 创建一个 Observable
	observable := rxgo.Create([]rxgo.Producer{
		func(ctx context.Context, next chan<- rxgo.Item) {
			for i := 0; i < 5; i++ {
				if rand.Intn(2) == 0 { // 随机产生错误
					fmt.Println("throw error handle ")
					next <- rxgo.Error(errors.New("random error"))
				} else {
					next <- rxgo.Of(i)
				}
				time.Sleep(500 * time.Millisecond)
			}
		},
	})

	// 使用 Retry 操作符,最多重试 3 次
	<-observable.
		Retry(5, func(err error) bool { return true }).
		ForEach(func(v any) {
			fmt.Println("Received:", v)
		}, func(err error) {
			fmt.Println("Error:", err)
		}, func() {
			fmt.Println("Completed")
		})
}

操作符详解

  1. 创建 Operator
  2. 转换类
  3. 过滤类
  4. 组合类
  5. 多播
  6. 工具类
  7. 数学类

创建可观察对象

Create — 通过以编程方式调用 Observer 方法从头开始创建 Observable Defer — 在观察者订阅之前不要创建 Observable,并为每个观察者创建一个新的 Observable Empty/Never/Thrown——创建具有非常精确和有限行为的 Observables FromChannel — 基于惰性通道创建一个 Observable FromEventSource — 基于渴望通道创建一个 Observable Interval — 创建一个 Observable,它发出由特定时间间隔间隔的整数序列 只是 — 将一组对象转换为发出该或那些对象的 Observable JustItem — 将一个对象转换为发出该对象的 Single Range — 创建一个发出一系列连续整数的 Observable Repeat — 创建一个可重复发出特定项目或项目序列的 Observable Start — 创建一个发出函数返回值的 Observable Timer — 创建一个在指定延迟后完成的 Observable

转变可观测值

Buffer——定期将 Observable 中的项目收集到包中并发出这些包,而不是一次发出一个项目 FlatMap — 将 Observable 发出的项转换为 Observables,然后将这些项的排放扁平化为单个 Observable GroupBy — 将一个 Observable 划分为一组 Observable,每个 Observable 发出与原始 Observable 不同的一组项目,按键组织 GroupByDynamic — 将一个 Observable 划分为一组动态 Observable,每个 Observable 都从原始 Observable 发出 GroupedObservable,按键组织 Map — 通过对每个项目应用函数来转换 Observable 发出的项目 Marshal — 通过对每个项目应用编组函数来转换 Observable 发出的项目 Scan — 按顺序将函数应用于 Observable 发出的每个项目,并发出每个连续值 Unmarshal — 通过对每个项目应用解组函数来转换 Observable 发出的项目 Window — 按顺序将函数应用于 Observable 发出的每个项目,并发出每个连续值

过滤可观察值

Debounce — 仅当特定时间跨度过去且未发射另一个项目时,才从 Observable 发射一个项目 Distinct/ DistinctUntilChanged — 抑制 Observable 发出的重复项 ElementAt — 只发出 Observable 发出的第 n 项 Filter——仅从 Observable 中发出那些通过谓词测试的项目 Find — 发出传递谓词的第一个项目,然后完成 First/ FirstOrDefault — 仅发出 Observable 中的第一项或满足条件的第一项 IgnoreElements — 不从 Observable 发出任何项目,但镜像其终止通知 Last/ LastOrDefault — 只发出 Observable 发出的最后一个项目 Sample — 发出 Observable 在周期性时间间隔内发出的最新项目 Skip — 抑制 Observable 发出的前 n 个项目 SkipLast — 抑制 Observable 发出的最后 n 个项目 Take — 只发出 Observable 发出的前 n 个项目 TakeLast — 只发出 Observable 发出的最后 n 个项目

结合观测值

CombineLatest — 当两个 Observable 中的任何一个发出一个项目时,通过指定的函数组合每个 Observable 发出的最新项目,并根据该函数的结果发出项目 Join — 在根据另一个 Observable 发出的项目定义的时间窗口内发出来自一个 Observable 的项目时,组合两个 Observable 发出的项目 合并 — 通过合并多个 Observables 的排放将其合并为一个 StartWithIterable — 在开始从源 Iterable 发出项目之前发出指定的项目序列 ZipFromIterable — 通过指定的函数将多个 Observable 的发射组合在一起,并根据该函数的结果为每个组合发射单个项目 错误处理运算符 Catch — 通过继续无错误的序列来从 onError 通知中恢复 Retry/ BackOffRetry — 如果源 Observable 发送 onError 通知,请重新订阅它,希望它能顺利完成 可观察的效用运算符 Do - 注册一个动作来处理各种 Observable 生命周期事件 Run — 创建一个观察者而不消耗发出的项目 Send — 在特定通道中发送 Observable 项目 Serialize — 强制 Observable 进行序列化调用并表现良好 TimeInterval — 将发出项目的 Observable 转换为发出这些发射之间经过的时间量指示的 Observable Timestamp — 为 Observable 发出的每个项目附加一个时间戳

条件和布尔运算符

All — 确定 Observable 发出的所有项目是否满足某些条件 Amb — 给定两个或多个源 Observables,仅从这些 Observables 中的第一个发出所有项目来发出项目 Contains — 确定 Observable 是否发出特定项 DefaultIfEmpty — 从源 Observable 发出项目,如果源 Observable 不发出任何内容,则为默认项目 SequenceEqual — 确定两个 Observable 是否发出相同的项目序列 SkipWhile — 丢弃 Observable 发出的项目,直到指定条件变为 false TakeUntil — 在第二个 Observable 发出项目或终止后丢弃由 Observable 发出的项目 TakeWhile — 在指定条件变为 false 后丢弃由 Observable 发出的项目

数学和聚合运算符

Average — 计算 Observable 发出的数字的平均值并发出该平均值 Concat — 发出来自两个或多个 Observables 的排放,而不将它们交错 Count — 计算源 Observable 发出的项目数量并仅发出该值 Max — 确定并发出由 Observable 发出的最大值项 Min — 确定并发出 Observable 发出的最小值项 Reduce — 按顺序将函数应用于 Observable 发出的每个项目,并发出最终值 Sum — 计算 Observable 发出的数字的总和并发出这个总和

转换可观察量的运算符

Error — 返回可观察对象抛出的第一个错误 Errors — 返回可观察对象抛出的所有错误 ToMap/ ToMapWithValueSelector/ ToSlice — 将 Observable 转换为另一个对象或数据结构

其他 rxgo 配置简要说明

这里主要讲解下我们在实际项目中用到的几个参数配置,其他的等待你的自行发掘。

rxgo.WithBufferedChannel(3): 设置 channel 的缓存大小;

rxgo.WithBackPressureStrategy(rxgo.Drop): 使用Drop策略意味着如果后面的流程没有准备好消耗一个数据,这个数据会被丢弃。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

/*---------------------------------------

	观察对象

---------------------------------------*/
var (
	// 事件bus
	eventCh         chan rxgo.Item
	eventObservable rxgo.Observable
)

func init() {
	//指令消息
	eventCh = make(chan rxgo.Item)
	// rxgo.FromEventSource 创建的是 热观察者, 即对从流的一开始产生的所有数据不感兴趣,只对我们开始观察之后的数据感兴趣。
	// 没有观察者的Observable发出的项目将会丢失。对上面表示不理解的同学可以看看 rxgo github 上面的介绍
	// 因为 rxgo.FromEventSource 默认在开始观察数据后, 是阻塞等待的
	// rxgo.WithBackPressureStrategy(rxgo.Drop)
	// 使用Drop策略意味着如果从FromEventSource后面的流程没有准备好消耗一个项目,这个项目会被丢弃。
	eventObservable = rxgo.FromEventSource(eventCh, rxgo.WithBackPressureStrategy(rxgo.Drop)) //observe()时指定buffer

}

func PublishEvent(e interface{}) {
	//发布事件
	eventCh <- rxgo.Of(e)
}

func GetObservable() rxgo.Observable {
	return eventObservable
}

func main() {
	for i := 0; i < 1; i++ {
		go consumer(i)
	}
	producer()

	time.Sleep(time.Second * 60)
}

func consumer(number int) {
	// 为什么是在 消费数据的地方设置 channel 的容量?
	// 这样可以根据消费者自身情况灵活调整
	o := GetObservable().Filter(func(i interface{}) bool {
		fmt.Printf("coming。。。。。。。。。。。。。, number: %v, data: %+v\n", number, i)
		return true
	}, rxgo.WithBufferedChannel(3))
	o.DoOnNext(func(i interface{}) {

	})
}

func producer() {

	for i := 0; i < 3; i++ {
		fmt.Println(i)
		PublishEvent(i)
	}
}

Connect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
	"context"

	"fmt"
	"github.com/reactivex/rxgo/v2"

	"time"
)

func main() {
	ch := make(chan rxgo.Item)
	go func() {
		ch <- rxgo.Of(1)
		ch <- rxgo.Of(2)
		ch <- rxgo.Of(3)
		close(ch)
	}()
	// Create a Connectable Observable
	observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

	// Create the first Observer
	observable.DoOnNext(func(i interface{}) {
		fmt.Printf("First observer: %d\n", i)
	})

	// Create the second Observer
	observable.DoOnNext(func(i interface{}) {
		fmt.Printf("Second observer: %d\n", i)
	})

	disposed, cancel := observable.Connect(context.TODO())
	go func() {
		// Do something
		time.Sleep(time.Second)
		// Then cancel the subscription
		cancel()
	}()
	// Wait for the subscription to be disposed
	<-disposed.Done()
}

Observable, Single, and Optional Single Iterable 是一个可以使用 Observe(opts …Option) <-chan Item 观察的对象。

Iterable 可以是:

Observable:发出 0 个或多个项目 A Single:发出 1 项 可选单项:发出 0 或 1 项

其他

首先使用Just創建一個僅有若干固定數據的 Observable; 使用Map()方法執行轉換(將圓形轉爲方形); 使用Filter()方法執行過濾(過濾掉黃色的方形)。

由於 Go 不支持多個可變參數,Just通過柯里化迂迴地實現了這個功能:

1
2
3
4
5
6
7
8
// rxgo/factory.go
func Just(items ...interface{}) func(opts ...Option) Observable {
  return func(opts ...Option) Observable {
    return &ObservableImpl{
      iterable: newJustIterable(items...)(opts...),
    }
  }
}

實際上rxgo.Item還可以包含錯誤。所以在使用時,我們應該做一層判斷:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func main() {
  observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    if item.Error() {
      fmt.Println("error:", item.E)
    } else {
      fmt.Println(item.V)
    }
  }
}

我們使用item.Error()檢查是否出現錯誤。然後使用item.V訪問數據,item.E訪問錯誤。

除了使用for range之外,我們還可以調用 Observable 的ForEach()方法來實現遍歷。ForEach()接受 3 個回調函數:

NextFunc:類型爲func (v interface {}),處理數據; ErrFunc:類型爲func (err error),處理錯誤; CompletedFunc:類型爲func (),Observable 完成時調用。 有點Promise那味了。使用ForEach(),可以將上面的示例改寫爲:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)()
  <-observable.ForEach(func(v interface{}) {
    fmt.Println("received:", v)
  }, func(err error) {
    fmt.Println("error:", err)
  }, func() {
    fmt.Println("completed")
  })
}

創建 Observable

上面使用最簡單的方式創建 Observable:直接調用Just()方法傳入一系列數據。下面再介紹幾種創建 Observable 的方式。

Create 傳入一個[]rxgo.Producer的切片,其中rxgo.Producer的類型爲func(ctx context.Context, next chan<- Item)。我們可以在代碼中調用rxgo.Of(value)生成數據,rxgo.Error(err)生成錯誤,然後發送到next通道中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20

func main() {
  observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
    next <- rxgo.Of(1)
    next <- rxgo.Of(2)
    next <- rxgo.Of(3)
    next <- rxgo.Error(errors.New("unknown"))
    next <- rxgo.Of(4)
    next <- rxgo.Of(5)
  }})

  ch := observable.Observe()
  for item := range ch {
    if item.Error() {
      fmt.Println("error:", item.E)
    } else {
      fmt.Println(item.V)
    }
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

func main() {
	ch := make(chan rxgo.Item)
	go func() {
		for i := 1; i <= 5; i++ {
			ch <- rxgo.Of(i)
		}
		close(ch)
	}()

	observable := rxgo.FromChannel(ch)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

通道需要手動調用close()關閉,上面Create()方法內部rxgo自動幫我們執行了這個步驟。

Hot Observable VS Cold Observable

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Observable 分類 根據數據在何處生成,Observable 被分爲 Hot 和 Cold 兩種類型(類比熱啓動和冷啓動)。數據在其它地方生成的被成爲 Hot Observable。相反,在 Observable 內部生成數據的就是 Cold Observable。

使用上面介紹的方法創建的實際上都是 Hot Observable。

上面創建的是 Hot Observable。但是有個問題,第一次Observe()消耗了所有的數據,第二個就沒有數據輸出了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func main() {
  observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
  }})

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

而 Cold Observable 就不會有這個問題,因爲它創建的流是獨立於每個觀察者的。即每次調用Observe()都創建一個新的 channel。我們使用Defer()方法創建 Cold Observable,它的參數與Create()方法一樣。

1
2
3
4
5
6
7
$ go run main.go
0
1
2
0
1
2

可连接 Observable

可連接的 Observable 可連接的(Connectable)Observable 對普通的 Observable 進行了一層組裝。調用它的Observe()方法時並不會立刻產生數據。使用它,我們可以等所有的觀察者都準備就緒了(即調用了Observe()方法)之後,再調用其Connect()方法開始生成數據。我們通過兩個示例比較使用普通的 Observable 和可連接的 Observable 有何不同。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
}

上例中我們使用DoOnNext()方法來註冊觀察者。由於DoOnNext()方法是異步執行的,所以爲了等待結果輸出,在最後增加了一行time.Sleep。運行:

$ go run main.go First observer: 1 First observer: 2 First observer: 3 before subscribe second observer

由輸出可以看出,註冊第一個觀察者之後就開始產生數據了。

我們通過在創建 Observable 的方法中指定rxgo.WithPublishStrategy()選項就可以創建可連接的 Observable:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })

  observable.Connect(context.Background())
  time.Sleep(3 * time.Second)
}

運行輸出:

$ go run main.go before subscribe second observer Second observer: 1 First observer: 1 First observer: 2 First observer: 3 Second observer: 2 Second observer: 3

转换 Observable

rxgo 提供了很多轉換函數,可以修改經過它的rxgo.Item,然後再發送給下一個階段。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    `{"name":"dj","age":18}`,
    `{"name":"jw","age":20}`,
  )()

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    return []byte(i.(string)), nil
  }).Unmarshal(json.Unmarshal, func() interface{} {
    return &User{}
  })

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Buffer Buffer按照一定的規則收集接收到的數據,然後一次性發送出去(作爲切片),而不是收到一個發送一個。有 3 種類型的Buffer:

  • BufferWithCount(n):每收到n個數據發送一次,最後一次可能少於n個;
  • BufferWithTime(n):發送在一個時間間隔n內收到的數據;
  • BufferWithTimeOrCount(d, n):收到n個數據,或經過d時間間隔,發送當前收到的數據。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10

func main() {
  observable := rxgo.Just(1, 2, 3, 4)()

  observable = observable.BufferWithCount(3)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}
1
2
3
$ go run main.go
[1 2 3]
[4]
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func main() {
  ch := make(chan rxgo.Item, 1)

  go func() {
    i := 0
    for range time.Tick(time.Second) {
      ch <- rxgo.Of(i)
      i++
    }
  }()

  observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(3 * time.Second))

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

$ go run main.go [0 1 2] [3 4 5] [6 7 8] …

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func main() {
  ch := make(chan rxgo.Item, 1)

  go func() {
    i := 0
    for range time.Tick(time.Second) {
      ch <- rxgo.Of(i)
      i++
    }
  }()

  observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(3*time.Second), 2)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}
1
2
3
4
5
$ go run main.go
[0 1]
[2 3]
[4 5]
...

选项

rxgo 提供的大部分方法的最後一個參數是一個可變長的選項類型。這是 Go 中特有的、經典的選項設計模式。我們前面已經使用了:

rxgo.WithBufferedChannel(10):設置 channel 的緩存大小; rxgo.WithPool(n)/rxgo.WithCpuPool():使用多個 goroutine 執行轉換操作; rxgo.WithPublishStrategy():使用發佈策略,即創建可連接的 Observable。 除此之外,rxgo 還提供了很多其他選項。留待大家自行探索了。

重试机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	obs := rxgo.Defer([]rxgo.Producer{
		func(ctx context.Context, ch chan<- rxgo.Item) {
			ch <- rxgo.Of(1)
			ch <- rxgo.Error(context.DeadlineExceeded)

		},
	})
	obs = obs.Map(func(ctx context.Context, i interface{}) (interface{}, error) {
		return i.(int) * 2, nil
	}).Retry(3, func(err error) bool {
		fmt.Println("retry on")
		return errors.Is(err, context.DeadlineExceeded)
	})
	<-obs.DoOnNext(func(k any) {
		// 这里执行异步操作,不阻塞后续步骤
		// time.Sleep(time.Second * 20)
		fmt.Printf("end it --  %v\n", k)
	})
	// .Subscribe(context.Background())

}

结合事务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package okxcontract


func MarketEnterLong(c *gin.Context) {
    var req BuyParam
    if err := c.ShouldBindJSON(&req); err != nil {
        utils.Error(c, err, req)
        return
    }
    if req.Sz <= 0 {
        req.Sz = 5
    }
    acc, err := data.GetAccount(req.AccountId)
    if err != nil {
        utils.Error(c, err, req)
        return
    }

    // Start a transaction
    tx := data.DB.Begin()
    if tx.Error != nil {
        utils.Error(c, tx.Error, req)
        return
    }

    obs := asyncHandleContractWithTransaction(c, &req, acc, "market", "long_enter", req.TdMode, model.Futures_OpenBuy, tx)
    <-obs.ForEach(func(k any) {
        log.Info("MarketEnterLong", k)
        utils.JSON(c, k)
    }, func(err error) {
        // Rollback the transaction in case of error
        tx.Rollback()
        utils.Error(c, err, req)
    }, func() {
        // Commit the transaction if everything is successful
        tx.Commit()
        log.Info("MarketEnterLong", "completed")
    })
}

异常处理

一个 Observable 订阅另一个 Observable , 然后根据策略处理异常或者结束,将结果抛给上层

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	k := 1
	// 数据源
	obs := rxgo.Create([]rxgo.Producer{
		func(ctx context.Context, ch chan<- rxgo.Item) {

			ch <- rxgo.Error(fmt.Errorf("error"))
			ch <- rxgo.Of(k)
		},
	})
	// 数据处理
	ob2 := rxgo.Create([]rxgo.Producer{
		func(ctx context.Context, ch chan<- rxgo.Item) {
			for v := range obs.Observe() {
				if v.Error() {
					// 打印异常,并且抛出到上层
					ch <- v
					fmt.Println("ERROR RECEIVE")
					return
				}
				fmt.Println("RESULT", v)
			}
		},
	})
	for v := range ob2.Observe() {
		if v.Error() {
			fmt.Println("ERROR CATCH >", v)
			return
		}
	}
	// 没有异常则正常结束
	fmt.Println("success end")

}