-------------------------常用工具------------------------------------------------------------
!/bin/env python
import os, sys, ftplib from getpass import getpass from mimetypes import guess_type, add_type
dfltSite = 'home.rmi.net' dfltRdir = '.' dfltUser = 'lutz'
class FtpTools:
代码语言:javascript复制def getlocaldir(self):
return (len(sys.argv) > 1 and sys.argv[1]) or '.'
def getcleanall(self):
return input('Clean target dir first?')[:1] in ['y','Y']
def getpassword(self):
return getpass(
'Password for %s on %s:' % (self.remoteuser, self.remotesite))
def configTransfer(self, site=dfltSite, rdir=dfltRdir, user=dfltUser):
self.nonpassive = False # passive FTP on by default in 2.1
self.remotesite = site # transfer to/from this site
self.remotedir = rdir # and this dir ('.' means acct root)
self.remoteuser = user
self.localdir = self.getlocaldir()
self.cleanall = self.getcleanall()
self.remotepass = self.getpassword()
def isTextKind(self, remotename, trace=True):
add_type('text/x-python-win', '.pyw') # not in tables
mimetype, encoding = guess_type(remotename, strict=False)# allow extras
mimetype = mimetype or '?/?' # type unknown?
maintype = mimetype.split('/')[0] # get 1st part
if trace: print(maintype, encoding or '')
return maintype == 'text' and encoding == None # not compressed
def connectFtp(self):
print('connecting...')
connection = ftplib.FTP(self.remotesite) # connect to FTP site
connection.login(self.remoteuser, self.remotepass) # log in as user/pswd
connection.cwd(self.remotedir) # cd to dir to xfer
if self.nonpassive: # force active mode
connection.set_pasv(False) # most do passive
self.connection = connection
def cleanLocals(self):
if self.cleanall:
for localname in os.listdir(self.localdir): # local dirlisting
try: # local file delete
print('deleting local', localname)
os.remove(os.path.join(self.localdir, localname))
except:
print('cannot delete local', localname)
def cleanRemotes(self):
if self.cleanall:
for remotename in self.connection.nlst(): # remote dir listing
try: # remote file delete
print('deleting remote', remotename)
self.connection.delete(remotename)
except:
print('cannot delete remote', remotename)
def downloadOne(self, remotename, localpath):
if self.isTextKind(remotename):
localfile = open(localpath, 'w', encoding=self.connection.encoding)
def callback(line): localfile.write(line 'n')
self.connection.retrlines('RETR ' remotename, callback)
else:
localfile = open(localpath, 'wb')
self.connection.retrbinary('RETR ' remotename, localfile.write)
localfile.close()
def uploadOne(self, localname, localpath, remotename):
if self.isTextKind(localname):
localfile = open(localpath, 'rb')
self.connection.storlines('STOR ' remotename, localfile)
else:
localfile = open(localpath, 'rb')
self.connection.storbinary('STOR ' remotename, localfile)
localfile.close()
def downloadDir(self):
remotefiles = self.connection.nlst() # nlst is remote listing
for remotename in remotefiles:
if remotename in ('.', '..'): continue
localpath = os.path.join(self.localdir, remotename)
print('downloading', remotename, 'to', localpath, 'as', end=' ')
self.downloadOne(remotename, localpath)
print('Done:', len(remotefiles), 'files downloaded.')
def uploadDir(self):
localfiles = os.listdir(self.localdir) # listdir is local listing
for localname in localfiles:
localpath = os.path.join(self.localdir, localname)
print('uploading', localpath, 'to', localname, 'as', end=' ')
self.uploadOne(localname, localpath, localname)
print('Done:', len(localfiles), 'files uploaded.')
def run(self, cleanTarget=lambda:None, transferAct=lambda:None):
self.connectFtp()
cleanTarget()
transferAct()
self.connection.quit()
if name == 'main': ftp = FtpTools() xfermode = 'download' if len(sys.argv) > 1: xfermode = sys.argv.pop(1) # get del 2nd arg if xfermode == 'download': ftp.configTransfer() ftp.run(cleanTarget=ftp.cleanLocals, transferAct=ftp.downloadDir) elif xfermode == 'upload': ftp.configTransfer(site='learning-python.com', rdir='books', user='lutz') ftp.run(cleanTarget=ftp.cleanRemotes, transferAct=ftp.uploadDir) else: print('Usage: ftptools.py ["download" | "upload"] [localdir]')
------------------------------------------------------------------#上传目录树
!/bin/env python
import os, ftptools
class UploadAll(ftptools.FtpTools): def init(self): self.fcount = self.dcount = 0
代码语言:javascript复制def getcleanall(self):
return False # don't even ask
def uploadDir(self, localdir):
localfiles = os.listdir(localdir)
for localname in localfiles:
localpath = os.path.join(localdir, localname)
print('uploading', localpath, 'to', localname, end=' ')
if not os.path.isdir(localpath):
self.uploadOne(localname, localpath, localname)
self.fcount = 1
else:
try:
self.connection.mkd(localname)
print('directory created')
except:
print('directory not created')
self.connection.cwd(localname) # change remote dir
self.uploadDir(localpath) # upload local subdir
self.connection.cwd('..') # change back up
self.dcount = 1
print('directory exited')
if name == 'main': ftp = UploadAll() ftp.configTransfer(site='learning-python.com', rdir='training', user='lutz') ftp.run(transferAct = lambda: ftp.uploadDir(ftp.localdir)) print('Done:', ftp.fcount, 'files and', ftp.dcount, 'directories uploaded.')
----------------------------------------------删除远程目录树
!/bin/env python
from ftptools import FtpTools
class CleanAll(FtpTools): """ delete an entire remote tree of subdirectories """ def init(self): self.fcount = self.dcount = 0
代码语言:javascript复制def getlocaldir(self):
return None # irrelevent here
def getcleanall(self):
return True # implied here
def cleanDir(self):
lines = [] # each level has own lines
self.connection.dir(lines.append) # list current remote dir
for line in lines:
parsed = line.split() # split on whitespace
permiss = parsed[0] # assume 'drw... ... filename'
fname = parsed[-1]
if fname in ('.', '..'): # some include cwd and parent
continue
elif permiss[0] != 'd': # simple file: delete
print('file', fname)
self.connection.delete(fname)
self.fcount = 1
else: # directory: recur, del
print('directory', fname)
self.connection.cwd(fname) # chdir into remote dir
self.cleanDir() # clean subdirectory
self.connection.cwd('..') # chdir remote back up
self.connection.rmd(fname) # delete empty remote dir
self.dcount = 1
print('directory exited')
if name == 'main': ftp = CleanAll() ftp.configTransfer(site='learning-python.com', rdir='training', user='lutz') ftp.run(cleanTarget=ftp.cleanDir) print('Done:', ftp.fcount, 'files and', ftp.dcount, 'directories cleaned.')