SQLAlchemy - "Dynamic Filter"
I assume you are using the ORM.
in that case, the filter
function returns a query object. You can conditionaly build the query by doing something like
query = Session.query(schema.Object).filter_by(attribute=value)
if condition:
query = query.filter_by(condition_attr=condition_val)
if another_condition:
query = query.filter_by(another=another_val)
#then finally execute it
results = query.all()
Another resolution to this question, this case is raised in a more secure way, since it verifies if the field to be filtered exists in the model.
To add operators to the value to which you want to filter. And not having to add a new parameter to the query, we can add an operator before the value e.g ?foo=>1
, '?foo=<1,
?foo=>=1,
?foo=<=1 ', ?foo=!1
,?foo=1
, and finally between which would be like this `?foo=a, b'.
from sqlalchemy.orm import class_mapper
import re
# input parameters
filter_by = {
"column1": "!1", # not equal to
"column2": "1", # equal to
"column3": ">1", # great to. etc...
}
def computed_operator(column, v):
if re.match(r"^!", v):
"""__ne__"""
val = re.sub(r"!", "", v)
return column.__ne__(val)
if re.match(r">(?!=)", v):
"""__gt__"""
val = re.sub(r">(?!=)", "", v)
return column.__gt__(val)
if re.match(r"<(?!=)", v):
"""__lt__"""
val = re.sub(r"<(?!=)", "", v)
return column.__lt__(val)
if re.match(r">=", v):
"""__ge__"""
val = re.sub(r">=", "", v)
return column.__ge__(val)
if re.match(r"<=", v):
"""__le__"""
val = re.sub(r"<=", "", v)
return column.__le__(val)
if re.match(r"(\w*),(\w*)", v):
"""between"""
a, b = re.split(r",", v)
return column.between(a, b)
""" default __eq__ """
return column.__eq__(v)
query = Table.query
filters = []
for k, v in filter_by.items():
mapper = class_mapper(Table)
if not hasattr(mapper.columns, k):
continue
filters.append(computed_operator(mapper.columns[k], "{}".format(v))
query = query.filter(*filters)
query.all()
The function filter(*criterion)
means you can use tuple as it's argument, @Wolph has detail here:
SQLALchemy dynamic filter_by for detail
if we speak of SQLAlchemy core, there is another way:
from sqlalchemy import and_
filters = [table.c.col1 == filter1, table.c.col2 > filter2]
query = table.select().where(and_(*filters))
If you're trying to filter based on incoming form criteria:
form = request.form.to_dict()
filters = []
for col in form:
sqlalchemybinaryexpression = (getattr(MODEL, col) == form[col])
filters.append(sqlalchemybinaryexpression)
query = table.select().where(and_(*filters))
Where MODEL is your SQLAlchemy Model