# -*- coding: utf-8 -*-
# -.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.-.#
#* File Name : anpr_yolo_v8.py
#* Purpose :
#* Creation Date : 21-04-2025
#* Last Modified : Thu 01 May 2025 08:27:26 PM CEST
#* Created By : Yaay Nands
#_._._._._._._._._._._._._._._._._._._._._.#
# Standard library
import asyncio
import functools
import glob
import json
import logging
import os
import pprint as pp
import signal
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

# Third-party
import aiofiles
import aiosqlite
import numpy as np
from aiohttp import ClientSession, FormData, ConnectionTimeoutError, BasicAuth

# NOTE(review): motion_capture_and_record() also uses cv2 (OpenCV); add
# `import cv2` here before enabling that task in anpr_thread_loop().

# Project-local
from lib import anpr as anprm
from lib.consts import (
    DB_RECORDINGS, VIDEO_OUT_FULL, VIDEO_OUT_ANAL, MODEL_NAME, RTSP_URL,
    CONTOUR_MOTION_THRESHOLD, MAX_CLIP_TIME, MIN_TIME, CLASS_LABELS, CONFIG_JSON, AUTH_USER, AUTH_PASSWORD,
    STATIONARY_THRESHOLD, FAST_LEARNING_RATE, GLOBAL_LEARNING_RATE, DIFF_THRESHOLD, MOTION_AREA_THRESHOLD
)
BASE_PATH = '.'
LOCATION_ID = 1

# Optional runtime configuration: loaded from CONFIG_JSON when the file exists,
# otherwise an empty dict. HAS_CONFIG flags which case we are in.
HAS_CONFIG = False
if os.path.exists(CONFIG_JSON):
    with open(CONFIG_JSON, "r") as f:
        config = json.load(f)
    HAS_CONFIG = True
else:
    config = {}

# SQL statements used by the async worker tasks.
SQL = {
    # FIX: the original column list read "(model, filename, ts, )" -- the
    # stray trailing comma before ")" is a SQLite syntax error and every
    # INSERT would have failed.
    'INSERT_NUMBERS':
        'INSERT INTO numberplates (model, filename, ts) VALUES (? , ?, ?)',
    'UPDATE_NUMBERS':
        'UPDATE numberplates set anal_complete=?, numberplates=? '
        'WHERE model=? AND filename=? AND ts=?',
    'SELECT_UNPROCESSED':
        'SELECT model, filename, ts FROM numberplates WHERE anal_complete=0;',
}

# Deliberately overrides the MODEL_NAME imported from lib.consts for this run.
MODEL_NAME = "YOLOv8"

# One log file per process start-up, timestamped to the minute.
daily_log_ts = time.strftime("%Y-%m-%d-%H:%M")
logging.basicConfig(
    filename=f"logs/main-{daily_log_ts}.log",
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
)

logger = logging.getLogger("anpr_async_loop")
async def anpr_thread_loop():
    """Top-level supervisor coroutine.

    Opens the recordings database, wires SIGINT/SIGTERM to a shared
    asyncio.Event, starts the worker tasks and waits for them. On any exit
    path the workers are cancelled, awaited, and the DB is committed.
    """
    async with aiosqlite.connect(DB_RECORDINGS) as db:
        logger.debug(db.__str__())
        db.row_factory = aiosqlite.Row
        signal_received = asyncio.Event()

        # Translate OS signals into the Event the worker loops poll.
        loop = asyncio.get_running_loop()
        for signame in {'SIGINT', 'SIGTERM'}:
            loop.add_signal_handler(getattr(signal, signame),
                                    lambda s=signame: signal_received.set())

        tasks = [
            #asyncio.create_task(motion_capture_and_record(db, signal_received)),
            asyncio.create_task(capture_number_plate(db, signal_received)),
        ]
        #if HAS_CONFIG and config['do_anal']:
        #    tasks.append(asyncio.create_task(do_analysis(db, signal_received)))

        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            logger.error("Closing loop anpr_monitor_loop")
        except Exception as e:
            logger.critical(f"Error in anpr_monitor loop:")
            err_type = type(e).__name__
            line_no = e.__traceback__.tb_lineno
            logger.critical(f"traceback:{traceback.format_tb(e.__traceback__)} ")
            logger.critical(f"Error {err_type} occurred on line: {line_no}")
        finally:
            logger.info("finally shutting down")
            for task in tasks:
                task.cancel()
            # FIX: await the cancelled tasks so their cleanup (their own
            # commits, pool teardown) completes before we touch the DB;
            # the original cancelled without awaiting.
            await asyncio.gather(*tasks, return_exceptions=True)
            await db.commit()
            # FIX: removed the explicit `await db.close()` -- the
            # `async with aiosqlite.connect(...)` context manager already
            # closes the connection on exit; closing twice raises.
