
Function check_privileges

celery/platforms.py:788–830  ·  view source on GitHub ↗
(accept_content)

Source from the content-addressed store, hash-verified

def check_privileges(accept_content):
    """Check whether the process runs with a privileged identity.

    Reads the real and effective user and group ids of the current
    process and hands them to ``_warn_or_raise_security_error`` (defined
    outside this listing) when either user id is 0 or when the name of
    the primary or effective group is ``sudo`` or ``wheel``.

    Does nothing when the ``grp`` or ``pwd`` module is unavailable.

    Arguments:
        accept_content: Container of accepted content-type names.  It is
            only tested for membership of ``'pickle'`` and
            ``'application/group-python-serialize'``.

    Raises:
        SecurityError: If ``os`` provides ``fchown`` but lacks any of
            ``getuid``, ``getgid``, ``geteuid`` or ``getegid``.

    Note:
        Only the primary gid and egid are looked up; supplementary
        groups are not consulted here.  Group matching is by name, so a
        privileged group with a different name is not recognised.
        NOTE(review): confirm both limits are acceptable for the
        deployment's threat model.
    """
    if grp is None or pwd is None:
        return

    # Passed on so the handler knows whether pickle-based payloads are
    # accepted; what the handler does with it is outside this listing.
    pickle_or_serialize = any(
        content_type in accept_content
        for content_type in ('pickle', 'application/group-python-serialize')
    )

    # 65535 stands in for any id whose getter is missing on this platform.
    id_getters = ('getuid', 'getgid', 'geteuid', 'getegid')
    uid, gid, euid, egid = (
        getattr(os, getter)() if hasattr(os, getter) else 65535
        for getter in id_getters
    )

    # A platform with fchown is expected to provide every id getter;
    # refuse to continue when one is missing.
    if hasattr(os, 'fchown') and not all(
            hasattr(os, getter) for getter in id_getters):
        raise SecurityError('suspicious platform, contact support')

    # Look up the group database entries for the real and effective gid.
    # Either lookup may fail for an id with no entry; that case is
    # reported as "assuming root" and goes through the same handler.
    try:
        gid_entry = grp.getgrgid(gid)
        egid_entry = grp.getgrgid(egid)
    except KeyError:
        warnings.warn(SecurityWarning(ASSUMING_ROOT))
        _warn_or_raise_security_error(
            egid, euid, gid, uid, pickle_or_serialize)
        return

    # Index 0 of a group entry is the group name.
    active_group_names = (gid_entry[0], egid_entry[0])
    privileged_group_names = ('sudo', 'wheel')

    runs_as_root = 0 in (uid, euid)
    in_privileged_group = any(
        name in active_group_names for name in privileged_group_names)

    # Root, or a group whose name suggests it can escalate privileges.
    if runs_as_root or in_privileged_group:
        _warn_or_raise_security_error(
            egid, euid, gid, uid, pickle_or_serialize)
833def _warn_or_raise_security_error(egid, euid, gid, uid, pickle_or_serialize):

Calls 3

SecurityErrorClass · 0.85
SecurityWarningClass · 0.85