|
2 | 2 | from django.db import connections |
3 | 3 |
|
4 | 4 | from . import models |
| 5 | +from .models import EncryptionKey |
5 | 6 | from .test_base import EncryptionTestCase |
6 | 7 |
|
7 | 8 |
|
@@ -113,28 +114,20 @@ def test_key_creation_and_lookup(self): |
113 | 114 | generate and store a data key in the vault, then |
114 | 115 | query the vault with the keyAltName. |
115 | 116 | """ |
116 | | - connection = connections["encrypted"] |
117 | | - client = connection.connection |
118 | | - auto_encryption_opts = client._options.auto_encryption_opts |
119 | | - |
120 | | - key_vault_db, key_vault_coll = auto_encryption_opts._key_vault_namespace.split(".", 1) |
121 | | - vault_coll = client[key_vault_db][key_vault_coll] |
122 | | - |
123 | 117 | model_class = models.CharModel |
124 | 118 | test_key_alt_name = f"{model_class._meta.db_table}.value" |
125 | | - vault_coll.delete_many({"keyAltNames": test_key_alt_name}) |
126 | | - |
127 | | - with connection.schema_editor() as editor: |
| 119 | + # Delete the test key and verify it's gone. |
| 120 | + EncryptionKey.objects.filter(key_alt_name=test_key_alt_name).delete() |
| 121 | + with self.assertRaises(EncryptionKey.DoesNotExist): |
| 122 | + EncryptionKey.objects.get(key_alt_name=test_key_alt_name) |
| 123 | + # Regenerate the keyId. |
| 124 | + with connections["encrypted"].schema_editor() as editor: |
128 | 125 | encrypted_fields = editor._get_encrypted_fields(model_class) |
129 | | - |
130 | | - # Validate schema contains a keyId for our field |
131 | | - self.assertTrue(encrypted_fields["fields"]) |
| 126 | + # Validate schema contains a keyId for the field. |
132 | 127 | field_info = encrypted_fields["fields"][0] |
133 | 128 | self.assertEqual(field_info["path"], "value") |
134 | 129 | self.assertIsInstance(field_info["keyId"], Binary) |
135 | | - |
136 | | - # Lookup in key vault by the keyAltName created |
137 | | - key_doc = vault_coll.find_one({"keyAltNames": test_key_alt_name}) |
138 | | - self.assertIsNotNone(key_doc, "Key should exist in vault") |
139 | | - self.assertEqual(key_doc["_id"], field_info["keyId"]) |
140 | | - self.assertIn(test_key_alt_name, key_doc["keyAltNames"]) |
| 130 | + # Lookup in key vault by the keyAltName. |
| 131 | + key = EncryptionKey.objects.get(key_alt_name=test_key_alt_name) |
| 132 | + self.assertEqual(key.id, field_info["keyId"]) |
| 133 | + self.assertEqual(key.key_alt_name, [test_key_alt_name]) |
0 commit comments