Skip to content

C# 多线程 08-使用 Reactive Extensions 02-编写自定义的可观察对象

🏷️ 《C# 多线程》

示例代码

csharp
/// <summary>
/// 编写自定义的可观察对象
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
    var observer = new CustomObserver();

    var goodObservable = new CustomSequence(new[] { 1, 2, 3, 4, 5 });
    var badObservable = new CustomSequence(null);

    // 同步订阅
    using (IDisposable subscription = goodObservable.Subscribe(observer))
    {
    }

    // 异步订阅
    using (IDisposable subscription = goodObservable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(observer))
    {
        // 延迟一段时间等待异步任务完成
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        Console.WriteLine("Press Enter to continue");
        Console.ReadLine();
    }

    // 异步订阅异常时
    using (IDisposable subscription = badObservable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(observer))
    {
        // 延迟一段时间等待异步任务完成
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        Console.WriteLine("Press Enter to continue");
        Console.ReadLine();
    }
}

/// <summary>
/// 自定义观察者
/// </summary>
class CustomObserver : IObserver<int>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine($"Error: {error.Message}");
    }

    public void OnNext(int value)
    {
        Console.WriteLine($"Next value: {value}; Thread id: {Thread.CurrentThread.ManagedThreadId}");
    }
}

/// <summary>
/// 自定义可观察对象
/// </summary>
class CustomSequence : IObservable<int>
{
    private readonly IEnumerable<int> _numbers;

    public CustomSequence(IEnumerable<int> numbers)
    {
        _numbers = numbers;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var number in _numbers)
        {
            observer.OnNext(number);
        }
        observer.OnCompleted();
        return Disposable.Empty;
    }
}

运行结果

txt
Next value: 1; Thread id: 1
Next value: 2; Thread id: 1
Next value: 3; Thread id: 1
Next value: 4; Thread id: 1
Next value: 5; Thread id: 1
Completed
Next value: 1; Thread id: 4
Next value: 2; Thread id: 4
Next value: 3; Thread id: 4
Next value: 4; Thread id: 4
Next value: 5; Thread id: 4
Completed
Press Enter to continue

Error: 未将对象引用设置到对象的实例。
Press Enter to continue