围绕连续变化的可观察性集合创建可观察器

本文关键字:创建 观察 集合 可观察性 连续 变化 | 更新日期: 2024-08-10 00:43:28

下面的代码片段是我创建以下功能的尝试:

  • 创建订阅主题集合的可观察序列
  • 当集合中的一个主题生成值时,序列结束,调用一个返回新主题集的方法,并在1处重新启动
  • 当对外部可观察对象的订阅被处理时,整个事情就停止了

关于我的实现的问题:

  • 为什么使用subjectsSub.SelectMounty(x=>x).Merge()而不使用subjectsSub.Merge?(我本以为会采用后一种工作方式)
  • 在庞大的Rx功能库中,有没有更简单、更优雅的解决方案

更新:这个示例实际上是从RxJS Typescript反向移植的,目的是让更多的人了解这个问题。事实上,原始版本在使用Javascript的单线程浏览器环境中运行,这应该更清楚地说明为什么这种"可观察的捕获"可能会起作用(它确实起作用,而且没有像扰乱RxJ内部这样的肮脏黑客)。


    class Program
    {
      private static readonly Queue<IObservable<Unit>[]> observableDependencies = new Queue<IObservable<Unit>[]>();
      private static IObservable<Unit>[] EvaluateExpressionAndCaptureTouchedObservables(Func<object> expression)
      {
        // wire some traps for capturing any observables "touched" by expression
        expression();
        // return observables touched by expression (not in this example of course)
        if (observableDependencies.Count > 0)
          return observableDependencies.Dequeue();
        return new[] {Observable.Never<Unit>()}; // keep going
      }
      private static IObservable<Unit> CreateObservable(
Subject<IObservable<Unit>[]> capturedObservables, Stopwatch sw)
      {
        return Observable.Create<Unit>(observer =>
        {
          var isComplete = new Subject<Unit>();
          var isAborted = false;
          var disp = Scheduler.Default.Schedule(self =>
          {
            Console.WriteLine("** Next iteration at {0}", sw.Elapsed);
            capturedObservables.SelectMany(x => x).Merge().TakeUntil(isComplete).Subscribe(x =>
            {
              observer.OnNext(Unit.Default);
              // self-destruct
              isComplete.OnNext(Unit.Default);
            },
            () =>
            {
              Console.WriteLine("completed");
              if (!isAborted)
                self();
            });
            capturedObservables.OnNext(EvaluateExpressionAndCaptureTouchedObservables());
          });
          return new CompositeDisposable(Disposable.Create(() =>
          {
            isAborted = true;
            // self-destruct
            isComplete.OnNext(Unit.Default);
          }), disp);
        });
      }
      private static void Main(string[] args)
      {
        var sw = new Stopwatch();
        sw.Start();
        observableDependencies.Enqueue(new[]
        {
          Observable.Timer(TimeSpan.FromSeconds(10)).Select(x => Unit.Default)
        });
        observableDependencies.Enqueue(new[]
        {
          Observable.Timer(TimeSpan.FromSeconds(5)).Select(x => Unit.Default),
          Observable.Return(10).Select(x => Unit.Default)
        });
        observableDependencies.Enqueue(new[] {Observable.Timer(TimeSpan.FromSeconds(3)).Select(x => Unit.Default)});
        var capturedObservables = new Subject<IObservable<Unit>[]>();
        var obs = CreateObservable(capturedObservables, sw);
        var disp = obs.Subscribe(x => Console.WriteLine("** fired at {0}", sw.Elapsed));
        Console.ReadLine();
        disp.Dispose();
        Console.ReadLine();
      }
    }

围绕连续变化的可观察性集合创建可观察器

要回答您的第一个问题,需要SelectMany,因为您有一个三级深度可观测:可观测阵列的主题。CCD_ 2只使一个层次变平。CCD_ 3只是CCD_ 4+CCD_。所以SelectMany.Merge正在应用2个平坦化运算符,这正是您所需要的。

第二个答案。。。看起来你可以只使用Merge+FirstOrDefault+Defer+Repeat,甚至不使用主题:

var disp = Observable
    .Defer(() => EvaluateExpressionAndCaptureTouchedObservables()
        .Merge()
        .FirstOrDefault(Unit.Default))
    .Repeat()
    .Subscribe(...);

Defer每次订阅时都会运行捕获功能

Merge使可观测阵列变平

一旦任何可观测值产生值,FirstOrDefault就结束流。如果所有这些都完成了,但没有产生值,那么它会产生一个可以观察到的Unit.Default

Repeat在结束时重新订阅(由于FirstOrDefault),这会触发另一次捕获(由于Defer)。

这显然是微不足道的转换回TypeScript。。。

或多或少受到Brandon建议启发的CreateObservable最终版本。这表明,99%的时候,你认为你必须求助于日程安排,但你做错了™

private static IObservable<Unit> CreateObservable()
{
  return Observable.Create<Unit>(observer =>
  {
    var innerDisp = Observable.Defer(() =>
    {
      return Observable.Merge(
        EvaluateExpressionAndCaptureTouchedObservables(() => false))
      .Take(1);  // done when any observable produces a value
    })
    .Repeat()
    .Subscribe(x =>
    {
      observer.OnNext(Unit.Default);
    });
    return innerDisp;
  });
}