#!/usr/bin/env python3
"""
Raspberry Pi ANPR Logger
Pipeline:
Picamera2 -> YOLO license plate detector -> crop plate -> Tesseract OCR -> fuzzy XML match -> SQLite log -> Telegram alert
Hardware target:
- Raspberry Pi 4 B 8GB
- Raspberry Pi NoIR Camera V2
- Optional Adafruit Mini PiTFT display handled later
Install notes:
cd ~/plate_logger
source venv/bin/activate
pip install ultralytics rapidfuzz requests pillow pytesseract
sudo apt install -y tesseract-ocr python3-opencv python3-picamera2
You also need a YOLO license plate detection model, for example:
models/license_plate_detector.pt
This file assumes:
data/watchlist.xml
models/license_plate_detector.pt
captures/
crops/
logs/
"""
import os
import re
import cv2
import time
import sqlite3
import requests
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, List
import pytesseract
from rapidfuzz import fuzz, process
from picamera2 import Picamera2
from ultralytics import YOLO
# ============================================================
# CONFIGURATION
# ============================================================
# Filesystem layout: every working directory lives under ~/plate_logger.
BASE_DIR = Path.home() / "plate_logger"
DATA_DIR = BASE_DIR / "data"
MODEL_DIR = BASE_DIR / "models"
CAPTURE_DIR = BASE_DIR / "captures"
CROP_DIR = BASE_DIR / "crops"
LOG_DIR = BASE_DIR / "logs"
XML_WATCHLIST_FILE = DATA_DIR / "watchlist.xml"
YOLO_MODEL_FILE = MODEL_DIR / "license_plate_detector.pt"
DB_FILE = LOG_DIR / "plate_logs.db"

# Camera settings
CAMERA_WIDTH = 1280
CAMERA_HEIGHT = 720
CAMERA_WARMUP_SECONDS = 1.5
SCAN_INTERVAL_SECONDS = 2.0

# YOLO settings
YOLO_CONFIDENCE_THRESHOLD = 0.35
MAX_DETECTIONS_TO_OCR = 5

# OCR settings
MIN_PLATE_LENGTH = 4
MAX_PLATE_LENGTH = 9

# Fuzzy matching settings
FUZZY_MATCH_THRESHOLD = 85

# Telegram settings
# The credentials may be supplied through the TELEGRAM_BOT_TOKEN and
# TELEGRAM_CHAT_ID environment variables so that secrets do not have to be
# written into this file. Without them the placeholder values below are used,
# which keeps alerts disabled.
ENABLE_TELEGRAM = False
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "PASTE_YOUR_BOT_TOKEN_HERE")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "PASTE_YOUR_CHAT_ID_HERE")

# Runtime mode
# "continuous" = repeatedly scan
# "single" = scan once and exit
MODE = "continuous"
# ============================================================
# DIRECTORY AND DATABASE SETUP
# ============================================================
def ensure_directories() -> None:
    """Create the data, model, capture, crop and log directories.

    Missing parent directories are created as well, and directories that
    already exist are left untouched.
    """
    required = (DATA_DIR, MODEL_DIR, CAPTURE_DIR, CROP_DIR, LOG_DIR)
    for folder in required:
        folder.mkdir(parents=True, exist_ok=True)
def init_database() -> sqlite3.Connection:
    """Open the SQLite log database and ensure the detections table exists.

    Returns:
        An open connection to ``DB_FILE``. The caller owns the connection and
        is responsible for closing it.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table cannot be
            created. The connection is closed before the error propagates.
    """
    # sqlite3 creates the database file on demand, but not its parent folder.
    DB_FILE.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            """
CREATE TABLE IF NOT EXISTS plate_detections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
detected_plate TEXT,
ocr_confidence REAL,
yolo_confidence REAL,
matched_plate TEXT,
match_score REAL,
is_match INTEGER,
owner TEXT,
vehicle_make TEXT,
vehicle_model TEXT,
status TEXT,
notes TEXT,
image_path TEXT,
crop_path TEXT
)
"""
        )
        conn.commit()
    except Exception:
        # Do not leak the handle when schema creation fails.
        conn.close()
        raise
    return conn
