Skip to content

C# 多线程 08-使用 Reactive Extensions 06-使用 Rx 创建异步操作

🏷️ 《C# 多线程》

示例代码

csharp
/// <summary>
/// Delegate taking a string and returning a string. Main assigns
/// LongRunningOperation to it and passes its BeginInvoke/EndInvoke pair
/// to Observable.FromAsyncPattern.
/// </summary>
delegate string AsyncDelegate(string name);

/// <summary>
/// Creates asynchronous operations with Rx in five ways and prints their
/// results: an observable from LongRunningOperationAsync, a Task converted
/// with ToObservable, a Begin/End delegate pair wrapped by FromAsyncPattern,
/// an observable consumed with await, and a timer event wrapped by
/// FromEventPattern.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    const string separator = "--------------------";
    TimeSpan shortWait = TimeSpan.FromSeconds(2);

    // 1. Observable returned directly by LongRunningOperationAsync.
    IObservable<string> source = LongRunningOperationAsync("Task1");
    using (IDisposable subscription = OutputToConsole(source))
    {
        // Keep the subscription alive while the operation runs.
        Thread.Sleep(shortWait);
    }
    Console.WriteLine(separator);

    // 2. Convert a Task into an observable with ToObservable.
    Task<string> task = LongRunningOperationTaskAsync("Task2");
    IObservable<string> taskSource = task.ToObservable();
    using (IDisposable subscription = OutputToConsole(taskSource))
    {
        Thread.Sleep(shortWait);
    }
    Console.WriteLine(separator);

    // 3. Convert the Asynchronous Programming Model (Begin/End) pattern
    //    into a factory that produces observables.
    AsyncDelegate apmDelegate = LongRunningOperation;
    Func<string, IObservable<string>> createObservable =
        Observable.FromAsyncPattern<string, string>(apmDelegate.BeginInvoke, apmDelegate.EndInvoke);
    source = createObservable("Task3");
    using (IDisposable subscription = OutputToConsole(source))
    {
        Thread.Sleep(shortWait);
    }
    Console.WriteLine(separator);

    // 4. Use await on an observable; Main blocks until the helper finishes.
    source = createObservable("Task4");
    Task<string> awaited = AwaitOnObservable(source);
    awaited.Wait();
    Console.WriteLine(separator);

    // 5. Convert the event-based asynchronous pattern directly into an observable.
    using (var timer = new System.Timers.Timer(1000))
    {
        IObservable<EventPattern<ElapsedEventArgs>> ticks =
            Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
                handler => timer.Elapsed += handler,
                handler => timer.Elapsed -= handler);
        timer.Start();

        using (IDisposable subscription = OutputToConsole(ticks))
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
        }
        Console.WriteLine(separator);
        timer.Stop();
    }

    // Wait for Enter before the method returns.
    Console.ReadLine();
}

/// <summary>
/// Awaits the given observable, writes the awaited value to the console,
/// and returns that value.
/// </summary>
/// <typeparam name="T">Element type of the observable.</typeparam>
/// <param name="observable">The observable to await.</param>
/// <returns>A task whose result is the value obtained from the await.</returns>
static async Task<T> AwaitOnObservable<T>(IObservable<T> observable)
{
    T awaitedValue = await observable;

    string text = $"{awaitedValue}";
    Console.WriteLine(text);

    return awaitedValue;
}

/// <summary>
/// Runs LongRunningOperation through Task.Run and returns the resulting task.
/// </summary>
/// <param name="name">Name passed through to LongRunningOperation.</param>
/// <returns>A task whose result is the string returned by LongRunningOperation.</returns>
static Task<string> LongRunningOperationTaskAsync(string name)
{
    Func<string> work = () => LongRunningOperation(name);
    return Task.Run(work);
}

/// <summary>
/// Runs LongRunningOperation through Observable.Start and returns the
/// resulting observable.
/// </summary>
/// <param name="name">Name passed through to LongRunningOperation.</param>
/// <returns>An observable of the string returned by LongRunningOperation.</returns>
static IObservable<string> LongRunningOperationAsync(string name)
{
    // Observable.Start is very similar to Task.Run in the TPL: it starts the
    // asynchronous operation, delivers its single string result, and then ends.
    Func<string> work = () => LongRunningOperation(name);
    return Observable.Start(work);
}

/// <summary>
/// Blocks the calling thread for one second, then returns a completion
/// message containing the given name and the calling thread's managed id.
/// </summary>
/// <param name="name">Name embedded in the returned message.</param>
/// <returns>The completion message.</returns>
static string LongRunningOperation(string name)
{
    // Stand-in for real work: just hold the thread for one second.
    TimeSpan simulatedWork = TimeSpan.FromSeconds(1);
    Thread.Sleep(simulatedWork);

    int threadId = Thread.CurrentThread.ManagedThreadId;
    return $"Task {name} is completed. Thread id {threadId}";
}

/// <summary>
/// Subscribes to a sequence of timer Elapsed events and writes each event's
/// SignalTime to the console, plus a line for an error or for completion.
/// </summary>
/// <param name="sequence">The sequence of Elapsed event patterns to subscribe to.</param>
/// <returns>The subscription; dispose it to unsubscribe.</returns>
static IDisposable OutputToConsole(IObservable<EventPattern<ElapsedEventArgs>> sequence)
{
    Action<EventPattern<ElapsedEventArgs>> onNext =
        tick => Console.WriteLine($"{tick.EventArgs.SignalTime}");
    Action<Exception> onError =
        error => Console.WriteLine($"Error: {error.Message}");
    Action onCompleted =
        () => Console.WriteLine("Completed");

    return sequence.Subscribe(onNext, onError, onCompleted);
}

/// <summary>
/// Subscribes to the sequence and writes every element to the console,
/// plus a line for an error or for completion.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="sequence">The sequence to subscribe to.</param>
/// <returns>The subscription; dispose it to unsubscribe.</returns>
static IDisposable OutputToConsole<T>(IObservable<T> sequence)
{
    Action<T> onNext =
        item => Console.WriteLine($"{item}");
    Action<Exception> onError =
        error => Console.WriteLine($"Error: {error.Message}");
    Action onCompleted =
        () => Console.WriteLine("Completed");

    return sequence.Subscribe(onNext, onError, onCompleted);
}

运行结果

txt
Task Task1 is completed. Thread id 4
Completed
--------------------
Task Task2 is completed. Thread id 4
Completed
--------------------
Task Task3 is completed. Thread id 4
Completed
--------------------
Task Task4 is completed. Thread id 5
--------------------
2017/8/25 17:30:35
2017/8/25 17:30:36
2017/8/25 17:30:37
2017/8/25 17:30:38
--------------------