最近有些事儿比较忙,python的学习就断断续续,这个练习来得比预期的晚,不过还好,不管做什么,我都希望能认真对待,认真做好每一件事。
引入
这个练习原书中称作“使用XML-RPC进行文件共享”,题目是从使用的技术介绍的,做完这个练习之后我觉得最终其实实现的是一个迅雷的雏形——一个P2P下载器。当然了实际使用中的下载工具远远比这个要复杂,包括断点续传,断网重连等等。
那么我们学习的方法也就明确了,使用到了P2P的概念和XML-RPC的原理,那么就从这两个开始吧。
关于P2P
额,这里不进行官方的概念介绍,结合这个练习说明P2P的大体原理。使用P2P之后,我们查找一个共享资源的大体流程如下
# P2P协议的理解,弱化了server-client的概念,在网络中,每个节点都可能是server,也可能是client,暂且把发起请求的称为client,处理请求的称为server1. client向服务器A发起寻找文件test的请求2. A现在自己的目录下面寻找是否有test3. 如果有则返回该文件给client,程序结束4. 如果A服务器没有,则A遍历在线节点列表5. 如果找到,返回给client,结束6. 如果没有,返回空
由上的过程看出网络中每个节点需要具备以下功能:
# 每个节点需要维护的资源共享目录:在网络中存在n台机器,每台服务器都有一个可供其他服务器访问的目录(目录下面有各种文件)在线节点列表:每天机器都有一个所有愿意共项目录的服务器列表# 服务器的功能服务器应该能更新在线节点列表如果在线节点很多,是不是应该一直寻找下去(设置一个查找深度)服务器查询某个文件是否存在服务器可以下载某一个文件到自己的共享目录
很明白了,在P2P网络中每个节点可以为别人提供资源,也可以向其他节点请求资源,对于每个节点来说就是共享出自己的资源,然后知道去哪里请求资源(需要一个其他节点的列表)。
关于xml-rpc
rpc:远程过程调用(之前学习分布式操作系统的时候想不到远程过程调用是什么,使用场景是什么,但是相信总起到了我意识到或者没意识到的作用),心爱那个心爱那个其实和http协议也有类似的地方,都是请求其他计算机的资源,并不关心在其他计算机上是怎么实现的。其原理也就是
# xml_rpc原理在server A定义方法method在client使用A的proxy调用method
当然了,rpc还有更详细的协议规范,但是这一层都被xml-rpc通过封装屏蔽了。这里也不作进一步探寻了。
具体实现
主要有两个模块,client和server,主要功能在server中实现,client只是对server调用进行了一些封装。
server.py
#!/usr/bin/env python# -*- coding=utf-8 -×-from xmlrpclib import ServerProxy, Faultfrom os.path import join, isfile, abspathfrom SimpleXMLRPCServer import SimpleXMLRPCServerfrom urlparse import urlparseimport sysMAX_HISTORY_LENGTH = 6SimpleXMLRPCServer.allow_reuse_address = 1UNHANDLED = 100ACCESS_DENIED = 200class UnhandledQuery(Fault): """ 表示无法处理的查询异常 """ def __init__(self, message='Could not handle the query'): Fault.__init__(self, UNHANDLED, message)class AccessDenied(Fault): """ 在用户试图访问未被授权的资源时引发的异常 """ def __init__(self, message='Access denied'): Fault.__init__(self, ACCESS_DENIED, message)def inside(dir, name): """ 判断访问的目录是否有限制访问的目录,限制非法目录访问,比如/var/www/../data """ dirs = abspath(dirs) name = abspath(name) return name.startswith(join(dirs, ''))def getPort(url): """ 根据url获取端口号 """ name = urlparse(url)[1] parts = name.split(":") return int(parts[-1])class Node: """ P2P网络中的节点 """ def __init__(self, url, directory, secret): self.url = url self.directory = directory self.secret = secret self.know = set() def query(self, query, history=[]): """ 查找文件,先在本机查找,如果找到则返回,找不到则查找其他已知节点 """ try: print 'search local' return self._handle(query) except UnhandledQuery: # 添加到history,标记查找的深度 history = history + [self.url] if len(history) >= MAX_HISTORY_LENGTH: raise print 'search other server' return self._broadcast(query, history) def hello(self, other): """ 认识其他节点 """ self.know.add(other) return 0 def fetch(self, query, secret): """ 用于从节点下载数据 """ print 'server.fetch' if secret != self.secret: raise AccessDenied result = self.query(query) print 'result----',result # 把查询到的数据写到本地 f = open(join(self.directory, query), 'w') f.write(result) f.close() return 0 def _start(self): """ 内部使用,用于启动XML_RPC服务器 """ server = SimpleXMLRPCServer(("", getPort(self.url)), logRequests=False) server.register_instance(self) server.serve_forever() def _handle(self, query): """ 搜索文件在本服务器上是否存在 """ dirs = self.directory name = join(dirs, query) if not isfile(name): raise UnhandledQuery return open(name).read() def _broadcast(self, query, history): """ 内部时使用,用于将查询广播到其他已知节点 """ for other in self.know.copy(): try: # 根据url创建远程节点的proxy,使用xml_rpc进行远程调用 print 'search ----', other server = ServerProxy(other) print 'start query', other return server.query(query, history) except Fault, f: if f.faultCode == UNHANDLED: pass else: self.know.remove(other) except: # 说明该server已经不可用 self.know.remove(other) # 如果在所有节点中没有找到,就返回空 raise UnhandledQuerydef main(): url, directory, secret = sys.argv[1:] node = Node(url, directory, secret) node._start()if __name__ == '__main__': main()
client.py
#! /usr/bin/env python# -*- coding=utf-8 -*-from xmlrpclib import ServerProxy, Faultfrom cmd import Cmdfrom random import choicefrom string import lowercasefrom server import Node, UNHANDLEDfrom threading import Threadfrom time import sleepimport sysHEAD_START = 0.1 # secondSECRET_LEN = 100def randomStr(len): """ 生成指定长度的字符串 """ chars = [] letters = lowercase[:26] while len > 0: len = len - 1 chars.append(choice(letters)) return ''.join(chars)class Client1(Cmd): """ Node基于文本的界面 """ def __init__(self, url, dirname, urlfile): """ 初始化Cmd 随机生成秘钥 启动服务器 启动字符界面 """ print 'in' Cmd.__init__(self) self.secret = randomStr(SECRET_LEN) n = Node(url, dirname, self.secret) t = Thread(target=n._start) t.setDaemon(1) t.start() # 等待服务器启动 sleep(HEAD_START) self.server = ServerProxy(url) for line in open(urlfile): line = line.strip() self.server.hello(line) def do_fetch(self, arg): "调用服务器的fetch方法" try: print 'do_fetch' self.server.fetch(arg, self.secret) except Fault, f: print f if f.faultCode != UNHANDLED: raise print 'Could not find the file ', arg def do_exit(self, arg): "退出程序" print sys.exit() do_EOF = do_exit # 接收到EOF的时候也退出程序 # 设置提示符 prompt = '>'def main(): urlfile, directory, url = sys.argv[1:] print urlfile, directory, url print '---------' print dir() client = Client1(url, directory, urlfile) client.cmdloop()if __name__ == '__main__': main()
总结
在运行这个练习的时候,要注意死锁,因为有共享资源的问题
# 线程死锁server A的url list里面有Server Bserver B的url list里面有Server A当A请求B的时候,B发现自己也没有,B根据url list查询其他server,这个时候根据list需要去查询A但是这个时候A正在等待B的回应,所以就造成了A等B,B等A的死锁情况# 解决死锁每个server B在向url list里面的server A发出请求之前判断本神是否是Server A正在请求B
完整代码