# ============================================================
# XML WATCHLIST LOADING
# ============================================================
def safe_text(element: Optional[ET.Element], default: str = "") -> str:
    """Return the stripped text of an XML element, or ``default``.

    ``default`` is returned when ``element`` is ``None`` or carries no text.
    Text that consists only of whitespace yields an empty string, not the
    default.
    """
    text = None if element is None else element.text
    return default if text is None else text.strip()
def normalize_plate(text: str) -> str:
    """Upper-case ``text`` and drop every character other than A-Z and 0-9."""
    uppercased = text.upper()
    return re.sub(r"[^A-Z0-9]", "", uppercased)
def load_watchlist(xml_file: Path) -> Dict[str, Dict[str, str]]:
    """Parse the XML watchlist into a dictionary keyed by normalized plate.

    If ``xml_file`` does not exist, an example watchlist is written there
    first and then loaded. Only ``<vehicle>`` elements directly under the root
    are read. Entries whose plate normalizes to an empty string are skipped,
    and when two entries share a normalized plate the later one wins.

    Returns:
        A mapping of normalized plate to a record with the keys ``plate``,
        ``owner``, ``vehicle_make``, ``vehicle_model``, ``color``, ``status``
        and ``notes``.
    """
    if not xml_file.exists():
        create_example_watchlist(xml_file)
        print(f"Created example watchlist at: {xml_file}")

    # (child tag, fallback used when the tag is missing or has no text)
    field_defaults = (
        ("owner", "Unknown"),
        ("vehicle_make", "Unknown"),
        ("vehicle_model", "Unknown"),
        ("color", "Unknown"),
        ("status", "Unknown"),
        ("notes", ""),
    )

    watchlist: Dict[str, Dict[str, str]] = {}
    for node in ET.parse(xml_file).getroot().findall("vehicle"):
        plate = normalize_plate(safe_text(node.find("plate")))
        if plate:
            record = {"plate": plate}
            for tag, fallback in field_defaults:
                record[tag] = safe_text(node.find(tag), fallback)
            watchlist[plate] = record
    return watchlist
def create_example_watchlist(xml_file: Path) -> None:
    """Write a two-vehicle sample watchlist to ``xml_file``.

    The parent directory is created when missing. Any existing file at that
    path is overwritten.
    """
    sample_xml = """<vehicles>
<vehicle>
<plate>ABC123</plate>
<owner>John Smith</owner>
<vehicle_make>Toyota</vehicle_make>
<vehicle_model>Camry</vehicle_model>
<color>Blue</color>
<status>WATCHLIST</status>
<notes>Example vehicle entry</notes>
</vehicle>
<vehicle>
<plate>XYZ789</plate>
<owner>Jane Doe</owner>
<vehicle_make>Ford</vehicle_make>
<vehicle_model>F150</vehicle_model>
<color>White</color>
<status>AUTHORIZED</status>
<notes>Example authorized vehicle</notes>
</vehicle>
</vehicles>
"""
    xml_file.parent.mkdir(parents=True, exist_ok=True)
    with xml_file.open("w") as handle:
        handle.write(sample_xml)
# ============================================================
# CAMERA
# ============================================================
def setup_camera() -> Picamera2:
    """Configure and start the Pi camera, then return the running instance.

    A still configuration is requested with a main stream of
    ``CAMERA_WIDTH`` x ``CAMERA_HEIGHT``. After starting, the function sleeps
    for ``CAMERA_WARMUP_SECONDS`` before returning (presumably to let the
    sensor settle; confirm against the camera in use).
    """
    camera = Picamera2()
    frame_size = (CAMERA_WIDTH, CAMERA_HEIGHT)
    camera.configure(camera.create_still_configuration(main={"size": frame_size}))
    camera.start()
    time.sleep(CAMERA_WARMUP_SECONDS)
    return camera
