Advanced Python Programming for AI: High-Performance, Clean Code, and Concurrency
1. Introduction
- Why Advanced Python for AI? (With a Mini-Challenge)
- Briefly cover Python’s role.
- Mini-Challenge: Provide a simple, inefficient Python function (e.g., loading a large file line by line with string concatenation in a loop) and ask the reader to predict the bottlenecks and consider improvements. This sets the stage for the performance sections. (A sketch of such a function follows below.)
- Explain how the book will provide the tools to solve such challenges.
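For instance, the mini-challenge could open with a sketch like the following (an illustrative example, not the book's final version), where repeated string concatenation is the planted bottleneck:
```python
# Inefficient on purpose: builds the entire corpus as one string via repeated concatenation.
# Each `+=` copies the whole accumulated string, so total work grows quadratically
# with the file size.
def load_corpus_slow(path: str) -> str:
    corpus = ""
    with open(path, "r") as f:
        for line in f:
            corpus += line  # quadratic-time string concatenation
    return corpus

# Readers should spot the bottleneck and think about "".join(...), generators,
# or chunked reads as improvements (covered in later chapters).
```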
- Who is this Book For?
- Reiterate target audience.
- How to Use This Book: Learn by Doing!
- Emphasize that the book is full of code, labs, and exercises. Encourage active participation.
- Suggest setting up a dedicated environment for labs.
2. Core Python Refresh: Building Blocks for AI (Hands-On)
This section won’t just explain data structures; it will show why they matter for AI with concrete scenarios and code.
- Efficient Data Structures for AI Tasks
- Lists vs. Tuples vs. Sets vs. Dictionaries: Practical Choices
- Scenario: Storing tokens, configuration, or unique identifiers.
- Code Example 1.1: Tokenizing and Counting
```python
import collections
import time

text_corpus = ["apple banana", "orange apple", "banana grape apple", "kiwi orange"] * 10000

# List for basic sequence
token_list = []
for doc in text_corpus:
    token_list.extend(doc.split())
print(f"Total tokens (list): {len(token_list)}")

# Set for unique tokens
unique_tokens = set(token_list)
print(f"Unique tokens (set): {len(unique_tokens)}")

# Dictionary for word frequencies (naive)
word_counts_naive = {}
start = time.time()
for token in token_list:
    word_counts_naive[token] = word_counts_naive.get(token, 0) + 1
end = time.time()
print(f"Naive word count time: {end - start:.4f}s")

# Dictionary for word frequencies (using collections.Counter)
start = time.time()
word_counts_counter = collections.Counter(token_list)
end = time.time()
print(f"Counter word count time: {end - start:.4f}s")
print(f"Most common words: {word_counts_counter.most_common(3)}")
```
- Discuss performance differences and when `collections.Counter` or `set` is superior.
- Beyond Built-ins: `collections` Module in Action
- Code Example 1.2: Grouping Data with `defaultdict`
- Explain the clarity and conciseness gains.
```python
from collections import defaultdict

data_points = [
    {'category': 'fruit', 'item': 'apple', 'value': 10},
    {'category': 'vegetable', 'item': 'carrot', 'value': 5},
    {'category': 'fruit', 'item': 'banana', 'value': 7},
    {'category': 'vegetable', 'item': 'broccoli', 'value': 3},
]

# Grouping by category (naive)
grouped_naive = {}
for dp in data_points:
    category = dp['category']
    if category not in grouped_naive:
        grouped_naive[category] = []
    grouped_naive[category].append(dp['item'])
print(f"Grouped Naive: {grouped_naive}")

# Grouping by category (using defaultdict)
grouped_defaultdict = defaultdict(list)
for dp in data_points:
    grouped_defaultdict[dp['category']].append(dp['item'])
print(f"Grouped Defaultdict: {dict(grouped_defaultdict)}")
```
- NumPy Arrays: The Foundation of Tensor Operations (Coding Lab 1.1)
- Concept: Introduce `ndarray` as contiguous memory blocks, crucial for speed.
- Coding Lab 1.1: Basic Tensor Operations
- Create 1D, 2D, 3D arrays.
- Element-wise operations (addition, multiplication).
- Matrix multiplication (`@` operator).
- Slicing and indexing for features/batches.
- Example: Calculating Euclidean distance between feature vectors.
```python
import numpy as np

# Create two 1D feature vectors
vec1 = np.array([1.0, 2.0, 3.0])
vec2 = np.array([4.0, 5.0, 6.0])

# Element-wise addition
print(f"Element-wise sum: {vec1 + vec2}")

# Dot product (fundamental for neural networks)
print(f"Dot product: {vec1 @ vec2}")

# Example: A batch of feature vectors (4 samples, 3 features each)
feature_batch = np.array([
    [1.1, 2.2, 3.3],
    [4.4, 5.5, 6.6],
    [7.7, 8.8, 9.9],
    [0.1, 0.2, 0.3]
])
print(f"Shape of feature_batch: {feature_batch.shape}")

# Select the first two samples
print(f"First two samples:\n{feature_batch[:2, :]}")

# Calculate the mean of each feature
print(f"Mean of each feature: {np.mean(feature_batch, axis=0)}")

# Exercise: Calculate Euclidean distance between vec1 and vec2
distance = np.sqrt(np.sum((vec1 - vec2)**2))
print(f"Euclidean distance: {distance:.2f}")

# Challenge: Implement a simple ReLU activation function for an array
def relu(x):
    return np.maximum(0, x)

test_array = np.array([-1, 0, 1, -5, 10])
print(f"ReLU applied: {relu(test_array)}")
```
- Pandas DataFrames: Cleaning and Preprocessing Real-World Data (Coding Lab 1.2)
- Concept: Tabular data manipulation.
- Coding Lab 1.2: Data Cleaning and Feature Engineering
- Load a (small, synthetic) CSV dataset.
- Handle missing values (fill, drop).
- Filter data based on conditions.
- Create new features (e.g., polynomial features, interaction terms).
- Group by and aggregate.
```python
import pandas as pd
import numpy as np

# Create a synthetic dataset
data = {
    'age': [25, 30, np.nan, 40, 28, 35],
    'salary': [50000, 60000, 75000, np.nan, 52000, 65000],
    'experience': [2, 5, 8, 10, 3, 7],
    'city': ['NY', 'SF', 'NY', 'LA', 'SF', 'NY'],
    'gender': ['M', 'F', 'F', 'M', 'F', 'M']
}
df = pd.DataFrame(data)
print("Original DataFrame:")
print(df)

# --- Data Cleaning ---
# 1. Check for missing values
print("\nMissing values:\n", df.isnull().sum())

# 2. Fill missing 'age' with the mean (assign back rather than using column-level inplace)
df['age'] = df['age'].fillna(df['age'].mean())

# 3. Drop rows with missing 'salary'
df.dropna(subset=['salary'], inplace=True)
print("\nDataFrame after cleaning missing values:")
print(df)

# --- Feature Engineering ---
# 1. Create a new feature: 'experience_squared'
df['experience_squared'] = df['experience'] ** 2

# 2. One-hot encode 'city' and 'gender'
df = pd.get_dummies(df, columns=['city', 'gender'], drop_first=True)
print("\nDataFrame after feature engineering:")
print(df)

# --- Aggregation ---
# Calculate average salary by gender (before one-hot encoding, for clarity)
original_df = pd.DataFrame(data)  # Reload for this specific aggregation
print("\nAverage salary by gender (original data):")
print(original_df.groupby('gender')['salary'].mean())
```
- Functions, Decorators, and Generators: AI Logic Patterns
- Mastering Functions: First-Class Citizens and Lambdas
- Scenario: Passing different activation functions to a model.
- Code Example 1.3: Higher-Order Functions for Activations
```python
def relu(x):
    return max(0, x)

def sigmoid(x):
    return 1 / (1 + (2.71828 ** -x))  # Approx e

def apply_activation(data, activation_func):
    return [activation_func(x) for x in data]

inputs = [-3, -1, 0, 1, 3]
print(f"ReLU applied: {apply_activation(inputs, relu)}")
print(f"Sigmoid applied: {apply_activation(inputs, sigmoid)}")
print(f"Lambda (double) applied: {apply_activation(inputs, lambda x: x * 2)}")
```
- Decorators for AI: Timing, Caching, and Pre/Post Processing (Coding Lab 1.3)
- Concept: How decorators wrap functions to add functionality.
- Coding Lab 1.3: Building Practical AI Decorators
- A `@timer` decorator for ML function execution.
- A `@cache` decorator for expensive computations (e.g., feature extraction).
```python
import time
from functools import wraps

# Decorator 1: @timer for performance measurement
def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"Function '{func.__name__}' took {end_time - start_time:.4f} seconds.")
        return result
    return wrapper

# Decorator 2: @cache for memoization (simple version)
def cache(func):
    _cache = {}
    @wraps(func)
    def wrapper(*args, **kwargs):
        key = str((args, sorted(kwargs.items())))  # Simple key for hashable args/kwargs
        if key not in _cache:
            _cache[key] = func(*args, **kwargs)
        return _cache[key]
    return wrapper

@timer
def train_model_epoch(data_batch, epoch_num):
    # Simulate a complex training step
    time.sleep(0.1)
    return f"Model trained for epoch {epoch_num} with {len(data_batch)} samples."

@cache
@timer  # Decorators stack from bottom up
def compute_expensive_feature(text_input: str) -> list:
    # Simulate feature extraction that takes time
    time.sleep(0.5)
    return [len(text_input), text_input.count('e'), text_input.upper()]

print(train_model_epoch([1, 2, 3], 1))
print(train_model_epoch([1, 2, 3], 2))  # Will run again

print("\n--- Testing Caching ---")
print(compute_expensive_feature("hello world"))
print(compute_expensive_feature("hello world"))      # Should be faster due to cache
print(compute_expensive_feature("another phrase"))
print(compute_expensive_feature("another phrase"))   # Should be faster due to cache
```
- Generators: Processing Large Datasets Memory-Efficiently (Coding Lab 1.4)
- Concept: `yield` for lazy evaluation, crucial for out-of-core data processing.
- Coding Lab 1.4: Streaming Large Files for LLM Preprocessing
- Create a dummy large text file.
- Generator to read chunks or lines without loading the whole file.
- Scenario: Tokenizing a massive corpus.
```python
import os
import random
import sys

# Create a dummy large text file
file_path = "large_text_data.txt"
num_lines = 100000  # 100k lines
with open(file_path, "w") as f:
    for _ in range(num_lines):
        f.write("This is a line of sample text for AI processing. " * random.randint(5, 10) + "\n")
print(f"Created dummy file: {file_path} (size: {os.path.getsize(file_path) / (1024*1024):.2f} MB)")

# Generator function to read the file line by line
def read_large_file_generator(path):
    print(f"Memory before generator: {sys.getsizeof([])} bytes (just an empty list)")
    with open(path, 'r') as f:
        for line in f:
            yield line.strip()  # Yield one line at a time

# Function to read all lines into a list (memory-intensive)
def read_large_file_list(path):
    print("Reading entire file into memory...")
    with open(path, 'r') as f:
        return [line.strip() for line in f]

print("\n--- Using Generator (Memory Efficient) ---")
line_count_gen = 0
for line in read_large_file_generator(file_path):
    line_count_gen += 1
    if line_count_gen % 20000 == 0:
        print(f"Processed {line_count_gen} lines (generator)")
print(f"Finished processing {line_count_gen} lines with generator.")
# Observe memory usage here; it should remain low

print("\n--- Using List (Memory Inefficient for very large files) ---")
# This part might cause MemoryError for genuinely massive files if not careful
try:
    lines_list = read_large_file_list(file_path)
    print(f"Finished loading {len(lines_list)} lines into list.")
except MemoryError:
    print("Caught MemoryError when loading the whole file into a list. The generator approach is superior!")
except Exception as e:
    print(f"An error occurred: {e}")

# Clean up dummy file
os.remove(file_path)
```
- Object-Oriented Python for AI: Structuring Complex Systems
- Classes and Objects: Building Reusable AI Components
- Scenario: Creating a basic `Dataset` and `Model` class.
- Code Example 1.5: Simple Dataset Loader and Model Stub
```python
import random

class AIDataset:
    def __init__(self, data_path):
        self.data_path = data_path
        self.data = self._load_data()

    def _load_data(self):
        # Simulate loading data from a file
        print(f"Loading data from {self.data_path}")
        return [i * 10 for i in range(5)]  # Dummy data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

class AIModel:
    def __init__(self, num_features):
        self.num_features = num_features
        self.weights = [random.random() for _ in range(num_features)]  # Dummy weights
        print(f"Initialized model with {num_features} features.")

    def predict(self, input_data):
        # Simulate a simple linear prediction
        return sum(x * w for x, w in zip(input_data, self.weights))

# Usage
my_dataset = AIDataset("path/to/my_data.csv")
print(f"Dataset size: {len(my_dataset)}")
print(f"First item: {my_dataset[0]}")

my_model = AIModel(num_features=5)
sample_input = [1, 2, 3, 4, 5]
prediction = my_model.predict(sample_input)
print(f"Prediction for {sample_input}: {prediction}")
```
- Inheritance and Polymorphism: Designing Flexible Models (Coding Lab 1.5)
- Concept: Base classes for different model types.
- Coding Lab 1.5: Polymorphic AI Models
- Base `Model` class with `train` and `predict` methods.
- Subclasses `LinearModel`, `NeuralNetworkModel` overriding these methods.
- Demonstrate calling `train` on different model instances.
```python
import random
import time

class BaseModel:
    def __init__(self, name="GenericModel"):
        self.name = name
        self.is_trained = False

    def train(self, data, labels):
        raise NotImplementedError("Subclasses must implement 'train' method.")

    def predict(self, data):
        raise NotImplementedError("Subclasses must implement 'predict' method.")

    def __str__(self):
        return f"{self.name} (Trained: {self.is_trained})"

class LinearRegressionModel(BaseModel):
    def __init__(self):
        super().__init__("LinearRegression")
        self.weights = []
        self.bias = 0

    def train(self, data, labels):
        # Simulate training a linear model
        print(f"Training {self.name} with {len(data)} samples...")
        self.weights = [random.uniform(-1, 1) for _ in range(len(data[0]))]
        self.bias = random.uniform(-0.5, 0.5)
        self.is_trained = True
        print(f"{self.name} training complete.")

    def predict(self, data_point):
        if not self.is_trained:
            raise ValueError("Model not trained.")
        return sum(x * w for x, w in zip(data_point, self.weights)) + self.bias

class NeuralNetworkModel(BaseModel):
    def __init__(self, num_layers=2):
        super().__init__("SimpleNeuralNetwork")
        self.num_layers = num_layers
        self.layers = []
        # Simulate layers
        for _ in range(num_layers):
            self.layers.append({"weights": [random.uniform(-1, 1), random.uniform(-1, 1)]})

    def train(self, data, labels):
        # Simulate training a neural network
        print(f"Training {self.name} with {len(data)} samples across {self.num_layers} layers...")
        time.sleep(0.2)  # Simulate more complex training
        # In a real scenario, this would involve backpropagation, etc.
        self.is_trained = True
        print(f"{self.name} training complete.")

    def predict(self, data_point):
        if not self.is_trained:
            raise ValueError("Model not trained.")
        # Simulate a forward pass
        activation = sum(x * w for x, w in zip(data_point, self.layers[0]["weights"]))
        return activation  # Simplified for example

# Using polymorphism
models = [LinearRegressionModel(), NeuralNetworkModel(num_layers=3)]
sample_data = [[1.0, 2.0], [3.0, 4.0]]
sample_labels = [5.0, 7.0]

for model in models:
    print(f"\n--- {model.name} ---")
    model.train(sample_data, sample_labels)
    try:
        prediction = model.predict([0.5, 1.5])
        print(f"Prediction for [0.5, 1.5]: {prediction:.2f}")
    except ValueError as e:
        print(e)
```
- Practical Design Patterns: Strategy and Factory in AI
- Scenario: Switching between optimizers or model architectures.
- Code Example 1.6: Optimizer Strategy Pattern
```python
# Strategy Pattern for Optimizers
class OptimizerStrategy:
    def optimize(self, model_params, gradients):
        raise NotImplementedError

class SGD(OptimizerStrategy):
    def __init__(self, learning_rate=0.01):
        self.lr = learning_rate

    def optimize(self, model_params, gradients):
        # Simulate SGD update
        updated_params = [p - self.lr * g for p, g in zip(model_params, gradients)]
        print(f"Applying SGD with LR={self.lr}")
        return updated_params

class Adam(OptimizerStrategy):
    def __init__(self, learning_rate=0.001):
        self.lr = learning_rate
        # Adam-specific state (e.g., moments) would be here

    def optimize(self, model_params, gradients):
        # Simulate Adam update (simplified)
        updated_params = [p - self.lr * g * 0.99 for p, g in zip(model_params, gradients)]
        print(f"Applying Adam with LR={self.lr}")
        return updated_params

# Context class that uses the strategy
class Trainer:
    def __init__(self, optimizer: OptimizerStrategy):
        self.optimizer = optimizer
        self.model_params = [0.1, 0.2, 0.3]  # Initial params

    def run_training_step(self, gradients):
        print(f"Current params: {self.model_params}")
        self.model_params = self.optimizer.optimize(self.model_params, gradients)
        print(f"Updated params: {self.model_params}")

# Usage
sgd_trainer = Trainer(SGD(learning_rate=0.02))
adam_trainer = Trainer(Adam(learning_rate=0.005))

print("\n--- SGD Training ---")
sgd_trainer.run_training_step([0.1, 0.2, 0.3])
sgd_trainer.run_training_step([0.05, 0.1, 0.15])

print("\n--- Adam Training ---")
adam_trainer.run_training_step([0.1, 0.2, 0.3])
adam_trainer.run_training_step([0.05, 0.1, 0.15])
```
3. Crafting Clean & Maintainable AI Code (Best Practices Lab)
This section focuses heavily on code quality with immediate application.
- Coding Style and Readability: PEP 8 in Practice
- Explanation: Introduce PEP 8, benefits of consistency.
- Exercise 2.1: Linting and Formatting Your Code
- Provide a deliberately messy code snippet (e.g., inconsistent indentation, long lines, bad naming).
- Instructions on installing and running `flake8` and `black`.
- Challenge: Fix the snippet to be PEP 8 compliant.
```python
# MESSY CODE SNIPPET (to be corrected by reader)
import pandas as pd
def process_Data ( input_file , out_file ):
    data=pd.read_csv( input_file )
    filtered_data=data[ data["value"] > 100 ]
    filtered_data.to_csv(out_file,index=False)
process_Data('input.csv', 'output.csv')
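One possible PEP 8-compliant rewrite the reader might arrive at (the snake_case rename, docstring, and type hints are our suggestions, not requirements of the exercise):
```python
# One possible cleaned-up version of the messy snippet above
# (function renamed to snake_case; spacing and line length fixed).
import pandas as pd


def process_data(input_file: str, out_file: str) -> None:
    """Keep only rows whose 'value' column exceeds 100 and write them out."""
    data = pd.read_csv(input_file)
    filtered_data = data[data["value"] > 100]
    filtered_data.to_csv(out_file, index=False)


process_data("input.csv", "output.csv")
```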
- Documentation and Type Hinting: Clarity for Collaboration
- Exercise 2.2: Writing Effective Docstrings for AI Functions/Classes
- Provide a function or a simple class (e.g., a custom `DataLoader`).
- Guide the reader to write a comprehensive docstring using NumPy or Google style.
```python
def calculate_cosine_similarity(vec1, vec2):
    # Write a comprehensive docstring for this function,
    # including its purpose, parameters, return value, and any exceptions.
    # Use NumPy style or Google style.
    dot_product = sum(v1 * v2 for v1, v2 in zip(vec1, vec2))
    magnitude_v1 = sum(v**2 for v in vec1)**0.5
    magnitude_v2 = sum(v**2 for v in vec2)**0.5
    if magnitude_v1 == 0 or magnitude_v2 == 0:
        return 0.0  # Handle zero vectors
    return dot_product / (magnitude_v1 * magnitude_v2)

class CustomImageTransformer:
    def __init__(self, resize_dim, normalize_mean, normalize_std):
        # Write a docstring for this class and its __init__ method.
        self.resize_dim = resize_dim
        self.normalize_mean = normalize_mean
        self.normalize_std = normalize_std

    def transform(self, image):
        # Write a docstring for the transform method.
        # Simulate image transformation
        return image  # Placeholder
```
- Type Hinting: Catching Bugs Early in AI Development (Coding Lab 2.1)
- Concept: Explain how type hints improve readability and enable static analysis.
- Coding Lab 2.1: Adding Type Hints to an AI Utility
- Take the `calculate_cosine_similarity` function and add type hints.
- Run `mypy` to demonstrate error detection.
```python
from typing import List

# Function to calculate cosine similarity with type hints
def calculate_cosine_similarity_typed(vec1: List[float], vec2: List[float]) -> float:
    """
    Calculates the cosine similarity between two float vectors.

    Args:
        vec1: The first vector of floats.
        vec2: The second vector of floats.

    Returns:
        The cosine similarity as a float, or 0.0 if either vector is a zero vector.
    """
    dot_product = sum(v1 * v2 for v1, v2 in zip(vec1, vec2))
    magnitude_v1 = sum(v**2 for v in vec1)**0.5
    magnitude_v2 = sum(v**2 for v in vec2)**0.5
    if magnitude_v1 == 0 or magnitude_v2 == 0:
        return 0.0
    return dot_product / (magnitude_v1 * magnitude_v2)

# Class for a data preprocessor with type hints
class TextPreprocessor:
    def __init__(self, lower_case: bool = True, remove_stopwords: bool = False,
                 vocab_size: int = 10000) -> None:
        self.lower_case = lower_case
        self.remove_stopwords = remove_stopwords
        self.vocab_size = vocab_size
        self.stopwords: List[str] = []
        if remove_stopwords:
            self.stopwords = ["the", "is", "a", "an", "and"]  # Simplified for example

    def preprocess(self, text: str) -> List[str]:
        processed_text = text
        if self.lower_case:
            processed_text = processed_text.lower()
        tokens: List[str] = processed_text.split()
        if self.remove_stopwords:
            tokens = [token for token in tokens if token not in self.stopwords]
        return tokens

# Usage:
vec_a = [1.0, 1.0, 0.0]
vec_b = [0.0, 1.0, 1.0]
similarity = calculate_cosine_similarity_typed(vec_a, vec_b)
print(f"Cosine Similarity: {similarity:.2f}")

preprocessor = TextPreprocessor(remove_stopwords=True)
text_input = "The quick brown fox jumps over the lazy dog"
processed_tokens = preprocessor.preprocess(text_input)
print(f"Processed tokens: {processed_tokens}")

# Example of a potential type error (run mypy to catch this)
# calculate_cosine_similarity_typed([1, 2], [3, "4"])
```
- Robust Error Handling and Logging for AI Systems
- Exercise 2.3: `try-except` for Resilient AI Pipelines
- Provide a function that might fail (e.g., trying to open a non-existent file, division by zero during normalization).
- Guide the reader to add robust `try-except` blocks.
- Introduce custom exceptions for specific AI errors.
```python
import os
import time

# Function to process an image file
def process_image(file_path):
    try:
        with open(file_path, 'rb') as f:
            image_data = f.read()
        if not image_data:
            raise ValueError("Image file is empty.")
        # Simulate image processing
        print(f"Successfully processed {len(image_data)} bytes from {file_path}")
        return True
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return False
    except ValueError as e:
        print(f"Error processing image {file_path}: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

# Custom Exception for AI Model issues
class ModelLoadingError(Exception):
    """Custom exception for errors during model loading."""
    pass

def load_ai_model(model_path):
    if not model_path.endswith(".pt") and not model_path.endswith(".h5"):
        raise ModelLoadingError(f"Unsupported model format for {model_path}. Expected .pt or .h5")
    if not os.path.exists(model_path):
        raise ModelLoadingError(f"Model file not found at {model_path}")
    print(f"Loading model from {model_path}...")
    time.sleep(0.1)  # Simulate actual model loading
    print("Model loaded successfully!")
    return {"model_name": "MyCoolModel", "version": "1.0"}

# Testing error handling
process_image("non_existent_image.jpg")

# Create a dummy empty file
with open("empty_image.jpg", "w") as f:
    pass
process_image("empty_image.jpg")
os.remove("empty_image.jpg")

process_image("valid_image.png")  # Assume this file exists, or create a dummy one

try:
    load_ai_model("invalid_model.txt")
except ModelLoadingError as e:
    print(f"Caught model loading error: {e}")

# Create a dummy model file
with open("model.pt", "w") as f:
    f.write("dummy model content")
try:
    model = load_ai_model("model.pt")
    print(f"Loaded model: {model}")
except ModelLoadingError as e:
    print(f"Caught model loading error: {e}")
os.remove("model.pt")
```
- Strategic Logging: Debugging Model Training and Inference (Coding Lab 2.2)
- Concept: Using the `logging` module effectively.
- Coding Lab 2.2: Implementing Detailed Logging in a Training Loop
- Simulate a training loop and add log messages for: epoch start/end, loss, metric updates, warnings for data anomalies, errors for critical failures.
- Configure logging to file and console.
```python
import logging
import random
import sys

# Configure logger
logging.basicConfig(
    level=logging.INFO,  # Default level
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("ai_training.log"),  # Log to file
        logging.StreamHandler(sys.stdout)        # Log to console
    ]
)

# Get a logger for our training module
logger = logging.getLogger(__name__)

def train_model_with_logging(epochs: int, batch_size: int):
    logger.info("Starting model training...")
    total_samples = 1000
    num_batches = total_samples // batch_size

    for epoch in range(1, epochs + 1):
        logger.info(f"--- Epoch {epoch}/{epochs} ---")
        epoch_loss = 0.0
        for batch_idx in range(num_batches):
            # Simulate processing a batch
            current_loss = random.uniform(0.1, 0.5)
            epoch_loss += current_loss

            if batch_idx % (num_batches // 5) == 0:  # Log progress roughly every 20% of batches
                logger.debug(f"  Batch {batch_idx}/{num_batches}: Current Loss = {current_loss:.4f}")

            # Simulate a potential data anomaly
            if random.random() < 0.01:
                logger.warning(f"  Epoch {epoch}, Batch {batch_idx}: Detected potential data anomaly.")

        avg_epoch_loss = epoch_loss / num_batches
        logger.info(f"Epoch {epoch} finished. Average Loss: {avg_epoch_loss:.4f}")

        # Simulate a critical error
        if avg_epoch_loss > 0.45:
            logger.error(f"Epoch {epoch}: High loss detected. Model divergence likely!")
            # In a real scenario, you might stop training here
            break

    logger.info("Model training finished.")

# Run the training with logging
train_model_with_logging(epochs=5, batch_size=32)
```
- Testing AI Components: Ensuring Reliability
- Unit Testing Data Preprocessing and Custom Layers (`pytest` Lab 2.3)
- Concept: Writing unit tests for deterministic AI components.
- Lab 2.3: Testing a Preprocessing Function and a Simple Layer
- Write tests for a `tokenize` function, asserting output and edge cases.
- Write tests for a custom `relu` activation (e.g., `test_relu_positive`, `test_relu_negative`, `test_relu_zero`).
```python
# test_ai_utils.py
from typing import List

import pytest
import numpy as np

# Function to be tested
def simple_tokenizer(text: str) -> List[str]:
    """Splits text by whitespace and converts to lowercase."""
    return text.lower().split()

# A simple custom activation function (NumPy-based)
def custom_relu(x: np.ndarray) -> np.ndarray:
    return np.maximum(0, x)

# --- Tests for simple_tokenizer ---
def test_simple_tokenizer_basic():
    assert simple_tokenizer("Hello World") == ["hello", "world"]

def test_simple_tokenizer_empty_string():
    # str.split() with no argument returns an empty list for an empty string
    assert simple_tokenizer("") == []

def test_simple_tokenizer_with_punctuation():
    assert simple_tokenizer("Hello, World!") == ["hello,", "world!"]

def test_simple_tokenizer_multiple_spaces():
    # str.split() with no argument collapses runs of whitespace
    assert simple_tokenizer("  Hello   World  ") == ["hello", "world"]

# --- Tests for custom_relu ---
def test_custom_relu_positive_values():
    input_array = np.array([1.0, 5.0, 10.0])
    expected_output = np.array([1.0, 5.0, 10.0])
    np.testing.assert_array_equal(custom_relu(input_array), expected_output)

def test_custom_relu_negative_values():
    input_array = np.array([-1.0, -5.0, -10.0])
    expected_output = np.array([0.0, 0.0, 0.0])
    np.testing.assert_array_equal(custom_relu(input_array), expected_output)

def test_custom_relu_mixed_values():
    input_array = np.array([-2.0, 0.0, 3.0])
    expected_output = np.array([0.0, 0.0, 3.0])
    np.testing.assert_array_equal(custom_relu(input_array), expected_output)

def test_custom_relu_zero_value():
    input_array = np.array([0.0])
    expected_output = np.array([0.0])
    np.testing.assert_array_equal(custom_relu(input_array), expected_output)
```
Instructions: `pip install pytest numpy`. Save the above as `test_ai_utils.py` and run `pytest`.
- Mocking External Dependencies: Simulating API Calls
- Scenario: Testing a function that makes an external API call (e.g., to an LLM provider) without actually hitting the network.
- Code Example 2.4: Mocking an LLM API Call
```python
from unittest.mock import patch, MagicMock
import requests

# Function that uses an external API
def get_llm_response(prompt: str) -> str:
    """Makes a call to a hypothetical LLM API and returns the response."""
    api_endpoint = "https://api.hypothetical-llm.com/generate"
    payload = {"text": prompt, "max_tokens": 50}
    try:
        response = requests.post(api_endpoint, json=payload, timeout=5)
        response.raise_for_status()  # Raise an exception for HTTP errors
        return response.json().get("generated_text", "No text generated.")
    except requests.exceptions.RequestException as e:
        print(f"API call failed: {e}")
        return "Error: Could not get response from LLM."

# Test using unittest.mock.patch
@patch('requests.post')  # Patch the requests.post function
def test_get_llm_response_success(mock_post):
    # Configure the mock object's return value
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = {"generated_text": "Mocked LLM response."}
    mock_response.raise_for_status.return_value = None  # No HTTP errors
    mock_post.return_value = mock_response

    prompt = "What is the capital of France?"
    result = get_llm_response(prompt)

    # Assert that requests.post was called correctly
    mock_post.assert_called_once_with(
        "https://api.hypothetical-llm.com/generate",
        json={"text": prompt, "max_tokens": 50},
        timeout=5
    )
    assert result == "Mocked LLM response."

@patch('requests.post')
def test_get_llm_response_api_error(mock_post):
    mock_post.side_effect = requests.exceptions.RequestException("Simulated network error")

    prompt = "Tell me a joke."
    result = get_llm_response(prompt)

    assert "Error: Could not get response from LLM." in result
    mock_post.assert_called_once()

# Run the tests
print("Running mocking tests:")
test_get_llm_response_success()
test_get_llm_response_api_error()
print("Mocking tests completed.")
```
4. Performance Optimization: Supercharging Your AI (Speed Hack Lab)
This section is all about making code faster with concrete examples and measurement.
- Profiling AI Code: Finding the Bottlenecks
- Concept: Introduce `cProfile` and `line_profiler`.
- Coding Lab 3.1: Profiling a Mini-ML Pipeline
- Create a simple pipeline: data loading (list comprehension), feature engineering (loops), simple model calculation (more loops).
- Run `cProfile` and `line_profiler` to identify which parts are slowest.
- Challenge: Based on the profiles, identify optimization targets.
```python
import cProfile
import pstats
import io
import time
import random

# To use line_profiler:
#   pip install line_profiler
#   1. Uncomment the '@profile' decorators below.
#   2. Run: kernprof -l profiling_example.py
#   3. View: python -m line_profiler profiling_example.py.lprof

# --- A simple, inefficient ML-like pipeline for profiling ---

# @profile
def load_data(num_samples):
    # Simulate loading and basic string processing
    data = []
    for i in range(num_samples):
        # Simulate a complex string operation
        long_string = " ".join([random.choice("abcdefg") for _ in range(50)])
        data.append(f"sample_{i}_{long_string}")
    return data

# @profile
def featurize_data(raw_data):
    features = []
    for item in raw_data:
        # Simulate simple feature extraction: string length, count of 'a' and 'e'
        feature_vector = [len(item), item.count('a'), item.count('e')]
        features.append(feature_vector)
    return features

# @profile
def train_simple_model(features):
    # Simulate a very basic "training" - just summing features
    total_sum = 0
    for feature_vec in features:
        for val in feature_vec:
            total_sum += val
    return total_sum

def run_pipeline(num_samples=10000):
    print(f"Running pipeline with {num_samples} samples...")
    start_time = time.time()
    raw_data = load_data(num_samples)
    features = featurize_data(raw_data)
    model_output = train_simple_model(features)
    end_time = time.time()
    print(f"Pipeline finished in {end_time - start_time:.4f}s. Output: {model_output}")

print("--- Running with cProfile ---")
pr = cProfile.Profile()
pr.enable()
run_pipeline(num_samples=5000)  # Use fewer samples for cProfile due to verbose output
pr.disable()

s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10)  # Print top 10 functions
print(s.getvalue())

print("\n--- To run with line_profiler: ---")
print("1. Uncomment '@profile' above each function you want to profile.")
print("2. Save this script as, e.g., `profiling_example.py`")
print("3. Run in your terminal: `kernprof -l profiling_example.py`")
print("4. Then view results: `python -m line_profiler profiling_example.py.lprof`")
```
- Memory Profiling: Taming RAM Hogs in Deep Learning (Coding Lab 3.2)
- Concept: Introduce `memory_profiler`.
- Coding Lab 3.2: Identifying Memory-Intensive Operations
- Create a function that builds a large list of strings or a large NumPy array and then performs an operation.
- Use `@profile` from `memory_profiler` to see memory usage line by line.
- Challenge: Refactor the function to use a generator or process in chunks to reduce memory.
```python
# To run memory_profiler:
#   pip install memory_profiler
#   Uncomment the '@profile' decorators, then: python -m memory_profiler your_script.py
from memory_profiler import profile
import numpy as np
import random
import sys

# @profile
def create_and_process_large_list(num_elements=10**6):
    print(f"\n--- Creating large list of {num_elements} strings ---")
    large_list = []
    for i in range(num_elements):
        large_list.append(f"item_{i}_" + "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=10)))
    # Simulate some processing that might create intermediate copies
    processed_list = [s.upper() for s in large_list]
    print(f"Size of large_list: {sys.getsizeof(large_list) / (1024**2):.2f} MB")
    print(f"Size of processed_list: {sys.getsizeof(processed_list) / (1024**2):.2f} MB")
    return len(processed_list)

# @profile
def create_and_process_large_numpy_array(shape=(5000, 5000)):
    print(f"\n--- Creating large NumPy array of shape {shape} ---")
    large_array = np.random.rand(*shape)  # Array of floats
    # Simulate an element-wise operation that creates a new array
    squared_array = large_array ** 2
    # Another operation
    mean_array = np.mean(squared_array, axis=0)
    print(f"Size of large_array: {large_array.nbytes / (1024**2):.2f} MB")
    print(f"Size of squared_array: {squared_array.nbytes / (1024**2):.2f} MB")
    print(f"Size of mean_array: {mean_array.nbytes / (1024**2):.2f} MB")
    return mean_array[0]  # Return a small value to avoid returning a large object

if __name__ == '__main__':
    # To run: python -m memory_profiler this_script_name.py (with '@profile' uncommented)
    print("--- Run this script with: python -m memory_profiler your_script.py ---")
    create_and_process_large_list(num_elements=5 * 10**5)      # Adjusted for a reasonable demo
    create_and_process_large_numpy_array(shape=(2000, 2000))   # Adjusted for a reasonable demo
```
- Vectorization with NumPy: The Ultimate Speed Boost
- Concept: Replacing Python loops with fast, C-optimized NumPy operations.
- Coding Lab 3.3: NumPy Vectorization Challenge
- Challenge 1: Implement a custom `sigmoid` function using a Python loop and then using NumPy vectorization. Benchmark both.
- Challenge 2: Calculate row-wise means of a 2D array using a loop vs. `np.mean(axis=1)`.
- Challenge 3: Apply a threshold to an array (loop vs. boolean indexing).
```python
import numpy as np
import time

# --- Challenge 1: Sigmoid function ---
def sigmoid_loop(x):
    return [1 / (1 + np.exp(-val)) for val in x]

def sigmoid_numpy(x: np.ndarray) -> np.ndarray:
    return 1 / (1 + np.exp(-x))

data = np.random.rand(10**6) * 10 - 5  # 1 million numbers between -5 and 5

start = time.time()
result_loop = sigmoid_loop(data)
end = time.time()
print(f"Sigmoid (loop) for {len(data)} elements: {end - start:.4f}s")

start = time.time()
result_numpy = sigmoid_numpy(data)
end = time.time()
print(f"Sigmoid (NumPy) for {len(data)} elements: {end - start:.4f}s")

np.testing.assert_allclose(result_loop, result_numpy, rtol=1e-5)  # Check correctness

# --- Challenge 2: Row-wise mean ---
matrix = np.random.rand(1000, 500)  # 1000 rows, 500 columns

def mean_rows_loop(matrix_2d):
    means = []
    for row in matrix_2d:
        means.append(sum(row) / len(row))
    return means

start = time.time()
means_loop = mean_rows_loop(matrix)
end = time.time()
print(f"\nMean rows (loop) for {matrix.shape}: {end - start:.4f}s")

start = time.time()
means_numpy = np.mean(matrix, axis=1)
end = time.time()
print(f"Mean rows (NumPy) for {matrix.shape}: {end - start:.4f}s")

np.testing.assert_allclose(means_loop, means_numpy, rtol=1e-5)

# --- Challenge 3: Thresholding ---
large_array = np.random.rand(10**7) * 10  # 10 million numbers

def threshold_loop(arr, threshold_val):
    output = []
    for x in arr:
        output.append(1 if x > threshold_val else 0)
    return output

def threshold_numpy(arr: np.ndarray, threshold_val: float) -> np.ndarray:
    return (arr > threshold_val).astype(int)

threshold = 5.0

start = time.time()
thresh_loop_res = threshold_loop(large_array, threshold)
end = time.time()
print(f"\nThresholding (loop) for {len(large_array)} elements: {end - start:.4f}s")

start = time.time()
thresh_numpy_res = threshold_numpy(large_array, threshold)
end = time.time()
print(f"Thresholding (NumPy) for {len(large_array)} elements: {end - start:.4f}s")

np.testing.assert_array_equal(thresh_loop_res, thresh_numpy_res)
```
- Broadcasting Magic: Efficient Tensor Math
- Concept: NumPy’s ability to perform operations on arrays of different shapes.
- Code Example 3.4: Applying Bias to a Batch of Activations
```python
import numpy as np

# Simulate a batch of 4 activation vectors, each with 3 features
activations = np.array([
    [0.1, 0.2, 0.3],
    [0.4, 0.5, 0.6],
    [0.7, 0.8, 0.9],
    [1.0, 1.1, 1.2]
])
print(f"Activations shape: {activations.shape}")

# A bias vector for 3 features
bias = np.array([0.01, 0.02, 0.03])
print(f"Bias shape: {bias.shape}")

# Apply bias using broadcasting
biased_activations = activations + bias
print(f"\nBiased Activations (Broadcasting):\n{biased_activations}")

# Exercise: Normalize each row by subtracting its mean and dividing by its standard deviation
# (conceptually similar to batch normalization)
row_means = np.mean(activations, axis=1, keepdims=True)
row_stds = np.std(activations, axis=1, keepdims=True)

# Adding a small epsilon to avoid division by zero
epsilon = 1e-8
normalized_activations = (activations - row_means) / (row_stds + epsilon)
print(f"\nNormalized Activations (Broadcasting for normalization):\n{normalized_activations}")
```
- Accelerating with Numba and Cython (Advanced Lab)
- Concept: Introduce JIT compilation (Numba) and C extensions (Cython).
- Coding Lab 3.4: Numba: JIT Compiling Custom AI Functions
- Take a slow Python function (e.g., a custom loss or a complex data transformation involving loops).
- Decorate with `@numba.jit` and benchmark the performance improvement.
- Experiment with `nogil=True`.
```python
# To run Numba: pip install numba
import numba
import numpy as np
import time

# A custom, pure Python function that is slow due to loops
def custom_loss_pure_python(predictions: np.ndarray, targets: np.ndarray) -> float:
    loss = 0.0
    for i in range(len(predictions)):
        diff = predictions[i] - targets[i]
        loss += diff * diff  # Squared error
    return loss / len(predictions)

# The same function, Numba-jitted
@numba.jit(nopython=True)  # nopython=True ensures no Python objects are used inside
def custom_loss_numba(predictions: np.ndarray, targets: np.ndarray) -> float:
    loss = 0.0
    for i in range(len(predictions)):
        diff = predictions[i] - targets[i]
        loss += diff * diff
    return loss / len(predictions)

# Data for benchmarking
preds = np.random.rand(10**6)
targs = np.random.rand(10**6)

start = time.time()
loss_py = custom_loss_pure_python(preds, targs)
end = time.time()
print(f"Pure Python loss calculation: {end - start:.6f}s, Loss: {loss_py:.4f}")

# Numba's first call compiles, subsequent calls are fast
start = time.time()
loss_nb = custom_loss_numba(preds, targs)  # First call is slow due to compilation
end = time.time()
print(f"Numba (first call) loss calculation: {end - start:.6f}s, Loss: {loss_nb:.4f}")

start = time.time()
loss_nb = custom_loss_numba(preds, targs)  # Second call should be much faster
end = time.time()
print(f"Numba (second call) loss calculation: {end - start:.6f}s, Loss: {loss_nb:.4f}")

np.testing.assert_allclose(loss_py, loss_nb, rtol=1e-5)

# Mini-Project Idea: Cythonizing a Simple Neural Network Layer
# Guide the reader through creating a .pyx file for a simple
# feed-forward layer's dot product and activation.
# This is a more involved mini-project requiring separate files and compilation.
# (Detailed steps will be provided in the actual book.)
```
- Introduction to Dask: Scaling Beyond Memory (Conceptual + Demo)
- Concept: Parallel computing for larger-than-memory datasets.
- Code Demo (illustrative, not a full lab): Show `dask.array` for large array operations.
```python
# To run Dask: pip install dask numpy
import dask.array as da
import numpy as np
import time

# Create a large NumPy array (fits in memory for this size)
numpy_array = np.random.rand(10000, 10000)
print(f"NumPy array size: {numpy_array.nbytes / (1024**2):.2f} MB")

start = time.time()
result_np = numpy_array @ numpy_array.T  # Matrix multiplication
end = time.time()
print(f"NumPy matrix multiplication: {end - start:.4f}s")

# Create an equivalent Dask array (lazy computation)
# chunks='auto' lets Dask decide optimal chunk sizes
dask_array = da.from_array(numpy_array, chunks='auto')
print(f"Dask array: {dask_array}")

# Dask operations are lazy - they build a computation graph
dask_result = dask_array @ dask_array.T
print(f"Dask computation graph (lazy):\n{dask_result}")

# To actually compute, call .compute()
start = time.time()
result_dask_computed = dask_result.compute()
end = time.time()
print(f"Dask matrix multiplication (computed): {end - start:.4f}s")

# Verify results are close
np.testing.assert_allclose(result_np, result_dask_computed, rtol=1e-5)

print("\n--- Dask shines when data doesn't fit in memory or when parallelizing across cores/clusters ---")
print("It lets you express larger-than-memory arrays and then computes them in chunks.")
print("This demo used a small array that fits in memory to show the concept.")
```
5. Concurrency & Parallelism: Scaling AI Workloads (Concurrency Gym)
This section focuses heavily on asyncio for modern LLM-based AI systems, with multiprocessing for CPU-bound tasks.
- Understanding the GIL and its Impact on AI
- Explanation: Reiterate GIL, show a simple CPU-bound multi-threaded example that doesn’t speed up.
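A minimal sketch of the kind of demo this bullet calls for: pure-Python CPU-bound work run sequentially and then on threads, where the GIL keeps the threaded version from getting faster (exact timings will vary by machine):
```python
import threading
import time

# CPU-bound work: summing squares in pure Python (holds the GIL the whole time)
def cpu_task(n: int = 5_000_000) -> int:
    return sum(i * i for i in range(n))

def run_sequential(num_tasks: int = 4) -> float:
    start = time.time()
    for _ in range(num_tasks):
        cpu_task()
    return time.time() - start

def run_threaded(num_tasks: int = 4) -> float:
    start = time.time()
    threads = [threading.Thread(target=cpu_task) for _ in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

if __name__ == "__main__":
    # On CPython with the GIL, the threaded version is typically no faster
    # (and often slightly slower) for pure-Python CPU-bound work.
    print(f"Sequential: {run_sequential():.2f}s")
    print(f"Threaded:   {run_threaded():.2f}s")
```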
- Multithreading for I/O-Bound Tasks: Web Scraping for Data (Coding Lab 4.1)
- Concept: How threads can help for I/O.
- Coding Lab 4.1: Concurrent Web Scraping with Threads
- Scrape text from multiple URLs concurrently using `threading` and `requests`.
- Compare with sequential scraping.
```python
import requests
import threading
import time

urls = [
    "http://quotes.toscrape.com/page/1/",
    "http://quotes.toscrape.com/page/2/",
    "http://quotes.toscrape.com/page/3/",
    "http://quotes.toscrape.com/page/4/",
    "http://quotes.toscrape.com/page/5/",
    "http://quotes.toscrape.com/page/6/",
    "http://quotes.toscrape.com/page/7/",
    "http://quotes.toscrape.com/page/8/",
    "http://quotes.toscrape.com/page/9/",
    "http://quotes.toscrape.com/page/10/"
] * 2  # Duplicate to make the workload longer

def fetch_url(url, results, index):
    try:
        response = requests.get(url, timeout=5)
        results[index] = f"Fetched {len(response.text)} bytes from {url}"
    except requests.exceptions.RequestException as e:
        results[index] = f"Error fetching {url}: {e}"

def run_sequential():
    print("--- Running Sequential Fetch ---")
    start_time = time.time()
    results = [None] * len(urls)
    for i, url in enumerate(urls):
        fetch_url(url, results, i)
    end_time = time.time()
    print(f"Sequential fetch took {end_time - start_time:.2f} seconds.")
    # for r in results[:3]: print(r)  # Print first 3 results
    return end_time - start_time

def run_threaded():
    print("--- Running Threaded Fetch ---")
    start_time = time.time()
    results = [None] * len(urls)
    threads = []
    for i, url in enumerate(urls):
        thread = threading.Thread(target=fetch_url, args=(url, results, i))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()  # Wait for all threads to complete
    end_time = time.time()
    print(f"Threaded fetch took {end_time - start_time:.2f} seconds.")
    return end_time - start_time

seq_time = run_sequential()
thread_time = run_threaded()
print(f"\nThreaded was {seq_time / thread_time:.2f}x faster for this I/O-bound task.")
```
- Multiprocessing: True Parallelism for CPU-Bound AI
- Concept: Bypassing GIL with separate processes.
- Coding Lab 4.2: Parallel Hyperparameter Tuning with `multiprocessing.Pool`
- Scenario: Training multiple models with different hyperparameters.
- Create a CPU-bound `train_model` function (e.g., matrix multiplication).
- Use `multiprocessing.Pool` to run multiple training jobs in parallel.
- Compare with sequential execution.
```python
import multiprocessing
import time
import random
import numpy as np

def train_single_model(hyperparams):
    """Simulates a CPU-bound model training process."""
    model_id = hyperparams['model_id']
    epochs = hyperparams['epochs']
    learning_rate = hyperparams['learning_rate']

    # Simulate CPU-intensive work (e.g., matrix multiplication in a simple NN)
    data_size = 500
    input_data = np.random.rand(data_size, data_size)
    weights = np.random.rand(data_size, data_size)

    print(f"Model {model_id} (LR: {learning_rate:.4f}) starting training for {epochs} epochs...")
    for epoch in range(epochs):
        # Perform a matrix multiplication as a CPU-intensive operation
        _ = input_data @ weights  # No sleep here, we want CPU work

    final_metric = random.uniform(0.7, 0.95)  # Simulate accuracy
    print(f"Model {model_id} finished. Metric: {final_metric:.4f}")
    return {"model_id": model_id, "metric": final_metric, "hyperparams": hyperparams}

def run_sequential_tuning(hyperparam_configs):
    print("\n--- Running Sequential Hyperparameter Tuning ---")
    start_time = time.time()
    results = []
    for config in hyperparam_configs:
        results.append(train_single_model(config))
    end_time = time.time()
    print(f"Sequential tuning took {end_time - start_time:.2f} seconds.")
    return results, end_time - start_time

def run_parallel_tuning(hyperparam_configs):
    print("\n--- Running Parallel Hyperparameter Tuning (Multiprocessing) ---")
    start_time = time.time()
    # Use a Pool to distribute tasks across available CPU cores
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(train_single_model, hyperparam_configs)
    end_time = time.time()
    print(f"Parallel tuning took {end_time - start_time:.2f} seconds.")
    return results, end_time - start_time

if __name__ == '__main__':  # Required for multiprocessing on some OSes
    num_models = 4  # Number of models to train
    hyperparameter_configs = [
        {"model_id": i, "epochs": 50, "learning_rate": random.uniform(0.001, 0.01)}
        for i in range(num_models)
    ]

    seq_results, seq_time = run_sequential_tuning(hyperparameter_configs)
    par_results, par_time = run_parallel_tuning(hyperparameter_configs)

    print("\n--- Comparison ---")
    print(f"Sequential best metric: {max(r['metric'] for r in seq_results):.4f}")
    print(f"Parallel best metric: {max(r['metric'] for r in par_results):.4f}")
    print(f"Parallel execution was {seq_time / par_time:.2f}x faster.")
```
- Asynchronous Python with `asyncio`: Powering LLM Interactions
- Concept: `async`/`await` and the event loop for non-blocking I/O.
- Coding Lab 4.3: Concurrent LLM API Calls with `httpx` (Mini-Project)
- Scenario: Making multiple simultaneous calls to an LLM API (e.g., for different prompts, or for parallel agent actions).
- Use `asyncio` and `httpx` (async HTTP client) to demonstrate speedup over sequential API calls.
```python
# To run httpx: pip install httpx
import asyncio
import httpx
import time
import json  # For parsing the mock response
from typing import List

# --- Mock LLM API Endpoint ---
# In a real scenario, this would be an actual external API call
async def mock_llm_api_call(prompt: str, delay: float = 0.5) -> str:
    """Simulates a call to an LLM API with a given delay."""
    await asyncio.sleep(delay)  # Simulate network latency and processing time
    response_text = f"LLM responded to '{prompt[:30]}...' with a creative answer."
    return json.dumps({"generated_text": response_text})

async def fetch_llm_response(session: httpx.AsyncClient, prompt: str) -> str:
    # For demonstration, we'll use our mock function.
    # In a real app, this would be:
    # response = await session.post(
    #     "https://api.your-llm-provider.com/v1/chat/completions",
    #     json={"messages": [{"role": "user", "content": prompt}]}
    # )
    # response.raise_for_status()
    # return response.json()["choices"][0]["message"]["content"]
    return await mock_llm_api_call(prompt)

async def main_sequential(prompts: List[str]):
    print("\n--- Running Sequential LLM Calls ---")
    start_time = time.time()
    async with httpx.AsyncClient() as client:  # httpx client is needed for real API calls
        for prompt in prompts:
            response = await fetch_llm_response(client, prompt)
            print(f"  Sequential: {prompt[:20]}... -> {json.loads(response)['generated_text'][:30]}...")
    end_time = time.time()
    print(f"Sequential calls took {end_time - start_time:.2f} seconds.")
    return end_time - start_time

async def main_concurrent(prompts: List[str]):
    print("\n--- Running Concurrent LLM Calls (asyncio) ---")
    start_time = time.time()
    async with httpx.AsyncClient() as client:
        tasks = [fetch_llm_response(client, prompt) for prompt in prompts]
        responses = await asyncio.gather(*tasks)  # Run all tasks concurrently
    for i, (prompt, response) in enumerate(zip(prompts, responses)):
        print(f"  Concurrent {i+1}: {prompt[:20]}... -> {json.loads(response)['generated_text'][:30]}...")
    end_time = time.time()
    print(f"Concurrent calls took {end_time - start_time:.2f} seconds.")
    return end_time - start_time

if __name__ == "__main__":
    llm_prompts = [
        "Summarize the plot of Inception.",
        "Write a short poem about a cat.",
        "Explain quantum entanglement simply.",
        "Generate a list of 5 healthy snacks.",
        "Translate 'hello world' to French.",
        "What is the capital of Japan?",
        "Provide a recipe for chocolate chip cookies.",
        "Describe the benefits of meditation.",
        "Recommend a science fiction book.",
        "Tell me a fun fact about pandas."
    ]

    # asyncio.run() is the entry point for async code
    seq_duration = asyncio.run(main_sequential(llm_prompts))
    conc_duration = asyncio.run(main_concurrent(llm_prompts))

    print("\n--- Performance Comparison ---")
    print(f"Sequential Duration: {seq_duration:.2f}s")
    print(f"Concurrent Duration: {conc_duration:.2f}s")
    if conc_duration > 0:
        print(f"Concurrent was {seq_duration / conc_duration:.2f}x faster!")
```
- Building an Async Agent Tool Orchestrator (Coding Lab 4.4)
- Scenario: An AI agent needs to use multiple tools (e.g., search, calculator, database) and some calls can run in parallel.
- Create `async` functions for mock tools.
- Use `asyncio.gather` and control flow to build an agent's "thinking" process.
```python
import asyncio
import time

async def search_web(query: str, delay: float = 1.0) -> str:
    """Simulates a web search API call."""
    print(f"[TOOL] Searching web for: '{query}'...")
    await asyncio.sleep(delay)
    return f"Web search results for '{query}': Found 10 results, top result is about {query.split()[0]}."

async def use_calculator(expression: str, delay: float = 0.3) -> str:
    """Simulates a calculator tool."""
    print(f"[TOOL] Calculating: '{expression}'...")
    await asyncio.sleep(delay)
    try:
        result = eval(expression)  # DANGER: Don't use eval with untrusted input! For demo only.
        return f"Calculator result for '{expression}': {result}"
    except Exception as e:
        return f"Calculator error for '{expression}': {e}"

async def query_database(sql_query: str, delay: float = 0.7) -> str:
    """Simulates a database query tool."""
    print(f"[TOOL] Querying DB with: '{sql_query}'...")
    await asyncio.sleep(delay)
    return f"DB results for '{sql_query}': Retrieved 5 records (e.g., CustomerID=123, Name='Alice')."

async def agent_orchestrator(task: str):
    print(f"\n[AGENT] Task received: '{task}'")
    start_time = time.time()

    if "calculate" in task.lower() and "search" in task.lower():
        print("[AGENT] Identified need for both calculation and web search.")
        # Run web search and calculator concurrently
        web_task = asyncio.create_task(search_web("stock prices today"))
        calc_task = asyncio.create_task(use_calculator("15 * 2.5 + 7"))
        web_result, calc_result = await asyncio.gather(web_task, calc_task)
        print(f"[AGENT] Received web result: {web_result}")
        print(f"[AGENT] Received calc result: {calc_result}")
        final_answer = (f"Based on web search for stock prices and calculation, the answer is complex. "
                        f"Web: {web_result}. Calc: {calc_result}")
    elif "database" in task.lower():
        print("[AGENT] Identified need for database query.")
        db_result = await query_database("SELECT * FROM users WHERE status='active'")
        print(f"[AGENT] Received DB result: {db_result}")
        final_answer = f"Database info: {db_result}"
    else:
        print("[AGENT] Falling back to general web search.")
        web_result = await search_web(task)
        print(f"[AGENT] Received web result: {web_result}")
        final_answer = f"General info: {web_result}"

    end_time = time.time()
    print(f"[AGENT] Task completed in {end_time - start_time:.2f} seconds.")
    return final_answer

if __name__ == "__main__":
    # Example agent tasks
    tasks_to_run = [
        "I need to calculate 15 * 2.5 + 7 AND find today's stock prices.",
        "Find me all active users from the database.",
        "What is the average rainfall in the Amazon during July?"
    ]
    for t in tasks_to_run:
        asyncio.run(agent_orchestrator(t))
        print("-" * 50)
```
6. Architecting & Deploying Scalable AI Systems (Deployment Blueprint)
This section focuses on practical system design and deployment, including a mini-project for building an AI service.
- Modular Project Structure for Production AI
- Scenario: Structuring a real-world AI project, e.g., an LLM inference service.
- Mini-Project: Structuring a FastAPI + LLM Inference Service
- Outline a recommended directory structure: `my_llm_app/`, `my_llm_app/api/`, `my_llm_app/models/`, `my_llm_app/data/`, `my_llm_app/config/`, `tests/`, `scripts/`.
- Provide skeleton files for `api/main.py`, `models/llm_loader.py`, `config/settings.py`.
```
my_llm_app/
├── api/
│   └── main.py              # FastAPI application
├── models/
│   ├── __init__.py
│   └── llm_service.py       # Handles LLM loading and inference logic
├── config/
│   └── settings.py          # Configuration (e.g., API keys, model paths)
├── data/                    # Store sample data or embeddings
│   └── embeddings.pkl
├── tests/
│   ├── test_api.py
│   └── test_llm_service.py
├── Dockerfile               # For containerization
├── requirements.txt         # Project dependencies
└── README.md
```
- Dependency Management & Reproducibility
- Exercise 5.1: `pyproject.toml` with Poetry/Rye: Modern Python Packaging
- Guide the reader to create a new project with Poetry (`poetry new my_project`, `poetry add pandas numpy`).
- Explain `pyproject.toml` and its benefits over `requirements.txt`.
- Show how to manage dependencies and dev dependencies.
- (This will be a step-by-step guide to be done in the terminal.)
- Containerization with Docker for AI Deployments
- Concept: Portable and reproducible environments.
- Coding Lab 5.1: Building a Docker Image for an LLM Inference Endpoint
- Scenario: Containerizing the FastAPI LLM service from the mini-project.
- Write a `Dockerfile` for a Python application running FastAPI.
- Include steps for installing dependencies, copying code, and running Uvicorn.
- Challenge: Optimize the Dockerfile (multi-stage build for smaller images, specific base image for ML).
```dockerfile
# Dockerfile (my_llm_app/Dockerfile)

# Stage 1: Build stage (install dependencies)
FROM python:3.10-slim-bullseye AS builder

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

WORKDIR /app

# Install poetry
RUN pip install poetry

# Copy poetry files
COPY pyproject.toml poetry.lock ./

# Install dependencies into the system site-packages so the runtime stage can copy them
RUN poetry config virtualenvs.create false && poetry install --no-root --no-dev

# Stage 2: Runtime stage
FROM python:3.10-slim-bullseye AS runtime

WORKDIR /app

# Copy installed dependencies from the builder stage
COPY --from=builder /usr/local/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages
COPY --from=builder /usr/local/bin/uvicorn /usr/local/bin/uvicorn

# Copy the application code
COPY my_llm_app ./my_llm_app/

# Expose the port FastAPI will run on
EXPOSE 8000

# Command to run the application using uvicorn
# Assuming your FastAPI app is at my_llm_app/api/main.py and the app instance is named 'app'
CMD ["uvicorn", "my_llm_app.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
```
Instructions: `docker build -t my-llm-app .`, then `docker run -p 8000:8000 my-llm-app`.
- Serving AI Models with FastAPI
- Concept: Building fast, asynchronous API endpoints for AI.
- Coding Lab 5.2: Building a REST API for Image Classification (or simplified LLM)
- Create a simple FastAPI app.
- Define a `/predict` endpoint that takes input data (e.g., image path, text prompt).
- Load a dummy/small pre-trained model (e.g., a sklearn model, or a tiny custom model).
- Perform inference and return predictions.
- Add `async def` for potential I/O-bound operations.
```python
# my_llm_app/models/llm_service.py
import asyncio

class LLMService:
    _instance = None
    _lock = asyncio.Lock()

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def init_model(self):
        """Simulate asynchronous model loading."""
        async with self._lock:  # Ensure only one task loads the model
            if not hasattr(self, '_model'):
                print("LLMService: Loading large language model...")
                await asyncio.sleep(2)  # Simulate a long loading time
                self._model = "Dummy LLM Model v1.0"
                print("LLMService: Model loaded.")
        return self._model

    async def generate_response(self, prompt: str) -> str:
        """Simulate asynchronous LLM inference."""
        if not hasattr(self, '_model'):
            await self.init_model()  # Ensure model is loaded before inference
        print(f"LLMService: Generating response for '{prompt[:20]}...'")
        await asyncio.sleep(0.5)  # Simulate inference time
        return f"Response to '{prompt}': This is a generated answer from {self._model}."
```

```python
# my_llm_app/api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from my_llm_app.models.llm_service import LLMService

app = FastAPI(title="LLM Inference API")

# Initialize LLMService (singleton pattern managed internally by LLMService)
llm_service = LLMService()

# Pydantic model for request body
class PromptRequest(BaseModel):
    prompt: str

# Pydantic model for response body
class LLMResponse(BaseModel):
    generated_text: str

@app.on_event("startup")
async def startup_event():
    # Load the model asynchronously at startup
    print("FastAPI Startup: Pre-loading LLM...")
    await llm_service.init_model()
    print("FastAPI Startup: LLM pre-loading complete.")

@app.get("/")
async def read_root():
    return {"message": "Welcome to the LLM Inference API!"}

@app.post("/generate/", response_model=LLMResponse)
async def generate_text(request: PromptRequest):
    """Generates text using the loaded LLM."""
    try:
        response_text = await llm_service.generate_response(request.prompt)
        return LLMResponse(generated_text=response_text)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}")
```

Instructions: Save the files in the described structure. Install `fastapi`, `uvicorn`, and `pydantic`. Run `uvicorn my_llm_app.api.main:app --reload`. Test with `curl` or a tool like Postman/Insomnia.
- Adding Asynchronous Endpoints for LLMs: Discuss how `async def` in FastAPI pairs with `asyncio` in `llm_service` for non-blocking I/O, vital when LLM calls are I/O-bound.
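- To make the non-blocking behavior tangible, a small concurrent client can fire several prompts at once. This is a sketch only: it assumes the service above is running on `localhost:8000` and that `httpx` is installed (`pip install httpx`).

```python
# concurrent_client.py -- fire several /generate/ requests concurrently
import asyncio
import time

import httpx

async def ask(client: httpx.AsyncClient, prompt: str) -> str:
    resp = await client.post("http://localhost:8000/generate/", json={"prompt": prompt})
    resp.raise_for_status()
    return resp.json()["generated_text"]

async def main():
    prompts = [f"Question number {i}" for i in range(5)]
    start = time.time()
    async with httpx.AsyncClient(timeout=30.0) as client:
        # All five requests are in flight at once; because the endpoint and the
        # simulated inference are async, total wall time stays close to a single request.
        answers = await asyncio.gather(*(ask(client, p) for p in prompts))
    for answer in answers:
        print(answer)
    print(f"5 concurrent requests finished in {time.time() - start:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())
```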
- Introduction to Microservices for AI (Conceptual + Example)
- Concept: Breaking down AI into smaller, independent services.
- Code Example (Conceptual/Outline): Discuss how a data preprocessing service, a model training service, and an inference service could communicate (e.g., via a message queue or REST). Provide a high-level diagram.
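- As a toy illustration of the pattern (not a production setup), the sketch below stands in for a message queue with `asyncio.Queue`: a preprocessing "service" publishes cleaned records and an inference "service" consumes them. A real deployment would swap the in-process queue for a broker (e.g., RabbitMQ or Kafka) or REST calls between containers.

```python
# Two "services" decoupled by a queue -- a stand-in for a real message broker.
import asyncio

async def preprocessing_service(queue: asyncio.Queue, raw_docs: list[str]) -> None:
    """Cleans raw documents and publishes them for downstream consumers."""
    for doc in raw_docs:
        cleaned = doc.strip().lower()
        await queue.put(cleaned)
        print(f"[preprocess] published: {cleaned!r}")
    await queue.put(None)  # Sentinel: no more work

async def inference_service(queue: asyncio.Queue) -> None:
    """Consumes cleaned documents and runs (dummy) inference on each."""
    while True:
        item = await queue.get()
        if item is None:
            break
        prediction = len(item) % 2  # Stand-in for a real model call
        print(f"[inference] {item!r} -> class {prediction}")

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    raw = ["  Hello World ", "Scalable AI Systems", "Message Queues 101"]
    await asyncio.gather(preprocessing_service(queue, raw), inference_service(queue))

if __name__ == "__main__":
    asyncio.run(main())
```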
- CI/CD for AI: Automating Deployment
- Conceptual + Guided Exercise: Outline CI/CD for a deployed model
- Discuss triggers (code commit, data update).
- Stages: Linting, Unit Tests, Integration Tests, Model Training (if applicable), Model Evaluation, Docker Image Build, Push to Registry, Deployment to Staging, A/B Testing, Production Release.
- Exercise: Outline a simple GitHub Actions workflow for the FastAPI LLM service.
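- For the Linting/Unit Tests/Integration Tests stages above, the workflow would typically just run `pytest`. A minimal `tests/test_api.py` for the FastAPI LLM service might look like this sketch; it assumes `pytest`, `fastapi`, and `httpx` are installed and uses FastAPI's `TestClient`.

```python
# tests/test_api.py -- the kind of tests a CI pipeline would run on every commit
# Assumes: pip install pytest fastapi httpx
from fastapi.testclient import TestClient

from my_llm_app.api.main import app

client = TestClient(app)

def test_root_returns_welcome_message():
    response = client.get("/")
    assert response.status_code == 200
    assert "Welcome" in response.json()["message"]

def test_generate_returns_text_for_a_prompt():
    # The service lazily loads its dummy model, so this also exercises model loading
    response = client.post("/generate/", json={"prompt": "ping"})
    assert response.status_code == 200
    assert "ping" in response.json()["generated_text"]
```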
7. Advanced Topics & Future-Proofing AI Skills (Deep Dive)
- GPU Memory Management in PyTorch/TensorFlow
- Concept: Explain how GPUs handle tensors and the importance of efficient memory usage.
- Guided Experiment:
- Using PyTorch/TensorFlow (if available) to create large tensors.
- Demonstrate `torch.cuda.empty_cache()` (PyTorch).
- Show mixed-precision training (conceptual code).
```python
# Illustrative PyTorch GPU memory management snippets
import torch

if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"CUDA is available. Device: {torch.cuda.get_device_name(0)}")

    # Create a large tensor (20000 x 20000 float32 is ~1.5 GB); increase the size
    # if you want to provoke an out-of-memory error and hit the except branch.
    try:
        large_tensor = torch.rand(20000, 20000, device=device)
        size_gb = large_tensor.element_size() * large_tensor.nelement() / (1024**3)
        print(f"Created a tensor of shape {tuple(large_tensor.shape)}. Memory: {size_gb:.2f} GB")

        # Perform an operation that creates a temporary copy
        temp_tensor = large_tensor * 2

        # Free memory
        del large_tensor
        del temp_tensor
        torch.cuda.empty_cache()  # Release cached, unused GPU memory back to the driver
        print("GPU memory cleared after operations.")
    except RuntimeError as e:
        print(f"Caught RuntimeError: {e}. Likely out-of-memory. Consider smaller tensors or mixed precision.")

    # --- Mixed precision (conceptual snippet) ---
    print("\n--- Mixed Precision Training Concept ---")
    # In actual training loops, autocast is combined with a gradient scaler;
    # here we only show a forward pass.
    model = torch.nn.Linear(100, 10).to(device)
    input_data = torch.randn(64, 100, device=device)
    with torch.cuda.amp.autocast():
        # Operations inside this context are cast to float16 where it is safe
        output = model(input_data)
        loss = output.mean()
    print("Performed operations using mixed precision (autocast context).")
else:
    print("CUDA not available. Cannot demonstrate GPU memory management.")
```
- Introduction to MLOps Tools for LLMs
- Concept: DVC (Data Version Control), MLflow (Experiment Tracking), Model Registries.
- Mini-Demo: Experiment Tracking with MLflow
- Integrate MLflow into a dummy training script to log parameters, metrics, and a simple model.
```python
# To run MLflow: pip install mlflow scikit-learn
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# Ensure MLflow tracks runs in a local directory
mlflow.set_tracking_uri("file:///tmp/mlruns")
mlflow.set_experiment("LLM_Experiment_Sim")

print("--- Running MLflow Demo ---")
with mlflow.start_run(run_name="Simple_Linear_Model_Run"):
    # Log parameters
    alpha = 0.5
    l1_ratio = 0.5
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    # Generate dummy data
    np.random.seed(42)
    X = np.random.rand(100, 5)
    y = X @ np.array([1, 2, 0.5, -1, 3]) + np.random.randn(100) * 0.1
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train a dummy model
    model = LinearRegression()
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    # Log metrics
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    mlflow.log_metric("rmse", rmse)
    print(f"Logged RMSE: {rmse:.4f}")

    # Log the model
    mlflow.sklearn.log_model(model, "linear_regression_model")
    print("Logged LinearRegression model.")

    # Tag the run
    mlflow.set_tag("model_type", "LinearRegression")

print("\nMLflow run completed. Inspect runs with `mlflow ui --backend-store-uri file:///tmp/mlruns`.")
print("Then navigate to http://localhost:5000 (or the address printed by `mlflow ui`).")
```
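- The Concept bullet also mentions model registries. The plain file store used above does not support the MLflow Model Registry, so the sketch below assumes a SQLite-backed tracking store; the registered model name `llm_sim_linear_regression` is illustrative.

```python
# Sketch: registering a logged model in the MLflow Model Registry.
# Assumes: pip install mlflow scikit-learn; the registry needs a database-backed store (SQLite here).
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.linear_model import LinearRegression

mlflow.set_tracking_uri("sqlite:///mlruns.db")
mlflow.set_experiment("LLM_Experiment_Sim")

with mlflow.start_run() as run:
    # Train and log a tiny model, as in the demo above
    X = np.random.rand(50, 3)
    y = X @ np.array([1.0, -2.0, 0.5])
    model = LinearRegression().fit(X, y)
    mlflow.sklearn.log_model(model, "linear_regression_model")
    run_id = run.info.run_id

# Promote the logged artifact to the registry under a (hypothetical) name
model_uri = f"runs:/{run_id}/linear_regression_model"
registered = mlflow.register_model(model_uri=model_uri, name="llm_sim_linear_regression")
print(f"Registered '{registered.name}' as version {registered.version}")
```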
- Emerging Distributed Frameworks: Ray and JAX (Conceptual + Code Snippets)
- Concept: Introduce how Ray scales Python code and JAX’s automatic differentiation and `jit`.
- Code Snippets: Illustrate basic Ray task or JAX `jit` use.

```python
# --- Ray: distributed task (conceptual snippet) ---
# To run Ray: pip install ray
import time

try:
    import ray

    ray.init(ignore_reinit_error=True)  # Initialize Ray once

    @ray.remote
    def process_data_chunk(chunk):
        # Simulate heavy processing
        time.sleep(0.1)
        return [x * 2 for x in chunk]

    # Create dummy data
    all_data = list(range(100))
    chunk_size = 10
    data_chunks = [all_data[i:i + chunk_size] for i in range(0, len(all_data), chunk_size)]

    # Submit tasks to Ray
    futures = [process_data_chunk.remote(chunk) for chunk in data_chunks]

    # Get results
    processed_results = ray.get(futures)
    print(f"\nRay: Processed {len(all_data)} items in distributed fashion.")
    # print(processed_results)

    ray.shutdown()
except ImportError:
    print("\nRay not installed. Install with `pip install ray` to run this snippet.")
except Exception as e:
    print(f"\nAn error occurred with Ray: {e}")

# --- JAX: JIT compilation (conceptual snippet) ---
# To run JAX: pip install jax jaxlib
try:
    import jax
    import jax.numpy as jnp

    def complex_computation(x, y):
        return jnp.tanh(jnp.dot(x, x.T) + jnp.dot(y, y.T))

    # JIT compile the function for speed
    jit_computation = jax.jit(complex_computation)

    key = jax.random.PRNGKey(0)
    x_data = jax.random.normal(key, (100, 100))
    y_data = jax.random.normal(key, (100, 100))

    # First run includes compilation time
    start = time.time()
    result_jit = jit_computation(x_data, y_data)
    _ = result_jit.block_until_ready()  # Wait for the async computation to finish
    end = time.time()
    print(f"\nJAX JIT (first run with compile): {end - start:.6f}s")

    # Subsequent runs are much faster
    start = time.time()
    result_jit = jit_computation(x_data, y_data)
    _ = result_jit.block_until_ready()
    end = time.time()
    print(f"JAX JIT (subsequent run): {end - start:.6f}s")
except ImportError:
    print("\nJAX not installed. Install with `pip install jax jaxlib` to run this snippet.")
except Exception as e:
    print(f"\nAn error occurred with JAX: {e}")
```
- Ethical AI: Practical Considerations
- Concept: Discuss bias, fairness, transparency, privacy.
- Practical Example: Show a snippet of how to check for basic data bias (e.g., gender distribution in a demographic dataset).
```python
import pandas as pd

# Dummy dataset (simulate demographic data)
data = {
    'age': [25, 30, 22, 40, 28, 35, 50, 60, 20, 21],
    'gender': ['Male', 'Female', 'Male', 'Female', 'Female', 'Male', 'Male', 'Female', 'Male', 'Female'],
    'prediction': [0, 1, 0, 1, 1, 0, 0, 1, 0, 1],  # Binary prediction
}
df = pd.DataFrame(data)

print("--- Basic Data Bias Check ---")
print("Gender distribution in dataset:")
print(df['gender'].value_counts(normalize=True))

print("\nPrediction outcome by gender:")
# This checks whether the prediction is skewed across genders
print(df.groupby('gender')['prediction'].value_counts(normalize=True).unstack(fill_value=0))

print("\nConsiderations:")
print("- Is the gender distribution in the dataset reflective of the real world?")
print("- Is the model's prediction outcome disproportionately affecting one group?")
print("- For 'prediction' (e.g., loan approval), is the 'positive' outcome (1) fair across genders?")
```
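- Building on the same dummy DataFrame, one simple quantitative check is demographic parity: compare the rate of positive predictions across groups. The 0.8 "four-fifths" threshold used below is a common screening heuristic, not a definitive fairness standard.

```python
# Continuing with `df` from the bias-check snippet above.
# Demographic parity compares the positive-prediction rate across groups.
positive_rate = df.groupby('gender')['prediction'].mean()
print("Positive prediction rate by gender:")
print(positive_rate)

parity_difference = positive_rate.max() - positive_rate.min()
parity_ratio = positive_rate.min() / positive_rate.max()
print(f"Demographic parity difference: {parity_difference:.2f}")
print(f"Demographic parity ratio: {parity_ratio:.2f} (the 'four-fifths' heuristic flags ratios below 0.8)")
```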
8. Conclusion
- Recap of Key Takeaways
- Continuing Your Learning Journey
- Resources for Further Exploration