239 lines
7.5 KiB
Python
239 lines
7.5 KiB
Python
# -*- coding: UTF-8 -*-
|
|
from dogtail.utils import isA11yEnabled, enableA11y
|
|
# Switch accessibility on at import time, but only when dogtail reports it as
# explicitly False (any other return value leaves the setting untouched).
if isA11yEnabled() is False:
    enableA11y(True)
|
|
|
|
from time import time, sleep
|
|
from functools import wraps
|
|
from os import strerror, errno, system
|
|
from signal import signal, alarm, SIGALRM
|
|
from subprocess import Popen, PIPE
|
|
from behave import step
|
|
from gi.repository import GLib, Gio
|
|
import fcntl, os
|
|
|
|
from dogtail.rawinput import keyCombo, absoluteMotion, pressKey
|
|
from dogtail.tree import root
|
|
from unittest import TestCase
|
|
|
|
|
|
# Create a dummy unittest class to have nice assertions
|
|
class dummy(TestCase):
    """Placeholder TestCase used only to borrow unittest's assert* helpers."""

    def runTest(self):  # pylint: disable=R0201
        """Trivially passing test body.

        ``runTest`` is the method name TestCase looks up by default when it is
        instantiated without an explicit method name.
        """
        assert True
|
|
|
|
|
|
def wait_until(my_lambda, element, timeout=30, period=0.25):
    """
    Poll ``my_lambda(element)`` until it returns a truthy value or the
    timeout expires.

    The predicate is always evaluated at least once, even with ``timeout=0``.
    The deadline is tracked with the float clock, so fractional timeouts
    (e.g. ``timeout=0.5``) are honoured instead of being rounded to whole
    seconds.

    Sample usages:
     * wait_until(lambda x: x.name != 'Loading...', context.app)
       Pause until window title is not 'Loading...'.
       Return False if window title is still 'Loading...'
       Throw an exception if window doesn't exist after default timeout

     * wait_until(lambda x: x.text == 'Expected text', element)
       Wait until element text becomes the expected one

    :param my_lambda: callable taking ``element`` as its only argument
    :param element: object handed to ``my_lambda`` on every attempt
    :param timeout: number of seconds to keep polling
    :param period: number of seconds to sleep between attempts
    :return: True as soon as an attempt is truthy; False when the timeout
             expires and no attempt ever raised
    :raises Exception: the most recent exception raised by any attempt, if
                       the timeout expires without a truthy result
    """
    last_error = None
    deadline = time() + timeout
    while True:
        try:
            if my_lambda(element):
                return True
        except Exception as error:  # pylint: disable=W0703
            # Remember the failure; it is only reported if we never succeed.
            last_error = error
        if time() >= deadline:
            break
        sleep(period)

    if last_error is not None:
        raise last_error
    return False
|
|
|
|
|
|
# NOTE: on Python 3 this name shadows the builtin TimeoutError inside this module.
class TimeoutError(Exception):  # pylint: disable=redefined-builtin
    """Exception raised by the alarm handler of limit_execution_time_to."""
|
|
|
|
|
|
def limit_execution_time_to(
        seconds=10, error_message=strerror(errno.ETIME)):
    """
    Decorator factory limiting how long the decorated function may run.

    A SIGALRM alarm is armed for ``seconds`` before the call and cancelled
    afterwards. If the alarm fires first, ``TimeoutError(error_message)`` is
    raised from the signal handler. The SIGALRM handler that was installed
    before the call is put back once the call finishes, so the decorator no
    longer leaves its own handler behind.

    Only one alarm can be pending per process: a decorated function calling
    another decorated function cancels the outer limit.

    :param seconds: whole number of seconds passed to ``signal.alarm``
    :param error_message: message carried by the raised TimeoutError
    :return: a decorator preserving the wrapped function's metadata
    """
    def decorator(func):
        def _handle_timeout(signum, frame):  # pylint: disable=W0613
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            previous_handler = signal(SIGALRM, _handle_timeout)
            alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Cancel the alarm first so it cannot fire into the old handler.
                alarm(0)
                # signal() reports None for handlers not installed from Python;
                # those cannot be re-installed through signal().
                if previous_handler is not None:
                    signal(SIGALRM, previous_handler)

        return wrapper

    return decorator