def capture_frame(picam2: Picamera2) -> Tuple[Any, Path]:
    """Grab one frame, save it as a JPEG in ``CAPTURE_DIR`` and return it.

    Returns:
        A ``(frame_bgr, image_path)`` tuple: the frame after an RGB-to-BGR
        channel swap, and the path of the JPEG that was written. The file name
        embeds a microsecond-resolution timestamp.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")

    # The captured array is treated as RGB and swapped to the BGR order that
    # OpenCV expects. NOTE(review): the channel order depends on the
    # configured stream format; confirm for this camera setup.
    frame_bgr = cv2.cvtColor(picam2.capture_array(), cv2.COLOR_RGB2BGR)

    image_path = CAPTURE_DIR / f"capture_{timestamp}.jpg"
    cv2.imwrite(str(image_path), frame_bgr)
    return frame_bgr, image_path
# ============================================================
# YOLO PLATE DETECTION
# ============================================================
def load_yolo_model(model_file: Path) -> YOLO:
    """Load the YOLO plate detector stored at ``model_file``.

    Raises:
        FileNotFoundError: If ``model_file`` does not exist.
    """
    if model_file.exists():
        return YOLO(str(model_file))
    raise FileNotFoundError(
        f"YOLO model not found: {model_file}\n"
        "Put your plate detector model at models/license_plate_detector.pt"
    )
def detect_plate_boxes(model: YOLO, image_bgr: Any) -> List[Dict[str, Any]]:
    """Run the detector on one image and return its boxes, best first.

    Returns:
        A list of dictionaries with the keys ``box`` (``(x1, y1, x2, y2)`` as
        integers), ``confidence`` and ``area``, sorted by descending
        confidence. Boxes with zero or negative width or height are dropped.
        The list is empty when the model yields no results or no boxes.
    """
    results = model.predict(
        source=image_bgr,
        conf=YOLO_CONFIDENCE_THRESHOLD,
        verbose=False
    )
    if not results:
        return []

    # Only the first result is inspected; a single image was submitted.
    boxes = results[0].boxes
    if boxes is None:
        return []

    found: List[Dict[str, Any]] = []
    for plate_box in boxes:
        corners = plate_box.xyxy[0].cpu().numpy().astype(int)
        confidence = float(plate_box.conf[0].cpu().numpy())
        x1, y1, x2, y2 = corners.tolist()
        # Keep only boxes that enclose at least one pixel.
        if x2 > x1 and y2 > y1:
            found.append({
                "box": (x1, y1, x2, y2),
                "confidence": confidence,
                "area": (x2 - x1) * (y2 - y1)
            })

    return sorted(found, key=lambda item: item["confidence"], reverse=True)
# ============================================================
# CROP + OCR
# ============================================================
def crop_with_padding(image_bgr: Any, box: Tuple[int, int, int, int], pad_ratio: float = 0.12) -> Any:
    """Cut ``box`` out of the image with a proportional margin around it.

    The margin on each side is ``pad_ratio`` times the box width (horizontal)
    or height (vertical), truncated to an integer. The padded region is
    clamped to the image bounds.

    Args:
        image_bgr: Array whose first two dimensions are height and width.
        box: ``(x1, y1, x2, y2)`` pixel coordinates.
        pad_ratio: Fraction of the box size added on every side.

    Returns:
        The slice of ``image_bgr`` covering the padded box.
    """
    img_h, img_w = image_bgr.shape[:2]
    left, top, right, bottom = box

    margin_x = int((right - left) * pad_ratio)
    margin_y = int((bottom - top) * pad_ratio)

    rows = slice(max(0, top - margin_y), min(img_h, bottom + margin_y))
    cols = slice(max(0, left - margin_x), min(img_w, right + margin_x))
    return image_bgr[rows, cols]
def preprocess_plate_crop(crop_bgr: Any) -> Any:
    """Turn a BGR plate crop into a black-and-white image for OCR.

    Steps, in order: grayscale conversion, 3x cubic upscaling, bilateral
    filtering, histogram equalization, and Otsu binarization.
    """
    grayscale = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2GRAY)

    # Enlarge the crop so the characters cover more pixels.
    upscaled = cv2.resize(grayscale, None, fx=3, fy=3, interpolation=cv2.INTER_CUBIC)

    # Smooth noise while keeping character edges.
    smoothed = cv2.bilateralFilter(upscaled, 9, 75, 75)

    # Spread the intensity histogram before thresholding.
    equalized = cv2.equalizeHist(smoothed)

    # With THRESH_OTSU the threshold value passed in (0) is ignored.
    _, binary = cv2.threshold(
        equalized,
        0,
        255,
        cv2.THRESH_BINARY + cv2.THRESH_OTSU
    )
    return binary
def tesseract_plate_ocr(processed_image: Any) -> Tuple[str, float, str]:
    """Read plate text from a preprocessed image with Tesseract.

    Returns:
        A ``(cleaned, avg_conf, raw_text)`` tuple: the normalized plate
        string, the mean of the non-negative confidence values Tesseract
        reported (``0.0`` when there are none), and the untouched OCR output.
    """
    config = (
        "--oem 3 --psm 7 "
        "-c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    )
    raw_text = pytesseract.image_to_string(processed_image, config=config)

    # image_to_string reports no confidence, so a second pass through
    # image_to_data supplies per-entry values to average.
    ocr_data = pytesseract.image_to_data(
        processed_image,
        config=config,
        output_type=pytesseract.Output.DICT
    )

    usable = []
    for entry in ocr_data.get("conf", []):
        try:
            value = float(entry)
        except ValueError:
            continue
        # Negative confidences are excluded from the average.
        if value >= 0:
            usable.append(value)

    mean_conf = sum(usable) / len(usable) if usable else 0.0
    return normalize_plate(raw_text), mean_conf, raw_text
def read_best_plate_from_detections(
    image_bgr: Any,
    detections: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """OCR the leading detections and return the highest-scoring reading.

    At most ``MAX_DETECTIONS_TO_OCR`` detections are processed. For each one
    the padded crop and its preprocessed version are written to ``CROP_DIR``
    and a line describing the candidate is printed.

    The score is the sum of: 50 when the text length lies within
    ``MIN_PLATE_LENGTH``..``MAX_PLATE_LENGTH``, 5 per character,
    half the OCR confidence, and 30 times the YOLO confidence.

    Returns:
        The best candidate dictionary (keys ``plate``, ``ocr_confidence``,
        ``raw_text``, ``yolo_confidence``, ``box``, ``score``, ``crop_path``,
        ``processed_path``), or ``None`` when there are no candidates or the
        best one has empty text.
    """
    best: Optional[Dict[str, Any]] = None

    for idx, detection in enumerate(detections[:MAX_DETECTIONS_TO_OCR]):
        box, yolo_conf = detection["box"], detection["confidence"]

        crop = crop_with_padding(image_bgr, box)
        processed = preprocess_plate_crop(crop)
        plate_text, ocr_conf, raw_text = tesseract_plate_ocr(processed)

        # Save both images for later inspection of this candidate.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        crop_path = CROP_DIR / f"plate_candidate_{timestamp}_{idx}.jpg"
        processed_path = CROP_DIR / f"plate_candidate_{timestamp}_{idx}_processed.jpg"
        cv2.imwrite(str(crop_path), crop)
        cv2.imwrite(str(processed_path), processed)

        # Plausible-length bonus first, then the remaining terms in order.
        in_range = MIN_PLATE_LENGTH <= len(plate_text) <= MAX_PLATE_LENGTH
        score = 0.0 + (50 if in_range else 0)
        score = score + len(plate_text) * 5
        score = score + ocr_conf * 0.5
        score = score + yolo_conf * 30

        print(
            f"Candidate {idx}: plate={plate_text or '[blank]'} "
            f"ocr_conf={ocr_conf:.1f} yolo_conf={yolo_conf:.2f} score={score:.1f}"
        )

        # Strict comparison: on a tie the earlier candidate is kept.
        if best is None or score > best["score"]:
            best = {
                "plate": plate_text,
                "ocr_confidence": ocr_conf,
                "raw_text": raw_text,
                "yolo_confidence": yolo_conf,
                "box": box,
                "score": score,
                "crop_path": crop_path,
                "processed_path": processed_path,
            }

    return best if best and best["plate"] else None
# ============================================================
# FUZZY XML MATCHING
# ============================================================
def fuzzy_match_plate(
    detected_plate: str,
    watchlist: Dict[str, Dict[str, str]]
) -> Optional[Dict[str, Any]]:
    """Find the watchlist plate most similar to ``detected_plate``.

    Returns:
        ``None`` when either argument is empty or no candidate is produced.
        Otherwise a dictionary with the keys ``is_match``, ``detected_plate``,
        ``matched_plate``, ``match_score`` and ``vehicle``. ``is_match`` is
        ``True`` only when the score reaches ``FUZZY_MATCH_THRESHOLD``; the
        closest vehicle record is included either way.
    """
    if not detected_plate or not watchlist:
        return None

    closest = process.extractOne(
        detected_plate,
        list(watchlist),
        scorer=fuzz.ratio
    )
    if closest is None:
        return None

    matched_plate, score, _ = closest
    return {
        "is_match": score >= FUZZY_MATCH_THRESHOLD,
        "detected_plate": detected_plate,
        "matched_plate": matched_plate,
        "match_score": score,
        "vehicle": watchlist[matched_plate]
    }
# ============================================================
# LOGGING
# ============================================================
def log_detection(
    conn: sqlite3.Connection,
    detected: Dict[str, Any],
    match: Optional[Dict[str, Any]],
    image_path: Path
) -> None:
    """Insert one row into ``plate_detections`` and commit it.

    Args:
        conn: Open connection to the log database.
        detected: OCR result; ``plate``, ``ocr_confidence``,
            ``yolo_confidence`` and ``crop_path`` are read, with defaults for
            missing keys.
        match: Fuzzy-match result, or ``None``/empty when there is none. In
            that case the match and vehicle columns are stored as empty
            strings and zeros.
        image_path: Path of the full captured frame.
    """
    if match:
        vehicle = match["vehicle"]
        matched_plate = match.get("matched_plate", "")
        match_score = match.get("match_score", 0.0)
        is_match = 1 if match.get("is_match") else 0
    else:
        vehicle = {}
        matched_plate, match_score, is_match = "", 0.0, 0

    # Values in the same order as the column list in the statement below.
    row = (
        datetime.now().isoformat(),
        detected.get("plate", ""),
        detected.get("ocr_confidence", 0.0),
        detected.get("yolo_confidence", 0.0),
        matched_plate,
        match_score,
        is_match,
        vehicle.get("owner", ""),
        vehicle.get("vehicle_make", ""),
        vehicle.get("vehicle_model", ""),
        vehicle.get("status", ""),
        vehicle.get("notes", ""),
        str(image_path),
        str(detected.get("crop_path", "")),
    )

    conn.execute(
        """
