(self, request)
| 17 | |
| 18 | @pytest.fixture(autouse=True, scope='class') |
| 19 | def class_certs(self, request): |
| 20 | self.tmpdir = tempfile.mkdtemp() |
| 21 | self.key_name = 'worker.key' |
| 22 | self.cert_name = 'worker.pem' |
| 23 | |
| 24 | key = self.gen_private_key() |
| 25 | cert = self.gen_certificate(key=key, |
| 26 | common_name='celery cecurity integration') |
| 27 | |
| 28 | pem_key = key.private_bytes( |
| 29 | encoding=serialization.Encoding.PEM, |
| 30 | format=serialization.PrivateFormat.TraditionalOpenSSL, |
| 31 | encryption_algorithm=serialization.NoEncryption() |
| 32 | ) |
| 33 | |
| 34 | pem_cert = cert.public_bytes( |
| 35 | encoding=serialization.Encoding.PEM, |
| 36 | ) |
| 37 | |
| 38 | with open(self.tmpdir + '/' + self.key_name, 'wb') as key: |
| 39 | key.write(pem_key) |
| 40 | with open(self.tmpdir + '/' + self.cert_name, 'wb') as cert: |
| 41 | cert.write(pem_cert) |
| 42 | |
| 43 | request.cls.tmpdir = self.tmpdir |
| 44 | request.cls.key_name = self.key_name |
| 45 | request.cls.cert_name = self.cert_name |
| 46 | |
| 47 | yield |
| 48 | |
| 49 | os.remove(self.tmpdir + '/' + self.key_name) |
| 50 | os.remove(self.tmpdir + '/' + self.cert_name) |
| 51 | os.rmdir(self.tmpdir) |
| 52 | |
| 53 | @pytest.fixture(autouse=True) |
| 54 | def _prepare_setup(self, manager): |
nothing calls this directly
no test coverage detected