(url, driver, query_str)
| 21 | |
| 22 | @generate_driver_url.for_db("mysql", "mariadb") |
| 23 | def generate_driver_url(url, driver, query_str): |
| 24 | backend = url.get_backend_name() |
| 25 | |
| 26 | # NOTE: at the moment, tests are running mariadbconnector |
| 27 | # against both mariadb and mysql backends. if we want this to be |
| 28 | # limited, do the decision making here to reject a "mysql+mariadbconnector" |
| 29 | # URL. Optionally also re-enable the module level |
| 30 | # MySQLDialect_mariadbconnector.is_mysql flag as well, which must include |
| 31 | # a unit and/or functional test. |
| 32 | |
| 33 | # all the Jenkins tests have been running mysqlclient Python library |
| 34 | # built against mariadb client drivers for years against all MySQL / |
| 35 | # MariaDB versions going back to MySQL 5.6, currently they can talk |
| 36 | # to MySQL databases without problems. |
| 37 | |
| 38 | if backend == "mysql": |
| 39 | dialect_cls = url.get_dialect() |
| 40 | if dialect_cls._is_mariadb_from_url(url): |
| 41 | backend = "mariadb" |
| 42 | |
| 43 | new_url = url.set( |
| 44 | drivername="%s+%s" % (backend, driver) |
| 45 | ).update_query_string(query_str) |
| 46 | |
| 47 | if driver == "mariadbconnector": |
| 48 | new_url = new_url.difference_update_query(["charset"]) |
| 49 | elif driver == "mysqlconnector": |
| 50 | new_url = new_url.update_query_pairs( |
| 51 | [("collation", "utf8mb4_general_ci")] |
| 52 | ) |
| 53 | |
| 54 | try: |
| 55 | new_url.get_dialect() |
| 56 | except exc.NoSuchModuleError: |
| 57 | return None |
| 58 | else: |
| 59 | return new_url |
| 60 | |
| 61 | |
| 62 | @create_db.for_db("mysql", "mariadb") |
nothing calls this directly
no test coverage detected