from ctypes import *
import re
import os
import sys
import platform
import fc_sort

###
# Convert text to the byte string that ctypes char* parameters require.
#
# Byte strings are returned unchanged, so on Python 2 (where str is bytes)
# this is a no-op; on Python 3 a str is encoded as UTF-8.
#
def _ToBytes(text):
    if isinstance(text, bytes):
        return text
    return text.encode("utf-8")

###
# Convert a byte string read from a ctypes buffer to the native str type.
#
# Native strings are returned unchanged, so on Python 2 this is a no-op; on
# Python 3 bytes are decoded as UTF-8 so the result can be split and compared
# against ordinary string literals.
#
def _ToStr(data):
    if isinstance(data, str):
        return data
    return data.decode("utf-8")

###
# Check whether the regex will match a file path starting with the provided
# prefix
#
# Compares regex entries in file_contexts with a path prefix. Regex entries
# are often more specific than this file prefix. For example, the regex could
# be /system/bin/foo\.sh and the prefix could be /system. This function
# loops over the regex removing characters from the end until
# 1) there is a match - return True or 2) run out of characters - return
# False.
#
def MatchPathPrefix(pathregex, prefix):
    for i in range(len(pathregex), 0, -1):
        try:
            pattern = re.compile('^' + pathregex[0:i] + "$")
        except (re.error, OverflowError, RuntimeError):
            # Cutting a regex at an arbitrary character frequently leaves an
            # invalid pattern (unbalanced group, dangling escape, ...). Skip
            # those candidates, but let KeyboardInterrupt/SystemExit through.
            continue
        if pattern.match(prefix):
            return True
    return False

###
# Return True if pathregex matches (per MatchPathPrefix) at least one of the
# entries in Prefixes, False otherwise (including when Prefixes is empty).
#
def MatchPathPrefixes(pathregex, Prefixes):
    return any(MatchPathPrefix(pathregex, Prefix) for Prefix in Prefixes)

###
# One type-enforcement rule parsed from a comma separated string of the form
#   flavor,source context,target context,target class,permissions
# where permissions is a space separated list. The unparsed text is kept in
# self.rule.
#
class TERule:
    def __init__(self, rule):
        data = rule.split(',')
        self.flavor = data[0]
        self.sctx = data[1]
        self.tctx = data[2]
        self.tclass = data[3]
        self.perms = set((data[4].strip()).split(' '))
        self.rule = rule

