"""Gopher protocol client interface.""" # gopher://uninformativ.de/0/gopher/RFCs/rfc1436.txt # # repo: # https://bitbucket.org/kveroneau/pythonexperiments/src/281fe19d536d2329ca3cad7b 2aac1c03807be791/gopher/?at=default import socket # Recognized file types A_TEXT = '0' A_MENU = '1' A_CSO = '2' A_ERROR = '3' A_MACBINHEX = '4' A_PCBINHEX = '5' A_UUENCODED = '6' A_INDEX = '7' A_TELNET = '8' A_BINARY = '9' A_DUPLICATE = '+' A_SOUND = 's' A_EVENT = 'e' A_CALENDAR = 'c' A_HTML = 'h' A_TN3270 = 'T' A_MIME = 'M' A_IMAGE = 'I' A_WHOIS = 'w' A_QUERY = 'q' A_GIF = 'g' A_HTML = 'h' # HTML file A_WWW = 'w' # WWW address A_PLUS_IMAGE = ':' A_PLUS_MOVIE = ';' A_PLUS_SOUND = '<' TEXT_TYPES = ['0', '1', '7'] class GopherType(object): text_only = True def __init__(self, data, gtype, client): self._data = data self._gtype = gtype self._client = client def get_data(self): return self._data def __str__(self): return self._data def __repr__(self): return '<%s: %s>' % (self.__class__.__name__, self._gtype) class TextFile(GopherType): def __str__(self): return '\n'.join(self._data) class GopherMenu(GopherType): _menu = None def get_data(self): if self._menu is None: self._menu = [] for item in self._data: gtype = item[0] entry = item[1:].split('\t') self._menu.append({'type':gtype, 'name':entry[0], 'selector':'%s%s' % (gtype,entry[1]), 'host':entry[2], 'port':entry[3]}) return self._menu def name_list(self): names = [] menu = self.get_data() for entry in menu: names.append(entry['name']) return names def display_names(self): print '\n'.join(self.name_list()) def get_entry(self, index): entry = self.get_data()[index] if self._client._host == entry['host'] and self._client._port == entry['port']: return self._client.get_selector(entry['selector']) g = Gopher(entry['host'], entry['port']) return g.get_selector(entry['selector']) def __str__(self): return '\n'.join(self.name_list()) class Gopher(object): def __init__(self, host, port=70): self._host, self._port = host, int(port) def send_selector(self, selector, query=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((self._host, self._port)) if query is not None: selector += '\t%s' % query sock.sendall('%s\r\n' % selector) sock.shutdown(1) return sock.makefile('rb') def path_to_selector(self, path): return path[2:] def get_textfile(self, selector, query=None): f = self.send_selector(selector, query) lines = [] while True: line = f.readline() if not line: break if line[-2:] == '\r\n': line = line[:-2] elif line[-1:] in '\r\n': line = line[:-1] if line == '.': break if line[:2] == '..': line = line[1:] lines.append(line) return lines def get_binary(self, selector): f = self.send_selector(selector) return f.read() def get_selector(self, selector, query=None): gtype = selector[0] selector = selector[1:] if gtype in TEXT_TYPES: data = self.get_textfile(selector, query) if gtype == '0': return TextFile(data, gtype, self) else: return GopherMenu(data, gtype, self) return self.get_binary(selector) def get_root_menu(self): return self.get_selector('1') def test(): """Trivial test program.""" from optparse import OptionParser parser = OptionParser() parser.add_option('--host', dest='host', default='gopher.floodgap.com', help='Gopher hostname to connect to') parser.add_option('-p', '--port', type='int', dest='port', default=70, help='Gopher port to connect to') parser.add_option('-s', '--selector', dest='selector', default='0/gopher/proxy', help='Gopher selector to use') parser.add_option('-q', '--query', dest='query', help='Gopher search query to use.') options, args = parser.parse_args() if options.host != 'gopher.floodgap.com' and options.selector == '0/gopher/proxy': options.selector = '1' if len(args) == 2: options.host, options.selector = args[0:2] g = Gopher(options.host, options.port) data = g.get_selector(options.selector, options.query) print data # Run the test when run as script if __name__ == '__main__': test() .