Source code for petlx.sql

"""
Extension module providing some convenience functions for working with SQL databases. SQLAlchemy is required, try
``apt-get install python-sqlalchemy`` or ``pip install SQLAlchemy``.

Acknowledgments: much of the code of this module is based on the ``csvsql`` utility in the
`csvkit <https://github.com/onyxfish/csvkit>`_ package.

"""


import datetime
import logging
import petl
from petl import header, columns, head
from petl.io import _is_dbapi_connection, _is_dbapi_cursor, _is_sqlalchemy_engine, _is_sqlalchemy_session, \
    _is_sqlalchemy_connection
from petlx.util import UnsatisfiedDependency


logger = logging.getLogger(__name__)
warning = logger.warning
info = logger.info
debug = logger.debug


dep_message = """
The package sqlalchemy is required. Instructions for installation can be found
at http://docs.sqlalchemy.org/ or try apt-get install python-sqlalchemy or pip install SQLAlchemy.
"""


DIALECTS = {
    'access': 'access.base',
    'firebird': 'firebird.kinterbasdb',
    'informix': 'informix.informixdb',
    'maxdb': 'maxdb.sapdb',
    'mssql': 'mssql.pyodbc',
    'mysql': 'mysql.mysqlconnector',
    'oracle': 'oracle.cx_oracle',
    'postgresql': 'postgresql.psycopg2',
    'sqlite': 'sqlite.pysqlite',
    'sybase': 'sybase.pyodbc'
}


SQL_INTEGER_MAX = 2147483647
SQL_INTEGER_MIN = -2147483647
NULL_COLUMN_MAX_LENGTH = 32


