Isolating py.test DB sessions in Flask-SQLAlchemy

1.

According to the Session Basics page of the SQLAlchemy documentation:

commit() is used to commit the current transaction. It always issues flush() beforehand to flush any remaining state to the database; this is independent of the “autoflush” setting. ....

So the transaction.rollback() call in the session fixture has no effect: by the time it runs, the transaction has already been committed.
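
The same effect can be reproduced without SQLAlchemy. The following is a minimal sketch using the standard-library sqlite3 module (the `users` table is a made-up example, not part of the original fixtures): a rollback issued after a commit has nothing left to undo, while a rollback issued before any commit discards the pending insert.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")  # hypothetical example table

conn.execute("INSERT INTO users VALUES ('alice')")
conn.commit()    # the transaction ends here; the row is now permanent
conn.rollback()  # too late: there is no open transaction to undo

committed_rows = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]

conn.execute("INSERT INTO users VALUES ('bob')")
conn.rollback()  # no commit happened, so this insert is discarded

rows_after_rollback = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
conn.close()
```

Only the committed row survives in both counts, which is why a rollback in fixture teardown cannot clean up after a test that calls commit().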


2.

Change the scope of the fixtures from session to function so that the database is recreated for every test.

# pytest.yield_fixture is deprecated; a plain pytest.fixture may use yield.
@pytest.fixture(scope='function')
def app(request):
    ...

@pytest.fixture(scope='function')
def db(app, request):
    ...

As an aside, if you use an in-memory SQLite database, there are no database files to delete, and the tests run faster:

DB_URI = 'sqlite://'  # SQLite :memory: database

...

@pytest.fixture(scope='function')
def db(app, request):
    _db.app = app
    _db.create_all()  # build the schema before each test
    yield _db
    _db.drop_all()    # tear the schema down after each test
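
To illustrate why a fresh in-memory database per test gives isolation, here is a rough sketch that uses the standard-library sqlite3 module and a context manager as a stand-in for the function-scoped fixture. The names `fresh_db`, `count_users`, and the `users` table are hypothetical and exist only for this illustration.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def fresh_db():
    """Hypothetical stand-in for a function-scoped `db` fixture."""
    conn = sqlite3.connect(":memory:")              # new, empty database
    conn.execute("CREATE TABLE users (name TEXT)")  # plays the role of create_all()
    try:
        yield conn
    finally:
        conn.close()  # the in-memory database vanishes; no file to delete


def count_users(conn):
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


with fresh_db() as conn:  # "test 1" commits a row
    conn.execute("INSERT INTO users VALUES ('alice')")
    conn.commit()
    first_count = count_users(conn)

with fresh_db() as conn:  # "test 2" starts from an empty table
    second_count = count_users(conn)
```

Even though the first block commits, the second one sees no rows, because each setup builds its own database from scratch.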

The method from Alex Michael's blog post does not work because it is incomplete. According to the SQLAlchemy documentation on joining a session into an external transaction, that approach works only if the tests make no rollback calls. Another difference is that the SQLAlchemy docs use a plain Session object, whereas the blog post uses a scoped session.

In Flask-SQLAlchemy, the scoped session is removed automatically on request teardown: session.remove() is called, which issues a rollback under the hood. To support rollbacks within a test, use SAVEPOINT:

import pytest
import sqlalchemy as sa


@pytest.fixture(scope='function')
def db_session(db):
    """
    Creates a new database session for a test. Note you must use this fixture
    if your test connects to db.

    Here we not only support commit calls but also rollback calls in tests.
    """
    connection = db.engine.connect()
    transaction = connection.begin()

    options = dict(bind=connection, binds={})
    session = db.create_scoped_session(options=options)

    session.begin_nested()

    # session is actually a scoped_session
    # for the `after_transaction_end` event, we need a session instance to
    # listen for, hence the `session()` call
    @sa.event.listens_for(session(), 'after_transaction_end')
    def restart_savepoint(sess, trans):
        if trans.nested and not trans._parent.nested:
            session.expire_all()
            session.begin_nested()

    db.session = session

    yield session

    session.remove()
    transaction.rollback()
    connection.close()

Your database must support SAVEPOINT though.
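
For readers unfamiliar with SAVEPOINT, the following sketch shows the underlying SQL that the fixture relies on, using the standard-library sqlite3 module with manual transaction control. The `users` table and the savepoint name `test_sp` are assumptions made for illustration. In SQLAlchemy, begin_nested() emits the SAVEPOINT statement, and the outer ROLLBACK corresponds to transaction.rollback() in the fixture teardown.

```python
import sqlite3

# isolation_level=None turns off implicit transactions so BEGIN is explicit
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (name TEXT)")  # hypothetical example table

conn.execute("BEGIN")              # outer transaction, owned by the fixture
conn.execute("SAVEPOINT test_sp")  # nested transaction used by the test

conn.execute("INSERT INTO users VALUES ('alice')")
conn.execute("ROLLBACK TO SAVEPOINT test_sp")  # a rollback inside the test

conn.execute("INSERT INTO users VALUES ('bob')")
conn.execute("RELEASE SAVEPOINT test_sp")      # a "commit" inside the test

names_inside = [row[0] for row in conn.execute("SELECT name FROM users")]

conn.execute("ROLLBACK")  # fixture teardown discards everything
count_after = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
conn.close()
```

Inside the outer transaction the test sees only the row it kept, and after the outer rollback the table is empty again, so both commit and rollback calls in a test stay contained.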