Setting up the kafka listener#

Attention: this notebook must be run locally or on Colab, because Binder does not allow secure connections.

To run locally, inside this repository do:

python -m venv alerts

source alerts/bin/activate

pip install gcn-kafka ligo.skymap xmltodict

To run on colab, download the notebook and upload it on colab, then add an initial line with

! pip install gcn-kafka ligo.skymap xmltodict

Swift#

Instruments

Energy Range

Field of View

Localization

Burst Alert Telescope (BAT)

15-350 keV

2 ster

\(\leq 1-3^{\prime}\) radius (statistical, 90%)

X-ray Telescope (XRT)

0.3-10 keV

\(23.6^{\prime} \times 23.6^{\prime}\)

\(1-3^{\prime \prime}\)

Ultraviolet/Optical Telescope UVOT

170-650 nm

\(17^{\prime} \times 17^{\prime}\)

\(< 1^{\prime \prime}\)

Type

Typical Latency Since Burst

Location Precision

Description

SWIFT_BAT_QL_POS

13-30 seconds

1-3’

QuickLook Position Notice (subset of BAT_POS info)

SWIFT_BAT_GRB_POS_ACK

13-30 seconds

1-3’

First Position Notice, the BAT Position

SWIFT_XRT_POSITION

\(30-80\)
seconds

\(<7^{\prime \prime}\)

XRT afterglow
location

SWIFT_UVOT_POS

\(1-3\)
hours

\(<2^{\prime \prime}\)

UVOT afterglow location (ground-generated only)

Check https://gcn.gsfc.nasa.gov/swift.html for further details on the content of each notice

Fermi#

Instruments

Energy Range

Field of View

Localization

Large Area Telescope (LAT)

\(20 \mathrm{MeV}- >300 \mathrm{GeV}\)

2.5 ster

\(\leq 1^{\circ}\) radius (statistical, 90%)

Gamma-ray Burst Monitor (GBM)

\(8 \mathrm{keV}-30\) MeV

8.8 ster

\(\gtrsim 1-10^{\circ}\) radius (statistical + systematic)

Type

Contents

Latency

FERMI_GBM_ALERT

Trigger info

~5 seconds

FERMI_GBM_FLT_POS

Flight localization, classification

~10 seconds

FERMI_GBM_GND_POS

Updated ground localization using finer lookup tables

20-300 seconds

FERMI_GBM_FIN_POS

Final trigger localization

15 minutes

FERMI_LAT_POS_INI

Onboard LAT detection, initial position

2-4 seconds

FERMI_LAT_POS_UPD

Updated onboard localization with more data

2-32 seconds

FERMI_LAT_GND

Ground localization of onboard trigger

8-12 hours

Check https://gcn.gsfc.nasa.gov/fermi.html for further details on the content of each notice

Get familiar with notice content#

https://gcn.gsfc.nasa.gov/fermi_grbs.html

https://gcn.gsfc.nasa.gov/swift_grbs.html

Let’s try our first kafka connection#

set

"auto.offset.reset": "earliest"

to receive all the messages you missed since the last time you had the listener active

set

"auto.offset.reset": "latest" 

to receive ONLY NEW messages, i.e. those published after the listener starts

from credentials import load_credentials, get_credential, list_credentials

# Read the entries of credentials.txt into the credential store
load_credentials()

# Summarise which entries are filled in and which are still empty
print("Configured credentials:")
credential_status = list_credentials()
for key, status in credential_status.items():
    print(f"  {key}: {status}")

# Fetch the two GCN credentials used by the Kafka consumer; a ValueError from
# get_credential is reported instead of aborting the cell
try:
    gcn_client_id = get_credential('GCN_CLIENT_ID')
    gcn_client_secret = get_credential('GCN_CLIENT_SECRET')
    # Only the first 10 characters of the client ID are echoed
    print(f"\n✅ GCN Client ID: {gcn_client_id[:10]}..." )
