This class handles SSH-based interactions with the Opentron robot.
Initializes the SSH connection parameters.
Source code in korobka/opentrons/ot_ssh.py
18
19
20
21
22
23
24
25
26
27
28
29
30 | def __init__(self, local_address: str):
"""Initializes the SSH connection parameters."""
self.local_address = local_address
self.ot_address = "169.254.24.120"
equipment_data_dir = os.path.dirname(self.local_address)
self.access_folder = os.path.join(equipment_data_dir, "access")
os.makedirs(self.access_folder, exist_ok=True)
logger.info("Initializing OpentronSSH...")
self.core = self.check_or_generate_ssh_key()
|
Functions
check(process_name)
Checks the status of a specific command execution on the Opentron.
Source code in korobka/opentrons/ot_ssh.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 | def check(self, process_name: str) -> None:
"""Checks the status of a specific command execution on the Opentron."""
while True:
vals = []
for val in ["status.txt", "error.txt"]:
self.get(val, val)
with open(os.path.join(self.local_address, val)) as f:
vals.append(f.read().strip())
status, error = vals
if status == "completed" and error == "None":
logger.info(f"Action defined by {process_name} completed successfully.")
break
elif error != "None":
logger.error(f"Action defined by {process_name} failed: {error}")
raise RuntimeError("OT went wrong")
time.sleep(1.5)
|
check_or_generate_ssh_key()
Checks if an SSH key exists in the access folder. If not, generates a new one.
Source code in korobka/opentrons/ot_ssh.py
32
33
34
35
36
37
38
39
40
41 | def check_or_generate_ssh_key(self) -> str:
"""Checks if an SSH key exists in the access folder. If not, generates a new one."""
for file in os.listdir(self.access_folder):
if file.startswith("ot_key_") and not file.endswith(".pub"):
ssh_key_path = os.path.join(self.access_folder, file).replace("\\", "/")
logger.info(f"Found existing SSH key: {ssh_key_path}")
self.ssh_key_path = ssh_key_path
return f"-O -i {ssh_key_path}"
return self.generate_ssh_key()
|
execute(file_name, control=True)
Sends a specific file to the Opentron and waits for completion.
Source code in korobka/opentrons/ot_ssh.py
168
169
170
171
172
173
174 | def execute(self, file_name: str, control: bool = True) -> None:
"""Sends a specific file to the Opentron and waits for completion."""
self.upload(file_name)
logger.info(f"Triggered action defined by {file_name}")
if control:
time.sleep(1.5) # gives enough time for the status files to be established
self.check(file_name)
|
execute_bash_command(command, timeout=5)
Executes a given shell command with logging and return code check.
Source code in korobka/opentrons/ot_ssh.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 | def execute_bash_command(self, command: str, timeout: int = 5) -> None:
"""Executes a given shell command with logging and return code check."""
command_list = shlex.split(command)
logger.debug(f"[SSH] Running command: {' '.join(command_list)}")
try:
result = subprocess.run(
command_list, capture_output=True, text=True, timeout=timeout, stdin=subprocess.DEVNULL
)
logger.debug(f"[SSH] Exit code: {result.returncode}")
if result.stdout.strip():
logger.debug(f"[SSH] STDOUT:\n{result.stdout.strip()}")
if result.stderr.strip():
logger.warning(f"[SSH] STDERR:\n{result.stderr.strip()}")
if result.returncode != 0:
logger.error(f"[SSH] Command failed: {command_list}")
# raise RuntimeError("SSH command failed.")
except subprocess.TimeoutExpired:
logger.error(f"[SSH] Command timed out: {command_list}")
except Exception as e:
logger.exception(f"[SSH] Unexpected error during execution: {e}")
|
generate_ssh_key()
Generates a new RSA SSH key pair and registers the public key with the Opentron robot.
Source code in korobka/opentrons/ot_ssh.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 | def generate_ssh_key(self) -> str:
"""Generates a new RSA SSH key pair and registers the public key with the Opentron robot."""
device_name = platform.node()
ssh_key_name = f"ot_key_{device_name}"
ssh_key_path = os.path.join(self.access_folder, ssh_key_name).replace("\\", "/")
logger.info(f"Generating new RSA SSH key: {ssh_key_path}")
keygen_command = [
"ssh-keygen",
"-t",
"rsa",
"-b",
"4096",
"-N",
"",
"-f",
ssh_key_path,
]
subprocess.run(keygen_command, check=True)
self.register_public_key(ssh_key_path + ".pub")
logger.info(f"SSH key registered successfully: {ssh_key_path}")
self.ssh_key_path = ssh_key_path
return f"-O -i {ssh_key_path}"
|
get(remote_file, local_destination)
Fetches a file from Opentron's /root/ directory.
Source code in korobka/opentrons/ot_ssh.py
| def get(self, remote_file: str, local_destination: str) -> None:
"""Fetches a file from Opentron's /root/ directory."""
command = f'scp {self.core} root@{self.ot_address}:/root/{remote_file} "{os.path.join(self.local_address, local_destination)}"' # NOQA
self.execute_bash_command(command)
|
register_public_key(pub_key_path)
Registers the public key with the Opentron robot via API.
Source code in korobka/opentrons/ot_ssh.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 | def register_public_key(self, pub_key_path: str) -> None:
"""Registers the public key with the Opentron robot via API."""
with open(pub_key_path) as key_file:
public_key = key_file.read().strip()
logger.info("Sending public key to Opentron robot...")
if os.name == "nt":
command = [
"powershell",
"-Command",
f"@{{key = Get-Content {pub_key_path} | Out-String}} | ConvertTo-Json | "
f"Invoke-WebRequest -Method Post -ContentType 'application/json' -Uri http://{self.ot_address}:31950/server/ssh_keys",
]
else:
command = [
"curl",
"-H",
"Content-Type: application/json",
"-d",
json.dumps({"key": public_key}),
f"http://{self.ot_address}:31950/server/ssh_keys",
]
result = subprocess.run(command, capture_output=True, text=True)
if result.returncode != 0:
error_msg = f"Failed to register SSH key: {result.stderr.strip()}"
logger.error(error_msg)
raise Exception(error_msg)
else:
logger.info("Public key successfully registered with Opentron robot.")
|
set_address(new_address)
Updates the Opentron's IP address dynamically.
Source code in korobka/opentrons/ot_ssh.py
| def set_address(self, new_address: str) -> None:
"""Updates the Opentron's IP address dynamically."""
logger.info(f"Updating Opentron IP from {self.ot_address} to {new_address}")
self.ot_address = new_address
|
start_up(file_name)
Runs opentrons_execute on a specific file in Opentron.
Source code in korobka/opentrons/ot_ssh.py
| def start_up(self, file_name: str) -> None:
"""Runs opentrons_execute on a specific file in Opentron."""
command = f'ssh -i {self.ssh_key_path} root@{self.ot_address} "opentrons_execute {file_name}"'
logger.info("Executing %s on Opentron...", file_name)
threading.Thread(target=self.execute_bash_command, args=(command, 40)).start()
|
upload(file_path)
Uploads a file to Opentron's /root/ directory.
Source code in korobka/opentrons/ot_ssh.py
130
131
132
133
134
135
136 | def upload(self, file_path: str) -> None:
"""Uploads a file to Opentron's /root/ directory."""
file_name = os.path.basename(file_path)
file_path = os.path.abspath(file_path)
command = f'scp {self.core} "{file_path}" root@{self.ot_address}:/root/{file_name}'
logger.info(f"Uploading {file_name} to Opentron...")
self.execute_bash_command(command)
|