# -*- coding: utf-8 -*-
""" FTP manipulation library
@author: Milan Falešník <mfalesni@redhat.com>
"""
import ftplib
import os
import re
from datetime import datetime
from io import BytesIO
from time import mktime
from time import strptime
import fauxfactory
from cfme.utils.conf import cfme_data
from cfme.utils.conf import credentials
from cfme.utils.log import logger
try:
Pattern = re.Pattern
except AttributeError:
Pattern = re._pattern_type
[docs]class FTPException(Exception):
pass
[docs]class FTPDirectory(object):
""" FTP FS Directory encapsulation
This class represents one directory.
Contains pointers to all child directories (self.directories)
and also all files in current directory (self.files)
"""
def __init__(self, client, name, items, parent_dir=None, time=None):
""" Constructor
Args:
client: ftplib.FTP instance
name: Name of this directory
items: Content of this directory
parent_dir: Pointer to a parent directory to maintain hierarchy. None if root
time: Time of this object
"""
self.client = client
self.parent_dir = parent_dir
self.time = time
self.name = name
self.files = []
self.directories = []
for item in items:
if isinstance(item, dict): # Is a directory
self.directories.append(FTPDirectory(self.client,
item["dir"],
item["content"],
parent_dir=self,
time=item["time"]))
else:
self.files.append(FTPFile(self.client, item[0], self, item[1]))
@property
def path(self):
"""
Returns:
whole path for this directory
"""
return os.path.join(self.parent_dir.path if self.parent_dir else "", self.name)
def __repr__(self):
return "<FTPDirectory {}>".format(self.path)
[docs] def cd(self, path):
""" Change to a directory
Changes directory to a path specified by parameter path. There are three special cases:
/ - climbs by self.parent_dir up in the hierarchy until it reaches root element.
. - does nothing
.. - climbs one level up in hierarchy, if present, otherwise does the same as preceeding.
Args:
path: Path to change
"""
if path == ".":
return self
elif path == "..":
result = self
if result.parent_dir:
result = result.parent_dir
return result
elif path == "/":
result = self
while result.parent_dir:
result = result.parent_dir
return result
enter = path.strip("/").split("/", 1)
remainder = None
if len(enter) == 2:
enter, remainder = enter
for item in self.directories:
if item.name == enter:
if remainder:
return item.cd("/".join(remainder))
else:
return item
raise FTPException("Directory {}{} does not exist!".format(self.path, enter))
[docs] def search(self, by, files=True, directories=True):
""" Recursive search by string or regexp.
Searches throughout all the filesystem structure from top till the bottom until
it finds required files or dirctories.
You can specify either plain string or regexp. String search does classic ``in``,
regexp matching is done by exact matching (by.match).
Args:
by: Search string or regexp
files: Whether look for files
directories: Whether look for directories
Returns:
List of all objects found in FS
"""
def _scan(what, in_what):
if isinstance(what, Pattern):
return what.match(in_what) is not None
else:
return what in in_what
results = []
if files:
for f in self.files:
if _scan(by, f.name):
results.append(f)
for d in self.directories:
if directories:
if _scan(by, d.name):
results.append(d)
results.extend(d.search(by, files=files, directories=directories))
return results
[docs]class FTPFile(object):
""" FTP FS File encapsulation
This class represents one file in the FS hierarchy.
It encapsulates mainly its position in FS and adds the possibility
of downloading the file.
"""
def __init__(self, client, name, parent_dir, time=None):
""" Constructor
Args:
client: ftplib.FTP instance
name: File name (without path)
parent_dir: Directory in which this file is (FTPDirectory or path)
time: Time to match local computer's timezone
"""
self.client = client
self.parent_dir = parent_dir
self.name = name
self.time = time
@property
def path(self):
"""
Returns:
whole path for this file
"""
parent_dir = (
self.parent_dir.path if isinstance(self.parent_dir, FTPDirectory) else self.parent_dir
)
return os.path.join(parent_dir, self.name)
@property
def local_time(self):
"""
Returns:
time modified to match local computer's time zone
"""
return self.client.dt + self.time
def __repr__(self):
return "<FTPFile {}>".format(self.path)
[docs] def retr(self, callback):
""" Retrieve file
Wrapper around ftplib.FTP.retrbinary().
This function cd's to the directory where this file is present, then calls the
FTP's retrbinary() function with provided callable and then cd's back where it started
to keep it consistent.
Args:
callback: Any callable that accepts one parameter as the data
Raises:
AssertionError: When any of the CWD or CDUP commands fail.
ftplib.error_perm: When retrbinary call of ftplib fails
"""
dirs, f = self.path.rsplit("/", 1)
dirs = dirs.lstrip("/").split("/")
# Dive in
for d in dirs:
assert self.client.cwd(d), "Could not change into the directory {}!".format(d)
self.client.retrbinary(f, callback)
# Dive out
for d in dirs:
assert self.client.cdup(), "Could not get out of directory {}!".format(d)
[docs] def download(self, target=None):
""" Download file into this machine
Wrapper around self.retr function. It downloads the file from remote filesystem
into local filesystem. Name is either preserved original, or can be changed.
Args:
target: Target file name (None to preserver the original)
"""
if target is None:
target = self.name
with open(target, "wb") as output:
self.retr(output.write)
[docs]class FTPClient(object):
""" FTP Client encapsulation
This class provides basic encapsulation around ftplib's FTP class.
It wraps some methods and allows to easily delete whole directory or walk
through the directory tree.
Usage:
>>> from utils.ftp import FTPClient
>>> ftp = FTPClient("host", "user", "password")
>>> only_files_with_EVM_in_name = ftp.filesystem.search("EVM", directories=False)
>>> only_files_by_regexp = ftp.filesystem.search(re.compile("regexp"), directories=False)
>>> some_directory = ftp.filesystem.cd("a/b/c") # cd's to this directory
>>> root = some_directory.cd("/")
Always going through filesystem property is a bit slow as it parses the structure on each use.
If you are sure that the structure will remain intact between uses, you can do as follows
to save the time::
>>> fs = ftp.filesystem
Let's download some files::
>>> for f in ftp.filesystem.search("IMPORTANT_FILE", directories=False):
... f.download() # To pickup its original name
... f.download("custom_name")
We finished the testing, so we don't need the content of the directory::
>>> ftp.recursively_delete()
And it's gone.
"""
def __init__(self, host, login, password, upload_dir="/", time_diff=True):
""" Constructor
Args:
host: FTP server host
login: FTP login
password: FTP password
time_diff: Server and client time diff management
"""
self.host = host
self.login = login
self.password = password
self.ftp = None
self.dt = None
self.upload_dir = upload_dir
self.connect()
if time_diff:
self.update_time_difference()
[docs] def connect(self):
self.ftp = ftplib.FTP(self.host)
self.ftp.login(self.login, self.password)
logger.info("FTP Server login successful")
[docs] def update_time_difference(self):
""" Determine the time difference between the FTP server and this computer.
This is done by uploading a fake file, reading its time and deleting it.
Then the self.dt variable captures the time you need to ADD to the remote
time or SUBTRACT from local time.
The FTPFile object carries this automatically as it has .local_time property
which adds the client's .dt to its time.
"""
TIMECHECK_FILE_NAME = fauxfactory.gen_alphanumeric(length=16)
void_file = BytesIO(b'random_example')
self.cwd(self.upload_dir)
assert "Transfer complete" in self.storbinary(TIMECHECK_FILE_NAME, void_file),\
"Could not upload a file for time checking with name {}!".format(TIMECHECK_FILE_NAME)
void_file.close()
now = datetime.now()
for d, name, time in self.ls():
if name == TIMECHECK_FILE_NAME:
self.dt = now - time
self.dele(TIMECHECK_FILE_NAME)
self.cwd("/")
return True
raise FTPException("The timecheck file was not found in the current FTP directory")
[docs] def ls(self):
""" Lists the content of a directory.
Returns:
List of all items in current directory
Return format is [(is_dir?, "name", remote_time), ...]
"""
result = []
def _callback(line):
is_dir = line.upper().startswith("D")
# Max 8, then the final is file which can contain something blank
fields = re.split(r"\s+", line, maxsplit=8)
# This is because how informations in LIST are presented
# Nov 11 12:34 filename (from the end)
date = strptime(str(datetime.now().year) + " " + fields[-4] + " " + fields[-3] + " " +
fields[-2],
"%Y %b %d %H:%M")
# convert time.struct_time into datetime
date = datetime.fromtimestamp(mktime(date))
result.append((is_dir, fields[-1], date))
self.ftp.dir(_callback)
return result
[docs] def pwd(self):
""" Get current directory
Returns:
Current directory
Raises:
AssertionError: PWD command fails
"""
result = self.ftp.sendcmd("PWD")
assert "is the current directory" in result, "PWD command failed"
x, d, y = result.strip().split("\"")
return d.strip()
[docs] def cdup(self):
""" Goes one level up in directory hierarchy (cd ..)
"""
return self.ftp.sendcmd("CDUP")
[docs] def mkd(self, d):
""" Create a directory
Args:
d: Directory name
Returns:
Success of the action
"""
try:
return self.ftp.sendcmd("MKD {}".format(d)).startswith("250")
except ftplib.error_perm:
return False
[docs] def rmd(self, d):
""" Remove a directory
Args:
d: Directory name
Returns:
Success of the action
"""
try:
return self.ftp.sendcmd("RMD {}".format(d)).startswith("250")
except ftplib.error_perm:
return False
[docs] def dele(self, f):
""" Remove a file
Args:
f: File name
Returns:
Success of the action
"""
try:
return self.ftp.sendcmd("DELE {}".format(f)).startswith("250")
except ftplib.error_perm:
return False
[docs] def cwd(self, d):
""" Enter a directory
Args:
d: Directory name
Returns:
Success of the action
"""
try:
return self.ftp.sendcmd("CWD {}".format(d)).startswith("250")
except ftplib.error_perm:
return False
[docs] def close(self):
""" Finish work and close connection
"""
self.ftp.quit()
self.ftp.close()
self.ftp = None
[docs] def retrbinary(self, f, callback):
""" Download file
You need to specify the callback function, which accepts one parameter
(data), to be processed.
Args:
f: Requested file name
callback: Callable with one parameter accepting the data
"""
return self.ftp.retrbinary("RETR {}".format(f), callback)
[docs] def storbinary(self, f, file_obj):
""" Store file
You need to specify the file object.
Args:
f: Requested file name
file_obj: File object to be stored
"""
return self.ftp.storbinary("STOR {}".format(f), file_obj)
[docs] def recursively_delete(self, d=None):
""" Recursively deletes content of pwd
WARNING: Destructive!
Args:
d: Directory to enter (None for not entering - root directory)
d: str or None
Raises:
AssertionError: When some of the FTP commands fail.
"""
# Enter the directory
if d:
assert self.cwd(d), "Could not enter directory {}".format(d)
# Work in it
for isdir, name, time in self.ls():
if isdir:
self.recursively_delete(name)
else:
assert self.dele(name), "Could not delete {}!".format(name)
# Go out of it
if d:
# Go to parent directory
assert self.cdup(), "Could not go to parent directory of {}!".format(d)
# And delete it
assert self.rmd(d), "Could not remove directory {}!".format(d)
[docs] def tree(self, d=None):
""" Walks the tree recursively and creates a tree
Base structure is a list. List contains directory content and the type decides whether
it's a directory or a file:
- tuple: it's a file, therefore it represents file's name and time
- dict: it's a directory. Then the dict structure is as follows::
dir: directory name
content: list of directory content (recurse)
Args:
d: Directory to enter(None for no entering - root directory)
Returns:
Directory structure in lists nad dicts.
Raises:
AssertionError: When some of the FTP commands fail.
"""
# Enter the directory
items = []
if d:
assert self.cwd(d), "Could not enter directory {}".format(d)
# Work in it
for isdir, name, time in self.ls():
if isdir:
items.append({"dir": name, "content": self.tree(name), "time": time})
else:
items.append((name, time))
# Go out of it
if d:
# Go to parent directory
assert self.cdup(), "Could not go to parent directory of {}!".format(d)
return items
@property
def filesystem(self):
""" Returns the object structure of the filesystem
Returns:
Root directory
"""
return FTPDirectory(self, "/", self.tree())
# Context management methods
def __enter__(self):
""" Entering the context does nothing, because the client is already connected
"""
return self
def __exit__(self, type, value, traceback):
""" Exiting the context means just calling .close() on the client.
"""
self.close()
[docs]class FTPClientWrapper(FTPClient):
"""
This class is for miq remote file management with FTP. It is useful to make a collection of raw
files related to customer BZ testing directly or indirectly. This will help to easily download
testing related files in a runtime environment.
Args:
entity_path: entity which you want to access like `Datastores`, `Dialogs`.
entrypoint: FTP server entry point
host: FTP server host
login: FTP user
password: FTP password
Usage:
.. code-block:: python
fs = FileServer("miq")
fs.directory_names # list of current available entities
fs.mkd("Dialogs") # create new entity type
fs.rmd("Dialogs") # delete created entity type
fs = FileServer("miq/Reports")
fs.upload("foo.zip") # upload local file
fs.file_names # return list of available files
fs.files() # return list of available file objects
download_path = fs.download("foo.zip") # It will download foo.zip file
f = fs.get_file("foo.zip")
f.path # It will return file storage path
f.link # gives remote file link; can be used to download with 'wget'
f.download() # download file
"""
def __init__(self, entity_path=None, entrypoint=None, host=None, login=None, password=None):
ftp_data = cfme_data.ftpserver
host = host or ftp_data.host
login = login or credentials[ftp_data.credentials]["username"]
password = password or credentials[ftp_data.credentials]["password"]
self.entrypoint = entrypoint or ftp_data.entrypoint
self.entity_path = entity_path
super(FTPClientWrapper, self).__init__(
host=host, login=login, password=password, time_diff=False
)
# Change working directory as per entity_path if provided
self.cwd(os.path.join(self.entrypoint, self.entity_path if entity_path else ""))
@property
def file_names(self):
"""List of remote FTP file names"""
return [name for is_dir, name, _ in self.ls() if not is_dir]
[docs] def get_file(self, name):
"""
Arg:
name: name of remote file
Returns:
FTP file object
"""
if name in self.file_names:
return FTPFileWrapper(self, name=name, parent_dir=self.pwd())
else:
raise FTPException("{} not found".format(name))
[docs] def files(self):
"""List of FTP file objects"""
current_dir = self.pwd()
return [FTPFileWrapper(self, name=name, parent_dir=current_dir) for name in self.file_names]
[docs] def download(self, name, target=None):
""" Download FTP file
Arg:
name: remote file name
target: local path for download else it will consider current working path
Returns:
target path
"""
target = target if target else os.path.join("/tmp", name)
if name not in self.file_names:
raise FTPException("{} not found in {}".format(name, self.pwd()))
with open(target, "wb") as output:
self.retrbinary(name, output.write)
logger.info("'%s' successfully downloaded to '%s'", name, target)
return target
[docs] def upload(self, path, name=None):
""" Upload FTP file
Arg:
path: path of local file
name: set name of file default it will consider original name of uploading file
Returns:
Success of the action
"""
name = name or os.path.basename(path)
if name in self.file_names:
raise FTPException("{} already available in {}".format(name, self.pwd()))
with open(path, "rb") as f:
return self.storbinary(name, f)
@property
def directory_names(self):
""" List remote FTP directories"""
return [name for is_dir, name, _ in self.ls() if is_dir]
[docs]class FTPFileWrapper(FTPFile):
"""This is simple FTPFile class wrapper for FTPClientWrapper"""
@property
def link(self):
"""Remote FTP file url"""
return self.path.replace(self.client.entrypoint, self.client.host)
[docs] def download(self, target=None):
"""
Arg:
target: local path for download else it will consider current working path
Returns:
target path
"""
return self.client.download(self.name, target)