#!/usr/bin/env python3

"""

Raspberry Pi ANPR Logger

Pipeline:

Picamera2 -> YOLO license plate detector -> crop plate -> Tesseract OCR -> fuzzy XML match -> SQLite log -> Telegram alert

Hardware target:

- Raspberry Pi 4 B 8GB

- Raspberry Pi NoIR Camera V2

- Optional Adafruit Mini PiTFT display handled later

Install notes:

cd ~/plate_logger

source venv/bin/activate

pip install ultralytics rapidfuzz requests pillow pytesseract

sudo apt install -y tesseract-ocr python3-opencv python3-picamera2

You also need a YOLO license plate detection model, for example:

models/license_plate_detector.pt

This file assumes:

data/watchlist.xml

models/license_plate_detector.pt

captures/

crops/

logs/

"""

import os

import re

import cv2

import time

import sqlite3

import requests

import xml.etree.ElementTree as ET

from datetime import datetime

from pathlib import Path

from typing import Dict, Any, Optional, Tuple, List

import pytesseract

from rapidfuzz import fuzz, process

from picamera2 import Picamera2

from ultralytics import YOLO

# ============================================================
# CONFIGURATION
# ============================================================

# Filesystem layout: everything lives under ~/plate_logger.
BASE_DIR = Path.home() / "plate_logger"
DATA_DIR = BASE_DIR / "data"
MODEL_DIR = BASE_DIR / "models"
CAPTURE_DIR = BASE_DIR / "captures"
CROP_DIR = BASE_DIR / "crops"
LOG_DIR = BASE_DIR / "logs"

XML_WATCHLIST_FILE = DATA_DIR / "watchlist.xml"
YOLO_MODEL_FILE = MODEL_DIR / "license_plate_detector.pt"
DB_FILE = LOG_DIR / "plate_logs.db"

# Camera settings
CAMERA_WIDTH = 1280           # still-capture resolution in pixels
CAMERA_HEIGHT = 720
CAMERA_WARMUP_SECONDS = 1.5   # settle time between camera start and first capture
SCAN_INTERVAL_SECONDS = 2.0   # pause between passes in "continuous" mode

# YOLO settings
YOLO_CONFIDENCE_THRESHOLD = 0.35  # detections below this confidence are discarded
MAX_DETECTIONS_TO_OCR = 5         # OCR only the top-N highest-confidence boxes per frame

# OCR settings: accepted plate-string lengths after normalization
MIN_PLATE_LENGTH = 4
MAX_PLATE_LENGTH = 9

# Fuzzy matching: rapidfuzz ratio (0-100) required to declare a watchlist match
FUZZY_MATCH_THRESHOLD = 85

# Telegram settings — alerts stay disabled until ENABLE_TELEGRAM is True AND
# both PASTE_ placeholders are replaced with real credentials.
ENABLE_TELEGRAM = False
TELEGRAM_BOT_TOKEN = "PASTE_YOUR_BOT_TOKEN_HERE"
TELEGRAM_CHAT_ID = "PASTE_YOUR_CHAT_ID_HERE"

# Runtime mode
# "continuous" = repeatedly scan
# "single" = scan once and exit
MODE = "continuous"

# ============================================================

# DIRECTORY AND DATABASE SETUP

# ============================================================

def ensure_directories() -> None:
    """Create every directory the pipeline writes to (safe to call repeatedly)."""
    required = (DATA_DIR, MODEL_DIR, CAPTURE_DIR, CROP_DIR, LOG_DIR)
    for path in required:
        path.mkdir(parents=True, exist_ok=True)