except ValueError as e:
    print(f"\n⚠️ {e}")
📖 Loading credentials from: /Users/samueleronchini/Desktop/acme_tutorials/alerts/credentials.txt
✅ Loaded 17 credentials
Configured credentials:
  GCN_CLIENT_ID: ✅ Configured
  GCN_CLIENT_SECRET: ✅ Configured
  FINK_USERNAME: ⚠️ Empty
  FINK_GROUP_ID: ⚠️ Empty
  FINK_SERVERS: ✅ Configured
  SLACK_WEBHOOK: ✅ Configured
  SLACK_CHANNEL_ID: ✅ Configured
  EMAIL_SENDER: ✅ Configured
  EMAIL_PASSWORD: ✅ Configured
  EMAIL_SMTP_SERVER: ✅ Configured
  EMAIL_SMTP_PORT: ✅ Configured
  EMAIL_RECIPIENT: ✅ Configured
  TELEGRAM_BOT_TOKEN: ✅ Configured
  GRACEDB_USERNAME: ⚠️ Empty
  GRACEDB_PASSWORD: ⚠️ Empty
  GCN_CLIENT_ID_PROD: ✅ Configured
  GCN_CLIENT_SECRET_PROD: ✅ Configured

✅ GCN Client ID: 5h42prl0v1...
from gcn_kafka import Consumer
from confluent_kafka import TopicPartition
import os
import json
import datetime 
import email
import xmltodict

# Consumer settings: "earliest" asks for messages from the start of what is
# still available; group.id is left empty.
# NOTE(review): presumably gcn-kafka fills in a group id when it is empty — confirm.
CONFIG = {
    "group.id": "",
    "auto.offset.reset": "earliest",
}

# Consumer authenticated with the GCN client credentials loaded earlier
consumer = Consumer(
    config=CONFIG,
    client_id=gcn_client_id,
    client_secret=gcn_client_secret,
    domain='gcn.nasa.gov',
)
def write_json(text_data, output_dir="./gcn_alerts"):
    """Parse a GCN classic text notice and save its fields to a JSON file.

    The notice is parsed with the ``email`` header parser, so every
    ``KEY: value`` line becomes one entry of the result.

    Args:
        text_data: Notice content, as ``str`` or UTF-8 encoded ``bytes``.
        output_dir: Directory receiving the JSON file; created if missing.

    Returns:
        dict mapping each notice field to its string value. The dict is
        returned even when writing the file fails (the error is printed).
    """
    # Convert bytes to string if necessary
    if isinstance(text_data, bytes):
        text_data = text_data.decode('utf-8')

    # Parse using email module
    result = dict(email.message_from_string(text_data))

    try:
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Some notice types (e.g. Swift) append further fields to the trigger
        # line ("1090495,   Seg_Num: 0"). Keep only the number itself so the
        # file name has no commas, spaces or colons.
        trigger_num = str(result.get('TRIGGER_NUM', '')).split(',')[0].strip()
        if trigger_num:
            filename = f"GRB_{trigger_num}.json"
        else:
            # No usable trigger number: fall back to the current local time
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"GRB_{timestamp}.json"

        filepath = os.path.join(output_dir, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2)

        print(f"Saved to: {filepath}")

    except Exception as e:
        # Best effort: a failed save is reported but must not stop the caller's
        # message loop.
        print("Error processing GCN text notice:", e)

    return result
def define_time_range(t1, t2):
    """Return a (start, end) time window in integer milliseconds since the Unix epoch.

    Args:
        t1: Number of days before the current time at which the window starts.
        t2: Length of the window in days; fractional values are allowed.

    Returns:
        Tuple ``(timestamp1, timestamp2)`` of ints, with
        ``timestamp2 = timestamp1 + t2 days``.
    """
    window_start = datetime.datetime.now() - datetime.timedelta(days=t1)
    timestamp1 = int(window_start.timestamp() * 1000)
    # Round the window length so a fractional t2 cannot turn the end
    # timestamp into a float (the value is later used as a Kafka timestamp).
    timestamp2 = timestamp1 + int(round(t2 * 86400000))
    return timestamp1, timestamp2
