## Fediversity/services/tests/peertube.nix
##
## 234 lines
## 9 KiB
## Nix

## This file is a basic test of Peertube functionalities.
{ pkgs, self }:

let
  inherit (pkgs) lib;

  ## Sample video that the browser script uploads and that the test script
  ## later compares against the object found in Garage.
  testVideo = pkgs.copyPathToStore ./green.mp4;

  ## Colour of the sample video. It is only interpolated into the
  ## commented-out screenshot check of the browser script.
  testVideoColour = "#00FF00";
## `post-video-in-browser ROOT_PASSWORD`: a Selenium script that drives a
## headless Firefox (through geckodriver) against http://peertube.localhost,
## logs in as `root` with the password given as first argument when it hits the
## login page, uploads `testVideo` through the upload form and publishes it.
## Progress is reported on stderr. The browser session is always torn down,
## even when one of the steps fails.
postVideoInBrowser =
  pkgs.writers.writePython3Bin "post-video-in-browser"
    {
      libraries = with pkgs.python3Packages; [ selenium ];
      flakeIgnore = [ "E501" ]; # welcome to the 21st century
    }
    ''
      import sys
      from urllib.parse import urlparse
      from selenium import webdriver
      from selenium.webdriver.common.by import By
      from selenium.webdriver.firefox.options import Options
      from selenium.webdriver.support.ui import WebDriverWait
      from selenium.common.exceptions import NoSuchElementException


      ############################################################
      # Login
      def load(driver, wait, page):
          """Open `page` and wait for it; log in as root if redirected to /login.

          driver -- the Selenium WebDriver to steer
          wait   -- the WebDriverWait used for all explicit waits
          page   -- absolute URL of the page to open

          The root password is read from sys.argv[1].
          """
          print(f"Loading page {page}...", file=sys.stderr)
          driver.get(page)
          print("Waiting until page is loaded...", file=sys.stderr)
          wait.until(lambda d: d.execute_script("return document.readyState") == "complete")
          if urlparse(driver.current_url).path == "/login":
              print("Hit a login page.", file=sys.stderr)
              print("Enter username...", file=sys.stderr)
              driver.find_element(By.ID, "username").send_keys("root")
              print("Enter password...", file=sys.stderr)
              driver.find_element(By.ID, "password").send_keys(sys.argv[1])
              print("Click Login button...", file=sys.stderr)
              driver.find_element(By.XPATH, "//input[@value='Login']").click()
              print("Waiting until we are logged-in...", file=sys.stderr)
              # After a successful login we land back on the requested page.
              wait.until(lambda d: urlparse(d.current_url).path == urlparse(page).path)
              print("Waiting until page is loaded...", file=sys.stderr)
              wait.until(lambda d: d.execute_script("return document.readyState") == "complete")
              print("Clicking the annoying setup wizard away...", file=sys.stderr)
              try:
                  driver.find_element(By.XPATH, "//input[@value='Remind me later']").click()
              except NoSuchElementException:
                  # Somehow, sometimes, the wizard just does not show up; then we
                  # ignore the error and carry on like nothing happened.
                  print("Setup wizard did not show up.", file=sys.stderr)
          print(f"Done loading page {page}.", file=sys.stderr)


      # Fail early with a readable message instead of an IndexError in the
      # middle of the login sequence.
      if len(sys.argv) < 2:
          sys.exit("Usage: post-video-in-browser ROOT_PASSWORD")

      options = Options()
      print("########################################", file=sys.stderr)
      print("A", file=sys.stderr)
      options.add_argument("--headless")
      print("########################################", file=sys.stderr)
      print("B", file=sys.stderr)
      service = webdriver.FirefoxService(executable_path="${lib.getExe pkgs.geckodriver}")
      print("########################################", file=sys.stderr)
      print("C", file=sys.stderr)
      driver = webdriver.Firefox(options=options, service=service)
      try:
          print("########################################", file=sys.stderr)
          print("D", file=sys.stderr)
          driver.set_window_size(4096, 2160)
          print("########################################", file=sys.stderr)
          print("E", file=sys.stderr)
          # Very generous timeouts: find_element() and the explicit waits below
          # may each take up to 360 seconds before giving up.
          driver.implicitly_wait(360)
          print("########################################", file=sys.stderr)
          print("F", file=sys.stderr)
          wait = WebDriverWait(driver, timeout=360, poll_frequency=10)
          print("########################################", file=sys.stderr)

          ############################################################
          # Upload video and take a screenshot
          print("Go to the upload page...", file=sys.stderr)
          load(driver, wait, "http://peertube.localhost/videos/upload")
          print("Submit video file...", file=sys.stderr)
          driver.find_element(By.XPATH, "//input[@type='file']").send_keys("${testVideo}")
          print("Wait for file to upload, then publish it...", file=sys.stderr)
          publish_button = driver.find_element(By.XPATH, "//button[.//span[normalize-space()='Publish']]")
          # get_attribute() returns None when the attribute is absent; treat
          # that as "no classes", i.e. not disabled.
          wait.until(lambda _d: "disabled" not in (publish_button.get_attribute("class") or ""))
          publish_button.click()
          print("Waiting until we are redirected...", file=sys.stderr)
          wait.until(lambda d: urlparse(d.current_url).path != "/videos/upload")
          wait.until(lambda d: d.execute_script("return document.readyState") == "complete")

          # FIXME: The video cannot play and we get Failed to play video. I
          # believe it is a codec problem, and it possibly has to do with video
          # codecs enabled in Firefox in headless mode -- after all, who would
          # want to play a video? The following is a list of things that I have
          # tried without success. Maybe one day we can manage?
          #
          # NOTE: re-enabling the screenshot check below also requires
          # `import re` and `import subprocess` at the top of this script.
          #
          # video = driver.find_element(By.XPATH, "//video")
          # wait.until(lambda _d: not video.get_attribute("src").startswith("blob:"))
          # wait.until(lambda d: d.execute_script("return arguments[0].readyState", video) == 4)
          # driver.find_element(By.XPATH, "//*[contains(text(), 'Failed to play video')]")
          #
          # def detect_image_in_screen(d):
          #     print("Taking a screenshot...", file=sys.stderr)
          #     d.save_screenshot("/screenshot.png")
          #     print("Checking it...", file=sys.stderr)
          #     displayed_colours = subprocess.run(
          #         [
          #             "magick",
          #             "/screenshot.png",
          #             "-define",
          #             "histogram:unique-colors=true",
          #             "-format",
          #             "%c",
          #             "histogram:info:",
          #         ],
          #         capture_output=True,
          #         text=True,
          #         check=True,
          #     ).stdout
          #     # The interpolated colour already carries its leading '#'.
          #     return bool(re.match(".*${testVideoColour}.*", displayed_colours, re.S))
          #
          # print("Wait until the image shows in screen...", file=sys.stderr)
          # wait.until(detect_image_in_screen)

          ############################################################
          print("Done; bye!", file=sys.stderr)
      finally:
          # quit() ends the whole WebDriver session (browser and geckodriver),
          # whereas close() only closes the current window. Doing it in a
          # `finally` avoids leaking the browser when a step above fails.
          driver.quit()
    '';
