"""Main pipeline orchestrator for Share of Search analysis."""
import os
import time
from pathlib import Path
from typing import Any, Dict, Optional
import pandas as pd
from ..config.loader import load_config
from ..config.schema import Config
from ..data.providers.serpapi import create_provider
from ..processing.transformer import DataTransformer
from ..processing.calculator import MetricsCalculator
from ..analysis.ai_insights import AIInsightEngine
from ..visualization.charts import ChartGenerator
from ..reporting.powerpoint import PowerPointGenerator
from ..utils.logging import setup_logging, get_logger
from ..utils.errors import ShareOfSearchError
from ..utils.progress import print_header, print_summary, create_progress
logger = get_logger(__name__)
class ShareOfSearchPipeline:
"""Orchestrate the complete Share of Search analysis pipeline."""
    def __init__(self, config_path: Optional[Path] = None):
        """Initialize the pipeline.

        Args:
            config_path: Path to config.yaml (optional).
        """
self.config_path = config_path
self.config = None
self.results = {}
self.start_time = None
def run(self) -> Dict[str, Any]:
"""
Execute the complete pipeline.
Returns:
Dictionary containing all analysis results
Raises:
ShareOfSearchError: If pipeline fails
"""
self.start_time = time.time()
try:
# Stage 1: Load configuration
self.config = self._load_configuration()
# Print header
print_header(
self.config.project.name,
[q.label for q in self.config.queries],
self.config.parameters.geo,
self.config.parameters.date_range
)
# Create progress bar
with create_progress() as progress:
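                # Each task below jumps straight from 0 to 100: the bar tracks
                # stage completion, not fine-grained progress within a stage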
# Stage 1: Configuration (already done)
task1 = progress.add_task("[cyan]Configuration", total=100)
progress.update(task1, completed=100, description="[green]✓[/green] Configuration")
# Stage 2: Fetch data
task2 = progress.add_task("[cyan]Data Collection", total=100)
raw_data = self._fetch_data()
progress.update(task2, completed=100, description="[green]✓[/green] Data Collection")
# Stage 3: Transform data
task3 = progress.add_task("[cyan]Data Processing", total=100)
df = self._transform_data(raw_data)
progress.update(task3, completed=100, description="[green]✓[/green] Data Processing")
# Stage 4: Calculate metrics
task4 = progress.add_task("[cyan]Metrics Calculation", total=100)
df = self._calculate_metrics(df)
progress.update(task4, completed=100, description="[green]✓[/green] Metrics Calculation")
# Stage 5: Generate visualizations
task5 = progress.add_task("[cyan]Visualization", total=100)
charts = self._generate_visualizations(df)
progress.update(task5, completed=100, description="[green]✓[/green] Visualization")
# Stage 6: Generate insights
task6 = progress.add_task("[cyan]AI Insights", total=100)
insights = self._generate_insights(df)
progress.update(task6, completed=100, description="[green]✓[/green] AI Insights")
# Stage 7: Save results
task7 = progress.add_task("[cyan]Saving Results", total=100)
self._save_results(df, insights)
progress.update(task7, completed=100, description="[green]✓[/green] Saving Results")
# Calculate elapsed time
elapsed = time.time() - self.start_time
# Extract key findings
metrics_summary = self._extract_key_metrics(df)
# Print summary
print_summary(
self.config.project.name,
str(self.config.project.output_dir),
metrics_summary,
len(charts),
elapsed
)
return self.results
        except ShareOfSearchError:
            logger.exception("Pipeline failed")
            raise
        except Exception as e:
            logger.exception(f"Pipeline failed: {e}")
            raise ShareOfSearchError(f"Pipeline failed: {e}") from e
def _load_configuration(self) -> Config:
"""Load and validate configuration."""
        # Bootstrap logging before the config is loaded; replaced with
        # project-specific settings below
log_file = Path("outputs") / "pipeline.log"
setup_logging(log_level="INFO", log_file=log_file)
# Load config
config = load_config(self.config_path)
# Setup project-specific logging
project_log = config.project.output_dir / "analysis.log"
setup_logging(
log_level=config.options.log_level,
log_file=project_log
)
logger.info(f"Configuration loaded: {config.project.name}")
logger.info(f"Analyzing {len(config.queries)} queries")
return config
def _fetch_data(self) -> Dict[str, Any]:
"""Fetch data from API."""
# Get API key
api_key = os.getenv(self.config.data_source.api_key_env)
# Create provider
provider = create_provider(
api_key=api_key,
max_retries=self.config.options.retry_attempts,
retry_delay=self.config.options.retry_delay
)
# Extract queries
queries = [q.query for q in self.config.queries]
# Fetch trends
data = provider.fetch_trends(
queries=queries,
geo=self.config.parameters.geo,
date_range=self.config.parameters.date_range
)
logger.info(f"Retrieved data for {len(queries)} queries")
return data
def _transform_data(self, raw_data: Dict[str, Any]) -> pd.DataFrame:
"""Transform raw API data."""
transformer = DataTransformer()
# Build query label mapping
query_labels = {q.query: q.label for q in self.config.queries}
# Transform
df = transformer.transform_interest_data(raw_data, query_labels)
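        # Downstream code assumes a long-format frame keyed by a 'query'
        # column; the metrics stage is expected to add flags such as
        # 'is_anomaly' on top of it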
# Add smoothing if configured
if self.config.parameters.smoothing_period > 1:
df = transformer.add_moving_average(
df,
window=self.config.parameters.smoothing_period
)
logger.info(f"Transformed {len(df)} data points")
return df
def _calculate_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
"""Calculate all metrics."""
calculator = MetricsCalculator()
# Calculate all metrics
df = calculator.calculate_all_metrics(
df,
smoothing_window=self.config.parameters.smoothing_period
)
# Calculate period metrics
period_metrics = calculator.calculate_period_metrics(df)
# Calculate market concentration
market_concentration = calculator.calculate_market_concentration(df)
# Store results
self.results['dataframe'] = df
self.results['period_metrics'] = period_metrics
self.results['market_concentration'] = market_concentration
logger.info("All metrics calculated")
return df
def _generate_visualizations(self, df: pd.DataFrame) -> list:
"""Generate visualization charts."""
charts = []
try:
# Create chart generator
chart_gen = ChartGenerator(
theme=self.config.visualization.theme,
dpi=self.config.visualization.dpi
)
# Create charts directory
charts_dir = self.config.project.output_dir / "charts"
charts_dir.mkdir(parents=True, exist_ok=True)
# Generate all charts
charts = chart_gen.generate_all_charts(
df,
self.results['period_metrics'],
charts_dir
)
self.results['charts'] = charts
logger.info(f"Generated {len(charts)} charts")
return charts
except Exception as e:
logger.warning(f"Failed to generate charts: {e}")
return []
def _generate_insights(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Generate AI-powered insights."""
insights = {}
if not self.config.analysis.insights.enabled:
logger.info("AI insights disabled in configuration")
return insights
ai_engine = AIInsightEngine()
# Executive summary
insights['executive_summary'] = ai_engine.generate_executive_summary(
df,
self.results['period_metrics'],
self.results['market_concentration']
)
# Anomaly explanations
        anomalies = df[df['is_anomaly']]
if not anomalies.empty:
insights['anomaly_explanations'] = ai_engine.explain_anomalies(df, anomalies)
# Competitive insights
insights['competitive_insights'] = ai_engine.generate_competitive_insights(
self.results['period_metrics']
)
# Recommendations
insights['recommendations'] = ai_engine.generate_recommendations(
df,
self.results['period_metrics']
)
self.results['insights'] = insights
logger.info("AI insights generated")
return insights
def _save_results(self, df: pd.DataFrame, insights: Dict[str, Any]) -> None:
"""Save all results to files."""
output_dir = self.config.project.output_dir
output_dir.mkdir(parents=True, exist_ok=True)
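        # Resulting layout: <output_dir>/data/*.csv, charts/ (populated
        # earlier by the visualization stage), insights.txt, and optionally
        # a .pptx presentation at the top level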
# Create subdirectories
data_dir = output_dir / "data"
data_dir.mkdir(exist_ok=True)
# Save DataFrame
df.to_csv(data_dir / "full_data.csv", index=False)
logger.info(f"Saved full data to {data_dir / 'full_data.csv'}")
# Save period metrics
self.results['period_metrics'].to_csv(data_dir / "metrics.csv", index=False)
logger.info(f"Saved metrics to {data_dir / 'metrics.csv'}")
# Save insights as text file
if insights:
insights_file = output_dir / "insights.txt"
with open(insights_file, 'w', encoding='utf-8') as f:
f.write(f"Share of Search Analysis: {self.config.project.name}\n")
f.write("=" * 80 + "\n\n")
if 'executive_summary' in insights:
f.write("EXECUTIVE SUMMARY\n")
f.write("-" * 80 + "\n")
f.write(insights['executive_summary'])
f.write("\n\n")
if 'competitive_insights' in insights:
f.write("COMPETITIVE INSIGHTS\n")
f.write("-" * 80 + "\n")
f.write(insights['competitive_insights'])
f.write("\n\n")
if 'anomaly_explanations' in insights and insights['anomaly_explanations']:
f.write("ANOMALY EXPLANATIONS\n")
f.write("-" * 80 + "\n")
for anomaly in insights['anomaly_explanations']:
f.write(f"\n{anomaly['query']} on {anomaly['date']}: {anomaly['value']}\n")
f.write(f"{anomaly['explanation']}\n")
f.write("\n")
if 'recommendations' in insights and insights['recommendations']:
f.write("STRATEGIC RECOMMENDATIONS\n")
f.write("-" * 80 + "\n")
for brand, recs in insights['recommendations'].items():
f.write(f"\n{brand}:\n{recs}\n")
logger.info(f"Saved insights to {insights_file}")
# Generate PowerPoint presentation
if insights and self.results.get('charts'):
try:
ppt_gen = PowerPointGenerator()
# Prepare chart paths dictionary
chart_paths = {}
for chart_path in self.results['charts']:
chart_name = chart_path.stem.split('_')[0] # Extract 'line', 'area', 'bar', 'pie'
chart_paths[chart_name] = chart_path
# Generate presentation
ppt_path = output_dir / f"{self.config.project.name.replace(' ', '_')}_presentation.pptx"
ppt_gen.generate_presentation(
project_name=self.config.project.name,
executive_summary=insights.get('executive_summary', ''),
competitive_insights=insights.get('competitive_insights', ''),
metrics_df=self.results['period_metrics'],
chart_paths=chart_paths,
market_concentration=self.results['market_concentration'],
output_path=ppt_path
)
logger.info(f"PowerPoint presentation saved: {ppt_path}")
except Exception as e:
logger.warning(f"Failed to generate PowerPoint: {e}")
logger.info(f"All results saved to {output_dir}")
def _extract_key_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
"""Extract key metrics for summary."""
metrics = {
            'num_brands': df['query'].nunique(),
'data_points': len(df)
}
# Find market leader
period_metrics = self.results['period_metrics']
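        # nlargest(1, col) returns a one-row DataFrame holding the maximum of
        # that column; .iloc[0] extracts it as a Series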
leader_row = period_metrics.nlargest(1, 'avg_share').iloc[0]
metrics['leader'] = leader_row['query']
metrics['leader_share'] = leader_row['avg_share']
# Find most stable
stable_row = period_metrics.nsmallest(1, 'volatility').iloc[0]
metrics['most_stable'] = stable_row['query']
return metrics


def run_pipeline(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """Convenience function to run the pipeline.

    Args:
        config_path: Path to config.yaml (optional).

    Returns:
        Analysis results.
    """
pipeline = ShareOfSearchPipeline(config_path)
return pipeline.run()
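

if __name__ == "__main__":
    # Minimal usage sketch: this module uses relative imports, so run it as a
    # module (e.g. `python -m <package>.pipeline`); the config path below is
    # illustrative
    run_pipeline(Path("config.yaml"))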