timestamp1, timestamp2 = define_time_range(1, 1)

print(f"Fetching messages between {timestamp1} and {timestamp2}")

# Topic to replay (uncomment one of the alternatives to try another notice type)
topics = 'gcn.classic.text.FERMI_GBM_ALERT'
# topics = 'gcn.classic.text.SWIFT_BAT_GRB_POS_ACK'
# topics = 'gcn.classic.text.SWIFT_XRT_POSITION'
# topics = 'gcn.classic.text.SWIFT_UVOT_POS'

# Translate both ends of the time window into offsets on partition 0
start = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp2)])

consumer.assign(start)

# offsets_for_times returns a negative offset when the timestamp lies beyond
# the newest message. Taking abs() of the difference would then request a
# meaningless number of messages and wait for the whole timeout, so resolve
# the end of the window against the partition's high watermark instead.
start_offset = start[0].offset
end_offset = end[0].offset
if end_offset < 0:
    low_offset, high_offset = consumer.get_watermark_offsets(
        TopicPartition(topics, 0))
    end_offset = high_offset

# A negative start offset means the whole window is newer than the last message
num_messages = max(end_offset - start_offset, 0) if start_offset >= 0 else 0

messages = consumer.consume(num_messages, timeout=1) if num_messages > 0 else []
for message in messages:
    if message.error():
        print(message.error())
        continue
    # Print the topic and message ID
    print(f'topic={message.topic()}, offset={message.offset()}')
    value = message.value()
    write_json(value)
Fetching messages between 1763462992372 and 1763549392372
topic=gcn.classic.text.FERMI_GBM_ALERT, offset=3537
Saved to: ./gcn_alerts/GRB_785185339.json
topic=gcn.classic.text.FERMI_GBM_ALERT, offset=3538
Saved to: ./gcn_alerts/GRB_785190695.json
topic=gcn.classic.text.FERMI_GBM_ALERT, offset=3539
Saved to: ./gcn_alerts/GRB_785198410.json

SVOM#

Instruments

Energy Range

Field of View

Localization

ECLAIRs

\(4-250 \mathrm{ keV}\)

2 ster

\(\sim 10^{\prime}\) radius (statistical + systematic)

GRM

\(15-5000 \mathrm{ keV}\)

2.6 ster

\(\sim 10^{\circ}\)

MXT

0.2-10 keV

\(1.1^{\circ} \times 1.1^{\circ}\)

~10-100” radius (statistical + systematic)

VT

450-650 nm (Blue band)
\(650-1000 \mathrm{~nm}\) (Red band)

\(26^{\prime} \times 26^{\prime}\)

\(\sim 1^{\prime \prime}\) radius

Kafka topic

Notice types

gcn.notices.svom.voevent.grm

grm-trigger

gcn.notices.svom.voevent.eclairs

eclairs-wakeup eclairs-catalog slewing / not-slewing

gcn.notices.svom.voevent.mxt

mxt-initial mxt-update

gcn.notices.svom.voevent.vt

Available soon

Notice Type

Content

Latency

grm-trigger

GRM trigger notice with possible rough localization

\(\sim 15 \mathrm{sec}\)

eclairs-wakeup

ECLAIRs localization of a GRB candidate

\(\sim 15 \mathrm{sec}\)

eclairs-catalog

ECLAIRs localization of a flaring source from the on-board catalogue

\(\sim 15 \mathrm{sec}\)

slewing/ not-slewing

Platform slew status and target coordinates

~18 min

mxt-initial_qf#

MXT localization of a found counterpart (only if platform slews)

\(\sim 3 \mathrm{~min}\)

mxt-update_qf#

