import cv2
import numpy as np

from lib import *

# Maximum allowed ratio between the nose->left-eye and nose->right-eye
# distances before `MainProgram.check_ready` treats the face as misaligned.
# Defined at module level so the class also works when this file is imported
# rather than executed as a script.
K_MULTIPLIER = 1.2


class MainProgram:
    """Detect and track faces from a webcam and stream state to Unity via UDP.

    Per frame, faces are detected with MTCNN, fed to a tracker, and one track
    is chosen as the "focus". Depending on messages read from the UDP socket
    ("Begin" / "End"), the focus face is cropped to disk, age/gender is
    predicted, and a merged-face image is generated. The current state dict
    (`send_data_unity`) is sent over the socket every frame.
    """

    def __init__(self, face_model_path, model_path, reid_weights, tracker_type="ocsort"):
        """Load models, open the UDP socket, build the tracker, fetch records.

        Args:
            face_model_path: Weights path passed to `YOLO` for the face model.
            model_path: Weights path passed to `YOLO` for the person model.
            reid_weights: Re-identification weights passed to `create_tracker`.
            tracker_type: Tracker name passed to `get_tracker_config` and
                `create_tracker`.
        """
        self.face_model_path = face_model_path
        self.model_path = model_path
        self.face_model = YOLO(face_model_path)
        self.person_model = YOLO(model_path)
        self.reid_weights = reid_weights
        self.tracker_conf = get_tracker_config(tracker_type)
        # NOTE(review): peer address and ports are hard-coded; consider
        # moving them to configuration.
        self.sock = U.UdpComms(udpIP="192.168.1.122", portTX=8000, portRX=8001,
                               enableRX=True, suppressWarnings=True)
        self.tracker = create_tracker(
            tracker_type=tracker_type,
            tracker_config=self.tracker_conf,
            reid_weights=reid_weights,
            device='0',
            half=False,
            per_class=False
        )
        self.send_data_unity: dict = self._default_unity_state()
        self.focus_id = None
        self.frame_count_remove_idx = 0

        sa = gspread.service_account("key.json")
        sh = sa.open("TestData")
        wks = sh.worksheet("Sheet1")
        self.all_record = wks.get_all_records()

        # NOTE(review): API credential is committed in source; it should be
        # loaded from the environment or a secrets store and rotated.
        self.client = NovitaClient("48cc2b16-286f-49c8-9581-f409b68359c4")

        self.ready_success = False
        self.show_success = False
        self.check_save, self.check_generate = False, False
        self.forward_face = Face_detection.FaceDetection()

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.detector = MTCNN(keep_all=True, device=device)
        self.count_frame = 0

    @staticmethod
    def _default_unity_state() -> dict:
        """Return a fresh copy of the initial state dict sent to Unity."""
        return {
            "PassBy": False,
            "Engage": False,
            "Ready": False,
            "Gender": None,
            "AgeMin": None,
            "AgeMax": None,
            "GenerateImageSuccess": False,
            "Description": ""
        }

    def convertFrame(self, frame) -> str:
        """Resize `frame` to width 480 and return it as base64 JPEG text."""
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
        frame = imutils.resize(frame, width=480)
        result, encoded_frame = cv2.imencode('.jpg', frame, encode_param)
        jpg_as_text = base64.b64encode(encoded_frame.tobytes())
        return jpg_as_text.decode('utf-8')

    def get_image(self):
        """Pick a random record and return its ("Image link", "Note") pair."""
        record = random.choice(self.all_record)
        return record["Image link"], record["Note"]

    def generate_image(self):
        """Merge the saved face crop into a random record image.

        Writes the result to ./image/merge_face.png and updates the
        "Description", "GenerateImageSuccess" and "StreamingData" entries of
        `send_data_unity`.
        """
        image_url, des = self.get_image()
        res = self.client.merge_face(
            image=image_url,
            face_image="./image/output.jpg",
        )
        base64_to_image(res.image_file).save("./image/merge_face.png")
        self.send_data_unity["Description"] = des
        self.send_data_unity["GenerateImageSuccess"] = True
        self.send_data_unity["StreamingData"] = "./Assets/StreamingAssets/MergeFace/image/merge_face.png"

    def predict_age_and_gender(self):
        """Predict gender and age range from ./image/output.jpg.

        Stores the result in the "Gender", "AgeMin" and "AgeMax" entries of
        `send_data_unity`; all three are set to None when the image cannot be
        read or the predictor returns a falsy result.
        """
        image_predict = cv2.imread("./image/output.jpg")
        # Run the predictor once and reuse the result instead of re-running
        # inference for every field.
        prediction = None
        if image_predict is not None:
            prediction = AgeGenderPrediction.Prediction(image_predict)

        if prediction:
            # The age element is split on "-" into lower and upper bounds.
            age_parts = prediction[1].split("-")
            self.send_data_unity["Gender"] = prediction[0]
            self.send_data_unity["AgeMin"] = int(age_parts[0])
            self.send_data_unity["AgeMax"] = int(age_parts[1])
        else:
            self.send_data_unity["Gender"] = None
            self.send_data_unity["AgeMin"] = None
            self.send_data_unity["AgeMax"] = None

    def check_engage(self, x1, x2) -> bool:
        """Return True when the span [x1, x2] overlaps `red_zone_width`."""
        zone_left, zone_right = self.red_zone_width[0], self.red_zone_width[1]
        return not (x1 > zone_right or x2 < zone_left)

    def cropped_image(self, frame, x1, y1, x2, y2):
        """Return the sub-image frame[y1:y2, x1:x2]."""
        return frame[y1: y2, x1: x2]

    def get_face(self, frame):
        """Detect faces and build the detection array for the tracker.

        Returns:
            A `(detections, landmarks, bboxes)` triple. Each detection row is
            `[x1, y1, x2, y2, conf, 0]`, where the four coordinates are taken
            from the first three landmark points (not from the detector's
            box). When nothing is detected, an empty (0, 6) array, an empty
            landmark list and an empty (0, 4) box array are returned so the
            caller can always unpack three values.
        """
        boxes, probs, landmarks = self.detector.detect(frame, landmarks=True)

        # `boxes` is None when no face is found; landmarks are presumably
        # None in the same case (confirm against the MTCNN in use).
        if boxes is None or landmarks is None:
            return (np.empty((0, 6), dtype=np.float32),
                    [],
                    np.empty((0, 4), dtype=np.int_))

        lm_list = [
            [int(lm[0][0]), int(lm[0][1]), int(lm[1][0]), int(lm[2][1])]
            for lm in landmarks
        ]
        bboxes = boxes.astype(np.int_)
        confs = probs.astype(np.float32).reshape(-1, 1)
        # One zero column with the same number of rows as bboxes.
        zeros = np.zeros((bboxes.shape[0], 1), dtype=np.float32)
        # Concatenate landmark-derived boxes, confidences and the zero column.
        combined = np.hstack((lm_list, confs, zeros))
        return combined, landmarks, bboxes

    def check_ready(self, nose, left_eye, right_eye):
        """Report whether the face is treated as ready.

        Draws the nose and the face-zone centre on `frame_to_show`. The face
        counts as misaligned when the two nose-to-eye distances differ by more
        than a factor of `K_MULTIPLIER`, or when the value returned by
        `calculate_dis_to_cp` for the nose and zone centre exceeds 30.
        Misaligned frames are tolerated (True is returned) until
        `count_frame` exceeds 200, at which point the counter is reset and
        False is returned.
        """
        distance_left = self.forward_face.calculate_distance(nose, left_eye)
        distance_right = self.forward_face.calculate_distance(nose, right_eye)
        distance_to_point = self.forward_face.calculate_dis_to_cp(
            self.face_zone_center_point[0], self.face_zone_center_point[1],
            nose[0], nose[1])

        cv2.circle(self.frame_to_show, (int(nose[0]), int(nose[1])), 5, (0, 255, 255), -1)
        cv2.circle(self.frame_to_show,
                   (int(self.face_zone_center_point[0]), int(self.face_zone_center_point[1])),
                   5, (0, 255, 255), -1)

        # Check if distances exceed threshold
        misaligned = (distance_left > K_MULTIPLIER * distance_right
                      or distance_right > K_MULTIPLIER * distance_left
                      or distance_to_point > 30)
        if misaligned:
            # NOTE(review): the counter is never reset on an aligned frame,
            # so it accumulates across non-consecutive misaligned frames -
            # confirm this is intended.
            if self.count_frame > 200:
                self.count_frame = 0
                return False
            self.count_frame += 1
        return True

    def _reset_session(self):
        """Clear per-session flags, saved images and the Unity state dict."""
        self.ready_success = False
        self.check_save = False
        self.check_generate = False
        for path in ("./image/output.jpg", "./image/merge_face.png"):
            try:
                os.remove(path)
            except FileNotFoundError:
                # The image may never have been produced in this session;
                # that must not prevent the state reset below.
                pass
        self.send_data_unity: dict = self._default_unity_state()

    def _save_focus_face(self, frame, face_info):
        """Write the focus track's face crop to ./image/output.jpg."""
        for info in face_info:
            if info[3] != self.focus_id:
                continue
            box = info[4]
            # Clamp to 0: a negative coordinate would be interpreted as
            # an index from the end of the array when slicing.
            x1_face, y1_face = max(int(box[0]), 0), max(int(box[1]), 0)
            x2_face, y2_face = max(int(box[2]), 0), max(int(box[3]), 0)
            cv2.imwrite("./image/output.jpg",
                        self.cropped_image(frame, x1_face, y1_face, x2_face, y2_face))
            self.check_save = True
            break

    def _process_focus_track(self, frame, face_info, nose_pose, left_eye_pose, right_eye_pos):
        """Advance the ready -> save -> predict -> generate flow for the focus."""
        received_data = self.sock.ReadReceivedData()
        if received_data == "Begin":
            self.ready_success = True
        elif received_data == "End":
            self._reset_session()

        if not self.ready_success:
            self.send_data_unity["Ready"] = self.check_ready(
                nose_pose, left_eye_pose, right_eye_pos)
        elif not self.check_save:
            self._save_focus_face(frame, face_info)
        elif not self.check_generate:
            # Age/gender is predicted first; the merged image is generated
            # on a later frame once a gender value is present.
            if str(self.send_data_unity["Gender"]) == "None":
                self.predict_age_and_gender()
            else:
                self.generate_image()
                self.check_generate = True
        elif self.show_success:
            self.check_save = False
            self.check_generate = False

    def person_process(self, frame):
        """Detect, track and process faces for one frame.

        Updates `focus_id`, the session flags and the "PassBy" / "Engage" /
        "Ready" entries of `send_data_unity`. The focus is dropped after it
        has been absent from the tracker output for 20 counted frames.
        """
        # Perform face detection
        face_detections, landmarks, bboxes = self.get_face(frame)
        # Update the tracker with the detections
        tracked_objects = self.tracker.update(face_detections, frame)

        track_list = []
        face_info = []
        nose_pose, left_eye_pose, right_eye_pos = (0, 0), (0, 0), (0, 0)
        engage = False

        for track in tracked_objects.astype(int):
            x1, y1, x2, y2, track_id, conf, cls, _ = track
            track_list.append(track_id)

            # Match the track back to its landmarks through the top-left
            # corner, which get_face took from the first landmark point.
            for idx, landmark in enumerate(landmarks):
                if int(landmark[0][0]) == x1 and int(landmark[0][1]) == y1:
                    nose_pose = (landmark[2][0], landmark[2][1])
                    left_eye_pose = (landmark[0][0], landmark[0][1])
                    right_eye_pos = (landmark[1][0], landmark[1][1])
                    face_info.append(
                        (nose_pose, left_eye_pose, right_eye_pos, track_id, bboxes[idx]))

            if not engage:
                engage = self.check_engage(x1, x2)

            print(self.focus_id)
            if self.focus_id is None:
                self.focus_id = track_id \
                    if self.check_ready(nose_pose, left_eye_pose, right_eye_pos) else None
            elif track_id != self.focus_id:
                continue
            else:
                self._process_focus_track(
                    frame, face_info, nose_pose, left_eye_pose, right_eye_pos)

        if track_list:
            self.send_data_unity["PassBy"] = True
            self.send_data_unity["Engage"] = engage
        else:
            self.send_data_unity["Engage"] = False
            self.send_data_unity["PassBy"] = False
            self.send_data_unity["Ready"] = False

        if self.focus_id not in track_list:
            if self.frame_count_remove_idx == 20:
                self.frame_count_remove_idx = 0
                self.focus_id = None
            else:
                self.frame_count_remove_idx += 1
        else:
            self.frame_count_remove_idx = 0

    def _update_zones(self, cap):
        """Recompute the frame size and the red/face zones from `cap`."""
        self.frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.center_point = (int(int(self.frame_width) / 2), int(int(self.frame_height) / 2))
        self.red_zone_width = (self.center_point[0] - 250, self.center_point[0] + 250)
        self.red_zone_height = (self.center_point[1] - 50, self.frame_height)
        self.face_zone_width = (self.center_point[0] - 100, self.center_point[0] + 100)
        self.face_zone_height = (self.center_point[1] - 150, self.center_point[1] + 50)
        self.face_zone_center_point = (
            int((self.face_zone_width[1] - self.face_zone_width[0]) / 2) + self.face_zone_width[0],
            int((self.face_zone_height[1] - self.face_zone_height[0]) / 2) + self.face_zone_height[0])

    def __call__(self):
        """Run the capture loop on camera 0 until it closes or 'q' is pressed."""
        cap = cv2.VideoCapture(0)
        try:
            while cap.isOpened():
                self._update_zones(cap)

                ret, frame = cap.read()
                if not ret:
                    continue

                frame_to_handle = frame.copy()
                self.frame_to_show = frame.copy()

                # Best-effort per frame: a processing failure is printed and
                # the loop keeps streaming.
                try:
                    self.person_process(frame_to_handle)
                except Exception as e:
                    print(e)

                # Until a merged image exists, stream the face-zone crop.
                if not self.send_data_unity["GenerateImageSuccess"]:
                    self.send_data_unity["StreamingData"] = self.convertFrame(
                        self.cropped_image(frame,
                                           self.face_zone_width[0], self.face_zone_height[0],
                                           self.face_zone_width[1], self.face_zone_height[1]))

                self.sock.SendData(self.send_data_unity)
                cv2.imshow("Output", self.frame_to_show)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Release the camera and windows even if the loop raises.
            cap.release()
            cv2.destroyAllWindows()


if __name__ == "__main__":
    face_model_path = "face_detect.pt"
    model_path = "yolov8n.pt"
    tracker_type = "deepocsort"
    reid_weights = Path('osnet_x0_25_msmt17.pt')
    run_main_program = MainProgram(face_model_path, model_path, reid_weights, tracker_type)
    run_main_program()