(schema)
| 3015 | |
| 3016 | |
def _schema_elements(schema):
    """Split a schema name into a ``(dbname, owner)`` pair.

    The name is tokenized on ``[``, ``]`` and ``.``.  A period only acts
    as a separator when it is outside of square brackets; the last
    separated part becomes the owner and everything before it is joined
    back together with periods to form the database name.

    Special cases, checked in this order:

    * a ``quoted_name`` whose ``quote`` flag is truthy is returned as the
      owner without being parsed;
    * a name already present in ``_memoized_schema`` returns the cached
      pair;
    * a name starting with ``__[SCHEMA_`` is returned as the owner
      without being parsed (presumably a placeholder token produced
      elsewhere -- confirm against the caller).

    :param schema: schema name, optionally containing bracketed and/or
     dotted tokens.
    :return: two-tuple ``(dbname, owner)``; either element may be
     ``None``.  Parsed results are stored in ``_memoized_schema``.

    """
    # tests for this function are in:
    # test/dialect/mssql/test_reflection.py ->
    #           OwnerPlusDBTest.test_owner_database_pairs
    # test/dialect/mssql/test_compiler.py -> test_force_schema_*
    # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
    #

    if isinstance(schema, quoted_name) and schema.quote:
        return None, schema

    if schema in _memoized_schema:
        return _memoized_schema[schema]

    if schema.startswith("__[SCHEMA_"):
        return None, schema

    parts = []
    current = ""
    inside_bracket = False
    saw_bracket = False
    for piece in re.split(r"(\[|\]|\.)", schema):
        if not piece:
            # re.split emits empty strings between adjacent delimiters
            continue
        if piece == "[":
            inside_bracket = True
            saw_bracket = True
        elif piece == "]":
            inside_bracket = False
        elif piece == "." and not inside_bracket:
            # separator outside of brackets: close the current part,
            # re-wrapping it in brackets if any were seen while it was
            # being collected
            parts.append("[%s]" % current if saw_bracket else current)
            current = ""
            saw_bracket = False
        else:
            # plain text, or a period that sits inside brackets
            current += piece

    # the trailing part is stored as-is, without brackets
    if current:
        parts.append(current)

    if not parts:
        dbname = owner = None
    elif len(parts) == 1:
        dbname = None
        owner = parts[0]
    else:
        owner = parts[-1]
        dbname = ".".join(parts[:-1])

        # test for internal brackets
        if re.match(r".*\].*\[.*", dbname[1:-1]):
            # several bracketed tokens: keep the brackets that are
            # already there and flag the name as not to be quoted
            dbname = quoted_name(dbname, quote=False)
        else:
            dbname = dbname.lstrip("[").rstrip("]")

    _memoized_schema[schema] = dbname, owner
    return dbname, owner
| 3073 | |
| 3074 |
no test coverage detected