.NET experts, is there a better way to integrate Polly and async for concurrent network calls?
So I have to make network calls to process data, and each host will start timing out if I send it more than a certain number of files concurrently. The problem is, I can't just use a foreach's MAX_DEGREE_OF_PARALLELISM, since I could be executing multiple doWork() functions in parallel (doWork() is called every time a zip file with data is deposited into a directory).
I thought of using HttpClientHandler's MaxConnectionsPerServer property. However, that just parks the excess requests in a queue while the timeout is still counting, so you'll get a ton of timeouts if you use this method.
I ended up using Polly's Bulkhead (one Bulkhead per host) together with a WaitAndRetry policy, to both limit concurrency and automatically retry network and server errors. My only problem is that I don't know if the way I did it is correct, since I am by no means an async expert. Is this the recommended way to do this, or is there a better way?
Right now NetworkCallAsync() returns the PolicyResult instead of the Result, and I don't know if that is an ideal abstraction. (Everything here is just generic pseudocode for this example.)
    public static void doWork()
    {
        // content and host come from the surrounding context (omitted in this pseudocode)
        var networkTask = dataFiles.ForEachAsync(MAX_PARALLELISM, async file =>
        {
            var policyResult = await NetworkCallAsync(content, host);
            if (policyResult.Outcome == OutcomeType.Successful)
            {
                handleSuccess(policyResult.Result);
            }
            else
            {
                // On failure Result is left at its default; the last handled
                // result (if any) is exposed as FinalHandledResult.
                handleError(policyResult.FinalHandledResult, policyResult.FinalException);
            }
        });
        networkTask.Wait();
    }

    // Returns the PolicyResult so the caller can inspect Outcome and FinalException.
    public static async System.Threading.Tasks.Task<PolicyResult<Result>> NetworkCallAsync(string content, string host)
    {
        var policy = getRetryPolicy(host);
        return await policy.ExecuteAndCaptureAsync(() => _NetworkCallAsync(content, host));
    }

    public static async System.Threading.Tasks.Task<Result> _NetworkCallAsync(string content, string host)
    {
        var uri = host + "/";
        var httpContent = new StringContent(content, Encoding.UTF8);
        using (var response = await httpClient.PostAsync(uri, httpContent))
        {
            return new Result(response);
        }
    }

    static readonly object _policyLock = new object();
    static Dictionary<string, Polly.Bulkhead.BulkheadPolicy> policies = new Dictionary<string, Polly.Bulkhead.BulkheadPolicy>();

    // One Bulkhead per host, created on first use.
    public static Policy getHostPolicy(string host)
    {
        // Dictionary is not safe to read while another thread writes,
        // so the lookup also happens under the lock.
        lock (_policyLock)
        {
            if (!policies.TryGetValue(host, out var hostPolicy))
            {
                hostPolicy = Polly.Policy.BulkheadAsync(MAX_CONNECTIONS, MAX_QUEUE_SIZE);
                policies.Add(host, hostPolicy);
            }
            return hostPolicy;
        }
    }

    // Retry (outer) wraps the per-host Bulkhead (inner).
    public static PolicyWrap<Result> getRetryPolicy(string host)
    {
        var waitAndRetryPolicy = Policy
            .Handle<Exception>()
            .OrResult<Result>(result => result.httpCode != 200)
            .WaitAndRetryAsync(
                3,
                retryAttempt => TimeSpan.FromSeconds(3 * retryAttempt),
                (result, ts) =>
                {
                    var errorMsg = result?.Exception + " " + result?.Result?.Message;
                    log.Warning(errorMsg);
                });
        return waitAndRetryPolicy.WrapAsync(getHostPolicy(host));
    }
    // Utility function ForEach that correctly executes async tasks
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
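
For comparison, here is a minimal sketch of the same per-host limit without Polly, using one SemaphoreSlim per host. It is only a stand-in to show the idea behind the Bulkhead: the wait happens in application code before the request (and therefore its timeout) starts, and the limit is shared by every caller no matter how many doWork() runs overlap. HostGate, RunAsync, FakeCallAsync, the limit of 2 and the example host name are all hypothetical, and there is no queue limit or retry here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Polly): one SemaphoreSlim per host caps
// concurrency across all callers.
public static class HostGate
{
    private const int MaxConnections = 2; // assumed per-host limit

    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Gates =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    public static async Task<T> RunAsync<T>(string host, Func<Task<T>> call)
    {
        // GetOrAdd is thread-safe; under a race the factory may run twice,
        // but only one semaphore is ever stored and returned for a host.
        var gate = Gates.GetOrAdd(host, _ => new SemaphoreSlim(MaxConnections));

        // Callers wait here, before the network call is started.
        await gate.WaitAsync();
        try
        {
            return await call();
        }
        finally
        {
            gate.Release();
        }
    }
}

public static class Demo
{
    private static int _active;
    private static int _peak;

    // Stand-in for the real network call; records the peak concurrency seen.
    private static async Task<int> FakeCallAsync(int i)
    {
        var now = Interlocked.Increment(ref _active);
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
            Interlocked.CompareExchange(ref _peak, now, seen);
        await Task.Delay(50);
        Interlocked.Decrement(ref _active);
        return i;
    }

    public static async Task Main()
    {
        // Ten calls to the same host are started at once.
        var calls = Enumerable.Range(0, 10)
            .Select(i => HostGate.RunAsync("http://example.invalid", () => FakeCallAsync(i)));
        var results = await Task.WhenAll(calls);

        // The peak never exceeds MaxConnections.
        Console.WriteLine($"completed={results.Length} peak={_peak}");
    }
}
```

Compared with this sketch, the Bulkhead additionally rejects work once its queue is full, and the WaitAndRetry policy wrapped around it takes care of the retries.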