# anpr/anpr_main.py
#
# 267 lines
# 12 KiB
# Python

# -*- coding: utf-8 -*-
# -.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.#
#* File Name : anpr_main.py
#* Purpose : Async ANPR loop - reads pending recordings from the database and extracts number plates
#* Creation Date : 21-04-2025
#* Last Modified : Thu 01 May 2025 11:45:10 PM CEST
#* Created By : Yaay Nands
#_._._._._._._._._._._._._._._._._._._._._.#
import asyncio
import functools
import glob
import json
import logging
import os
import pprint as pp
import signal
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

# Async imports
import aiofiles
import aiosqlite
from aiohttp import ClientSession, FormData, ConnectionTimeoutError, BasicAuth

from lib import anpr as anprm
from lib.consts import (
    DB_RECORDINGS, VIDEO_OUT_FULL, VIDEO_OUT_ANAL, MODEL_NAME, RTSP_URL,
    CONTOUR_MOTION_THRESHOLD, MAX_CLIP_TIME, MIN_TIME, CLASS_LABELS, CONFIG_JSON, AUTH_USER, AUTH_PASSWORD,
    STATIONARY_THRESHOLD, FAST_LEARNING_RATE, GLOBAL_LEARNING_RATE, DIFF_THRESHOLD, MOTION_AREA_THRESHOLD
)
BASE_PATH = '.'
LOCATION_ID = 1

# Optional JSON configuration. ``config`` is the parsed document when the file
# at CONFIG_JSON exists and an empty dict otherwise; ``HAS_CONFIG`` records
# which of the two happened.
# The file is opened directly (instead of checking os.path.exists first) so
# there is no window between the check and the open, and it is decoded as
# UTF-8 explicitly rather than with the platform's default encoding.
try:
    with open(CONFIG_JSON, "r", encoding="utf-8") as f:
        config = json.load(f)
    HAS_CONFIG = True
except FileNotFoundError:
    config = {}
    HAS_CONFIG = False
# Parameterised statements for the ``numberplates`` table, keyed by purpose.
# All values are bound with ``?`` placeholders; string constants inside the
# SQL use single quotes because SQLite treats double-quoted tokens as
# identifiers and only falls back to string literals as a legacy quirk.
SQL = {
    # Register a freshly recorded clip: (model, filename, ts).
    'INSERT_NUMBERS':
        'INSERT INTO numberplates (model, filename, ts) VALUES (?, ?, ?)',
    # Store the analysis result: (anal_complete, numberplates, model, filename, ts).
    'UPDATE_NUMBERS':
        '''UPDATE numberplates SET anal_complete=?, numberplates=?, np_extract_status='success'
        WHERE model=? AND filename=? AND ts=?''',
    # Clips that have not been analysed yet and are still marked pending.
    'SELECT_UNPROCESSED':
        "SELECT model, filename, ts FROM numberplates WHERE anal_complete=0 AND np_extract_status='pending';",
    # Record a non-success status: (np_extract_status, model, filename, ts).
    'UPDATE_FAILS':
        'UPDATE numberplates SET np_extract_status = ? WHERE model=? AND filename=? AND ts=?',
}
# NOTE(review): this rebinds the MODEL_NAME imported from lib.consts; confirm
# that overriding the shared constant here is intended.
MODEL_NAME = "YOLOv8"

# One log file per process start, named after the start time (minute resolution).
daily_log_ts = time.strftime("%Y-%m-%d-%H:%M")

