Isolating py.test DB sessions in Flask-SQLAlchemy
1.
According to Session Basics - SQLAlchemy documentation:
commit()
is used to commit the current transaction. It always issues flush() beforehand to flush any remaining state to the database; this is independent of the “autoflush” setting. ....
So transaction.rollback()
in session fixture function does not take effect, because the transaction is already committed.
2.
Change scope of fixtures to function
instead of session
so that db is cleared every time.
@pytest.yield_fixture(scope='function')
def app(request):
...
@pytest.yield_fixture(scope='function')
def db(app, request):
...
BTW, If you use in-memory sqlite database, you don't need to delete the db files, and it will be faster:
DB_URI = 'sqlite://' # SQLite :memory: database
...
@pytest.yield_fixture(scope='function')
def db(app, request):
_db.app = app
_db.create_all()
yield _db
_db.drop_all()
The method introduced in Alex Michael's blog post is not working because it's incomplete. According to the sqlalchemy documentation on joining sessions, Alex's solution works only if there are no rollback calls. Another difference is, a vanilla Session
object is used in sqla docs, compared to a scoped session on Alex's blog.
In the case of flask-sqlalchemy, the scoped session is automatically removed on request teardown. A call to session.remove
is made, which issues a rollback under the hood. To support rollbacks within the scope of the tests, use SAVEPOINT
:
import sqlalchemy as sa
@pytest.yield_fixture(scope='function')
def db_session(db):
"""
Creates a new database session for a test. Note you must use this fixture
if your test connects to db.
Here we not only support commit calls but also rollback calls in tests.
"""
connection = db.engine.connect()
transaction = connection.begin()
options = dict(bind=connection, binds={})
session = db.create_scoped_session(options=options)
session.begin_nested()
# session is actually a scoped_session
# for the `after_transaction_end` event, we need a session instance to
# listen for, hence the `session()` call
@sa.event.listens_for(session(), 'after_transaction_end')
def restart_savepoint(sess, trans):
if trans.nested and not trans._parent.nested:
session.expire_all()
session.begin_nested()
db.session = session
yield session
session.remove()
transaction.rollback()
connection.close()
Your database must support SAVEPOINT
though.