# smart-interactive-display/Assets/StreamingAssets/MergeFace/demo.py
import cv2
import numpy as np
import Face_Swap
from lib import *
class MainProgram:
    """Interactive face-swap display pipeline.

    Reads webcam frames, exchanges state with a Unity front-end over UDP,
    detects the visitor's face, swaps it onto a random template image with
    the insightface ``inswapper`` model, and uploads captures to MinIO.
    """

    def __init__(self, face_model_path, model_path, reid_weights, tracker_type="deepocsort"):
        """Load the models and open every external connection.

        Args:
            face_model_path: path to the YOLO face-detection weights.
            model_path: path to the YOLO person-detection weights.
            reid_weights: path to the ReID weights used by the tracker.
            tracker_type: boxmot tracker name (default "deepocsort").
        """
        self.face_model_path = face_model_path
        self.model_path = model_path
        self.face_model = YOLO(face_model_path)
        self.person_model = YOLO(model_path)
        self.reid_weights = reid_weights
        self.tracker_conf = get_tracker_config(tracker_type)
        # Bidirectional UDP link to the Unity front-end.
        self.sock = U.UdpComms(udpIP="192.168.1.122", portTX=8000, portRX=8001, enableRX=True, suppressWarnings=True)
        self.tracker = create_tracker(
            tracker_type=tracker_type,
            tracker_config=self.tracker_conf,
            reid_weights=reid_weights,
            device='0',
            half=False,
            per_class=False
        )
        self.face_detect = FaceAnalysis(name='buffalo_l')
        self.face_detect.prepare(ctx_id=0, det_size=(640, 640))
        self.swapper = insightface.model_zoo.get_model("inswapper_128.onnx")
        # State payload mirrored to Unity on every frame.
        self.send_data_unity: dict = {
            "PassBy": False,
            "Engage": False,
            "Ready": False,
            "Gender": None,
            "AgeMin": None,
            "AgeMax": None,
            "GenerateImageSuccess": False,
            "Description": "",
            "FileName": ""
        }
        self.focus_id = None
        self.frame_count_remove_idx = 0
        # Template catalogue pulled from a Google Sheet ("TestData"/Sheet2).
        sa = gspread.service_account("key.json")
        sh = sa.open("TestData")
        wks = sh.worksheet("Sheet2")
        self.all_record = wks.get_all_records()
        # SECURITY: hard-coded API key and MinIO credentials below should be
        # moved to environment variables or a secret store before deployment.
        self.client = NovitaClient("48cc2b16-286f-49c8-9581-f409b68359c4")
        self.client_minio = Minio("192.168.1.186:50047",
                                  access_key="play4promo_user",
                                  secret_key="12345678",
                                  secure=False
                                  )
        self.bucket_name = "play4promo"
        self.des_file = ""
        self.source_file = "sid/source_file.jpg"
        self.ready_success = False        # True while Unity session is active
        self.show_success = False
        self.check_save, self.check_generate = False, False
        self.forward_face = FaceDetection()
        self.gender_model = Gender_Prediction.GenderPrediction()
        self.focus_face_bbox = []         # [x1, y1, x2, y2] of the tracked face

    def convertFrame(self, frame) -> str:
        """Resize *frame* to 400 px wide, JPEG-encode it (quality 90) and
        return it as a base64 string for the UDP payload."""
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
        frame = imutils.resize(frame, width=400)
        result, encoded_frame = cv2.imencode('.jpg', frame, encode_param)
        jpg_as_text = base64.b64encode(encoded_frame.tobytes())
        return jpg_as_text.decode('utf-8')

    def get_image(self):
        """Return (image_url, description) from a random sheet record."""
        ran_num = random.randint(0, len(self.all_record) - 1)
        image_url = self.all_record[ran_num]["Image link"]
        des = self.all_record[ran_num]["Note"]
        return image_url, des

    def face_swap(self, image, source_face, paste_back=True):
        """Swap the first face detected in *image* with the first face
        detected in *source_face*.

        Args:
            image: original template image (BGR ndarray).
            source_face: image containing the user's face (BGR ndarray).
            paste_back: forwarded to the inswapper model.

        Returns:
            A copy of *image* with the swapped face.
        """
        target = self.face_detect.get(image)[0]
        source_face = self.face_detect.get(source_face)[0]
        res = image.copy()
        res = self.swapper.get(res, target, source_face, paste_back)
        return res

    def generate_local_image(self, source_face):
        """Pick a random template from ./origin_image/, swap *source_face*
        into it and write the result to ./image/merge_face.png.

        Silently does nothing when the folder holds no image files.
        """
        folder_path = "./origin_image/"
        files = os.listdir(folder_path)
        # str.endswith accepts a tuple of candidate extensions.
        image_files = [f for f in files if f.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
        if image_files:
            random_image_file = random.choice(image_files)
            image_path = os.path.join(folder_path, random_image_file)
            image = cv2.imread(image_path)
            origin_image = image.copy()
            image_path = "./image/merge_face.png"
            result = self.face_swap(origin_image, source_face)
            cv2.imwrite(image_path, result)

    def predict_age_and_gender(self):
        """Predict the visitor's gender from ./image/output.jpg and store it
        in the Unity payload. (Only gender is predicted, despite the name.)"""
        image_predict = cv2.imread("./image/output.jpg")
        gender = self.gender_model.predict_gender(image_predict)
        self.send_data_unity["Gender"] = gender

    def get_face_bbox(self, frame):
        """Run the YOLO face detector on *frame* and return an (N, 5) array
        of [x1, y1, x2, y2, confidence] rows."""
        outs = self.face_model(frame)
        results = sv.Detections.from_ultralytics(outs[0])
        bbox = results.xyxy.astype(np.int_)
        conf = results.confidence.astype(np.float32)
        return np.concatenate((bbox, conf[:, np.newaxis]), axis=1)

    def cropped_image(self, frame, x1, y1, x2, y2):
        """Return the [y1:y2, x1:x2] view of *frame* (no copy)."""
        return frame[y1: y2, x1: x2]

    def check_ready(self, frame):
        """Look for a forward-facing face around the face-zone centre.

        Returns the detect_face result, unpacked by callers as
        (pass_by, ready, face_bbox).
        """
        return self.forward_face.detect_face(frame, self.face_zone_center_point[0],
                                             self.face_zone_center_point[1])

    def person_process(self, frame):
        """Advance the interaction state machine by one frame.

        Unity drives the session via UDP: "Begin" starts a session,
        "End" resets all state for the next visitor.
        """
        received_data = self.sock.ReadReceivedData()
        if received_data == "Begin":
            self.ready_success = True
        elif received_data == "End":
            # Session finished: reset the whole state machine.
            self.ready_success = False
            self.check_save = False
            self.check_generate = False
            self.send_data_unity["Gender"] = None
            self.send_data_unity["AgeMin"] = None
            self.send_data_unity["AgeMax"] = None
            self.send_data_unity["GenerateImageSuccess"] = False
            self.send_data_unity["Description"] = ""
        if self.ready_success and (not self.check_save):
            # Session active but capture not yet saved: keep tracking presence.
            self.send_data_unity["PassBy"], _, _ = self.check_ready(frame)
            self.send_data_unity["Engage"] = self.send_data_unity["PassBy"]
        if not self.ready_success:
            # Idle: look for a visitor and remember the focus face bbox.
            self.send_data_unity["PassBy"], self.send_data_unity["Ready"], self.focus_face_bbox = self.check_ready(frame)
            self.send_data_unity["Engage"] = self.send_data_unity["PassBy"]
        elif not self.check_save and self.focus_face_bbox:
            # First active frame: save the focus-face crop and upload to MinIO.
            cv2.imwrite("./image/output.jpg", self.cropped_image(frame, self.focus_face_bbox[0],
                                                                 self.focus_face_bbox[1],
                                                                 self.focus_face_bbox[2],
                                                                 self.focus_face_bbox[3]))
            self.client_minio.fput_object(
                self.bucket_name, self.source_file, "./image/output.jpg",
            )
            self.check_save = True
        elif not self.check_generate:
            # Saved but not generated: first predict gender, then swap faces.
            if str(self.send_data_unity["Gender"]) == "None":
                self.predict_age_and_gender()
            else:
                source_face = cv2.imread("./image/output.jpg")
                self.generate_local_image(source_face)
                self.check_generate = True
        elif self.show_success:
            # Result shown: allow the next capture/generation cycle.
            self.check_save = False
            self.check_generate = False

    def __call__(self):
        """Main loop: capture, process, stream to Unity, show debug window."""
        cap = cv2.VideoCapture(0)
        # Frame geometry and the detection zones are constant for a capture
        # session, so compute them once instead of on every loop iteration.
        self.frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.center_point = (self.frame_width // 2, self.frame_height // 2)
        self.red_zone_width = (self.center_point[0] - 250, self.center_point[0] + 250)
        self.red_zone_height = (self.center_point[1] - 50, self.frame_height)
        self.face_zone_width = (self.center_point[0] - 120, self.center_point[0] + 120)
        self.face_zone_height = (self.center_point[1] - 120, self.center_point[1] + 120)
        self.face_zone_center_point = (
            (self.face_zone_width[1] - self.face_zone_width[0]) // 2 + self.face_zone_width[0],
            (self.face_zone_height[1] - self.face_zone_height[0]) // 2 + self.face_zone_height[0])
        while cap.isOpened():
            ret, frame = cap.read()
            # BUG FIX: check the read flag *before* touching the frame;
            # cv2.flip(None, 1) raises when the camera drops a frame.
            if not ret:
                continue
            frame = cv2.flip(frame, 1)  # mirror for a natural selfie view
            frame_to_handle = frame.copy()
            self.frame_to_show = frame.copy()
            try:
                self.person_process(frame_to_handle)
            except Exception as e:
                # Best-effort: keep the display loop alive on per-frame errors.
                print(e)
            if not self.send_data_unity["GenerateImageSuccess"]:
                self.send_data_unity["StreamingData"] = self.convertFrame(self.cropped_image(frame,
                                                                                             self.face_zone_width[0],
                                                                                             self.face_zone_height[0],
                                                                                             self.face_zone_width[1],
                                                                                             self.face_zone_height[1]))
            self.sock.SendData(self.send_data_unity)
            # Debug overlay: face zone rectangle and its centre point.
            cv2.rectangle(self.frame_to_show, (self.face_zone_width[0], self.face_zone_height[0]),
                          (self.face_zone_width[1], self.face_zone_height[1]),
                          (0, 255, 255), 2)
            cv2.circle(self.frame_to_show, self.face_zone_center_point, 5, (255, 255, 0), -1)
            cv2.imshow("Output", self.frame_to_show)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        cap.release()
        cv2.destroyAllWindows()
# Entry point: wire up the model/weight paths and launch the display loop.
if __name__ == "__main__":
    print("Starting python...")
    face_weights = "face_detect.pt"
    person_weights = "yolov8n.pt"
    chosen_tracker = "deepocsort"
    reid_checkpoint = Path('osnet_x0_25_msmt17.pt')
    app = MainProgram(face_weights, person_weights, reid_checkpoint, chosen_tracker)
    app()