/// <summary>
/// Extension methods that run an asynchronous delegate over every element of a
/// sequence and optionally push each result to an <see cref="IObserver{T}"/>.
/// </summary>
/// <remarks>
/// Only <c>OnNext</c> is ever called on the observer; none of these methods call
/// <c>OnCompleted</c> or <c>OnError</c>. Failures surface through the returned task.
/// </remarks>
public static class Extensions
{
    /// <summary>
    /// Invokes <paramref name="body"/> for each item in order, awaiting each returned
    /// task before moving on to the next item.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="U">Result type produced by <paramref name="body"/>.</typeparam>
    /// <param name="collection">Sequence to enumerate.</param>
    /// <param name="body">Asynchronous delegate invoked once per item.</param>
    /// <param name="observer">Optional sink that receives each result via <c>OnNext</c>.</param>
    /// <returns>A task that completes once every item has been processed.</returns>
    public static async Task ForEachAsync<T, U>(this IEnumerable<T> collection, Func<T, Task<U>> body, IObserver<U> observer = null)
    {
        foreach (var item in collection)
        {
            var result = await body(item);
            if (observer != null)
            {
                observer.OnNext(result);
            }
        }
    }

    /// <summary>
    /// Sequential variant that starts the invocation for the next item before the
    /// previous result is handed to the observer, so observer work overlaps with
    /// the task that is already in flight.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="U">Result type produced by <paramref name="body"/>.</typeparam>
    /// <param name="collection">Sequence to enumerate.</param>
    /// <param name="body">Asynchronous delegate invoked once per item.</param>
    /// <param name="observer">Optional sink that receives each result via <c>OnNext</c>.</param>
    /// <returns>A task that completes once every item has been processed.</returns>
    public static async Task ForEachAsyncPerformance<T, U>(this IEnumerable<T> collection, Func<T, Task<U>> body, IObserver<U> observer = null)
    {
        // The enumerator is driven by hand, so it must also be disposed by hand;
        // the using statement guarantees that even when body or a task fails.
        using (var enumerator = collection.GetEnumerator())
        {
            Task<U> pending = null;
            if (enumerator.MoveNext())
            {
                pending = body(enumerator.Current);
            }

            while (enumerator.MoveNext())
            {
                var item = enumerator.Current;
                var result = await pending;

                // Kick off the next invocation before publishing the previous result.
                pending = body(item);
                if (observer != null)
                {
                    observer.OnNext(result);
                }
            }

            // Publish the result of the last invocation, if there was one.
            if (pending != null)
            {
                var result = await pending;
                if (observer != null)
                {
                    observer.OnNext(result);
                }
            }
        }
    }

    /// <summary>
    /// Invokes <paramref name="body"/> for each item while keeping at most
    /// <paramref name="parallelism"/> returned tasks outstanding at any time.
    /// Results are published in completion order, not source order.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <typeparam name="U">Result type produced by <paramref name="body"/>.</typeparam>
    /// <param name="collection">Sequence to enumerate.</param>
    /// <param name="parallelism">Maximum number of outstanding tasks; must be at least 1 when the sequence is non-empty.</param>
    /// <param name="body">Asynchronous delegate invoked once per item.</param>
    /// <param name="observer">Optional sink that receives each result via <c>OnNext</c>.</param>
    /// <returns>A task that completes once every started task has finished and been published.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="parallelism"/> is less than 1 and the sequence contains at least one item.
    /// </exception>
    public static async Task ForEachAsync<T, U>(this IEnumerable<T> collection, int parallelism, Func<T, Task<U>> body, IObserver<U> observer = null)
    {
        // A list (not a set) so that a task instance returned more than once by body
        // is tracked once per item; its Count is the only in-flight bookkeeping.
        var inFlight = new List<Task<U>>();

        foreach (var item in collection)
        {
            // Checked only once an item exists so that an empty sequence still
            // completes successfully regardless of the parallelism value.
            if (parallelism < 1)
            {
                throw new ArgumentOutOfRangeException("parallelism", parallelism, "parallelism must be at least 1.");
            }

            if (inFlight.Count >= parallelism)
            {
                await PublishNextCompletedAsync(inFlight, observer);
            }

            inFlight.Add(body(item));
        }

        // Drain whatever is still outstanding after the source is exhausted.
        while (inFlight.Count > 0)
        {
            await PublishNextCompletedAsync(inFlight, observer);
        }
    }

    /// <summary>
    /// Waits for any one of the outstanding tasks to finish, removes it from
    /// <paramref name="inFlight"/>, and forwards its result to the observer.
    /// </summary>
    private static async Task PublishNextCompletedAsync<U>(List<Task<U>> inFlight, IObserver<U> observer)
    {
        var finished = await Task.WhenAny(inFlight);
        inFlight.Remove(finished);
        if (observer != null)
        {
            // The task is already complete, so Result does not block. It is read via
            // Result (not await) so a faulted task keeps surfacing as AggregateException.
            observer.OnNext(finished.Result);
        }
    }
}
// AsyncEnumerableExtensions.cs