开发者

Django inspectdb issue using Oracle database

Installed cx_oracle and ran inspectdb. Don't seem to get any output? Can somebody help? Is there a known issue using inspectdb with Oracle?

Below is the command and settings.py.

python manage.py inspectdb --database xxx_db

# This is an auto-generated Django model module.
# You'll have to do the following manually to clean this up:
#     * Rearrange models' order
#     * Make sure each model has one field with primary_key=True
# Feel free to rename the models, but don't rename db_table values or field names.
#开发者_StackOverflow社区
# Also note: You'll have to insert the output of 'django-admin.py sqlcustom [appname]'
# into your database.

from django.db import models

settings.py

DATABASES = {
    'xxx_db': {
        'ENGINE': 'django.db.backends.oracle',
        'NAME': 'abc',
        'USER': 'abc_read',
        'PASSWORD': 'abc_read',
        'HOST': 'apps.domain.com',
        'PORT': 'xxxx'
        },
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'aaaa',
        'USER': 'aaaa',
        'PASSWORD': 'xxxx',
        'HOST': '/tmp/mysql.sock',
        'PORT': ''
        }
}


Two things:

  1. Inspectdb doesn't officially support oracle (see Django Docs - inspectdb ) sad times.
  2. Django doesn't have very strong support for Oracle schemas ( see unresolved Django ticket 6148) So you may have better luck if you are able to connect using the main user for the schema, making the schema you wish to introspect the default schema.

I was able to get a basic model file output by changing a the select in introspection.py. For me I changed the get_table_list function in django/db/backends/oracle/introspection.py (around line 40) from:

def get_table_list(self, cursor):
    "Returns a list of table names in the current database."
    cursor.execute("SELECT TABLE_NAME FROM USER_TABLES")
    return [row[0].lower() for row in cursor.fetchall()]

To

def get_table_list(self, cursor):
    "Returns a list of table names in the current database."
    cursor.execute("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = 'SCHEMA_TO_QUERY'")
    return [row[0].lower() for row in cursor.fetchall()]

But gave up on django when I read the overall poor support for schemas in Oracle


It works for me.

Did you check user has rights to see all the tables in Oracle ?

Anyway I'm curious about what's the SQL being used by inspectdb.


@Plecebo is on the right track. The get_table_list method is the source of the problem, but the Select statement given did not work.

I temporarily hard coded the table names to quickly get what I needed:

(django/db/backends/oracle/introspection.py line 40)

def get_table_list(self, cursor):
    "Returns a list of table names in the current database."
    #cursor.execute("SELECT TABLE_NAME FROM USER_TABLES")
    return ['table_name1', 'table_name2']
    #return [row[0].lower() for row in cursor.fetchall()]


As this came up in my recent search to sort out inspectdb with Django 2.0 and an Oracle 11g legacy database I'm working with, I have taken a go at fixing introspection, and so far I've managed to get output for basic tables after modifying /lib/python3.6/site-packages/django/db/backends/oracle/introspection.py: (essentially replacing all user_* tables for all_* tables)

My currently working solution is the file contents below (introspection.py).

import warnings
from collections import namedtuple

import cx_Oracle

from django.db import models
from django.db.backends.base.introspection import (
    BaseDatabaseIntrospection, FieldInfo as BaseFieldInfo, TableInfo,
)
from django.utils.deprecation import RemovedInDjango21Warning

FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('is_autofield',))


