【C#,Rx】MergeしたIObservableの少なくとも1個がOnCompletedしたらOnCompletedする

【C#,Rx】MergeしたIObservableの少なくとも1個がOnCompletedしたらOnCompletedする
スポンサーリンク

こんにちは。ゆとり(@yutori_techblog)です。

今回はReactiveExtensionsのMergeオペレータの話題です。

Mergeオペレータは複数のIObservableを合成できる便利なオペレータだよね!

でもMergeオペレータにはある仕様があって、それで困ることがあるの。

仕様って??

Mergeの合成結果がOnCompletedになるには、Merge元のIObservableが全部OnCompletedにならなきゃいけないの。

MergeオペレータとOnCompleted

MergeしたIObservableがOnCompletedとなるには、すべてのIObservableソースがOnCompletedになる必要があります。

3つのSubjectをMergeオペレータで合成して、その結果を購読するサンプルコードです。
Subjectからはそれぞれ3つの値を発行し、OnCompletedしています。

ソースコード
class MainClass
{
    public static void Main(string[] args)
    {
        Subject<int> subject1 = new Subject<int>();
        Subject<int> subject2 = new Subject<int>();
        Subject<int> subject3 = new Subject<int>();

        //Mergeメソッドで3つのIObservableを合成
        IObservable<int> observable =
            subject1.Merge(subject2).Merge(subject3);

        //MergeしたIObservableを購読する
        observable.Subscribe(
            i => Console.WriteLine($"OnNext:{i}"),
            ex => Console.WriteLine(ex.Message),
            () => Console.WriteLine("OnCompleted"));

        subject1.OnNext(1);
        subject1.OnNext(11);
        subject1.OnNext(111);
        subject1.OnCompleted();

        subject2.OnNext(2);
        subject2.OnNext(22);
        subject2.OnNext(222);
        subject2.OnCompleted();

        subject3.OnNext(3);
        subject3.OnNext(33);
        subject3.OnNext(333);
        subject3.OnCompleted();
    }
}
実行結果
OnNext:1
OnNext:11
OnNext:111
OnNext:2
OnNext:22
OnNext:222
OnNext:3
OnNext:33
OnNext:333
OnCompleted

Mergeした3つのSubjectをそれぞれOnCompletedしていますが、Merge結果のIObservableがOnCompletedとなるのは、3つのSubjectがすべてOnCompletedとなったときであることがわかると思います。

では、少なくとも1つがOnCompletedとなったときにMerge結果もOnCompletedとしたいときはどうしたら良いでしょうか?


僕が探した限りでは、それを実現できるオペレータは見つかりませんでした。

と、いうわけで、下記ページを参考にして、作ってみました。

UniRx入門シリーズ 目次はこちら まえがき 私は過去にUniRxについての記事を散発的に書いていましたが、体系に学習できる形になっていませんでした。 そこで…
qiita.com

少なくとも1個がOnCompletedでOnCompletedする自作MergeLeastオペレータ

下記MergeLeastオペレータ本体(MergeLeastクラス)と、それをメソッドチェーンで利用するための拡張メソッドをコピーして利用してください。

MergeLeastオペレータ本体
public class MergeLeast<T> : IObservable<T>
{
    private IObservable<T> _source, _second;

    public MergeLeast(IObservable<T> source, IObservable<T> second)
    {
        _source = source;
        _second = second;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return new MergeLeastInternal(this, observer).Run();
    }

    private class MergeLeastInternal : IObserver<T>
    {
        private MergeLeast<T> _parent;
        private IObserver<T> _observer;
        private IDisposable _secondDisposer;
        private object lockobj = new object();

        public MergeLeastInternal(MergeLeast<T> parent, IObserver<T> observer)
        {
            _parent = parent;
            _observer = observer;
        }

        public IDisposable Run()
        {
            _secondDisposer = _parent._second.Subscribe(
                v => _observer.OnNext(v),
                ex =>
                {
                    _observer.OnError(ex);
                    _observer = null;
                },
                () => _observer.OnCompleted());
            return _parent._source.Subscribe(this);
        }

        public void OnNext(T value)
        {
            lock (lockobj)
            {
                if (_observer == null) return;

                try
                {
                    _observer.OnNext(value);
                }
                catch (Exception ex)
                {
                    _observer.OnError(ex);
                    _observer = null;
                }
            }
        }

        public void OnError(Exception e)
        {
            lock (lockobj)
            {
                _observer.OnError(e);
                _observer = null;
                _secondDisposer.Dispose();
            }
        }

        public void OnCompleted()
        {
            lock (lockobj)
            {
                _observer.OnCompleted();
                _observer = null;
                _secondDisposer.Dispose();
            }
        }
    }
}
拡張メソッド
public static partial class ObservableOperators
{
    public static IObservable<T> MergeLeast<T>(this IObservable<T> source, IObservable<T> second)
    {
        return new MergeLeast<T>(source, second);
    }
}

使ってみる

先ほどのサンプルコードのMergeオペレータを、自作したMergeLeastオペレータに置換しただけのコードです。

class MainClass
{
    public static void Main(string[] args)
    {
        Subject<int> subject1 = new Subject<int>();
        Subject<int> subject2 = new Subject<int>();
        Subject<int> subject3 = new Subject<int>();

        //★MergeLeastメソッドで3つのIObservableを合成
        IObservable<int> observable =
            subject1.MergeLeast(subject2).MergeLeast(subject3);

        //MergeLeastで合成したIObservableを購読する
        observable.Subscribe(
            i => Console.WriteLine($"OnNext:{i}"),
            ex => Console.WriteLine(ex.Message),
            () => Console.WriteLine("OnCompleted"));

        subject1.OnNext(1);
        subject1.OnNext(11);
        subject1.OnNext(111);
        subject1.OnCompleted();

        subject2.OnNext(2);
        subject2.OnNext(22);
        subject2.OnNext(222);
        subject2.OnCompleted();

        subject3.OnNext(3);
        subject3.OnNext(33);
        subject3.OnNext(333);
        subject3.OnCompleted();
    }
}

これを実行すると…

実行結果
OnNext:1
OnNext:11
OnNext:111
OnCompleted

MergeLeastで合成した3つのIObservableのうち、ひとつでもOnCompletedになった時点で、合成結果もOnCompletedになっていることがわかります。

さいごに

Rxには便利なオペレータが多数用意されていますが、時と場合によってはかゆところに手が届かないことがありますね。
その場合は、思い切って自作してしまうことで自分の思い通りの動作を得ることができます。

さらに、オペレータの自作を通じてRxの知識をさらに深めることにも繋がるのではないでしょうか。

ご意見、ご質問等ございましたら、コメントもしくはTwitterへお気軽にお寄せください!