読者です 読者をやめる 読者になる 読者になる

プログラムの事とか

お約束ですが「掲載内容は私個人の見解です」

Reactive Extensions使って非同期に読みながら非同期に書き込む処理を書いたけど書けなかった話

Rxすごい!便利!と思い始めて早数年。 いまだにWhereとかThrottleとか使う程度です。難しいの書けません。ぎりぎり読める程度です。

それでも使っていなきゃ使えるようにはならないので頑張るのです。

非同期で読み続けるデータを非同期に書き込みたい

f:id:puni-o:20160210114811p:plain

こんな感じ。

書いてみた。

static void Main(string[] args)
{
    var source = new Subject<string>();
    Writer(source);
    Reader(source);

    Console.ReadKey();
}

static async Task Reader(Subject<string> source) =>
    await Task.Run(async () =>
{
    for (var i = 0; i < 4; i++)
    {
        Console.WriteLine($"Read {i}.");
        source.OnNext(i.ToString());
        await Task.Delay(1000);
    }

    Console.WriteLine("Read Complete");
    source.OnCompleted();
});

static void Writer(Subject<string> source)
{
    Console.WriteLine("Open");
    source.Subscribe(
     onNext:async s =>
        {
            await Task.Delay(3000);
            Console.WriteLine("Write {0}", s);
        }
        ,onCompleted: () => Console.WriteLine("Close."));
}

MainでWriterから先に呼んでいるのは購読前のデータがどこかにいっちゃったからで、それさえどうすればいいのか分からない実力の持ち主ですよ、私は。 そもそもSubjectを渡しているところからしてこれでいいのかどうか・・・。

そして動かしてみた。

Open
Read 0.
Read 1.
Read 2.
Write 0
Read 3.
Write 1
Read Complete
Close.
Write 2
Write 3

こうなりました・・・

Closeの後Writeが走ってますね。onNextが非同期だからショーがない。しょーがない・・・

onNextを非同期にするのがそもそも間違いな気がしますが、書き込む関数が非同期ならこう書きたくなるじゃん。

そして結果的に書き込み処理中にonCompleteがやってきて終了処理の方が先にきました。

ちなみにonNextのasync取れば期待通りには動いてくれます。そもそも書き込む部分を非同期で並列に動かしていいと思えないし(どこかで同期が必要になりそう)

という感じで挫折。(async取って終了)

だれかタスケテ