hub / github.com/psycopg/psycopg / test_get_config_rotates_connections

Function test_get_config_rotates_connections

tests/pool/test_pool.py:1125–1177
(dsn)

Source from the content-addressed store, hash-verified

def test_get_config_rotates_connections(dsn):
    config_rotation_counter = 0

    def rotating_config():
        nonlocal config_rotation_counter
        config_rotation_counter += 1
        return dsn

    app_names = ["app-1", "app-2"]
    kwargs_counter = 0

    def rotating_kwargs():
        # Return a different application_name for each new connection
        nonlocal kwargs_counter
        kwargs_counter += 1
        return {"application_name": app_names[kwargs_counter % len(app_names)]}

    p = pool.ConnectionPool(
        conninfo=rotating_config,
        kwargs=rotating_kwargs,
        min_size=2,
        max_lifetime=0.2,
        open=False,
    )

    try:
        p.open()
        p.wait()

        # Make sure we created two connections (rotating_config called twice)
        assert config_rotation_counter == 2

        # Acquire both connections and check application_name
        with p.connection() as conn1, p.connection() as conn2:
            row1 = conn1.execute("SHOW application_name")
            row2 = conn2.execute("SHOW application_name")

            name1 = row1.fetchone()
            assert (
                name1 is not None
            ), "first call to SHOW application_name returned no rows"
            assert name1[0] in app_names

            name2 = row2.fetchone()
            assert (
                name2 is not None
            ), "second call to SHOW application_name returned no rows"
            assert name2[0] in app_names

            # Make sure that names are different.
            assert name1 != name2
    finally:
        p.close()
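
The rotation logic in rotating_kwargs can be checked without a database. The following is a minimal sketch, not part of the psycopg test suite: the factory name make_rotating_kwargs is hypothetical and exists only for illustration. It reproduces the counter-modulo indexing from the test and shows why two consecutive calls yield two different application names.

```python
def make_rotating_kwargs(app_names):
    """Build a zero-argument callable that cycles through app_names.

    Hypothetical helper: mirrors the closure in the test above, but
    takes the list of names as a parameter.
    """
    calls = 0

    def rotating_kwargs():
        nonlocal calls
        # The counter is incremented before indexing, as in the test,
        # so the first call selects index 1 rather than index 0.
        calls += 1
        return {"application_name": app_names[calls % len(app_names)]}

    return rotating_kwargs


rotating_kwargs = make_rotating_kwargs(["app-1", "app-2"])
first = rotating_kwargs()
second = rotating_kwargs()
third = rotating_kwargs()
```

Because the increment happens before the lookup, the first call returns "app-2" and the second "app-1". The test only asserts that the two names differ and that each is in app_names, so it does not depend on this ordering.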

Callers

nothing calls this directly

Calls 6

open · Method · 0.95
wait · Method · 0.95
connection · Method · 0.95
close · Method · 0.95
execute · Method · 0.45
fetchone · Method · 0.45

Tested by

no test coverage detected