|
|
|
|
|
|
class App(object):
    """
    This class does all basic events with the app
    """

    # Number of one-second attempts isRunning makes before giving up on the bus
    _BUS_RETRIES = 30

    def __init__(
            self, appName, shortcut='<Control><Q>', a11yAppName=None,
            forceKill=True, parameters='', recordVideo=False):
        """
        Initialize object App
        appName     command to run the app
        shortcut    default quit shortcut
        a11yAppName app's a11y name is different than binary
        forceKill   is the app supposed to be kill before/after test?
        parameters  has the app any params needed to start? (only for startViaCommand)
        recordVideo start gnome-shell recording while running the app
        """
        self.appCommand = appName
        self.shortcut = shortcut
        self.forceKill = forceKill
        self.parameters = parameters
        self.internCommand = self.appCommand.lower()
        self.a11yAppName = a11yAppName
        self.recordVideo = recordVideo
        self.pid = None
        # Popen handle, set by startViaCommand; None until the app is started
        self.process = None

        # a way of overcoming overview autospawn when mouse in 1,1 from start
        pressKey('Esc')
        absoluteMotion(100, 100, 2)

        # attempt to make a recording of the test
        if self.recordVideo:
            keyCombo('<Control><Alt><Shift>R')

    def isRunning(self):
        """
        Is the app running?

        Falls back to the lower-cased command as the a11y name when none was
        given. Each attempt sleeps one second and then looks the name up among
        the a11y applications; attempts failing with GLib.GError are retried.

        :return: True when an a11y application with the expected name exists
        :raises Exception: when every attempt failed with GLib.GError
        """
        if self.a11yAppName is None:
            self.a11yAppName = self.internCommand

        # Trap weird bus errors
        for _ in range(self._BUS_RETRIES):
            sleep(1)
            try:
                return any(
                    x.name == self.a11yAppName for x in root.applications())
            except GLib.GError:
                continue
        raise Exception(
            "%d at-spi errors, seems that bus is blocked" % self._BUS_RETRIES)

    def kill(self):
        """
        Kill the app: through its Popen handle when there is one, otherwise
        (or when that fails with OSError) via 'killall'.

        A successfully killed child is also waited for so it is reaped.
        """
        if self.recordVideo:
            # Same key combo as in __init__ -- presumably toggles the
            # recording off again; TODO confirm.
            keyCombo('<Control><Alt><Shift>R')

        # getattr keeps this safe for instances built without __init__
        process = getattr(self, 'process', None)
        if process is not None:
            try:
                process.kill()
                process.wait()
                return
            except OSError:
                # Handle unusable (e.g. process already gone): use killall
                pass

        # Fall back to killall
        Popen("killall " + self.appCommand, shell=True).wait()

    def startViaCommand(self):
        """
        Start the app via command

        When forceKill is set and the app is already running it is killed
        first. The command and parameters are split on whitespace and run
        with stdout/stderr captured through pipes.

        :return: the a11y application node from ``root.application``
        :raises AssertionError: if the old instance cannot be stopped or the
                                new one does not show up
        """
        if self.forceKill and self.isRunning():
            self.kill()
            assert not self.isRunning(), "Application cannot be stopped"

        self.process = Popen(self.appCommand.split() + self.parameters.split(),
                             stdout=PIPE, stderr=PIPE, bufsize=0)
        self.pid = self.process.pid

        assert self.isRunning(), "Application failed to start"
        return root.application(self.a11yAppName)

    def closeViaShortcut(self):
        """
        Close the app via shortcut

        :raises Exception: if the app is not running
        :raises AssertionError: if the app is still running after the shortcut
        """
        if not self.isRunning():
            raise Exception("App is not running")

        keyCombo(self.shortcut)
        assert not self.isRunning(), "Application cannot be stopped"