# logging.basicConfig opens the log file immediately and raises
# FileNotFoundError when the directory is missing, so create it first.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/main-{daily_log_ts}.log",
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
)
logger = logging.getLogger("anpr_async_loop")
async def anpr_thread_loop():
    """Open the recordings database and run the worker tasks until shutdown.

    A shared ``asyncio.Event`` is set when SIGINT or SIGTERM arrives; the
    worker coroutines receive that event and are expected to stop once it is
    set. On the way out (normal completion, error, or cancellation) the event
    is set, all tasks are cancelled and awaited, and pending database changes
    are committed. The connection itself is closed by the ``async with``.
    """
    async with aiosqlite.connect(DB_RECORDINGS) as db:
        logger.debug(db.__str__())
        db.row_factory = aiosqlite.Row
        signal_received = asyncio.Event()
        loop = asyncio.get_running_loop()
        for signum in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(signum, signal_received.set)
        tasks = [
            # asyncio.create_task(motion_capture_and_record(db, signal_received)),
            asyncio.create_task(capture_number_plate(db, signal_received)),
        ]
        # if HAS_CONFIG and config['do_anal']:
        #     tasks.append(asyncio.create_task(do_analysis(db, signal_received)))
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            logger.error("Closing loop anpr_monitor_loop")
        except Exception as e:
            # exc_info attaches the full traceback; the line number of the
            # outermost frame alone would only point at the gather() call.
            logger.critical("Error in anpr_monitor loop:", exc_info=e)
            logger.critical(f"Error {type(e).__name__} occurred")
        finally:
            logger.info("finally shutting down")
            # Ask the workers to leave their loops as well as cancelling them,
            # so a worker that handles the cancellation itself still exits.
            signal_received.set()
            for task in tasks:
                task.cancel()
            # Wait for the workers to finish unwinding before committing, so
            # no task is still using the connection; their exceptions are
            # collected rather than raised out of the cleanup path.
            await asyncio.gather(*tasks, return_exceptions=True)
            await db.commit()
