2024-06-21 01:20:01 -07:00
|
|
|
import cv2
|
|
|
|
import numpy as np
|
|
|
|
|
2024-06-24 01:57:54 -07:00
|
|
|
from lib import *
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
|
|
|
|
class MainProgram:
|
2024-06-24 20:41:35 -07:00
|
|
|
def __init__(self, face_model_path, model_path, reid_weights, tracker_type="ocsort"):
|
2024-06-21 01:20:01 -07:00
|
|
|
self.face_model_path = face_model_path
|
|
|
|
self.model_path = model_path
|
|
|
|
|
|
|
|
self.face_model = YOLO(face_model_path)
|
|
|
|
self.person_model = YOLO(model_path)
|
|
|
|
|
|
|
|
self.reid_weights = reid_weights
|
|
|
|
|
|
|
|
self.tracker_conf = get_tracker_config(tracker_type)
|
|
|
|
|
|
|
|
self.sock = U.UdpComms(udpIP="192.168.1.122", portTX=8000, portRX=8001, enableRX=True, suppressWarnings=True)
|
|
|
|
|
|
|
|
self.tracker = create_tracker(
|
|
|
|
tracker_type=tracker_type,
|
|
|
|
tracker_config=self.tracker_conf,
|
|
|
|
reid_weights=reid_weights,
|
|
|
|
device='0',
|
|
|
|
half=False,
|
|
|
|
per_class=False
|
|
|
|
)
|
|
|
|
|
|
|
|
self.send_data_unity: dict = {
|
|
|
|
"PassBy": False,
|
|
|
|
"Engage": False,
|
|
|
|
"Ready": False,
|
|
|
|
"Gender": None,
|
|
|
|
"AgeMin": None,
|
|
|
|
"AgeMax": None,
|
|
|
|
"GenerateImageSuccess": False,
|
|
|
|
"Description": ""
|
|
|
|
}
|
|
|
|
|
|
|
|
self.focus_id = None
|
|
|
|
|
|
|
|
self.frame_count_remove_idx = 0
|
|
|
|
|
|
|
|
sa = gspread.service_account("key.json")
|
|
|
|
sh = sa.open("TestData")
|
|
|
|
|
|
|
|
wks = sh.worksheet("Sheet1")
|
|
|
|
|
|
|
|
self.all_record = wks.get_all_records()
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
self.client = NovitaClient("48cc2b16-286f-49c8-9581-f409b68359c4")
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
self.ready_success = False
|
|
|
|
|
|
|
|
self.show_success = False
|
|
|
|
|
|
|
|
self.check_save, self.check_generate = False, False
|
|
|
|
|
2024-06-24 01:57:54 -07:00
|
|
|
self.forward_face = Face_detection.FaceDetection()
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
|
|
|
self.detector = MTCNN(keep_all=True, device=device)
|
|
|
|
|
|
|
|
self.count_frame = 0
|
|
|
|
|
2024-06-21 01:20:01 -07:00
|
|
|
def convertFrame(self, frame) -> str:
|
|
|
|
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
|
2024-06-24 20:41:35 -07:00
|
|
|
frame = imutils.resize(frame, width=480)
|
2024-06-21 01:20:01 -07:00
|
|
|
result, encoded_frame = cv2.imencode('.jpg', frame, encode_param)
|
|
|
|
jpg_as_text = base64.b64encode(encoded_frame.tobytes())
|
|
|
|
|
|
|
|
return jpg_as_text.decode('utf-8')
|
|
|
|
|
|
|
|
def get_image(self):
|
|
|
|
ran_num = random.randint(0, len(self.all_record) - 1)
|
|
|
|
|
|
|
|
image_url = self.all_record[ran_num]["Image link"]
|
|
|
|
des = self.all_record[ran_num]["Note"]
|
|
|
|
|
|
|
|
return image_url, des
|
|
|
|
|
|
|
|
def generate_image(self):
|
|
|
|
image_url, des = self.get_image()
|
|
|
|
|
|
|
|
res = self.client.merge_face(
|
|
|
|
image=image_url,
|
|
|
|
face_image="./image/output.jpg",
|
|
|
|
)
|
|
|
|
|
|
|
|
base64_to_image(res.image_file).save("./image/merge_face.png")
|
|
|
|
|
|
|
|
self.send_data_unity["Description"] = des
|
|
|
|
self.send_data_unity["GenerateImageSuccess"] = True
|
|
|
|
self.send_data_unity["StreamingData"] = "./Assets/StreamingAssets/MergeFace/image/merge_face.png"
|
|
|
|
|
|
|
|
def predict_age_and_gender(self):
|
|
|
|
image_predict = cv2.imread("./image/output.jpg")
|
|
|
|
if AgeGenderPrediction.Prediction(image_predict):
|
|
|
|
|
|
|
|
self.send_data_unity["Gender"] = AgeGenderPrediction.Prediction(image_predict)[0]
|
|
|
|
self.send_data_unity["AgeMin"] = int(
|
|
|
|
AgeGenderPrediction.Prediction(image_predict)[1].split("-")[0])
|
|
|
|
self.send_data_unity["AgeMax"] = int(
|
|
|
|
AgeGenderPrediction.Prediction(image_predict)[1].split("-")[1])
|
|
|
|
|
|
|
|
else:
|
|
|
|
self.send_data_unity["Gender"] = None
|
|
|
|
self.send_data_unity["AgeMin"] = None
|
|
|
|
self.send_data_unity["AgeMax"] = None
|
|
|
|
|
2024-06-24 01:57:54 -07:00
|
|
|
def check_engage(self, x1, x2) -> bool:
|
2024-06-21 01:20:01 -07:00
|
|
|
if not (x1 > self.red_zone_width[1] or x2 < self.red_zone_width[0]):
|
|
|
|
return True
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
def cropped_image(self, frame, x1, y1, x2, y2):
|
|
|
|
return frame[y1: y2, x1: x2]
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
def get_face(self, frame):
|
|
|
|
boxes, probs, landmarks = self.detector.detect(frame, landmarks=True)
|
|
|
|
|
|
|
|
lm_list = []
|
|
|
|
|
|
|
|
for landmark in landmarks:
|
|
|
|
x1, y1, x2, y2 = int(landmark[0][0]), int(landmark[0][1]), int(landmark[1][0]), int(landmark[2][1])
|
|
|
|
lm_list.append([x1, y1, x2, y2])
|
|
|
|
|
|
|
|
if boxes is not None:
|
|
|
|
bboxes = boxes.astype(np.int_)
|
|
|
|
confs = probs.astype(np.float32).reshape(-1, 1)
|
|
|
|
|
|
|
|
# Create an array of zeros with the same length as bboxes
|
|
|
|
zeros = np.zeros((bboxes.shape[0], 1), dtype=np.float32)
|
|
|
|
|
|
|
|
# Concatenate bboxes, confs, and zeros
|
|
|
|
combined = np.hstack((lm_list, confs, zeros))
|
|
|
|
return combined, landmarks, bboxes
|
|
|
|
else:
|
|
|
|
return np.array([])
|
|
|
|
|
|
|
|
def check_ready(self, nose, left_eye, right_eye):
|
2024-06-21 01:20:01 -07:00
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
distance_left = self.forward_face.calculate_distance(nose, left_eye)
|
|
|
|
distance_right = self.forward_face.calculate_distance(nose, right_eye)
|
|
|
|
|
|
|
|
distance_to_point = self.forward_face.calculate_dis_to_cp(self.face_zone_center_point[0],
|
|
|
|
self.face_zone_center_point[1],
|
|
|
|
nose[0], nose[1])
|
|
|
|
|
|
|
|
cv2.circle(self.frame_to_show, (int(nose[0]), int(nose[1])), 5, (0, 255, 255), -1)
|
|
|
|
cv2.circle(self.frame_to_show, (int(self.face_zone_center_point[0]), int(self.face_zone_center_point[1])), 5, (0, 255, 255), -1)
|
|
|
|
|
|
|
|
# Check if distances exceed threshold
|
|
|
|
if (distance_left > K_MULTIPLIER * distance_right or distance_right > K_MULTIPLIER * distance_left or
|
|
|
|
distance_to_point > 30):
|
|
|
|
if self.count_frame > 200:
|
|
|
|
self.count_frame = 0
|
|
|
|
return False
|
|
|
|
else:
|
|
|
|
self.count_frame += 1
|
|
|
|
|
|
|
|
return True
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
def person_process(self, frame):
|
|
|
|
# Perform person detection
|
2024-06-24 20:41:35 -07:00
|
|
|
face_detections, landmarks, bboxes = self.get_face(frame)
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
# Update the tracker with person detections
|
2024-06-24 20:41:35 -07:00
|
|
|
tracked_objects = self.tracker.update(face_detections, frame)
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
track_list = []
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
face_info = []
|
|
|
|
|
|
|
|
nose_pose, left_eye_pose, right_eye_pos = (0, 0), (0, 0), (0, 0)
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
engage = False
|
|
|
|
|
|
|
|
for track in tracked_objects.astype(int):
|
|
|
|
x1, y1, x2, y2, track_id, conf, cls, _ = track
|
|
|
|
track_list.append(track_id)
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
for idx in range(len(landmarks) - 1):
|
|
|
|
x1_lm, y1_lm = int(landmarks[idx][0][0]), int(landmarks[idx][0][1])
|
|
|
|
if x1_lm == x1 and y1_lm == y1:
|
|
|
|
nose_pose, left_eye_pose, right_eye_pos = ((landmarks[idx][2][0], landmarks[idx][2][1]),
|
|
|
|
(landmarks[idx][0][0], landmarks[idx][0][1]),
|
|
|
|
(landmarks[idx][1][0], landmarks[idx][1][1]))
|
|
|
|
|
|
|
|
face_info.append((nose_pose, left_eye_pose, right_eye_pos, track_id, bboxes[idx]))
|
|
|
|
|
2024-06-21 01:20:01 -07:00
|
|
|
if not engage:
|
|
|
|
engage = self.check_engage(x1, x2)
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
print(self.focus_id)
|
|
|
|
|
2024-06-21 01:20:01 -07:00
|
|
|
if not self.focus_id:
|
2024-06-24 20:41:35 -07:00
|
|
|
self.focus_id = track_id \
|
|
|
|
if self.check_ready(nose_pose, left_eye_pose, right_eye_pos) else None
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
elif track_id != self.focus_id:
|
|
|
|
continue
|
|
|
|
|
|
|
|
else:
|
|
|
|
received_data = self.sock.ReadReceivedData()
|
|
|
|
|
|
|
|
if received_data == "Begin":
|
|
|
|
self.ready_success = True
|
|
|
|
|
2024-06-24 01:57:54 -07:00
|
|
|
elif received_data == "End":
|
|
|
|
self.ready_success = False
|
|
|
|
self.check_save = False
|
|
|
|
self.check_generate = False
|
|
|
|
os.remove("./image/output.jpg")
|
|
|
|
os.remove("./image/merge_face.png")
|
|
|
|
self.send_data_unity: dict = {
|
|
|
|
"PassBy": False,
|
|
|
|
"Engage": False,
|
|
|
|
"Ready": False,
|
|
|
|
"Gender": None,
|
|
|
|
"AgeMin": None,
|
|
|
|
"AgeMax": None,
|
|
|
|
"GenerateImageSuccess": False,
|
|
|
|
"Description": ""
|
|
|
|
}
|
|
|
|
|
2024-06-21 01:20:01 -07:00
|
|
|
if not self.ready_success:
|
2024-06-24 20:41:35 -07:00
|
|
|
self.send_data_unity["Ready"] = True if self.check_ready(nose_pose, left_eye_pose,
|
|
|
|
right_eye_pos) else False
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
elif not self.check_save:
|
2024-06-24 20:41:35 -07:00
|
|
|
for idx in range(len(face_info) - 1):
|
|
|
|
if face_info[idx][3] == self.focus_id:
|
|
|
|
x1_face, y1_face, x2_face, y2_face = (int(face_info[idx][4][0]), int(face_info[idx][4][1]),
|
|
|
|
int(face_info[idx][4][2]), int(face_info[idx][4][3]))
|
|
|
|
cv2.imwrite("./image/output.jpg",
|
|
|
|
self.cropped_image(frame, x1_face, y1_face, x2_face, y2_face))
|
|
|
|
self.check_save = True
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
elif not self.check_generate:
|
|
|
|
if str(self.send_data_unity["Gender"]) == "None":
|
|
|
|
self.predict_age_and_gender()
|
|
|
|
|
|
|
|
else:
|
|
|
|
self.generate_image()
|
|
|
|
self.check_generate = True
|
|
|
|
|
|
|
|
elif self.show_success:
|
|
|
|
self.check_save = False
|
|
|
|
self.check_generate = False
|
|
|
|
|
|
|
|
if track_list:
|
|
|
|
self.send_data_unity["PassBy"] = True
|
|
|
|
self.send_data_unity["Engage"] = engage
|
|
|
|
|
|
|
|
else:
|
|
|
|
self.send_data_unity["Engage"] = False
|
|
|
|
self.send_data_unity["PassBy"] = False
|
|
|
|
self.send_data_unity["Ready"] = False
|
|
|
|
|
|
|
|
if self.focus_id not in track_list:
|
|
|
|
if self.frame_count_remove_idx == 20:
|
|
|
|
self.frame_count_remove_idx = 0
|
|
|
|
self.focus_id = None
|
|
|
|
|
|
|
|
else:
|
|
|
|
self.frame_count_remove_idx += 1
|
|
|
|
|
|
|
|
else:
|
|
|
|
self.frame_count_remove_idx = 0
|
|
|
|
|
|
|
|
def __call__(self):
|
|
|
|
cap = cv2.VideoCapture(0)
|
|
|
|
|
|
|
|
while cap.isOpened():
|
|
|
|
|
|
|
|
self.frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
|
|
self.frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
|
|
|
|
|
|
self.center_point = (int(int(self.frame_width) / 2), int(int(self.frame_height) / 2))
|
|
|
|
|
|
|
|
self.red_zone_width = (self.center_point[0] - 250, self.center_point[0] + 250)
|
|
|
|
self.red_zone_height = (self.center_point[1] - 50, self.frame_height)
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
self.face_zone_width = (self.center_point[0] - 100, self.center_point[0] + 100)
|
|
|
|
self.face_zone_height = (self.center_point[1] - 150, self.center_point[1] + 50)
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
self.face_zone_center_point = (int((self.face_zone_width[1] - self.face_zone_width[0]) / 2) + self.face_zone_width[0],
|
|
|
|
int((self.face_zone_height[1] - self.face_zone_height[0]) / 2) + self.face_zone_height[0])
|
|
|
|
|
|
|
|
ret, frame = cap.read()
|
|
|
|
|
|
|
|
if not ret:
|
|
|
|
continue
|
|
|
|
|
|
|
|
frame_to_handle = frame.copy()
|
|
|
|
self.frame_to_show = frame.copy()
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
# self.person_process(frame_to_handle)
|
|
|
|
|
2024-06-24 01:57:54 -07:00
|
|
|
try:
|
|
|
|
self.person_process(frame_to_handle)
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
print(e)
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
if not self.send_data_unity["GenerateImageSuccess"]:
|
|
|
|
|
|
|
|
self.send_data_unity["StreamingData"] = self.convertFrame(self.cropped_image(frame,
|
|
|
|
self.face_zone_width[0],
|
|
|
|
self.face_zone_height[0],
|
|
|
|
self.face_zone_width[1],
|
|
|
|
self.face_zone_height[1]))
|
|
|
|
|
|
|
|
self.sock.SendData(self.send_data_unity)
|
|
|
|
|
2024-06-24 20:41:35 -07:00
|
|
|
cv2.imshow("Output", self.frame_to_show)
|
|
|
|
|
|
|
|
if cv2.waitKey(1) & 0xFF == ord("q"):
|
|
|
|
break
|
|
|
|
|
|
|
|
cap.release()
|
|
|
|
cv2.destroyAllWindows()
|
|
|
|
|
2024-06-21 01:20:01 -07:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2024-06-24 20:41:35 -07:00
|
|
|
K_MULTIPLIER = 1.2
|
2024-06-21 01:20:01 -07:00
|
|
|
face_model_path = "face_detect.pt"
|
|
|
|
model_path = "yolov8n.pt"
|
|
|
|
|
|
|
|
tracker_type = "deepocsort"
|
|
|
|
reid_weights = Path('osnet_x0_25_msmt17.pt')
|
|
|
|
|
|
|
|
run_main_program = MainProgram(face_model_path, model_path, reid_weights, tracker_type)
|
|
|
|
|
|
|
|
run_main_program()
|