487 lines
		
	
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			487 lines
		
	
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
 | |
| import sys
 | |
| from datetime import datetime
 | |
| from Crypto.Util import number
 | |
| from Crypto.Hash import SHA512, SHA256
 | |
| 
 | |
| # --------------- Simple Rabin Cryptosystem (Blum integers) ---------------
 | |
| 
 | |
| def generate_rabin_keys(bits=2048):
 | |
|     # p, q must be primes where p % 4 == q % 4 == 3
 | |
|     while True:
 | |
|         p = number.getPrime(bits // 2)
 | |
|         if p % 4 == 3:
 | |
|             break
 | |
|     while True:
 | |
|         q = number.getPrime(bits // 2)
 | |
|         if q % 4 == 3 and q != p:
 | |
|             break
 | |
|     n = p * q
 | |
|     return {"n": n, "p": p, "q": q}
 | |
| 
 | |
| def _rabin_encode(msg_bytes):
 | |
|     # Add minimal redundancy to identify the correct root after decryption
 | |
|     # Format: b"RB" + 2-byte length + msg + SHA256(msg)[:16]
 | |
|     if not isinstance(msg_bytes, (bytes, bytearray)):
 | |
|         raise ValueError("msg_bytes must be bytes")
 | |
|     L = len(msg_bytes)
 | |
|     if L > 65535:
 | |
|         raise ValueError("Message too long for simple encoding")
 | |
|     length_bytes = L.to_bytes(2, byteorder="big")
 | |
|     tag = SHA256.new(msg_bytes).digest()[:16]
 | |
|     return b"RB" + length_bytes + msg_bytes + tag
 | |
| 
 | |
| def rabin_encrypt(msg_bytes, n):
 | |
|     enc = _rabin_encode(msg_bytes)
 | |
|     m = number.bytes_to_long(enc)
 | |
|     if m >= n:
 | |
|         raise ValueError("Message too long for Rabin modulus. Use shorter message or larger key.")
 | |
|     c = pow(m, 2, n)
 | |
|     return c
 | |
| 
 | |
| def rabin_decrypt(cipher_int, p, q, n):
 | |
|     # Compute square roots modulo p and q (p,q mod 4 == 3)
 | |
|     mp = pow(cipher_int, (p + 1) // 4, p)
 | |
|     mq = pow(cipher_int, (q + 1) // 4, q)
 | |
| 
 | |
|     # CRT Combine to get four roots
 | |
|     inv_p_mod_q = number.inverse(p, q)
 | |
|     inv_q_mod_p = number.inverse(q, p)
 | |
|     a = (q * inv_q_mod_p) % n
 | |
|     b = (p * inv_p_mod_q) % n
 | |
| 
 | |
|     roots = []
 | |
|     roots.append((a * mp + b * mq) % n)
 | |
|     roots.append((a * mp - b * mq) % n)
 | |
|     roots.append((-a * mp + b * mq) % n)
 | |
|     roots.append((-a * mp - b * mq) % n)
 | |
| 
 | |
|     # Try to decode redundancy
 | |
|     for r in roots:
 | |
|         rb = number.long_to_bytes(r)
 | |
|         if len(rb) < 20:
 | |
|             continue
 | |
|         if not (rb[0:2] == b"RB"):
 | |
|             continue
 | |
|         if len(rb) < 2 + 2 + 16:
 | |
|             continue
 | |
|         L = int.from_bytes(rb[2:4], byteorder="big")
 | |
|         if L < 0 or L > 65535:
 | |
|             continue
 | |
|         if len(rb) != 2 + 2 + L + 16:
 | |
|             continue
 | |
|         msg = rb[4:4 + L]
 | |
|         tag = rb[4 + L:]
 | |
|         if SHA256.new(msg).digest()[:16] == tag:
 | |
|             return msg
 | |
|     return None
 | |
| 
 | |
| # --------------- ElGamal Signatures ---------------
 | |
| 
 | |
| def generate_safe_prime(bits):
 | |
|     # Try to generate a safe prime p=2q+1
 | |
|     # This can take a few attempts for larger bit sizes
 | |
|     while True:
 | |
|         q = number.getPrime(bits - 1)
 | |
|         p = 2 * q + 1
 | |
|         if number.isPrime(p):
 | |
|             return p, q
 | |
| 
 | |
| def find_generator_for_safe_prime(p, q):
 | |
|     # For safe prime p = 2q + 1, a generator g must satisfy:
 | |
|     # g^2 mod p != 1 and g^q mod p != 1
 | |
|     while True:
 | |
|         g = number.getRandomRange(2, p - 2)
 | |
|         if pow(g, 2, p) != 1 and pow(g, q, p) != 1:
 | |
|             return g
 | |
| 
 | |
| def generate_elgamal_keys(bits=1024):
 | |
|     p, q = generate_safe_prime(bits)
 | |
|     g = find_generator_for_safe_prime(p, q)
 | |
|     x = number.getRandomRange(2, p - 2)
 | |
|     y = pow(g, x, p)
 | |
|     return {"p": p, "g": g, "x": x, "y": y}
 | |
| 
 | |
| def _elg_hash_to_int(msg_bytes, p_minus_1):
 | |
|     h = SHA512.new(msg_bytes).digest()
 | |
|     h_int = number.bytes_to_long(h) % p_minus_1
 | |
|     if h_int == 0:
 | |
|         h_int = 1
 | |
|     return h_int
 | |
| 
 | |
| def elgamal_sign(msg_bytes, priv):
 | |
|     p = priv["p"]
 | |
|     g = priv["g"]
 | |
|     x = priv["x"]
 | |
|     k = None
 | |
|     while True:
 | |
|         k = number.getRandomRange(2, p - 2)
 | |
|         if number.GCD(k, p - 1) == 1:
 | |
|             break
 | |
|     r = pow(g, k, p)
 | |
|     kinv = number.inverse(k, p - 1)
 | |
|     h_int = _elg_hash_to_int(msg_bytes, p - 1)
 | |
|     s = (kinv * (h_int - x * r)) % (p - 1)
 | |
|     return (int(r), int(s))
 | |
| 
 | |
| def elgamal_verify(msg_bytes, sig, pub):
 | |
|     p = pub["p"]
 | |
|     g = pub["g"]
 | |
|     y = pub["y"]
 | |
|     r, s = sig
 | |
|     if not (1 <= r <= p - 1):
 | |
|         return False
 | |
|     if not (0 <= s <= p - 2):
 | |
|         return False
 | |
|     h_int = _elg_hash_to_int(msg_bytes, p - 1)
 | |
|     v1 = (pow(y, r, p) * pow(r, s, p)) % p
 | |
|     v2 = pow(g, h_int, p)
 | |
|     return v1 == v2
 | |
| 
 | |
| # --------------- In-Memory Store and Utilities ---------------
 | |
| 
 | |
| class Store:
 | |
|     def __init__(self):
 | |
|         self.keys = {
 | |
|             "rabin": None,     # {"n":..., "p":..., "q":...}
 | |
|             "elgamal": None,   # {"p":..., "g":..., "x":..., "y":...}
 | |
|         }
 | |
|         self.customer_history = []      # list of transactions sent by customer
 | |
|         self.merchant_inbox = []        # list of pending transactions for merchant
 | |
|         self.merchant_processed = []    # list of processed transactions
 | |
|         self.next_tx_id = 1
 | |
| 
 | |
| store = Store()
 | |
| 
 | |
| def now_ts():
 | |
|     return datetime.now().isoformat(timespec="seconds")
 | |
| 
 | |
| def short_hex(x, length=32):
 | |
|     hx = hex(x)[2:]
 | |
|     if len(hx) <= length:
 | |
|         return hx
 | |
|     return hx[:length] + "...(" + str(len(hx)) + " hex chars)"
 | |
| 
 | |
| # --------------- Role Actions ---------------
 | |
| 
 | |
| def customer_create_and_send():
 | |
|     if store.keys["rabin"] is None or store.keys["elgamal"] is None:
 | |
|         print("Keys not ready.")
 | |
|         return
 | |
| 
 | |
|     try:
 | |
|         details = input("Enter payment details (example: Send 55000 to Bob using Mastercard 3048330330393783): ").strip()
 | |
|     except (EOFError, KeyboardInterrupt):
 | |
|         print()
 | |
|         return
 | |
| 
 | |
|     if not details:
 | |
|         print("Empty details. Aborting.")
 | |
|         return
 | |
| 
 | |
|     msg_bytes = details.encode("utf-8")
 | |
|     ts = now_ts()
 | |
|     # Hash (SHA-512) of plaintext; this is what we sign and send as integrity record
 | |
|     digest = SHA512.new(msg_bytes).digest()
 | |
|     digest_hex = digest.hex()
 | |
| 
 | |
|     # Sign the digest using ElGamal (signing input is digest bytes)
 | |
|     sig_r, sig_s = elgamal_sign(digest, store.keys["elgamal"])
 | |
| 
 | |
|     # Encrypt payment details using Rabin (merchant's public n)
 | |
|     n = store.keys["rabin"]["n"]
 | |
|     try:
 | |
|         c = rabin_encrypt(msg_bytes, n)
 | |
|     except Exception as e:
 | |
|         print("Encryption error:", str(e))
 | |
|         return
 | |
| 
 | |
|     tx = {
 | |
|         "id": store.next_tx_id,
 | |
|         "timestamp": ts,
 | |
|         "plaintext": details,
 | |
|         "cipher_hex": hex(c)[2:],   # store as hex text for display
 | |
|         "hash_hex": digest_hex,
 | |
|         "sig_r": sig_r,
 | |
|         "sig_s": sig_s,
 | |
|         "processed": False,
 | |
|     }
 | |
|     store.next_tx_id += 1
 | |
|     store.customer_history.append(tx)
 | |
|     store.merchant_inbox.append(dict(tx))  # copy to merchant inbox
 | |
| 
 | |
|     print("Transaction created and sent to merchant.")
 | |
|     print("ID:", tx["id"])
 | |
|     print("Timestamp:", tx["timestamp"])
 | |
|     print("Before encryption (plaintext):", tx["plaintext"])
 | |
|     print("After encryption (cipher hex):", short_hex(c))
 | |
|     print("SHA-512 digest (hex):", tx["hash_hex"][:64] + "..." if len(tx["hash_hex"]) > 64 else tx["hash_hex"])
 | |
|     print("ElGamal signature r:", tx["sig_r"])
 | |
|     print("ElGamal signature s:", tx["sig_s"])
 | |
| 
 | |
| def customer_view_history():
 | |
|     if not store.customer_history:
 | |
|         print("No past transactions.")
 | |
|         return
 | |
|     for tx in store.customer_history:
 | |
|         print("ID:", tx["id"])
 | |
|         print("Timestamp:", tx["timestamp"])
 | |
|         print("Plaintext:", tx["plaintext"])
 | |
|         print("Cipher (hex):", tx["cipher_hex"])
 | |
|         print("SHA-512 digest (hex):", tx["hash_hex"])
 | |
|         print("Signature r:", tx["sig_r"])
 | |
|         print("Signature s:", tx["sig_s"])
 | |
|         print("Processed by merchant:", tx["processed"])
 | |
|         print("-" * 40)
 | |
| 
 | |
| def merchant_process_all():
 | |
|     if store.keys["rabin"] is None or store.keys["elgamal"] is None:
 | |
|         print("Keys not ready.")
 | |
|         return
 | |
|     if not store.merchant_inbox:
 | |
|         print("No pending transactions.")
 | |
|         return
 | |
| 
 | |
|     p = store.keys["rabin"]["p"]
 | |
|     q = store.keys["rabin"]["q"]
 | |
|     n = store.keys["rabin"]["n"]
 | |
| 
 | |
|     pub_elg = {
 | |
|         "p": store.keys["elgamal"]["p"],
 | |
|         "g": store.keys["elgamal"]["g"],
 | |
|         "y": store.keys["elgamal"]["y"],
 | |
|     }
 | |
| 
 | |
|     processed_any = False
 | |
|     new_inbox = []
 | |
|     for tx in store.merchant_inbox:
 | |
|         processed_any = True
 | |
|         ts = now_ts()
 | |
|         c_hex = tx["cipher_hex"]
 | |
|         try:
 | |
|             c = int(c_hex, 16)
 | |
|         except Exception:
 | |
|             c = None
 | |
| 
 | |
|         decrypted_msg = None
 | |
|         decrypt_ok = False
 | |
|         if c is not None:
 | |
|             try:
 | |
|                 m = rabin_decrypt(c, p, q, n)
 | |
|                 if m is not None:
 | |
|                     decrypted_msg = m.decode("utf-8", errors="replace")
 | |
|                     decrypt_ok = True
 | |
|                 else:
 | |
|                     decrypt_ok = False
 | |
|             except Exception:
 | |
|                 decrypt_ok = False
 | |
| 
 | |
|         # Compute hash of decrypted plaintext
 | |
|         computed_hash_hex = "DECRYPT_FAIL"
 | |
|         if decrypt_ok:
 | |
|             computed_hash_hex = SHA512.new(m).hexdigest()
 | |
| 
 | |
|         # Verify signature using received hash bytes (auditable)
 | |
|         received_hash_hex = tx["hash_hex"]
 | |
|         try:
 | |
|             received_hash_bytes = bytes.fromhex(received_hash_hex)
 | |
|         except Exception:
 | |
|             received_hash_bytes = b""
 | |
|         sig = (tx["sig_r"], tx["sig_s"])
 | |
|         sig_valid = elgamal_verify(received_hash_bytes, sig, pub_elg)
 | |
| 
 | |
|         # Optionally also verify that signature corresponds to computed hash if decryption ok
 | |
|         hash_match = False
 | |
|         if decrypt_ok:
 | |
|             hash_match = (computed_hash_hex == received_hash_hex)
 | |
| 
 | |
|         rec = {
 | |
|             "id": tx["id"],
 | |
|             "timestamp": ts,
 | |
|             "received_hash_hex": received_hash_hex,
 | |
|             "computed_hash_hex": computed_hash_hex,
 | |
|             "signature_valid": bool(sig_valid),
 | |
|             "hash_match": bool(hash_match),
 | |
|             "decryption_ok": bool(decrypt_ok),
 | |
|             "decrypted_plaintext": decrypted_msg if decrypt_ok else "",
 | |
|         }
 | |
|         store.merchant_processed.append(rec)
 | |
|         tx["processed"] = True
 | |
| 
 | |
|         print("Processed transaction ID:", tx["id"])
 | |
|         print("Signature valid:", rec["signature_valid"])
 | |
|         print("Decryption ok:", rec["decryption_ok"])
 | |
|         print("Received hash (hex):", rec["received_hash_hex"])
 | |
|         print("Computed hash (hex):", rec["computed_hash_hex"])
 | |
|         print("Hashes match:", rec["hash_match"])
 | |
|         print("Timestamp:", rec["timestamp"])
 | |
|         print("-" * 40)
 | |
| 
 | |
|     # Clear inbox after processing all
 | |
|     store.merchant_inbox = []
 | |
|     if not processed_any:
 | |
|         print("No transactions processed.")
 | |
| 
 | |
| def merchant_show_processed():
 | |
|     if not store.merchant_processed:
 | |
|         print("No processed records.")
 | |
|         return
 | |
|     for rec in store.merchant_processed:
 | |
|         print("ID:", rec["id"])
 | |
|         print("Timestamp:", rec["timestamp"])
 | |
|         print("Signature valid:", rec["signature_valid"])
 | |
|         print("Decryption ok:", rec["decryption_ok"])
 | |
|         print("Received hash (hex):", rec["received_hash_hex"])
 | |
|         print("Computed hash (hex):", rec["computed_hash_hex"])
 | |
|         print("Hashes match:", rec["hash_match"])
 | |
|         # Merchant can see plaintext if needed:
 | |
|         if rec["decryption_ok"]:
 | |
|             print("Decrypted plaintext:", rec["decrypted_plaintext"])
 | |
|         print("-" * 40)
 | |
| 
 | |
| def auditor_view_hashed_records():
 | |
|     if not store.merchant_processed:
 | |
|         print("No records to audit.")
 | |
|         return
 | |
|     for rec in store.merchant_processed:
 | |
|         print("ID:", rec["id"])
 | |
|         print("Timestamp:", rec["timestamp"])
 | |
|         print("Received hash (hex):", rec["received_hash_hex"])
 | |
|         print("Computed hash (hex):", rec["computed_hash_hex"])
 | |
|         print("Hashes match:", rec["hash_match"])
 | |
|         print("-" * 40)
 | |
| 
 | |
| def auditor_verify_signatures():
 | |
|     if not store.merchant_processed:
 | |
|         print("No records to verify.")
 | |
|         return
 | |
|     pub_elg = {
 | |
|         "p": store.keys["elgamal"]["p"],
 | |
|         "g": store.keys["elgamal"]["g"],
 | |
|         "y": store.keys["elgamal"]["y"],
 | |
|     }
 | |
|     for rec in store.merchant_processed:
 | |
|         tx_id = rec["id"]
 | |
|         sig_r = None
 | |
|         sig_s = None
 | |
|         hash_hex = rec["received_hash_hex"]
 | |
|         for tx in store.customer_history:
 | |
|             if tx["id"] == tx_id:
 | |
|                 sig_r = tx["sig_r"]
 | |
|                 sig_s = tx["sig_s"]
 | |
|                 break
 | |
|         if sig_r is None:
 | |
|             print("ID:", tx_id, "Signature not found.")
 | |
|             continue
 | |
|         try:
 | |
|             msg_bytes = bytes.fromhex(hash_hex)
 | |
|         except Exception:
 | |
|             msg_bytes = b""
 | |
|         ok = elgamal_verify(msg_bytes, (sig_r, sig_s), pub_elg)
 | |
|         print("ID:", tx_id, "Signature valid:", bool(ok))
 | |
| 
 | |
| def show_public_keys():
 | |
|     if store.keys["rabin"] is None or store.keys["elgamal"] is None:
 | |
|         print("Keys not ready.")
 | |
|         return
 | |
|     rabin_pub_n = store.keys["rabin"]["n"]
 | |
|     elg_pub = {
 | |
|         "p": store.keys["elgamal"]["p"],
 | |
|         "g": store.keys["elgamal"]["g"],
 | |
|         "y": store.keys["elgamal"]["y"],
 | |
|     }
 | |
|     print("Customer ElGamal public key parameters:")
 | |
|     print("p (bits):", store.keys["elgamal"]["p"].bit_length())
 | |
|     print("g:", short_hex(elg_pub["g"]))
 | |
|     print("y:", short_hex(elg_pub["y"]))
 | |
|     print("Merchant Rabin public modulus n (bits):", rabin_pub_n.bit_length())
 | |
|     print("n:", short_hex(rabin_pub_n))
 | |
| 
 | |
| def main_menu():
 | |
|     while True:
 | |
|         print("Select role:")
 | |
|         print("1. Customer")
 | |
|         print("2. Merchant")
 | |
|         print("3. Auditor")
 | |
|         print("4. Show public keys")
 | |
|         print("5. Exit")
 | |
|         choice = input("Enter choice: ").strip()
 | |
|         if choice == "1":
 | |
|             customer_menu()
 | |
|         elif choice == "2":
 | |
|             merchant_menu()
 | |
|         elif choice == "3":
 | |
|             auditor_menu()
 | |
|         elif choice == "4":
 | |
|             show_public_keys()
 | |
|         elif choice == "5":
 | |
|             print("Goodbye.")
 | |
|             break
 | |
|         else:
 | |
|             print("Invalid choice.")
 | |
| 
 | |
| def customer_menu():
 | |
|     while True:
 | |
|         print("Customer menu:")
 | |
|         print("1. Encrypt, sign, and send payment")
 | |
|         print("2. View past transactions")
 | |
|         print("3. Back")
 | |
|         choice = input("Enter choice: ").strip()
 | |
|         if choice == "1":
 | |
|             customer_create_and_send()
 | |
|         elif choice == "2":
 | |
|             customer_view_history()
 | |
|         elif choice == "3":
 | |
|             return
 | |
|         else:
 | |
|             print("Invalid choice.")
 | |
| 
 | |
| def merchant_menu():
 | |
|     while True:
 | |
|         print("Merchant menu:")
 | |
|         print("1. Process all pending transactions")
 | |
|         print("2. Show processed records")
 | |
|         print("3. Back")
 | |
|         choice = input("Enter choice: ").strip()
 | |
|         if choice == "1":
 | |
|             merchant_process_all()
 | |
|         elif choice == "2":
 | |
|             merchant_show_processed()
 | |
|         elif choice == "3":
 | |
|             return
 | |
|         else:
 | |
|             print("Invalid choice.")
 | |
| 
 | |
| def auditor_menu():
 | |
|     while True:
 | |
|         print("Auditor menu:")
 | |
|         print("1. View hashed payment records")
 | |
|         print("2. Verify ElGamal signatures on records")
 | |
|         print("3. Back")
 | |
|         choice = input("Enter choice: ").strip()
 | |
|         if choice == "1":
 | |
|             auditor_view_hashed_records()
 | |
|         elif choice == "2":
 | |
|             auditor_verify_signatures()
 | |
|         elif choice == "3":
 | |
|             return
 | |
|         else:
 | |
|             print("Invalid choice.")
 | |
| 
 | |
| def init_keys():
 | |
|     print("Generating keys. This may take a moment...")
 | |
|     # Rabin: 2048-bit modulus to fit typical payment strings
 | |
|     store.keys["rabin"] = generate_rabin_keys(bits=2048)
 | |
|     # ElGamal: 1024-bit safe prime for demo speed
 | |
|     store.keys["elgamal"] = generate_elgamal_keys(bits=1024)
 | |
|     print("Keys ready.")
 | |
|     print("Customer has ElGamal signing key (keeps x private, shares p,g,y).")
 | |
|     print("Merchant has Rabin decryption key (keeps p,q private, shares n).")
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     try:
 | |
|         init_keys()
 | |
|         main_menu()
 | |
|     except KeyboardInterrupt:
 | |
|         print("\nInterrupted.")
 | |
|         sys.exit(0)
 | 
