updated IS codes
This commit is contained in:
		
							parent
							
								
									a5c9c120ff
								
							
						
					
					
						commit
						13bcfbbafd
					
				
					 7 changed files with 356 additions and 1 deletions
				
			
		|  | @ -1,10 +1,55 @@ | |||
| import hashlib | ||||
| import random | ||||
| import string | ||||
| import time | ||||
| 
 | ||||
| def ds_gen(dsize): | ||||
|     """Generate random strings dataset""" | ||||
|     dataset = [] | ||||
|     for _ in range(dsize): | ||||
|         length = random.randint(10, 50) | ||||
|         random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) | ||||
|         dataset.append(random_string) | ||||
|     return dataset | ||||
| 
 | ||||
| def hash_benchmark(dataset, hash_func, hash_name): | ||||
|     """Benchmark hashing function and detect collisions""" | ||||
|     start_time = time.time() | ||||
|     hashes = {} | ||||
|     collisions = [] | ||||
|      | ||||
|     for data in dataset: | ||||
|         hash_value = hash_func(data.encode()).hexdigest() | ||||
|         if hash_value in hashes: | ||||
|             collisions.append((data, hashes[hash_value])) | ||||
|         else: | ||||
|             hashes[hash_value] = data | ||||
|      | ||||
|     end_time = time.time() | ||||
|     return end_time - start_time, len(collisions), collisions | ||||
| 
 | ||||
| def main(): | ||||
|     dsize = int(input("Enter data size: ")) | ||||
|     dsize = int(input("Enter data size (50-100): ")) | ||||
|     dsize = max(50, min(100, dsize))  # Ensure range 50-100 | ||||
|      | ||||
|     dataset = ds_gen(dsize) | ||||
|      | ||||
|     hash_functions = [ | ||||
|         (hashlib.md5, "MD5"), | ||||
|         (hashlib.sha1, "SHA-1"),  | ||||
|         (hashlib.sha256, "SHA-256") | ||||
|     ] | ||||
|      | ||||
|     print(f"Testing with {len(dataset)} strings\n") | ||||
|      | ||||
|     for hash_func, name in hash_functions: | ||||
|         time_taken, collision_count, collisions = hash_benchmark(dataset, hash_func, name) | ||||
|         print(f"{name}:") | ||||
|         print(f"  Time: {time_taken:.6f} seconds") | ||||
|         print(f"  Collisions: {collision_count}") | ||||
|         if collisions: | ||||
|             print(f"  Collision pairs: {collisions[:3]}")  # Show first 3 | ||||
|         print() | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     main() | ||||
|  |  | |||
|  | @ -0,0 +1,29 @@ | |||
| import socket | ||||
| import hashlib | ||||
| 
 | ||||
| 
 | ||||
| def main() -> None: | ||||
|     host = "127.0.0.1" | ||||
|     port = 5001 | ||||
| 
 | ||||
|     message = b"hello integrity" | ||||
|     tamper = False  # change to True to simulate corruption | ||||
| 
 | ||||
|     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | ||||
|         s.connect((host, port)) | ||||
|         send_data = message if not tamper else message[:-1] + b"X" | ||||
|         s.sendall(send_data) | ||||
| 
 | ||||
|         server_digest = s.recv(128).decode() | ||||
|         local_digest = hashlib.sha256(message).hexdigest() | ||||
| 
 | ||||
|         ok = (server_digest == local_digest) | ||||
|         print("server:", server_digest) | ||||
|         print("local :", local_digest) | ||||
|         print("match :", ok) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
| 
 | ||||
| 
 | ||||
|  | @ -0,0 +1,22 @@ | |||
| import socket | ||||
| import hashlib | ||||
| 
 | ||||
| 
 | ||||
| def main() -> None: | ||||
|     host = "127.0.0.1" | ||||
|     port = 5001 | ||||
| 
 | ||||
|     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: | ||||
|         server.bind((host, port)) | ||||
|         server.listen(1) | ||||
|         conn, addr = server.accept() | ||||
|         with conn: | ||||
|             data = conn.recv(4096) | ||||
|             digest = hashlib.sha256(data).hexdigest() | ||||
|             conn.sendall(digest.encode()) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
| 
 | ||||
| 
 | ||||
							
								
								
									
										63
									
								
								IS/Lab/Lab6/dh_basic.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								IS/Lab/Lab6/dh_basic.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,63 @@ | |||