def init_database(db_file: Optional[Path] = None) -> sqlite3.Connection:
    """Open (creating if needed) the detection-log SQLite database.

    Args:
        db_file: Path of the database file. Defaults to the module-level
            DB_FILE; the parameter exists so tests and alternative callers
            can use their own location (backward compatible — existing
            zero-argument calls are unchanged).

    Returns:
        An open sqlite3.Connection with the plate_detections table ensured.
    """
    # Resolve the default lazily so DB_FILE is only touched when needed.
    if db_file is None:
        db_file = DB_FILE
    conn = sqlite3.connect(db_file)
    cur = conn.cursor()
    # One row per scan result; match/vehicle columns stay empty strings when
    # there was no watchlist hit.
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS plate_detections (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            detected_plate TEXT,
            ocr_confidence REAL,
            yolo_confidence REAL,
            matched_plate TEXT,
            match_score REAL,
            is_match INTEGER,
            owner TEXT,
            vehicle_make TEXT,
            vehicle_model TEXT,
            status TEXT,
            notes TEXT,
            image_path TEXT,
            crop_path TEXT
        )
        """
    )
    conn.commit()
    return conn

# ============================================================

# XML WATCHLIST LOADING

# ============================================================

def safe_text(element: Optional[ET.Element], default: str = "") -> str:
    """Return an XML element's stripped text, or *default* when the element
    is missing or has no text node."""
    if element is not None and element.text is not None:
        return element.text.strip()
    return default

def normalize_plate(text: str) -> str:
    """Uppercase *text* and drop every character outside A-Z and 0-9."""
    allowed = set("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
    return "".join(ch for ch in text.upper() if ch in allowed)

def load_watchlist(xml_file: Path) -> Dict[str, Dict[str, str]]:
    """Parse the XML watchlist into {normalized_plate: vehicle_info}.

    Creates an example watchlist first when the file does not exist.
    <vehicle> entries without a usable <plate> are skipped.
    """
    if not xml_file.exists():
        create_example_watchlist(xml_file)
        print(f"Created example watchlist at: {xml_file}")

    root = ET.parse(xml_file).getroot()
    # Optional fields and their defaults, in output order.
    field_defaults = (
        ("owner", "Unknown"),
        ("vehicle_make", "Unknown"),
        ("vehicle_model", "Unknown"),
        ("color", "Unknown"),
        ("status", "Unknown"),
        ("notes", ""),
    )

    vehicles: Dict[str, Dict[str, str]] = {}
    for node in root.findall("vehicle"):
        plate = normalize_plate(safe_text(node.find("plate")))
        if not plate:
            continue
        entry: Dict[str, str] = {"plate": plate}
        for field, default in field_defaults:
            entry[field] = safe_text(node.find(field), default)
        vehicles[plate] = entry
    return vehicles

def create_example_watchlist(xml_file: Path) -> None:
    """Write a two-entry sample watchlist so a first run has data to edit."""
    xml_file.parent.mkdir(parents=True, exist_ok=True)
    example = """<vehicles>
  <vehicle>
    <plate>ABC123</plate>
    <owner>John Smith</owner>
    <vehicle_make>Toyota</vehicle_make>
    <vehicle_model>Camry</vehicle_model>
    <color>Blue</color>
    <status>WATCHLIST</status>
    <notes>Example vehicle entry</notes>
  </vehicle>
  <vehicle>
    <plate>XYZ789</plate>
    <owner>Jane Doe</owner>
    <vehicle_make>Ford</vehicle_make>
    <vehicle_model>F150</vehicle_model>
    <color>White</color>
    <status>AUTHORIZED</status>
    <notes>Example authorized vehicle</notes>
  </vehicle>
</vehicles>
"""
    xml_file.write_text(example)

# ============================================================

# CAMERA

# ============================================================

def setup_camera() -> Picamera2:
    """Start the Pi camera in still-capture mode and give it time to warm up."""
    camera = Picamera2()
    still_config = camera.create_still_configuration(
        main={"size": (CAMERA_WIDTH, CAMERA_HEIGHT)}
    )
    camera.configure(still_config)
    camera.start()
    # Auto-exposure/white-balance need a moment before the first capture.
    time.sleep(CAMERA_WARMUP_SECONDS)
    return camera

def capture_frame(picam2: Picamera2) -> Tuple[Any, Path]:
    """Grab one frame, save it as a timestamped JPEG, and return (BGR frame, path)."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    out_path = CAPTURE_DIR / f"capture_{stamp}.jpg"
    # Picamera2 commonly returns RGB; OpenCV expects BGR everywhere downstream.
    rgb = picam2.capture_array()
    bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    cv2.imwrite(str(out_path), bgr)
    return bgr, out_path

# ============================================================

# YOLO PLATE DETECTION

# ============================================================

def load_yolo_model(model_file: Path) -> YOLO:
    """Load the plate-detector weights, failing fast with install guidance
    when the model file is absent."""
    if model_file.exists():
        return YOLO(str(model_file))
    raise FileNotFoundError(
        f"YOLO model not found: {model_file}\n"
        "Put your plate detector model at models/license_plate_detector.pt"
    )