###
# Query interface over a policy loaded through the libsepol wrapper library,
# optionally combined with one or more file_contexts files.
#
class Policy:
    # Rule caches, filled lazily on the first query. They are created per
    # instance (see __init__) so two Policy objects never share rules.
    __ExpandedRules = None
    __Rules = None
    # Type -> list of file_contexts path regexes; None when no file_contexts
    # files were supplied.
    __FcDict = None
    # Result of fc_sort.FcSort(); entries expose .path and .Type.
    __FcSorted = None
    # genfs filesystem name -> set of types
    __GenfsDict = None
    # ctypes handle to the wrapper library and the opaque policy pointer
    __libsepolwrap = None
    __policydbP = None
    # Size of the buffers handed to the wrapper's get_* functions
    __BUFSIZE = 2048

    # Check that none of the types on paths matching MatchPrefix (and not
    # matching DoNotMatchPrefix) carry the attribute Attr.
    # Returns an error message naming the violators, or "" if there are none.
    def AssertPathTypesDoNotHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr):
        # Query policy for the types associated with Attr
        TypesPol = self.QueryTypeAttribute(Attr, True)
        # Search file_contexts to find types associated with input paths.
        TypesFc = self.__GetTypesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix)
        violators = TypesFc.intersection(TypesPol)
        if not violators:
            return ""
        ret = "The following types on "
        ret += " ".join(str(x) for x in sorted(MatchPrefix))
        ret += " must not be associated with the "
        ret += "\"" + Attr + "\" attribute: "
        ret += " ".join(str(x) for x in sorted(violators)) + "\n"
        return ret

    # Check that all types for "filesystem" have "attribute" associated with them
    # for types labeled in genfs_contexts.
    # Returns an error message naming the violators, or "" if there are none.
    # Raises KeyError if Filesystem has no genfs entries in the policy.
    def AssertGenfsFilesystemTypesHaveAttr(self, Filesystem, Attr):
        TypesPol = self.QueryTypeAttribute(Attr, True)
        TypesGenfs = self.__GenfsDict[Filesystem]
        violators = TypesGenfs.difference(TypesPol)
        if not violators:
            return ""
        ret = "The following types in " + Filesystem
        ret += " must be associated with the "
        ret += "\"" + Attr + "\" attribute: "
        ret += " ".join(str(x) for x in sorted(violators)) + "\n"
        return ret

    # Check that path prefixes that match MatchPrefix, and do not Match
    # DoNotMatchPrefix have the attribute Attr.
    # For example assert that all types in /sys, and not in /sys/kernel/debugfs
    # have the sysfs_type attribute.
    # Returns an error message naming the violators, or "" if there are none.
    def AssertPathTypesHaveAttr(self, MatchPrefix, DoNotMatchPrefix, Attr):
        # Query policy for the types associated with Attr
        TypesPol = self.QueryTypeAttribute(Attr, True)
        # Search file_contexts to find paths/types that should be associated with
        # Attr.
        TypesFc = self.__GetTypesByFilePathPrefix(MatchPrefix, DoNotMatchPrefix)
        violators = TypesFc.difference(TypesPol)
        if not violators:
            return ""
        ret = "The following types on "
        ret += " ".join(str(x) for x in sorted(MatchPrefix))
        ret += " must be associated with the "
        ret += "\"" + Attr + "\" attribute: "
        ret += " ".join(str(x) for x in sorted(violators)) + "\n"
        return ret

    # Return all file_contexts entries that map to the input Type, or None if
    # no entry maps to it.
    def QueryFc(self, Type):
        if Type in self.__FcDict:
            return self.__FcDict[Type]
        return None

    # Read every name from a type iterator into a set and destroy the
    # iterator. Exits the process if the wrapper reports an error.
    def __DrainTypeIter(self, TypeIterP):
        buf = create_string_buffer(self.__BUFSIZE)
        Names = set()
        try:
            while True:
                ret = self.__libsepolwrap.get_type(buf, self.__BUFSIZE,
                                                   self.__policydbP, TypeIterP)
                if ret == 0:
                    Names.add(_ToStr(buf.value))
                    continue
                if ret == 1:
                    break
                # We should never get here.
                sys.exit("Failed to import policy")
        finally:
            # Release the native iterator on the error path as well.
            self.__libsepolwrap.destroy_type_iter(TypeIterP)
        return Names

    # Return all attributes associated with a type if IsAttr=False or
    # all types associated with an attribute if IsAttr=True
    def QueryTypeAttribute(self, Type, IsAttr):
        TypeIterP = self.__libsepolwrap.init_type_iter(
            self.__policydbP, create_string_buffer(_ToBytes(Type)), IsAttr)
        if TypeIterP is None:
            sys.exit("Failed to initialize type iterator")
        return self.__DrainTypeIter(TypeIterP)

    # Return True if Rule satisfies every filter given in kwargs.
    # Recognised filters are scontext, tcontext, tclass and perms; each is a
    # collection (set, list, ...) of names. A missing, None or empty filter
    # matches everything. perms matches when the rule grants at least one of
    # the listed permissions.
    def __TERuleMatch(self, Rule, **kwargs):
        # Match source type
        scontext = kwargs.get("scontext")
        if scontext and Rule.sctx not in scontext:
            return False
        # Match target type
        tcontext = kwargs.get("tcontext")
        if tcontext and Rule.tctx not in tcontext:
            return False
        # Match target class
        tclass = kwargs.get("tclass")
        if tclass and Rule.tclass not in tclass:
            return False
        # Match any perms. isdisjoint() accepts any iterable, so callers may
        # pass a list as well as a set.
        perms = kwargs.get("perms")
        if perms and Rule.perms.isdisjoint(perms):
            return False
        return True

    # resolve a type to its attributes or
    # resolve an attribute to its types and attributes
    # For example if scontext is the domain attribute, then we need to
    # include all types with the domain attribute such as untrusted_app and
    # priv_app and all the attributes of those types such as appdomain.
    # Returns an empty set if Type is neither a known type nor attribute.
    def ResolveTypeAttribute(self, Type):
        if Type in self.GetAllTypes(False):
            return self.QueryTypeAttribute(Type, False)
        # Attributes are only enumerated when Type is not a plain type.
        if Type in self.GetAllTypes(True):
            Types = self.QueryTypeAttribute(Type, True)
            TypesAndAttributes = set(Types)
            for T in Types:
                TypesAndAttributes |= self.QueryTypeAttribute(T, False)
            return TypesAndAttributes
        return set()

    # Generate all TERules that match every supplied filter:
    # (any scontext) and (any tcontext) and (any tclass) and (any perms).
    # Any unspecified parameter will match all.
    #
    # Example: QueryTERule(tcontext=["foo", "bar"], perms=["entrypoint"])
    # Will return any rule with:
    # (tcontext="foo" or tcontext="bar") and ("entrypoint" in perms)
    #
    # NOTE: if none of the given scontext (or tcontext) names exist in the
    # policy, the resolved filter is empty and therefore matches all rules.
    def QueryTERule(self, **kwargs):
        if not self.__Rules:
            self.__InitTERules()

        # add any matching types and attributes for scontext and tcontext
        for key in ("scontext", "tcontext"):
            if kwargs.get(key):
                resolved = set()
                for name in kwargs[key]:
                    resolved |= self.ResolveTypeAttribute(name)
                kwargs[key] = resolved
        for Rule in self.__Rules:
            if self.__TERuleMatch(Rule, **kwargs):
                yield Rule

    # Same as QueryTERule but only using the expanded ruleset.
    # i.e. all attributes have been expanded to their various types.
    def QueryExpandedTERule(self, **kwargs):
        if not self.__ExpandedRules:
            self.__InitExpandedTERules()
        for Rule in self.__ExpandedRules:
            if self.__TERuleMatch(Rule, **kwargs):
                yield Rule

    # Return the names of all attributes in the policy if isAttr=True or of
    # all types if isAttr=False.
    def GetAllTypes(self, isAttr):
        TypeIterP = self.__libsepolwrap.init_type_iter(self.__policydbP, None, isAttr)
        if TypeIterP is None:
            sys.exit("Failed to initialize type iterator")
        return self.__DrainTypeIter(TypeIterP)

    # Return True if the complete pathregex, anchored at both ends, matches
    # prefix.
    def __ExactMatchPathPrefix(self, pathregex, prefix):
        return bool(re.compile('^' + pathregex + "$").match(prefix))

    # Return a tuple (prefix, i) where i is the index of the most specific
    # match of prefix in the sorted file_contexts. This is useful for limiting a
    # file_contexts search to matches that are more specific and omitting less
    # specific matches. For example, finding all matches to prefix /data/vendor
    # should not include /data(/.*)? if /data/vendor(/.*)? is also specified.
    # The index is 0 when nothing matches.
    def __FcSortedIndex(self, prefix):
        index = 0
        for i, entry in enumerate(self.__FcSorted):
            if self.__ExactMatchPathPrefix(entry.path, prefix):
                # Keep the last hit in the sorted list.
                index = i
        return prefix, index

    # Return a tuple of (path, Type) for all matching paths. Use the sorted
    # file_contexts and index returned from __FcSortedIndex() to limit results
    # to results that are more specific than the prefix.
    def __MatchPathPrefixTypes(self, prefix, index):
        PathType = []
        for i in range(index, len(self.__FcSorted)):
            entry = self.__FcSorted[i]
            if MatchPathPrefix(entry.path, prefix):
                PathType.append((entry.path, entry.Type))
        return PathType

    # Return types that match MatchPrefixes but do not match
    # DoNotMatchPrefixes
    def __GetTypesByFilePathPrefix(self, MatchPrefixes, DoNotMatchPrefixes):
        Types = set()
        for MatchPrefix in MatchPrefixes:
            prefix, index = self.__FcSortedIndex(MatchPrefix)
            for path, Type in self.__MatchPathPrefixTypes(prefix, index):
                if MatchPathPrefixes(path, DoNotMatchPrefixes):
                    continue
                Types.add(Type)
        return Types

    # Read every rule from an avtab iterator into Rules (a new set is created
    # when Rules is None) and return that set. Exits the process if the
    # wrapper reports an error. The iterator is not destroyed here.
    def __GetTERules(self, policydbP, avtabIterP, Rules):
        if Rules is None:
            Rules = set()
        buf = create_string_buffer(self.__BUFSIZE)
        while True:
            ret = self.__libsepolwrap.get_allow_rule(buf, self.__BUFSIZE,
                                                     policydbP, avtabIterP)
            if ret == 0:
                Rules.add(TERule(_ToStr(buf.value)))
                continue
            if ret == 1:
                break
            # We should never get here.
            sys.exit("Failed to import policy")
        return Rules

    # Create an avtab iterator with Init, read all of its rules into Rules and
    # release it with Destroy. Exits with ErrMsg if Init returns NULL.
    def __ReadAvtab(self, Init, Destroy, ErrMsg, Rules):
        avtabIterP = Init(self.__policydbP)
        if avtabIterP is None:
            sys.exit(ErrMsg)
        try:
            self.__GetTERules(self.__policydbP, avtabIterP, Rules)
        finally:
            Destroy(avtabIterP)

    # Fill the rule cache from the unconditional and conditional avtabs.
    def __InitTERules(self):
        if self.__Rules is None:
            self.__Rules = set()
        lib = self.__libsepolwrap
        self.__ReadAvtab(lib.init_avtab, lib.destroy_avtab,
                         "Failed to initialize avtab", self.__Rules)
        self.__ReadAvtab(lib.init_cond_avtab, lib.destroy_avtab,
                         "Failed to initialize conditional avtab", self.__Rules)

    # Fill the expanded rule cache from the expanded unconditional and
    # conditional avtabs.
    def __InitExpandedTERules(self):
        if self.__ExpandedRules is None:
            self.__ExpandedRules = set()
        lib = self.__libsepolwrap
        self.__ReadAvtab(lib.init_expanded_avtab, lib.destroy_expanded_avtab,
                         "Failed to initialize avtab", self.__ExpandedRules)
        self.__ReadAvtab(lib.init_expanded_cond_avtab, lib.destroy_expanded_avtab,
                         "Failed to initialize conditional avtab",
                         self.__ExpandedRules)

    # load ctypes-ified libsepol wrapper
    def __InitLibsepolwrap(self, LibPath):
        lib = CDLL(LibPath)

        # int get_allow_rule(char *out, size_t len, void *policydbp, void *avtab_iterp);
        lib.get_allow_rule.restype = c_int
        lib.get_allow_rule.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
        # void *load_policy(const char *policy_path);
        lib.load_policy.restype = c_void_p
        lib.load_policy.argtypes = [c_char_p]
        # void destroy_policy(void *policydbp);
        lib.destroy_policy.argtypes = [c_void_p]
        # void *init_expanded_avtab(void *policydbp);
        lib.init_expanded_avtab.restype = c_void_p
        lib.init_expanded_avtab.argtypes = [c_void_p]
        # void *init_expanded_cond_avtab(void *policydbp);
        lib.init_expanded_cond_avtab.restype = c_void_p
        lib.init_expanded_cond_avtab.argtypes = [c_void_p]
        # void destroy_expanded_avtab(void *avtab_iterp);
        lib.destroy_expanded_avtab.argtypes = [c_void_p]
        # void *init_avtab(void *policydbp);
        lib.init_avtab.restype = c_void_p
        lib.init_avtab.argtypes = [c_void_p]
        # void *init_cond_avtab(void *policydbp);
        lib.init_cond_avtab.restype = c_void_p
        lib.init_cond_avtab.argtypes = [c_void_p]
        # void destroy_avtab(void *avtab_iterp);
        lib.destroy_avtab.argtypes = [c_void_p]
        # int get_type(char *out, size_t max_size, void *policydbp, void *type_iterp);
        lib.get_type.restype = c_int
        lib.get_type.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
        # void *init_type_iter(void *policydbp, const char *type, bool is_attr);
        lib.init_type_iter.restype = c_void_p
        lib.init_type_iter.argtypes = [c_void_p, c_char_p, c_bool]
        # void destroy_type_iter(void *type_iterp);
        lib.destroy_type_iter.argtypes = [c_void_p]
        # void *init_genfs_iter(void *policydbp)
        lib.init_genfs_iter.restype = c_void_p
        lib.init_genfs_iter.argtypes = [c_void_p]
        # int get_genfs(char *out, size_t max_size, void *policydbp, void *genfs_iterp);
        # NOTE(review): prototype written to agree with the four argtypes below
        # and with the call in __InitGenfsCon - confirm against the C header.
        lib.get_genfs.restype = c_int
        lib.get_genfs.argtypes = [c_char_p, c_size_t, c_void_p, c_void_p]
        # void destroy_genfs_iter(void *genfs_iterp)
        lib.destroy_genfs_iter.argtypes = [c_void_p]

        self.__libsepolwrap = lib

    # Record one genfs entry in Dict, which maps a filesystem name to the set
    # of types used on it. buf holds three space separated fields,
    # "fs path context"; the type is the third ':' separated field of context.
    def __GenfsDictAdd(self, Dict, buf):
        fs, path, context = buf.split(" ")
        Type = context.split(":")[2]
        Dict.setdefault(fs, set()).add(Type)

    # Build the filesystem -> types dictionary from the policy's genfs entries.
    def __InitGenfsCon(self):
        self.__GenfsDict = {}
        GenfsIterP = self.__libsepolwrap.init_genfs_iter(self.__policydbP)
        if GenfsIterP is None:
            sys.exit("Failed to retreive genfs entries")
        buf = create_string_buffer(self.__BUFSIZE)
        try:
            while True:
                ret = self.__libsepolwrap.get_genfs(buf, self.__BUFSIZE,
                                                    self.__policydbP, GenfsIterP)
                if ret == 0:
                    self.__GenfsDictAdd(self.__GenfsDict, _ToStr(buf.value))
                    continue
                if ret == 1:
                    # Unlike get_type/get_allow_rule, the buffer is consumed
                    # once more when the iterator reports completion.
                    self.__GenfsDictAdd(self.__GenfsDict, _ToStr(buf.value))
                    break
                # We should never get here.
                sys.exit("Failed to get genfs entries")
        finally:
            # Release the native iterator on the error path as well.
            self.__libsepolwrap.destroy_genfs_iter(GenfsIterP)

    # load file_contexts
    # Builds the Type -> [path regex, ...] dictionary and the sorted entry
    # list. Does nothing when FcPaths is None; exits if a listed file is
    # missing.
    def __InitFC(self, FcPaths):
        if FcPaths is None:
            return
        fc = []
        for path in FcPaths:
            if not os.path.exists(path):
                sys.exit("file_contexts file " + path + " does not exist.")
            with open(path, "r") as fd:
                fc += fd.readlines()
        self.__FcDict = {}
        for line in fc:
            rec = line.split()
            try:
                # The type is the third ':' separated field of the last column.
                t = rec[-1].split(":")[2]
            except IndexError:
                # Blank lines, comments and anything else whose last column
                # is not a full context are skipped.
                continue
            self.__FcDict.setdefault(t, []).append(rec[0])
        self.__FcSorted = fc_sort.FcSort(FcPaths)

    # load policy
    def __InitPolicy(self, PolicyPath):
        cPolicyPath = create_string_buffer(_ToBytes(PolicyPath))
        self.__policydbP = self.__libsepolwrap.load_policy(cPolicyPath)
        if self.__policydbP is None:
            sys.exit("Failed to load policy")

    # PolicyPath: path of the policy file handed to load_policy().
    # FcPaths: list of file_contexts paths, or None to skip loading them.
    # LibPath: path of the libsepol wrapper shared library.
    def __init__(self, PolicyPath, FcPaths, LibPath):
        # Per-instance caches; class-level sets would be shared by every
        # Policy object in the process.
        self.__Rules = set()
        self.__ExpandedRules = set()
        self.__InitLibsepolwrap(LibPath)
        self.__InitFC(FcPaths)
        self.__InitPolicy(PolicyPath)
        self.__InitGenfsCon()

    # Free the native policy, if one was loaded.
    def __del__(self):
        if self.__policydbP is not None:
            self.__libsepolwrap.destroy_policy(self.__policydbP)
            # Guard against a second destroy of the same pointer.
            self.__policydbP = None