Source code for pyrobotiqgripper.gripper

"""Core Robotiq Gripper class for controlling via Modbus RTU/TCP."""

# Optional dependency: pandas is only required for history DataFrame helpers.

#Meno: Windows / Linux: Ctrl + K then Ctrl + 0 to folds all foldable regions (functions,classes,etc.) in Visual code studio

import numpy as np
from pymodbus.client import ModbusSerialClient, ModbusTcpClient
from pymodbus.framer import FramerType
import time
import serial
import serial.tools.list_ports
from .utils import *
from .constants import *
from .exceptions import *
import logging
import multiprocessing
import warnings


def _get_pandas():
    """Lazy import pandas and provide clear error when missing."""
    try:
        import pandas as pd
        return pd
    except ImportError as exc:
        raise ImportError(
            "pandas is required only for RobotiqGripper history methods "
            "(commandHistory, statusHistory, history). Install pandas via "
            "`pip install pandas` or avoid calling these methods."
        ) from exc


[docs] class RobotiqGripper( ): """Class use to control Robotiq grippers (2F85, 2F140 or hande). This class provides methods to initialize, open, close, and monitor the gripper. Physical connection ------------------- The physical connection with the gripper can done in 2 ways: - Connected to the PC via the USB/RS485 adapter. - Connected to the UR robot: In this case the UR RS485 URCAP needs to be installed on the robot controller. Modbus RTU/TCP communication ---------------------------- Modbus RTU function code supported by robotiq gripper ======================================= ==================== Description Modbus function code ======================================= ==================== Read registers 4 Write registers 16 Master read & write multiple registers 23 ======================================= ==================== For more information for gripper communication please check gripper manual on Robotiq website. https://robotiq.com/support/2f-85-2f-140 .. note:: This class cannot be use to control epick, 3F or powerpick. """
[docs] def __init__(self, com_port: str = AUTO_DETECTION, device_id: int=9, connection_type: str = GRIPPER_MODE_RTU, tcp_host: str = "127.0.0.1", tcp_port: int = 54321, debug: bool = False, **kwargs): """Create a RobotiqGripper object which can be use to control Robotiq grippers using modbus RTU protocol USB/RS485 connection. Args: com_port (str): COM port to which the gripper is connected. If "auto" (or equal to the constant AUTO_DETECTION), the library will try to find the COM port to which the gripper is connected. On Windows, COM ports are named COM1, COM2, etc. On Linux, COM ports are named /dev/ttyUSB0, /dev/ttyUSB1, etc. Default is AUTO_DETECTION. device_id (int): Address of the gripper (integer) usually 9. gripper_type (str): Type of the gripper. Currently only "2F" is supported. Default is "2F". connection_type (str): Type of connection to the gripper. "RTU" (or equal to the constant GRIPPER_MODE_RTU) for direct Modbus RTU connection (e.g. via USB/RS485 adapter). "RTU_VIA_TCP" (or equal to the constant GRIPPER_MODE_RTU_VIA_TCP) for Modbus RTU connection via TCP (e.g. when using the UR RS485 URCAP). Default is "RTU". tcp_host (str): Host IP address for TCP connection. Default is "127.0.0.1" tcp_port (int): Port number for TCP connection. Default is 54321. debug (bool): If True, enable debug logging for Modbus communication. Default is False. Examples: Gripper connected at PC USB port: >>> import pyrobotiqgripper as rq >>> gripper = rq.RobotiqGripper(connection_type=rq.GRIPPER_MODE_RTU) >>> gripper.connect() >>> gripper.activate() >>> gripper.start() >>> gripper.open() >>> gripper.close() >>> gripper.move(100) #Move at position 100 in bit >>> print(gripper.position()) #Print gripper position in bit >>> gripper.calibrate_mm(closemm=0,openmm=85) #Calibrate the gripper with 0mm when closed and 85mm when open >>> gripper.move_mm(50) #Move at position 50mm >>> gripper.printStatus() #Print gripper status information in the python terminal >>> print(gripper.positionmm()) #Print gripper position in mm Gripper connected to UR robot via RS495 URCAP (RTU over TCP): >>> import pyrobotiqgripper as rq >>> gripper = rq.RobotiqGripper(connection_type=rq.GRIPPER_MODE_RTU_VIA_TCP,tcp_host="192.168.1.100") >>> gripper.connect() >>> gripper.activate() >>> gripper.start() >>> gripper.open() """ #Public properties ################## #: COM port to which the gripper is connected. #: If "auto" (or equal to the constant AUTO_DETECTION), the library will try #: to find the COM port to which the gripper is connected. #: On Windows, COM ports are named COM1, COM2, etc. On Linux, COM ports are #: named /dev/ttyUSB0, /dev/ttyUSB1, etc. Default is AUTO_DETECTION. self.com_port=com_port self.device_id=device_id self.connection_type=connection_type self.tcp_host=tcp_host self.tcp_port=tcp_port self.debug = debug #Maximum allowed time to perform and action self.timeOut=5 #Private properties ################### #Client managing gripper commmunication self._client=self._create_modbus_client() self.connect() #Status retrieval historical data self._commandHistory=np.ones((MAX_HISTORY,len(COMMAND_HISTORY_COLUMNS_ID_2_NAME)))*-1 self._statusHistory=np.ones((MAX_HISTORY,len(STATUS_HISTORY_COLUMNS_ID_2_NAME)))*-1 self._is_bit_calibrated=False #Say if the gripper is calibrated to be controlled in mm self._is_mm_calibrated=False #Say if the gripper is calibrated to be estimate position from speed self._is_speed_calibrated=False #Distance between the fingers when gripper is closed self._closemm=None #Position in bit when gripper is closed self._closebit=None #Distance between the fingers when gripper is open self._openmm=None #Position in bit when gripper is open self._openbit=None #Linear coefficient to link bit and distance between fingers #mm=self._aCoef*bit+self._bCoef self._aCoef=None self._bCoef=None self._gripper_vmax_bits=None #Speed in bit per second self._gripper_vmin_bits=None #Speed in bit per second #Initialisation ############### self._configure_logging() self.readStatus()
#################################################### ### PRIVATE FUCNTIONS ################################################### #SETUP FUNCTIONS def _probe_port_process(self, port, device_id, return_dict, timeout=1): """Probe a serial port to check if a gripper is connected. This method attempts to connect to a Modbus device on the specified port and reads a register to verify if it's a gripper. Parameters: ----------- port : str The serial port to probe (e.g., 'COM1', '/dev/ttyUSB0'). device_id : int The Modbus device ID to use for communication. return_dict : dict A shared dictionary to store the result of the probe. timeout : float, optional Timeout for the connection attempt in seconds. Default is 1. """ try: client = ModbusSerialClient( port=port, baudrate=115200, parity='N', stopbits=1, bytesize=8, timeout=timeout ) print(f"Trying to connect to {port}...") if not client.connect(): err = f"Fail to connect to modbus RTU device on {port}." print(err) return_dict["success"] = False return_dict["error"] = err return result = client.read_input_registers( address=2000, count=1, device_id=device_id ) if result is None: err = f"No response from {port} (register read returned None)" print(err) return_dict["success"] = False return_dict["error"] = err return if hasattr(result, 'isError') and result.isError(): error_code = getattr(result, 'exception_code', 'unknown') err = f"Modbus error on {port}: {error_code}" print(err) return_dict["success"] = False return_dict["error"] = err return if hasattr(result, 'registers') and len(result.registers) > 0: return_dict["success"] = True else: print(f"Invalid register response from {port}: {result}") return_dict["success"] = False except Exception as e: error_msg = f"Connection failed on port {port}: {type(e).__name__}: {str(e)}" print(error_msg) logging.error(error_msg) # Also log for debugging return_dict["success"] = False finally: try: client.close() except: pass def _configure_logging(self): """Configure logging for Modbus communication based on the debug flag.""" logger = logging.getLogger("pymodbus") if self.debug: logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.CRITICAL) # Effectively disables it def _create_modbus_client(self): """Factory method to create appropriate Modbus client.""" if self.connection_type == GRIPPER_MODE_RTU_VIA_TCP: return ModbusTcpClient( host=self.tcp_host, port=self.tcp_port, timeout=1, framer=FramerType.RTU) elif self.connection_type == GRIPPER_MODE_RTU: if self.com_port == AUTO_DETECTION: self.com_port = self._autoConnect() return ModbusSerialClient( port=self.com_port, baudrate=BAUDRATE, parity='N', stopbits=1, bytesize=8, timeout=1) def _autoConnect(self): """Automatically detect the COM port to which the gripper is connected. This method scans available serial ports and attempts to communicate with a gripper on each one to find the correct port. Returns: -------- str The COM port where the gripper was detected. Raises: ------ GripperConnectionError If no gripper is detected on any available port. """ ports = serial.tools.list_ports.comports() manager = multiprocessing.Manager() # Create ONCE for port in ports: print(f"Testing: {port.device}") return_dict = manager.dict() p = multiprocessing.Process( target=self._probe_port_process, args=(port.device, self.device_id, return_dict) ) p.start() p.join(3.0) # HARD TIMEOUT (1 second) if p.is_alive(): print(f"Hard timeout — killing process on {port.device}") p.terminate() p.join() print(f"Probe timed out on {port.device}; no port response from probe worker. This means the port may not be a Modbus device or is very slow.") continue if p.exitcode is not None and p.exitcode != 0: print(f"Probe process for {port.device} exited with code {p.exitcode}") if return_dict.get("success", False): print(f"Gripper detected on {port.device}") return port.device reason = return_dict.get("error", "none") print(f"No gripper response on {port.device} (reason: {reason})") raise GripperConnectionError( f"No gripper detected on any of {len(list(serial.tools.list_ports.comports()))} " f"available ports. Tested ports: " f"{', '.join([p.device for p in serial.tools.list_ports.comports()])}. " f"Please check: 1) Gripper is powered, 2) USB cable connected, " f"3) Device ID matches (expected {self.device_id})" ) #COMMUNICATION FUNCTIONS def _writePreadStatus(self,position): """Write position in the command register and read the gripper\ status in a single Modbus transaction. Parameters: ----------- position: int The position to move the gripper to in bits. Integer between 0 and 255. """ result=self._client.readwrite_registers(read_address=2000, read_count=3, write_address=1001, values=[position], device_id=self.device_id) registers=result.registers t=floor_to_ms(time.monotonic()) self._saveStatus(t,registers,readWrite=True) command={"time":t,"rPR":position} self._completeAndSaveCommand(command) def _writePSFreadStatus(self,position, speed, force): """Write position, speed and force in the command register and read the gripper\ status in a single Modbus transaction. Parameters: ----------- position: int The position to move the gripper to in bits. Integer between 0 and 255. speed: int The speed of the gripper movement. Integer between 0 and 255. force: int The force of the gripper movement. Integer between 0 and 255. """ result=self._client.readwrite_registers(read_address=2000, read_count=3, write_address=1001, values=[position, speed * 0b100000000 + force], device_id=self.device_id) registers=result.registers t=floor_to_ms(time.monotonic()) self._saveStatus(t,registers,readWrite=True) command={"time":t,"rPR":position, "rSP":speed,"rFR":force} self._completeAndSaveCommand(command) def _writePSF(self,position, speed, force): """Write position, speed and force in the command register. Parameters: ----------- position: int The position to move the gripper to in bits. Integer between 0 and 255. speed: int The speed of the gripper movement. Integer between 0 and 255. force: int The force of the gripper movement. Integer between 0 and 255. """ res=self._client.write_registers(address=1001, values=[position, speed * 0b100000000 + force], device_id=self.device_id) # Check result if res.isError(): raise GripperCommunicationError("Write failed") t=floor_to_ms(time.monotonic()) command={"time":t,"rPR":position, "rSP":speed,"rFR":force} self._completeAndSaveCommand(command) def _writeP(self,position): """Write position in the command register. Parameters: ----------- position: int The position to move the gripper to in bits. Integer between 0 and 255. """ res=self._client.write_registers(address=1001, values=[position], device_id=self.device_id) # Check result if res.isError(): raise GripperCommunicationError("Write failed") t=floor_to_ms(time.monotonic()) command={"rPR":position} self._completeAndSaveCommand(command) def _writeSF(self,speed, force): """Write speed and force in the command register. Parameters: ----------- speed: int The speed of the gripper movement. Integer between 0 and 255. force: int The force of the gripper movement. Integer between 0 and 255. """ res=self._client.write_registers(address=1002, values=[speed * 0b100000000 + force], device_id=self.device_id) # Check result if res.isError(): raise GripperCommunicationError("Write failed") t=floor_to_ms(time.monotonic()) command={"rSP":speed,"rFR":force} self._completeAndSaveCommand(command) #DATA SAVING FUNCTIONS def _saveStatus(self,t,statusRegisters,readWrite): """Save the gripper status register values in status history. Parameters: ----------- t: Time of the status statusRegisters : Status register """ ######################################### #Register 2000 #First Byte: gripperStatus #Second Byte: RESERVED #First Byte: gripperStatus gripperStatusReg0=(statusRegisters[0] >> 8) & 0b11111111 #xxxxxxxx00000000 ######################################### #Object detection if readWrite: gOBJ=-1 else: gOBJ=(gripperStatusReg0 >> 6) & 0b11 #xx000000 #Gripper status gSTA=(gripperStatusReg0 >> 4) & 0b11 #00xx0000 #Action status. echo of rGTO (go to bit) gGTO=(gripperStatusReg0 >> 3) & 0b1 #0000x000 #Activation status gACT=gripperStatusReg0 & 0b00000001 #0000000x ######################################### #Register 2001 #First Byte: Fault status #Second Byte: Pos request echo #First Byte: fault status faultStatusReg2= (statusRegisters[1] >>8) & 0b11111111 #xxxxxxxx00000000 ######################################### #Universal controler kFLT=(faultStatusReg2 >> 4) & 0b1111 #xxxx0000 #Fault gFLT=faultStatusReg2 & 0b00001111 #0000xxxx if gFLT in [7,8,10,11,12,13,14,15]: print(f"gFLT = {gFLT}") print("Gripper status history:") print(self.statusHistory()) raise GripperFaultError(REGISTER_DIC["gFLT"][gFLT]) #if gFLT in [5,9]: # warnings.warn(REGISTER_DIC["gFLT"][9],UserWarning, stacklevel=2) ######################################### #Second Byte: Pos request echo posRequestEchoReg3=statusRegisters[1] & 0b11111111 #00000000xxxxxxxx ######################################### #Echo of request position gPR=posRequestEchoReg3 ######################################### #Register 2002 #First Byte: Position #Second Byte: Current #First Byte: Position positionReg4=(statusRegisters[2] >> 8) & 0b11111111 #xxxxxxxx00000000 ######################################### #Actual position of the gripper gPO=positionReg4 ######################################### #Second Byte: Current currentReg5=statusRegisters[2] & 0b0000000011111111 #00000000xxxxxxxx ######################################### #Current gCU=currentReg5 self._statusHistory[:-1,:]=self._statusHistory[1:,:] self._statusHistory[-1,:]=[t,gOBJ,gSTA,gGTO,gACT,kFLT,gFLT,gPR,gPO,gCU] def _completeAndSaveCommand(self,command): """Complete a partial command dictionary and save it to history. Parameters: ----------- command : dict A partial command dictionary to complete and save. """ self._complete_command(command) self._commandHistory[:-1,:]=self._commandHistory[1:,:] self._commandHistory[-1,:]= [command["time"], command["rARD"], command["rATR"], command["rGTO"], command["rACT"], command["rPR"], command["rSP"], command["rFR"]] #DATA PROCESSING FUNCTIONS def _mmToBit(self,mm): """Convert a mm gripper opening in bit opening. .. note:: Calibration is needed to use this function.\n Execute the function calibrate at least 1 time before using this function. """ if not self.is_mm_calibrated(): raise GripperNotCalibratedError() bit=(mm-self._bCoef)/self._aCoef if bit>255: bit=255 elif bit<0: bit=0 return bit def _bitTomm(self,bit): """Convert a bit gripper opening in mm opening. Returns: -------- mm : float Gripper position converted in mm .. note:: Calibration is needed to use this function.\n Execute the function calibrate at least 1 time before using this function. """ if not self.is_mm_calibrated(): raise GripperNotCalibratedError() mm=self._aCoef*bit+self._bCoef if mm>self._openmm: mm=self._openmm elif mm<self._closemm: mm=self._closemm return mm def _positionEstimation(self,startPosition,requestedPosition,speed,elapsedTime): """ Estimate what will be the gripper position after an "elapsedTime" knowing the\ start position of the motion, the requested position and the speed Parameters: ----------- startPosition : int Position in bits from which start the motion. requestedPosition : int Position in bits where the gripper is requested to move. speed : int Speed of the gripper in bits. elapsedTime : float Elapsed time in seconds from the start position to the estimated position Returns: -------- estimatedPosition : int Estimated position of the gripper. """ if not self.is_speed_calibrated(): raise GripperCalibrationError("The speed calibration is required to estimate gripper position.") #Calculate predicted position positionDelta = requestedPosition - startPosition direction = np.sign(positionDelta) motion = int(direction * float(self.gripper_vmin_bits() + (self.gripper_vmax_bits() - self.gripper_vmin_bits()) * speed / 255) * elapsedTime) estimatedPosition = 0 if abs(positionDelta) < abs(motion): #Last position request was reachable within the elapsed time estimatedPosition = requestedPosition else: #Last position was not reachable within elapsed time estimatedPosition = startPosition + motion return estimatedPosition def _bitPerSecond(self,speed): """Return the corresponding position bits/s speed for a speed value in bit. Parameters: ----------- speed : int Gripper speed in bits Returns: -------- bitPerSecond : float Position variation in bits/s """ bitPerSecond=self.gripper_vmin_bits() + (float(self.gripper_vmax_bits()-self.gripper_vmin_bits())/255)*speed if bitPerSecond<0: raise GripperValidationError("Negative bit per second value {} for speed {}. Gripper_vmax_bits: {} and Gripper_vmin_bits: {}".format(bitPerSecond,speed,self.gripper_vmax_bits(),self.gripper_vmin_bits())) return bitPerSecond def _mergeHistory(self): """Merge command and status history arrays based on time. Returns: -------- numpy.ndarray A merged array containing aligned command and status data. """ #Merge using time property history=array_merge_on_first_column(self._commandHistory,self._statusHistory) #Filled forward missing command value columns_to_fill=[RARD,RGTO,RACT,RPR,RSP,RFR] array_forward_fill_columns(history,columns_to_fill,missing_value=-1) return history def _fill_gPO(self,history): """ Fill NaN values of gPO using speed, target, and elapsed time. """ df=history time_col="time" pos_col="gPO" target_col="rPR" speed_col="rSP" if not self.is_speed_calibrated(): raise GripperCalibrationError("The gripper need to be speed activated to be able to estimate gripper position.") #Sort table with time ascending values df = df.sort_values(by=time_col).reset_index(drop=True) for i in range(1, len(df)): if np.isnan(df.loc[i, pos_col]): prev_gPO = df.loc[i-1, pos_col] prev_rPR = df.loc[i-1, target_col] speed = df.loc[i-1, speed_col] # If we still don't have enough info, skip if np.isnan(prev_gPO) or np.isnan(prev_rPR) or np.isnan(speed): #print("prev_gPO : ",prev_gPO) #print("prev_rPR : ",prev_rPR) #print("speed : ",speed) #warnings.warn("Not enough information to estimate position") continue # Time difference dt = df.loc[i, time_col] - df.loc[i-1, time_col] # Direction direction = np.sign(prev_rPR - prev_gPO) # Movement delta = direction * self._bitPerSecond(speed) * dt new_pos = int(prev_gPO + delta) # Clamp to target (avoid overshoot) if direction > 0: new_pos = min(new_pos, prev_rPR) elif direction < 0: new_pos = max(new_pos, prev_rPR) if new_pos > self._closebit: new_pos = self._closebit if new_pos < self._openbit: new_pos = self._openbit df.loc[i, pos_col] = new_pos df=df.sort_values(by="time", ascending=False).reset_index(drop=True) return df def _travelTime(self,startPosition,endPosition,speed): """Return the time need to travel from a position to another at a given speed. Parameters: ----------- startPosition : int Start position in bits. endPosition : int End position in bits. speed : int Gripper speed in bits. Returns: -------- travelTime : float Time needed to travel from start to end position. """ posBitPerSecond = self._bitPerSecond(speed) travelTime = abs(float(endPosition-startPosition))/posBitPerSecond return travelTime def _commandFilter(self, current_time, current_rPR, minSpeedPosDelta=5, maxSpeedPosDelta=55, continuousGrip=True, autoLock=True, minimalMotion=1, verbose=0, refreshStatus=False, objectDetectionDuration=0.5 ): """"Filter the gripper command to perform a smooth and safe motion of the gripper.\ The command filter is based on the gripper command history and the gripper status. Parameters: ----------- t0_RequestTime : float Request time. t0_RequestPosition : int Requested position. commandHistory : dict Command history. status : dict Gripper status. minSpeedPosDelta : int Minimum speed position delta. maxSpeedPosDelta : int Maximum speed position delta. continuousGrip : bool Whether to continuously grip. autoLock : bool Whether to automatically lock. minimalMotion : int Minimal motion. verbose : int Verbose level to print. 1 print all executed command. 2 print all commands. Returns: -------- command : dict Filtered command. """ if refreshStatus: self.readStatus() #t1: previous command #t0: next command to come #Object detection history = self._mergeHistory() prev_time=history[-1,TIME] prev_cOBJ=self.objectDetection(history,duration=objectDetectionDuration,tolerance=3,refreshStatus=False)#, verbose=verbose) prev_gPO=history[-1,M_GPO] prev_rPR=history[-1,RPR] prev_rSP=history[-1,RSP] prev_rFR=history[-1,RFR] forceMin = continuousGrip * 1 command = {} dt = current_time - prev_time command["execution"] = NO_COMMAND command["rPR"] = 0 command["rSP"] = 0 command["rFR"] = forceMin command["wait"] = False command["comment"] = "" cPO = self._positionEstimation(prev_gPO, prev_rPR, prev_rSP, dt) # Check if object detected if prev_cOBJ in [GOBJ_DETECTED_WHILE_OPENING, GOBJ_DETECTED_WHILE_CLOSING]: # Object detected full_grip_applied = (prev_rPR in [0, 255] and prev_rSP == 255 and prev_rFR == 255) if not full_grip_applied: # Apply full grip if prev_cOBJ == GOBJ_DETECTED_WHILE_CLOSING: command["execution"] = WRITE_READ_COMMAND command["rPR"] = 255 command["rSP"] = 255 command["rFR"] = 255 command["wait"] = True command["comment"] = "Object detected while closing, apply full grip" else: # GOBJ_DETECTED_WHILE_OPENING command["execution"] = WRITE_READ_COMMAND command["rPR"] = 0 command["rSP"] = 255 command["rFR"] = 255 command["wait"] = True command["comment"] = "Object detected while opening, apply full release" else: # Full grip applied # Check if further gripping requested if ((prev_cOBJ == GOBJ_DETECTED_WHILE_CLOSING and current_rPR >= prev_gPO) or (prev_cOBJ == GOBJ_DETECTED_WHILE_OPENING and current_rPR <= prev_gPO)): # Further gripping command["execution"] = READ_COMMAND command["rPR"] = None command["rSP"] = None command["rFR"] = None command["wait"] = False command["comment"] = "Full grip applied, further gripping requested, do nothing" else: # Away from grip or release command["execution"] = WRITE_READ_COMMAND command["rPR"] = current_rPR command["rSP"] = 255 command["rFR"] = 255 command["wait"] = True command["comment"] = "Object detected, going to release position" else: # No object detected if current_rPR in [0, 255] and not (prev_rPR in [0, 255] and prev_rSP == 255 and prev_rFR == 255): command["execution"] = WRITE_READ_COMMAND command["rPR"] = current_rPR command["rSP"] = 255 command["rFR"] = 255 command["wait"] = True command["comment"] = f"No object detected, trigger full grip to {current_rPR}" elif ((abs(prev_gPO - current_rPR) <= minimalMotion) or (prev_rPR > self._closebit and current_rPR > self._closebit) or (prev_rPR < self._openbit and current_rPR < self._openbit)): command["execution"] = READ_COMMAND command["rPR"] = None command["rSP"] = None command["rFR"] = None command["wait"] = False command["comment"] = "Requested position close to current position or both in extreme position, do nothing" else: # Adjust speed based on distance posDelta = abs(prev_gPO - current_rPR) if posDelta <= minSpeedPosDelta: speed = 0 elif posDelta >= maxSpeedPosDelta: speed = 255 else: speed = int((posDelta - minSpeedPosDelta) / (maxSpeedPosDelta - minSpeedPosDelta) * 255) force = forceMin wait = False comment = f"No object detected, move with adjusted speed {speed}" if (prev_rPR==0 and current_rPR > self._openbit) or (prev_rPR==255 and current_rPR < self._closebit): force = 255 speed = 255 wait =True comment = f"Move out of a full close of a full opening" # Check if requesting extreme position and previous was not full grip command["execution"] = WRITE_READ_COMMAND command["rPR"] = current_rPR command["rSP"] = speed command["rFR"] = force command["wait"] = wait command["comment"] = comment if verbose ==1: if command["execution"]==WRITE_READ_COMMAND: print(f"Time: {current_time:.3f} | ReqPos: {current_rPR:3d} | cPO: {cPO}| ObjDet: {prev_cOBJ} | CmdExe: {command['execution']:1d} | CmdPos: {command['rPR'] or 0:3d} | CmdSpd: {command['rSP'] or 0:3d} | CmdFrc: {command['rFR'] or 0:3d} | CmdWait: {command['wait'] or 0:.3f} | Comment: {command['comment']}") if verbose ==2: print(f"Time: {current_time:.3f} | ReqPos: {current_rPR:3d} | cPO: {cPO} | ObjDet: {prev_cOBJ} | CmdExe: {command['execution']:1d} | CmdPos: {command['rPR'] or 0:3d} | CmdSpd: {command['rSP'] or 0:3d} | CmdFrc: {command['rFR'] or 0:3d} | CmdWait: {command['wait'] or 0:.3f} | Comment: {command['comment']}") return command def _complete_command(self,command: dict) -> dict: """ Complete a partial record using the first row of the DataFrame. Parameters: partial_record (dict): A dictionary with some values missing. Returns: dict: A complete record with missing values filled from the first row of df. """ if "time" not in command.keys(): raise GripperValidationError("Time is required to complete the command record.") for key in ["rPR", "rSP", "rFR"]: if key not in command.keys(): arrayID=COMMAND_HISTORY_COLUMNS_NAME_2_ID[key] command[key] = self._commandHistory[-1, arrayID] for key in COMMAND_HISTORY_COLUMNS_NAME_2_ID.keys(): if key not in command.keys(): command[key] = -1 #TIME FUNCTIONS def _waitComplete(self): """Wait until the gripper has completed its motion or detect an object. This method blocks until the gripper reaches the target position or detects an object. """ startTime = time.time() self.readStatus() gOBJ=self._statusHistory[-1,GOBJ] while gOBJ == GOBJ_IN_MOTION and (time.time() - startTime) < self.timeOut: self.readStatus() gOBJ=self._statusHistory[-1,GOBJ] #################################################### ### PUBLIC FUCNTIONS ################################################### #SETUP
[docs] def connect(self): """Connect to the gripper. If the connection is already established, do nothing. """ if not self._client.connect(): raise GripperConnectionError("Failed to connect to the gripper. Please check the connection and try again.")
[docs] def disconnect(self): """Disconnect from the gripper. If the connection is already closed, do nothing. """ self._client.close()
[docs] def reset(self): """Reset the gripper (clear previous activation and calibration if any) """ #Reset the gripper self._client.write_registers(1000,[0,0,0],device_id=self.device_id) t=time.monotonic() command={"time": t, "rARD": 0, "rATR": 0, "rGTO": 0, "rACT": 0, "rPR": 0, "rSP": 0, "rFR": 0} self._completeAndSaveCommand(command) self._is_bit_calibrated=False self._is_speed_calibrated=False self._is_mm_calibrated=False self.readStatus()
[docs] def activate(self,reset= True, start=True,refreshStatus=True): """Activate the gripper if it is not already active. Args: reset (bool): Whether to reset the gripper before activation. start (bool): Whether to start the gripper motion immediately after activation (gGTO set to 1). refreshStatus (bool): Whether to refresh the gripper status before the activation process. The status information is used to determine if the gripper is already activated. Setting this parameter to False can make the activation faster if you are sure the gripper status is up to date. .. warning:: When you execute this function, the gripper will fully open and close. During this operation, the gripper must be able to move freely. Do not place any object inside the gripper. """ #Turn the variable which indicate that the gripper is processing #an action to True if reset: self.reset() t=floor_to_ms(time.monotonic()) command={"time": t, "rACT": RACT_ACTIVATE} if not self.isActivated(refreshStatus=refreshStatus): #Activate the gripper #rACT=1 Activate Gripper (must stay on after activation routine is #completed). if start: res=self._client.write_registers(1000,[0b0000100100000000],device_id=self.device_id) command["rGTO"]=RGTO_GO_TO_REQUESTED_POSITION if res.count !=1: raise GripperCommunicationError("Failed to write register") else: res=self._client.write_registers(1000,[0b0000000100000000],device_id=self.device_id) if res.isError(): raise GripperCommunicationError("Fail to write gripper register") #Waiting for activation to complete activationStartTime=floor_to_ms(time.monotonic()) activationTime=0 while not self.isActivated(refreshStatus=True) and (activationTime < self.timeOut): activationTime = floor_to_ms(time.monotonic()) - activationStartTime if activationTime > self.timeOut: raise GripperTimeoutError("Activation", self.timeOut) self.readStatus() self._completeAndSaveCommand(command) else: res = None if start: res=self._client.write_registers(1000,[0b0000100100000000],device_id=self.device_id) command["rGTO"]=RGTO_GO_TO_REQUESTED_POSITION self._completeAndSaveCommand(command) self.readStatus() if res is not None and res.isError(): raise GripperCommunicationError("Failed to write register")
[docs] def start(self,refreshStatus=True): """Start the gripper motion. The GTO bit is set to 1. The gripper will move to the position command if it is not already there. Args: refreshStatus (bool): Whether to refresh the gripper status before the start process. The status information is used to determine if the gripper is already started. Setting this parameter to False can make the start process faster if you are sure the gripper status is up to date. """ t=floor_to_ms(time.monotonic()) command={"time":t, "rARD":0, "rATR":0, "rGTO":RGTO_GO_TO_REQUESTED_POSITION } if not self.isStarted(refreshStatus=refreshStatus): if self.isActivated(refreshStatus=refreshStatus): res=self._client.write_registers(1000,[0b0000100100000000],device_id=self.device_id) command["rACT"]=RACT_ACTIVATE if res.isError(): raise GripperCommunicationError("Communication to set rGTO bit failed") else: res=self._client.write_registers(1000,[0b0000100000000000],device_id=self.device_id) command["rACT"]=RACT_DESACTIVATE if res.isError(): raise GripperCommunicationError("Communication to set rGTO bit failed") self._completeAndSaveCommand(command) self.readStatus()
[docs] def stop(self): """Gripper stop moving. GTO bit is set to 0 but the position command is not changed. The gripper will stop at its current position and hold it.""" if self.isActivated: self._client.write_registers(1000,[0b0000000100000000,0,0],device_id=self.device_id) else: self._client.write_registers(1000,[0b0000000000000000,0,0],device_id=self.device_id)
[docs] def calibrate_bit(self, openbit=None, closebit=None): """Calibrate the maximum and minimum position bit values of the gripper. This function sets the reference positions for the gripper in bits. If no parameters are provided, the gripper will perform a full open followed by a full close to measure the maximum and minimum positions. Args: openbit (int, optional): Position value in bits when the gripper is open. closebit (int, optional): Position value in bits when the gripper is closed. .. warning:: If no parameters are provided, the gripper will make a full open followed by a full close to measure the maximum and minimum positions in bits. Ensure the gripper can move freely during this operation. """ if (openbit is None) or (closebit is None): #Search for open and close position values #and search for min and max speed in bits/s self.open(speed=255,force=0,wait=True) self._openbit=self.position() self.close(speed=255,force=0,wait=True) self._closebit=self.position() else: self._openbit=openbit self._closebit=closebit self._is_bit_calibrated =True
[docs] def calibrate_speed(self,minSpeedClosingTime=None,maxSpeedClosingTime=None): """Calibrate gripper speed to estimate gripper position over time. If no parameters are provided, the gripper will perform a full-speed closing and a slow-speed closing to evaluate its motion. Bit calibration will also be performed. Args: minSpeedClosingTime (float, optional): Time in seconds for the gripper to move from fully open to fully closed at minimum speed. maxSpeedClosingTime (float, optional): Time in seconds for the gripper to move from fully open to fully closed at maximum speed. .. warning:: If no parameters are provided, the gripper will perform a full-speed closing and a slow-speed closing to evaluate its motion. Ensure the gripper can move freely during this operation. """ if (minSpeedClosingTime is None) or (maxSpeedClosingTime is None): self.open(speed=255,force=0,wait=True) self._openbit=self.position() startTime=floor_to_ms(time.monotonic()) self.close(speed=255,force=0,wait=True) elapsedTime=floor_to_ms(time.monotonic())-startTime self._closebit=self.position() self._gripper_vmax_bits= (self._closebit - self._openbit)/elapsedTime self.open(speed=255,force=0,wait=True) startTime=floor_to_ms(time.monotonic()) self.close(speed=0,force=0,wait=True) elapsedTime=floor_to_ms(time.monotonic())-startTime self._gripper_vmin_bits= (self._closebit - self._openbit)/elapsedTime self._is_bit_calibrated=True else: if not self.is_bit_calibrated(): raise GripperCalibrationError("You have to execute calibrate_bit() before calibrating speed with input parameters") self._gripper_vmin_bits=(self._closebit - self._openbit)/minSpeedClosingTime self._gripper_vmax_bits=(self._closebit - self._openbit)/maxSpeedClosingTime self._is_speed_calibrated = True
[docs] def calibrate_mm(self, closemm, openmm): """Calibrate the gripper for millimeter positioning. Once this calibration is done, it is possible to control the gripper using millimeters. Args: closemm (float): Distance between the fingers when the gripper is fully closed. openmm (float): Distance between the fingers when the gripper is fully open. .. warning:: Bit calibration is required before executing this function. If the bit calibration is not done, the function `calibrate_bit()` will be executed automatically before calibrating in millimeters. """ if not self.is_bit_calibrated(): self.calibrate_bit() self._closemm=closemm self._openmm=openmm self._aCoef=(closemm-openmm)/(self._closebit-self._openbit) self._bCoef=(openmm*self._closebit-self._openbit*closemm)/(self._closebit-self._openbit) self._is_mm_calibrated=True
#ACTIONS
[docs] def open(self,speed=255,force=255,wait=True,readStatus=True,refreshStatus=False): """Open the gripper. Args: speed (int, optional): Gripper speed between 0 and 255. Defaults to 255. force (int, optional): Gripper force between 0 and 255. Defaults to 255. wait (bool, optional): If True, the function waits until the gripper reaches the requested position or detects an object. Defaults to True. readStatus (bool, optional): If True, the gripper status is read after sending the command. Defaults to True. refreshStatus (bool, optional): If True, the gripper status is refreshed before sending the command. The status information is used to determine if the gripper is already activated. Setting this parameter to False can make the command process faster if the gripper status is up to date. Defaults to False. """ #Check if the gripper is activated self.move(0,speed,force,wait=wait,readStatus=readStatus,refreshStatus=refreshStatus)
[docs] def close(self,speed=255,force=255,wait=True,readStatus=True,refreshStatus=False): """Close the gripper. Args: speed (int, optional): Gripper speed between 0 and 255. Defaults to 255. force (int, optional): Gripper force between 0 and 255. Defaults to 255. wait (bool, optional): If True, the function waits until the gripper reaches the requested position or detects an object. Defaults to True. readStatus (bool, optional): If True, the gripper status is read after sending the command. Defaults to True. refreshStatus (bool, optional): If True, the gripper status is refreshed before sending the command. The status information is used to determine if the gripper is already activated. Setting this parameter to False can make the command process faster if the gripper status is up to date. Defaults to False. """ self.move(255,speed,force,wait=wait,readStatus=readStatus,refreshStatus=refreshStatus)
[docs] def move(self,position,speed=255,force=255,wait=True,readStatus=True,refreshStatus=False): """Move gripper fingers to the requested position with specified speed and force. Args: position (int): Target position of the gripper. Must be between 0 and 255, where 0 represents fully open and 255 represents fully closed. speed (int, optional): Gripper speed between 0 and 255. Defaults to 255. force (int, optional): Gripper force between 0 and 255. Defaults to 255. wait (bool, optional): If True, the function waits until the gripper reaches the requested position or detects an object. Defaults to True. readStatus (bool, optional): If True, the gripper status is read after sending the command. Defaults to True. refreshStatus (bool, optional): If True, the gripper status is refreshed before sending the command. The status information is used to determine if the gripper is already activated. Setting this parameter to False can make the command process faster if the gripper status is up to date. Defaults to False. """ if refreshStatus: self.readStatus() speed_value=0 if speed is None: if self.speed() is not None: speed_value = self.speed() else: speed_value = 0 else: speed_value = speed force_value=0 if force is None: if self.force() is not None: force_value = self.force() else: force_value = 0 else: force_value = force #Check if the gripper is activated if not self.isActivated(refreshStatus=False): print("Status history:") print(self._statusHistory) raise GripperNotActivatedError() if not self.isStarted(refreshStatus=False): raise GripperNotStartedError() #Check input value if position>255 or position<0: raise GripperPositionError(position) position=int(position) if (speed is None) and (force is None): if readStatus: self._writePreadStatus(position) else: self._writeP(position) else: speed_value=int(speed_value) force_value=int(force_value) if readStatus: self._writePSFreadStatus(position,speed_value,force_value) else: self._writePSF(position,speed_value,force_value) self._processing=True if wait: self._waitComplete() self._processing=False
[docs] def move_mm(self,positionmm,speed=255,force=255,wait=True,readStatus=True,refreshStatus=False): """Move the gripper to the requested opening in millimeters. Args: positionmm (float): Target gripper opening in millimeters. speed (int, optional): Gripper speed between 0 and 255. Defaults to 255. force (int, optional): Gripper force between 0 and 255. Defaults to 255. wait (bool, optional): If True, the function waits until the gripper reaches the requested position or detects an object. Defaults to True. readStatus (bool, optional): If True, the gripper status is read after sending the command. Defaults to True. refreshStatus (bool, optional): If True, the gripper status is refreshed before sending the command. The status information is used to determine if the gripper is already activated. Setting this parameter to False can make the command process faster if the gripper status is up to date. Defaults to False. .. note:: Millimeter calibration is required to use this function. Execute the function `calibrate_mm()` before using this function. """ if not self.is_mm_calibrated(): raise GripperNotCalibratedError() if positionmm>self._openmm: raise GripperValidationError("The maximum opening is {} mm but the given value is {} mm".format(self._openmm,positionmm)) if positionmm<self._closemm: raise GripperValidationError("The minimum closing is {} mm but the given value is {} mm".format(self._openmm,positionmm)) position=int(self._mmToBit(positionmm)) self.move(position,speed,force,wait,readStatus,refreshStatus)
[docs] def realTimeMove(self, requestedPosition, minSpeedPosDelta=5, maxSpeedPosDelta=100, continuousGrip=True, autoLock=True, minimalMotion=2, verbose=False, objectDetectionDuration=0.5): """Move the gripper in real time to the requested position. Args: requestedPosition (int): Target position for the gripper in bits. Must be between 0 and 255, where 0 represents fully open and 255 represents fully closed. minSpeedPosDelta (int, optional): Minimum position delta to apply the minimum speed. Defaults to 5. maxSpeedPosDelta (int, optional): Position delta above which the maximum speed is applied. Defaults to 100. continuousGrip (bool, optional): If True, the gripper continuously tries to close on the object even after detection (force > 0). Defaults to True. autoLock (bool, optional): If True, the gripper automatically performs a full-speed, full-force grip after object detection. Defaults to True. minimalMotion (int, optional): Minimum motion in bits to perform when a motion is requested. If the position delta between the current and requested position is below this value, no motion is performed. Defaults to 2. verbose (int, optional): Verbose level for printing. 1 prints all executed commands; 2 prints all commands. Defaults to 0. Examples: Make a loop to control the gripper using a joystick with pygame >>> import pygame >>> import time >>> import pyrobotiqgripper as rq >>> grip = rq.RobotiqGripper() >>> pygame.init() >>> pygame.joystick.init() >>> js = pygame.joystick.Joystick(0) >>> js.init() >>> while True: >>> pygame.event.pump() >>> requested_pos=int((js.get_axis(0) + 1) * 255 / 2) >>> grip.realtimemove(requested_pos) """ #Check if the gripper is activated if not self.isActivated(refreshStatus=False): raise GripperNotActivatedError() if not self.isStarted(refreshStatus=False): raise GripperNotStartedError() if not self.is_speed_calibrated(): raise GripperCalibrationError("The gripper need to be speed calibrated before using realtime move.") #2- Get time now=floor_to_ms(time.monotonic()) #2 Build gripper command command=self._commandFilter(current_time=now, current_rPR=requestedPosition, minSpeedPosDelta=minSpeedPosDelta, maxSpeedPosDelta=maxSpeedPosDelta, continuousGrip=continuousGrip, autoLock=autoLock, minimalMotion=minimalMotion, verbose=verbose, objectDetectionDuration=objectDetectionDuration) if command["execution"]==WRITE_READ_COMMAND: if command["wait"]: self.move(command["rPR"],command["rSP"],command["rFR"],wait=True) else: self.move(command["rPR"],command["rSP"],command["rFR"],wait=False) elif command["execution"]==READ_COMMAND: self.readStatus() else: warnings.warn("No command executed",UserWarning, stacklevel=2)
#STATUS
[docs] def isActivated(self, refreshStatus=True): """Check whether the gripper is activated. Args: refreshStatus (bool, optional): Whether to refresh the gripper status before checking the activation state. The status information is used to determine if the gripper is already activated. Setting this parameter to False can make the check faster if the gripper status is up to date. Defaults to True. Returns: bool: True if the gripper is activated, False otherwise. """ if refreshStatus: self.readStatus() gSTA = self._statusHistory[-1,GSTA] if gSTA == -1: warnings.warn("Status history is empty. Activation status unknown.",UserWarning, stacklevel=2) res=None res=False if gSTA == GSTA_ACTIVATED: res=True else: res=False return res
[docs] def isStarted(self,refreshStatus=True): """Check whether the gripper is started. Returns: bool: True if the gripper is started, False otherwise. """ if refreshStatus: self.readStatus() gGTO=self._statusHistory[-1,GGTO] if gGTO == -1: warnings.warn("Status history is empty. Action status unknown.",UserWarning, stacklevel=2) print(self.statusHistory()) return None res=False if gGTO == GGTO_GO_TO_REQUESTED_POSITION: res=True return res
[docs] def is_bit_calibrated(self): """Check whether the gripper is bit calibrated. Returns: bool: True if the gripper is bit calibrated, False otherwise. """ return self._is_bit_calibrated
[docs] def is_mm_calibrated(self): """Check whether the gripper millimeter (mm) calibration is done. Returns: bool: True if the gripper is mm calibrated, False otherwise. """ return self._is_mm_calibrated
[docs] def is_speed_calibrated(self): """Check whether the gripper speed calibration is done. Returns: bool: True if the gripper speed is calibrated, False otherwise. """ return self._is_speed_calibrated
[docs] def gripper_vmax_bits(self): """Return the maximum gripper speed in bits per second. Returns: int: Maximum gripper speed in bits per second. """ if not self.is_speed_calibrated(): raise GripperCalibrationError("The gripper is not speed calibrated") return self._gripper_vmax_bits
[docs] def gripper_vmin_bits(self): """Return the minimum gripper speed in bits per second. Returns: int: Minimum gripper speed in bits per second. """ if not self.is_speed_calibrated(): raise GripperCalibrationError("The gripper is not speed calibrated") return self._gripper_vmin_bits
[docs] def positionCommand(self): """Return the last commanded position value. Returns: int | None: The last position command value (0-255), or None if the history is empty. """ value = self._statusHistory[-1,GPO] if value == -1: warnings.warn("Command history is empty. Last set position is unknown.",UserWarning, stacklevel=2) return None return value
[docs] def position(self,refreshStatus=True): """Return the current position of the gripper in bits. Args: refreshStatus (bool, optional): Whether to refresh the gripper status before getting the position. The status information is used to determine the current position. Setting this parameter to False can make the retrieval faster if the gripper status is already up to date. Defaults to True. Returns: int | None: Current gripper position in bits (0-255), or None if history is empty. """ if refreshStatus: self.readStatus() res = self._statusHistory[-1,GPO] if res ==-1: warnings.warn("Status history is empty. Last position is unknown.",UserWarning, stacklevel=2) return None return int(res)
[docs] def position_mm(self,refreshStatus=True): """Return the current position of the gripper in millimeters. Args: refreshStatus (bool, optional): Whether to refresh the gripper status before getting the position. The status information is used to determine the current position. Setting this parameter to False can make the retrieval faster if the gripper status is already up to date. Defaults to True. Returns: float: Current gripper position in millimeters. .. note:: Calibration is required to use this function. Execute the calibration function at least once before using this function. """ if not self.is_mm_calibrated(): raise GripperNotCalibratedError() res=self._bitTomm(self.position(refreshStatus=refreshStatus)) return res
[docs] def speed(self): """Return the last set speed value. Returns: -------- int or None The last speed value set (0-255), or None if history is empty. """ value=self._commandHistory[-1, RSP] if value == -1: warnings.warn("Command history is empty. Last set speed is unknown.",UserWarning, stacklevel=2) return None return value
[docs] def force(self): """Return the last set force value. Returns: int | None: The last force value set (0-255), or None if the history is empty. """ value = self._commandHistory[-1, RFR] if value == -1: warnings.warn("Command history is empty. Last set force is unknown.",UserWarning, stacklevel=2) return None return value
[docs] def objectDetection(self,mergedHistory=None, duration=0.5, tolerance=3, refreshStatus=True,verbose=0): """Estimate object detection status from history data. Args: mergedHistory (numpy.ndarray, optional): Pre-merged history array. If None, history is merged from status and command history. Passing a pre-merged history can speed up the estimation if this function is called multiple times in a loop. Defaults to None. duration (float, optional): Stability duration threshold. Object detection is considered valid only if the position is stable for longer than this duration. Defaults to 0.2. tolerance (int, optional): Position tolerance. The gripper is considered to be at the expected position if the difference between expected and actual position is less than this tolerance. Defaults to 3. refreshStatus (bool, optional): Whether to refresh the gripper status to get the object detection status from the gripper. Defaults to True. Returns: int: Object detection status code. """ if refreshStatus: self.readStatus() return self._statusHistory[-1,GOBJ] if self._statusHistory[-1,TIME] > self._commandHistory[-1,TIME]: return self._statusHistory[-1,GOBJ] if not self.is_bit_calibrated(): raise GripperCalibrationError("The gripper need to be bit calibrated to be able to estimate object detection.") if mergedHistory is None: mergedHistory = self._mergeHistory() last_gOBJ = mergedHistory[-1, M_GOBJ] if last_gOBJ != -1: # If gOBJ is explicitly available from the gripper, use that value. return int(last_gOBJ) # Filter for valid status rows (gPO and rPR are required for estimation). valid_mask = (mergedHistory[:, M_GPO] != -1) & (mergedHistory[:, RPR] != -1) if not np.any(valid_mask): return GOBJ_IN_MOTION time = mergedHistory[valid_mask, TIME] rpr = mergedHistory[valid_mask, RPR] gpo = mergedHistory[valid_mask, M_GPO] last_gPO = gpo[-1] # If we have a recent request change, consider motion in progress first. rpr_diff_idx = np.flatnonzero(rpr != rpr[-1]) if rpr_diff_idx.size > 0 and (time[-1] - time[rpr_diff_idx[-1]] <= duration): return GOBJ_IN_MOTION # Detect object only after the gripper position has been stable long enough. gpo_diff_idx = np.flatnonzero(gpo != last_gPO) if gpo_diff_idx.size == 0: # gPO constant on history window. expected = rpr[-1] expected = min(max(expected, self._openbit), self._closebit) if abs(expected - last_gPO) < tolerance: return GOBJ_AT_POSITION dt_constant = time[-1] - time[0] if dt_constant <= duration: return GOBJ_IN_MOTION if expected > last_gPO: if verbose > 0: print(REGISTER_DIC["gOBJ"][GOBJ_DETECTED_WHILE_CLOSING]) print(self.history()) return GOBJ_DETECTED_WHILE_CLOSING elif expected < last_gPO: if verbose > 0: print(REGISTER_DIC["gOBJ"][GOBJ_DETECTED_WHILE_OPENING]) print(self.history()) return GOBJ_DETECTED_WHILE_OPENING else: return GOBJ_IN_MOTION idx = gpo_diff_idx[-1] dt = time[-1] - time[idx] if dt <= duration: return GOBJ_IN_MOTION expected = rpr[-1] expected = min(max(expected, self._openbit), self._closebit) if abs(expected - last_gPO) < tolerance: return GOBJ_AT_POSITION rpr_slice = rpr[idx:] gpo_slice = gpo[idx:] if np.all(rpr_slice > gpo_slice): if verbose > 0: print(REGISTER_DIC["gOBJ"][GOBJ_DETECTED_WHILE_CLOSING]) print(self.history()) return GOBJ_DETECTED_WHILE_CLOSING if np.all(rpr_slice < gpo_slice): if verbose > 0: print(REGISTER_DIC["gOBJ"][GOBJ_DETECTED_WHILE_OPENING]) print(self.history()) return GOBJ_DETECTED_WHILE_OPENING return GOBJ_IN_MOTION
def printObjectDetection(self,mergedHistory=None, duration=0.2, tolerance=3, refreshStatus=True): """Print object detection status in a human readable way """ gOBJ=self.objectDetection(mergedHistory=mergedHistory, duration=duration, tolerance=tolerance, refreshStatus=refreshStatus) print(REGISTER_DIC["gOBJ"][gOBJ])
[docs] def commandHistory(self): """Return the gripper command history as a pandas DataFrame. Returns: pd.DataFrame: A DataFrame containing the command history with columns for time and various command registers (rARD, rATR, etc.). """ pd = _get_pandas() columns = [COMMAND_HISTORY_COLUMNS_ID_2_NAME[i] for i in range(self._commandHistory.shape[1])] df = pd.DataFrame(self._commandHistory, columns=columns) return df
[docs] def readStatus(self): """Retrieve gripper output register information and save it in the status history. """ #Read 3 16bits registers starting from register 2000 registers=self._client.read_input_registers(2000,count=3,device_id=self.device_id).registers t=floor_to_ms(time.monotonic()) self._saveStatus(t,registers,readWrite=False)
[docs] def status(self, refreshStatus=True): """Return the current gripper status as a dictionary. Args: refreshStatus (bool, optional): Whether to read fresh status from the gripper. Defaults to True. Returns: dict: A dictionary containing current status values for all registers. """ if refreshStatus: self.readStatus() status={} status["time"]=self._statusHistory[-1,TIME] status["gOBJ"]=self._statusHistory[-1,GOBJ] status["gSTA"]=self._statusHistory[-1,GSTA] status["gGTO"]=self._statusHistory[-1,GGTO] status["gACT"]=self._statusHistory[-1,GACT] status["kFLT"]=self._statusHistory[-1,KFLT] status["gFLT"]=self._statusHistory[-1,GFLT] status["gPR"]=self._statusHistory[-1,GPR] status["gPO"]=self._statusHistory[-1,GPO] status["gCU"]=self._statusHistory[-1,GCU] return status
[docs] def printStatus(self, refreshStatus=True): """Print gripper status info in the Python terminal. Args: refreshStatus (bool, optional): Whether to read fresh status from the gripper before printing. Defaults to False. If you are sure that the status information is up to date, setting this parameter to False can make the printing faster. Examples: >>> grip.move(100) >>> grip.print_status() Output:: ====================================================================== GRIPPER STATUS ====================================================================== gOBJ : 3 └─ Fingers are at requested position. No object detected or object has been lost / dropped. gSTA : 3 └─ Activation is completed. gGTO : 1 └─ Go to Position Request. gACT : 1 └─ Gripper activation. kFLT : 0 └─ 0 gFLT : 9 └─ Minor faults (LED continuous red). No communication during at least 1 second. gPR : 100 └─ Echo of the requested position for the Gripper: 100/255 gPO : 100 └─ Actual position of the Gripper obtained via the encoders: 100/255 gCU : 0 └─ The current is read instantaneously from the motor drive, approximate current: 0 mA ====================================================================== """ status=self.status(refreshStatus=refreshStatus) # Print header print("\n" + "=" * 70) print(" " * 20 + "GRIPPER STATUS") print("=" * 70) # Find the longest key for alignment max_key_length = max(len(key) for key in status.keys() if key != "time") # Print each status item with description for key, value in status.items(): if key != "time": # Print key and value with alignment print(f"\n{key:<{max_key_length}} : {value}") # Print description with indentation description = REGISTER_DIC[key][value] print(f" └─ {description}") print("\n" + "=" * 70 + "\n")
[docs] def statusHistory(self): """Return the gripper status history as a pandas DataFrame. Returns: pd.DataFrame: A DataFrame containing the status history with columns for time and various status registers (gOBJ, gSTA, etc.). """ pd = _get_pandas() columns = [STATUS_HISTORY_COLUMNS_ID_2_NAME[i] for i in range(self._statusHistory.shape[1])] df = pd.DataFrame(self._statusHistory, columns=columns) return df
[docs] def history(self): """Return the merged command and status history as a pandas DataFrame. This method combines the command history and status history into a single DataFrame with all timestamps aligned. Returns: pd.DataFrame: A DataFrame containing the merged history with columns for time, commands, and status values. """ pd = _get_pandas() mergedHistory = self._mergeHistory() columns = [HISTORY_COLUMNS_ID_2_NAME[i] for i in range(mergedHistory.shape[1])] df = pd.DataFrame(mergedHistory, columns=columns) return df