| from typing import Tuple | ||||
| 
 | ||||
| from Crypto.Hash import SHA256 | ||||
| from Crypto.Protocol.KDF import HKDF | ||||
| 
 | ||||
| 
 | ||||
| def rfc3526_group14() -> Tuple[int, int]: | ||||
|     p_hex = ( | ||||
|         "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1" | ||||
|         "29024E088A67CC74020BBEA63B139B22514A08798E3404DD" | ||||
|         "EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245" | ||||
|         "E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED" | ||||
|         "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D" | ||||
|         "C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F" | ||||
|         "83655D23DCA3AD961C62F356208552BB9ED529077096966D" | ||||
|         "670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B" | ||||
|         "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9" | ||||
|         "DE2BCBF6955817183995497CEA956AE515D2261898FA0510" | ||||
|         "15728E5A8AACAA68FFFFFFFFFFFFFFFF" | ||||
|     ) | ||||
|     return int(p_hex, 16), 2 | ||||
| 
 | ||||
| 
 | ||||
| def int_to_fixed_length_bytes(value: int, length_bits: int) -> bytes: | ||||
|     length_bytes = (length_bits + 7) // 8 | ||||
|     return value.to_bytes(length_bytes, byteorder="big") | ||||
| 
 | ||||
| 
 | ||||
| def dh_keypair(p: int, g: int, private_bits: int = 256) -> Tuple[int, int]: | ||||
|     from Crypto.Util import number | ||||
| 
 | ||||
|     a = 0 | ||||
|     while a < 2: | ||||
|         a = number.getRandomNBitInteger(private_bits) | ||||
|     A = pow(g, a, p) | ||||
|     return a, A | ||||
| 
 | ||||
| 
 | ||||
| def main() -> None: | ||||
|     p, g = rfc3526_group14() | ||||
|     a_priv, a_pub = dh_keypair(p, g) | ||||
|     b_priv, b_pub = dh_keypair(p, g) | ||||
| 
 | ||||
|     a_shared = pow(b_pub, a_priv, p) | ||||
|     b_shared = pow(a_pub, b_priv, p) | ||||
|     same = (a_shared == b_shared) | ||||
| 
 | ||||
|     print("Lab6: Basic Diffie-Hellman (no symmetric cipher)") | ||||
|     print(f"- Group: p({p.bit_length()} bits), g={g}") | ||||
|     print(f"- A public: {hex(a_pub)[:18]}...") | ||||
|     print(f"- B public: {hex(b_pub)[:18]}...") | ||||
|     print(f"- Shared equal: {same}") | ||||
| 
 | ||||
|     # Optionally derive a fixed-length key material to show post-processing (not used to encrypt) | ||||
|     shared_bytes = int_to_fixed_length_bytes(a_shared, p.bit_length()) | ||||
|     key_material = HKDF(shared_bytes, 32, salt=None, hashmod=SHA256, context=b"demo") | ||||
|     print(f"- Derived key material (SHA256/HKDF) prefix: {key_material.hex()[:16]}...") | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
| 
 | ||||
| 
 | ||||
							
								
								
									
										104
									
								
								IS/Lab/Lab6/elgamal_schnorr.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										104
									
								
								IS/Lab/Lab6/elgamal_schnorr.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,104 @@ | |||
| import os | ||||
| import random | ||||
| from typing import Tuple | ||||
| 
 | ||||
| from Crypto.Hash import SHA256 | ||||
| from Crypto.Util.number import getPrime | ||||
| 
 | ||||
| 
 | ||||
| def elgamal_keygen(bits: int = 512) -> Tuple[Tuple[int, int, int], int]: | ||||
|     p = getPrime(bits) | ||||
|     g = random.randrange(2, p - 1) | ||||
|     x = random.randrange(1, p - 1) | ||||
|     h = pow(g, x, p) | ||||
|     return (p, g, h), x | ||||
| 
 | ||||
| 
 | ||||
| def elgamal_encrypt(message: bytes, pub_key: Tuple[int, int, int]) -> Tuple[int, int]: | ||||
|     p, g, h = pub_key | ||||
|     m = int.from_bytes(message, "big") | ||||
|     if m >= p: | ||||
|         raise ValueError("Message too large for key size") | ||||
|     k = random.randrange(1, p - 1) | ||||
|     c1 = pow(g, k, p) | ||||
|     s = pow(h, k, p) | ||||
|     c2 = (m * s) % p | ||||
|     return c1, c2 | ||||
| 
 | ||||
