How can I use LINQ in CosmosDB SDK v3.0 async query?
If your application follows a layered architecture and you'd like to give your domain layer full control over the query, you can wrap the Cosmos IQueryable<Person> in a custom IQueryable, backed by a custom IQueryProvider, that also implements IAsyncEnumerable<T> (see the example below).
By doing that you can hide the implementation details of asynchronously iterating over the results from your domain layer.
Persistence layer
/// <summary>
/// Persistence-layer repository that exposes the stored <see cref="Person"/> items as a composable LINQ query.
/// </summary>
public class PersonRepository
{
    // NOTE(review): _cosmosContainer is not declared in this snippet; presumably a Cosmos
    // Container field supplied elsewhere in the class - confirm.

    /// <summary>
    /// Gets a LINQ query over the container's <see cref="Person"/> items, wrapped via
    /// <c>ToCosmosAsyncQueryable</c>. A new query object is built on every access.
    /// </summary>
    public IQueryable<Person> Persons
    {
        get
        {
            var cosmosQuery = _cosmosContainer.GetItemLinqQueryable<Person>();
            return cosmosQuery.ToCosmosAsyncQueryable();
        }
    }
}
Domain layer
// Compose the filter against the repository's IQueryable first...
var personsNamedName = _personRepository.Persons.Where(p => p.Name == "Name");

// ...then switch to the IAsyncEnumerable view and collect the results asynchronously.
var persons = await personsNamedName
    .AsAsyncQueryable()
    .ToListAsync(cancellationToken);
ToListAsync is available from the System.Linq.Async package, which can be referenced from your domain layer.
Domain layer extensions
/// <summary>
/// Returns the given query viewed as an <see cref="IAsyncEnumerable{T}"/> so it can be consumed
/// with asynchronous operators.
/// </summary>
/// <typeparam name="T">Element type of the query.</typeparam>
/// <param name="queryable">A query whose runtime type also implements <see cref="IAsyncEnumerable{T}"/>.</param>
/// <returns>The same instance, typed as <see cref="IAsyncEnumerable{T}"/>.</returns>
/// <exception cref="System.ArgumentNullException"><paramref name="queryable"/> is <c>null</c>.</exception>
/// <exception cref="System.InvalidCastException">
/// <paramref name="queryable"/> does not implement <see cref="IAsyncEnumerable{T}"/>.
/// </exception>
public static IAsyncEnumerable<T> AsAsyncQueryable<T>(this IQueryable<T> queryable)
{
    // Fail at the call site instead of handing a null sequence to the next operator.
    if (queryable is null)
    {
        throw new System.ArgumentNullException(nameof(queryable));
    }

    if (queryable is IAsyncEnumerable<T> asyncEnumerable)
    {
        return asyncEnumerable;
    }

    // Same exception type a plain cast would raise, but with a message that names the actual problem.
    throw new System.InvalidCastException(
        $"The query of type '{queryable.GetType()}' does not implement IAsyncEnumerable<{typeof(T).Name}>. " +
        "Make sure it originates from a queryable that supports asynchronous enumeration.");
}
Persistence layer extensions
/// <summary>
/// Persistence-layer helpers for wrapping LINQ queries in <see cref="CosmosAsyncQueryable{TResult}"/>.
/// </summary>
internal static class CosmosAsyncQueryableExtensions
{
    /// <summary>
    /// Wraps <paramref name="source"/> in a new <see cref="CosmosAsyncQueryable{TResult}"/> and
    /// returns it typed as <see cref="IQueryable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the query.</typeparam>
    /// <param name="source">The ordered query to wrap.</param>
    /// <returns>The wrapping query.</returns>
    internal static IQueryable<T> ToCosmosAsyncQueryable<T>(this IOrderedQueryable<T> source)
        => new CosmosAsyncQueryable<T>(source);
}
/// <summary>
/// Wraps an <see cref="IQueryable{T}"/> so that it can also be consumed as an
/// <see cref="IAsyncEnumerable{T}"/>. Query composition is routed through
/// <see cref="CosmosAsyncQueryableProvider"/>; asynchronous enumeration reads the results
/// through a feed iterator obtained from the wrapped query.
/// </summary>
/// <typeparam name="TResult">Element type of the query.</typeparam>
internal class CosmosAsyncQueryable<TResult> : IEnumerable<TResult>, IQueryable<TResult>, IAsyncEnumerable<TResult>
{
    private readonly IQueryable<TResult> _queryable;