INSERT INTO plate_detections (
timestamp,
detected_plate,
ocr_confidence,
yolo_confidence,
matched_plate,
match_score,
is_match,
owner,
vehicle_make,
vehicle_model,
status,
notes,
image_path,
crop_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        row,
    )
    conn.commit()
# ============================================================
# TELEGRAM ALERTS
# ============================================================
def telegram_enabled() -> bool:
    """Report whether Telegram alerts are switched on and configured.

    Returns:
        ``True`` only when ``ENABLE_TELEGRAM`` is set, both the bot token and
        the chat ID are non-empty, and neither still contains the ``PASTE_``
        placeholder text.
    """
    # bool() keeps the declared return type: a bare ``and`` chain would hand
    # back an empty token or chat ID string instead of False.
    return bool(
        ENABLE_TELEGRAM
        and TELEGRAM_BOT_TOKEN
        and TELEGRAM_CHAT_ID
        and "PASTE_" not in TELEGRAM_BOT_TOKEN
        and "PASTE_" not in TELEGRAM_CHAT_ID
    )
def send_telegram_photo(image_path: Path, caption: str) -> None:
    """Send a photo with a caption to the configured Telegram chat.

    Does nothing when Telegram alerts are not enabled. Sending is best-effort:
    any failure is printed and swallowed so that a network or API problem does
    not interrupt scanning.

    Args:
        image_path: Image file to upload.
        caption: Text shown with the photo; truncated to 1024 characters.
    """
    if not telegram_enabled():
        return

    # The Bot API rejects photo captions longer than 1024 characters, which
    # would drop the whole alert; send a shortened caption instead.
    caption = caption[:1024]

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
    try:
        with open(image_path, "rb") as photo:
            response = requests.post(
                url,
                data={
                    "chat_id": TELEGRAM_CHAT_ID,
                    "caption": caption,
                },
                files={"photo": photo},
                timeout=15,
            )
        response.raise_for_status()
    except Exception as exc:
        # Error text from requests includes the request URL, and the URL
        # contains the bot token; keep the secret out of the output.
        safe_message = str(exc).replace(str(TELEGRAM_BOT_TOKEN), "<redacted>")
        print(f"Telegram send failed: {safe_message}")
