487 lines
15 KiB
Python
487 lines
15 KiB
Python
|
|
import sys
|
|
from datetime import datetime
|
|
from Crypto.Util import number
|
|
from Crypto.Hash import SHA512, SHA256
|
|
|
|
# --------------- Simple Rabin Cryptosystem (Blum integers) ---------------
|
|
|
|
def generate_rabin_keys(bits=2048):
|
|
# p, q must be primes where p % 4 == q % 4 == 3
|
|
while True:
|
|
p = number.getPrime(bits // 2)
|
|
if p % 4 == 3:
|
|
break
|
|
while True:
|
|
q = number.getPrime(bits // 2)
|
|
if q % 4 == 3 and q != p:
|
|
break
|
|
n = p * q
|
|
return {"n": n, "p": p, "q": q}
|
|
|
|
def _rabin_encode(msg_bytes):
|
|
# Add minimal redundancy to identify the correct root after decryption
|
|
# Format: b"RB" + 2-byte length + msg + SHA256(msg)[:16]
|
|
if not isinstance(msg_bytes, (bytes, bytearray)):
|
|
raise ValueError("msg_bytes must be bytes")
|
|
L = len(msg_bytes)
|
|
if L > 65535:
|
|
raise ValueError("Message too long for simple encoding")
|
|
length_bytes = L.to_bytes(2, byteorder="big")
|
|
tag = SHA256.new(msg_bytes).digest()[:16]
|
|
return b"RB" + length_bytes + msg_bytes + tag
|
|
|
|
def rabin_encrypt(msg_bytes, n):
|
|
enc = _rabin_encode(msg_bytes)
|
|
m = number.bytes_to_long(enc)
|
|
if m >= n:
|
|
raise ValueError("Message too long for Rabin modulus. Use shorter message or larger key.")
|
|
c = pow(m, 2, n)
|
|
return c
|
|
|
|
def rabin_decrypt(cipher_int, p, q, n):
|
|
# Compute square roots modulo p and q (p,q mod 4 == 3)
|
|
mp = pow(cipher_int, (p + 1) // 4, p)
|
|
mq = pow(cipher_int, (q + 1) // 4, q)
|
|
|
|
# CRT Combine to get four roots
|
|
inv_p_mod_q = number.inverse(p, q)
|
|
inv_q_mod_p = number.inverse(q, p)
|
|
a = (q * inv_q_mod_p) % n
|
|
b = (p * inv_p_mod_q) % n
|
|
|
|
roots = []
|
|
roots.append((a * mp + b * mq) % n)
|
|
roots.append((a * mp - b * mq) % n)
|
|
roots.append((-a * mp + b * mq) % n)
|
|
roots.append((-a * mp - b * mq) % n)
|
|
|
|
# Try to decode redundancy
|
|
for r in roots:
|
|
rb = number.long_to_bytes(r)
|
|
if len(rb) < 20:
|
|
continue
|
|
if not (rb[0:2] == b"RB"):
|
|
continue
|
|
if len(rb) < 2 + 2 + 16:
|
|
continue
|
|
L = int.from_bytes(rb[2:4], byteorder="big")
|
|
if L < 0 or L > 65535:
|
|
continue
|
|
if len(rb) != 2 + 2 + L + 16:
|
|
continue
|
|
msg = rb[4:4 + L]
|
|
tag = rb[4 + L:]
|
|
if SHA256.new(msg).digest()[:16] == tag:
|
|
return msg
|
|
return None
|
|
|
|
# --------------- ElGamal Signatures ---------------
|
|
|
|
def generate_safe_prime(bits):
|
|
# Try to generate a safe prime p=2q+1
|
|
# This can take a few attempts for larger bit sizes
|
|
while True:
|
|
q = number.getPrime(bits - 1)
|
|
p = 2 * q + 1
|
|
if number.isPrime(p):
|
|
return p, q
|
|
|
|
def find_generator_for_safe_prime(p, q):
|
|
# For safe prime p = 2q + 1, a generator g must satisfy:
|
|
# g^2 mod p != 1 and g^q mod p != 1
|
|
while True:
|
|
g = number.getRandomRange(2, p - 2)
|
|
if pow(g, 2, p) != 1 and pow(g, q, p) != 1:
|
|
return g
|
|
|
|
def generate_elgamal_keys(bits=1024):
|
|
p, q = generate_safe_prime(bits)
|
|
g = find_generator_for_safe_prime(p, q)
|
|
x = number.getRandomRange(2, p - 2)
|
|
y = pow(g, x, p)
|
|
return {"p": p, "g": g, "x": x, "y": y}
|
|
|
|
def _elg_hash_to_int(msg_bytes, p_minus_1):
|
|
h = SHA512.new(msg_bytes).digest()
|
|
h_int = number.bytes_to_long(h) % p_minus_1
|
|
if h_int == 0:
|
|
h_int = 1
|
|
return h_int
|
|
|
|
def elgamal_sign(msg_bytes, priv):
|
|
p = priv["p"]
|
|
g = priv["g"]
|
|
x = priv["x"]
|
|
k = None
|
|
while True:
|
|
k = number.getRandomRange(2, p - 2)
|
|
if number.GCD(k, p - 1) == 1:
|
|
break
|
|
r = pow(g, k, p)
|
|
kinv = number.inverse(k, p - 1)
|
|
h_int = _elg_hash_to_int(msg_bytes, p - 1)
|
|
s = (kinv * (h_int - x * r)) % (p - 1)
|
|
return (int(r), int(s))
|
|
|
|
def elgamal_verify(msg_bytes, sig, pub):
|
|
p = pub["p"]
|
|
g = pub["g"]
|
|
y = pub["y"]
|
|
r, s = sig
|
|
if not (1 <= r <= p - 1):
|
|
return False
|
|
if not (0 <= s <= p - 2):
|
|
return False
|
|
h_int = _elg_hash_to_int(msg_bytes, p - 1)
|
|
v1 = (pow(y, r, p) * pow(r, s, p)) % p
|
|
v2 = pow(g, h_int, p)
|
|
return v1 == v2
|
|
|
|
# --------------- In-Memory Store and Utilities ---------------
|
|
|
|
class Store:
|
|
def __init__(self):
|
|
self.keys = {
|
|
"rabin": None, # {"n":..., "p":..., "q":...}
|
|
"elgamal": None, # {"p":..., "g":..., "x":..., "y":...}
|
|
}
|
|
self.customer_history = [] # list of transactions sent by customer
|
|
self.merchant_inbox = [] # list of pending transactions for merchant
|
|
self.merchant_processed = [] # list of processed transactions
|
|
self.next_tx_id = 1
|
|
|
|
store = Store()
|
|
|
|
def now_ts():
|
|
return datetime.now().isoformat(timespec="seconds")
|
|
|
|
def short_hex(x, length=32):
|
|
hx = hex(x)[2:]
|
|
if len(hx) <= length:
|
|
return hx
|
|
return hx[:length] + "...(" + str(len(hx)) + " hex chars)"
|
|
|
|
# --------------- Role Actions ---------------
|
|
|
|
def customer_create_and_send():
|
|
if store.keys["rabin"] is None or store.keys["elgamal"] is None:
|
|
print("Keys not ready.")
|
|
return
|
|
|
|
try:
|
|
details = input("Enter payment details (example: Send 55000 to Bob using Mastercard 3048330330393783): ").strip()
|
|
except (EOFError, KeyboardInterrupt):
|
|
print()
|
|
return
|
|
|
|
if not details:
|
|
print("Empty details. Aborting.")
|
|
return
|
|
|
|
msg_bytes = details.encode("utf-8")
|
|
ts = now_ts()
|
|
# Hash (SHA-512) of plaintext; this is what we sign and send as integrity record
|
|
digest = SHA512.new(msg_bytes).digest()
|
|
digest_hex = digest.hex()
|
|
|
|
# Sign the digest using ElGamal (signing input is digest bytes)
|
|
sig_r, sig_s = elgamal_sign(digest, store.keys["elgamal"])
|
|
|
|
# Encrypt payment details using Rabin (merchant's public n)
|
|
n = store.keys["rabin"]["n"]
|
|
try:
|
|
c = rabin_encrypt(msg_bytes, n)
|
|
except Exception as e:
|
|
print("Encryption error:", str(e))
|
|
return
|
|
|
|
tx = {
|
|
"id": store.next_tx_id,
|
|
"timestamp": ts,
|
|
"plaintext": details,
|
|
"cipher_hex": hex(c)[2:], # store as hex text for display
|
|
"hash_hex": digest_hex,
|
|
"sig_r": sig_r,
|
|
"sig_s": sig_s,
|
|
"processed": False,
|
|
}
|
|
store.next_tx_id += 1
|
|
store.customer_history.append(tx)
|
|
store.merchant_inbox.append(dict(tx)) # copy to merchant inbox
|
|
|
|
print("Transaction created and sent to merchant.")
|
|
print("ID:", tx["id"])
|
|
print("Timestamp:", tx["timestamp"])
|
|
print("Before encryption (plaintext):", tx["plaintext"])
|
|
print("After encryption (cipher hex):", short_hex(c))
|
|
print("SHA-512 digest (hex):", tx["hash_hex"][:64] + "..." if len(tx["hash_hex"]) > 64 else tx["hash_hex"])
|
|
print("ElGamal signature r:", tx["sig_r"])
|
|
print("ElGamal signature s:", tx["sig_s"])
|
|
|
|
def customer_view_history():
|
|
if not store.customer_history:
|
|
print("No past transactions.")
|
|
return
|
|
for tx in store.customer_history:
|
|
print("ID:", tx["id"])
|
|
print("Timestamp:", tx["timestamp"])
|
|
print("Plaintext:", tx["plaintext"])
|
|
print("Cipher (hex):", tx["cipher_hex"])
|
|
print("SHA-512 digest (hex):", tx["hash_hex"])
|
|
print("Signature r:", tx["sig_r"])
|
|
print("Signature s:", tx["sig_s"])
|
|
print("Processed by merchant:", tx["processed"])
|
|
print("-" * 40)
|
|
|
|
def merchant_process_all():
|
|
if store.keys["rabin"] is None or store.keys["elgamal"] is None:
|
|
print("Keys not ready.")
|
|
return
|
|
if not store.merchant_inbox:
|
|
print("No pending transactions.")
|
|
return
|
|
|
|
p = store.keys["rabin"]["p"]
|
|
q = store.keys["rabin"]["q"]
|
|
n = store.keys["rabin"]["n"]
|
|
|
|
pub_elg = {
|
|
"p": store.keys["elgamal"]["p"],
|
|
"g": store.keys["elgamal"]["g"],
|
|
"y": store.keys["elgamal"]["y"],
|
|
}
|
|
|
|
processed_any = False
|
|
new_inbox = []
|
|
for tx in store.merchant_inbox:
|
|
processed_any = True
|
|
ts = now_ts()
|
|
c_hex = tx["cipher_hex"]
|
|
try:
|
|
c = int(c_hex, 16)
|
|
except Exception:
|
|
c = None
|
|
|
|
decrypted_msg = None
|
|
decrypt_ok = False
|
|
if c is not None:
|
|
try:
|
|
m = rabin_decrypt(c, p, q, n)
|
|
if m is not None:
|
|
decrypted_msg = m.decode("utf-8", errors="replace")
|
|
decrypt_ok = True
|
|
else:
|
|
decrypt_ok = False
|
|
except Exception:
|
|
decrypt_ok = False
|
|
|
|
# Compute hash of decrypted plaintext
|
|
computed_hash_hex = "DECRYPT_FAIL"
|
|
if decrypt_ok:
|
|
computed_hash_hex = SHA512.new(m).hexdigest()
|
|
|
|
# Verify signature using received hash bytes (auditable)
|
|
received_hash_hex = tx["hash_hex"]
|
|
try:
|
|
received_hash_bytes = bytes.fromhex(received_hash_hex)
|
|
except Exception:
|
|
received_hash_bytes = b""
|
|
sig = (tx["sig_r"], tx["sig_s"])
|
|
sig_valid = elgamal_verify(received_hash_bytes, sig, pub_elg)
|
|
|
|
# Optionally also verify that signature corresponds to computed hash if decryption ok
|
|
hash_match = False
|
|
if decrypt_ok:
|
|
hash_match = (computed_hash_hex == received_hash_hex)
|
|
|
|
rec = {
|
|
"id": tx["id"],
|
|
"timestamp": ts,
|
|
"received_hash_hex": received_hash_hex,
|
|
"computed_hash_hex": computed_hash_hex,
|
|
"signature_valid": bool(sig_valid),
|
|
"hash_match": bool(hash_match),
|
|
"decryption_ok": bool(decrypt_ok),
|
|
"decrypted_plaintext": decrypted_msg if decrypt_ok else "",
|
|
}
|
|
store.merchant_processed.append(rec)
|
|
tx["processed"] = True
|
|
|
|
print("Processed transaction ID:", tx["id"])
|
|
print("Signature valid:", rec["signature_valid"])
|
|
print("Decryption ok:", rec["decryption_ok"])
|
|
print("Received hash (hex):", rec["received_hash_hex"])
|
|
print("Computed hash (hex):", rec["computed_hash_hex"])
|
|
print("Hashes match:", rec["hash_match"])
|
|
print("Timestamp:", rec["timestamp"])
|
|
print("-" * 40)
|
|
|
|
# Clear inbox after processing all
|
|
store.merchant_inbox = []
|
|
if not processed_any:
|
|
print("No transactions processed.")
|
|
|
|
def merchant_show_processed():
|
|
if not store.merchant_processed:
|
|
print("No processed records.")
|
|
return
|
|
for rec in store.merchant_processed:
|
|
print("ID:", rec["id"])
|
|
print("Timestamp:", rec["timestamp"])
|
|
print("Signature valid:", rec["signature_valid"])
|
|
print("Decryption ok:", rec["decryption_ok"])
|
|
print("Received hash (hex):", rec["received_hash_hex"])
|
|
print("Computed hash (hex):", rec["computed_hash_hex"])
|
|
print("Hashes match:", rec["hash_match"])
|
|
# Merchant can see plaintext if needed:
|
|
if rec["decryption_ok"]:
|
|
print("Decrypted plaintext:", rec["decrypted_plaintext"])
|
|
print("-" * 40)
|
|
|
|
def auditor_view_hashed_records():
|
|
if not store.merchant_processed:
|
|
print("No records to audit.")
|
|
return
|
|
for rec in store.merchant_processed:
|
|
print("ID:", rec["id"])
|
|
print("Timestamp:", rec["timestamp"])
|
|
print("Received hash (hex):", rec["received_hash_hex"])
|
|
print("Computed hash (hex):", rec["computed_hash_hex"])
|
|
print("Hashes match:", rec["hash_match"])
|
|
print("-" * 40)
|
|
|
|
def auditor_verify_signatures():
|
|
if not store.merchant_processed:
|
|
print("No records to verify.")
|
|
return
|
|
pub_elg = {
|
|
"p": store.keys["elgamal"]["p"],
|
|
"g": store.keys["elgamal"]["g"],
|
|
"y": store.keys["elgamal"]["y"],
|
|
}
|
|
for rec in store.merchant_processed:
|
|
tx_id = rec["id"]
|
|
sig_r = None
|
|
sig_s = None
|
|
hash_hex = rec["received_hash_hex"]
|
|
for tx in store.customer_history:
|
|
if tx["id"] == tx_id:
|
|
sig_r = tx["sig_r"]
|
|
sig_s = tx["sig_s"]
|
|
break
|
|
if sig_r is None:
|
|
print("ID:", tx_id, "Signature not found.")
|
|
continue
|
|
try:
|
|
msg_bytes = bytes.fromhex(hash_hex)
|
|
except Exception:
|
|
msg_bytes = b""
|
|
ok = elgamal_verify(msg_bytes, (sig_r, sig_s), pub_elg)
|
|
print("ID:", tx_id, "Signature valid:", bool(ok))
|
|
|
|
def show_public_keys():
|
|
if store.keys["rabin"] is None or store.keys["elgamal"] is None:
|
|
print("Keys not ready.")
|
|
return
|
|
rabin_pub_n = store.keys["rabin"]["n"]
|
|
elg_pub = {
|
|
"p": store.keys["elgamal"]["p"],
|
|
"g": store.keys["elgamal"]["g"],
|
|
"y": store.keys["elgamal"]["y"],
|
|
}
|
|
print("Customer ElGamal public key parameters:")
|
|
print("p (bits):", store.keys["elgamal"]["p"].bit_length())
|
|
print("g:", short_hex(elg_pub["g"]))
|
|
print("y:", short_hex(elg_pub["y"]))
|
|
print("Merchant Rabin public modulus n (bits):", rabin_pub_n.bit_length())
|
|
print("n:", short_hex(rabin_pub_n))
|
|
|
|
def main_menu():
|
|
while True:
|
|
print("Select role:")
|
|
print("1. Customer")
|
|
print("2. Merchant")
|
|
print("3. Auditor")
|
|
print("4. Show public keys")
|
|
print("5. Exit")
|
|
choice = input("Enter choice: ").strip()
|
|
if choice == "1":
|
|
customer_menu()
|
|
elif choice == "2":
|
|
merchant_menu()
|
|
elif choice == "3":
|
|
auditor_menu()
|
|
elif choice == "4":
|
|
show_public_keys()
|
|
elif choice == "5":
|
|
print("Goodbye.")
|
|
break
|
|
else:
|
|
print("Invalid choice.")
|
|
|
|
def customer_menu():
|
|
while True:
|
|
print("Customer menu:")
|
|
print("1. Encrypt, sign, and send payment")
|
|
print("2. View past transactions")
|
|
print("3. Back")
|
|
choice = input("Enter choice: ").strip()
|
|
if choice == "1":
|
|
customer_create_and_send()
|
|
elif choice == "2":
|
|
customer_view_history()
|
|
elif choice == "3":
|
|
return
|
|
else:
|
|
print("Invalid choice.")
|
|
|
|
def merchant_menu():
|
|
while True:
|
|
print("Merchant menu:")
|
|
print("1. Process all pending transactions")
|
|
print("2. Show processed records")
|
|
print("3. Back")
|
|
choice = input("Enter choice: ").strip()
|
|
if choice == "1":
|
|
merchant_process_all()
|
|
elif choice == "2":
|
|
merchant_show_processed()
|
|
elif choice == "3":
|
|
return
|
|
else:
|
|
print("Invalid choice.")
|
|
|
|
def auditor_menu():
|
|
while True:
|
|
print("Auditor menu:")
|
|
print("1. View hashed payment records")
|
|
print("2. Verify ElGamal signatures on records")
|
|
print("3. Back")
|
|
choice = input("Enter choice: ").strip()
|
|
if choice == "1":
|
|
auditor_view_hashed_records()
|
|
elif choice == "2":
|
|
auditor_verify_signatures()
|
|
elif choice == "3":
|
|
return
|
|
else:
|
|
print("Invalid choice.")
|
|
|
|
def init_keys():
|
|
print("Generating keys. This may take a moment...")
|
|
# Rabin: 2048-bit modulus to fit typical payment strings
|
|
store.keys["rabin"] = generate_rabin_keys(bits=2048)
|
|
# ElGamal: 1024-bit safe prime for demo speed
|
|
store.keys["elgamal"] = generate_elgamal_keys(bits=1024)
|
|
print("Keys ready.")
|
|
print("Customer has ElGamal signing key (keeps x private, shares p,g,y).")
|
|
print("Merchant has Rabin decryption key (keeps p,q private, shares n).")
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
init_keys()
|
|
main_menu()
|
|
except KeyboardInterrupt:
|
|
print("\nInterrupted.")
|
|
sys.exit(0)
|