# -*- coding: utf-8 -*-
# -.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.#
#* File Name : anpr_yolo_v8.py
#* Purpose :
#* Creation Date : 21-04-2025
#* Last Modified : Thu 01 May 2025 05:48:26 PM UTC
#* Created By : Yaay Nands
#_._._._._._._._._._._._._._._._._._._._._.#
"""Async ANPR service: motion-triggered recording and number-plate analysis.

Two cooperating asyncio tasks share one aiosqlite connection:

* ``motion_capture_and_record`` watches the RTSP stream, records a clip while
  motion is present and inserts one ``numberplates`` row per finished clip.
  (It is currently not scheduled by ``anpr_thread_loop``.)
* ``capture_number_plate`` polls for rows with ``anal_complete=0``, runs
  ``lib.anpr.infer`` on the clip and marks the row as analysed.
"""

import glob
import json
import logging
import os
import pprint as pp
import signal
import sys
import time
import traceback

# Async imports
import asyncio
import aiosqlite
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import functools
import aiofiles
from aiohttp import ClientSession, FormData, ConnectionTimeoutError, BasicAuth

# Used by motion_capture_and_record; previously referenced without an import.
import cv2
import numpy as np

from lib import anpr as anprm
from lib.consts import (
    DB_RECORDINGS,
    VIDEO_OUT_FULL,
    VIDEO_OUT_ANAL,
    MODEL_NAME,
    RTSP_URL,
    CONTOUR_MOTION_THRESHOLD,
    MAX_CLIP_TIME,
    MIN_TIME,
    CLASS_LABELS,
    CONFIG_JSON,
    AUTH_USER,
    AUTH_PASSWORD,
    STATIONARY_THRESHOLD,
    FAST_LEARNING_RATE,
    GLOBAL_LEARNING_RATE,
    DIFF_THRESHOLD,
    MOTION_AREA_THRESHOLD,
)

BASE_PATH = '.'
LOCATION_ID = 1

# Optional JSON configuration; HAS_CONFIG records whether the file was found.
HAS_CONFIG = False
if os.path.exists(CONFIG_JSON):
    with open(CONFIG_JSON, "r") as f:
        config = json.load(f)
    HAS_CONFIG = True
else:
    config = {}

SQL = {
    # The column list used to end in a trailing comma, which SQLite rejects.
    'INSERT_NUMBERS': 'INSERT INTO numberplates (model, filename, ts) VALUES (?, ?, ?)',
    'UPDATE_NUMBERS': (
        'UPDATE numberplates set anal_complete=?, numberplates=? '
        'WHERE model=? AND filename=? AND ts=?'
    ),
    'SELECT_UNPROCESSED': 'SELECT model, filename, ts FROM numberplates WHERE anal_complete=0;',
}

# NOTE: this deliberately overrides the MODEL_NAME imported from lib.consts.
MODEL_NAME = "YOLOv8"

# Value stored in numberplates.numberplates when inference returns None.
NO_PLATES = 'NOMARS'

# Seconds to wait between two polls for unprocessed recordings.
UNPROCESSED_POLL_INTERVAL = 1.0

LOG_DIR = "logs"
# basicConfig raises FileNotFoundError if the log directory is missing.
os.makedirs(LOG_DIR, exist_ok=True)
daily_log_ts = time.strftime("%Y-%m-%d-%H:%M")
logging.basicConfig(
    filename=f"{LOG_DIR}/main-{daily_log_ts}.log",
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
)
logger = logging.getLogger("anpr_async_loop")


async def _wait_for_stop(signal_received, timeout):
    """Sleep for up to *timeout* seconds, returning early once shutdown is requested."""
    try:
        await asyncio.wait_for(signal_received.wait(), timeout)
    except asyncio.TimeoutError:
        pass


def _serialise_plates(plates):
    """Turn an inference result into the text stored in the ``numberplates`` column.

    The return type of ``anprm.infer`` is not visible from this file, so this
    is defensive: ``None`` maps to ``NO_PLATES``, strings are stored as-is,
    JSON-serialisable values are stored as JSON and anything else via ``str``.
    """
    if plates is None:
        return NO_PLATES
    if isinstance(plates, str):
        return plates
    try:
        return json.dumps(plates)
    except (TypeError, ValueError):
        return str(plates)