def detect_plate_boxes(model: YOLO, image_bgr: Any) -> List[Dict[str, Any]]:
    """Run the YOLO detector on one frame and return plate candidate boxes.

    Each entry is {"box": (x1, y1, x2, y2), "confidence": float, "area": int},
    sorted by confidence (highest first). Degenerate boxes (zero or negative
    width/height) are dropped.
    """
    results = model.predict(
        source=image_bgr,
        conf=YOLO_CONFIDENCE_THRESHOLD,
        verbose=False
    )
    if not results or results[0].boxes is None:
        return []

    found: List[Dict[str, Any]] = []
    for box in results[0].boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int).tolist()
        # Skip boxes with no usable area.
        if x2 <= x1 or y2 <= y1:
            continue
        found.append({
            "box": (x1, y1, x2, y2),
            "confidence": float(box.conf[0].cpu().numpy()),
            "area": (x2 - x1) * (y2 - y1),
        })
    return sorted(found, key=lambda d: d["confidence"], reverse=True)

# ============================================================

# CROP + OCR

# ============================================================

def crop_with_padding(image_bgr: Any, box: Tuple[int, int, int, int], pad_ratio: float = 0.12) -> Any:
    """Return the region of *image_bgr* covered by *box*, grown by *pad_ratio*
    of the box size on every side and clamped to the image bounds."""
    height, width = image_bgr.shape[:2]
    left, top, right, bottom = box
    pad_w = int((right - left) * pad_ratio)
    pad_h = int((bottom - top) * pad_ratio)
    left = max(0, left - pad_w)
    top = max(0, top - pad_h)
    right = min(width, right + pad_w)
    bottom = min(height, bottom + pad_h)
    return image_bgr[top:bottom, left:right]

def preprocess_plate_crop(crop_bgr: Any) -> Any:
    """Convert a plate crop into a clean black/white image for Tesseract."""
    # Grayscale, then upscale 3x so small glyphs survive binarization.
    img = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2GRAY)
    img = cv2.resize(img, None, fx=3, fy=3, interpolation=cv2.INTER_CUBIC)
    # Edge-preserving denoise, then contrast normalization.
    img = cv2.bilateralFilter(img, 9, 75, 75)
    img = cv2.equalizeHist(img)
    # Otsu chooses the binarization threshold automatically.
    _, binary = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return binary

