围绕连续变化的可观察性集合创建可观察器
本文关键字:创建 观察 集合 可观察性 连续 变化 | 更新日期: 2024-08-10 00:43:28
下面的代码片段是我创建以下功能的尝试:
- 创建订阅主题集合的可观察序列
- 当集合中的一个主题生成值时,序列结束,调用一个返回新主题集的方法,并在1处重新启动
- 当对外部可观察对象的订阅被处理时,整个事情就停止了
关于我的实现的问题:
- 为什么使用subjectsSub.SelectMounty(x=>x).Merge()而不使用subjectsSub.Merge?(我本以为会采用后一种工作方式)
- 在庞大的Rx功能库中,有没有更简单、更优雅的解决方案
更新:这个示例实际上是从RxJS Typescript反向移植的,目的是让更多的人了解这个问题。事实上,原始版本在使用Javascript的单线程浏览器环境中运行,这应该更清楚地说明为什么这种"可观察的捕获"可能会起作用(它确实起作用,而且没有像扰乱RxJ内部这样的肮脏黑客)。
class Program
{
private static readonly Queue<IObservable<Unit>[]> observableDependencies = new Queue<IObservable<Unit>[]>();
private static IObservable<Unit>[] EvaluateExpressionAndCaptureTouchedObservables(Func<object> expression)
{
// wire some traps for capturing any observables "touched" by expression
expression();
// return observables touched by expression (not in this example of course)
if (observableDependencies.Count > 0)
return observableDependencies.Dequeue();
return new[] {Observable.Never<Unit>()}; // keep going
}
private static IObservable<Unit> CreateObservable(
Subject<IObservable<Unit>[]> capturedObservables, Stopwatch sw)
{
return Observable.Create<Unit>(observer =>
{
var isComplete = new Subject<Unit>();
var isAborted = false;
var disp = Scheduler.Default.Schedule(self =>
{
Console.WriteLine("** Next iteration at {0}", sw.Elapsed);
capturedObservables.SelectMany(x => x).Merge().TakeUntil(isComplete).Subscribe(x =>
{
observer.OnNext(Unit.Default);
// self-destruct
isComplete.OnNext(Unit.Default);
},
() =>
{
Console.WriteLine("completed");
if (!isAborted)
self();
});
capturedObservables.OnNext(EvaluateExpressionAndCaptureTouchedObservables());
});
return new CompositeDisposable(Disposable.Create(() =>
{
isAborted = true;
// self-destruct
isComplete.OnNext(Unit.Default);
}), disp);
});
}
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(10)).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(5)).Select(x => Unit.Default),
Observable.Return(10).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[] {Observable.Timer(TimeSpan.FromSeconds(3)).Select(x => Unit.Default)});
var capturedObservables = new Subject<IObservable<Unit>[]>();
var obs = CreateObservable(capturedObservables, sw);
var disp = obs.Subscribe(x => Console.WriteLine("** fired at {0}", sw.Elapsed));
Console.ReadLine();
disp.Dispose();
Console.ReadLine();
}
}
要回答您的第一个问题,需要SelectMany
,因为您有一个三级深度可观测:可观测阵列的主题。CCD_ 2只使一个层次变平。CCD_ 3只是CCD_ 4+CCD_。所以SelectMany.Merge
正在应用2个平坦化运算符,这正是您所需要的。
第二个答案。。。看起来你可以只使用Merge
+FirstOrDefault
+Defer
+Repeat
,甚至不使用主题:
var disp = Observable
.Defer(() => EvaluateExpressionAndCaptureTouchedObservables()
.Merge()
.FirstOrDefault(Unit.Default))
.Repeat()
.Subscribe(...);
Defer
每次订阅时都会运行捕获功能
Merge
使可观测阵列变平
一旦任何可观测值产生值,FirstOrDefault
就结束流。如果所有这些都完成了,但没有产生值,那么它会产生一个可以观察到的Unit.Default
。
Repeat
在结束时重新订阅(由于FirstOrDefault
),这会触发另一次捕获(由于Defer
)。
这显然是微不足道的转换回TypeScript。。。
或多或少受到Brandon建议启发的CreateObservable最终版本。这表明,99%的时候,你认为你必须求助于日程安排,但你做错了™
private static IObservable<Unit> CreateObservable()
{
return Observable.Create<Unit>(observer =>
{
var innerDisp = Observable.Defer(() =>
{
return Observable.Merge(
EvaluateExpressionAndCaptureTouchedObservables(() => false))
.Take(1); // done when any observable produces a value
})
.Repeat()
.Subscribe(x =>
{
observer.OnNext(Unit.Default);
});
return innerDisp;
});
}