hub / github.com/sqlalchemy/sqlalchemy / async_main

Function async_main

examples/asyncio/basic.py:25–65
()

Source from the content-addressed store, hash-verified

async def async_main():
    # engine is an instance of AsyncEngine
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    # conn is an instance of AsyncConnection
    async with engine.begin() as conn:
        # to support SQLAlchemy DDL methods as well as legacy functions, the
        # AsyncConnection.run_sync() awaitable method will pass a "sync"
        # version of the AsyncConnection object to any synchronous method,
        # where synchronous IO calls will be transparently translated for
        # await.
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)

        # for normal statement execution, a traditional "await execute()"
        # pattern is used.
        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )

    async with engine.connect() as conn:
        # the default result object is the
        # sqlalchemy.engine.Result object
        result = await conn.execute(t1.select())

        # the results are buffered so no await call is necessary
        # for this case.
        print(result.fetchall())

        # for a streaming result that buffers only segments of the
        # result at a time, the AsyncConnection.stream() method is used.
        # this returns a sqlalchemy.ext.asyncio.AsyncResult object.
        async_result = await conn.stream(t1.select())

        # this object supports async iteration and awaitable
        # versions of methods like .all(), fetchmany(), etc.
        async for row in async_result:
            print(row)


asyncio.run(async_main())
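
The comments in the listing contrast a buffered result (read with plain calls such as fetchall()) with a streamed one (read with "async for"). The following is a rough, stdlib-only analogy of that difference. It is not SQLAlchemy's implementation and uses no SQLAlchemy API: ROWS, fetch_chunk, execute_buffered and stream_rows are hypothetical names invented for this sketch, and the in-memory list merely stands in for a database.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

Row = Tuple[int, str]

# Hypothetical in-memory "table"; stands in for rows a database would return.
ROWS: List[Row] = [(i, f"some name {i}") for i in range(1, 6)]


async def fetch_chunk(offset: int, size: int) -> List[Row]:
    """Pretend to fetch one segment of rows over the network."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return ROWS[offset:offset + size]


async def execute_buffered(chunk_size: int = 2) -> List[Row]:
    """Buffered style: every row is collected before the caller sees any."""
    rows: List[Row] = []
    offset = 0
    while True:
        chunk = await fetch_chunk(offset, chunk_size)
        if not chunk:
            return rows
        rows.extend(chunk)
        offset += len(chunk)


async def stream_rows(chunk_size: int = 2) -> AsyncIterator[Row]:
    """Streaming style: only one segment is held in memory at a time."""
    offset = 0
    while True:
        chunk = await fetch_chunk(offset, chunk_size)
        if not chunk:
            return
        for row in chunk:
            yield row
        offset += len(chunk)


async def demo() -> Tuple[List[Row], List[Row]]:
    # The buffered result is an ordinary list, so reading it needs no await.
    buffered = await execute_buffered()
    # The streamed result is consumed with "async for", one row at a time.
    streamed = [row async for row in stream_rows()]
    return buffered, streamed


buffered_rows, streamed_rows = asyncio.run(demo())
print(buffered_rows == streamed_rows)
```

Both paths yield the same rows. The difference is when the I/O happens: up front for the buffered list, during iteration for the stream, which is why only the latter needs "async for".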

Callers 1

basic.py · File · 0.70

Calls 9

create_async_engine · Function · 0.90
begin · Method · 0.45
run_sync · Method · 0.45
execute · Method · 0.45
insert · Method · 0.45
connect · Method · 0.45
select · Method · 0.45
fetchall · Method · 0.45
stream · Method · 0.45

Tested by

no test coverage detected