def tesseract_plate_ocr(processed_image: Any) -> Tuple[str, float, str]:
    """OCR a preprocessed plate image.

    Returns (normalized plate text, mean word confidence 0-100, raw OCR text).
    PSM 7 treats the crop as one line of text; the whitelist stops Tesseract
    from emitting punctuation or lowercase.
    """
    config = (
        "--oem 3 --psm 7 "
        "-c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    )
    raw_text = pytesseract.image_to_string(processed_image, config=config)

    # image_to_string exposes no aggregate confidence, so average the
    # per-word confidences from image_to_data (negative entries mean
    # "not a recognized word" and are skipped).
    data = pytesseract.image_to_data(
        processed_image,
        config=config,
        output_type=pytesseract.Output.DICT
    )
    valid: List[float] = []
    for raw_conf in data.get("conf", []):
        try:
            value = float(raw_conf)
        except ValueError:
            continue
        if value >= 0:
            valid.append(value)
    mean_conf = sum(valid) / len(valid) if valid else 0.0

    return normalize_plate(raw_text), mean_conf, raw_text

def read_best_plate_from_detections(
    image_bgr: Any,
    detections: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """OCR the top plate detections and return the most plausible reading.

    For each of the first MAX_DETECTIONS_TO_OCR boxes: crop with padding,
    preprocess, run Tesseract, save both crops for offline debugging, and
    score the candidate. Returns the highest-scoring candidate dict, or None
    when no detection produced any text.

    Fix: previously a blank-OCR candidate could win on YOLO confidence alone,
    and the final non-empty check then returned None even though another
    candidate had real text. Blank readings are now excluded from selection,
    so a usable reading is never discarded because of a blank one.
    """
    best: Optional[Dict[str, Any]] = None
    for idx, detection in enumerate(detections[:MAX_DETECTIONS_TO_OCR]):
        box = detection["box"]
        yolo_conf = detection["confidence"]
        crop = crop_with_padding(image_bgr, box)
        processed = preprocess_plate_crop(crop)
        plate_text, ocr_conf, raw_text = tesseract_plate_ocr(processed)

        # Persist both crops so OCR failures can be inspected later.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        crop_path = CROP_DIR / f"plate_candidate_{timestamp}_{idx}.jpg"
        processed_path = CROP_DIR / f"plate_candidate_{timestamp}_{idx}_processed.jpg"
        cv2.imwrite(str(crop_path), crop)
        cv2.imwrite(str(processed_path), processed)

        # Heuristic score: plausible length is worth the most, then text
        # length, OCR confidence, and detector confidence.
        score = 0.0
        if MIN_PLATE_LENGTH <= len(plate_text) <= MAX_PLATE_LENGTH:
            score += 50
        score += len(plate_text) * 5
        score += ocr_conf * 0.5
        score += yolo_conf * 30

        candidate = {
            "plate": plate_text,
            "ocr_confidence": ocr_conf,
            "raw_text": raw_text,
            "yolo_confidence": yolo_conf,
            "box": box,
            "score": score,
            "crop_path": crop_path,
            "processed_path": processed_path,
        }
        print(
            f"Candidate {idx}: plate={plate_text or '[blank]'} "
            f"ocr_conf={ocr_conf:.1f} yolo_conf={yolo_conf:.2f} score={score:.1f}"
        )

        # Only candidates that actually produced text are eligible; a blank
        # reading must never suppress a real one.
        if plate_text and (best is None or score > best["score"]):
            best = candidate

    return best

# ============================================================

# FUZZY XML MATCHING

# ============================================================

def fuzzy_match_plate(
    detected_plate: str,
    watchlist: Dict[str, Dict[str, str]]
) -> Optional[Dict[str, Any]]:
    """Find the watchlist plate closest to *detected_plate*.

    Returns None when there is nothing to compare. Otherwise returns a dict
    describing the closest plate, its similarity score (0-100), and
    "is_match" — True only when the score meets FUZZY_MATCH_THRESHOLD.
    """
    if not detected_plate or not watchlist:
        return None

    hit = process.extractOne(
        detected_plate,
        list(watchlist.keys()),
        scorer=fuzz.ratio
    )
    if hit is None:
        return None

    best_plate, best_score, _ = hit
    return {
        "is_match": best_score >= FUZZY_MATCH_THRESHOLD,
        "detected_plate": detected_plate,
        "matched_plate": best_plate,
        "match_score": best_score,
        "vehicle": watchlist[best_plate],
    }

# ============================================================

# LOGGING

# ============================================================

def log_detection(
    conn: sqlite3.Connection,
    detected: Dict[str, Any],
    match: Optional[Dict[str, Any]],
    image_path: Path
) -> None:
    """Persist one detection (and its optional watchlist match) to SQLite.

    When *match* is None the match/vehicle columns are stored as empty
    strings / zeros so every scan still leaves an audit row.
    """
    match_info = match or {}
    vehicle = match_info.get("vehicle", {}) or {}

    row = (
        datetime.now().isoformat(),
        detected.get("plate", ""),
        detected.get("ocr_confidence", 0.0),
        detected.get("yolo_confidence", 0.0),
        match_info.get("matched_plate", ""),
        match_info.get("match_score", 0.0),
        1 if match_info.get("is_match") else 0,
        vehicle.get("owner", ""),
        vehicle.get("vehicle_make", ""),
        vehicle.get("vehicle_model", ""),
        vehicle.get("status", ""),
        vehicle.get("notes", ""),
        str(image_path),
        str(detected.get("crop_path", "")),
    )
    conn.execute(
        """
        INSERT INTO plate_detections (
            timestamp, detected_plate, ocr_confidence, yolo_confidence,
            matched_plate, match_score, is_match, owner, vehicle_make,
            vehicle_model, status, notes, image_path, crop_path
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        row,
    )
    conn.commit()

# ============================================================

# TELEGRAM ALERTS

# ============================================================

def telegram_enabled() -> bool:
    """True only when alerting is switched on and both credentials look real
    (non-empty and no PASTE_ placeholder left in)."""
    if not ENABLE_TELEGRAM:
        return False
    if "PASTE_" in TELEGRAM_BOT_TOKEN or "PASTE_" in TELEGRAM_CHAT_ID:
        return False
    return bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID)

def send_telegram_photo(image_path: Path, caption: str) -> None:
    """Best-effort upload of a photo plus caption to the configured chat.

    Returns silently when Telegram is not configured. Network or API errors
    are printed rather than raised so a failed alert never stops the scanner.
    """
    if not telegram_enabled():
        return
    endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "caption": caption,
    }
    try:
        with open(image_path, "rb") as photo:
            reply = requests.post(
                endpoint,
                data=payload,
                files={"photo": photo},
                timeout=15,
            )
        reply.raise_for_status()
    except Exception as exc:
        print(f"Telegram send failed: {exc}")

def build_alert_caption(detected: Dict[str, Any], match: Dict[str, Any]) -> str:
    """Format the Telegram alert text for a confirmed watchlist match."""
    vehicle = match["vehicle"]
    lines = [
        "LICENSE PLATE MATCH",
        "",
        f"Detected: {detected['plate']}",
        f"Matched: {match['matched_plate']}",
        f"Match Score: {match['match_score']:.1f}%",
        f"OCR Confidence: {detected['ocr_confidence']:.1f}%",
        f"YOLO Confidence: {detected['yolo_confidence']:.2f}",
        "",
        f"Owner: {vehicle.get('owner', 'Unknown')}",
        f"Vehicle: {vehicle.get('vehicle_make', 'Unknown')} {vehicle.get('vehicle_model', '')}",
        f"Color: {vehicle.get('color', 'Unknown')}",
        f"Status: {vehicle.get('status', 'Unknown')}",
        f"Notes: {vehicle.get('notes', '')}",
        "",
        f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    return "\n".join(lines)

# ============================================================

# MAIN SCAN LOOP

# ============================================================

def scan_once(
    picam2: Picamera2,
    model: YOLO,
    watchlist: Dict[str, Dict[str, str]],
    conn: sqlite3.Connection
) -> None:
    """Run one full pipeline pass: capture -> detect -> OCR -> match -> log."""
    print("\nCapturing frame...")
    frame, frame_path = capture_frame(picam2)

    print("Running YOLO plate detection...")
    boxes = detect_plate_boxes(model, frame)
    if not boxes:
        print("No license plate detected.")
        return
    print(f"YOLO found {len(boxes)} plate candidate(s).")

    reading = read_best_plate_from_detections(frame, boxes)
    if reading is None:
        print("No usable OCR result from detected plate crop.")
        return
    print(f"Best OCR plate: {reading['plate']}")

    result = fuzzy_match_plate(reading["plate"], watchlist)
    if result is None:
        # Still log the raw reading so nothing is lost.
        print("No watchlist loaded or no fuzzy match candidate.")
        log_detection(conn, reading, None, frame_path)
        return

    print(
        f"Closest XML plate: {result['matched_plate']} "
        f"score={result['match_score']:.1f}%"
    )
    if result["is_match"]:
        print("MATCH FOUND")
        info = result["vehicle"]
        print(f"Owner: {info.get('owner', 'Unknown')}")
        print(f"Vehicle: {info.get('vehicle_make', '')} {info.get('vehicle_model', '')}")
        print(f"Status: {info.get('status', '')}")
        # Alert with the cropped plate image, not the full frame.
        caption = build_alert_caption(reading, result)
        send_telegram_photo(Path(reading["crop_path"]), caption)
    else:
        print("No match above threshold.")

    log_detection(conn, reading, result, frame_path)

def main() -> None:
    """Wire up watchlist, database, model, and camera, then run MODE."""
    ensure_directories()

    print("Loading XML watchlist...")
    watchlist = load_watchlist(XML_WATCHLIST_FILE)
    print(f"Loaded {len(watchlist)} plate(s) from XML.")

    print("Initializing database...")
    conn = init_database()

    print("Loading YOLO model...")
    model = load_yolo_model(YOLO_MODEL_FILE)

    print("Starting camera...")
    picam2 = setup_camera()

    try:
        if MODE == "single":
            scan_once(picam2, model, watchlist, conn)
        else:
            print("Starting continuous scan. Press CTRL+C to stop.")
            while True:
                scan_once(picam2, model, watchlist, conn)
                time.sleep(SCAN_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        print("Stopping scanner...")
    finally:
        # The camera may never have started cleanly; stop is best-effort.
        try:
            picam2.stop()
        except Exception:
            pass
        conn.close()
        print("Shutdown complete.")

# Script entry point: only run the scanner when executed directly,
# not when imported.
if __name__ == "__main__":
    main()