async def motion_capture_and_record(db, signal_received):
    """Watch the RTSP stream for motion and record clips via ffmpeg.

    Maintains a per-pixel adaptive background model: pixels that have been
    stationary for at least STATIONARY_THRESHOLD frames are blended into the
    background with FAST_LEARNING_RATE, all others with the slower
    GLOBAL_LEARNING_RATE. Recording starts when enough pixels change
    (MOTION_AREA_THRESHOLD) and stops once quiet after MIN_TIME, or when a
    clip exceeds MAX_CLIP_TIME; each finished clip is inserted into the DB.

    NOTE(review): depends on cv2/np plus is_valid_frame(),
    ffmpeg_start_recording() and ffmpeg_stop_recording() defined elsewhere
    in the project -- confirm they are in scope before enabling this task.
    """
    logger.debug("motion_capture_and_record task running")
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        previous_frame = None
        motion_detected = False
        output_file = None
        start_time = 0
        recording = False
        recording_process = None

        # Run blocking OpenCV calls in the thread pool so the event loop
        # stays responsive.
        loop_in_pool = functools.partial(loop.run_in_executor, pool)

        cap = await loop_in_pool(lambda: cv2.VideoCapture(RTSP_URL))
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)
        if not cap.isOpened():
            logger.critical("Error: Could not open RTSP stream.")
            return

        ret, frame = cap.read()
        if not ret:
            logger.critical("Error: Unable to capture video.")
            cap.release()
            # FIX: return instead of sys.exit(-1) -- raising SystemExit from
            # inside an asyncio task would tear down the whole gather(), and
            # `sys` was not even imported in the original file.
            return

        # Convert the first frame to grayscale and blur to reduce noise.
        gray_init = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray_init = cv2.GaussianBlur(gray_init, (21, 21), 0)

        # Background model kept as float so per-pixel blending is smooth.
        background = gray_init.astype("float")

        # Counts how many consecutive frames each pixel has been stationary.
        stationary_counter = np.zeros_like(gray_init, dtype=np.uint8)

        while not signal_received.is_set():
            try:
                try:
                    ret, frame = await loop_in_pool(lambda: cap.read())
                except Exception as e:
                    logger.debug(f"Error capturing frame: {e}")
                    continue

                if not ret or not is_valid_frame(frame):
                    logger.debug("Failed to capture from IP camera")
                    break

                if not cap.isOpened():
                    logger.debug("RTSP stream disconnected. Reconnecting")
                    cap.release()
                    cap = await loop_in_pool(lambda: cv2.VideoCapture(RTSP_URL))
                    cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)

                # Preprocess: grayscale + Gaussian blur to suppress noise.
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                gray_blurred = cv2.GaussianBlur(gray, (21, 21), 0)

                # Difference between the background model and the new frame.
                diff = cv2.absdiff(cv2.convertScaleAbs(background), gray_blurred)
                _, motion_mask = cv2.threshold(diff, DIFF_THRESHOLD, 255, cv2.THRESH_BINARY)

                # Invert motion mask to get stationary areas (low difference).
                stationary_mask = cv2.bitwise_not(motion_mask)

                # +1 where stationary, reset to 0 where motion was detected.
                stationary_counter = cv2.add(stationary_counter, (stationary_mask // 255))
                stationary_counter[motion_mask == 255] = 0

                # Pixels stationary long enough adopt the fast learning rate;
                # all others keep the slow global rate.
                update_mask = (stationary_counter >= STATIONARY_THRESHOLD).astype(np.float32)
                effective_learning_rate = GLOBAL_LEARNING_RATE + update_mask * (FAST_LEARNING_RATE - GLOBAL_LEARNING_RATE)

                # Blend the new frame into the background model per pixel.
                background = (1 - effective_learning_rate) * background + effective_learning_rate * gray_blurred

                motion_detected = False
                # Declare motion when enough pixels are marked as moving.
                motion_pixels = cv2.countNonZero(motion_mask)
                if motion_pixels > MOTION_AREA_THRESHOLD:
                    motion_detected = True

                delta = time.time() - start_time

                if motion_detected and not recording:
                    start_time = time.time()
                    logger.debug(f"Motion detected! Starting recording... {start_time}")
                    timestamp = time.strftime("%Y%m%d-%H%M%S")
                    output_file = f"motion_{timestamp}.mp4"
                    recording_process = await ffmpeg_start_recording(os.path.join(VIDEO_OUT_FULL, output_file))
                    recording = True

                # Stop when quiet after MIN_TIME, or clip exceeds MAX_CLIP_TIME.
                elif ((not motion_detected and recording and delta > MIN_TIME) or
                      (motion_detected and recording and delta > MAX_CLIP_TIME)):
                    logger.debug("No motion detected or MAX_CLIP_TIME exceeded. Stopping recording...")
                    if recording_process:
                        await ffmpeg_stop_recording(recording_process)
                        await asyncio.sleep(0.1)
                    cursor = await db.execute(SQL["INSERT_NUMBERS"], (MODEL_NAME, output_file, timestamp))
                    logger.debug(f"inserted recording with id: {cursor.lastrowid}")
                    # FIX: commit right after the INSERT -- the original only
                    # committed on cancellation, so clips recorded before a
                    # crash were silently lost. (Also dropped the dead
                    # `await cursor.fetchone()` after an INSERT.)
                    await db.commit()
                    recording_process = None
                    recording = False

            except asyncio.CancelledError:
                logger.debug("Closing loop motion_capture_and_record")
                await db.commit()
                break

            except Exception as e:
                logger.critical(f"Error in produce_anal_file:")
                err_type = type(e).__name__
                line_no = e.__traceback__.tb_lineno
                logger.critical(f"Error {err_type} occurred on line: {line_no}")
async def capture_number_plate(db, signal_received):
    """Poll the DB for unprocessed clips and run ANPR inference on each.

    For every row returned by SELECT_UNPROCESSED, runs anprm.infer() on the
    clip, then marks the row anal_complete with the detected plates stored
    as JSON. Loops until signal_received is set or the task is cancelled.
    """
    logger.debug("capture_number_plate task running")
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        while not signal_received.is_set():
            try:
                cursor = await db.execute(SQL["SELECT_UNPROCESSED"])
                res = await cursor.fetchall()
                logger.debug("found unprocessed records: %i ", len(res))
                for record in res:
                    logger.debug("processing video file %s", record['filename'])
                    # FIX: run the blocking inference in the thread pool (the
                    # original created the pool but called infer() inline,
                    # stalling the whole event loop for the duration).
                    plates = await loop.run_in_executor(pool, anprm.infer, record['filename'])
                    cursor = await db.execute(SQL["UPDATE_NUMBERS"],
                                              (True, json.dumps(plates), record['model'],
                                               record['filename'], record['ts']))
                # FIX: persist the updates -- the original never committed,
                # so completed work was lost on a crash.
                await db.commit()
                # FIX: back off between polls instead of busy-looping the DB.
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                logger.debug("Closing loop capture_number_plate")
                # FIX: actually stop on cancellation; the original swallowed
                # CancelledError and kept looping forever.
                break
            except Exception as e:
                logger.critical(f"traceback:{traceback.format_tb(e.__traceback__)} ")
                logger.critical(f"Error in capture_number_plate logic:")
                err_type = type(e).__name__
                line_no = e.__traceback__.tb_lineno
                logger.critical(f"Error {err_type} occurred on line: {line_no}")
if __name__ == '__main__':
    # Entry point: run the supervisor coroutine with asyncio debug enabled.
    try:
        asyncio.run(anpr_thread_loop(), debug=True)
    except RuntimeError as err:
        # Surface event-loop failures both to the log and to stdout.
        message = str(err)
        logger.debug(f"Error in asyncio.run(main()) loop: {message}")
        print("Error in asyncio.run(main()) loop: ", message)
|