1#!/usr/bin/env python 2 3import argparse 4import collections 5import logging 6import os 7import re 8import subprocess 9import textwrap 10 11from gensyscalls import SupportedArchitectures, SysCallsTxtParser 12 13 14BPF_JGE = "BPF_JUMP(BPF_JMP|BPF_JGE|BPF_K, {0}, {1}, {2})" 15BPF_JEQ = "BPF_JUMP(BPF_JMP|BPF_JEQ|BPF_K, {0}, {1}, {2})" 16BPF_ALLOW = "BPF_STMT(BPF_RET|BPF_K, SECCOMP_RET_ALLOW)" 17 18 19class SyscallRange(object): 20 def __init__(self, name, value): 21 self.names = [name] 22 self.begin = value 23 self.end = self.begin + 1 24 25 def __str__(self): 26 return "(%s, %s, %s)" % (self.begin, self.end, self.names) 27 28 def add(self, name, value): 29 if value != self.end: 30 raise ValueError 31 self.end += 1 32 self.names.append(name) 33 34 35def load_syscall_names_from_file(file_path, architecture): 36 parser = SysCallsTxtParser() 37 parser.parse_open_file(open(file_path)) 38 return set([x["name"] for x in parser.syscalls if x.get(architecture)]) 39 40 41def load_syscall_priorities_from_file(file_path): 42 format_re = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]+)\s*$') 43 priorities = [] 44 with open(file_path) as f: 45 for line in f: 46 m = format_re.match(line) 47 if not m: 48 continue 49 try: 50 name = m.group(1) 51 priorities.append(name) 52 except: 53 logging.debug('Failed to parse %s from %s', (line, file_path)) 54 pass 55 56 return priorities 57 58 59def merge_names(base_names, allowlist_names, blocklist_names): 60 if bool(blocklist_names - base_names): 61 raise RuntimeError("blocklist item not in bionic - aborting " + str( 62 blocklist_names - base_names)) 63 64 return (base_names - blocklist_names) | allowlist_names 65 66 67def extract_priority_syscalls(syscalls, priorities): 68 # Extract syscalls that are not in the priority list 69 other_syscalls = \ 70 [syscall for syscall in syscalls if syscall[0] not in priorities] 71 # For prioritized syscalls, keep the order in which they appear in th 72 # priority list 73 syscall_dict = {syscall[0]: syscall[1] for syscall in syscalls} 74 priority_syscalls = [] 75 for name in priorities: 76 if name in syscall_dict.keys(): 77 priority_syscalls.append((name, syscall_dict[name])) 78 return priority_syscalls, other_syscalls 79 80 81def parse_syscall_NRs(names_path): 82 # The input is now the preprocessed source file. This will contain a lot 83 # of junk from the preprocessor, but our lines will be in the format: 84 # 85 # #define __(ARM_)?NR_${NAME} ${VALUE} 86 # 87 # Where ${VALUE} is a preprocessor expression. 88 89 constant_re = re.compile( 90 r'^\s*#define\s+([A-Za-z_][A-Za-z0-9_]+)\s+(.+)\s*$') 91 token_re = re.compile(r'\b[A-Za-z_][A-Za-z0-9_]+\b') 92 constants = {} 93 with open(names_path) as f: 94 for line in f: 95 m = constant_re.match(line) 96 if not m: 97 continue 98 try: 99 name = m.group(1) 100 # eval() takes care of any arithmetic that may be done 101 value = eval(token_re.sub(lambda x: str(constants[x.group(0)]), 102 m.group(2))) 103 104 constants[name] = value 105 except: 106 logging.debug('Failed to parse %s', line) 107 pass 108 109 syscalls = {} 110 for name, value in constants.iteritems(): 111 if not name.startswith("__NR_") and not name.startswith("__ARM_NR"): 112 continue 113 if name.startswith("__NR_"): 114 # Remote the __NR_ prefix 115 name = name[len("__NR_"):] 116 syscalls[name] = value 117 118 return syscalls 119 120 121def convert_NRs_to_ranges(syscalls): 122 # Sort the values so we convert to ranges and binary chop 123 syscalls = sorted(syscalls, lambda x, y: cmp(x[1], y[1])) 124 125 # Turn into a list of ranges. Keep the names for the comments 126 ranges = [] 127 for name, value in syscalls: 128 if not ranges: 129 ranges.append(SyscallRange(name, value)) 130 continue 131 132 last_range = ranges[-1] 133 if last_range.end == value: 134 last_range.add(name, value) 135 else: 136 ranges.append(SyscallRange(name, value)) 137 return ranges 138 139 140# Converts the sorted ranges of allowed syscalls to a binary tree bpf 141# For a single range, output a simple jump to {fail} or {allow}. We can't set 142# the jump ranges yet, since we don't know the size of the filter, so use a 143# placeholder 144# For multiple ranges, split into two, convert the two halves and output a jump 145# to the correct half 146def convert_to_intermediate_bpf(ranges): 147 if len(ranges) == 1: 148 # We will replace {fail} and {allow} with appropriate range jumps later 149 return [BPF_JGE.format(ranges[0].end, "{fail}", "{allow}") + 150 ", //" + "|".join(ranges[0].names)] 151 else: 152 half = (len(ranges) + 1) / 2 153 first = convert_to_intermediate_bpf(ranges[:half]) 154 second = convert_to_intermediate_bpf(ranges[half:]) 155 jump = [BPF_JGE.format(ranges[half].begin, len(first), 0) + ","] 156 return jump + first + second 157 158 159# Converts the prioritized syscalls to a bpf list that is prepended to the 160# tree generated by convert_to_intermediate_bpf(). If we hit one of these 161# syscalls, shortcut to the allow statement at the bottom of the tree 162# immediately 163def convert_priority_to_intermediate_bpf(priority_syscalls): 164 result = [] 165 for i, syscall in enumerate(priority_syscalls): 166 result.append(BPF_JEQ.format(syscall[1], "{allow}", 0) + 167 ", //" + syscall[0]) 168 return result 169 170 171def convert_ranges_to_bpf(ranges, priority_syscalls): 172 bpf = convert_priority_to_intermediate_bpf(priority_syscalls) + \ 173 convert_to_intermediate_bpf(ranges) 174 175 # Now we know the size of the tree, we can substitute the {fail} and {allow} 176 # placeholders 177 for i, statement in enumerate(bpf): 178 # Replace placeholder with 179 # "distance to jump to fail, distance to jump to allow" 180 # We will add a kill statement and an allow statement after the tree 181 # With bpfs jmp 0 means the next statement, so the distance to the end is 182 # len(bpf) - i - 1, which is where we will put the kill statement, and 183 # then the statement after that is the allow statement 184 bpf[i] = statement.format(fail=str(len(bpf) - i), 185 allow=str(len(bpf) - i - 1)) 186 187 # Add the allow calls at the end. If the syscall is not matched, we will 188 # continue. This allows the user to choose to match further syscalls, and 189 # also to choose the action when we want to block 190 bpf.append(BPF_ALLOW + ",") 191 192 # Add check that we aren't off the bottom of the syscalls 193 bpf.insert(0, BPF_JGE.format(ranges[0].begin, 0, str(len(bpf))) + ',') 194 return bpf 195 196 197def convert_bpf_to_output(bpf, architecture, name_modifier): 198 if name_modifier: 199 name_modifier = name_modifier + "_" 200 else: 201 name_modifier = "" 202 header = textwrap.dedent("""\ 203 // File autogenerated by {self_path} - edit at your peril!! 204 205 #include <linux/filter.h> 206 #include <errno.h> 207 208 #include "seccomp/seccomp_bpfs.h" 209 const sock_filter {architecture}_{suffix}filter[] = {{ 210 """).format(self_path=os.path.basename(__file__), architecture=architecture, 211 suffix=name_modifier) 212 213 footer = textwrap.dedent("""\ 214 215 }}; 216 217 const size_t {architecture}_{suffix}filter_size = sizeof({architecture}_{suffix}filter) / sizeof(struct sock_filter); 218 """).format(architecture=architecture,suffix=name_modifier) 219 return header + "\n".join(bpf) + footer 220 221 222def construct_bpf(syscalls, architecture, name_modifier, priorities): 223 priority_syscalls, other_syscalls = \ 224 extract_priority_syscalls(syscalls, priorities) 225 ranges = convert_NRs_to_ranges(other_syscalls) 226 bpf = convert_ranges_to_bpf(ranges, priority_syscalls) 227 return convert_bpf_to_output(bpf, architecture, name_modifier) 228 229 230def gen_policy(name_modifier, out_dir, base_syscall_file, syscall_files, syscall_NRs, priority_file): 231 for arch in SupportedArchitectures: 232 base_names = load_syscall_names_from_file(base_syscall_file, arch) 233 allowlist_names = set() 234 blocklist_names = set() 235 for f in syscall_files: 236 if "blocklist" in f.lower(): 237 blocklist_names |= load_syscall_names_from_file(f, arch) 238 else: 239 allowlist_names |= load_syscall_names_from_file(f, arch) 240 priorities = [] 241 if priority_file: 242 priorities = load_syscall_priorities_from_file(priority_file) 243 244 allowed_syscalls = [] 245 for name in merge_names(base_names, allowlist_names, blocklist_names): 246 try: 247 allowed_syscalls.append((name, syscall_NRs[arch][name])) 248 except: 249 logging.exception("Failed to find %s in %s", name, arch) 250 raise 251 output = construct_bpf(allowed_syscalls, arch, name_modifier, priorities) 252 253 # And output policy 254 existing = "" 255 filename_modifier = "_" + name_modifier if name_modifier else "" 256 output_path = os.path.join(out_dir, 257 "{}{}_policy.cpp".format(arch, filename_modifier)) 258 with open(output_path, "w") as output_file: 259 output_file.write(output) 260 261 262def main(): 263 parser = argparse.ArgumentParser( 264 description="Generates a seccomp-bpf policy") 265 parser.add_argument("--verbose", "-v", help="Enables verbose logging.") 266 parser.add_argument("--name-modifier", 267 help=("Specifies the name modifier for the policy. " 268 "One of {app,system}.")) 269 parser.add_argument("--out-dir", 270 help="The output directory for the policy files") 271 parser.add_argument("base_file", metavar="base-file", type=str, 272 help="The path of the base syscall list (SYSCALLS.TXT).") 273 parser.add_argument("files", metavar="FILE", type=str, nargs="+", 274 help=("The path of the input files. In order to " 275 "simplify the build rules, it can take any of the " 276 "following files: \n" 277 "* /blocklist.*\.txt$/ syscall blocklist.\n" 278 "* /allowlist.*\.txt$/ syscall allowlist.\n" 279 "* /priority.txt$/ priorities for bpf rules.\n" 280 "* otherwise, syscall name-number mapping.\n")) 281 args = parser.parse_args() 282 283 if args.verbose: 284 logging.basicConfig(level=logging.DEBUG) 285 else: 286 logging.basicConfig(level=logging.INFO) 287 288 syscall_files = [] 289 priority_file = None 290 syscall_NRs = {} 291 for filename in args.files: 292 if filename.lower().endswith('.txt'): 293 if filename.lower().endswith('priority.txt'): 294 priority_file = filename 295 else: 296 syscall_files.append(filename) 297 else: 298 m = re.search(r"libseccomp_gen_syscall_nrs_([^/]+)", filename) 299 syscall_NRs[m.group(1)] = parse_syscall_NRs(filename) 300 301 gen_policy(name_modifier=args.name_modifier, out_dir=args.out_dir, 302 syscall_NRs=syscall_NRs, base_syscall_file=args.base_file, 303 syscall_files=syscall_files, priority_file=priority_file) 304 305 306if __name__ == "__main__": 307 main() 308