diff options
Diffstat (limited to 'lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/dialects/sqlite/base.py')
-rwxr-xr-x | lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/dialects/sqlite/base.py | 753 |
1 files changed, 0 insertions, 753 deletions
diff --git a/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/dialects/sqlite/base.py b/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/dialects/sqlite/base.py deleted file mode 100755 index 4ab50931..00000000 --- a/lib/python2.7/site-packages/SQLAlchemy-0.7.0-py2.7-linux-x86_64.egg/sqlalchemy/dialects/sqlite/base.py +++ /dev/null @@ -1,753 +0,0 @@ -# sqlite/base.py -# Copyright (C) 2005-2011 the SQLAlchemy authors and contributors <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: http://www.opensource.org/licenses/mit-license.php - -"""Support for the SQLite database. - -For information on connecting using a specific driver, see the documentation -section regarding that driver. - -Date and Time Types -------------------- - -SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does not provide -out of the box functionality for translating values between Python `datetime` objects -and a SQLite-supported format. SQLAlchemy's own :class:`~sqlalchemy.types.DateTime` -and related types provide date formatting and parsing functionality when SQlite is used. -The implementation classes are :class:`.DATETIME`, :class:`.DATE` and :class:`.TIME`. -These types represent dates and times as ISO formatted strings, which also nicely -support ordering. There's no reliance on typical "libc" internals for these functions -so historical dates are fully supported. - -Auto Incrementing Behavior --------------------------- - -Background on SQLite's autoincrement is at: http://sqlite.org/autoinc.html - -Two things to note: - -* The AUTOINCREMENT keyword is **not** required for SQLite tables to - generate primary key values automatically. AUTOINCREMENT only means that - the algorithm used to generate ROWID values should be slightly different. -* SQLite does **not** generate primary key (i.e. ROWID) values, even for - one column, if the table has a composite (i.e. multi-column) primary key. - This is regardless of the AUTOINCREMENT keyword being present or not. - -To specifically render the AUTOINCREMENT keyword on the primary key -column when rendering DDL, add the flag ``sqlite_autoincrement=True`` -to the Table construct:: - - Table('sometable', metadata, - Column('id', Integer, primary_key=True), - sqlite_autoincrement=True) - -Transaction Isolation Level ---------------------------- - -:func:`create_engine` accepts an ``isolation_level`` parameter which results in -the command ``PRAGMA read_uncommitted <level>`` being invoked for every new -connection. Valid values for this parameter are ``SERIALIZABLE`` and -``READ UNCOMMITTED`` corresponding to a value of 0 and 1, respectively. - -""" - -import datetime, re - -from sqlalchemy import sql, exc -from sqlalchemy.engine import default, base, reflection -from sqlalchemy import types as sqltypes -from sqlalchemy import util -from sqlalchemy.sql import compiler -from sqlalchemy import processors - -from sqlalchemy.types import BLOB, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL,\ - FLOAT, REAL, INTEGER, NUMERIC, SMALLINT, TEXT, TIME, TIMESTAMP, VARCHAR - -class _DateTimeMixin(object): - _reg = None - _storage_format = None - - def __init__(self, storage_format=None, regexp=None, **kw): - super(_DateTimeMixin, self).__init__(**kw) - if regexp is not None: - self._reg = re.compile(regexp) - if storage_format is not None: - self._storage_format = storage_format - -class DATETIME(_DateTimeMixin, sqltypes.DateTime): - """Represent a Python datetime object in SQLite using a string. - - The default string storage format is:: - - "%04d-%02d-%02d %02d:%02d:%02d.%06d" % (value.year, - value.month, value.day, - value.hour, value.minute, - value.second, value.microsecond) - - e.g.:: - - 2011-03-15 12:05:57.10558 - - The storage format can be customized to some degree using the - ``storage_format`` and ``regexp`` parameters, such as:: - - import re - from sqlalchemy.dialects.sqlite import DATETIME - - dt = DATETIME( - storage_format="%04d/%02d/%02d %02d-%02d-%02d-%06d", - regexp=re.compile("(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)(?:-(\d+))?") - ) - - :param storage_format: format string which will be appled to the - tuple ``(value.year, value.month, value.day, value.hour, - value.minute, value.second, value.microsecond)``, given a - Python datetime.datetime() object. - - :param regexp: regular expression which will be applied to - incoming result rows. The resulting match object is appled to - the Python datetime() constructor via ``*map(int, - match_obj.groups(0))``. - """ - - _storage_format = "%04d-%02d-%02d %02d:%02d:%02d.%06d" - - def bind_processor(self, dialect): - datetime_datetime = datetime.datetime - datetime_date = datetime.date - format = self._storage_format - def process(value): - if value is None: - return None - elif isinstance(value, datetime_datetime): - return format % (value.year, value.month, value.day, - value.hour, value.minute, value.second, - value.microsecond) - elif isinstance(value, datetime_date): - return format % (value.year, value.month, value.day, - 0, 0, 0, 0) - else: - raise TypeError("SQLite DateTime type only accepts Python " - "datetime and date objects as input.") - return process - - def result_processor(self, dialect, coltype): - if self._reg: - return processors.str_to_datetime_processor_factory( - self._reg, datetime.datetime) - else: - return processors.str_to_datetime - -class DATE(_DateTimeMixin, sqltypes.Date): - """Represent a Python date object in SQLite using a string. - - The default string storage format is:: - - "%04d-%02d-%02d" % (value.year, value.month, value.day) - - e.g.:: - - 2011-03-15 - - The storage format can be customized to some degree using the - ``storage_format`` and ``regexp`` parameters, such as:: - - import re - from sqlalchemy.dialects.sqlite import DATE - - d = DATE( - storage_format="%02d/%02d/%02d", - regexp=re.compile("(\d+)/(\d+)/(\d+)") - ) - - :param storage_format: format string which will be appled to the - tuple ``(value.year, value.month, value.day)``, - given a Python datetime.date() object. - - :param regexp: regular expression which will be applied to - incoming result rows. The resulting match object is appled to - the Python date() constructor via ``*map(int, - match_obj.groups(0))``. - - """ - - _storage_format = "%04d-%02d-%02d" - - def bind_processor(self, dialect): - datetime_date = datetime.date - format = self._storage_format - def process(value): - if value is None: - return None - elif isinstance(value, datetime_date): - return format % (value.year, value.month, value.day) - else: - raise TypeError("SQLite Date type only accepts Python " - "date objects as input.") - return process - - def result_processor(self, dialect, coltype): - if self._reg: - return processors.str_to_datetime_processor_factory( - self._reg, datetime.date) - else: - return processors.str_to_date - -class TIME(_DateTimeMixin, sqltypes.Time): - """Represent a Python time object in SQLite using a string. - - The default string storage format is:: - - "%02d:%02d:%02d.%06d" % (value.hour, value.minute, - value.second, - value.microsecond) - - e.g.:: - - 12:05:57.10558 - - The storage format can be customized to some degree using the - ``storage_format`` and ``regexp`` parameters, such as:: - - import re - from sqlalchemy.dialects.sqlite import TIME - - t = TIME( - storage_format="%02d-%02d-%02d-%06d", - regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?") - ) - - :param storage_format: format string which will be appled - to the tuple ``(value.hour, value.minute, value.second, - value.microsecond)``, given a Python datetime.time() object. - - :param regexp: regular expression which will be applied to - incoming result rows. The resulting match object is appled to - the Python time() constructor via ``*map(int, - match_obj.groups(0))``. - - """ - - _storage_format = "%02d:%02d:%02d.%06d" - - def bind_processor(self, dialect): - datetime_time = datetime.time - format = self._storage_format - def process(value): - if value is None: - return None - elif isinstance(value, datetime_time): - return format % (value.hour, value.minute, value.second, - value.microsecond) - else: - raise TypeError("SQLite Time type only accepts Python " - "time objects as input.") - return process - - def result_processor(self, dialect, coltype): - if self._reg: - return processors.str_to_datetime_processor_factory( - self._reg, datetime.time) - else: - return processors.str_to_time - -colspecs = { - sqltypes.Date: DATE, - sqltypes.DateTime: DATETIME, - sqltypes.Time: TIME, -} - -ischema_names = { - 'BLOB': sqltypes.BLOB, - 'BOOL': sqltypes.BOOLEAN, - 'BOOLEAN': sqltypes.BOOLEAN, - 'CHAR': sqltypes.CHAR, - 'DATE': sqltypes.DATE, - 'DATETIME': sqltypes.DATETIME, - 'DECIMAL': sqltypes.DECIMAL, - 'FLOAT': sqltypes.FLOAT, - 'INT': sqltypes.INTEGER, - 'INTEGER': sqltypes.INTEGER, - 'NUMERIC': sqltypes.NUMERIC, - 'REAL': sqltypes.REAL, - 'SMALLINT': sqltypes.SMALLINT, - 'TEXT': sqltypes.TEXT, - 'TIME': sqltypes.TIME, - 'TIMESTAMP': sqltypes.TIMESTAMP, - 'VARCHAR': sqltypes.VARCHAR, -} - - - -class SQLiteCompiler(compiler.SQLCompiler): - extract_map = util.update_copy( - compiler.SQLCompiler.extract_map, - { - 'month': '%m', - 'day': '%d', - 'year': '%Y', - 'second': '%S', - 'hour': '%H', - 'doy': '%j', - 'minute': '%M', - 'epoch': '%s', - 'dow': '%w', - 'week': '%W' - }) - - def visit_now_func(self, fn, **kw): - return "CURRENT_TIMESTAMP" - - def visit_char_length_func(self, fn, **kw): - return "length%s" % self.function_argspec(fn) - - def visit_cast(self, cast, **kwargs): - if self.dialect.supports_cast: - return super(SQLiteCompiler, self).visit_cast(cast) - else: - return self.process(cast.clause) - - def visit_extract(self, extract, **kw): - try: - return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( - self.extract_map[extract.field], self.process(extract.expr, **kw)) - except KeyError: - raise exc.ArgumentError( - "%s is not a valid extract argument." % extract.field) - - def limit_clause(self, select): - text = "" - if select._limit is not None: - text += "\n LIMIT " + self.process(sql.literal(select._limit)) - if select._offset is not None: - if select._limit is None: - text += "\n LIMIT " + self.process(sql.literal(-1)) - text += " OFFSET " + self.process(sql.literal(select._offset)) - else: - text += " OFFSET " + self.process(sql.literal(0)) - return text - - def for_update_clause(self, select): - # sqlite has no "FOR UPDATE" AFAICT - return '' - - -class SQLiteDDLCompiler(compiler.DDLCompiler): - - def get_column_specification(self, column, **kwargs): - colspec = self.preparer.format_column(column) + " " + self.dialect.type_compiler.process(column.type) - default = self.get_column_default_string(column) - if default is not None: - colspec += " DEFAULT " + default - - if not column.nullable: - colspec += " NOT NULL" - - if column.primary_key and \ - column.table.kwargs.get('sqlite_autoincrement', False) and \ - len(column.table.primary_key.columns) == 1 and \ - issubclass(column.type._type_affinity, sqltypes.Integer) and \ - not column.foreign_keys: - colspec += " PRIMARY KEY AUTOINCREMENT" - - return colspec - - def visit_primary_key_constraint(self, constraint): - # for columns with sqlite_autoincrement=True, - # the PRIMARY KEY constraint can only be inline - # with the column itself. - if len(constraint.columns) == 1: - c = list(constraint)[0] - if c.primary_key and \ - c.table.kwargs.get('sqlite_autoincrement', False) and \ - issubclass(c.type._type_affinity, sqltypes.Integer) and \ - not c.foreign_keys: - return None - - return super(SQLiteDDLCompiler, self).\ - visit_primary_key_constraint(constraint) - - def visit_foreign_key_constraint(self, constraint): - - local_table = constraint._elements.values()[0].parent.table - remote_table = list(constraint._elements.values())[0].column.table - - if local_table.schema != remote_table.schema: - return None - else: - return super(SQLiteDDLCompiler, self).visit_foreign_key_constraint(constraint) - - def define_constraint_remote_table(self, constraint, table, preparer): - """Format the remote table clause of a CREATE CONSTRAINT clause.""" - - return preparer.format_table(table, use_schema=False) - - def visit_create_index(self, create): - index = create.element - preparer = self.preparer - text = "CREATE " - if index.unique: - text += "UNIQUE " - text += "INDEX %s ON %s (%s)" \ - % (preparer.format_index(index, - name=self._index_identifier(index.name)), - preparer.format_table(index.table, use_schema=False), - ', '.join(preparer.quote(c.name, c.quote) - for c in index.columns)) - return text - -class SQLiteTypeCompiler(compiler.GenericTypeCompiler): - def visit_large_binary(self, type_): - return self.visit_BLOB(type_) - -class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): - reserved_words = set([ - 'add', 'after', 'all', 'alter', 'analyze', 'and', 'as', 'asc', - 'attach', 'autoincrement', 'before', 'begin', 'between', 'by', - 'cascade', 'case', 'cast', 'check', 'collate', 'column', 'commit', - 'conflict', 'constraint', 'create', 'cross', 'current_date', - 'current_time', 'current_timestamp', 'database', 'default', - 'deferrable', 'deferred', 'delete', 'desc', 'detach', 'distinct', - 'drop', 'each', 'else', 'end', 'escape', 'except', 'exclusive', - 'explain', 'false', 'fail', 'for', 'foreign', 'from', 'full', 'glob', - 'group', 'having', 'if', 'ignore', 'immediate', 'in', 'index', - 'indexed', 'initially', 'inner', 'insert', 'instead', 'intersect', 'into', 'is', - 'isnull', 'join', 'key', 'left', 'like', 'limit', 'match', 'natural', - 'not', 'notnull', 'null', 'of', 'offset', 'on', 'or', 'order', 'outer', - 'plan', 'pragma', 'primary', 'query', 'raise', 'references', - 'reindex', 'rename', 'replace', 'restrict', 'right', 'rollback', - 'row', 'select', 'set', 'table', 'temp', 'temporary', 'then', 'to', - 'transaction', 'trigger', 'true', 'union', 'unique', 'update', 'using', - 'vacuum', 'values', 'view', 'virtual', 'when', 'where', - ]) - - def format_index(self, index, use_schema=True, name=None): - """Prepare a quoted index and schema name.""" - - if name is None: - name = index.name - result = self.quote(name, index.quote) - if not self.omit_schema and use_schema and getattr(index.table, "schema", None): - result = self.quote_schema(index.table.schema, index.table.quote_schema) + "." + result - return result - -class SQLiteExecutionContext(default.DefaultExecutionContext): - def get_result_proxy(self): - rp = base.ResultProxy(self) - if rp._metadata: - # adjust for dotted column names. SQLite - # in the case of UNION may store col names as - # "tablename.colname" - # in cursor.description - for colname in rp._metadata.keys: - if "." in colname: - trunc_col = colname.split(".")[1] - rp._metadata._set_keymap_synonym(trunc_col, colname) - return rp - -class SQLiteDialect(default.DefaultDialect): - name = 'sqlite' - supports_alter = False - supports_unicode_statements = True - supports_unicode_binds = True - supports_default_values = True - supports_empty_insert = False - supports_cast = True - - default_paramstyle = 'qmark' - statement_compiler = SQLiteCompiler - ddl_compiler = SQLiteDDLCompiler - type_compiler = SQLiteTypeCompiler - preparer = SQLiteIdentifierPreparer - ischema_names = ischema_names - colspecs = colspecs - isolation_level = None - execution_ctx_cls = SQLiteExecutionContext - - supports_cast = True - supports_default_values = True - - def __init__(self, isolation_level=None, native_datetime=False, **kwargs): - default.DefaultDialect.__init__(self, **kwargs) - self.isolation_level = isolation_level - - # this flag used by pysqlite dialect, and perhaps others in the - # future, to indicate the driver is handling date/timestamp - # conversions (and perhaps datetime/time as well on some - # hypothetical driver ?) - self.native_datetime = native_datetime - - if self.dbapi is not None: - self.supports_default_values = \ - self.dbapi.sqlite_version_info >= (3, 3, 8) - self.supports_cast = \ - self.dbapi.sqlite_version_info >= (3, 2, 3) - - _isolation_lookup = { - 'READ UNCOMMITTED':1, - 'SERIALIZABLE':0 - } - def set_isolation_level(self, connection, level): - try: - isolation_level = self._isolation_lookup[level.replace('_', ' ')] - except KeyError: - raise exc.ArgumentError( - "Invalid value '%s' for isolation_level. " - "Valid isolation levels for %s are %s" % - (level, self.name, ", ".join(self._isolation_lookup)) - ) - cursor = connection.cursor() - cursor.execute("PRAGMA read_uncommitted = %d" % isolation_level) - cursor.close() - - def get_isolation_level(self, connection): - cursor = connection.cursor() - cursor.execute('PRAGMA read_uncommitted') - value = cursor.fetchone()[0] - cursor.close() - if value == 0: - return "SERIALIZABLE" - elif value == 1: - return "READ UNCOMMITTED" - else: - assert False, "Unknown isolation level %s" % value - - def on_connect(self): - if self.isolation_level is not None: - def connect(conn): - self.set_isolation_level(conn, self.isolation_level) - return connect - else: - return None - - @reflection.cache - def get_table_names(self, connection, schema=None, **kw): - if schema is not None: - qschema = self.identifier_preparer.quote_identifier(schema) - master = '%s.sqlite_master' % qschema - s = ("SELECT name FROM %s " - "WHERE type='table' ORDER BY name") % (master,) - rs = connection.execute(s) - else: - try: - s = ("SELECT name FROM " - " (SELECT * FROM sqlite_master UNION ALL " - " SELECT * FROM sqlite_temp_master) " - "WHERE type='table' ORDER BY name") - rs = connection.execute(s) - except exc.DBAPIError: - raise - s = ("SELECT name FROM sqlite_master " - "WHERE type='table' ORDER BY name") - rs = connection.execute(s) - - return [row[0] for row in rs] - - def has_table(self, connection, table_name, schema=None): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - cursor = _pragma_cursor(connection.execute("%stable_info(%s)" % (pragma, qtable))) - row = cursor.fetchone() - - # consume remaining rows, to work around - # http://www.sqlite.org/cvstrac/tktview?tn=1884 - while not cursor.closed and cursor.fetchone() is not None: - pass - - return (row is not None) - - @reflection.cache - def get_view_names(self, connection, schema=None, **kw): - if schema is not None: - qschema = self.identifier_preparer.quote_identifier(schema) - master = '%s.sqlite_master' % qschema - s = ("SELECT name FROM %s " - "WHERE type='view' ORDER BY name") % (master,) - rs = connection.execute(s) - else: - try: - s = ("SELECT name FROM " - " (SELECT * FROM sqlite_master UNION ALL " - " SELECT * FROM sqlite_temp_master) " - "WHERE type='view' ORDER BY name") - rs = connection.execute(s) - except exc.DBAPIError: - raise - s = ("SELECT name FROM sqlite_master " - "WHERE type='view' ORDER BY name") - rs = connection.execute(s) - - return [row[0] for row in rs] - - @reflection.cache - def get_view_definition(self, connection, view_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - qschema = self.identifier_preparer.quote_identifier(schema) - master = '%s.sqlite_master' % qschema - s = ("SELECT sql FROM %s WHERE name = '%s'" - "AND type='view'") % (master, view_name) - rs = connection.execute(s) - else: - try: - s = ("SELECT sql FROM " - " (SELECT * FROM sqlite_master UNION ALL " - " SELECT * FROM sqlite_temp_master) " - "WHERE name = '%s' " - "AND type='view'") % view_name - rs = connection.execute(s) - except exc.DBAPIError: - raise - s = ("SELECT sql FROM sqlite_master WHERE name = '%s' " - "AND type='view'") % view_name - rs = connection.execute(s) - - result = rs.fetchall() - if result: - return result[0].sql - - @reflection.cache - def get_columns(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - c = _pragma_cursor(connection.execute("%stable_info(%s)" % (pragma, qtable))) - found_table = False - columns = [] - while True: - row = c.fetchone() - if row is None: - break - (name, type_, nullable, default, has_default, primary_key) = (row[1], row[2].upper(), not row[3], row[4], row[4] is not None, row[5]) - name = re.sub(r'^\"|\"$', '', name) - if default: - default = re.sub(r"^\'|\'$", '', default) - match = re.match(r'(\w+)(\(.*?\))?', type_) - if match: - coltype = match.group(1) - args = match.group(2) - else: - coltype = "VARCHAR" - args = '' - try: - coltype = self.ischema_names[coltype] - if args is not None: - args = re.findall(r'(\d+)', args) - coltype = coltype(*[int(a) for a in args]) - except KeyError: - util.warn("Did not recognize type '%s' of column '%s'" % - (coltype, name)) - coltype = sqltypes.NullType() - - columns.append({ - 'name' : name, - 'type' : coltype, - 'nullable' : nullable, - 'default' : default, - 'autoincrement':default is None, - 'primary_key': primary_key - }) - return columns - - @reflection.cache - def get_primary_keys(self, connection, table_name, schema=None, **kw): - cols = self.get_columns(connection, table_name, schema, **kw) - pkeys = [] - for col in cols: - if col['primary_key']: - pkeys.append(col['name']) - return pkeys - - @reflection.cache - def get_foreign_keys(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - c = _pragma_cursor(connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))) - fkeys = [] - fks = {} - while True: - row = c.fetchone() - if row is None: - break - (constraint_name, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) - # sqlite won't return rcol if the table - # was created with REFERENCES <tablename>, no col - if rcol is None: - rcol = lcol - rtbl = re.sub(r'^\"|\"$', '', rtbl) - lcol = re.sub(r'^\"|\"$', '', lcol) - rcol = re.sub(r'^\"|\"$', '', rcol) - try: - fk = fks[constraint_name] - except KeyError: - fk = { - 'name' : constraint_name, - 'constrained_columns' : [], - 'referred_schema' : None, - 'referred_table' : rtbl, - 'referred_columns' : [] - } - fkeys.append(fk) - fks[constraint_name] = fk - - # look up the table based on the given table's engine, not 'self', - # since it could be a ProxyEngine - if lcol not in fk['constrained_columns']: - fk['constrained_columns'].append(lcol) - if rcol not in fk['referred_columns']: - fk['referred_columns'].append(rcol) - return fkeys - - @reflection.cache - def get_indexes(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - include_auto_indexes = kw.pop('include_auto_indexes', False) - qtable = quote(table_name) - c = _pragma_cursor(connection.execute("%sindex_list(%s)" % (pragma, qtable))) - indexes = [] - while True: - row = c.fetchone() - if row is None: - break - # ignore implicit primary key index. - # http://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html - elif not include_auto_indexes and row[1].startswith('sqlite_autoindex'): - continue - - indexes.append(dict(name=row[1], column_names=[], unique=row[2])) - # loop thru unique indexes to get the column names. - for idx in indexes: - c = connection.execute("%sindex_info(%s)" % (pragma, quote(idx['name']))) - cols = idx['column_names'] - while True: - row = c.fetchone() - if row is None: - break - cols.append(row[2]) - return indexes - - -def _pragma_cursor(cursor): - """work around SQLite issue whereby cursor.description - is blank when PRAGMA returns no rows.""" - - if cursor.closed: - cursor.fetchone = lambda: None - return cursor |