IceCube Neutrino Observatory#

This notebook demonstrates how to receive and interpret high-energy neutrino alerts from the IceCube Neutrino Observatory via Kafka, with special emphasis on understanding p-values and the trials factor.

About IceCube#

The IceCube Neutrino Observatory is a cubic-kilometer Cherenkov particle detector deployed in the Antarctic ice.

Key Features:#

  • Location: South Pole; 86 strings of optical sensors (DOMs) instrumented between ~1,450 and ~2,450 meters below the ice surface

  • Detection: Cherenkov light from relativistic charged particles produced by neutrino interactions

  • Energy Range: Sensitive to neutrinos with energies > ~10 GeV, with astrophysical neutrino alerts for energies > ~50 TeV

  • Direction Reconstruction (a short SkyCoord sketch using these uncertainties follows this list):

    • Track events: < 1° uncertainty (~km length signatures from muons)

    • Cascade events: Higher signal purity but ~2-20° uncertainty (~m extent from electrons/hadrons)
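
Given the angular uncertainties above, a common first check is whether a candidate source falls inside an alert's reported error radius. Below is a minimal sketch using astropy's SkyCoord; all coordinates and the error radius are invented for illustration:

from astropy.coordinates import SkyCoord
import astropy.units as u

# Hypothetical alert position and error radius (invented values)
alert_pos = SkyCoord(ra=123.45 * u.deg, dec=-12.34 * u.deg)
error_radius = 0.8 * u.deg

# Hypothetical candidate counterpart position
source_pos = SkyCoord(ra=123.90 * u.deg, dec=-12.10 * u.deg)

separation = alert_pos.separation(source_pos)
print(f"Separation: {separation.to(u.deg):.2f}")
print("Inside error circle" if separation < error_radius else "Outside error circle")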

Alert Types:#

  • GOLD: Highest confidence astrophysical neutrino candidates (~12/year)

  • BRONZE: High confidence candidates (~16/year)

  • CASCADE: High-energy cascade events (~8/year)

  • LVK Nu Track Search: Neutrino searches coincident with gravitational wave events

Prerequisites#

Important: This notebook requires GCN Kafka credentials and must be run locally or on Colab (not on Binder).

Installation#

To run locally:

pip install gcn-kafka

To run on Colab, uncomment the following cell:

# ! pip install gcn-kafka

Import Required Libraries#

from gcn_kafka import Consumer
from confluent_kafka import TopicPartition
import os
import json
import datetime
import numpy as np
from astropy.coordinates import SkyCoord
import astropy.units as u

Understanding P-values in IceCube Alerts#

What is a P-value?#

A p-value (probability value) represents the probability of observing a signal as extreme as (or more extreme than) what was detected, assuming the null hypothesis is true (i.e., the signal is just background noise).

Interpretation:#

  • Small p-value (e.g., p < 0.01): Strong evidence against the null hypothesis → signal is likely real

  • Large p-value (e.g., p > 0.1): Weak evidence → could be background fluctuation

P-value to Significance Conversion#

P-values are often converted to sigmas (standard deviations) in astronomy:

| P-value | Significance | Interpretation |
|---|---|---|
| 0.32 | 1σ | Common fluctuation |
| 0.046 | 2σ | Suggestive |
| 0.0027 | 3σ | Evidence |
| 6.3 × 10⁻⁵ | 4σ | Strong evidence |
| 2.9 × 10⁻⁷ | 5σ | Discovery (physics standard) |

The relationship is: \(p = \mathrm{erfc}(\sigma/\sqrt{2})\), where erfc is the complementary error function. This is the two-sided convention; the one-sided (upper-tail) probability is half of this value, which is why the 5σ discovery threshold is usually quoted as 2.9 × 10⁻⁷ rather than 5.7 × 10⁻⁷.
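
A minimal sketch of this conversion using scipy (the one-sided form via norm.isf matches the convention used by the helper function defined later in this notebook):

import numpy as np
from scipy.special import erfc
from scipy.stats import norm

def sigma_to_pvalue(sigma, one_sided=False):
    """Convert a significance in sigma to a p-value."""
    p_two_sided = erfc(sigma / np.sqrt(2))
    return p_two_sided / 2 if one_sided else p_two_sided

def pvalue_to_sigma(p, one_sided=False):
    """Convert a p-value to a significance in sigma."""
    return norm.isf(p) if one_sided else norm.isf(p / 2)

print(f"3 sigma  -> p = {sigma_to_pvalue(3):.4f}")                  # ~0.0027
print(f"5 sigma  -> p = {sigma_to_pvalue(5, one_sided=True):.2e}")  # ~2.9e-7
print(f"p=0.0027 -> {pvalue_to_sigma(0.0027):.1f} sigma")           # ~3.0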

The Trials Factor: A Critical Concept#

What is the Trials Factor?#

The trials factor (or look-elsewhere effect) accounts for the fact that when you perform many statistical tests, you increase the probability of finding at least one significant result by chance.

Why It Matters:#

If you perform \(N\) independent tests, each with a false alarm probability \(p\), the probability of getting at least one false alarm is:

\[P_{\mathrm{false~alarm}} = 1 - (1-p)^N \approx N \times p \quad \text{(for small } p\text{)}\]
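
A quick numerical check of this formula and its small-p approximation (the numbers are arbitrary):

p = 0.01   # per-trial false alarm probability
N = 50     # number of independent tests

exact = 1 - (1 - p) ** N   # exact probability of at least one false alarm
approx = N * p             # small-p approximation (overestimates when N*p is not small)

print(f"exact  = {exact:.3f}")   # ~0.395
print(f"approx = {approx:.3f}")  # 0.500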

IceCube Context:#

IceCube continuously monitors the sky, effectively performing many “trials” by:

  1. Analyzing thousands of neutrino events per year

  2. Searching across the entire sky (multiple directions)

  3. Looking over extended time periods

  4. For LVK coincidences: testing against every gravitational wave alert

Correcting for Trials:#

Pre-trial p-value: The significance of a single observation
Post-trial p-value: Corrected for the number of trials

\[p_{\mathrm{post\text{-}trial}} = 1 - (1 - p_{\mathrm{pre\text{-}trial}})^{N_{\mathrm{trials}}} \approx p_{\mathrm{pre\text{-}trial}} \times N_{\mathrm{trials}} \quad \text{(for small } p_{\mathrm{pre\text{-}trial}}\text{)}\]

For IceCube LVK searches, the GCN documentation states:

“The reported p-values here do not account for any trials correction. The false alarm rate of these coincidences can be obtained by multiplying the p-values with their corresponding GW trigger rates.”

Example:#

Suppose:

  • Pre-trial p-value = 0.001 (3σ)

  • LVK trigger rate = ~100 alerts/year

  • Post-trial p-value ≈ 0.001 × 100 = 0.10 (1.6σ)

Result: What appeared to be a 3σ detection becomes a 1.6σ hint when accounting for trials!
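
These numbers can be reproduced directly (a small sketch; the two-sided convention of the table above is assumed for the sigma values):

from scipy.stats import norm

p_pre = 1e-3      # pre-trial p-value (roughly 3 sigma)
n_trials = 100    # ~LVK alerts per year

p_post = p_pre * n_trials                     # 0.10, as quoted in the example
p_post_exact = 1 - (1 - p_pre) ** n_trials    # ~0.095 (exact form)
sigma_post = norm.isf(p_post / 2)             # ~1.6 sigma (two-sided convention)

# Per the GCN documentation quoted above, the false alarm rate of a coincidence
# is the p-value multiplied by the GW trigger rate:
far_per_year = p_pre * n_trials               # expected chance coincidences per year

print(f"post-trial: p ≈ {p_post:.2f} (exact {p_post_exact:.3f}), ~{sigma_post:.1f} sigma")
print(f"false alarm rate ≈ {far_per_year:.2f} per year")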

Configure Kafka Consumer#

from credentials import load_credentials, get_credential, list_credentials

CONFIG = {
    "group.id": "",  # Your group ID
    "auto.offset.reset": "earliest"  # 'earliest' for all messages, 'latest' for new only
}

# Load credentials from credentials.txt
load_credentials()

# Check what's configured
print("Configured credentials:")
for key, status in list_credentials().items():
    print(f"  {key}: {status}")

# Access specific credentials when needed
gcn_client_id = gcn_client_secret = None
try:
    gcn_client_id = get_credential('GCN_CLIENT_ID')
    gcn_client_secret = get_credential('GCN_CLIENT_SECRET')
    print(f"\n✅ GCN Client ID: {gcn_client_id[:10]}...")
except ValueError as e:
    print(f"\n⚠️ {e}")

# Create the GCN Kafka consumer (requires valid credentials)
consumer = Consumer(config=CONFIG,
                    client_id=gcn_client_id,
                    client_secret=gcn_client_secret,
                    domain='gcn.nasa.gov')

def interpret_significance(p_value, n_trials):
    """Interpret p-value in terms of sigma and post-trial correction."""
    from scipy.stats import norm

    sigma_pre_trial = norm.isf(p_value)
    p_value_post_trial = 1 - (1 - p_value) ** n_trials
    sigma_post_trial = norm.isf(p_value_post_trial)

    if p_value_post_trial < 2.87e-7:
        interpretation = "Discovery"
    elif p_value_post_trial < 0.00135:
        interpretation = "Strong Evidence"
    elif p_value_post_trial < 0.0228:
        interpretation = "Moderate Evidence"
    else:
        interpretation = "Weak or No Evidence"

    return {
        'sigma_pre_trial': sigma_pre_trial,
        'pvalue_post_trial': p_value_post_trial,
        'sigma_post_trial': sigma_post_trial,
        'interpretation': interpretation
    }
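
For example, calling the helper on a hypothetical pre-trial p-value (the numbers are purely illustrative):

# Hypothetical pre-trial p-value from a single coincidence search
result = interpret_significance(p_value=1e-4, n_trials=100)
for key, val in result.items():
    print(f"{key}: {val}")
# Expect roughly: 3.7 sigma pre-trial, post-trial p ≈ 0.01 (~2.3 sigma), "Moderate Evidence"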

Function to Process IceCube Alerts#

GW–targeted searches in IceCube–LVK follow‑ups#

For GW‑targeted searches, two complementary statistical tests are performed, following the strategy described in the IceCube–LVK analyses (see, e.g., IceCube Collaboration, Search for high‑energy neutrinos from binary neutron star merger GW170817, arXiv:1810.11467):

  1. Generic transient search
    This search tests for an excess of neutrino events that are spatially and temporally consistent with a given GW event, without assuming a detailed emission model:

    • Uses minimal astrophysical assumptions (model‑agnostic)

    • Builds a likelihood ratio comparing:

      • a background‑only hypothesis \(H_0\) (all events are atmospheric / diffuse background), and

      • a signal+background hypothesis \(H_1\) where some events are associated with the GW

    • Incorporates:

      • the reconstructed directions and angular uncertainties of neutrino candidates,

      • the GW localization map (sky probability),

      • the time delay between the GW trigger and the neutrino candidates

    • Produces a test statistic (TS) and a pre‑trial p‑value by comparing the observed TS to the background‑only distribution from scrambled data.

    In practice, the generic search asks: “given this GW event, do we see any cluster of neutrinos near it in time and direction that is unlikely to be background?” (A toy numerical sketch of the scrambling step appears at the end of this section.)

  2. Bayesian merger‑scenario search
    This search explicitly tests the hypothesis that a compact binary merger (the GW source) produced the observed neutrino candidates. It combines GW and neutrino information within a Bayesian framework:

    • Uses astrophysical priors motivated by merger scenarios, including:

      • the GW‑inferred distance,

      • the GW sky localization,

      • assumptions on the neutrino energy spectrum and total emitted energy,

    • Constructs the likelihood \(p(\text{data}\,|\,H_1)\) for a joint GW+neutrino origin and \(p(\text{data}\,|\,H_0)\) for background‑only,

    • Computes a Bayes factor \(\mathcal{B}_{10} = p(\text{data}\,|\,H_1)/p(\text{data}\,|\,H_0)\) and, when desired, an associated p‑value / significance.

    Conceptually, the Bayesian analysis asks: “are the observed neutrino candidates and the GW signal jointly consistent with a single astrophysical merger, once we fold in realistic priors on the source?”

Because many GW alerts are analyzed and IceCube continuously monitors the full sky, both methods must account for trials factors (look‑elsewhere effect). The pre‑trial p‑values from a single GW event are therefore converted to post‑trial significances by combining them with the relevant GW trigger rate or the effective number of searches.
Using both the generic and the Bayesian searches provides a robust way to identify potential neutrino counterparts while controlling false alarms and making optimal use of the GW information.
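
The final step of the generic search, turning an observed test statistic into a pre-trial p-value by comparison with a background-only distribution, can be illustrated with a toy example. The TS distribution and observed value below are invented; the real analysis builds the background distribution from scrambled IceCube data:

import numpy as np

rng = np.random.default_rng(42)

# Toy background-only TS distribution, standing in for the one built from scrambled data
n_scrambles = 100_000
ts_background = rng.chisquare(df=1, size=n_scrambles)

# Hypothetical observed TS for one GW follow-up
ts_observed = 9.0

# Pre-trial p-value: fraction of background trials at least as extreme as the observation
p_pre = np.mean(ts_background >= ts_observed)
print(f"pre-trial p-value ≈ {p_pre:.4f}")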

def process_icecube_alert(alert_data, output_dir="./icecube_alerts", n_trials=100):
    """
    Process IceCube alert with detailed p-value analysis.
    
    Parameters
    ----------
    alert_data : dict or bytes
        Alert data from IceCube stream
    output_dir : str
        Directory to save JSON files
    n_trials : int
        Number of trials for post-trial correction (default: 100 for LVK rate)
    
    Returns
    -------
    dict
        Processed alert information
    """
    try:
        # Parse JSON if needed
        if isinstance(alert_data, bytes):
            alert_data = json.loads(alert_data.decode('utf-8'))
        elif isinstance(alert_data, str):
            alert_data = json.loads(alert_data)
        
        # Extract basic information
        alert_info = {
            'trigger_time': alert_data.get('trigger_time'),
            'event_id': alert_data.get('id', [None])[0] if isinstance(alert_data.get('id'), list) else alert_data.get('id'),
        }
        
        # Extract position information
        alert_info['ra'] = alert_data.get('ra')
        alert_info['dec'] = alert_data.get('dec')
        alert_info['ra_uncertainty'] = alert_data.get('ra_uncertainty')
        
        # Extract p-values
        pval_generic = alert_data.get('pval_generic')
        pval_bayesian = alert_data.get('pval_bayesian')
        
        # Analyze p-values
        if pval_generic is not None:
            generic_analysis = interpret_significance(pval_generic, n_trials)
            alert_info['pval_generic'] = pval_generic
            alert_info['pval_generic_post_trial'] = generic_analysis['pvalue_post_trial']
            alert_info['sigma_generic_pre_trial'] = generic_analysis['sigma_pre_trial']
            alert_info['sigma_generic_post_trial'] = generic_analysis['sigma_post_trial']
            alert_info['generic_interpretation'] = generic_analysis['interpretation']
        
        if pval_bayesian is not None:
            bayesian_analysis = interpret_significance(pval_bayesian, n_trials)
            alert_info['pval_bayesian'] = pval_bayesian
            alert_info['pval_bayesian_post_trial'] = bayesian_analysis['pvalue_post_trial']
            alert_info['sigma_bayesian_pre_trial'] = bayesian_analysis['sigma_pre_trial']
            alert_info['sigma_bayesian_post_trial'] = bayesian_analysis['sigma_post_trial']
            alert_info['bayesian_interpretation'] = bayesian_analysis['interpretation']
        
        # Extract coincident events information
        alert_info['n_events_coincident'] = alert_data.get('n_events_coincident', 0)
        
        # Most probable direction for generic search
        if 'most_probable_direction' in alert_data:
            alert_info['most_probable_direction'] = alert_data['most_probable_direction']
        
        # Neutrino flux sensitivity
        if 'neutrino_flux_sensitivity_range' in alert_data:
            alert_info['flux_sensitivity'] = alert_data['neutrino_flux_sensitivity_range']
        
        # Related GW event
        if 'superevent_id' in alert_data:
            alert_info['gw_superevent_id'] = alert_data['superevent_id']
        
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)
        
        # Generate filename
        event_id = alert_info.get('event_id') or 'unknown'
        timestamp = alert_info.get('trigger_time') or datetime.datetime.now().isoformat()
        safe_timestamp = timestamp.replace(':', '-').replace('.', '-')[:19]
        filename = f"icecube_{event_id}_{safe_timestamp}.json"
        filepath = os.path.join(output_dir, filename)
        
        # Save to JSON
        with open(filepath, 'w') as f:
            json.dump(alert_info, f, indent=2)
        
        # Print summary
        print(f"\n{'='*70}")
        print(f"IceCube Alert: {event_id}")
        print(f"{'='*70}")
        print(f"Trigger Time: {alert_info['trigger_time']}")
        if alert_info.get('ra') is not None:
            print(f"Position: RA={alert_info['ra']:.4f}°, Dec={alert_info['dec']:.4f}°")
            print(f"Uncertainty: {alert_info.get('ra_uncertainty', 'N/A')}°")
        
        if pval_generic is not None:
            print(f"\nGeneric Search:")
            print(f"  Pre-trial:  p={pval_generic:.2e} ({alert_info['sigma_generic_pre_trial']:.2f}σ)")
            print(f"  Post-trial: p={alert_info['pval_generic_post_trial']:.2e} ({alert_info['sigma_generic_post_trial']:.2f}σ)")
            print(f"  → {alert_info['generic_interpretation']}")
        
        if pval_bayesian is not None:
            print(f"\nBayesian Search:")
            print(f"  Pre-trial:  p={pval_bayesian:.2e} ({alert_info['sigma_bayesian_pre_trial']:.2f}σ)")
            print(f"  Post-trial: p={alert_info['pval_bayesian_post_trial']:.2e} ({alert_info['sigma_bayesian_post_trial']:.2f}σ)")
            print(f"  → {alert_info['bayesian_interpretation']}")
        
        print(f"\nCoincident Events: {alert_info['n_events_coincident']}")
        print(f"Trials Factor Applied: {n_trials}")
        print(f"\nSaved to: {filepath}")
        print(f"{'='*70}\n")
        
        return alert_info
        
    except Exception as e:
        print(f"Error processing IceCube alert: {e}")
        import traceback
        traceback.print_exc()
        return None
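
To exercise the processing function end to end without waiting for a live alert, you can feed it a synthetic alert dictionary. All values below are invented; the field names are simply the ones this notebook's parser looks for:

# Synthetic alert for testing the pipeline (all values invented)
fake_alert = {
    "id": ["S991234-TEST"],
    "trigger_time": "2025-01-01T00:00:00.000Z",
    "ra": 123.4567,
    "dec": -12.3456,
    "ra_uncertainty": [0.8],
    "pval_generic": 0.03,
    "pval_bayesian": 0.12,
    "n_events_coincident": 1,
    "superevent_id": "S991234",
}

info = process_icecube_alert(fake_alert, output_dir="./icecube_alerts", n_trials=100)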

Create Output Directory#

output_dir = "./icecube_alerts"
os.makedirs(output_dir, exist_ok=True)
print(f"IceCube alerts will be saved to: {output_dir}")

Set Time Range for Alert Retrieval#

# Time ranges are specified as Kafka timestamps (milliseconds since Unix epoch)
# Set the number of days to look back
days_back = 30

def define_time_range(days_ago, duration_days):
    """Return (start, end) timestamps in milliseconds since the Unix epoch.

    The range starts `days_ago` days before now and spans `duration_days` days.
    """
    timestamp1 = int((datetime.datetime.now() - datetime.timedelta(days=days_ago)).timestamp() * 1000)
    timestamp2 = timestamp1 + duration_days * 86_400_000  # 86,400,000 ms per day
    return timestamp1, timestamp2

timestamp1, timestamp2 = define_time_range(days_back, days_back)
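
As a sanity check, the millisecond timestamps can be converted back into readable datetimes:

# Convert Kafka millisecond timestamps back to datetimes for a quick sanity check
start_dt = datetime.datetime.fromtimestamp(timestamp1 / 1000)
end_dt = datetime.datetime.fromtimestamp(timestamp2 / 1000)
print(f"Searching for alerts between {start_dt} and {end_dt}")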

Poll IceCube LVK Nu Track Search Alerts#

This retrieves neutrino alerts coincident with LIGO/Virgo/KAGRA gravitational wave events.

# IceCube LVK Nu Track Search topic
topic = 'gcn.notices.icecube.lvk_nu_track_search'

# Typical LVK alert rate for trials factor
# O3: ~56 alerts, O4: ~80-100 alerts per year
lvk_trials_factor = 100

print(f"Subscribing to: {topic}")
print(f"Using trials factor: {lvk_trials_factor} (typical LVK yearly rate)\n")

try:
    # Look up the first offsets at/after each timestamp (partition 0).
    # offsets_for_times() returns offset = -1 when no message exists at/after the
    # timestamp (e.g. when the end of the range is in the future), so fall back to
    # the end of the partition (high watermark) in that case.
    start = consumer.offsets_for_times([TopicPartition(topic, 0, timestamp1)])
    end = consumer.offsets_for_times([TopicPartition(topic, 0, timestamp2)])
    _, high = consumer.get_watermark_offsets(TopicPartition(topic, 0))

    start_offset = start[0].offset if start[0].offset >= 0 else high
    end_offset = end[0].offset if end[0].offset >= 0 else high

    consumer.assign(start)

    num_messages = end_offset - start_offset
    print(f"Found {num_messages} message(s) in time range\n")

    if num_messages > 0:
        for message in consumer.consume(num_messages, timeout=5):
            if message.error():
                print(f"Error: {message.error()}")
                continue

            print(f"Received message: topic={message.topic()}, offset={message.offset()}")
            value = message.value()

            # Process the alert, applying the LVK trials factor
            process_icecube_alert(value, output_dir=output_dir, n_trials=lvk_trials_factor)

        
except Exception as e:
    print(f"Error polling alerts: {e}")
    import traceback
    traceback.print_exc()

Poll IceCube ASTROTRACK Alerts (Classic Format)#

These are high-energy single neutrino track events distributed via GCN Classic.

# IceCube ASTROTRACK alerts come through AMON
# Note: These use a different format (text-based)

topics_classic = [
    'gcn.classic.text.ICECUBE_ASTROTRACK_GOLD',
    'gcn.classic.text.ICECUBE_ASTROTRACK_BRONZE'
]

print("Searching for IceCube ASTROTRACK alerts (text format)...\n")

for topic in topics_classic:
    print(f"\nChecking topic: {topic}")
    try:
        start = consumer.offsets_for_times([TopicPartition(topic, 0, timestamp1)])
        end = consumer.offsets_for_times([TopicPartition(topic, 0, timestamp2)])

        # Fall back to the high watermark when offsets_for_times() returns -1
        _, high = consumer.get_watermark_offsets(TopicPartition(topic, 0))
        start_offset = start[0].offset if start[0].offset >= 0 else high
        end_offset = end[0].offset if end[0].offset >= 0 else high

        consumer.assign(start)
        num_messages = end_offset - start_offset

        print(f"  Found {num_messages} message(s)")

        if num_messages > 0:
            for message in consumer.consume(num_messages, timeout=5):
                if message.error():
                    continue

                value = message.value()
                print(f"\n  Received: offset={message.offset()}")

                # Text format: decode and display the first 500 characters
                if isinstance(value, bytes):
                    value = value.decode('utf-8')
                print(value[:500])

    except Exception as e:
        print(f"  Error: {e}")

Real-time Monitoring (Background Listener)#

Set up a background listener for continuous monitoring of IceCube alerts.

import threading
import time

def icecube_listener(consumer, topics, output_dir="./icecube_alerts", n_trials=100, stop_event=None):
    """
    Background listener for IceCube alerts.
    
    Parameters
    ----------
    consumer : Consumer
        GCN Kafka consumer
    topics : list
        List of topics to subscribe to
    output_dir : str
        Directory to save alerts
    n_trials : int
        Trials factor for p-value correction
    stop_event : threading.Event
        Event to signal when to stop
    """
    # Fall back to a local Event if none was provided
    if stop_event is None:
        stop_event = threading.Event()

    consumer.subscribe(topics)
    
    try:
        print(f"Listener started for topics: {topics}")
        while not stop_event.is_set():
            for message in consumer.consume(timeout=1):
                if message.error():
                    print(f"Error: {message.error()}")
                    continue
                
                print(f"\nNew alert received: topic={message.topic()}, offset={message.offset()}")
                value = message.value()
                
                # Process based on topic type
                if 'lvk_nu_track_search' in message.topic():
                    process_icecube_alert(value, output_dir=output_dir, n_trials=n_trials)
                else:
                    # Text format - just display
                    if isinstance(value, bytes):
                        value = value.decode('utf-8')
                    print(value[:500])
            
            time.sleep(0.1)
            
    except Exception as e:
        print(f"Listener error: {e}")
    finally:
        # Note: this closes the shared consumer, so it cannot be reused after the listener stops
        consumer.close()
        print("Listener stopped")
# Uncomment to start the background listener:
# stop_event = threading.Event()
# listener_topics = ['gcn.notices.icecube.lvk_nu_track_search']
# listener_thread = threading.Thread(
#     target=icecube_listener,
#     args=(consumer, listener_topics, output_dir, 100, stop_event),
#     daemon=True
# )
# listener_thread.start()
# print("Background listener started")

Check Listener Status#

# Uncomment to check whether the listener thread is still running:
# print(listener_thread.is_alive())

Stop the Listener#

# Uncomment to stop the listener:
# stop_event.set()
# listener_thread.join()
# print("Listener stopped successfully")

Additional Resources#

IceCube:#

  • IceCube Neutrino Observatory: https://icecube.wisc.edu/

  • GCN documentation for IceCube notices: https://gcn.nasa.gov/missions/icecube

Statistical Methods:#

  • scipy.stats reference (p-value and sigma conversions): https://docs.scipy.org/doc/scipy/reference/stats.html