Skip to content

ot_ssh

korobka.opentrons.ot_ssh ¤

TODO: Add Docstring for the module.

Classes¤

OpentronSSH(local_address) ¤

This class handles SSH-based interactions with the Opentron robot.

Initializes the SSH connection parameters.

Source code in korobka/opentrons/ot_ssh.py
18
19
20
21
22
23
24
25
26
27
28
29
30
def __init__(self, local_address: str):
    """Initializes the SSH connection parameters."""
    self.local_address = local_address
    self.ot_address = "169.254.24.120"

    equipment_data_dir = os.path.dirname(self.local_address)
    self.access_folder = os.path.join(equipment_data_dir, "access")

    os.makedirs(self.access_folder, exist_ok=True)

    logger.info("Initializing OpentronSSH...")

    self.core = self.check_or_generate_ssh_key()
Functions¤
check(process_name) ¤

Checks the status of a specific command execution on the Opentron.

Source code in korobka/opentrons/ot_ssh.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def check(self, process_name: str) -> None:
    """Checks the status of a specific command execution on the Opentron."""
    while True:
        vals = []
        for val in ["status.txt", "error.txt"]:
            self.get(val, val)
            with open(os.path.join(self.local_address, val)) as f:
                vals.append(f.read().strip())

        status, error = vals
        if status == "completed" and error == "None":
            logger.info(f"Action defined by {process_name} completed successfully.")
            break
        elif error != "None":
            logger.error(f"Action defined by {process_name} failed: {error}")
            raise RuntimeError("OT went wrong")

        time.sleep(1.5)
check_or_generate_ssh_key() ¤

Checks if an SSH key exists in the access folder. If not, generates a new one.

Source code in korobka/opentrons/ot_ssh.py
32
33
34
35
36
37
38
39
40
41
def check_or_generate_ssh_key(self) -> str:
    """Checks if an SSH key exists in the access folder. If not, generates a new one."""
    for file in os.listdir(self.access_folder):
        if file.startswith("ot_key_") and not file.endswith(".pub"):
            ssh_key_path = os.path.join(self.access_folder, file).replace("\\", "/")
            logger.info(f"Found existing SSH key: {ssh_key_path}")
            self.ssh_key_path = ssh_key_path
            return f"-O -i {ssh_key_path}"

    return self.generate_ssh_key()
execute(file_name, control=True) ¤

Sends a specific file to the Opentron and waits for completion.

Source code in korobka/opentrons/ot_ssh.py
168
169
170
171
172
173
174
def execute(self, file_name: str, control: bool = True) -> None:
    """Sends a specific file to the Opentron and waits for completion."""
    self.upload(file_name)
    logger.info(f"Triggered action defined by {file_name}")
    if control:
        time.sleep(1.5)  # gives enough time for the status files to be established
        self.check(file_name)
execute_bash_command(command, timeout=5) ¤

Executes a given shell command with logging and return code check.

Source code in korobka/opentrons/ot_ssh.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def execute_bash_command(self, command: str, timeout: int = 5) -> None:
    """Executes a given shell command with logging and return code check."""
    command_list = shlex.split(command)
    logger.debug(f"[SSH] Running command: {' '.join(command_list)}")

    try:
        result = subprocess.run(
            command_list, capture_output=True, text=True, timeout=timeout, stdin=subprocess.DEVNULL
        )

        logger.debug(f"[SSH] Exit code: {result.returncode}")

        if result.stdout.strip():
            logger.debug(f"[SSH] STDOUT:\n{result.stdout.strip()}")
        if result.stderr.strip():
            logger.warning(f"[SSH] STDERR:\n{result.stderr.strip()}")

        if result.returncode != 0:
            logger.error(f"[SSH] Command failed: {command_list}")
            # raise RuntimeError("SSH command failed.")

    except subprocess.TimeoutExpired:
        logger.error(f"[SSH] Command timed out: {command_list}")
    except Exception as e:
        logger.exception(f"[SSH] Unexpected error during execution: {e}")
generate_ssh_key() ¤

Generates a new RSA SSH key pair and registers the public key with the Opentron robot.

