How to make SQLAlchemy in Tornado async?
I am using Tornado with SQLAlchemy in the following way:
from tornado_mysql import pools
from sqlalchemy.sql import table, column, select, join
from sqlalchemy.dialects import postgresql, mysql
# from models import M, M2
t = table(...)
t2 = table(...)
xxx_id = 10
j = join(t, t2, t.c.t_id == t2.c.id)
s = select([t]).select_from(j).where(t.c.xxx == xxx_id)
sql_str = str(s.compile(dialect=mysql.dialect(), compile_kwargs={"literal_binds": True}))
pool = pools.Pool(conn_data...)
# inside a @gen.coroutine function:
cur = yield pool.execute(sql_str)
data = cur.fetchone()
This way we are able to use SQLAlchemy models and SQLAlchemy tools for constructing queries.
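For reference, here is a minimal runnable sketch of this pattern inside a Tornado handler. The table definitions, column names, handler name and connection settings are hypothetical placeholders, not anything from your code:

from tornado import gen, web
from tornado_mysql import pools
from sqlalchemy.sql import table, column, select, join
from sqlalchemy.dialects import mysql

# hypothetical table definitions standing in for real models
t = table('orders', column('id'), column('t_id'), column('xxx'))
t2 = table('customers', column('id'), column('name'))

POOL = pools.Pool(
    dict(host='127.0.0.1', port=3306, user='test', passwd='test', db='test'),
    max_idle_connections=1)

class OrdersHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        xxx_id = int(self.get_argument('xxx_id', 10))
        j = join(t, t2, t.c.t_id == t2.c.id)
        s = select([t]).select_from(j).where(t.c.xxx == xxx_id)
        # render the statement as a literal MySQL string,
        # since tornado_mysql expects plain SQL text
        sql_str = str(s.compile(dialect=mysql.dialect(),
                                compile_kwargs={"literal_binds": True}))
        cur = yield POOL.execute(sql_str)
        self.write({'row': cur.fetchone()})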
Not Tornado, but we sort of made SQLAlchemy async on top of asyncio in the GINO project:
import asyncio
from gino import Gino, enable_task_local
from sqlalchemy import Column, Integer, Unicode, cast

db = Gino()


class User(db.Model):
    __tablename__ = 'users'

    id = Column(Integer(), primary_key=True)
    nickname = Column(Unicode(), default='noname')


async def main():
    await db.create_pool('postgresql://localhost/gino')

    # Create object, `id` is assigned by database
    u1 = await User.create(nickname='fantix')
    print(u1.id, u1.nickname)  # 1 fantix

    # Retrieve the same row, as a different object
    u2 = await User.get(u1.id)
    print(u2.nickname)  # fantix

    # Update affects only database row and the operating object
    await u2.update(nickname='daisy')
    print(u2.nickname)  # daisy
    print(u1.nickname)  # fantix

    # Returns all user objects with "d" in their nicknames
    users = await User.query.where(User.nickname.contains('d')).gino.all()

    # Find one user object, None if not found
    user = await User.query.where(User.nickname == 'daisy').gino.first()

    # Execute complex statement and return command status
    status = await User.update.values(
        nickname='No.' + cast(User.id, Unicode),
    ).where(
        User.id > 10,
    ).gino.status()

    # Iterate over the results of a large query in a transaction as required
    async with db.transaction():
        async for u in User.query.order_by(User.id).gino.iterate():
            print(u.id, u.nickname)


loop = asyncio.get_event_loop()
enable_task_local(loop)
loop.run_until_complete(main())
It looks a bit like the SQLAlchemy ORM, but is actually quite different, because we used only a part of SQLAlchemy Core and built a simple ORM on top of it. It uses asyncpg underneath, so it is for PostgreSQL only.
Update: GINO now supports Tornado, thanks to the contribution of Vladimir Goncharov. See the docs here.
ORMs are poorly suited for explicit asynchronous programming, that is, where the programmer must produce explicit callbacks anytime something that uses network access occurs. A primary reason for this is that ORMs make extensive use of the lazy loading pattern, which is more or less incompatible with explicit async. Code that looks like this:
user = Session.query(User).first()
print(user.addresses)
will actually emit two separate queries - one when you say first() to load a row, and the next when you say user.addresses, in the case that the .addresses collection isn't already present, or has been expired. Essentially, nearly every line of code that deals with ORM constructs might block on IO, so you'd be in extensive callback spaghetti within seconds - and to make matters worse, the vast majority of those code lines won't actually block on IO, so all the overhead of connecting callbacks together for what would otherwise be simple attribute access operations will make your program vastly less efficient too.
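For contrast, the standard synchronous way to avoid that hidden second query is to load the collection eagerly. A minimal sketch, reusing the User and Session names from the snippet above, with SQLAlchemy's joinedload option:

from sqlalchemy.orm import joinedload

# one round trip instead of two: the addresses collection is loaded
# via a JOIN in the same query that loads the user
user = Session.query(User).options(joinedload(User.addresses)).first()
print(user.addresses)  # already loaded; no second query is emitted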
A major issue with explicit asynchronous models is that they add tremendous Python function call overhead to complex systems - not just on the user-facing side, as you get with lazy loading, but on the internal side as well, regarding how the system provides abstraction around the Python database API (DBAPI). For SQLAlchemy to have even basic async support would impose a severe performance penalty on the vast majority of programs that don't use async patterns, and even on those async programs that are not highly concurrent. Consider that SQLAlchemy, or any other ORM or abstraction layer, might have code like the following:
def execute(connection, statement):
    cursor = connection.cursor()
    cursor.execute(statement)
    results = cursor.fetchall()
    cursor.close()
    return results
The above code performs what seems to be a simple operation, executing a SQL statement on a connection. But using a fully async DBAPI like psycopg2's async extension, the above code blocks on IO at least three times. So to write the above code in explicit async style, even when there's no async engine in use and the callbacks aren't actually blocking, means the above outer function call becomes at least three function calls, instead of one, not including the overhead imposed by the explicit asynchronous system or the DBAPI calls themselves. So a simple application is automatically given a penalty of 3x the function call overhead surrounding a simple abstraction around statement execution. And in Python, function call overhead is everything.
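To illustrate, here is the same function rewritten in explicit async style, assuming a hypothetical asyncio-style DBAPI whose cursor methods are coroutines (not any specific library's API); each IO point becomes its own suspension point:

# hypothetical asyncio-style DBAPI: every method that may touch the
# network is a coroutine, so one logical call fans out into several
async def execute(connection, statement):
    cursor = await connection.cursor()  # may block on IO
    await cursor.execute(statement)     # blocks on IO
    results = await cursor.fetchall()   # blocks on IO
    await cursor.close()
    return results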
For these reasons, I continue to be less than excited about the hype surrounding explicit async systems, at least to the degree that some folks seem to want to go all async for everything, like delivering web pages (see node.js). I'd recommend using implicit async systems instead, most notably gevent, where you get all the non-blocking IO benefits of an asynchronous model and none of the structural verbosity/downsides of explicit callbacks. While I continue to try to understand use cases for these two approaches, I'm puzzled by the appeal of the explicit async approach as a solution to all problems, i.e. as you see with node.js - we're using scripting languages in the first place to cut down on verbosity and code complexity, and explicit async for simple things like delivering web pages seems to do nothing but add boilerplate that can just as well be automated by gevent or similar, if blocking IO is even such a problem in a case like that (plenty of high-volume websites do fine with a synchronous IO model). Gevent-based systems are production proven and their popularity is growing, so if you like the code automation that ORMs provide, you might also want to embrace the async-IO-scheduling automation that a system like gevent provides.
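For concreteness, a minimal sketch of the implicit approach. It assumes a pure-Python DBAPI such as PyMySQL, whose socket IO gevent's monkey-patching can make cooperative; the connection credentials are placeholders:

# implicit async with gevent: the monkey patch makes socket IO
# cooperative, so plain synchronous SQLAlchemy code stops blocking
# the process without any callback or await syntax
from gevent import monkey
monkey.patch_all()  # must run before anything else creates sockets

import gevent
from sqlalchemy import create_engine, text

# PyMySQL is pure Python, so its network IO goes through the
# patched socket module; credentials here are placeholders
engine = create_engine('mysql+pymysql://user:passwd@localhost/test')

def fetch():
    with engine.connect() as conn:
        # blocks only the current greenlet; the others keep running
        return conn.execute(text('SELECT SLEEP(1)')).fetchall()

# five one-second queries complete in about one second, not five
jobs = [gevent.spawn(fetch) for _ in range(5)]
gevent.joinall(jobs)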
Update: Nick Coghlan pointed out his great article on the subject of explicit vs. implicit async, which is also a must read here. I've also been made aware that pep-3156 now welcomes interoperability with gevent, reversing its previously stated disinterest in gevent, largely thanks to Nick's article. So in the future I would recommend a hybrid of Tornado using gevent for the database logic, once the system for integrating these approaches is available.
I had the same issue in the past and couldn't find a reliable async MySQL library. However, there is a cool solution using asyncio + PostgreSQL: the aiopg library, which comes with SQLAlchemy support out of the box:
import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa

metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))


async def create_table(engine):
    async with engine.acquire() as conn:
        await conn.execute('DROP TABLE IF EXISTS tbl')
        await conn.execute('''CREATE TABLE tbl (
                                  id serial PRIMARY KEY,
                                  val varchar(255))''')


async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:
        async with engine.acquire() as conn:
            await conn.execute(tbl.insert().values(val='abc'))

            async for row in conn.execute(tbl.select()):
                print(row.id, row.val)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())
Updated as suggested by @cglacet.
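One thing the example above doesn't show is transactions. A minimal sketch, assuming the same engine and tbl as above and aiopg's begin()/commit()/rollback() coroutines:

async def insert_two(engine):
    async with engine.acquire() as conn:
        # begin() returns a transaction object whose
        # commit/rollback methods are coroutines
        trans = await conn.begin()
        try:
            await conn.execute(tbl.insert().values(val='first'))
            await conn.execute(tbl.insert().values(val='second'))
            await trans.commit()  # both rows become visible atomically
        except Exception:
            await trans.rollback()  # neither row is written on failure
            raise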