1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
"""Simple HTTP file server with directory listing, download and upload.

Run as a script to serve the current working directory:

    python this_file.py --port 8000

GET/HEAD serve files and directory listings; POST accepts a single
``multipart/form-data`` file upload (form field name ``file``) and stores it
in the directory addressed by the request path.
"""

__version__ = "0.1"
__all__ = ["SimpleHTTPRequestHandler"]

import html
import http.server
import mimetypes
import os
import posixpath
import re
import shutil
import urllib.error
import urllib.parse
import urllib.request
from io import BytesIO


def _safe_filename(raw):
    """Return the bare file name of a client-supplied *raw* name, or ''.

    Browsers may send a full client-side path and a hostile client may send
    ``../`` components or an absolute path; only the final component is kept
    so an upload can never escape the target directory.
    """
    name = os.path.basename(raw.replace("\\", "/"))
    if name in ("", os.curdir, os.pardir) or "\x00" in name:
        return ""
    return name


class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """Simple HTTP file server that supports upload and download."""

    server_version = "SimpleHTTPWithUpload/" + __version__

    def do_GET(self):
        """Serve a GET request: send headers, then the body."""
        f = self.send_head()
        if f:
            try:
                self.copyfile(f, self.wfile)
            finally:
                f.close()

    def do_HEAD(self):
        """Serve a HEAD request: send headers only."""
        f = self.send_head()
        if f:
            f.close()

    def do_POST(self):
        """Handle a file upload and reply with a small HTML result page."""
        r, info = self.deal_post_data()
        print((r, info, "by: ", self.client_address))
        # Referer is optional; fall back to the posted-to path so the "back"
        # link is always usable.
        back = self.headers['referer'] or self.path
        status = b"<strong>Success:</strong>" if r else b"<strong>Failed:</strong>"
        body = b"".join([
            b'<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">',
            b"<html>\n<title>Upload Result Page</title>\n",
            b"<body>\n<h2>Upload Result Page</h2>\n",
            b"<hr>\n",
            status,
            # info embeds the client-chosen file name: escape before output.
            html.escape(info).encode(),
            ('<br><a href="%s">back</a>'
             % html.escape(back, quote=True)).encode(),
            b"</body>\n</html>\n",
        ])
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def deal_post_data(self):
        """Parse a single-file multipart upload and write it to disk.

        The file is stored under the directory that ``self.path`` maps to,
        using only the base name of the client-supplied file name.

        Returns:
            A ``(ok, message)`` tuple; ``ok`` is True when the file was
            completely received and written.
        """
        content_type = self.headers['content-type']
        match = re.search(r'boundary=(?:"([^"]+)"|([^;\s]+))',
                          content_type or "", re.IGNORECASE)
        if not match:
            return (False, "Content-Type header doesn't contain boundary")
        # Every delimiter line in the body is "--" followed by the boundary.
        delimiter = b"--" + (match.group(1) or match.group(2)).encode()
        try:
            remainbytes = int(self.headers['content-length'])
        except (TypeError, ValueError):
            return (False, "Content-Length header is missing or invalid")

        line = self.rfile.readline()
        remainbytes -= len(line)
        if delimiter not in line:
            return (False, "Content NOT begin with boundary")

        line = self.rfile.readline()
        remainbytes -= len(line)
        fn = re.findall(
            r'Content-Disposition.*name="file"; filename="(.*)"',
            line.decode("utf-8", "replace"))
        filename = _safe_filename(fn[0]) if fn else ""
        if not filename:
            return (False, "Can't find out file name...")
        path = self.translate_path(self.path)
        fn = os.path.join(path, filename)

        # Skip the remaining part headers (normally just Content-Type) up to
        # and including the blank line that separates headers from content.
        while remainbytes > 0:
            line = self.rfile.readline()
            remainbytes -= len(line)
            if line in (b"\r\n", b"\n", b""):
                break

        try:
            out = open(fn, 'wb')
        except OSError:
            return (False, "Can't create file to write, do you have permission to write?")

        with out:
            # Stay one line behind the reader: the line break preceding the
            # closing delimiter belongs to the multipart framing, not to the
            # file, so it is stripped from the last content line.
            preline = self.rfile.readline()
            remainbytes -= len(preline)
            while remainbytes > 0:
                line = self.rfile.readline()
                if not line:
                    # Client went away before sending all announced bytes.
                    break
                remainbytes -= len(line)
                if delimiter in line:
                    preline = preline[0:-1]
                    if preline.endswith(b'\r'):
                        preline = preline[0:-1]
                    out.write(preline)
                    return (True, "File '%s' upload success!" % fn)
                out.write(preline)
                preline = line
        return (False, "Unexpect Ends of data.")

    def send_head(self):
        """Send the response status and headers for GET/HEAD.

        Returns:
            An open binary file object positioned at the start of the body
            (the caller must close it), or None when the response is already
            complete (redirect or error).
        """
        path = self.translate_path(self.path)
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # redirect browser - doing basically what apache does
                self.send_response(301)
                self.send_header("Location", self.path + "/")
                self.send_header("Content-Length", "0")
                self.end_headers()
                return None
            for index in ("index.html", "index.htm"):
                candidate = os.path.join(path, index)
                if os.path.exists(candidate):
                    path = candidate
                    break
            else:
                return self.list_directory(path)
        ctype = self.guess_type(path)
        try:
            # Always read in binary mode. Opening files in text mode may cause
            # newline translations, making the actual size of the content
            # transmitted *less* than the content-length!
            f = open(path, 'rb')
        except OSError:
            self.send_error(404, "File not found")
            return None
        try:
            fs = os.fstat(f.fileno())
            self.send_response(200)
            self.send_header("Content-type", ctype)
            self.send_header("Content-Length", str(fs.st_size))
            self.send_header("Last-Modified",
                             self.date_time_string(fs.st_mtime))
            self.end_headers()
        except Exception:
            # Do not leak the descriptor if the headers could not be sent.
            f.close()
            raise
        return f

    def list_directory(self, path):
        """Produce an HTML directory listing that includes an upload form.

        Sends the response headers itself.

        Returns:
            A BytesIO holding the page body, or None if the directory could
            not be read (a 404 has then been sent).
        """
        try:
            entries = os.listdir(path)
        except OSError:
            self.send_error(404, "No permission to list directory")
            return None
        entries.sort(key=str.lower)
        displaypath = html.escape(urllib.parse.unquote(self.path))
        parts = [
            b'<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">',
            b"<html>\n<head>\n",
            # Declare UTF-8 so non-ASCII (e.g. Chinese) names are not garbled.
            b"<meta http-equiv='Content-Type' content='text/html; charset=utf-8' />\n",
            ("<title>Directory listing for %s</title>\n</head>\n"
             % displaypath).encode(),
            ("<body>\n<h2>Directory listing for %s</h2>\n"
             % displaypath).encode(),
            b"<hr>\n",
            b"<form ENCTYPE=\"multipart/form-data\" method=\"post\">",
            b"<input name=\"file\" type=\"file\"/>",
            b"<input type=\"submit\" value=\"upload\"/></form>\n",
            b"<hr>\n<ul>\n",
        ]
        for name in entries:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            parts.append(
                ('<li><a href="%s">%s</a>\n'
                 % (urllib.parse.quote(linkname),
                    html.escape(displayname))).encode())
        parts.append(b"</ul>\n<hr>\n</body>\n</html>\n")
        body = b"".join(parts)
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        return BytesIO(body)

    def translate_path(self, path):
        """Map a URL path onto a local path below the current directory.

        Query string and fragment are dropped; drive letters, directory
        parts and ``.``/``..`` inside individual components are discarded so
        the result cannot leave the working directory.
        """
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        path = posixpath.normpath(urllib.parse.unquote(path))
        words = [word for word in path.split('/') if word]
        path = os.getcwd()
        for word in words:
            _, word = os.path.splitdrive(word)
            _, word = os.path.split(word)
            if word in (os.curdir, os.pardir):
                continue
            path = os.path.join(path, word)
        return path

    def copyfile(self, source, outputfile):
        """Copy all data from file object *source* to *outputfile*."""
        shutil.copyfileobj(source, outputfile)

    def guess_type(self, path):
        """Return the MIME type for *path* based on its extension.

        The extension is looked up as given first, then lower-cased; unknown
        extensions map to the '' entry of ``extensions_map``.
        """
        _, ext = posixpath.splitext(path)
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        ext = ext.lower()
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        return self.extensions_map['']

    if not mimetypes.inited:
        mimetypes.init()  # try to read system mime.types
    extensions_map = mimetypes.types_map.copy()
    extensions_map.update({
        '': 'application/octet-stream',  # Default
        '.py': 'text/plain',
        '.c': 'text/plain',
        '.h': 'text/plain',
    })


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--bind', '-b', default='', metavar='ADDRESS',
                        help='Specify alternate bind address '
                             '[default: all interfaces]')
    parser.add_argument('--port', '-p', default=8000, type=int,
                        help='Specify alternate port [default: 8000]')
    args = parser.parse_args()
    http.server.test(HandlerClass=SimpleHTTPRequestHandler,
                     port=args.port, bind=args.bind)
参考链接
- Python httpServer服务器(初级)
- python实现SimpleHTTPServer的POST方法
- ImportError: No module named 'BaseHTTPServer':解决方案
- python 搭建简单的http server,可直接post文件
- python使用PIL处理图片后返回给前端的坑
- PEP 594 – Removing dead batteries from the standard library
- python http server 解析post 请求数据,使用 urllib.parse库
- 1分钟升级python3自带http服务器!支持文件上传!
- JamesTheAwesomeDude/postparse.py