class DatabaseIntrospection(BaseDatabaseIntrospection):
    # Maps type objects to Django Field types.
    data_types_reverse = {
        cx_Oracle.BLOB: 'BinaryField',
        cx_Oracle.CLOB: 'TextField',
        cx_Oracle.DATETIME: 'DateField',
        cx_Oracle.FIXED_CHAR: 'CharField',
        cx_Oracle.FIXED_NCHAR: 'CharField',
        cx_Oracle.NATIVE_FLOAT: 'FloatField',
        cx_Oracle.NCHAR: 'CharField',
        cx_Oracle.NCLOB: 'TextField',
        cx_Oracle.NUMBER: 'DecimalField',
        cx_Oracle.STRING: 'CharField',
        cx_Oracle.TIMESTAMP: 'DateTimeField',
    }

    cache_bust_counter = 1

    def get_field_type(self, data_type, description):
        if data_type == cx_Oracle.NUMBER:
            precision, scale = description[4:6]
            if scale == 0:
                if precision > 11:
                    return 'BigAutoField' if description.is_autofield else 'BigIntegerField'
                elif precision == 1:
                    return 'BooleanField'
                elif description.is_autofield:
                    return 'AutoField'
                else:
                    return 'IntegerField'
            elif scale == -127:
                return 'FloatField'

        return super().get_field_type(data_type, description)

    def get_table_list(self, cursor):
        """Return a list of table and view names in the current database."""
        # cursor.execute("SELECT TABLE_NAME, 't' FROM USER_TABLES UNION ALL "
        #                "SELECT VIEW_NAME, 'v' FROM USER_VIEWS")
        cursor.execute("SELECT TABLE_NAME, 't' FROM ALL_TABLES WHERE OWNER = 'V500' ")
        return [TableInfo(row[0].lower(), row[1]) for row in cursor.fetchall()]

    def get_table_description(self, cursor, table_name):
        """
        Return a description of the table with the DB-API cursor.description
        interface.
        """
        cursor.execute("""
            SELECT
                column_name,
                data_default,
                CASE
                    WHEN char_used IS NULL THEN data_length
                    ELSE char_length
                END as internal_size,
                0 as is_autofield
            FROM ALL_TAB_COLUMNS
            WHERE table_name = UPPER(%s)""", [table_name])
        field_map = {
            column: (internal_size, default if default != 'NULL' else None, is_autofield)
            for column, default, internal_size, is_autofield in cursor.fetchall()
        }
        self.cache_bust_counter += 1
        cursor.execute("SELECT * FROM {} WHERE ROWNUM < 2 AND {} > 0".format(
            self.connection.ops.quote_name(table_name),
            self.cache_bust_counter))
        description = []
        for desc in cursor.description:
            name = desc[0]
            internal_size, default, is_autofield = field_map[name]
            name = name % {}  # cx_Oracle, for some reason, doubles percent signs.
            description.append(FieldInfo(*(
                (name.lower(),) +
                desc[1:3] +
                (internal_size, desc[4] or 0, desc[5] or 0) +
                desc[6:] +
                (default, is_autofield)
            )))
        return description

    def table_name_converter(self, name):
        """Table name comparison is case insensitive under Oracle."""
        return name.lower()

    def get_sequences(self, cursor, table_name, table_fields=()):
        # Tables don't exist in 11g (this function added in django 2
        # cursor.execute("""
        #     SELECT
        #         user_tab_identity_cols.sequence_name,
        #         user_tab_identity_cols.column_name
        #     FROM
        #         user_tab_identity_cols,
        #         user_constraints,
        #         user_cons_columns cols
        #     WHERE
        #         user_constraints.constraint_name = cols.constraint_name
        #         AND user_constraints.table_name = user_tab_identity_cols.table_name
        #         AND cols.column_name = user_tab_identity_cols.column_name
        #         AND user_constraints.constraint_type = 'P'
        #         AND user_tab_identity_cols.table_name = UPPER(%s)
        # """, [table_name])
        # # Oracle allows only one identity column per table.
        # row = cursor.fetchone()
        # if row:
        #     return [{'name': row[0].lower(), 'table': table_name, 'column': row[1].lower()}]
        # # To keep backward compatibility for AutoFields that aren't Oracle
        # # identity columns.
        # for f in table_fields:
        #     if isinstance(f, models.AutoField):
        #         return [{'table': table_name, 'column': f.column}]
        return []

    def get_relations(self, cursor, table_name):
        """
        Return a dictionary of {field_name: (field_name_other_table, other_table)}
        representing all relationships to the given table.
        """
        table_name = table_name.upper()
        cursor.execute("""
    SELECT ca.column_name, cb.table_name, cb.column_name
    FROM   ALL_CONSTRAINTS, ALL_CONS_COLUMNS ca, ALL_CONS_COLUMNS cb
    WHERE  ALL_CONSTRAINTS.table_name = %s AND
           ALL_CONSTRAINTS.constraint_name = ca.constraint_name AND
           ALL_CONSTRAINTS.r_constraint_name = cb.constraint_name AND
           ca.position = cb.position""", [table_name])

        relations = {}
        for row in cursor.fetchall():
            relations[row[0].lower()] = (row[2].lower(), row[1].lower())
        return relations

    def get_key_columns(self, cursor, table_name):
        cursor.execute("""
            SELECT ccol.column_name, rcol.table_name AS referenced_table, rcol.column_name AS referenced_column
            FROM ALL_CONSTRAINTS c
            JOIN ALL_CONS_COLUMNS ccol
              ON ccol.constraint_name = c.constraint_name
            JOIN ALL_CONS_COLUMNS rcol
              ON rcol.constraint_name = c.r_constraint_name
            WHERE c.table_name = %s AND c.constraint_type = 'R'""", [table_name.upper()])
        return [tuple(cell.lower() for cell in row)
                for row in cursor.fetchall()]

    def get_indexes(self, cursor, table_name):
        warnings.warn(
            "get_indexes() is deprecated in favor of get_constraints().",
            RemovedInDjango21Warning, stacklevel=2
        )
        sql = """
    SELECT LOWER(uic1.column_name) AS column_name,
           CASE ALL_CONSTRAINTS.constraint_type
               WHEN 'P' THEN 1 ELSE 0
           END AS is_primary_key,
           CASE ALL_INDEXES.uniqueness
               WHEN 'UNIQUE' THEN 1 ELSE 0
           END AS is_unique
    FROM   ALL_CONSTRAINTS, ALL_INDEXES, ALL_IND_COLUMNS uic1
    WHERE  ALL_CONSTRAINTS.constraint_type (+) = 'P'
      AND  ALL_CONSTRAINTS.index_name (+) = uic1.index_name
      AND  ALL_INDEXES.uniqueness (+) = 'UNIQUE'
      AND  ALL_INDEXES.index_name (+) = uic1.index_name
      AND  uic1.table_name = UPPER(%s)
      AND  uic1.column_position = 1
      AND  NOT EXISTS (
              SELECT 1
              FROM   ALL_IND_COLUMNS uic2
              WHERE  uic2.index_name = uic1.index_name
                AND  uic2.column_position = 2
           )
        """
        cursor.execute(sql, [table_name])
        indexes = {}
        for row in cursor.fetchall():
            indexes[row[0]] = {'primary_key': bool(row[1]),
                               'unique': bool(row[2])}
        return indexes

    def get_constraints(self, cursor, table_name):
        """
        Retrieve any constraints or keys (unique, pk, fk, check, index) across
        one or more columns.
        """
        constraints = {}
        # Loop over the constraints, getting PKs, uniques, and checks
        cursor.execute("""
            SELECT
                ALL_CONSTRAINTS.constraint_name,
                LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.position),
                CASE ALL_CONSTRAINTS.constraint_type
                    WHEN 'P' THEN 1
                    ELSE 0
                END AS is_primary_key,
                CASE
                    WHEN ALL_CONSTRAINTS.constraint_type IN ('P', 'U') THEN 1
                    ELSE 0
                END AS is_unique,
                CASE ALL_CONSTRAINTS.constraint_type
                    WHEN 'C' THEN 1
                    ELSE 0
                END AS is_check_constraint
            FROM
                ALL_CONSTRAINTS
            LEFT OUTER JOIN
                ALL_CONS_COLUMNS cols ON ALL_CONSTRAINTS.constraint_name = cols.constraint_name
            WHERE
                ALL_CONSTRAINTS.constraint_type = ANY('P', 'U', 'C')
                AND ALL_CONSTRAINTS.table_name = UPPER(%s)
            GROUP BY ALL_CONSTRAINTS.constraint_name, ALL_CONSTRAINTS.constraint_type
        """, [table_name])
        for constraint, columns, pk, unique, check in cursor.fetchall():
            constraints[constraint] = {
                'columns': columns.split(','),
                'primary_key': pk,
                'unique': unique,
                'foreign_key': None,
                'check': check,
                'index': unique,  # All uniques come with an index
            }
        # Foreign key constraints
        cursor.execute("""
            SELECT
                cons.constraint_name,
                LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.position),
                LOWER(rcols.table_name),
                LOWER(rcols.column_name)
            FROM
                ALL_CONSTRAINTS cons
            INNER JOIN
                ALL_CONS_COLUMNS rcols ON rcols.constraint_name = cons.r_constraint_name AND rcols.position = 1
            LEFT OUTER JOIN
                ALL_CONS_COLUMNS cols ON cons.constraint_name = cols.constraint_name
            WHERE
                cons.constraint_type = 'R' AND
                cons.table_name = UPPER(%s)
            GROUP BY cons.constraint_name, rcols.table_name, rcols.column_name
        """, [table_name])
        for constraint, columns, other_table, other_column in cursor.fetchall():
            constraints[constraint] = {
                'primary_key': False,
                'unique': False,
                'foreign_key': (other_table, other_column),
                'check': False,
                'index': False,
                'columns': columns.split(','),
            }
        # Now get indexes
        cursor.execute("""
            SELECT
                ind.index_name,
                LOWER(ind.index_type),
                LISTAGG(LOWER(cols.column_name), ',') WITHIN GROUP (ORDER BY cols.column_position),
                LISTAGG(cols.descend, ',') WITHIN GROUP (ORDER BY cols.column_position)
            FROM
                ALL_IND_COLUMNS cols, ALL_INDEXES ind
            WHERE
                cols.table_name = UPPER(%s) AND
                NOT EXISTS (
                    SELECT 1
                    FROM ALL_CONSTRAINTS cons
                    WHERE ind.index_name = cons.index_name
                ) AND cols.index_name = ind.index_name
            GROUP BY ind.index_name, ind.index_type
        """, [table_name])
        for constraint, type_, columns, orders in cursor.fetchall():
            constraints[constraint] = {
                'primary_key': False,
                'unique': False,
                'foreign_key': None,
                'check': False,
                'index': True,
                'type': 'idx' if type_ == 'normal' else type_,
                'columns': columns.split(','),
                'orders': orders.split(','),
            }
        return constraints

Noting that Django 1.11 is the last version to officially support 11g was something I picked up from How to make Django 2.0 to use Oracle 11g syntax instead of 12c?, which prompted me to look at the 1.11 codebase to see what had changed (https://github.com/django/django/blob/stable/1.11.x/django/db/backends/oracle/introspection.py)

At present I'm working to read data from a read only database connection, so fixing migrate isn't something I've got motivation for presently. At the time of writing the inspectdb function is outputting 6000+ tables as python code without any major issues.


Haaaa.. just facing this problem and found a silly reason!!

No need to edit any lower layer files as was done in one of the post. You face this issue when you dont have tables in that Database. Hehe..

Create a few tables and then try. It works like a charm.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