MCPcopy
hub / github.com/redis/redis-py / test_alias

Method test_alias

tests/test_asyncio/test_search.py:787–845  ·  view source on GitHub ↗
(self, decoded_r: redis.Redis)

Source from the content-addressed store, hash-verified

785 @pytest.mark.redismod
786 @skip_ifmodversion_lt("2.0.0", "search")
787 async def test_alias(self, decoded_r: redis.Redis):
788 index1 = self.getClient(decoded_r)
789 index2 = self.getClient(decoded_r)
790
791 def1 = IndexDefinition(prefix=["index1:"])
792 def2 = IndexDefinition(prefix=["index2:"])
793
794 ftindex1 = index1.ft("testAlias")
795 ftindex2 = index2.ft("testAlias2")
796 await ftindex1.create_index((TextField("name"),), definition=def1)
797 await ftindex2.create_index((TextField("name"),), definition=def2)
798
799 await index1.hset("index1:lonestar", mapping={"name": "lonestar"})
800 await index2.hset("index2:yogurt", mapping={"name": "yogurt"})
801
802 if expects_resp2_shape(decoded_r) or expects_unified_shape(decoded_r):
803 res = (await ftindex1.search("*")).docs[0]
804 assert "index1:lonestar" == res.id
805
806 # create alias and check for results
807 await ftindex1.aliasadd("spaceballs")
808 alias_client = self.getClient(decoded_r).ft("spaceballs")
809 res = (await alias_client.search("*")).docs[0]
810 assert "index1:lonestar" == res.id
811
812 # Throw an exception when trying to add an alias that already exists
813 with pytest.raises(Exception):
814 await ftindex2.aliasadd("spaceballs")
815
816 # update alias and ensure new results
817 await ftindex2.aliasupdate("spaceballs")
818 alias_client2 = self.getClient(decoded_r).ft("spaceballs")
819
820 res = (await alias_client2.search("*")).docs[0]
821 assert "index2:yogurt" == res.id
822 elif expects_resp3_shape(decoded_r):
823 res = (await ftindex1.search("*"))["results"][0]
824 assert "index1:lonestar" == res["id"]
825
826 # create alias and check for results
827 await ftindex1.aliasadd("spaceballs")
828 alias_client = self.getClient(await decoded_r).ft("spaceballs")
829 res = (await alias_client.search("*"))["results"][0]
830 assert "index1:lonestar" == res["id"]
831
832 # Throw an exception when trying to add an alias that already exists
833 with pytest.raises(Exception):
834 await ftindex2.aliasadd("spaceballs")
835
836 # update alias and ensure new results
837 await ftindex2.aliasupdate("spaceballs")
838 alias_client2 = self.getClient(await decoded_r).ft("spaceballs")
839
840 res = (await alias_client2.search("*"))["results"][0]
841 assert "index2:yogurt" == res["id"]
842
843 await ftindex2.aliasdel("spaceballs")
844 with pytest.raises(Exception):

Callers

nothing calls this directly

Calls 13

IndexDefinition · Class · 0.90
TextField · Class · 0.90
expects_resp2_shape · Function · 0.90
expects_unified_shape · Function · 0.90
expects_resp3_shape · Function · 0.90
create_index · Method · 0.80
hset · Method · 0.80
aliasadd · Method · 0.80
aliasupdate · Method · 0.80
aliasdel · Method · 0.80
getClient · Method · 0.45
ft · Method · 0.45

Tested by

no test coverage detected