async def motion_capture_and_record(db, signal_received):
    """Watch the RTSP stream for motion and record one clip per motion event.

    Frames are read from ``RTSP_URL`` in a worker thread, converted to
    grayscale, blurred and compared against a running background model. When
    more than ``MOTION_AREA_THRESHOLD`` pixels differ, a recording is started
    with ``ffmpeg_start_recording``. It is stopped when no motion is seen and
    the clip is older than ``MIN_TIME`` seconds, or when the clip exceeds
    ``MAX_CLIP_TIME`` seconds; the finished clip is then inserted into the
    ``numberplates`` table.

    NOTE(review): ``cv2``, ``np``, ``is_valid_frame``,
    ``ffmpeg_start_recording`` and ``ffmpeg_stop_recording`` are neither
    imported nor defined in the part of this module visible here, and the
    task is commented out in ``anpr_thread_loop``. The nesting of the
    stop-recording branch was reconstructed from unindented source - confirm.

    Args:
        db: database connection used for ``execute`` / ``commit``.
        signal_received: event whose ``is_set()`` ends the capture loop.

    Raises:
        SystemExit: if the very first frame cannot be read from the stream.
    """
    logger.debug("motion_capture_and_record task running")
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        output_file = None
        timestamp = None
        start_time = 0
        recording = False
        recording_process = None
        loop_in_pool = functools.partial(loop.run_in_executor, pool)
        cap = await loop_in_pool(lambda: cv2.VideoCapture(RTSP_URL))
        try:
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)
            if not cap.isOpened():
                logger.critical("Error: Could not open RTSP stream.")
                return
            # Read in the worker thread so the event loop is not blocked.
            ret, frame = await loop_in_pool(cap.read)
            if not ret:
                logger.critical("Error: Unable to capture video.")
                # Same effect as sys.exit(-1) without needing the sys module;
                # the capture is released by the finally clause below.
                raise SystemExit(-1)

            # Convert the first frame to grayscale and blur to reduce noise
            gray_init = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray_init = cv2.GaussianBlur(gray_init, (21, 21), 0)
            # Initialize the background model as a floating point image for blending
            background = gray_init.astype("float")
            # Count how many consecutive frames each pixel has been stationary
            stationary_counter = np.zeros_like(gray_init, dtype=np.uint8)

            while not signal_received.is_set():
                try:
                    try:
                        ret, frame = await loop_in_pool(cap.read)
                    except Exception as e:
                        logger.debug(f"Error capturing frame: {e}")
                        continue
                    if not ret or not is_valid_frame(frame):
                        logger.debug("Failed to capture from IP camera")
                        break
                    # NOTE(review): a failed read breaks out of the loop above,
                    # so this reconnect is only reached when a frame was read
                    # successfully - confirm the intended order.
                    if not cap.isOpened():
                        logger.debug("RTSP stream disconnected. Reconnecting")
                        cap.release()
                        cap = await loop_in_pool(lambda: cv2.VideoCapture(RTSP_URL))
                        cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)

                    # Preprocess: convert to grayscale and apply Gaussian blur
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    gray_blurred = cv2.GaussianBlur(gray, (21, 21), 0)
                    # Absolute difference between the background and the new frame
                    diff = cv2.absdiff(cv2.convertScaleAbs(background), gray_blurred)
                    _, motion_mask = cv2.threshold(diff, DIFF_THRESHOLD, 255, cv2.THRESH_BINARY)
                    # Invert motion mask to get stationary areas (low difference)
                    stationary_mask = cv2.bitwise_not(motion_mask)
                    # Add 1 where stationary, reset where motion is detected
                    stationary_counter = cv2.add(stationary_counter, (stationary_mask // 255))
                    stationary_counter[motion_mask == 255] = 0
                    # Pixels that have been stationary long enough
                    update_mask = (stationary_counter >= STATIONARY_THRESHOLD).astype(np.float32)
                    # Per-pixel learning rate: FAST_LEARNING_RATE where the
                    # stationary criterion is met, GLOBAL_LEARNING_RATE elsewhere.
                    effective_learning_rate = (
                        GLOBAL_LEARNING_RATE
                        + update_mask * (FAST_LEARNING_RATE - GLOBAL_LEARNING_RATE)
                    )
                    # Blend the new frame into the background model
                    background = (
                        (1 - effective_learning_rate) * background
                        + effective_learning_rate * gray_blurred
                    )

                    # Motion is declared when enough pixels are marked as moving
                    motion_pixels = cv2.countNonZero(motion_mask)
                    motion_detected = motion_pixels > MOTION_AREA_THRESHOLD
                    delta = time.time() - start_time

                    if motion_detected and not recording:
                        start_time = time.time()
                        logger.debug(f"Motion detected! Starting recording... {start_time}")
                        timestamp = time.strftime("%Y%m%d-%H%M%S")
                        output_file = f"motion_{timestamp}.mp4"
                        recording_process = await ffmpeg_start_recording(
                            os.path.join(VIDEO_OUT_FULL, output_file)
                        )
                        recording = True
                    # Stop when motion has ended (after MIN_TIME) or the clip is too long
                    elif recording and (
                        (not motion_detected and delta > MIN_TIME)
                        or (motion_detected and delta > MAX_CLIP_TIME)
                    ):
                        logger.debug("No motion detected or MAX_CLIP_TIME exceeded. Stopping recording...")
                        if recording_process:
                            await _finish_recording(db, recording_process, output_file, timestamp)
                        recording_process = None
                        recording = False
                except asyncio.CancelledError:
                    logger.debug("Closing loop motion_capture_and_record")
                    await db.commit()
                    break
                except Exception as e:
                    logger.critical("Error in motion_capture_and_record:")
                    logger.critical(f"traceback:{traceback.format_tb(e.__traceback__)} ")
                    err_type = type(e).__name__
                    line_no = e.__traceback__.tb_lineno
                    logger.critical(f"Error {err_type} occurred on line: {line_no}")
        finally:
            # Do not leave an ffmpeg recorder running or the stream handle
            # open once the loop has ended, whatever the reason.
            try:
                if recording_process:
                    await _finish_recording(db, recording_process, output_file, timestamp)
            finally:
                cap.release()


async def _finish_recording(db, recording_process, output_file, timestamp):
    """Stop the running recorder and register the clip in ``numberplates``."""
    await ffmpeg_stop_recording(recording_process)
    # Short pause after stopping the recorder, kept from the original flow.
    await asyncio.sleep(0.1)
    async with db.execute(SQL["INSERT_NUMBERS"], (MODEL_NAME, output_file, timestamp)) as cursor:
        logger.debug(f"inserted recording with id: {cursor.lastrowid}")
    # Commit right away so the row is not held in an open transaction.
    await db.commit()
async def capture_number_plate(db, signal_received, poll_interval=1.0):
    """Poll for unprocessed recordings and extract number plates from each.

    Every cycle selects the rows returned by ``SQL["SELECT_UNPROCESSED"]``,
    runs ``anprm.infer`` on each row's ``filename`` and stores the
    JSON-encoded result; a row whose processing raises is marked ``failed``.
    Each row's outcome is committed before the next row is started.

    Args:
        db: database connection whose rows support access by column name
            (``model``, ``filename``, ``ts``).
        signal_received: event that ends the loop once set.
        poll_interval: seconds to wait between polling cycles; the wait ends
            early when ``signal_received`` is set.

    Raises:
        asyncio.CancelledError: re-raised when the task is cancelled.
    """
    logger.debug("capture_number_plate task running")
    loop = asyncio.get_running_loop()
    # Inference runs in a worker thread so the event loop (and with it the
    # signal handlers) stays responsive; a single worker keeps all inference
    # calls on one thread. Leaving this ``with`` block waits for a call that
    # is still running.
    with ThreadPoolExecutor(max_workers=1) as pool:
        while not signal_received.is_set():
            try:
                async with db.execute(SQL["SELECT_UNPROCESSED"]) as cursor:
                    pending = await cursor.fetchall()
                logger.debug("found unprocessed records: %i ", len(pending))
                for record in pending:
                    if signal_received.is_set():
                        break
                    await _process_recording(db, loop, pool, record)
            except asyncio.CancelledError:
                logger.debug("Closing loop capture_number_plate")
                raise
            except Exception as e:
                # Failures not tied to a single record (e.g. the SELECT itself).
                logger.critical(f"traceback:{traceback.format_tb(e.__traceback__)} ")
                logger.critical("Error in capture_number_plate logic:")
                err_type = type(e).__name__
                line_no = e.__traceback__.tb_lineno
                logger.critical(f"Error {err_type} occurred on line: {line_no}")
            # Idle between polls instead of spinning on the database, but
            # wake up immediately when shutdown is requested.
            try:
                await asyncio.wait_for(signal_received.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass


async def _process_recording(db, loop, pool, record):
    """Run plate inference for one row and persist success or failure."""
    key = (record['model'], record['filename'], record['ts'])
    logger.debug("processing video file %s", record['filename'])
    try:
        plates = await loop.run_in_executor(pool, anprm.infer, record['filename'])
        await db.execute(SQL["UPDATE_NUMBERS"], (True, json.dumps(plates), *key))
    except Exception as e:
        logger.critical(f"traceback:{traceback.format_tb(e.__traceback__)} ")
        logger.critical(f"Error {type(e).__name__} while processing {record['filename']}")
        # Only the record that actually failed is marked, so it is not
        # selected again on the next poll.
        await db.execute(SQL["UPDATE_FAILS"], ('failed', *key))
    # Commit per record so results survive a crash and the write transaction
    # is not held open between polls.
    await db.commit()
if __name__ == '__main__':
    # Script entry point: run the service loop with asyncio debug mode on.
    # A RuntimeError from asyncio.run is written to the log and echoed to
    # stdout instead of propagating as an uncaught exception.
    try:
        asyncio.run(anpr_thread_loop(), debug=True)
    except RuntimeError as err:
        reason = str(err)
        logger.debug(f"Error in asyncio.run(main()) loop: {reason}")
        print("Error in asyncio.run(main()) loop: ", reason)