Source code in korobka/opentrons/ot_ssh.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def generate_ssh_key(self) -> str:
    """Generates a new RSA SSH key pair and registers the public key with the Opentron robot."""
    device_name = platform.node()
    ssh_key_name = f"ot_key_{device_name}"
    ssh_key_path = os.path.join(self.access_folder, ssh_key_name).replace("\\", "/")

    logger.info(f"Generating new RSA SSH key: {ssh_key_path}")

    keygen_command = [
        "ssh-keygen",
        "-t",
        "rsa",
        "-b",
        "4096",
        "-N",
        "",
        "-f",
        ssh_key_path,
    ]
    subprocess.run(keygen_command, check=True)

    self.register_public_key(ssh_key_path + ".pub")

    logger.info(f"SSH key registered successfully: {ssh_key_path}")
    self.ssh_key_path = ssh_key_path

    return f"-O -i {ssh_key_path}"
get(remote_file, local_destination) ¤

Fetches a file from Opentron's /root/ directory.

Source code in korobka/opentrons/ot_ssh.py
144
145
146
147
def get(self, remote_file: str, local_destination: str) -> None:
    """Fetches a file from Opentron's /root/ directory."""
    command = f'scp {self.core} root@{self.ot_address}:/root/{remote_file} "{os.path.join(self.local_address, local_destination)}"'  # NOQA
    self.execute_bash_command(command)
register_public_key(pub_key_path) ¤

Registers the public key with the Opentron robot via API.

Source code in korobka/opentrons/ot_ssh.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def register_public_key(self, pub_key_path: str) -> None:
    """Registers the public key with the Opentron robot via API."""
    with open(pub_key_path) as key_file:
        public_key = key_file.read().strip()

    logger.info("Sending public key to Opentron robot...")

    if os.name == "nt":
        command = [
            "powershell",
            "-Command",
            f"@{{key = Get-Content {pub_key_path} | Out-String}} | ConvertTo-Json | "
            f"Invoke-WebRequest -Method Post -ContentType 'application/json' -Uri http://{self.ot_address}:31950/server/ssh_keys",
        ]
    else:
        command = [
            "curl",
            "-H",
            "Content-Type: application/json",
            "-d",
            json.dumps({"key": public_key}),
            f"http://{self.ot_address}:31950/server/ssh_keys",
        ]

    result = subprocess.run(command, capture_output=True, text=True)

    if result.returncode != 0:
        error_msg = f"Failed to register SSH key: {result.stderr.strip()}"
        logger.error(error_msg)
        raise Exception(error_msg)
    else:
        logger.info("Public key successfully registered with Opentron robot.")
set_address(new_address) ¤

Updates the Opentron's IP address dynamically.

Source code in korobka/opentrons/ot_ssh.py
176
177
178
179
def set_address(self, new_address: str) -> None:
    """Updates the Opentron's IP address dynamically."""
    logger.info(f"Updating Opentron IP from {self.ot_address} to {new_address}")
    self.ot_address = new_address
start_up(file_name) ¤

Runs opentrons_execute on a specific file in Opentron.

Source code in korobka/opentrons/ot_ssh.py
138
139
140
141
142
def start_up(self, file_name: str) -> None:
    """Runs opentrons_execute on a specific file in Opentron."""
    command = f'ssh -i {self.ssh_key_path} root@{self.ot_address} "opentrons_execute {file_name}"'
    logger.info("Executing %s on Opentron...", file_name)
    threading.Thread(target=self.execute_bash_command, args=(command, 40)).start()
upload(file_path) ¤

Uploads a file to Opentron's /root/ directory.

Source code in korobka/opentrons/ot_ssh.py
130
131
132
133
134
135
136
def upload(self, file_path: str) -> None:
    """Uploads a file to Opentron's /root/ directory."""
    file_name = os.path.basename(file_path)
    file_path = os.path.abspath(file_path)
    command = f'scp {self.core} "{file_path}" root@{self.ot_address}:/root/{file_name}'
    logger.info(f"Uploading {file_name} to Opentron...")
    self.execute_bash_command(command)