c# - 从 rx.net Observable.FromAsync 捕获异常

我有以下响应式扩展 Subject 并且需要记录异常,并且还干净地关闭了在 Observable.FromAsync 上创建的这个可能阻塞的异步任务。

与此相关并在取消令牌时,任何等待令牌的东西都会抛出异常 TaskCanceledException

我怎样才能捕获这些异常 - 在关机时忽略 TaskCanceledException ,并记录其余的?

internal sealed class TradeAggregator : IDisposable
{
    private readonly Subject<TradeExecuted> feed = new();
    private readonly CancellationTokenSource cts = new();
    private bool isStopped;
    private bool isDisposed;

    public TradeAggregator(Func<TradeAggregate, CancellationToken, Task> dispatch)
    {
        feed
            .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
            .SelectMany(x => x.ToList())
            .Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
            .Concat() // Ensure that the results are serialized.
            .Subscribe(cts.Token); // Check status of calls.
    }

    private TradeAggregate AggregateTrades(IEnumerable<TradeExecuted> trades)
    {
        // Do stuff.
        return new TradeAggregate();
    }

    public void OnNext(ExecutedTrade trade) => this.feed.OnNext(trade);

    public void Stop()
    {
        if (isStopped) return;
        isStopped = true;
        cts.Cancel();
    }

    public void Dispose()
    {
        if (isDisposed) return;
        isDisposed = true;
        
        Stop();
        feed.Dispose();
        cts.Dispose();
    }
}

回答1

使用不同的 Subscribe 重载:

feed
        .GroupByUntil(x => (x.Execution.Contract.Symbol, x.Execution.AccountId, x.Tenant, x.UserId), x => Observable.Timer(TimeSpan.FromSeconds(5)))
        .SelectMany(x => x.ToList())
        .Select(trades => Observable.FromAsync(() => dispatch(AggregateTrades(trades), cts.Token)))
        .Concat() // Ensure that the results are serialized.
        .Subscribe(
          x => {}, //OnNext, do nothing
          e => {  //OnError, handle
            if(e.GetType() == typeof(TaskCanceledException))
              ; //ignore
            else
            {
              ; //log
            }
          },
          cts.Token
        );

相似文章

javascript - 动态修改 Long SVG 的颜色

我从https://undraw.co/获得了一个SVG放在我的网站上。我正在尝试添加一个颜色主题切换器,其中需要更改颜色的事情之一是这些SVG上的颜色(例如橙色到绿色)。我寻找了一些解决方案,但是修...

html - SVG 样式的问题

我有一个SVG并且我正在尝试做两件事。我想将它放在.leftdiv中,我尝试使用:Display:flex;Justify-content:center;Align-items:center;text...