MXT updated localization

\(\sim 3 \mathrm{~min}\)

def _dig(node, *keys):
    """Follow ``keys`` through nested dicts; return None as soon as a level is missing."""
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def voevent_to_json(xml_file, output_dir="./gcn_alerts"):
    """Extract position and trigger time from a parsed VOEvent and save them as JSON.

    Args:
        xml_file: The VOEvent already parsed into nested dicts (callers pass
            the result of ``xmltodict.parse``), not a file or path.
        output_dir: Directory receiving ``SVOM_<burst_id>.json``; created if
            missing.

    Returns:
        The full VOEvent content as plain dicts/lists (a JSON round-trip of
        ``xml_file``). The saved file only holds ``ra``/``dec`` (when present)
        and ``trigger_time``.
    """
    # Convert to normal dict and JSON
    json_data = json.loads(json.dumps(xml_file))

    voevent = _dig(json_data, "voe:VOEvent") or {}
    # Resolved up front so the trigger-time lookup below still works when the
    # coordinates are missing or malformed.
    astro_coords = _dig(voevent, "WhereWhen", "ObsDataLocation",
                        "ObservationLocation", "AstroCoords") or {}

    result = {}

    position = _dig(astro_coords, "Position2D", "Value2")
    try:
        # The coordinates are already in RA/Dec format (not galactic)
        ra = float(position["C1"])
        dec = float(position["C2"])
    except (KeyError, TypeError, ValueError):
        print("No coordinates found")
    else:
        result['ra'] = ra
        result['dec'] = dec
        print("RA:", ra)
        print("Dec:", dec)

    trigger_time = _dig(astro_coords, "Time", "TimeInstant", "ISOTime")
    result['trigger_time'] = trigger_time

    print("Trigger time:", trigger_time)

    # The burst ID is read from the second Param of the first Group; fall back
    # to "unknown" when that structure is absent or not a list, instead of
    # crashing.
    try:
        burst_id = voevent["What"]["Group"][0]["Param"][1].get("@value", "unknown")
    except (KeyError, IndexError, TypeError, AttributeError):
        burst_id = "unknown"

    # Save to JSON file
    os.makedirs(output_dir, exist_ok=True)
    filepath = os.path.join(output_dir, f"SVOM_{burst_id}.json")

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)

    print(f"Saved to: {filepath}")

    return json_data
# Topic to replay (uncomment the alternative to try the GRM notices)
topics = 'gcn.notices.svom.voevent.eclairs'
# topics = 'gcn.notices.svom.voevent.grm'

timestamp1, timestamp2 = define_time_range(5, 5)

# Translate both ends of the time window into offsets on partition 0
start = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp2)])

# offsets_for_times returns a negative offset when the timestamp lies beyond
# the newest message. Taking abs() of the difference would then request a
# meaningless number of messages and wait for the whole timeout, so resolve
# the end of the window against the partition's high watermark instead.
start_offset = start[0].offset
end_offset = end[0].offset
if end_offset < 0:
    low_offset, high_offset = consumer.get_watermark_offsets(
        TopicPartition(topics, 0))
    end_offset = high_offset

# A negative start offset means the whole window is newer than the last message
num_messages = max(end_offset - start_offset, 0) if start_offset >= 0 else 0

consumer.assign(start)
messages = consumer.consume(num_messages, timeout=1) if num_messages > 0 else []
for message in messages:
    if message.error():
        print(message.error())
        continue
    # Print the topic and message ID
    print(f'topic={message.topic()}, offset={message.offset()}')
    value = message.value()
    # SVOM notices are VOEvent XML: parse to nested dicts before extraction
    xml_dict = xmltodict.parse(value)
    voevent_to_json(xml_dict)
