123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- import os
- import signal
- import threading
- import urllib2, urllib
- import zipfile
- import tarfile
- import subprocess
- import optparse
- import sys, re
- class SDK:
- def __init__(self):
- try:
-
- self.default_path = os.getcwd()
- if sys.platform == "win32":
- self.mswindows = True
- else:
- self.mswindows = False
-
- home = os.path.expanduser('~')
-
-
- parser = optparse.OptionParser()
- parser.add_option('-u', '--url', dest = 'url', default = 'https://ftp.mozilla.org/pub/mozilla.org/labs/jetpack/addon-sdk-latest.zip')
- parser.add_option('-p', '--path', dest = 'path', default = self.default_path)
- parser.add_option('-b', '--binary', dest = 'binary')
- (options, args) = parser.parse_args()
-
-
- self.link = options.url
-
- if options.path!=self.default_path:
- if self.mswindows:
- self.base_path = home + str(options.path).strip() + '\\'
- else:
- self.base_path = home + str(options.path).strip() + '/'
- else:
- if self.mswindows:
- self.base_path = str(options.path).strip() + '\\'
- else:
- self.base_path = str(options.path).strip() + '/'
- assert ' ' not in self.base_path, "You cannot have a space in your home path. Please remove the space before you continue."
- print('Your Base path is =' + self.base_path)
-
-
- self.bin = options.binary
-
-
-
- i = self.link.rfind('/')
- self.fname = self.link[i+1:]
- z = re.search('zip',self.fname,re.I)
- g = re.search('gz',self.fname,re.I)
- if z:
- print 'zip file present in the URL.'
- self.zip = True
- self.gz = False
- elif g:
- print 'gz file present in the URL'
- self.gz = True
- self.zip = False
- else:
- print 'zip/gz file not present. Check the URL.'
- return
- print("File name is =" + self.fname)
-
-
- self.fpath = self.base_path + self.fname
- print('Your local file path will be=' + self.fpath)
- except AssertionError, e:
- print e.args[0]
- sys.exit(1)
-
- def download(self,url,fpath,fname):
- try:
-
- print("Downloading...Please be patient!")
- urllib.urlretrieve(url,filename = fname)
- print('Download was successful.')
- except ValueError:
- print 'The URL is ether broken or the file does not exist. Please enter the correct URL.'
- raise
- except urllib2.URLError:
- print '\nURL not correct. Check again!'
- raise
-
- def extract(self, zipfilepath, extfile):
- try:
-
- timeout = 30
-
- try:
- os.chdir(zipfilepath)
- except OSError:
-
- print 'O/S Error:' + zipfilepath + 'does not exist'
- raise
-
- if self.zip:
- try:
- f = zipfile.ZipFile(extfile, "r")
- except IOError as (errno, strerror):
- print "I/O error - Cannot perform extract operation: {1}".format(errno, strerror)
- raise
- list = f.namelist()[0]
- temp_name = list.split('/')
- print('Folder Name= ' +temp_name[0])
- self.folder_name = temp_name[0]
- elif self.gz:
- try:
- f = tarfile.open(extfile,'r')
- except IOError as (errno, strerror):
- print "I/O error - Cannot perform extract operation: {1}".format(errno, strerror)
- raise
- list = f.getnames()[0]
- temp_name = list.split('/')
- print('Folder Name= ' +temp_name[0])
- self.folder_name = temp_name[0]
- print ('Starting to Extract...')
-
-
- kill_check = threading.Event()
-
- if self.zip:
-
- if self.mswindows:
- zipfile.ZipFile.extractall(f)
- else:
- p = subprocess.Popen('unzip '+extfile, stdout=subprocess.PIPE, shell=True)
- pid = p.pid
- elif self.gz:
-
- if self.mswindows:
- tarfile.TarFile.extractall(f)
- else:
- p = subprocess.Popen('tar -xf '+extfile, stdout=subprocess.PIPE, shell=True)
- pid = p.pid
-
-
- if self.mswindows==False:
- watch = threading.Timer(timeout, kill_process, args=(pid, kill_check, self.mswindows ))
- watch.start()
- (stdout, stderr) = p.communicate()
- watch.cancel()
- success = not kill_check.isSet()
-
-
- if not success:
- raise RuntimeError
- kill_check.clear()
- print('Extraction Successful.')
- except RuntimeError:
- print "Ending the program"
- sys.exit(1)
- except:
- print "Error during file extraction: ", sys.exc_info()[0]
- raise
-
- def run_testall(self, home_path, folder_name):
- try:
- timeout = 500
- self.new_dir = home_path + folder_name
- try:
- os.chdir(self.new_dir)
- except OSError:
-
- print 'O/S Error: Jetpack directory does not exist at ' + self.new_dir
- raise
- print '\nStarting tests...'
-
-
- kill_check = threading.Event()
-
- log_path = home_path + 'tests.log'
-
- if self.bin != None:
- if self.mswindows:
- p = subprocess.Popen("bin\\activate && cfx testall -a firefox -b \"" + self.bin + "\"" , stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- proc_handle = p._handle
- (stdout,stderr) = p.communicate()
- else:
- p = subprocess.Popen('. bin/activate; cfx testall -a firefox -b ' + self.bin , stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- pid = p.pid
- (stdout,stderr) = p.communicate()
- elif self.bin == None:
- if self.mswindows:
- p=subprocess.Popen('bin\\activate && cfx testall -a firefox > '+log_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- proc_handle = p._handle
- (stdout,stderr) = p.communicate()
- else:
- p = subprocess.Popen('. bin/activate; cfx testall -a firefox > '+log_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- pid = p.pid
- (stdout,stderr) = p.communicate()
-
-
- f=open(log_path,"w")
- f.write(stdout+stderr)
- f.close()
-
- if self.mswindows:
- watch = threading.Timer(timeout, kill_process, args=(proc_handle, kill_check, self.mswindows))
- else:
- watch = threading.Timer(timeout, kill_process, args=(pid, kill_check, self.mswindows))
- watch.start()
- watch.cancel()
- success = not kill_check.isSet()
- if not success:
- raise RuntimeError
- kill_check.clear()
-
- if p.returncode!=0:
- print('\nAll tests were not successful. Check the test-logs in the jetpack directory.')
- result_sdk(home_path)
-
- raise RuntimeError
- else:
- ret_code=result_sdk(home_path)
- if ret_code==0:
- print('\nAll tests were successful. Yay \o/ . Running a sample package test now...')
- else:
- print ('\nThere were errors during the tests.Take a look at logs')
- raise RuntimeError
- except RuntimeError:
- print "Ending the program"
- sys.exit(1)
- except:
- print "Error during the testall command execution:", sys.exc_info()[0]
- raise
-
- def package(self, example_dir):
- try:
- timeout = 30
-
- print '\nNow Running packaging tests...'
-
- kill_check = threading.Event()
-
- exlog_path = example_dir + 'test-example.log'
-
- if self.bin!=None:
- if self.mswindows:
- p = subprocess.Popen('bin\\activate && cfx run --pkgdir examples\\reading-data --static-args="{\\"quitWhenDone\\":true}" -b \"" + self.bin + "\"' , stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- proc_handle = p._handle
- (stdout, stderr) = p.communicate()
- else:
- p = subprocess.Popen('. bin/activate; cfx run --pkgdir examples/reading-data --static-args=\'{\"quitWhenDone\":true}\' -b ' + self.bin , stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- pid = p.pid
- (stdout, stderr) = p.communicate()
- elif self.bin==None:
- if self.mswindows:
- p = subprocess.Popen('bin\\activate && cfx run --pkgdir examples\\reading-data --static-args="{\\"quitWhenDone\\":true}"', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- proc_handle = p._handle
- (stdout, stderr) = p.communicate()
- else:
- p = subprocess.Popen('. bin/activate; cfx run --pkgdir examples/reading-data --static-args=\'{\"quitWhenDone\":true}\' ', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- pid = p.pid
- (stdout, stderr) = p.communicate()
-
-
- f=open(exlog_path,"w")
- f.write(stdout+stderr)
- f.close()
-
-
- if self.mswindows:
- watch = threading.Timer(timeout, kill_process, args=(proc_handle, kill_check, self.mswindows))
- else:
- watch = threading.Timer(timeout, kill_process, args=(pid, kill_check, self.mswindows))
- watch.start()
- watch.cancel()
- success = not kill_check.isSet()
- if not success:
- raise RuntimeError
- kill_check.clear()
- if p.returncode != 0:
- print('\nSample tests were not executed correctly. Check the test-example log in jetpack diretory.')
- result_example(example_dir)
- raise RuntimeError
- else:
- ret_code=result_example(example_dir)
- if ret_code==0:
- print('\nAll tests pass. The SDK is working! Yay \o/')
- else:
- print ('\nTests passed with warning.Take a look at logs')
- sys.exit(1)
-
- except RuntimeError:
- print "Ending program"
- sys.exit(1)
- except:
- print "Error during running sample tests:", sys.exc_info()[0]
- raise
-
- def result_sdk(sdk_dir):
- log_path = sdk_dir + 'tests.log'
- print 'Results are logged at:' + log_path
- try:
- f = open(log_path,'r')
-
- except IOError :
- print 'I/O error - Cannot open test log at ' + log_path
- raise
- for line in reversed(open(log_path).readlines()):
- if line.strip()=='FAIL':
- print ('\nOverall result - FAIL. Look at the test log at '+log_path)
- return 1
- return 0
-
- def result_example(sdk_dir):
- exlog_path = sdk_dir + 'test-example.log'
- print 'Sample test results are logged at:' + exlog_path
- try:
- f = open(exlog_path,'r')
-
- except IOError :
- print 'I/O error - Cannot open sample test log at ' + exlog_path
- raise
-
-
- for line in reversed(open(exlog_path).readlines()):
- if line.strip()=='FAIL':
- print ('\nOverall result for Sample tests - FAIL. Look at the test log at '+exlog_path)
- return 1
- return 0
- def kill_process(process, kill_check, mswindows):
- print '\nProcess Timedout. Killing the process. Please Rerun this script.'
- if mswindows:
- win32api.TerminateProcess(process, -1)
- else:
- os.kill(process, signal.SIGKILL)
- kill_check.set()
- return
- if __name__ == "__main__":
- obj = SDK()
- obj.download(obj.link,obj.fpath,obj.fname)
- obj.extract(obj.base_path,obj.fname)
- obj.run_testall(obj.base_path,obj.folder_name)
- obj.package(obj.base_path)
|