Patch the SQLTable for fallback support. Instead of a table variable, just use the CREATE TABLE statement.
| 2540 | |
| 2541 | |
| 2542 | class SQLiteTable(SQLTable): |
| 2543 | """ |
| 2544 | Patch the SQLTable for fallback support. |
| 2545 | Instead of a table variable just use the Create Table statement. |
| 2546 | """ |
| 2547 | |
def __init__(self, *args, **kwargs) -> None:
    """
    Initialize through the parent class, then register the date adapters.

    All positional and keyword arguments are forwarded unchanged to the
    parent ``__init__``.
    """
    super().__init__(*args, **kwargs)
    # Registration runs only after the parent initializer has completed.
    self._register_date_adapters()
| 2552 | |
| 2553 | def _register_date_adapters(self) -> None: |
| 2554 | # GH 8341 |
| 2555 | # register an adapter callable for datetime.time object |
| 2556 | import sqlite3 |
| 2557 | |
| 2558 | # this will transform time(12,34,56,789) into '12:34:56.000789' |
| 2559 | # (this is what sqlalchemy does) |
| 2560 | def _adapt_time(t) -> str: |
| 2561 | # This is faster than strftime |
| 2562 | return f"{t.hour:02d}:{t.minute:02d}:{t.second:02d}.{t.microsecond:06d}" |
| 2563 | |
| 2564 | # Also register adapters for date/datetime and co |
| 2565 | # xref https://docs.python.org/3.12/library/sqlite3.html#adapter-and-converter-recipes |
| 2566 | # Python 3.12+ doesn't auto-register adapters for us anymore |
| 2567 | |
| 2568 | adapt_date_iso = lambda val: val.isoformat() |
| 2569 | adapt_datetime_iso = lambda val: val.isoformat(" ") |
| 2570 | |
| 2571 | sqlite3.register_adapter(time, _adapt_time) |
| 2572 | |
| 2573 | sqlite3.register_adapter(date, adapt_date_iso) |
| 2574 | sqlite3.register_adapter(datetime, adapt_datetime_iso) |
| 2575 | |
| 2576 | convert_date = lambda val: date.fromisoformat(val.decode()) |
| 2577 | convert_timestamp = lambda val: datetime.fromisoformat(val.decode()) |
| 2578 | |
| 2579 | sqlite3.register_converter("date", convert_date) |
| 2580 | sqlite3.register_converter("timestamp", convert_timestamp) |
| 2581 | |
def sql_schema(self) -> str:
    """Return the entries of ``self.table`` joined by a semicolon and newline."""
    separator = ";\n"
    joined = separator.join(self.table)
    return str(joined)
| 2584 | |
def _execute_create(self) -> None:
    """
    Execute every statement in ``self.table``.

    The statements run, in order, on the cursor yielded by the context
    manager that ``self.pd_sql.run_transaction()`` returns.
    """
    transaction = self.pd_sql.run_transaction()
    with transaction as cursor:
        for statement in self.table:
            cursor.execute(statement)
| 2589 | |
| 2590 | def insert_statement(self, *, num_rows: int) -> str: |
| 2591 | names = list(map(str, self.frame.columns)) |
| 2592 | wld = "?" # wildcard char |
| 2593 | escape = _get_valid_sqlite_name |
| 2594 | |
| 2595 | if self.index is not None: |
| 2596 | for idx in self.index[::-1]: |
| 2597 | names.insert(0, idx) |
| 2598 | |
| 2599 | bracketed_names = [escape(column) for column in names] |
no outgoing calls
no test coverage detected