def build_alert_caption(detected: Dict[str, Any], match: Dict[str, Any]) -> str:
    """Compose the multi-line alert text for a watchlist match.

    The text lists the detected and matched plates, the match, OCR and YOLO
    scores, the vehicle details from ``match["vehicle"]`` (with fallbacks for
    missing fields) and the current local time.
    """
    vehicle = match["vehicle"]
    sections = [
        "LICENSE PLATE MATCH\n\n",
        f"Detected: {detected['plate']}\n",
        f"Matched: {match['matched_plate']}\n",
        f"Match Score: {match['match_score']:.1f}%\n",
        f"OCR Confidence: {detected['ocr_confidence']:.1f}%\n",
        f"YOLO Confidence: {detected['yolo_confidence']:.2f}\n\n",
        f"Owner: {vehicle.get('owner', 'Unknown')}\n",
        f"Vehicle: {vehicle.get('vehicle_make', 'Unknown')} {vehicle.get('vehicle_model', '')}\n",
        f"Color: {vehicle.get('color', 'Unknown')}\n",
        f"Status: {vehicle.get('status', 'Unknown')}\n",
        f"Notes: {vehicle.get('notes', '')}\n\n",
        f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    return "".join(sections)
# ============================================================
# MAIN SCAN LOOP
# ============================================================
def scan_once(
    picam2: Picamera2,
    model: YOLO,
    watchlist: Dict[str, Dict[str, str]],
    conn: sqlite3.Connection
) -> None:
    """Run one capture, detect, OCR, match, log and alert cycle.

    Nothing is logged when no plate is detected or OCR yields no usable text.
    Otherwise the reading is logged, with the closest watchlist entry when
    there is one, and a Telegram photo alert is sent if the match reaches the
    threshold. Progress is printed at each step.
    """
    print("\nCapturing frame...")
    image_bgr, image_path = capture_frame(picam2)

    print("Running YOLO plate detection...")
    detections = detect_plate_boxes(model, image_bgr)
    if not detections:
        print("No license plate detected.")
        return
    print(f"YOLO found {len(detections)} plate candidate(s).")

    detected = read_best_plate_from_detections(image_bgr, detections)
    if detected is None:
        print("No usable OCR result from detected plate crop.")
        return
    print(f"Best OCR plate: {detected['plate']}")

    match = fuzzy_match_plate(detected["plate"], watchlist)
    if match is not None:
        print(
            f"Closest XML plate: {match['matched_plate']} "
            f"score={match['match_score']:.1f}%"
        )
        if not match["is_match"]:
            print("No match above threshold.")
        else:
            print("MATCH FOUND")
            vehicle = match["vehicle"]
            print(f"Owner: {vehicle.get('owner', 'Unknown')}")
            print(f"Vehicle: {vehicle.get('vehicle_make', '')} {vehicle.get('vehicle_model', '')}")
            print(f"Status: {vehicle.get('status', '')}")
            # The alert carries the plate crop rather than the full frame.
            caption = build_alert_caption(detected, match)
            send_telegram_photo(Path(detected["crop_path"]), caption)
        # Logged whether or not the score reached the threshold.
        log_detection(conn, detected, match, image_path)
        return

    print("No watchlist loaded or no fuzzy match candidate.")
    log_detection(conn, detected, None, image_path)
def main() -> None:
    """Set up the watchlist, database, model and camera, then scan.

    With ``MODE == "single"`` one scan is performed; any other value scans
    repeatedly, sleeping ``SCAN_INTERVAL_SECONDS`` between scans, until
    CTRL+C is pressed. The database connection is always closed and the
    camera, once started, is always stopped, even when model loading, camera
    start-up or a scan raises.
    """
    ensure_directories()

    print("Loading XML watchlist...")
    watchlist = load_watchlist(XML_WATCHLIST_FILE)
    print(f"Loaded {len(watchlist)} plate(s) from XML.")

    print("Initializing database...")
    conn = init_database()

    # Stays None until the camera has started, so cleanup knows whether there
    # is anything to stop.
    picam2 = None
    try:
        print("Loading YOLO model...")
        model = load_yolo_model(YOLO_MODEL_FILE)

        print("Starting camera...")
        picam2 = setup_camera()

        if MODE == "single":
            scan_once(picam2, model, watchlist, conn)
        else:
            print("Starting continuous scan. Press CTRL+C to stop.")
            while True:
                scan_once(picam2, model, watchlist, conn)
                time.sleep(SCAN_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        print("Stopping scanner...")
    finally:
        if picam2 is not None:
            try:
                picam2.stop()
            except Exception as exc:
                # Still best-effort, but report the failure instead of hiding it.
                print(f"Camera stop failed: {exc}")
        conn.close()
        print("Shutdown complete.")


if __name__ == "__main__":
    main()