Can someone describe how the logic flows in this recursive Async Azure Storage call?
Can someone help me understand how this code is executed, where the "result" is, and at what point I can start doing stuff with the complete result?
/// <summary>
/// Starts an asynchronous, segmented query for the first 10 entities of the
/// "Songs" table. The query object itself is passed as the async state so that
/// <c>BeginExecuteSegmentedIsDone</c> can end the call and request further segments.
/// </summary>
/// <param name="cloudTableClient">Table client used to obtain the data service context.</param>
protected void QuerySongsExecuteSegmentedAsync(CloudTableClient cloudTableClient)
{
    TableServiceContext context = cloudTableClient.GetDataServiceContext();

    // Every entity returned through this context is resolved as a Song,
    // regardless of the argument handed to the resolver.
    context.ResolveType = (unused) => typeof(Song);

    var firstTenSongs = from entity in context.CreateQuery<Song>("Songs").Take(10)
                        select entity;
    CloudTableQuery<Song> songQuery = firstTenSongs.AsTableServiceQuery<Song>();

    // The returned IAsyncResult is not needed here; completion is observed
    // through the callback.
    songQuery.BeginExecuteSegmented(BeginExecuteSegmentedIsDone, songQuery);
}
/// <summary>
/// Completion callback for <c>BeginExecuteSegmented</c>. Ends the pending call,
/// materializes the songs of the returned segment, and, if the segment reports
/// more results, issues the next segmented request with the continuation token
/// and this same method as the callback.
/// </summary>
/// <param name="result">
/// Async result whose <c>AsyncState</c> must be the <c>CloudTableQuery&lt;Song&gt;</c>
/// that started the operation.
/// </param>
/// <exception cref="ArgumentException">
/// Thrown when <c>result.AsyncState</c> is not a <c>CloudTableQuery&lt;Song&gt;</c>.
/// </exception>
static void BeginExecuteSegmentedIsDone(IAsyncResult result)
{
    CloudTableQuery<Song> cloudTableQuery = result.AsyncState as CloudTableQuery<Song>;
    if (cloudTableQuery == null)
    {
        // Fail with a descriptive error instead of a NullReferenceException
        // on the EndExecuteSegmented call below.
        throw new ArgumentException(
            "AsyncState must be the CloudTableQuery<Song> that started the operation.",
            "result");
    }

    ResultSegment<Song> resultSegment = cloudTableQuery.EndExecuteSegmented(result);

    // The songs of THIS segment are available from here on; consume listSongs
    // at this point. Further segments (if any) arrive in later invocations.
    List<Song> listSongs = resultSegment.Results.ToList<Song>();

    if (resultSegment.HasMoreResults)
    {
        // Re-issue the query from where this segment stopped.
        cloudTableQuery.BeginExecuteSegmented(
            resultSegment.ContinuationToken, BeginExecuteSegmentedIsDone, cloudTableQuery);
    }
}
Queries against Windows Azure table storage can return partial results together with a continuation token, which means you need to reissue the query (with the continuation token) to get the next batch of results. Typically, you'll see code that just uses `.AsTableServiceQuery()` and then enumerates the query, which causes that chain of calls to happen transparently during enumeration.
This code is doing it explicitly, by using `BeginExecuteSegmented` to retrieve each batch of results. Right below the line `List<Song> listSongs = resultSegment.Results.ToList<Song>()`, you should be able to consume those `Song`s. (More may be coming, but those results should be valid and usable.)
If you want to use TPL (Task Parallel Library) to execute asynchronously you can use a pattern like this.
/// <summary>
/// Runs <paramref name="query"/> as a segmented table query and returns a task
/// that yields the results of all segments.
/// </summary>
/// <typeparam name="T">Entity type returned by the query.</typeparam>
/// <param name="query">Query to execute; it is converted with <c>AsTableServiceQuery</c>.</param>
/// <param name="tableContext">Not used by this method.</param>
/// <returns>
/// A task whose result is the single segment's results when only one segment was
/// fetched, or otherwise a list holding the results of every segment in fetch order.
/// </returns>
public static Task<IEnumerable<T>> ExecuteAsync<T>(DataServiceQuery<T> query, TableServiceContext tableContext)
{
    var segmentedQuery = query.AsTableServiceQuery();

    // Allocated lazily: stays null when the first segment is also the last one.
    List<T> accumulated = null;

    // Declared before assignment so the lambda can call itself for the next segment.
    Func<ResultSegment<T>, Task<ResultSegment<T>>> fetchFrom = null;
    fetchFrom = priorSegment =>
        ExecuteSegmentAsync(segmentedQuery, priorSegment)
            .ContinueWith(fetched =>
            {
                // Hand failures and cancellations through untouched; they are
                // surfaced by the final continuation below.
                if (fetched.IsFaulted || fetched.IsCanceled)
                {
                    return fetched;
                }

                ResultSegment<T> current = fetched.Result;

                // Last segment: its results are appended by the final continuation.
                if (!current.HasMoreResults)
                {
                    return fetched;
                }

                if (accumulated == null)
                {
                    accumulated = new List<T>();
                }

                accumulated.AddRange(current.Results);
                return fetchFrom(current);
            })
            .Unwrap();

    return fetchFrom(null).ContinueWith(completed =>
    {
        completed.ThrowOnError("ExecuteAsync");

        IEnumerable<T> lastPage = completed.Result.Results;
        if (accumulated == null)
        {
            return lastPage;
        }

        accumulated.AddRange(lastPage);
        return accumulated;
    });
}
/// <summary>
/// Fetches one segment of <paramref name="query"/> through
/// <c>RetryPolicy.ExecuteAction</c> and exposes the outcome as a task.
/// </summary>
/// <typeparam name="T">Entity type returned by the query.</typeparam>
/// <param name="query">Segmented table query to execute.</param>
/// <param name="previous">
/// The segment fetched before this one, whose continuation token is used to
/// resume; <c>null</c> to fetch the first segment.
/// </param>
/// <returns>
/// A task completed with the fetched segment, or faulted with the exception
/// passed to the failure handler.
/// </returns>
private static Task<ResultSegment<T>> ExecuteSegmentAsync<T>(CloudTableQuery<T> query, ResultSegment<T> previous)
{
    var completion = new TaskCompletionSource<ResultSegment<T>>();

    RetryPolicy.ExecuteAction(
        callback =>
        {
            // Invoke the begin method of the asynchronous call.
            if (previous == null)
            {
                query.BeginExecuteSegmented(callback, null);
            }
            else
            {
                query.BeginExecuteSegmented(previous.ContinuationToken, callback, null);
            }
        },
        // Invoke the end method of the asynchronous call.
        asyncResult => query.EndExecuteSegmented(asyncResult),
        segment => completion.SetResult(segment),
        // Async operation failed after multiple retries.
        error => completion.SetException(error));

    return completion.Task;
}
精彩评论