    /// <summary>
    /// Creates a wrapper around <paramref name="queryable"/>.
    /// </summary>
    /// <param name="queryable">The query to wrap.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queryable"/> is <c>null</c>.</exception>
    public CosmosAsyncQueryable(IQueryable<TResult> queryable)
    {
        // Guard first: the provider lookup below would otherwise fail with a NullReferenceException.
        _queryable = queryable ?? throw new ArgumentNullException(nameof(queryable));
        Provider = new CosmosAsyncQueryableProvider(queryable.Provider);
    }

    /// <summary>Gets the element type, <typeparamref name="TResult"/>.</summary>
    public Type ElementType => typeof(TResult);

    /// <summary>Gets the expression tree of the wrapped query.</summary>
    public Expression Expression => _queryable.Expression;

    /// <summary>Gets the provider that wraps the underlying query's provider.</summary>
    public IQueryProvider Provider { get; }

    /// <summary>Enumerates the wrapped query synchronously.</summary>
    public IEnumerator<TResult> GetEnumerator() => _queryable.GetEnumerator();

    /// <summary>Enumerates the wrapped query synchronously (non-generic).</summary>
    IEnumerator IEnumerable.GetEnumerator() => _queryable.GetEnumerator();

    /// <summary>
    /// Asynchronously enumerates the query results, reading from the feed iterator for as long
    /// as it reports more results.
    /// </summary>
    /// <param name="cancellationToken">Token passed to every <c>ReadNextAsync</c> call.</param>
    /// <returns>An asynchronous enumerator over the items of the query.</returns>
    public async IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        // Dispose the iterator once enumeration completes, fails, or is abandoned early.
        // NOTE(review): requires an SDK release in which FeedIterator<T> implements IDisposable.
        using var iterator = _queryable.ToFeedIterator();

        while (iterator.HasMoreResults)
        {
            foreach (var item in await iterator.ReadNextAsync(cancellationToken))
            {
                yield return item;
            }
        }
    }
}
/// <summary>
/// Query provider that delegates to an inner <see cref="IQueryProvider"/> and wraps every
/// query it creates in a <see cref="CosmosAsyncQueryable{TResult}"/>.
/// </summary>
internal class CosmosAsyncQueryableProvider : IQueryProvider
{
    private readonly IQueryProvider _inner;

    /// <summary>Creates a provider that forwards to <paramref name="provider"/>.</summary>
    /// <param name="provider">The provider that does the actual query creation and execution.</param>
    public CosmosAsyncQueryableProvider(IQueryProvider provider)
    {
        _inner = provider;
    }

    /// <summary>
    /// Creates the query with the inner provider and wraps the result in a
    /// <see cref="CosmosAsyncQueryable{TResult}"/>.
    /// </summary>
    public IQueryable<TElement> CreateQuery<TElement>(Expression expression)
    {
        var innerQuery = _inner.CreateQuery<TElement>(expression);
        return new CosmosAsyncQueryable<TElement>(innerQuery);
    }

    /// <summary>
    /// Non-generic overload; forwards to the generic overload with <see cref="object"/> as the element type.
    /// </summary>
    public IQueryable CreateQuery(Expression expression)
    {
        return CreateQuery<object>(expression);
    }

    /// <summary>Executes <paramref name="expression"/> with the inner provider.</summary>
    public object Execute(Expression expression)
    {
        return _inner.Execute(expression);
    }

    /// <summary>Executes <paramref name="expression"/> with the inner provider and returns the typed result.</summary>
    public TResult Execute<TResult>(Expression expression)
    {
        return _inner.Execute<TResult>(expression);
    }
}
You would use ToFeedIterator() together with FeedIterator<T>.ReadNextAsync().
// Resolve the container that holds the Person items.
var db = Client.GetDatabase(databaseId);
var container = db.GetContainer(containerId);

// Build the LINQ query and turn it into a feed iterator.
var q = container.GetItemLinqQueryable<Person>();
var filtered = q.Where(p => p.Name == "Name");
var iterator = filtered.ToFeedIterator();

// Only a single ReadNextAsync call is made here.
// NOTE(review): if the query can yield further results, keep reading while iterator.HasMoreResults is true.
var results = await iterator.ReadNextAsync();