| 
 | ||||
| def elgamal_decrypt(ciphertext: Tuple[int, int], priv_key: int, pub_key: Tuple[int, int, int]) -> bytes: | ||||
|     c1, c2 = ciphertext | ||||
|     p, _, _ = pub_key | ||||
|     s = pow(c1, priv_key, p) | ||||
|     s_inv = pow(s, p - 2, p) | ||||
|     m = (c2 * s_inv) % p | ||||
|     if m == 0: | ||||
|         return b"" | ||||
|     m_len = (m.bit_length() + 7) // 8 | ||||
|     return m.to_bytes(m_len, "big") | ||||
| 
 | ||||
| 
 | ||||
| def schnorr_keygen(bits: int = 512) -> Tuple[Tuple[int, int, int], int]: | ||||
|     # Use same style group as ElGamal demo for simplicity | ||||
|     p = getPrime(bits) | ||||
|     g = random.randrange(2, p - 1) | ||||
|     x = random.randrange(1, p - 1) | ||||
|     y = pow(g, x, p) | ||||
|     return (p, g, y), x | ||||
| 
 | ||||
| 
 | ||||
| def schnorr_sign(message: bytes, priv_key: int, params: Tuple[int, int, int]) -> Tuple[int, int]: | ||||
|     p, g, _ = params | ||||
|     k = random.randrange(1, p - 1) | ||||
|     r = pow(g, k, p) | ||||
|     h = SHA256.new() | ||||
|     h.update(r.to_bytes((r.bit_length() + 7) // 8 or 1, "big") + message) | ||||
|     e = int.from_bytes(h.digest(), "big") % (p - 1) | ||||
|     s = (k + e * priv_key) % (p - 1) | ||||
|     return e, s | ||||
| 
 | ||||
| 
 | ||||
| def schnorr_verify(message: bytes, signature: Tuple[int, int], params: Tuple[int, int, int]) -> bool: | ||||
|     p, g, y = params | ||||
|     e, s = signature | ||||
|     # Compute r' = g^s * y^{-e} mod p | ||||
|     y_inv_e = pow(y, (p - 1 - e) % (p - 1), p) | ||||
|     r_prime = (pow(g, s, p) * y_inv_e) % p | ||||
|     h = SHA256.new() | ||||
|     h.update(r_prime.to_bytes((r_prime.bit_length() + 7) // 8 or 1, "big") + message) | ||||
|     e_prime = int.from_bytes(h.digest(), "big") % (p - 1) | ||||
|     return e_prime == e | ||||
| 
 | ||||
| 
 | ||||
| def demo() -> None: | ||||
|     print("Lab6: ElGamal (encrypt/decrypt) and Schnorr (sign/verify) demo") | ||||
|     default_msg = os.environ.get("LAB6_MESSAGE", "Test message for Lab6") | ||||
|     try: | ||||
|         user_msg = input(f"Enter plaintext [{default_msg}]: ").strip() | ||||
|     except EOFError: | ||||
|         user_msg = "" | ||||
|     message = (user_msg or default_msg).encode() | ||||
| 
 | ||||
|     # ElGamal | ||||
|     pub_e, priv_e = elgamal_keygen(512) | ||||
|     c1, c2 = elgamal_encrypt(message, pub_e) | ||||
|     recovered = elgamal_decrypt((c1, c2), priv_e, pub_e) | ||||
|     print("\nElGamal:") | ||||
|     print(f"- Public: p({pub_e[0].bit_length()} bits), g, h") | ||||
|     print(f"- Ciphertext c1={hex(c1)[:18]}..., c2={hex(c2)[:18]}...") | ||||
|     print(f"- Decrypt OK: {recovered == message}") | ||||
| 
 | ||||
|     # Schnorr | ||||
|     pub_s, priv_s = schnorr_keygen(512) | ||||
|     sig = schnorr_sign(message, priv_s, pub_s) | ||||
|     ok = schnorr_verify(message, sig, pub_s) | ||||
|     print("\nSchnorr:") | ||||
|     print(f"- Public: p({pub_s[0].bit_length()} bits), g, y") | ||||
|     print(f"- Signature e={sig[0]}, s={sig[1]}") | ||||
|     print(f"- Verify OK: {ok}") | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     demo() | ||||
| 
 | ||||
| 
 | ||||
							
								
								
									
										49
									
								
								IS/Lab/Lab6/socket_rsa/client.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								IS/Lab/Lab6/socket_rsa/client.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,49 @@ | |||
| import os | ||||
| import socket | ||||
| 
 | ||||
| from Crypto.PublicKey import RSA | ||||
| from Crypto.Cipher import PKCS1_OAEP | ||||
| 
 | ||||
| 
 | ||||
| def recv_exact(conn: socket.socket, n: int) -> bytes: | ||||
|     buf = b"" | ||||
|     while len(buf) < n: | ||||
|         chunk = conn.recv(n - len(buf)) | ||||
|         if not chunk: | ||||
|             raise ConnectionError("connection closed") | ||||
|         buf += chunk | ||||
|     return buf | ||||
| 
 | ||||
| 
 | ||||
| def main() -> None: | ||||
|     host = "127.0.0.1" | ||||
|     port = 5003 | ||||
|     default_msg = os.environ.get("LAB6_MESSAGE", "rsa hello") | ||||
|     try: | ||||
|         user_msg = input(f"Enter plaintext [{default_msg}]: ").strip() | ||||
|     except EOFError: | ||||
|         user_msg = "" | ||||
|     message = (user_msg or default_msg).encode() | ||||
| 
 | ||||
|     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as c: | ||||
|         c.connect((host, port)) | ||||
|         # Receive server public key | ||||
|         klen = int.from_bytes(recv_exact(c, 4), "big") | ||||
|         kbytes = recv_exact(c, klen) | ||||
|         server_pub = RSA.import_key(kbytes) | ||||
| 
 | ||||
|         # Encrypt message to server | ||||
|         cipher = PKCS1_OAEP.new(server_pub) | ||||
|         ctext = cipher.encrypt(message) | ||||
|         c.sendall(len(ctext).to_bytes(4, "big") + ctext) | ||||
| 
 | ||||
|         # Receive acknowledgement | ||||
|         rlen = int.from_bytes(recv_exact(c, 4), "big") | ||||
|         reply = recv_exact(c, rlen) | ||||
|         print("server reply:", reply.decode(errors="ignore")) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
| 
 | ||||
| 
 | ||||
							
								
								
									
										43
									
								
								IS/Lab/Lab6/socket_rsa/server.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										43
									
								
								IS/Lab/Lab6/socket_rsa/server.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,43 @@ | |||
| import socket | ||||
| from typing import Tuple | ||||
| 
 | ||||
| from Crypto.PublicKey import RSA | ||||
| from Crypto.Cipher import PKCS1_OAEP | ||||
| 
 | ||||
| 
 | ||||
| def generate_rsa(bits: int = 2048) -> Tuple[RSA.RsaKey, RSA.RsaKey]: | ||||
|     key = RSA.generate(bits) | ||||
|     return key.publickey(), key | ||||
| 
 | ||||
| 
 | ||||
| def main() -> None: | ||||
|     host = "127.0.0.1" | ||||
|     port = 5003 | ||||
| 
 | ||||
|     pub, priv = generate_rsa(2048) | ||||
|     pub_pem = pub.export_key() | ||||
| 
 | ||||
|     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: | ||||
|         srv.bind((host, port)) | ||||
|         srv.listen(1) | ||||
|         conn, _ = srv.accept() | ||||
|         with conn: | ||||
|             # Send server public key | ||||
|             conn.sendall(len(pub_pem).to_bytes(4, "big") + pub_pem) | ||||
| 
 | ||||
|             # Receive client's encrypted message | ||||
|             clen = int.from_bytes(conn.recv(4), "big") | ||||
|             ctext = conn.recv(clen) | ||||
| 
 | ||||
|             cipher = PKCS1_OAEP.new(priv) | ||||
|             msg = cipher.decrypt(ctext) | ||||
|             # Respond: encrypt reply with same public (client will not be able to decrypt without its private), | ||||
|             # so just echo plaintext length as acknowledgement for demo. | ||||
|             reply = f"received:{len(msg)}".encode() | ||||
|             conn.sendall(len(reply).to_bytes(4, "big") + reply) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
| 
 | ||||
| 
 | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue