forked from Fediversity/Fediversity
234 lines
9 KiB
Nix
234 lines
9 KiB
Nix
|
## This file is a basic test of Peertube functionalities.
|
||
|
|
||
|
{ pkgs, self }:
|
||
|
|
||
|
let
|
||
|
## Shorthand for the nixpkgs library; the rest of this file uses it for
## `getExe`, `mkVMOverride` and `mkForce`.
inherit (pkgs) lib;

## The sample video that the test uploads, copied into the Nix store so that
## its store path can be interpolated into the scripts below.
testVideo = pkgs.copyPathToStore ./green.mp4;

## Presumably the colour that fills `green.mp4`. It is only referenced from the
## commented-out screenshot check in the browser script.
testVideoColour = "#00FF00";
|
||
|
|
||
|
## Selenium script that drives a headless Firefox against
## http://peertube.localhost: it logs in as `root` (the password is the first
## command-line argument), submits `testVideo` on the upload page and publishes
## it. Progress is reported on stderr. The script is linted with flake8 at build
## time (only E501 is ignored), so the embedded Python must stay flake8-clean.
postVideoInBrowser =
  pkgs.writers.writePython3Bin "post-video-in-browser"
    {
      libraries = with pkgs.python3Packages; [ selenium ];
      flakeIgnore = [ "E501" ]; # welcome to the 21st century
    }
    ''
      import sys
      from urllib.parse import urlparse

      from selenium import webdriver
      from selenium.webdriver.common.by import By
      from selenium.webdriver.firefox.options import Options
      from selenium.webdriver.support.ui import WebDriverWait
      from selenium.common.exceptions import NoSuchElementException

      # Browser set-up. The lettered markers on stderr show which set-up step
      # was reached last if the script hangs or crashes.
      options = Options()
      print("########################################", file=sys.stderr)
      print("A", file=sys.stderr)
      options.add_argument("--headless")
      print("########################################", file=sys.stderr)
      print("B", file=sys.stderr)
      service = webdriver.FirefoxService(executable_path="${lib.getExe pkgs.geckodriver}")
      print("########################################", file=sys.stderr)
      print("C", file=sys.stderr)
      driver = webdriver.Firefox(options=options, service=service)
      print("########################################", file=sys.stderr)
      print("D", file=sys.stderr)
      driver.set_window_size(4096, 2160)
      print("########################################", file=sys.stderr)
      print("E", file=sys.stderr)
      # Every find_element() call below waits up to 360s for its element.
      driver.implicitly_wait(360)
      print("########################################", file=sys.stderr)
      print("F", file=sys.stderr)
      # Explicit waits poll their condition every 10s, for at most 360s.
      wait = WebDriverWait(driver, timeout=360, poll_frequency=10)
      print("########################################", file=sys.stderr)

      ############################################################
      # Login


      def load(driver, page):
          """Open `page` and wait until it is fully loaded.

          If the browser ends up on the `/login` path instead, log in as
          `root` with the password from `sys.argv[1]`, wait to be redirected
          back to `page`, and dismiss the setup wizard if it shows up.
          """
          print(f"Loading page {page}...", file=sys.stderr)
          driver.get(page)

          print("Waiting until page is loaded...", file=sys.stderr)
          wait.until(lambda d: d.execute_script("return document.readyState") == "complete")

          if urlparse(driver.current_url).path == "/login":
              print("Hit a login page.", file=sys.stderr)

              print("Enter username...", file=sys.stderr)
              driver.find_element(By.ID, "username").send_keys("root")

              print("Enter password...", file=sys.stderr)
              driver.find_element(By.ID, "password").send_keys(sys.argv[1])

              print("Click “Login” button...", file=sys.stderr)
              driver.find_element(By.XPATH, "//input[@value='Login']").click()

              print("Waiting until we are logged-in...", file=sys.stderr)
              wait.until(lambda d: urlparse(d.current_url).path == urlparse(page).path)

              print("Waiting until page is loaded...", file=sys.stderr)
              wait.until(lambda d: d.execute_script("return document.readyState") == "complete")

              print("Clicking the annoying setup wizard away...", file=sys.stderr)
              try:
                  driver.find_element(By.XPATH, "//input[@value='Remind me later']").click()
              except NoSuchElementException:
                  # Somehow, sometimes, the wizard just does not show up; then we
                  # ignore the error and carry on like nothing happened.
                  print("Setup wizard did not show up.", file=sys.stderr)

          print(f"Done loading page {page}.", file=sys.stderr)


      ############################################################
      # Upload video and take a screenshot

      try:
          print("Go to the upload page...", file=sys.stderr)
          load(driver, "http://peertube.localhost/videos/upload")

          print("Submit video file...", file=sys.stderr)
          driver.find_element(By.XPATH, "//input[@type='file']").send_keys("${testVideo}")

          print("Wait for file to upload, then publish it...", file=sys.stderr)
          publish_button = driver.find_element(By.XPATH, "//button[.//span[normalize-space()='Publish']]")
          # The button carries a "disabled" CSS class until it can be clicked.
          wait.until(lambda _d: "disabled" not in publish_button.get_attribute("class"))
          publish_button.click()

          print("Waiting until we are redirected...", file=sys.stderr)
          wait.until(lambda d: urlparse(d.current_url).path != "/videos/upload")
          wait.until(lambda d: d.execute_script("return document.readyState") == "complete")

          # FIXME: The video cannot play and we get “Failed to play video”. I
          # believe it is a codec problem, and it possibly has to do with video
          # codecs enabled in Firefox in headless mode -- after all, who would
          # want to play a video? The following is a list of things that I have
          # tried without success. Maybe one day we can manage?
          #
          # NOTE: re-enabling the screenshot check below also requires
          # `import re` and `import subprocess`. The colour constant already
          # starts with "#", so the pattern must not add another one.
          #
          # video = driver.find_element(By.XPATH, "//video")
          # wait.until(lambda _d: not video.get_attribute("src").startswith("blob:"))
          # wait.until(lambda d: d.execute_script("return arguments[0].readyState", video) == 4)
          # driver.find_element(By.XPATH, "//*[contains(text(), 'Failed to play video')]")
          #
          # def detect_image_in_screen(d):
          #     print("Taking a screenshot...", file=sys.stderr)
          #     d.save_screenshot("/screenshot.png")
          #     print("Checking it...", file=sys.stderr)
          #     displayed_colours = subprocess.run(
          #         [
          #             "magick",
          #             "/screenshot.png",
          #             "-define",
          #             "histogram:unique-colors=true",
          #             "-format",
          #             "%c",
          #             "histogram:info:",
          #         ],
          #         capture_output=True,
          #         text=True,
          #         check=True,
          #     ).stdout
          #     return bool(re.match(".*${testVideoColour}.*", displayed_colours, re.S))
          #
          # print("Wait until the image shows in screen...", file=sys.stderr)
          # wait.until(detect_image_in_screen)

          ############################################################

          print("Done; bye!", file=sys.stderr)
      finally:
          # quit() ends the whole WebDriver session (all windows plus the
          # driver process), whereas close() only closes the current window.
          # Doing it in `finally` also cleans up when a step above fails.
          driver.quit()
    '';
|
||
|
|
||
|
## Shell script that prints on stdout the root password found in the journal
## of the `peertube` unit (the text following "password: "). It polls the
## journal up to 30 times, sleeping 60s after each unsuccessful attempt, and
## exits with status 1 if no password was found.
acquireRootPassword = pkgs.writeShellScriptBin "acquire-root-password" ''
  readonly retry=30
  readonly wait=60

  for _ in $(seq "$retry"); do
    # Stop at the first match: printing every match would glue several
    # matching journal lines together into one bogus password.
    password=$(journalctl -u peertube | perl -ne 'if (/password: (.*)/) { print $1; exit }')
    if [ -n "$password" ]; then
      echo "$password"
      exit 0
    fi
    sleep "$wait"
  done

  # Diagnostics go to stderr; stdout is reserved for the password itself.
  echo "Could not acquire root password in $((retry * wait))s." >&2
  exit 1
'';
|
||
|
in
|
||
|
|
||
|
## The test proper: a single `server` VM running Garage and Peertube. The test
## script waits for Peertube, scrapes the root password from the journal,
## uploads the sample video through the browser, and then checks that an .mp4
## stored in Garage matches the original.
pkgs.nixosTest {
  name = "peertube";

  nodes = {
    server =
      { config, ... }:
      {
        imports = with self.nixosModules; [
          fediversity
          ../vm/garage-vm.nix
          ../vm/peertube-vm.nix
          ../vm/interactive-vm.nix
        ];

        virtualisation = {
          memorySize = lib.mkVMOverride 8192;
          cores = 8;
        };

        ## Tools used by the test script below. NOTE(review): `mc` is called by
        ## the script but is not listed here; presumably one of the imported VM
        ## modules provides it -- confirm.
        environment.systemPackages = with pkgs; [
          python3
          firefox-unwrapped
          geckodriver
          toot
          acquireRootPassword
          postVideoInBrowser
          imagemagick
          ffmpeg # to identify videos
          expect
        ];

        ## FIXME: The CI is very slow, so the default timeout of 120s is not
        ## good enough. We bump it drastically.
        systemd.services.postgresql.serviceConfig.TimeoutSec = lib.mkForce 3600;

        ## The Garage credentials are read back by the `mc alias set` command
        ## in the test script.
        environment.variables = {
          AWS_ACCESS_KEY_ID = config.services.garage.ensureKeys.peertube.id;
          AWS_SECRET_ACCESS_KEY = config.services.garage.ensureKeys.peertube.secret;
          PT_INITIAL_ROOT_PASSWORD = "testtest";
        };
      };
  };

  testScript =
    { nodes, ... }:
    ''
      import shlex

      server.start()

      # FIXME: I think this trick to look for a password can be replaced by
      # services.peertube.serviceEnvironmentFile.PT_INITIAL_ROOT_PASSWORD=testtest

      with subtest("Peertube starts"):
          server.wait_for_unit("peertube.service")
          root_password = server.succeed("acquire-root-password").rstrip()

      with subtest("Post a video in the browser"):
          # The password is scraped from the journal, so quote it before it
          # is interpolated into a shell command line.
          server.succeed(f"post-video-in-browser {shlex.quote(root_password)}")

      with subtest("Find video in garage"):
          server.succeed("mc alias set garage ${nodes.server.fediversity.internal.garage.api.url} --api s3v4 --path off $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY")
          video = server.succeed("mc find garage --regex '\\.mp4'").rstrip()
          if video == "":
              raise Exception("Could not find any .mp4 video stored in Garage")
          server.succeed(f"mc cat {shlex.quote(video)} > /video.mp4")
          # Compare ImageMagick's `%#` signature of the stored and original files.
          garage_hash = server.succeed("identify -quiet -format '%#' /video.mp4")
          original_hash = server.succeed("identify -quiet -format '%#' ${testVideo}")
          if garage_hash != original_hash:
              raise Exception("The video stored in Garage does not correspond to the original one.")
    '';
}
|