topic=gcn.notices.svom.voevent.eclairs, offset=277
RA: 18.832
Dec: -41.0854
Trigger time: 2025-11-16T15:32:26.019000
Saved to: ./gcn_alerts/SVOM_sb25111604.json
topic=gcn.notices.svom.voevent.eclairs, offset=278
RA: 335.2652
Dec: -10.4105
Trigger time: 2025-11-16T15:57:15.919000
Saved to: ./gcn_alerts/SVOM_sb25111605.json
topic=gcn.notices.svom.voevent.eclairs, offset=279
RA: 335.3182
Dec: -10.4239
Trigger time: 2025-11-16T15:57:36.399000
Saved to: ./gcn_alerts/SVOM_sb25111605.json
topic=gcn.notices.svom.voevent.eclairs, offset=280
RA: 84.4289
Dec: 12.2431
Trigger time: 2025-11-19T08:47:23.892000
Saved to: ./gcn_alerts/SVOM_sb25111901.json

Check https://fsc.svom.org/readthedocs/svom/notices_and_circulars/index.html for more details about circulars and notices

Einstein Probe#

Instruments

Energy Range

Field of View

Localization

Wide Field X-ray Telescope

0.5-4 keV

\(3600 \mathrm{deg}^2\)

2 arcmin

Follow-up X-ray Telescope

\(0.5-10 \mathrm{keV}\)

60 arcmin diameter

5-15 arcsec (90% c.l.)

Type

Contents

Latency

gcn.notices.einstein_probe.wxt.alert

Localization, Count Rate, Significance

\(\sim 1 \mathrm{~min}\)

def to_json(text_data, output_dir="./gcn_alerts"):
    """Reduce a JSON-encoded alert to position and trigger time and save it.

    Args:
        text_data: The alert as a JSON document (anything ``json.loads`` accepts).
        output_dir: Directory receiving ``einstein_probe_<id>.json``; created
            if missing.

    Returns:
        dict with the keys ``trigger_time``, ``ra`` and ``dec`` (``None`` for
        absent fields), or ``None`` when parsing or saving failed; the failure
        is printed rather than raised.
    """
    saved = None

    try:
        payload = json.loads(text_data)

        # Keep only the trigger time and the sky position
        alert_info = {
            field: payload.get(field)
            for field in ("trigger_time", "ra", "dec")
        }

        os.makedirs(output_dir, exist_ok=True)

        # The file is named after the first entry of the alert's "id" list
        trigger_id = payload.get("id", ["unknown"])[0]
        filepath = os.path.join(output_dir, f"einstein_probe_{trigger_id}.json")

        with open(filepath, 'w') as out:
            json.dump(alert_info, out, indent=2)

        print(f"Saved alert to {filepath}")
        saved = alert_info

    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
    except Exception as e:
        print(f"Error processing alert: {e}")

    return saved
timestamp1, timestamp2 = define_time_range(10, 10)
# Topic to replay
topics = 'gcn.notices.einstein_probe.wxt.alert'

# Translate both ends of the time window into offsets on partition 0
start = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topics, 0, timestamp2)])

consumer.assign(start)

# Raw offsets as returned by the broker (an end offset of -1 means the window
# reaches past the newest message)
print(end[0].offset, start[0].offset)

# Taking abs() of a difference involving that -1 would request a meaningless
# number of messages and wait for the whole timeout, so resolve the end of the
# window against the partition's high watermark instead.
start_offset = start[0].offset
end_offset = end[0].offset
if end_offset < 0:
    low_offset, high_offset = consumer.get_watermark_offsets(
        TopicPartition(topics, 0))
    end_offset = high_offset

# A negative start offset means the whole window is newer than the last message
num_messages = max(end_offset - start_offset, 0) if start_offset >= 0 else 0

messages = consumer.consume(num_messages, timeout=5) if num_messages > 0 else []
for message in messages:
    if message.error():
        print(message.error())
        continue
    # Print the topic and message ID
    print(f'topic={message.topic()}, offset={message.offset()}')
    value = message.value()
    to_json(value)
