How can I use LINQ in CosmosDB SDK v3.0 async query?

If your application follows a layered architecture and you'd like to give your domain layer full control over the query then it's possible to wrap cosmos IQueryable<Person> with a custom IQueryProvider that implements IAsyncEnumerable e.g.

By doing that you can hide the implementation details of asynchronously iterating over the result from your domain layer.

Persistence layer

public class PersonRepository
{
    public IQueryable<Person> Persons => _cosmosContainer.GetItemLinqQueryable<Person>().ToCosmosAsyncQueryable();
}

Domain layer

var persons = await _personRepository.Persons
    .Where(p => p.Name == "Name")
    .AsAsyncQueryable()
    .ToListAsync(cancellationToken);
  • ToListAsync is available from System.Linq.Async that can be referenced from your domain layer

Domain layer extensions

public static IAsyncEnumerable<T> AsAsyncQueryable<T>(this IQueryable<T> queryable)
{
    return (IAsyncEnumerable<T>)queryable;
}

Persistence layer extensions

internal static class CosmosAsyncQueryableExtensions
{
    internal static IQueryable<T> ToCosmosAsyncQueryable<T>(this IOrderedQueryable<T> source)
    {
        return new CosmosAsyncQueryable<T>(source);
    }
}

internal class CosmosAsyncQueryable<TResult> : IEnumerable<TResult>, IQueryable<TResult>, IAsyncEnumerable<TResult>
{
    private readonly IQueryable<TResult> _queryable;

    public CosmosAsyncQueryable(IQueryable<TResult> queryable)
    {
        _queryable = queryable;
        Provider = new CosmosAsyncQueryableProvider(queryable.Provider);
    }

    public Type ElementType => typeof(TResult);

    public Expression Expression => _queryable.Expression;

    public IQueryProvider Provider { get; }

    public IEnumerator<TResult> GetEnumerator() => _queryable.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => _queryable.GetEnumerator();

    public async IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        var iterator = _queryable.ToFeedIterator();

        while (iterator.HasMoreResults)
        {
            foreach (var item in await iterator.ReadNextAsync(cancellationToken))
            {
                yield return item;
            }
        }
    }
}

internal class CosmosAsyncQueryableProvider : IQueryProvider
{
    private readonly IQueryProvider _provider;

    public CosmosAsyncQueryableProvider(IQueryProvider provider) => _provider = provider;

    public IQueryable<TElement> CreateQuery<TElement>(Expression expression) =>
        new CosmosAsyncQueryable<TElement>(_provider.CreateQuery<TElement>(expression));

    public IQueryable CreateQuery(Expression expression) => CreateQuery<object>(expression);

    public object Execute(Expression expression) => _provider.Execute(expression);

    public TResult Execute<TResult>(Expression expression) => _provider.Execute<TResult>(expression);
}

You would use ToFeedIterator() and FeedIterator<T>.ReadNextAsync().

var db = Client.GetDatabase(databaseId);
var container = db.GetContainer(containerId);

var q = container.GetItemLinqQueryable<Person>();
var iterator = q.Where(p => p.Name == "Name").ToFeedIterator();
var results = await iterator.ReadNextAsync();