SqlAlchemy add new Field to class and create corresponding column in table

SQLAlchemy itself doesn't support automatic updates of the schema, but there is a third-party SQLAlchemy Migrate tool to automate migrations. Look through the "Database schema versioning workflow" chapter to see how it works.


Sometimes Migrate is too much work - you just want the column to be added automatically when you run your changed code. So here is a function that does that.

Caveats: it pokes around in the SQLAlchemy internals and tends to require small changes every time SQLAlchemy undergoes a major revision. (There's probably a much better way of doing this - I am not a SQLAlchemy expert). It also doesn't handle constraints.

import logging
import re

import sqlalchemy
from sqlalchemy import MetaData, Table, exceptions
import sqlalchemy.engine.ddl

# True when the installed SQLAlchemy reports a 0.7.x version string.
# create_and_upgrade() uses this flag to decide how to construct the dialect's
# DDL compiler; every other version takes the branch written for 0.6.
_new_sa_ddl = sqlalchemy.__version__.startswith('0.7')


def create_and_upgrade(engine, metadata):
    """Bring the database schema up to date with the tables in ``metadata``.

    For each table in ``metadata`` (in dependency order):

    * if the table does not exist in the database, it is created;
    * otherwise every column that is in the model but missing from the
      database is added with ``ALTER TABLE ... ADD``;
    * columns whose normalised specification differs between the model and
      the database are reported with a warning and left untouched;
    * columns that exist only in the database are reported with a warning
      and left untouched.

    Tables are reflected by bare name only; schema qualifiers are not used.
    Column constraints are neither created nor compared.

    :param engine: SQLAlchemy engine connected to the target database.
    :param metadata: ``MetaData`` object holding the model's tables.
    :raises AssertionError: if a column that has to be added carries
        constraints (these cannot be added automatically).
    """
    db_metadata = MetaData()
    db_metadata.bind = engine

    # Matches "(11)" / "(10, 2)" style size/precision suffixes. They are
    # stripped before comparing specs because the model often leaves the
    # precision unset (so it renders as e.g. INTEGER) while the reflected
    # column carries whatever the database chose (e.g. INTEGER(11)).
    precision_re = re.compile(r'[(][\d ,]+[)]')

    for model_table in metadata.sorted_tables:
        try:
            db_table = Table(model_table.name, db_metadata, autoload=True)
        except exceptions.NoSuchTableError:
            logging.info('Creating table %s', model_table.name)
            model_table.create(bind=engine)
            continue

        if _new_sa_ddl:
            ddl_c = engine.dialect.ddl_compiler(engine.dialect, None)
        else:
            # 0.6
            ddl_c = engine.dialect.ddl_compiler(engine.dialect, db_table)
        # NOTE: 0.5 used
        #   engine.dialect.schemagenerator(engine.dialect, engine.contextual_connect())
        # instead; that version is not handled here.

        logging.debug('Table %s already exists. Checking for missing columns',
                      model_table.name)

        # Index columns by their database *name*. ``table.c[...]`` and
        # ``getattr(table.c, ...)`` look up by column *key*, which may differ
        # from the name, and attribute access can additionally collide with
        # the collection's own attributes (e.g. a column called "keys").
        model_by_name = dict((col.name, col) for col in model_table.c)
        db_by_name = dict((col.name, col) for col in db_table.c)

        model_columns = _column_names(model_table)
        db_columns = _column_names(db_table)

        to_create = model_columns - db_columns
        to_remove = db_columns - model_columns
        to_check = db_columns.intersection(model_columns)

        # Quote the table name the same way the dialect would, so that
        # reserved words and mixed-case names survive the raw ALTER TABLE.
        # use_schema=False keeps the statement targeting the bare table name,
        # matching how the table was reflected above.
        quoted_table = engine.dialect.identifier_preparer.format_table(
            model_table, use_schema=False)

        # sorted() only makes the order of statements/log lines deterministic.
        for name in sorted(to_create):
            model_column = model_by_name[name]
            logging.info('Adding column %s.%s', model_table.name, model_column.name)
            # Raised explicitly (rather than via ``assert``) so the guard is
            # still enforced when Python runs with -O.
            if model_column.constraints:
                raise AssertionError(
                    'Arrrgh! I cannot automatically add columns with '
                    'constraints to the database. '
                    'Please consider fixing me if you care!')
            model_col_spec = ddl_c.get_column_specification(model_column)
            sql = 'ALTER TABLE %s ADD %s' % (quoted_table, model_col_spec)
            engine.execute(sql)

        # It's difficult to reliably determine if the model has changed
        # a column definition, so the comparison below is best-effort: the
        # specs are normalised (precision stripped, a couple of type aliases
        # folded) and a mismatch only produces a warning.
        for name in sorted(to_check):
            model_column = model_by_name[name]
            db_column = db_by_name[name]

            logging.debug('Checking column %s.%s', model_table.name, model_column.name)
            model_col_spec = precision_re.sub(
                '', ddl_c.get_column_specification(model_column))
            db_col_spec = precision_re.sub(
                '', ddl_c.get_column_specification(db_column))
            # Fold type names the database reports differently from the model.
            db_col_spec = db_col_spec.replace('DECIMAL', 'NUMERIC')
            db_col_spec = db_col_spec.replace('TINYINT', 'BOOL')

            if model_col_spec != db_col_spec:
                logging.warning(
                    'Column %s.%s has specification %r in the model but %r in the database',
                    model_table.name, model_column.name, model_col_spec, db_col_spec)

            if model_column.constraints or db_column.constraints:
                # TODO, check constraints
                logging.debug('Column constraints not checked. I am too dumb')

        for name in sorted(to_remove):
            db_column = db_by_name[name]
            logging.warning('Column %s.%s in the database is not in the model',
                            model_table.name, db_column.name)


def _column_names(table):
    """Return the set of column names of ``table`` as text strings.

    Autoloaded (reflected) columns return unicode column names while model
    columns may use byte strings on Python 2; coercing every name to the
    text type makes sure both sides compare equal in set operations.

    :param table: object whose ``c`` attribute iterates over columns that
        each expose a ``name`` attribute.
    :returns: ``set`` of column names.
    """
    try:
        text_type = unicode  # noqa: F821 - Python 2 text type
    except NameError:
        # Python 3 has no ``unicode`` builtin; ``str`` is already text.
        text_type = str
    return set(text_type(column.name) for column in table.c)