Archive for September, 2019

Parallel.ForEach Async in C#

September 13th, 2019

As mentioned in my previous post, getting a ‘proper’ parallel foreach that is async is a bit of a pain.

So the solution is to write a true async function:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelAsync
{
  public static async Task ForEach<T>(ICollection<T> source, Func<T, Task> body, CancellationToken token)
  {
    // create the list of tasks we will be running
    var tasks = new List<Task>(source.Count);
    try
    {
      // and add them all at once.
      tasks.AddRange(source.Select(s => Task.Run(() => body(s), token)));

      // poll with a short delay so we can throw on cancellation.
      for (; ; )
      {
        // very short delay
        var delay = Task.Delay(1, token);

        // and all our tasks
        await Task.WhenAny(Task.WhenAll(tasks), delay).ConfigureAwait(false);
        if (tasks.All(t => t.IsCompleted))
        {
          break;
        }

        // get out as soon as we are cancelled, even if the body ignores the token.
        token.ThrowIfCancellationRequested();

        //
        // ... use a spinner or something
      }
      await Task.WhenAll(tasks).ConfigureAwait(false);

      // throw if we were cancelled.
      token.ThrowIfCancellationRequested();
    }
    finally
    {
      // find the error(s) that might have happened.
      var errors = tasks.Where(tt => tt.IsFaulted).Select(tu => tu.Exception).ToList();

      // report every failure, not only the first one that await rethrows.
      // note: throwing here replaces any exception already in flight.
      if (errors.Count > 0)
      {
        throw new AggregateException(errors);
      }
    }
  }
}

And you can call it like this:

await ParallelAsync.ForEach(numbers, async (number) =>
{
  // blah ... 
  
  // blah ....
  await DoSomethingAmazing( number ).ConfigureAwait(false);
}, CancellationToken.None).ConfigureAwait( false );

Of course, you can refine it by adding checks for tokens that cannot be cancelled (token.CanBeCanceled is false) as well as for empty sources.
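A minimal sketch of what those two checks could look like, assuming the same ForEach signature as above. The class name ParallelAsyncSketch is made up for this example, and the cancellable path uses a single Task.Delay(Timeout.Infinite, token) instead of the polling loop, so treat it as one possible variation rather than the final implementation:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// hypothetical name, used only for this sketch
public static class ParallelAsyncSketch
{
  public static async Task ForEach<T>(ICollection<T> source, Func<T, Task> body, CancellationToken token)
  {
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    // empty source: nothing to run
    if (source.Count == 0)
    {
      return;
    }

    // start everything at once
    var all = Task.WhenAll(source.Select(s => Task.Run(() => body(s), token)));

    // a token that can never be cancelled needs no extra machinery
    if (!token.CanBeCanceled)
    {
      await all.ConfigureAwait(false);
      return;
    }

    // assumption: one task that only completes when the token is cancelled
    var cancelled = Task.Delay(Timeout.Infinite, token);
    await Task.WhenAny(all, cancelled).ConfigureAwait(false);

    // leave early on cancellation, otherwise observe the results
    token.ThrowIfCancellationRequested();
    await all.ConfigureAwait(false);
  }
}
```

With CancellationToken.None the method takes the short path and simply awaits Task.WhenAll.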

First prize is to make sure that the body of the ForEach takes in the token and cancels cleanly; otherwise this will jump out with threads left up in the air… but at least it will get out.
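As a rough illustration of a body that cancels cleanly, here is a sketch in which the work checks the token between steps. The two-argument DoSomethingAmazing, the CancellableBody class and the step counts are all assumptions made for the example:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// hypothetical helper, used only for this sketch
public static class CancellableBody
{
  // stand-in for the real work: ten short steps, each one cancellable
  public static async Task DoSomethingAmazing(int number, CancellationToken token)
  {
    for (var step = 0; step < 10; step++)
    {
      // stop between steps as soon as cancellation is requested
      token.ThrowIfCancellationRequested();

      // pass the token down so the wait itself can be cancelled too
      await Task.Delay(50, token).ConfigureAwait(false);
    }
  }
}

// usage, assuming the ForEach shown earlier:
//   await ParallelAsync.ForEach(numbers,
//     number => CancellableBody.DoSomethingAmazing(number, token), token);
```

Because the same token reaches both the ForEach and the body, a cancellation ends the work itself instead of only abandoning it.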

Edit: As someone pointed out to me on Stack Overflow, there are a couple of subtle ways I can improve my implementation … so I added them here.

Categories: development

What happens when you throw an exception in Parallel Loops

September 11th, 2019

Bad things, bad things happen … explosions, tears … and maybe more

        var numbers = new[] {1, 2, 3, 4, 5};
        Parallel.ForEach( numbers, (number) =>
          {
            Console.WriteLine($"Working on number: {number}");
            if (number == 3)
            {
              throw new Exception( "Boom!");
            }
          }
        );

In the example above, the code will explode: Parallel.ForEach rethrows the exception on the calling thread, wrapped in an AggregateException, and nothing catches it.
So obviously we will add a try/catch block…

      try
      {
        var numbers = new[] {1, 2, 3, 4, 5};
        Parallel.ForEach( numbers, (number) =>
          {
            Console.WriteLine($"Working on number: {number}");
            if (number == 3)
            {
              throw new Exception( "Boom!");
            }
          }
        );
      }
      catch (Exception e)
      {
        Console.WriteLine( $"Caught exception! {e.Message}");
      }

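And that will work (of course): the catch block receives the AggregateException that Parallel.ForEach throws once the loop has stopped.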
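Since the exception that reaches the catch block is an AggregateException, its Message is only the generic wrapper text. A small sketch of pulling out the real errors follows; the class name ParallelExceptionDemo and the choice of InvalidOperationException are just for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// hypothetical name, used only for this sketch
public static class ParallelExceptionDemo
{
  // returns the messages of every exception thrown by the loop bodies
  public static IReadOnlyList<string> Run()
  {
    var numbers = new[] { 1, 2, 3, 4, 5 };
    try
    {
      Parallel.ForEach(numbers, number =>
      {
        if (number == 3)
        {
          throw new InvalidOperationException("Boom!");
        }
      });
      return Array.Empty<string>();
    }
    catch (AggregateException e)
    {
      // the original exceptions live in InnerExceptions
      return e.Flatten().InnerExceptions.Select(inner => inner.Message).ToList();
    }
  }
}
```

Only the body for number 3 throws, so Run() returns a single message, "Boom!".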

But what if we want to run an async function in parallel … then what do we do?

      try
      {
        var numbers = new[] { 1, 2, 3, 4, 5 };
        Parallel.ForEach(numbers, async (number) =>
          {
            
            Console.WriteLine($"Working on number: {number}");
            
            if (number == 3)
            {
              throw new Exception("Boom!");
            }
          }
        );
      }
      catch (Exception e)
      {
        Console.WriteLine($"Caught exception! {e.Message}");
      }

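In the case above you fire tasks, but don’t really wait for them to complete.
The async and await might give you the feeling that you are doing something truly in parallel … but you are not, because Parallel.ForEach has no overload accepting a Func<T, Task>; it accepts only Action delegates.
The async lambda is therefore compiled as async void, so the loop cannot wait for it and the try/catch around the loop never sees its exception.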
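To make the "you don't really wait" part visible, here is a small sketch that counts how many bodies have finished by the time Parallel.ForEach returns. The AsyncVoidDemo class and the 500 ms delay are assumptions picked for the demonstration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// hypothetical name, used only for this sketch
public static class AsyncVoidDemo
{
  // returns how many bodies had finished when Parallel.ForEach returned
  public static int CompletedWhenLoopReturns()
  {
    var completed = 0;
    Parallel.ForEach(new[] { 1, 2, 3, 4, 5 }, async number =>
    {
      // the lambda is async void: the loop gets control back at this await
      await Task.Delay(500).ConfigureAwait(false);
      Interlocked.Increment(ref completed);
    });

    // the loop is "done", but the bodies are most likely still waiting
    return Volatile.Read(ref completed);
  }
}
```

The returned count will normally be well below 5 (usually 0), because each body hands control back at its first await and the loop considers it finished.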

The easy way out is to use Tasks as they were intended:

      try
      {
        // use a concurrent queue so all the threads can add to it;
        // while it is an overhead we do not expect to have that many exceptions in production code.
        var exceptions = new ConcurrentQueue<Exception>();
        var numbers = new[] { 1, 2, 3, 4, 5 };
        var tasks = new List<Task>();

        async Task t(int number)
        {
          // protect everything with a try catch
          try
          {
            await Task.Delay(100).ConfigureAwait(false);
            Console.WriteLine($"Working on number: {number}");
            if (number == 3)
            {
              throw new Exception("Boom!");
            }
          }
          catch (Exception e)
          {
            // save it for later.
            exceptions.Enqueue(e);
          }
        }

        foreach (var number in numbers)
        {
          tasks.Add( t(number) );
        }

        Task.WaitAll(tasks.ToArray());
         
        // we are back in our own thread
        if (exceptions.Count > 0)
        {
          throw new AggregateException( exceptions );
        }
      }
      catch (Exception e)
      {
        Console.WriteLine($"Caught exception! {e.Message}");
      }

Now obviously this is ugly code, but you get the idea … run all the tasks in parallel and wait for them all to finish.

Once they are all done, you can throw an aggregation of the errors … if there are any.
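For comparison, the same idea can be sketched without the queue by letting Task.WhenAll collect the failures. This is a different technique from the code above, and the WhenAllErrors class and its Work method (odd numbers fail) are invented for the example:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// hypothetical name, used only for this sketch
public static class WhenAllErrors
{
  // stand-in unit of work: odd numbers fail
  private static async Task Work(int number)
  {
    await Task.Delay(10).ConfigureAwait(false);
    if (number % 2 == 1)
    {
      throw new InvalidOperationException($"Boom {number}!");
    }
  }

  public static async Task RunAll(IEnumerable<int> numbers)
  {
    // start everything, then wait for all of it
    var all = Task.WhenAll(numbers.Select(n => Work(n)));
    try
    {
      await all.ConfigureAwait(false);
    }
    catch
    {
      // await rethrows only the first failure; the WhenAll task holds them all
      if (all.Exception != null)
      {
        throw all.Exception.Flatten();
      }
      throw;
    }
  }
}
```

Calling RunAll with 1, 2 and 3 ends in an AggregateException that carries both failures, one for 1 and one for 3.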

Have a look at the code on my GitHub page for more information and samples.