Spring Data MongoDB Lookup with Pipeline Aggregation
I would like to add this my solution which is repeating in some aspect the solutions posted before.
Mongo driver v3.x
For Mongo driver v3.x I came to the following solution:
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
public class JsonOperation implements AggregationOperation {
private List<Document> documents;
public JsonOperation(String json) {
Object root = JSON.parse(json);
documents = root instanceof BasicDBObject
? Collections.singletonList(new Document(((BasicDBObject) root).toMap()))
: ((BasicDBList) root).stream().map(item -> new Document((Map<String, Object>) ((BasicDBObject) item).toMap())).collect(Collectors.toList());
}
@Override
public Document toDocument(AggregationOperationContext context) {
// Not necessary to return anything as we override toPipelineStages():
return null;
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents;
}
}
and then provided that aggregation steps are given in some resource aggregations.json
:
[
{
$match: {
"userId": "..."
}
},
{
$lookup: {
let: {
...
},
from: "another_collection",
pipeline: [
...
],
as: "things"
}
},
{
$sort: {
"date": 1
}
}
]
one can use above class as follows:
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
Collection<ResultDao> results = mongoTemplate.aggregate(newAggregation(new JsonOperation(resourceToString("aggregations.json", StandardCharsets.UTF_8))), "some_collection", ResultDao.class).getMappedResults();
Mongo driver v4.x
As JSON
class was removed from Mongo v4, I have rewritten the class as follows:
import java.util.Collections;
import java.util.List;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
public class JsonOperation implements AggregationOperation {
private List<Document> documents;
private static final String DUMMY_KEY = "dummy";
public JsonOperation(String json) {
documents = parseJson(json);
}
static final List<Document> parseJson(String json) {
return (json.startsWith("["))
? Document.parse("{\"" + DUMMY_KEY + "\": " + json + "}").getList(DUMMY_KEY, Document.class)
: Collections.singletonList(Document.parse(json));
}
@Override
public Document toDocument(AggregationOperationContext context) {
// Not necessary to return anything as we override toPipelineStages():
return null;
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents;
}
@Override
public String getOperator() {
return documents.iterator().next().keySet().iterator().next();
}
}
but implementation is now a bit ugly because of string manipulations. If somebody has a better idea of how to parse array of objects in a more elegant way, please edit this post or drop a comment. Ideally there should be some method in Mongo core that allows to parse either JSON object or list (returns BasicDBObject
/BasicDBList
or Document
/List<Document>
).
Also note that I have skipped the step of transforming Document
instances in toPipelineStages()
method as it is not necessary in my case:
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}
The drivers are pretty much always a little bit behind the current language features that MongoDB provides - hence some of the latest and greatest features are simply not nicely accessible through the API yet. I am afraid this is one of those cases and you'll need to resort to using strings. Kind of like so (untested):
AggregationOperation match = Aggregation.match(Criteria.where("dayOfWeek").is("SOME_VARIABLE_STRING_1"));
AggregationOperation match2 = Aggregation.match(Criteria.where("deliveryZipCodeTimings").ne([]));
String query = "{ $lookup: { from: 'deliveryZipCodeTiming', let: { location_id: '$fulfillmentLocationId' }, pipeline: [{ $match: { $expr: { $and: [ { $eq: ['$fulfillmentLocationId', '$$location_id']}, { $eq: ['$zipCode', 'SOME_VARIABLE_STRING_2']} ]} } }, { $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], as: 'deliveryZipCodeTimings' } }";
Aggregation.newAggregation(match, (DBObject) JSON.parse(query), match2);
Building upon the info given by @dnickless, I was able to solve this. I'll post the complete solution in the hopes it helps someone else in the future.
I'm using mongodb-driver:3.6.4
First, I had to create a custom aggregation operation class so that I could pass in a custom JSON mongodb query to be used in the aggregation operation. This will allow me to use pipeline
within a $lookup
which is not supported with the driver version I am using.
public class CustomProjectAggregationOperation implements AggregationOperation {
private String jsonOperation;
public CustomProjectAggregationOperation(String jsonOperation) {
this.jsonOperation = jsonOperation;
}
@Override
public Document toDocument(AggregationOperationContext aggregationOperationContext) {
return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
}
}
Now that we have the ability to pass a custom JSON query into our mongodb spring implementation, all that is left is to plug those values into a TypedAggregation query.
public List<FulfillmentChannel> getFulfillmentChannels(
String SOME_VARIABLE_STRING_1,
String SOME_VARIABLE_STRING_2) {
AggregationOperation match = Aggregation.match(
Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));
AggregationOperation match2 = Aggregation.match(
Criteria.where("deliveryZipCodeTimings").ne(Collections.EMPTY_LIST));
String query =
"{ $lookup: { " +
"from: 'deliveryZipCodeTiming'," +
"let: { location_id: '$fulfillmentLocationId' }," +
"pipeline: [{" +
"$match: {$expr: {$and: [" +
"{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
"{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
"{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
"as: 'deliveryZipCodeTimings'}}";
TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
FulfillmentChannel.class,
match,
new CustomProjectAggregationOperation(query),
match2
);
AggregationResults<FulfillmentChannel> results =
mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
return results.getMappedResults();
}
I faced some JSON parsing exceptions when I used the way explained in the accepted answer, so I dig deep the default MongoDB java driver(version 3) Document class to build up aggregation query and found out any aggregation query can be build u as follows,
Replace each of the element in the mongo console query as follows
- Curly braces({) -> new Document()
- parameter names are same
- Colon(:) -> Coma(,)
- Coma(,) -> .append()
- Square bracket([) -> Arrays.asList()
AggregationOperation customLookupOperation = new AggregationOperation() {
@Override
public Document toDocument(AggregationOperationContext context) {
return new Document(
"$lookup",
new Document("from", "deliveryZipCodeTiming")
.append("let",new Document("location_id", "$fulfillmentLocationId"))
.append("pipeline", Arrays.<Object> asList(
new Document("$match", new Document("$expr", new Document("$and",
Arrays.<Object>asList(
new Document("$eq", Arrays.<Object>asList("$fulfillmentLocationId", "$$location_id")),
new Document("$eq", Arrays.<Object>asList("$zipCode", "SOME_VARIABLE_STRING_2"))
)))),
new Document("$project", new Document("_id",0).append("zipCode", 1)
.append("cutoffTime", 1)
)
))
.append("as", "deliveryZipCodeTimings")
);
}
};
Finally you can use the aggregation operation in the aggrgation pipeline,
Aggregation aggregation = Aggregation.newAggregation(matchOperation,customLookupOperation,matchOperation2);