|
|
|
|
|
|
@step(u'Start a new Evolution instance')
def start_new_evolution_instance(context):
    """Call startViaCommand() on context.app_class and store the result as context.app."""
    launcher = context.app_class
    context.app = launcher.startViaCommand()
|
|
|
|
|
|
def cleanup():
    """
    Wipe Evolution/GOA state and apply the GSettings the tests rely on.

    Everything is done through shell commands run with ``os.system``; their
    exit statuses are ignored.
    """
    # Remove cached data and settings
    for folder in ('~/.local/share/evolution',
                   '~/.cache/evolution',
                   '~/.config/evolution'):
        system("rm -rf %s > /dev/null" % folder)

    # Clean up goa data
    system("rm -rf ~/.config/goa-1.0/accounts.conf")
    # Was '2&> /dev/null': the shell read the '2' as an extra argument to
    # killall rather than as a file descriptor. Silence both streams portably.
    system("killall goa-daemon > /dev/null 2>&1")

    # Reset GSettings
    for schema in Gio.Settings.list_schemas():
        if 'evolution' in schema.lower():
            system("gsettings reset-recursively %s" % schema)

    # Skip warning dialog
    system("gsettings set org.gnome.evolution.shell skip-warning-dialog true")
    # Show switcher buttons as icons (to minimize tree scrolling)
    system("gsettings set org.gnome.evolution.shell buttons-style icons")
    # Skip default mailer handler dialog
    system("gsettings set org.gnome.evolution.mail prompt-check-if-default-mailer false")
|
|
|
|
|
|
def check_for_errors(context):
    """Check that no error is displayed on Evolution UI

    Returns None when there is no live app or no alert that is showing.
    Otherwise raises RuntimeError carrying the label names of the first
    showing alert. If that alert is not named 'Error', Evolution is
    force-shut-down and its data folders are removed before raising.

    :param context: object exposing ``app`` (with ``dead`` and
                    ``findChildren``)
    :raises RuntimeError: when at least one alert is showing
    """
    # Don't try to check for errors on dead app
    if not context.app or context.app.dead:
        return
    alerts = context.app.findChildren(lambda x: x.roleName == 'alert')
    if not alerts:
        # alerts can also return None
        return

    # A real list (not filter()) so it can be indexed on Python 3 as well
    showing_alerts = [alert for alert in alerts if alert.showing]
    if not showing_alerts:
        return

    first_alert = showing_alerts[0]
    labels = first_alert.findChildren(lambda x: x.roleName == 'label')
    messages = [label.name for label in labels]

    if first_alert.name != 'Error' and first_alert.showing:
        # Erase the configuration and start all over again.
        # os.system runs /bin/sh, so use the portable redirection, not '&>'.
        system("evolution --force-shutdown > /dev/null 2>&1")

        # Remove previous data
        for folder in ('~/.local/share/evolution',
                       '~/.cache/evolution',
                       '~/.config/evolution'):
            system("rm -rf %s > /dev/null" % folder)

    raise RuntimeError("Error occurred: %s" % messages)
|
|
|
|
|
|
def non_block_read(output):
    """
    Read whatever is currently available from ``output`` without blocking.

    The file descriptor is switched to O_NONBLOCK (and left that way) before
    reading.

    :param output: file object exposing ``fileno()`` and ``read()``
    :return: the data returned by ``output.read()``, or "" when the read
             fails with IOError/OSError or reports no data (None)
    """
    fd = output.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    try:
        data = output.read()
    except (IOError, OSError):
        # Nothing readable right now (or the pipe is broken)
        return ""
    # Non-blocking streams may signal "no data yet" with None instead of raising
    return "" if data is None else data
|