async def anpr_thread_loop():
    """Open the recordings database and run the worker tasks until shutdown.

    SIGINT/SIGTERM set a shared ``asyncio.Event`` that the workers poll, so a
    signal results in a cooperative shutdown. On any exit path the workers are
    cancelled and awaited, and pending writes are committed; the connection
    itself is closed by the ``async with`` block.
    """
    async with aiosqlite.connect(DB_RECORDINGS) as db:
        logger.debug("%s", db)
        # Rows are accessed by column name in capture_number_plate.
        db.row_factory = aiosqlite.Row

        signal_received = asyncio.Event()
        loop = asyncio.get_running_loop()
        for signame in ('SIGINT', 'SIGTERM'):
            loop.add_signal_handler(getattr(signal, signame), signal_received.set)

        tasks = [
            # asyncio.create_task(motion_capture_and_record(db, signal_received)),
            asyncio.create_task(capture_number_plate(db, signal_received)),
        ]
        # if HAS_CONFIG and config['do_anal']:
        #     tasks.append(asyncio.create_task(do_analysis(db, signal_received)))

        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            logger.error("Closing loop anpr_monitor_loop")
        except Exception:
            # exc_info logs the full traceback, including the frame that
            # actually raised (tb_lineno of the outer frame only pointed at
            # the gather call above).
            logger.critical("Error in anpr_monitor loop:", exc_info=True)
        finally:
            logger.info("finally shutting down")
            for task in tasks:
                task.cancel()
            # Wait for the workers to unwind before touching the connection
            # they share; return_exceptions keeps their CancelledError from
            # aborting the cleanup.
            await asyncio.gather(*tasks, return_exceptions=True)
            await db.commit()