-1 182
topic=gcn.notices.einstein_probe.wxt.alert, offset=182
Saved alert to ./gcn_alerts/einstein_probe_01709248221.json
topic=gcn.notices.einstein_probe.wxt.alert, offset=183
Saved alert to ./gcn_alerts/einstein_probe_11900480769.json

Let’s switch on our kafka listener#

import threading
import time

# Topics the background listener handles: Einstein Probe WXT alerts, SVOM
# GRM/ECLAIRs VOEvents, and the Swift-BAT / Fermi-GBM classic text notices.
listener_topics = [
    'gcn.notices.einstein_probe.wxt.alert',
    'gcn.notices.svom.voevent.grm',
    'gcn.notices.svom.voevent.eclairs',
    'gcn.classic.text.SWIFT_BAT_GRB_POS_ACK',
    'gcn.classic.text.FERMI_GBM_ALERT',
]

# NOTE(review): this subscribes the consumer object bound to `consumer` at this
# point; `consumer` is re-created later in the notebook before the listener starts.
consumer.subscribe(listener_topics)

def kafka_listener():
    """Poll the module-level ``consumer`` until ``stop_event`` is set.

    Each received notice is dispatched on its topic name: Einstein Probe
    alerts go to ``to_json``, SVOM VOEvents are parsed with ``xmltodict`` and
    passed to ``voevent_to_json``, and Swift/Fermi text notices go to
    ``write_json``. The consumer is always closed when the loop ends.
    """
    # Imported here because the file-level imports only bring in TopicPartition
    from confluent_kafka import KafkaException

    try:
        while not stop_event.is_set():
            # Short timeout so a stop request is noticed within about a second
            message = consumer.poll(timeout=1.0)
            if message is None:
                continue
            if message.error():
                print(f"Consumer error: {message.error()}")
                continue

            # Handle the polled message itself: fetching a fresh batch here
            # would silently drop the notice that poll() just returned.
            topic = message.topic()
            print(f'topic={topic}, offset={message.offset()}')
            value = message.value()
            try:
                if topic == 'gcn.notices.einstein_probe.wxt.alert':
                    to_json(value)
                elif 'svom' in topic:
                    xml_dict = xmltodict.parse(value)
                    voevent_to_json(xml_dict)
                elif 'SWIFT' in topic or 'FERMI' in topic:
                    write_json(value)
            except Exception as e:
                # One malformed notice must not stop the listener
                print(f"Error handling message from {topic}: {e}")

    except KafkaException as e:
        print(f"Kafka exception: {e}")
    finally:
        consumer.close()
        print("Consumer closed.")

Start the listener as a background process#

import threading

# "latest": only receive notices published from now on
CONFIG = {"group.id": "", "auto.offset.reset": "latest"}
consumer = Consumer(config=CONFIG,
                    client_id=gcn_client_id,
                    client_secret=gcn_client_secret,
                    domain='gcn.nasa.gov')

# This is a brand-new consumer object, so it has to be subscribed here:
# a subscription made on the previous consumer does not carry over, and the
# listener would otherwise poll a consumer with no topics.
consumer.subscribe(['gcn.notices.einstein_probe.wxt.alert',
                    'gcn.notices.svom.voevent.grm',
                    'gcn.notices.svom.voevent.eclairs',
                    'gcn.classic.text.SWIFT_BAT_GRB_POS_ACK',
                    'gcn.classic.text.FERMI_GBM_ALERT'])

# Set this event to ask the listener loop to exit
stop_event = threading.Event()

# --- Start listener in a daemon thread ---
listener_thread = threading.Thread(target=kafka_listener, daemon=True)
listener_thread.start()

Check if the listener is alive#

# True while the kafka_listener thread is still running
listener_thread.is_alive()
True

Stop the listener#

stop_event.set()  # ask the kafka_listener loop to exit at its next stop_event check
listener_thread.join()    # wait for the thread to finish (it closes the consumer on exit)