## `acquire-root-password`: polls the journal of the `peertube` unit, up to 30
## times at 60 second intervals, for a line containing `password: ` and prints
## what follows it on stdout. Exits with status 1, and a message on stderr, if
## nothing was found after the last attempt.
acquireRootPassword = pkgs.writeShellScriptBin "acquire-root-password" ''
  readonly retry=30
  readonly wait=60

  for _ in $(seq "$retry"); do
    # Stop at the first match: printing every match would glue several
    # matching journal lines together into one bogus password.
    password=$(journalctl -u peertube | perl -ne 'if (/password: (.*)/) { print $1; exit }')
    if [ -n "$password" ]; then
      echo "$password"
      exit 0
    fi
    sleep "$wait"
  done

  # Diagnostics go to stderr so that stdout only ever carries the password.
  echo "Could not acquire root password in $((retry * wait))s." >&2
  exit 1
'';
in
## The NixOS VM test itself: a single `server` node running Garage and Peertube,
## and a test script that uploads a video through the browser and then checks
## that the object stored in Garage matches the original file.
pkgs.nixosTest {
  name = "peertube";

  nodes = {
    server =
      { config, ... }:
      {
        imports = with self.nixosModules; [
          fediversity
          ../vm/garage-vm.nix
          ../vm/peertube-vm.nix
          ../vm/interactive-vm.nix
        ];

        virtualisation = {
          memorySize = lib.mkVMOverride 8192;
          cores = 8;
        };

        environment.systemPackages = with pkgs; [
          python3
          firefox-unwrapped
          geckodriver
          toot
          acquireRootPassword
          postVideoInBrowser
          imagemagick
          ffmpeg # to identify videos
          expect
        ];

        ## FIXME: The CI is very slow, so the default timeout of 120s is not
        ## good enough. We bump it drastically.
        systemd.services.postgresql.serviceConfig.TimeoutSec = lib.mkForce 3600;

        ## The AWS_* variables are expanded by the shell in the `mc alias set`
        ## command of the test script.
        ## NOTE(review): PT_INITIAL_ROOT_PASSWORD is set here, yet the test
        ## script still reads the root password from the journal -- confirm
        ## whether this variable has any effect (see the FIXME in testScript).
        environment.variables = {
          AWS_ACCESS_KEY_ID = config.services.garage.ensureKeys.peertube.id;
          AWS_SECRET_ACCESS_KEY = config.services.garage.ensureKeys.peertube.secret;
          PT_INITIAL_ROOT_PASSWORD = "testtest";
        };
      };
  };

  testScript =
    { nodes, ... }:
    ''
      import shlex

      server.start()

      # FIXME: I think this trick to look for a password can be replaced by
      # services.peertube.serviceEnvironmentFile.PT_INITIAL_ROOT_PASSWORD=testtest
      with subtest("Peertube starts"):
          server.wait_for_unit("peertube.service")
          root_password = server.succeed("acquire-root-password").rstrip()

      with subtest("Post a video in the browser"):
          # The password ends up on a shell command line, so quote it.
          server.succeed(f"post-video-in-browser {shlex.quote(root_password)}")

      with subtest("Find video in garage"):
          server.succeed("mc alias set garage ${nodes.server.fediversity.internal.garage.api.url} --api s3v4 --path off $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY")

          video = server.succeed("mc find garage --regex '\\.mp4'").rstrip()
          if video == "":
              raise Exception("Could not find any .mp4 video stored in Garage")

          # Same here: the object path is spliced into a shell command.
          server.succeed(f"mc cat {shlex.quote(video)} > /video.mp4")

          # Compare what `identify -format '%#'` reports for both files.
          garage_hash = server.succeed("identify -quiet -format '%#' /video.mp4")
          original_hash = server.succeed("identify -quiet -format '%#' ${testVideo}")
          if garage_hash != original_hash:
              raise Exception("The video stored in Garage does not correspond to the original one.")
    '';
}