async def motion_capture_and_record(db, signal_received):
    """Record a clip whenever motion is seen on the RTSP stream.

    Motion is detected by differencing each blurred grayscale frame against a
    running-average background model. Pixels that stayed unchanged for at
    least ``STATIONARY_THRESHOLD`` consecutive frames are blended into the
    background with ``FAST_LEARNING_RATE``; all others use
    ``GLOBAL_LEARNING_RATE``. A frame counts as motion when more than
    ``MOTION_AREA_THRESHOLD`` pixels differ by more than ``DIFF_THRESHOLD``.

    Recording starts on motion and stops once motion has ceased and the clip
    is older than ``MIN_TIME``, or when the clip exceeds ``MAX_CLIP_TIME``.
    Each finished clip is inserted into the ``numberplates`` table.

    Args:
        db: Open aiosqlite connection shared with the other tasks.
        signal_received: ``asyncio.Event`` set when shutdown is requested.

    NOTE(review): ``is_valid_frame``, ``ffmpeg_start_recording`` and
    ``ffmpeg_stop_recording`` are neither defined nor imported in this file;
    they must be provided before this task is enabled.
    """
    logger.debug("motion_capture_and_record task running")
    loop = asyncio.get_running_loop()
    # logger.debug(f"CONTOUR_MOTION_THRESHOLD: {CONTOUR_MOTION_THRESHOLD}")

    with ThreadPoolExecutor() as pool:
        output_file = None
        timestamp = None
        start_time = 0
        recording = False
        recording_process = None

        # Blocking OpenCV calls are pushed to the pool to keep the loop free.
        loop_in_pool = functools.partial(loop.run_in_executor, pool)

        cap = await loop_in_pool(cv2.VideoCapture, RTSP_URL)
        try:
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)
            if not cap.isOpened():
                logger.critical("Error: Could not open RTSP stream.")
                return

            ret, frame = await loop_in_pool(cap.read)
            if not ret:
                logger.critical("Error: Unable to capture video.")
                # NOTE(review): exits the whole process from inside a task;
                # kept to preserve the existing exit behaviour.
                sys.exit(-1)

            # Convert the first frame to grayscale and blur to reduce noise
            gray_init = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray_init = cv2.GaussianBlur(gray_init, (21, 21), 0)

            # Background model kept as floating point so it can be blended
            background = gray_init.astype("float")

            # Per-pixel count of consecutive stationary frames
            stationary_counter = np.zeros_like(gray_init, dtype=np.uint8)

            while not signal_received.is_set():
                try:
                    try:
                        ret, frame = await loop_in_pool(cap.read)
                    except Exception as e:
                        logger.debug(f"Error capturing frame: {e}")
                        continue

                    if not ret or not is_valid_frame(frame):
                        logger.debug("Failed to capture from IP camera")
                        break

                    if not cap.isOpened():
                        logger.debug("RTSP stream disconnected. Reconnecting")
                        cap.release()
                        cap = await loop_in_pool(cv2.VideoCapture, RTSP_URL)
                        cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)

                    # Preprocess: convert to grayscale and apply Gaussian blur
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    gray_blurred = cv2.GaussianBlur(gray, (21, 21), 0)

                    # Difference between the current background and the new frame
                    diff = cv2.absdiff(cv2.convertScaleAbs(background), gray_blurred)
                    _, motion_mask = cv2.threshold(diff, DIFF_THRESHOLD, 255, cv2.THRESH_BINARY)

                    # Invert motion mask to get stationary areas (low difference)
                    stationary_mask = cv2.bitwise_not(motion_mask)

                    # Add 1 where stationary, reset where motion is detected
                    stationary_counter = cv2.add(stationary_counter, (stationary_mask // 255))
                    stationary_counter[motion_mask == 255] = 0

                    # Pixels that have been stationary long enough
                    update_mask = (stationary_counter >= STATIONARY_THRESHOLD).astype(np.float32)

                    # Per-pixel learning rate: fast for long-stationary
                    # pixels, the global (slow) rate everywhere else.
                    effective_learning_rate = (
                        GLOBAL_LEARNING_RATE
                        + update_mask * (FAST_LEARNING_RATE - GLOBAL_LEARNING_RATE)
                    )

                    # Blend the new frame into the background model
                    background = (
                        (1 - effective_learning_rate) * background
                        + effective_learning_rate * gray_blurred
                    )

                    # Motion is declared when enough pixels are marked as moving
                    motion_pixels = cv2.countNonZero(motion_mask)
                    motion_detected = motion_pixels > MOTION_AREA_THRESHOLD

                    delta = time.time() - start_time

                    if motion_detected and not recording:
                        start_time = time.time()
                        logger.debug("Motion detected! Starting recording... %s", start_time)
                        timestamp = time.strftime("%Y%m%d-%H%M%S")
                        output_file = f"motion_{timestamp}.mp4"
                        recording_process = await ffmpeg_start_recording(
                            os.path.join(VIDEO_OUT_FULL, output_file)
                        )
                        recording = True

                    # Stop once motion has ended and the clip is older than
                    # MIN_TIME, or when an ongoing clip exceeds MAX_CLIP_TIME.
                    elif recording and delta > (MAX_CLIP_TIME if motion_detected else MIN_TIME):
                        logger.debug("No motion detected or MAX_CLIP_TIME exceeded. Stopping recording...")
                        if recording_process:
                            await ffmpeg_stop_recording(recording_process)
                            await asyncio.sleep(0.1)
                            cursor = await db.execute(
                                SQL["INSERT_NUMBERS"],
                                (MODEL_NAME, output_file, timestamp),
                            )
                            # Commit right away so the row survives a crash.
                            await db.commit()
                            logger.debug(f"inserted recording with id: {cursor.lastrowid}")
                        recording_process = None
                        recording = False

                except asyncio.CancelledError:
                    logger.debug("Closing loop motion_capture_and_record")
                    await db.commit()
                    # Propagate so the task is reported as cancelled.
                    raise
                except Exception:
                    logger.critical("Error in motion_capture_and_record:", exc_info=True)
        finally:
            # NOTE(review): a recording still in progress at this point is
            # not stopped and gets no database row.
            cap.release()


async def capture_number_plate(db, signal_received):
    """Analyse recordings that have not been processed yet.

    Repeatedly selects rows with ``anal_complete=0``, runs ``anprm.infer`` on
    each clip in a worker thread, stores the serialised result and marks the
    row complete. Between polls the task waits ``UNPROCESSED_POLL_INTERVAL``
    seconds, waking early when shutdown is requested.

    Args:
        db: Open aiosqlite connection whose ``row_factory`` is
            ``aiosqlite.Row`` (rows are read by column name).
        signal_received: ``asyncio.Event`` set when shutdown is requested.

    A record whose analysis raises is logged and left unprocessed, so it is
    retried on the next poll.
    """
    logger.debug("capture_number_plate task running")
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor() as pool:
        while not signal_received.is_set():
            try:
                cursor = await db.execute(SQL["SELECT_UNPROCESSED"])
                records = await cursor.fetchall()
                logger.debug("found unprocessed records: %i ", len(records))

                for record in records:
                    if signal_received.is_set():
                        break
                    # NOTE(review): the recorder writes clips into
                    # VIDEO_OUT_FULL and stores only the file name; confirm
                    # that anprm.infer expects the full path.
                    video_path = os.path.join(VIDEO_OUT_FULL, record['filename'])
                    # Inference is run in a worker thread so it cannot stall
                    # the event loop.
                    plates = await loop.run_in_executor(pool, anprm.infer, video_path)
                    await db.execute(
                        SQL["UPDATE_NUMBERS"],
                        (
                            True,
                            _serialise_plates(plates),
                            record['model'],
                            record['filename'],
                            record['ts'],
                        ),
                    )
                    # Persist each result so finished work is never redone.
                    await db.commit()
            except asyncio.CancelledError:
                logger.debug("Closing loop capture_number_plate")
                # Swallowing the cancellation kept this loop alive forever.
                raise
            except Exception:
                logger.critical("Error in capture_number_plate logic:", exc_info=True)

            # Avoid hammering the database when there is nothing to do.
            await _wait_for_stop(signal_received, UNPROCESSED_POLL_INTERVAL)


if __name__ == '__main__':
    try:
        asyncio.run(anpr_thread_loop(), debug=True)
    except RuntimeError as e:
        logger.debug(f"Error in asyncio.run(main()) loop: {str(e)}")
        print("Error in asyncio.run(main()) loop: ", str(e))