[docs]def make_sqlalchemy_column(col, colname, constraints=True): """ Infer an appropriate SQLAlchemy column type based on a sequence of values. Parameters ---------- col : sequence A sequence of values to use to infer type, length etc. colname : string Name of column constraints : bool If True use length and nullable constraints """ try: import sqlalchemy except ImportError as e: raise UnsatisfiedDependency(e, dep_message) col_not_none = [v for v in col if v is not None] sql_column_kwargs = {} sql_type_kwargs = {} if len(col_not_none) == 0: sql_column_type = sqlalchemy.String if constraints: sql_type_kwargs['length'] = NULL_COLUMN_MAX_LENGTH elif all(isinstance(v, bool) for v in col_not_none): sql_column_type = sqlalchemy.Boolean elif all(isinstance(v, int) for v in col_not_none): if max(col_not_none) > SQL_INTEGER_MAX or min(col_not_none) < SQL_INTEGER_MIN: sql_column_type = sqlalchemy.BigInteger else: sql_column_type = sqlalchemy.Integer elif all(isinstance(v, long) for v in col_not_none): sql_column_type = sqlalchemy.BigInteger elif all(isinstance(v, (int, long)) for v in col_not_none): sql_column_type = sqlalchemy.BigInteger elif all(isinstance(v, (int, long, float)) for v in col_not_none): sql_column_type = sqlalchemy.Float elif all(isinstance(v, datetime.date) for v in col_not_none): sql_column_type = sqlalchemy.Date elif all(isinstance(v, datetime.time) for v in col_not_none): sql_column_type = sqlalchemy.Time elif all(isinstance(v, datetime.datetime) for v in col_not_none): sql_column_type = sqlalchemy.DateTime else: sql_column_type = sqlalchemy.String if constraints: sql_type_kwargs['length'] = max([len(unicode(v)) for v in col]) if constraints: sql_column_kwargs['nullable'] = len(col_not_none) < len(col) return sqlalchemy.Column(colname, sql_column_type(**sql_type_kwargs), **sql_column_kwargs)
[docs]def make_sqlalchemy_table(table, tablename, schema=None, constraints=True, metadata=None): """ Create an SQLAlchemy table based on a :mod:`petl` table. Parameters ---------- table : sequence of sequences (petl table) Table data to use to infer types etc. tablename : string Name of the table schema : string Name of the database schema to create the table in constraints : bool If True use length and nullable constraints metadata : sqlalchemy.MetaData Custom table metadata """ try: import sqlalchemy except ImportError as e: raise UnsatisfiedDependency(e, dep_message) if not metadata: metadata = sqlalchemy.MetaData() sql_table = sqlalchemy.Table(tablename, metadata, schema=schema) fields = header(table) cols = columns(table) for f in fields: sql_column = make_sqlalchemy_column(cols[f], f, constraints=constraints) sql_table.append_column(sql_column) return sql_table
[docs]def make_create_table_statement(table, tablename, schema=None, constraints=True, metadata=None, dialect=None): """ Generate a CREATE TABLE statement based on a :mod:`petl` table. Parameters ---------- table : sequence of sequences (petl table) Table data to use to infer types etc. tablename : string Name of the table schema : string Name of the database schema to create the table in constraints : bool If True use length and nullable constraints metadata : sqlalchemy.MetaData Custom table metadata dialect : string One of {'access', 'sybase', 'sqlite', 'informix', 'firebird', 'mysql', 'oracle', 'maxdb', 'postgresql', 'mssql'} """ try: import sqlalchemy except ImportError as e: raise UnsatisfiedDependency(e, dep_message) sql_table = make_sqlalchemy_table(table, tablename, schema=schema, constraints=constraints, metadata=metadata) if dialect: module = __import__('sqlalchemy.dialects.%s' % DIALECTS[dialect], fromlist=['dialect']) sql_dialect = module.dialect() else: sql_dialect = None return unicode(sqlalchemy.schema.CreateTable(sql_table).compile(dialect=sql_dialect)).strip() + ';'
[docs]def create_table(table, dbo, tablename, schema=None, commit=True, constraints=True, metadata=None, dialect=None, sample=1000): """ Create a database table based on a sample of data in the given table. Parameters ---------- table : sequence of sequences (petl table) Table data to load dbo : database object DB-API 2.0 connection, callable returning a DB-API 2.0 cursor, or SQLAlchemy connection, engine or session tablename : string Name of the table schema : string Name of the database schema to create the table in commit : bool If True commit the changes constraints : bool If True use length and nullable constraints (only relevant if create=True) metadata : sqlalchemy.MetaData Custom table metadata (only relevant if create=True) dialect : string One of {'access', 'sybase', 'sqlite', 'informix', 'firebird', 'mysql', 'oracle', 'maxdb', 'postgresql', 'mssql'} (only relevant if create=True) sample : int Number of rows to sample when inferring types etc., set to 0 to use the whole table (only relevant if create=True) """ if sample > 0: table = head(table, sample) sql = make_create_table_statement(table, tablename, schema=schema, constraints=constraints, metadata=metadata, dialect=dialect) _execute(sql, dbo, commit=commit)
[docs]def drop_table(dbo, tablename, schema=None, commit=True): """ Drop a database table if it exists. Parameters ---------- dbo : database object DB-API 2.0 connection, callable returning a DB-API 2.0 cursor, or SQLAlchemy connection, engine or session tablename : string Name of the table schema : string Name of the database schema the table is in commit : bool If True commit the changes """ # sanitise table name tablename = petl.io._quote(tablename) if schema is not None: tablename = petl.io._quote(schema) + '.' + tablename sql = u'DROP TABLE IF EXISTS %s;' % tablename _execute(sql, dbo, commit)
def _execute(sql, dbo, commit): debug(sql) # need to deal with polymorphic dbo argument # what sort of duck is it? # does it quack like a standard DB-API 2.0 connection? if _is_dbapi_connection(dbo): debug('assuming %r is standard DB-API 2.0 connection', dbo) _execute_dbapi_connection(sql, dbo, commit) # does it quack like a standard DB-API 2.0 cursor? elif _is_dbapi_cursor(dbo): debug('assuming %r is standard DB-API 2.0 cursor') _execute_dbapi_cursor(sql, dbo, commit) # does it quack like an SQLAlchemy engine? elif _is_sqlalchemy_engine(dbo): debug('assuming %r is an instance of sqlalchemy.engine.base.Engine', dbo) _execute_sqlalchemy_engine(sql, dbo, commit) # does it quack like an SQLAlchemy session? elif _is_sqlalchemy_session(dbo): debug('assuming %r is an instance of sqlalchemy.orm.session.Session', dbo) _execute_sqlalchemy_session(sql, dbo, commit) # does it quack like an SQLAlchemy connection? elif _is_sqlalchemy_connection(dbo): debug('assuming %r is an instance of sqlalchemy.engine.base.Connection', dbo) _execute_sqlalchemy_connection(sql, dbo, commit) elif callable(dbo): debug('assuming %r is a function returning standard DB-API 2.0 cursor objects', dbo) _execute_dbapi_mkcurs(sql, dbo, commit) # some other sort of duck... else: raise Exception('unsupported database object type: %r' % dbo) def _execute_dbapi_connection(sql, connection, commit): debug('obtain a cursor') cursor = connection.cursor() debug('execute SQL') cursor.execute(sql) debug('close the cursor') cursor.close() if commit: debug('commit transaction') connection.commit() def _execute_dbapi_mkcurs(sql, mkcurs, commit): debug('obtain a cursor') cursor = mkcurs() debug('execute SQL') cursor.execute(sql) debug('close the cursor') cursor.close() if commit: debug('commit transaction') # N.B., we depend on this optional DB-API 2.0 attribute being implemented assert hasattr(cursor, 'connection'), 'could not obtain connection via cursor' connection = cursor.connection connection.commit() def _execute_dbapi_cursor(sql, cursor, commit): debug('execute SQL') cursor.execute(sql) # don't close the cursor, leave that to the application if commit: debug('commit transaction') # N.B., we depend on this optional DB-API 2.0 attribute being implemented assert hasattr(cursor, 'connection'), 'could not obtain connection via cursor' connection = cursor.connection connection.commit() def _execute_sqlalchemy_connection(sql, connection, commit): if commit: debug('begin transaction') trans = connection.begin() debug('execute SQL') connection.execute(sql) if commit: debug('commit transaction') trans.commit() # N.B., don't close connection, leave that to the application def _execute_sqlalchemy_engine(sql, engine, commit): _execute_sqlalchemy_connection(sql, engine.contextual_connect(), commit) def _execute_sqlalchemy_session(sql, session, commit): _execute_sqlalchemy_connection(sql, session.connection(), commit)
[docs]def todb(table, dbo, tablename, schema=None, commit=True, create=False, drop=False, constraints=True, metadata=None, dialect=None, sample=1000): """ Drop-in replacement for :func:`petl.todb` which also supports automatic table creation. Parameters ---------- table : sequence of sequences (petl table) Table data to load dbo : database object DB-API 2.0 connection, callable returning a DB-API 2.0 cursor, or SQLAlchemy connection, engine or session tablename : string Name of the table schema : string Name of the database schema to create the table in commit : bool If True commit the changes create : bool If True attempt to create the table before loading, inferring types from a sample of the data drop : bool If True attempt to drop the table before recreating (only relevant if create=True) constraints : bool If True use length and nullable constraints (only relevant if create=True) metadata : sqlalchemy.MetaData Custom table metadata (only relevant if create=True) dialect : string One of {'access', 'sybase', 'sqlite', 'informix', 'firebird', 'mysql', 'oracle', 'maxdb', 'postgresql', 'mssql'} (only relevant if create=True) sample : int Number of rows to sample when inferring types etc. Set to 0 to use the whole table. """ if create: if drop: drop_table(dbo, tablename, schema=schema, commit=commit) create_table(table, dbo, tablename, schema=schema, commit=commit, constraints=constraints, metadata=metadata, dialect=dialect, sample=sample) petl.todb(table, dbo, tablename, schema=schema, commit=commit)
import sys from petlx.integration import integrate integrate(sys.modules[__name__])