Source code for src.pipeline.orchestrator

"""Main pipeline orchestrator for Share of Search analysis."""

import os
import time
from pathlib import Path
from typing import Any, Dict, Optional
import pandas as pd

from ..config.loader import load_config
from ..config.schema import Config
from ..data.providers.serpapi import create_provider
from ..processing.transformer import DataTransformer
from ..processing.calculator import MetricsCalculator
from ..analysis.ai_insights import AIInsightEngine
from ..visualization.charts import ChartGenerator
from ..reporting.powerpoint import PowerPointGenerator
from ..utils.logging import setup_logging, get_logger
from ..utils.errors import ShareOfSearchError
from ..utils.progress import print_header, print_summary, create_progress

logger = get_logger(__name__)


class ShareOfSearchPipeline:
    """Orchestrate the complete Share of Search analysis pipeline."""

    def __init__(self, config_path: Optional[Path] = None):
        """
        Initialize the pipeline.

        Args:
            config_path: Path to config.yaml
        """
        self.config_path = config_path
        self.config = None
        self.results = {}
        self.start_time = None

    def run(self) -> Dict[str, Any]:
        """
        Execute the complete pipeline.

        Returns:
            Dictionary containing all analysis results

        Raises:
            ShareOfSearchError: If the pipeline fails
        """
        self.start_time = time.time()

        try:
            # Stage 1: Load configuration
            self.config = self._load_configuration()

            # Print header
            print_header(
                self.config.project.name,
                [q.label for q in self.config.queries],
                self.config.parameters.geo,
                self.config.parameters.date_range
            )

            # Create progress bar
            with create_progress() as progress:
                # Stage 1: Configuration (already done)
                task1 = progress.add_task("[cyan]Configuration", total=100)
                progress.update(task1, completed=100, description="[green]✓[/green] Configuration")

                # Stage 2: Fetch data
                task2 = progress.add_task("[cyan]Data Collection", total=100)
                raw_data = self._fetch_data()
                progress.update(task2, completed=100, description="[green]✓[/green] Data Collection")

                # Stage 3: Transform data
                task3 = progress.add_task("[cyan]Data Processing", total=100)
                df = self._transform_data(raw_data)
                progress.update(task3, completed=100, description="[green]✓[/green] Data Processing")

                # Stage 4: Calculate metrics
                task4 = progress.add_task("[cyan]Metrics Calculation", total=100)
                df = self._calculate_metrics(df)
                progress.update(task4, completed=100, description="[green]✓[/green] Metrics Calculation")

                # Stage 5: Generate visualizations
                task5 = progress.add_task("[cyan]Visualization", total=100)
                charts = self._generate_visualizations(df)
                progress.update(task5, completed=100, description="[green]✓[/green] Visualization")

                # Stage 6: Generate insights
                task6 = progress.add_task("[cyan]AI Insights", total=100)
                insights = self._generate_insights(df)
                progress.update(task6, completed=100, description="[green]✓[/green] AI Insights")

                # Stage 7: Save results
                task7 = progress.add_task("[cyan]Saving Results", total=100)
                self._save_results(df, insights)
                progress.update(task7, completed=100, description="[green]✓[/green] Saving Results")

            # Calculate elapsed time
            elapsed = time.time() - self.start_time

            # Extract key findings
            metrics_summary = self._extract_key_metrics(df)

            # Print summary
            print_summary(
                self.config.project.name,
                str(self.config.project.output_dir),
                metrics_summary,
                len(charts),
                elapsed
            )

            return self.results

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            raise
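
    # For reference, the results dictionary returned by run() is populated
    # by the stage methods below with the following keys (descriptions are
    # inferred from how each value is produced and consumed in this module):
    #   'dataframe'            -> the full metrics DataFrame
    #   'period_metrics'       -> per-query summary DataFrame
    #   'market_concentration' -> market concentration metrics
    #   'charts'               -> list of generated chart file paths
    #   'insights'             -> dict of AI-generated text sections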

    def _load_configuration(self) -> Config:
        """Load and validate configuration."""
        # Setup logging first
        log_file = Path("outputs") / "pipeline.log"
        setup_logging(log_level="INFO", log_file=log_file)

        # Load config
        config = load_config(self.config_path)

        # Setup project-specific logging
        project_log = config.project.output_dir / "analysis.log"
        setup_logging(
            log_level=config.options.log_level,
            log_file=project_log
        )

        logger.info(f"Configuration loaded: {config.project.name}")
        logger.info(f"Analyzing {len(config.queries)} queries")

        return config

    def _fetch_data(self) -> Dict[str, Any]:
        """Fetch data from the API."""
        # Get API key; fail fast with a clear error if it is missing
        api_key = os.getenv(self.config.data_source.api_key_env)
        if not api_key:
            raise ShareOfSearchError(
                f"API key not found in environment variable "
                f"'{self.config.data_source.api_key_env}'"
            )

        # Create provider
        provider = create_provider(
            api_key=api_key,
            max_retries=self.config.options.retry_attempts,
            retry_delay=self.config.options.retry_delay
        )

        # Extract queries
        queries = [q.query for q in self.config.queries]

        # Fetch trends
        data = provider.fetch_trends(
            queries=queries,
            geo=self.config.parameters.geo,
            date_range=self.config.parameters.date_range
        )

        logger.info(f"Retrieved data for {len(queries)} queries")
        return data

    def _transform_data(self, raw_data: Dict[str, Any]) -> pd.DataFrame:
        """Transform raw API data."""
        transformer = DataTransformer()

        # Build query label mapping
        query_labels = {q.query: q.label for q in self.config.queries}

        # Transform
        df = transformer.transform_interest_data(raw_data, query_labels)

        # Add smoothing if configured
        if self.config.parameters.smoothing_period > 1:
            df = transformer.add_moving_average(
                df,
                window=self.config.parameters.smoothing_period
            )

        logger.info(f"Transformed {len(df)} data points")
        return df

    def _calculate_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate all metrics."""
        calculator = MetricsCalculator()

        # Calculate all metrics
        df = calculator.calculate_all_metrics(
            df,
            smoothing_window=self.config.parameters.smoothing_period
        )

        # Calculate period metrics
        period_metrics = calculator.calculate_period_metrics(df)

        # Calculate market concentration
        market_concentration = calculator.calculate_market_concentration(df)

        # Store results
        self.results['dataframe'] = df
        self.results['period_metrics'] = period_metrics
        self.results['market_concentration'] = market_concentration

        logger.info("All metrics calculated")
        return df

    def _generate_visualizations(self, df: pd.DataFrame) -> list:
        """Generate visualization charts."""
        charts = []

        try:
            # Create chart generator
            chart_gen = ChartGenerator(
                theme=self.config.visualization.theme,
                dpi=self.config.visualization.dpi
            )

            # Create charts directory
            charts_dir = self.config.project.output_dir / "charts"
            charts_dir.mkdir(parents=True, exist_ok=True)

            # Generate all charts
            charts = chart_gen.generate_all_charts(
                df,
                self.results['period_metrics'],
                charts_dir
            )

            self.results['charts'] = charts
            logger.info(f"Generated {len(charts)} charts")
            return charts

        except Exception as e:
            logger.warning(f"Failed to generate charts: {e}")
            return []

    def _generate_insights(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Generate AI-powered insights."""
        insights = {}

        if not self.config.analysis.insights.enabled:
            logger.info("AI insights disabled in configuration")
            return insights

        ai_engine = AIInsightEngine()

        # Executive summary
        insights['executive_summary'] = ai_engine.generate_executive_summary(
            df,
            self.results['period_metrics'],
            self.results['market_concentration']
        )

        # Anomaly explanations
        anomalies = df[df['is_anomaly']]
        if not anomalies.empty:
            insights['anomaly_explanations'] = ai_engine.explain_anomalies(df, anomalies)

        # Competitive insights
        insights['competitive_insights'] = ai_engine.generate_competitive_insights(
            self.results['period_metrics']
        )

        # Recommendations
        insights['recommendations'] = ai_engine.generate_recommendations(
            df,
            self.results['period_metrics']
        )

        self.results['insights'] = insights
        logger.info("AI insights generated")
        return insights

    def _save_results(self, df: pd.DataFrame, insights: Dict[str, Any]) -> None:
        """Save all results to files."""
        output_dir = self.config.project.output_dir
        output_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        data_dir = output_dir / "data"
        data_dir.mkdir(exist_ok=True)

        # Save DataFrame
        df.to_csv(data_dir / "full_data.csv", index=False)
        logger.info(f"Saved full data to {data_dir / 'full_data.csv'}")

        # Save period metrics
        self.results['period_metrics'].to_csv(data_dir / "metrics.csv", index=False)
        logger.info(f"Saved metrics to {data_dir / 'metrics.csv'}")

        # Save insights as a text file
        if insights:
            insights_file = output_dir / "insights.txt"
            with open(insights_file, 'w', encoding='utf-8') as f:
                f.write(f"Share of Search Analysis: {self.config.project.name}\n")
                f.write("=" * 80 + "\n\n")

                if 'executive_summary' in insights:
                    f.write("EXECUTIVE SUMMARY\n")
                    f.write("-" * 80 + "\n")
                    f.write(insights['executive_summary'])
                    f.write("\n\n")

                if 'competitive_insights' in insights:
                    f.write("COMPETITIVE INSIGHTS\n")
                    f.write("-" * 80 + "\n")
                    f.write(insights['competitive_insights'])
                    f.write("\n\n")

                if 'anomaly_explanations' in insights and insights['anomaly_explanations']:
                    f.write("ANOMALY EXPLANATIONS\n")
                    f.write("-" * 80 + "\n")
                    for anomaly in insights['anomaly_explanations']:
                        f.write(f"\n{anomaly['query']} on {anomaly['date']}: {anomaly['value']}\n")
                        f.write(f"{anomaly['explanation']}\n")
                    f.write("\n")

                if 'recommendations' in insights and insights['recommendations']:
                    f.write("STRATEGIC RECOMMENDATIONS\n")
                    f.write("-" * 80 + "\n")
                    for brand, recs in insights['recommendations'].items():
                        f.write(f"\n{brand}:\n{recs}\n")

            logger.info(f"Saved insights to {insights_file}")

        # Generate PowerPoint presentation
        if insights and self.results.get('charts'):
            try:
                ppt_gen = PowerPointGenerator()

                # Prepare chart paths dictionary
                chart_paths = {}
                for chart_path in self.results['charts']:
                    chart_name = chart_path.stem.split('_')[0]  # Extract 'line', 'area', 'bar', 'pie'
                    chart_paths[chart_name] = chart_path

                # Generate presentation
                ppt_path = output_dir / f"{self.config.project.name.replace(' ', '_')}_presentation.pptx"
                ppt_gen.generate_presentation(
                    project_name=self.config.project.name,
                    executive_summary=insights.get('executive_summary', ''),
                    competitive_insights=insights.get('competitive_insights', ''),
                    metrics_df=self.results['period_metrics'],
                    chart_paths=chart_paths,
                    market_concentration=self.results['market_concentration'],
                    output_path=ppt_path
                )
                logger.info(f"PowerPoint presentation saved: {ppt_path}")

            except Exception as e:
                logger.warning(f"Failed to generate PowerPoint: {e}")

        logger.info(f"All results saved to {output_dir}")

    def _extract_key_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Extract key metrics for the summary."""
        metrics = {
            'num_brands': len(df['query'].unique()),
            'data_points': len(df)
        }

        # Find market leader
        period_metrics = self.results['period_metrics']
        leader_row = period_metrics.nlargest(1, 'avg_share').iloc[0]
        metrics['leader'] = leader_row['query']
        metrics['leader_share'] = leader_row['avg_share']

        # Find most stable
        stable_row = period_metrics.nsmallest(1, 'volatility').iloc[0]
        metrics['most_stable'] = stable_row['query']

        return metrics
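
# A minimal config.yaml sketch covering only the fields this module reads.
# Field names are taken from the attribute accesses above; the authoritative
# schema lives in src/config/schema.py, and all values here are illustrative.
#
#   project:
#     name: "Brand Tracker"
#     output_dir: "outputs/brand_tracker"
#   queries:
#     - query: "brand a"
#       label: "Brand A"
#     - query: "brand b"
#       label: "Brand B"
#   parameters:
#     geo: "US"
#     date_range: "today 12-m"
#     smoothing_period: 4
#   data_source:
#     api_key_env: "SERPAPI_API_KEY"
#   options:
#     log_level: "INFO"
#     retry_attempts: 3
#     retry_delay: 2
#   visualization:
#     theme: "default"
#     dpi: 150
#   analysis:
#     insights:
#       enabled: true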

def run_pipeline(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """
    Convenience function to run the pipeline.

    Args:
        config_path: Path to config.yaml

    Returns:
        Analysis results
    """
    pipeline = ShareOfSearchPipeline(config_path)
    return pipeline.run()
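
# A minimal usage sketch (an assumption, not part of the original module):
# take an optional config path from the command line and fall back to the
# loader's default when none is given.
if __name__ == "__main__":
    import sys

    config = Path(sys.argv[1]) if len(sys.argv) > 1 else None
    run_pipeline(config)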