(self)
| 1120 | assert self.b._strip_prefix('x1b34') == 'x1b34' |
| 1121 | |
def test_global_keyprefix(self):
    """Check that generated keys carry the configured ``global_keyprefix``.

    Three prefixes are exercised.  For the bare prefix the test expects
    an underscore between the prefix and the key name; for prefixes that
    already end in ``_`` or ``:`` it expects the prefix to be used as-is.
    Task, group and chord keys are verified for every case.
    """
    # (configured prefix, separator expected between prefix and key name)
    cases = (
        ("test_global_keyprefix", "_"),
        ("test_global_keyprefix_", ""),
        ("test_global_keyprefix:", ""),
    )
    for prefix, separator in cases:
        # Work on a private copy so the shared fixture app is not mutated.
        app = copy.deepcopy(self.app)
        transport_options = app.conf.get('result_backend_transport_options', {})
        transport_options.update({"global_keyprefix": prefix})
        backend = KVBackend(app=app)
        tid = uuid()

        expected_head = f"{prefix}{separator}"
        task_key = bytes_to_str(backend.get_key_for_task(tid))
        assert task_key == f"{expected_head}celery-task-meta-{tid}"
        group_key = bytes_to_str(backend.get_key_for_group(tid))
        assert group_key == f"{expected_head}celery-taskset-meta-{tid}"
        chord_key = bytes_to_str(backend.get_key_for_chord(tid))
        assert chord_key == f"{expected_head}chord-unlock-{tid}"
| 1149 | |
| 1150 | def test_global_keyprefix_missing(self): |
| 1151 | tid = uuid() |
nothing calls this directly
no test coverage detected