
Conversation

@ananas-block commented Oct 31, 2025

Created for AI review.

Overview

  • Summary of changes

Testing

  • Testing performed to validate the changes

Summary by CodeRabbit

  • New Features

    • Added queue information retrieval API endpoint for querying current queue state across specified trees.
    • Added real-time queue update streaming via gRPC with optional initial state emission.
    • Implemented periodic queue monitoring for change detection and live notifications.
  • API Changes

    • Refactored queue elements retrieval to support separate output and input queue queries with individual pagination controls.
    • Added nullifier field to queue element responses.

✏️ Tip: You can customize this high-level summary in your review settings.


coderabbitai bot commented Oct 31, 2025

Walkthrough

The PR introduces a complete gRPC service for queue information retrieval and live streaming updates. It adds Protocol Buffer definitions for a QueueService with two RPCs, implements the backend with event-driven and polling mechanisms, refactors the get_queue_elements endpoint to support separate output/input queue queries, establishes an event bus for publishing ingestion events, and integrates optional gRPC server startup into the main indexer.

Changes

  • Build & Proto Configuration (.github/workflows/ci.yml, Cargo.toml, build.rs): CI workflow updated to install protobuf-compiler; Cargo.toml adds runtime dependencies (tonic, prost, tokio-stream, tonic-reflection, tonic-prost) and the build-dependency tonic-prost-build; build.rs added to compile proto/photon.proto and emit a descriptor set.
  • Protocol Buffer Definitions (proto/photon.proto): Defines QueueService with GetQueueInfo (unary) and SubscribeQueueUpdates (server streaming) RPCs. Introduces GetQueueInfoRequest/Response, QueueInfo, SubscribeQueueUpdatesRequest, and QueueUpdate messages, and the UpdateType enum.
  • gRPC Service Implementation (src/grpc/queue_service.rs, src/grpc/event_subscriber.rs, src/grpc/queue_monitor.rs): PhotonQueueService implements the QueueService trait with initial state emission and live streaming. GrpcEventSubscriber bridges IngestionEvents to QueueUpdate broadcasts. QueueMonitor polls queue info at intervals and emits updates.
  • gRPC Module & Server Setup (src/grpc/mod.rs, src/grpc/server.rs): Organizes gRPC modules; server.rs initializes QueueService, the event bus, GrpcEventSubscriber, QueueMonitor, and the reflection service, and listens on the configured port.
  • Event Bus Infrastructure (src/events.rs): Introduces the IngestionEvent enum (AddressQueueInsert, OutputQueueInsert, NullifierQueueInsert) with a fire-and-forget publish function and a global EVENT_PUBLISHER using OnceCell (see the sketch after this table).
  • Queue Info API Endpoint (src/api/method/get_queue_info.rs, src/api/method/mod.rs): New module providing GetQueueInfoRequest/Response and QueueInfo structs and a get_queue_info function that queries queue sizes, maps pubkeys to base58, and returns the current slot.
  • Queue Elements API Refactor (src/api/method/get_queue_elements.rs): Significant restructuring: replaces single queue request fields with separate output/input queue indices and limits; refactors the response to include optional output/input queue elements and indices; adds a per-element nullifier field; introduces a fetch_queue helper and a MAX_QUEUE_ELEMENTS constant.
  • API Integration & RPC (src/api/api.rs, src/api/rpc_server.rs): Adds the get_queue_info method to PhotonApi; registers the getQueueInfo RPC handler in build_rpc_module.
  • Ingestion Pipeline Events (src/ingester/persist/mod.rs, src/ingester/persist/spend.rs): Threads slot through persist_state_update; publishes AddressQueueInsert and OutputQueueInsert events per tree/queue; spend.rs publishes NullifierQueueInsert events with aggregated per-tree counts.
  • Ingestion Data Handling (src/ingester/parser/state_update.rs): Extends account hash filtering to include input_accounts alongside the existing out_accounts filtering.
  • Module & Lifecycle Setup (src/lib.rs, src/main.rs): Adds events and grpc public modules; main.rs adds an optional grpc_port config and conditional gRPC server spawn/shutdown.
  • OpenAPI Specification (src/openapi/specs/api.yaml): Updates the /getQueueElements request/response schemas: replaces single queue fields with outputQueueStartIndex/Limit and inputQueueStartIndex/Limit; the response now includes optional outputQueueElements/Index and inputQueueElements/Index.
  • Database & Test Utilities (tests/integration_tests/utils.rs): Adds READ COMMITTED session isolation level configuration post-connection; adds a SolanaAccount import.
  • Minor Formatting Updates (src/ingester/fetchers/grpc.rs, src/ingester/parser/indexer_events.rs, src/ingester/parser/tx_event_parser_v2.rs): Non-functional formatting, import reordering, and helper function extraction (to_light_pubkey).
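
For reference, a minimal sketch of the fire-and-forget event bus described in the Event Bus Infrastructure entry above. The variant fields, their types, and the use of std::sync::OnceLock are assumptions for illustration; the PR's own enum also carries the queue pubkey and has more variants.

use std::sync::OnceLock;
use tokio::sync::mpsc;

// Simplified event type; the PR's variants also carry the queue pubkey.
#[derive(Debug)]
pub enum IngestionEvent {
    AddressQueueInsert { tree: solana_sdk::pubkey::Pubkey, count: usize, slot: u64 },
    // OutputQueueInsert { .. } and NullifierQueueInsert { .. } omitted here
}

pub type EventSubscriber = mpsc::UnboundedReceiver<IngestionEvent>;

static EVENT_PUBLISHER: OnceLock<mpsc::UnboundedSender<IngestionEvent>> = OnceLock::new();

/// Call once at startup; returns the subscriber end of the channel.
pub fn init_event_bus() -> EventSubscriber {
    let (tx, rx) = mpsc::unbounded_channel();
    if EVENT_PUBLISHER.set(tx).is_err() {
        panic!("Event publisher already initialized");
    }
    rx
}

/// Fire-and-forget: if the bus is uninitialized or the receiver is gone,
/// the event is dropped so ingestion never blocks on subscribers.
pub fn publish(event: IngestionEvent) {
    if let Some(tx) = EVENT_PUBLISHER.get() {
        let _ = tx.send(event);
    }
}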

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant QueueService
    participant PhotonApi
    participant QueueMonitor
    participant EventBus
    participant Database as DB

    Note over Client,Database: GetQueueInfo Flow
    Client->>QueueService: GetQueueInfo(trees filter)
    QueueService->>PhotonApi: get_queue_info(request)
    PhotonApi->>Database: Query StateV2/AddressV2 trees
    activate Database
    Database-->>PhotonApi: tree metadata
    PhotonApi->>Database: Count queue entries per tree
    Database-->>PhotonApi: queue sizes + slot
    deactivate Database
    PhotonApi-->>QueueService: GetQueueInfoResponse {queues, slot}
    QueueService-->>Client: Unary response

    Note over Client,Database: SubscribeQueueUpdates Flow (Event-Driven + Polling)
    Client->>QueueService: SubscribeQueueUpdates(trees, send_initial_state=true)
    rect rgba(100, 150, 200, 0.2)
        Note over QueueService,Database: Initial State Phase (if enabled)
        QueueService->>PhotonApi: get_queue_info(trees)
        PhotonApi->>Database: Fetch current queue state
        Database-->>PhotonApi: Initial QueueInfo list
        PhotonApi-->>QueueService: Response
        QueueService-->>Client: Stream ItemUpdates (UPDATE_TYPE_INITIAL)
    end
    
    rect rgba(150, 200, 150, 0.2)
        Note over EventBus,QueueMonitor: Live Updates Phase
        EventBus->>QueueService: Broadcast<QueueUpdate> channel
        par Event-Driven Path
            Note over EventBus: IngestionEvent published<br/>(AddressQueueInsert, etc.)
            EventBus->>QueueService: QueueUpdate via broadcast
            QueueService-->>Client: Stream update (ItemAdded/Removed)
        and Polling Backup (5s interval)
            QueueMonitor->>PhotonApi: Periodic get_queue_info
            PhotonApi->>Database: Query state changes
            QueueMonitor->>QueueService: Broadcast delta updates
            QueueService-->>Client: Stream update if changed/heartbeat
        end
    end
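To make the subscribe flow in the diagram concrete, here is a hypothetical tonic client sketch. Only the service, RPC, and message names come from the proto summary; the package name, the request field types, the empty-filter semantics, the QueueUpdate field names, and the port are assumptions.

// Hypothetical standalone client for photon.QueueService (sketch only).
pub mod photon {
    tonic::include_proto!("photon"); // assumes `package photon;` in proto/photon.proto
}

use photon::queue_service_client::QueueServiceClient;
use photon::SubscribeQueueUpdatesRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The indexer only starts the gRPC server when grpc_port is configured;
    // 50051 is just an example value here.
    let mut client = QueueServiceClient::connect("http://127.0.0.1:50051").await?;

    let request = SubscribeQueueUpdatesRequest {
        trees: vec![],            // assumed: empty filter = subscribe to all trees
        send_initial_state: true, // emit current queue state before live updates
    };

    let mut stream = client
        .subscribe_queue_updates(tonic::Request::new(request))
        .await?
        .into_inner();

    // Each streamed QueueUpdate carries an update type and the affected queue's info.
    while let Some(update) = stream.message().await? {
        println!("update_type={} queue_info={:?}", update.update_type, update.queue_info);
    }
    Ok(())
}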

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45–75 minutes

Areas requiring careful attention:

  • get_queue_elements refactoring (src/api/method/get_queue_elements.rs): Significant request/response structure changes; separate handling of output vs. input queues with per-queue index management; fetch_queue helper logic and proof assembly
  • Event bus integration (src/events.rs, src/ingester/persist/mod.rs, src/ingester/persist/spend.rs): Event publishing from ingestion pipeline; per-tree/queue aggregation and slot threading through persist layers
  • gRPC service streaming (src/grpc/queue_service.rs): Subscribe implementation with optional initial state, filtering, and error handling; interaction between event-driven and polling updates
  • Error handling consistency: Map internal API errors to gRPC Status across service layer; validation of pubkey parsing and queue type selection

Poem

🐰 A gRPC path unfolds so bright,
With queues that stream and dance in flight,
Events hop through the indexer's core,
While monitors poll forevermore,
Live updates, base58 keys in sight—
The warren's queues now shine so light! 🌟

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings, 1 inconclusive)
  • Description check (⚠️ Warning): The pull request description is largely incomplete and uses template placeholder text without actual implementation details or testing information. Resolution: complete the Overview section with specific details of the changes (gRPC service, protobuf definitions, event bus) and fill in the Testing section with the testing actually performed (unit tests, integration tests, manual testing).
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 64.52%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve coverage.
  • Title check (❓ Inconclusive): The title 'Sergey/get queue info grpc' is vague and non-descriptive (it includes the author name and slash notation) and does not convey the main technical change. Resolution: revise the title to describe the main feature, e.g. 'Add gRPC service for queue information retrieval' or 'Implement QueueService gRPC API for queue monitoring'.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch sergey/get-queue-info-grpc

Tip

📝 Customizable high-level summaries are now available in beta!

You can now customize how CodeRabbit generates the high-level summary in your pull requests — including its content, structure, tone, and formatting.

  • Provide your own instructions using the high_level_summary_instructions setting.
  • Format the summary however you like (bullet lists, tables, multi-section layouts, contributor stats, etc.).
  • Use high_level_summary_in_walkthrough to move the summary from the description to the walkthrough section.

Example instruction:

"Divide the high-level summary into five sections:

  1. 📝 Description — Summarize the main change in 50–60 words, explaining what was done.
  2. 📓 References — List relevant issues, discussions, documentation, or related PRs.
  3. 📦 Dependencies & Requirements — Mention any new/updated dependencies, environment variable changes, or configuration updates.
  4. 📊 Contributor Summary — Include a Markdown table showing contributions:
    | Contributor | Lines Added | Lines Removed | Files Changed |
  5. ✔️ Additional Notes — Add any extra reviewer context.
    Keep each section concise (under 200 words) and use bullet or numbered lists for clarity."

Note: This feature is currently in beta for Pro-tier users, and pricing will be announced later.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (3)
src/api/method/get_multiple_new_address_proofs.rs (2)

117-127: Consider reporting all queued addresses for better UX.

When multiple input addresses are queued, only the first one is reported. Users would benefit from knowing all the problematic addresses in a single error response.

Consider collecting all queued addresses:

-            if !queue_results.is_empty() {
-                let queued_address: Vec<u8> =
-                    queue_results[0].try_get("", "address").map_err(|e| {
-                        PhotonApiError::UnexpectedError(format!("Failed to get address: {}", e))
-                    })?;
-                let queued_address = SerializablePubkey::try_from(queued_address)?;
-                return Err(PhotonApiError::ValidationError(format!(
-                    "Address {} already exists",
-                    queued_address
-                )));
-            }
+            if !queue_results.is_empty() {
+                let queued_addresses: Result<Vec<_>, _> = queue_results
+                    .iter()
+                    .map(|row| {
+                        let addr: Vec<u8> = row.try_get("", "address")?;
+                        SerializablePubkey::try_from(addr)
+                    })
+                    .collect();
+                let queued_addresses = queued_addresses.map_err(|e| {
+                    PhotonApiError::UnexpectedError(format!("Failed to get addresses: {}", e))
+                })?;
+                let addr_list = queued_addresses.iter()
+                    .map(|a| a.to_string())
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                return Err(PhotonApiError::ValidationError(format!(
+                    "Addresses already queued: {}",
+                    addr_list
+                )));
+            }

123-126: Consider clarifying where the address exists.

The error message "Address {} already exists" is ambiguous. Since the check queries the address_queues table, consider making it explicit that the address is queued for processing rather than already in the tree.

For example:

                return Err(PhotonApiError::ValidationError(format!(
-                    "Address {} already exists",
+                    "Address {} is already queued for processing",
                    queued_address
                )));
src/ingester/parser/state_update.rs (1)

119-129: Consider clarifying the docstring.

The docstring could be more explicit about how in_accounts are handled in relation to account_transactions filtering. Consider adding a note that in_accounts (which lack tree information) are included in the filtered account_transactions to track spent accounts from existing transactions.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b67283 and 06862b2.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • proto/photon_descriptor.bin is excluded by !**/*.bin
📒 Files selected for processing (22)
  • .github/workflows/ci.yml (1 hunks)
  • Cargo.toml (1 hunks)
  • build.rs (1 hunks)
  • proto/photon.proto (1 hunks)
  • src/api/api.rs (2 hunks)
  • src/api/method/get_multiple_new_address_proofs.rs (1 hunks)
  • src/api/method/get_queue_elements.rs (2 hunks)
  • src/api/method/get_queue_info.rs (1 hunks)
  • src/api/method/mod.rs (1 hunks)
  • src/api/rpc_server.rs (1 hunks)
  • src/events.rs (1 hunks)
  • src/grpc/event_subscriber.rs (1 hunks)
  • src/grpc/mod.rs (1 hunks)
  • src/grpc/queue_monitor.rs (1 hunks)
  • src/grpc/queue_service.rs (1 hunks)
  • src/grpc/server.rs (1 hunks)
  • src/ingester/parser/state_update.rs (1 hunks)
  • src/ingester/persist/mod.rs (6 hunks)
  • src/ingester/persist/spend.rs (2 hunks)
  • src/lib.rs (1 hunks)
  • src/main.rs (4 hunks)
  • tests/integration_tests/utils.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (12)
src/api/api.rs (1)
src/api/method/get_queue_info.rs (1)
  • get_queue_info (101-159)
src/api/method/mod.rs (2)
src/api/method/get_queue_info.rs (1)
  • get_queue_info (101-159)
src/api/api.rs (1)
  • get_queue_info (280-285)
src/lib.rs (1)
src/ingester/parser/tx_event_parser_v2.rs (1)
  • events (31-114)
src/api/method/get_queue_info.rs (2)
src/grpc/queue_service.rs (2)
  • new (22-25)
  • get_queue_info (34-67)
src/api/api.rs (2)
  • new (111-121)
  • get_queue_info (280-285)
src/grpc/queue_service.rs (2)
src/api/method/get_queue_info.rs (1)
  • get_queue_info (101-159)
src/grpc/queue_monitor.rs (1)
  • new (22-32)
src/main.rs (1)
src/grpc/server.rs (1)
  • run_grpc_server (13-50)
src/grpc/server.rs (4)
src/grpc/event_subscriber.rs (1)
  • new (15-23)
src/grpc/queue_monitor.rs (1)
  • new (22-32)
src/grpc/queue_service.rs (1)
  • new (22-25)
src/events.rs (1)
  • init_event_bus (59-65)
src/ingester/persist/spend.rs (1)
src/events.rs (1)
  • publish (71-76)
src/grpc/event_subscriber.rs (1)
src/grpc/queue_monitor.rs (2)
  • new (22-32)
  • start (34-98)
src/grpc/queue_monitor.rs (2)
src/api/method/get_queue_info.rs (1)
  • get_queue_info (101-159)
src/grpc/queue_service.rs (2)
  • get_queue_info (34-67)
  • new (22-25)
src/events.rs (1)
src/grpc/event_subscriber.rs (1)
  • new (15-23)
src/ingester/persist/mod.rs (2)
src/ingester/persist/spend.rs (1)
  • spend_input_accounts_batched (71-130)
src/events.rs (1)
  • publish (71-76)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (13)
src/api/method/get_multiple_new_address_proofs.rs (1)

125-125: LGTM: Error message now reports the correct address.

The change to use queued_address accurately reports which address was found in the queue, making the error message more precise.

src/ingester/parser/state_update.rs (1)

188-190: No action needed. The code change is correct, and the original concern reflects a misunderstanding of Light Protocol's validation architecture.

The original concern about in_accounts from unknown/unauthorized trees is not valid. Light Protocol transactions include validity proofs that are verified on-chain by the system program, and these proofs verify the existence of compressed accounts as leaves within state trees. Since the indexer only processes events from successful transactions (tx.error.is_none), any input account hash in the transaction event has already been validated through merkle proof verification at the blockchain level.

If a transaction reads from a tree, it must provide a valid merkle proof for that tree's state root—which means input accounts cannot be from unknown trees. The subsequent UPDATE affecting 0 rows for non-existent accounts is a graceful safety measure, but not the primary validation mechanism.

The code change at lines 188-190 correctly preserves account_transactions for both input and output accounts.

Likely an incorrect or invalid review comment.

.github/workflows/ci.yml (1)

49-50: LGTM! Protobuf compiler installation is correctly placed.

The addition of protobuf-compiler is necessary for compiling the proto definitions introduced in this PR, and the placement before the build steps is appropriate.

src/api/method/get_queue_elements.rs (2)

17-17: Verify the 4x limit increase from 1000 to 4000.

This represents a significant increase in the maximum queue elements that can be requested. Please confirm:

  • Whether this aligns with performance testing or system capacity
  • The impact on database queries, memory usage, and proof generation
  • Whether this change is coordinated with any client-side adjustments

63-67: LGTM! Validation refactored to use the constant.

The validation logic correctly uses the MAX_QUEUE_ELEMENTS constant, and the error message dynamically reflects the limit, improving maintainability.

src/api/method/mod.rs (1)

24-24: LGTM! Module addition follows existing pattern.

The new get_queue_info module is properly declared and consistent with the existing module structure.

src/lib.rs (1)

5-6: LGTM! New modules properly exposed at crate root.

The events and grpc modules are correctly declared and expand the public API surface for the new functionality.

Cargo.toml (1)

133-141: LGTM! gRPC dependencies are properly configured.

The added dependencies for gRPC support are:

  • Version-consistent across the tonic ecosystem (0.14.x)
  • Appropriately split between runtime and build-time dependencies
  • Include necessary features (tokio-stream with sync for streaming)
src/main.rs (3)

46-48: LGTM! Optional gRPC port correctly implemented.

The use of Option<u16> makes the gRPC server opt-in, and the documentation clearly indicates the server won't start if not provided.


370-381: LGTM! gRPC server lifecycle correctly managed.

The conditional startup with proper logging, error handling, and task spawning is well-implemented. The server runs independently without blocking the main thread.


397-403: LGTM! Graceful shutdown properly implemented.

The shutdown handling follows the same pattern as other services (indexer, monitor) with proper abort and cleanup sequencing.

build.rs (1)

1-9: LGTM! Build script correctly configured.

The protobuf compilation setup is standard and correct:

  • Uses OUT_DIR for generated files
  • Generates descriptor set for gRPC reflection
  • Proper error propagation
  • Standard proto file location and include path
src/api/rpc_server.rs (1)

197-201: LGTM! RPC method follows established pattern.

The getQueueInfo registration is consistent with other RPC methods in terms of parameter parsing, async handling, and error conversion.

Comment on lines +65 to +70
enum UpdateType {
UPDATE_TYPE_UNSPECIFIED = 0;
UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription
UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue
UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
}

⚠️ Potential issue | 🟠 Major

Add a dedicated heartbeat update type.

Right now there’s no enum value to represent heartbeat/no-op updates, so the polling fallback is forced to tag heartbeats as UPDATE_TYPE_ITEM_ADDED. Any subscriber that reacts to ITEM_ADDED will misinterpret those heartbeats as real queue growth, leading to bogus counters. Please add an explicit heartbeat/no-change variant so the wire contract can faithfully represent all events.

 enum UpdateType {
     UPDATE_TYPE_UNSPECIFIED = 0;
     UPDATE_TYPE_INITIAL = 1;    // Initial state sent at subscription
     UPDATE_TYPE_ITEM_ADDED = 2;  // Item added to queue
     UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
+    UPDATE_TYPE_HEARTBEAT = 4;   // Periodic heartbeat without a size change
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
enum UpdateType {
UPDATE_TYPE_UNSPECIFIED = 0;
UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription
UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue
UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
}
enum UpdateType {
UPDATE_TYPE_UNSPECIFIED = 0;
UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription
UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue
UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
UPDATE_TYPE_HEARTBEAT = 4; // Periodic heartbeat without a size change
}
🤖 Prompt for AI Agents
In proto/photon.proto around lines 65 to 70, the UpdateType enum lacks a
dedicated heartbeat/no-op value so heartbeats are being mis-tagged as
ITEM_ADDED; add a new enum member (e.g., UPDATE_TYPE_HEARTBEAT or
UPDATE_TYPE_NO_CHANGE) with the next available numeric value (4) and a brief
comment indicating it represents heartbeat/no-op updates, ensuring the enum
ordering/numeric values remain stable for wire compatibility; after adding the
enum value, update any code that maps or switches on UpdateType to handle the
new heartbeat case appropriately.

Comment on lines +65 to +73
if should_send {
let update_type = if queue.queue_size > previous_size {
UpdateType::ItemAdded
} else if queue.queue_size < previous_size {
UpdateType::ItemRemoved
} else {
// Heartbeat for unchanged non-empty queue
UpdateType::ItemAdded
};

⚠️ Potential issue | 🟠 Major

Heartbeat events are mislabeled as item additions.

For unchanged non-empty queues we still send UpdateType::ItemAdded, so every heartbeat looks like a real enqueue to subscribers. Clients will over-count items during steady state. Once the proto grows a heartbeat/no-op variant, switch this branch to emit it (or skip sending altogether).

-                        let update_type = if queue.queue_size > previous_size {
-                            UpdateType::ItemAdded
-                        } else if queue.queue_size < previous_size {
-                            UpdateType::ItemRemoved
-                        } else {
-                            // Heartbeat for unchanged non-empty queue
-                            UpdateType::ItemAdded
-                        };
+                        let update_type = if queue.queue_size > previous_size {
+                            UpdateType::ItemAdded
+                        } else if queue.queue_size < previous_size {
+                            UpdateType::ItemRemoved
+                        } else {
+                            // Heartbeat for unchanged non-empty queue
+                            UpdateType::Heartbeat
+                        };

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/grpc/queue_monitor.rs around lines 65 to 73, the code treats unchanged
non-empty queues as UpdateType::ItemAdded which mislabels heartbeat events;
instead, when queue.queue_size == previous_size (unchanged) do not emit an
ItemAdded: skip sending an update (e.g., continue the loop) for unchanged
non-empty queues until the proto defines a heartbeat/no-op variant; when the
proto adds a heartbeat variant, replace the skip with emitting that new
UpdateType.

Comment on lines +118 to +134
let stream = async_stream::stream! {
for update in initial_updates {
yield Ok(update);
}

while let Ok(update) = rx.recv().await {
if let Some(ref trees) = trees_filter {
if let Some(ref queue_info) = update.queue_info {
if !trees.contains(&queue_info.tree) {
continue;
}
}
}
yield Ok(update);
}
};


⚠️ Potential issue | 🟠 Major

Handle broadcast lag without tearing down clients

broadcast::Receiver::recv() returns Err(RecvError::Lagged(_)) when the subscriber falls behind. With while let Ok(update) = rx.recv().await, the first lag immediately breaks the loop and closes the gRPC stream, so busy clients get disconnected instead of resynchronising.

Please match on the recv result, swallow Lagged (maybe log it), and only break on RecvError::Closed so subscribers remain connected.

-use tokio::sync::broadcast;
+use tokio::sync::broadcast::{self, error::RecvError};
...
-            while let Ok(update) = rx.recv().await {
-                if let Some(ref trees) = trees_filter {
-                    if let Some(ref queue_info) = update.queue_info {
-                        if !trees.contains(&queue_info.tree) {
-                            continue;
-                        }
-                    }
-                }
-                yield Ok(update);
-            }
+            loop {
+                match rx.recv().await {
+                    Ok(update) => {
+                        if let Some(ref trees) = trees_filter {
+                            if let Some(ref queue_info) = update.queue_info {
+                                if !trees.contains(&queue_info.tree) {
+                                    continue;
+                                }
+                            }
+                        }
+                        yield Ok(update);
+                    }
+                    Err(RecvError::Lagged(skipped)) => {
+                        tracing::warn!(
+                            "queue update subscriber lagged; skipped {} messages",
+                            skipped
+                        );
+                    }
+                    Err(RecvError::Closed) => break,
+                }
+            }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/grpc/queue_service.rs around lines 118 to 134, the current while let
Ok(update) = rx.recv().await will close the gRPC stream on the first broadcast
lag; change the loop to explicitly match on rx.recv().await so you handle all
cases: on Ok(update) yield it; on Err(RecvError::Lagged(_)) swallow (optionally
log) and continue so slow subscribers can resync; on Err(RecvError::Closed)
break the loop and end the stream. Ensure you import or qualify
tokio::sync::broadcast::error::RecvError if needed.

Comment on lines +250 to +254
// Set default isolation level to READ COMMITTED for all connections in the pool
// This ensures each statement sees the latest committed data
sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED")
.execute(&pool)
.await
.unwrap();

⚠️ Potential issue | 🔴 Critical

The SQL query only affects one connection, not all connections in the pool.

The execute(&pool) call acquires a single connection from the pool to run the query, so SET SESSION CHARACTERISTICS only applies to that connection's session. Other connections (current or future) remain at the default isolation level, which defeats the intent stated in the comment.

To ensure all connections use READ COMMITTED, use the after_connect hook when building the pool:

-    let pool = PgPoolOptions::new()
+    let pool = PgPoolOptions::new()
+        .after_connect(|conn, _meta| {
+            Box::pin(async move {
+                sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED")
+                    .execute(conn)
+                    .await?;
+                Ok(())
+            })
+        })
         .min_connections(1)
         .connect_with(options)
         .await
         .unwrap();
-
-    // Set default isolation level to READ COMMITTED for all connections in the pool
-    // This ensures each statement sees the latest committed data
-    sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED")
-        .execute(&pool)
-        .await
-        .unwrap();

     pool

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In tests/integration_tests/utils.rs around lines 250 to 255, the current
sqlx::query(...).execute(&pool) call only sets the isolation level on a single
checked-out connection, not on all pool members; replace this by configuring the
pool with PoolOptions::after_connect (or the equivalent builder hook) so that
each newly opened connection runs the SET SESSION CHARACTERISTICS AS TRANSACTION
ISOLATION LEVEL READ COMMITTED statement during connection setup; implement the
hook to run the query on the provided connection and remove the single
execute(&pool) call.

coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/openapi/specs/api.yaml (1)

4762-4794: Add the nullifier field to the OpenAPI schema

GetQueueElementsResponseValue now serializes a nullifier, but the schema still omits it while additionalProperties: false is set. Strict OpenAPI clients will reject the response. Please add the field (nullable hash) to the spec.

         txHash:
           $ref: '#/components/schemas/Hash'
+        nullifier:
+          allOf:
+          - $ref: '#/components/schemas/Hash'
+          nullable: true
src/api/method/get_queue_elements.rs (1)

197-263: Return queue index as None when no elements are fetched

Empty queues currently emit Some(0) because fetch_queue returns (vec![], 0). Clients then believe there’s data at index 0, contradicting the nullable spec and breaking pagination. Please return None for the index when the result set is empty and propagate the option to the response.

-async fn fetch_queue(
+async fn fetch_queue(
     tx: &sea_orm::DatabaseTransaction,
     tree: &Hash,
     queue_type: QueueType,
     start_index: Option<u64>,
     limit: u16,
-) -> Result<(Vec<GetQueueElementsResponseValue>, u64), PhotonApiError> {
+) -> Result<(Vec<GetQueueElementsResponseValue>, Option<u64>), PhotonApiError> {
@@
-    if queue_elements.is_empty() {
-        return Ok((vec![], 0));
+    if queue_elements.is_empty() {
+        return Ok((Vec::new(), None));
     }
@@
-    Ok((result, first_value_queue_index))
+    Ok((result, Some(first_value_queue_index)))
 }

and at the call sites:

-            let (elements, first_idx) = fetch_queue(...).await?;
-            (Some(elements), Some(first_idx))
+            let (elements, first_idx) = fetch_queue(...).await?;
+            (Some(elements), first_idx)

Apply the same adjustment for the input queue branch so the response only includes an index when items are present.

♻️ Duplicate comments (1)
src/ingester/persist/spend.rs (1)

108-117: Per-account SELECT explosion (already flagged)

This per-account .one() query inside the loop creates N database round trips for every batch, severely degrading throughput on the hot ingestion path. A previous review comment already identified this issue and provided a batched solution.

🧹 Nitpick comments (1)
src/events.rs (1)

60-66: Double initialization panics instead of returning error

The init_event_bus function panics if called more than once. In production environments, panics should be reserved for truly unrecoverable errors. A double initialization could occur due to misconfigured startup code or tests.

Consider returning a Result to let callers handle re-initialization gracefully, or use get_or_init to make the function idempotent.

Apply this diff to return a Result instead:

 /// Initialize the global event publisher
 ///
 /// This should be called once at startup. Returns the subscriber end of the channel.
-pub fn init_event_bus() -> EventSubscriber {
+pub fn init_event_bus() -> Result<EventSubscriber, &'static str> {
     let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
-    EVENT_PUBLISHER
-        .set(tx)
-        .expect("Event publisher already initialized");
-    rx
+    EVENT_PUBLISHER.set(tx)
+        .map_err(|_| "Event publisher already initialized")?;
+    Ok(rx)
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 06862b2 and 94b3688.

📒 Files selected for processing (6)
  • src/api/method/get_queue_elements.rs (6 hunks)
  • src/events.rs (1 hunks)
  • src/grpc/event_subscriber.rs (1 hunks)
  • src/ingester/persist/mod.rs (7 hunks)
  • src/ingester/persist/spend.rs (3 hunks)
  • src/openapi/specs/api.yaml (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/grpc/event_subscriber.rs
🧰 Additional context used
🧬 Code graph analysis (4)
src/ingester/persist/mod.rs (2)
src/ingester/persist/spend.rs (1)
  • spend_input_accounts_batched (73-144)
src/events.rs (1)
  • publish (72-98)
src/ingester/persist/spend.rs (1)
src/events.rs (1)
  • publish (72-98)
src/api/method/get_queue_elements.rs (2)
src/api/mod.rs (1)
  • set_transaction_isolation_if_needed (12-28)
src/ingester/persist/leaf_node_proof.rs (2)
  • indices (36-36)
  • get_multiple_compressed_leaf_proofs_by_indices (14-84)
src/events.rs (1)
src/grpc/event_subscriber.rs (1)
  • new (16-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (4)
src/ingester/persist/mod.rs (2)

186-190: Enhanced error message improves developer experience

The updated error message clearly communicates the requirement to sync tree metadata before indexing, which will help developers troubleshoot configuration issues more quickly.


101-102: Verify that downstream event consumers correctly handle slot=0 attribution

The concern is substantiated: when transactions is empty, slot defaults to 0. I found evidence this is a valid scenario—test code explicitly constructs empty transaction lists (snapshot_tests.rs:84). The question is whether event consumers (at lines 451 and 560) correctly interpret and handle slot=0, and whether this represents the intended semantics for monitoring and queue updates.

src/events.rs (1)

72-98: Fire-and-forget publish with appropriate error handling

The publish function correctly implements fire-and-forget semantics, logging warnings and emitting metrics when send fails. This prevents ingestion from blocking on slow subscribers while maintaining observability.

src/ingester/persist/spend.rs (1)

134-139: Implementation is correct; review comment is incorrect

The count field properly contains the total queue size, not the delta. The proto definition explicitly documents queue_size as "Current number of items in the queue" (absolute), and downstream logic in queue_monitor.rs relies on comparing absolute queue_size values to determine if items were added or removed. The debug log mentions both delta and total for observability, but the event correctly publishes the absolute total per the proto contract.

Likely an incorrect or invalid review comment.

Comment on lines +429 to +455
let mut addresses_by_tree: HashMap<Pubkey, usize> = HashMap::new();
for address in addresses {
if let Ok(tree_pubkey) = Pubkey::try_from(address.tree.to_bytes_vec().as_slice()) {
*addresses_by_tree.entry(tree_pubkey).or_insert(0) += 1;
}
}

for (tree, count) in addresses_by_tree {
if let Some(tree_info) = tree_info_cache.get(&tree) {
let queue_size = address_queues::Entity::find()
.filter(address_queues::Column::Tree.eq(tree.to_bytes().to_vec()))
.count(txn)
.await
.unwrap_or(0) as usize;

debug!(
"Publishing AddressQueueInsert event: tree={}, queue={}, delta={}, total_queue_size={}, slot={}",
tree, tree_info.queue, count, queue_size, slot
);
crate::events::publish(crate::events::IngestionEvent::AddressQueueInsert {
tree,
queue: tree_info.queue,
count: queue_size,
slot,
});
}
}

⚠️ Potential issue | 🟠 Major

Per-tree COUNT query degrades ingestion throughput

After aggregating addresses by tree, this code issues a synchronous count() query for each distinct tree to compute the total address queue size. Multiple trees in a single batch will serialize these COUNT operations on the hot ingestion path.

Apply the same mitigation strategies suggested for the nullifier queue COUNT queries:

  • Batch all counts with a single GROUP BY query (see the sketch after this list).
  • Maintain queue sizes incrementally.
  • Publish delta only and move total size computation off the critical path.
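
A possible shape for the batched option above, as a hypothetical helper: one GROUP BY query returns all per-tree sizes in a single round trip. It assumes SeaORM's select_only/column_as/group_by/into_tuple API; the address_queues entity and Tree column come from the snippet above, while the function name and the entity's import path are illustrative.

use sea_orm::{ColumnTrait, DatabaseTransaction, DbErr, EntityTrait, QueryFilter, QuerySelect};
use std::collections::HashMap;

// Hypothetical helper: fetch address-queue sizes for many trees in one round trip.
async fn address_queue_sizes_by_tree(
    txn: &DatabaseTransaction,
    trees: Vec<Vec<u8>>,
) -> Result<HashMap<Vec<u8>, i64>, DbErr> {
    let rows: Vec<(Vec<u8>, i64)> = address_queues::Entity::find()
        .select_only()
        .column(address_queues::Column::Tree)
        .column_as(address_queues::Column::Tree.count(), "queue_size")
        .filter(address_queues::Column::Tree.is_in(trees))
        .group_by(address_queues::Column::Tree)
        .into_tuple()
        .all(txn)
        .await?;
    Ok(rows.into_iter().collect())
}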

crate::events::publish(crate::events::IngestionEvent::AddressQueueInsert {
tree,
queue: tree_info.queue,
count: queue_size,

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

count field contains total queue size, not delta

Similar to the issue in spend.rs, the count field in AddressQueueInsert (line 451) and OutputQueueInsert (line 556) events is set to the total queue_size from the database, not the delta. The debug logs on lines 444-447 and 549-552 show both delta and total_queue_size as separate values, suggesting the delta should be published.

Verify the intended semantics across all three event types and ensure consistency.

Also applies to: 556-556


I'll verify the review comment by examining the relevant code sections. Let me gather the necessary information:


🏁 Script executed:

#!/bin/bash
# View the code around line 451 in src/ingester/persist/mod.rs
echo "=== Around line 451 ==="
sed -n '440,460p' src/ingester/persist/mod.rs

echo ""
echo "=== Around line 556 ==="
sed -n '545,570p' src/ingester/persist/mod.rs

Length of output: 1325


🏁 Script executed:

#!/bin/bash
# Search for AddressQueueInsert and OutputQueueInsert event definitions
echo "=== Searching for AddressQueueInsert ==="
rg -n "AddressQueueInsert" -A 5 -B 2

echo ""
echo "=== Searching for OutputQueueInsert ==="
rg -n "OutputQueueInsert" -A 5 -B 2

Length of output: 4495


🏁 Script executed:

#!/bin/bash
# Search for the similar issue in spend.rs
echo "=== Searching for spend.rs ==="
find . -name "spend.rs" -type f

echo ""
echo "=== If found, search for queue_size or count in spend.rs ==="
rg -n "queue_size|count" src/ingester/persist/spend.rs -B 2 -A 2 | head -50

Length of output: 2045


🏁 Script executed:

#!/bin/bash
# Search for all occurrences of queue_size in the persist module
echo "=== Searching for queue_size pattern ==="
rg -n "queue_size" src/ingester/persist/ -B 1 -A 1 | head -80

Length of output: 2502


The review comment is valid—all three event types incorrectly publish total queue size instead of delta

The debug logs confirm the issue. At lines 445–446 and 550–551, the logs explicitly map count to delta and queue_size to total_queue_size, yet the events are published with count: queue_size instead of count: count.

The same bug exists in spend.rs at line 137 for NullifierQueueInsert.

All three events should be corrected to use count: count (the delta) rather than count: queue_size (the total).

🤖 Prompt for AI Agents
In src/ingester/persist/mod.rs around line 451 (and also check the nearby
mappings at ~445–446 and ~550–551), the event payloads erroneously set count:
queue_size (total queue size) instead of using the delta variable count; change
those event publish calls to use count: count so the emitted events carry the
incremental delta, and apply the same fix in src/ingester/persist/spend.rs at
line 137 for NullifierQueueInsert.

Comment on lines +526 to +559
let mut accounts_by_tree_queue: HashMap<(Pubkey, Pubkey), usize> = HashMap::new();

for account in out_accounts {
if account.context.in_output_queue {
if let (Ok(tree_pubkey), Ok(queue_pubkey)) = (
Pubkey::try_from(account.account.tree.to_bytes_vec().as_slice()),
Pubkey::try_from(account.context.queue.to_bytes_vec().as_slice()),
) {
*accounts_by_tree_queue
.entry((tree_pubkey, queue_pubkey))
.or_insert(0) += 1;
}
}
}

for ((tree, queue), count) in accounts_by_tree_queue {
let queue_size = accounts::Entity::find()
.filter(accounts::Column::Tree.eq(tree.to_bytes().to_vec()))
.filter(accounts::Column::InOutputQueue.eq(true))
.count(txn)
.await
.unwrap_or(0) as usize;

debug!(
"Publishing OutputQueueInsert event: tree={}, queue={}, delta={}, total_queue_size={}, slot={}",
tree, queue, count, queue_size, slot
);
crate::events::publish(crate::events::IngestionEvent::OutputQueueInsert {
tree,
queue,
count: queue_size,
slot,
});
}

⚠️ Potential issue | 🟠 Major

Per-(tree,queue) COUNT query degrades ingestion throughput

After aggregating output accounts by (tree, queue) pairs, this code issues a synchronous count() query for each pair to compute the total output queue size. This creates additional serialized database round trips on the hot ingestion path.

Apply the same mitigation strategies suggested for the other queue COUNT queries.

🤖 Prompt for AI Agents
In src/ingester/persist/mod.rs around lines 526 to 559, the code issues a
separate count() database query for each (tree, queue) pair in the
accounts_by_tree_queue HashMap, creating serialized database round trips that
degrade ingestion throughput. Locate the mitigation strategies that were applied
to other queue COUNT queries elsewhere in the codebase and apply the same
approach here. This likely involves batching the count queries into a single
database operation or computing queue sizes differently to avoid the per-pair
query overhead on the hot ingestion path.

Comment on lines +122 to +128
let queue_size = accounts::Entity::find()
.filter(accounts::Column::Tree.eq(tree.to_bytes().to_vec()))
.filter(accounts::Column::NullifierQueueIndex.is_not_null())
.filter(accounts::Column::NullifiedInTree.eq(false))
.count(txn)
.await
.unwrap_or(0) as usize;

⚠️ Potential issue | 🟠 Major

COUNT query per tree degrades ingestion throughput

After aggregating nullifier counts per tree, this code issues a synchronous count() query for each distinct tree to compute queue_size. When ingesting transactions that touch multiple trees, these sequential COUNT queries add significant latency to the critical persistence path.

Consider one of these alternatives:

  • Batch all tree lookups into a single query with GROUP BY.
  • Cache queue sizes and maintain them incrementally (update += count instead of re-counting); see the sketch after this list.
  • Move queue size computation off the critical path (e.g., publish delta only, let subscribers query size asynchronously).
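
As an illustration of the incremental-cache option above (not part of the PR): keep an in-process map keyed by tree and apply the known per-batch delta, so the hot path never re-counts. A cache like this would still need an initial seed and periodic reconciliation against the database.

use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

static NULLIFIER_QUEUE_SIZES: OnceLock<Mutex<HashMap<Pubkey, u64>>> = OnceLock::new();

fn sizes() -> &'static Mutex<HashMap<Pubkey, u64>> {
    NULLIFIER_QUEUE_SIZES.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Apply a signed delta for one tree and return the new cached size.
fn apply_delta(tree: Pubkey, delta: i64) -> u64 {
    let mut map = sizes().lock().unwrap();
    let entry = map.entry(tree).or_insert(0);
    *entry = (*entry as i64).saturating_add(delta).max(0) as u64;
    *entry
}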

feat: add getQueueInfo RPC method

feat: add grpc photon.QueueService/SubscribeQueueUpdates

update ci workflow

feat: replace polling queue updates during with event-based on ingestion

cleanup

format

cleanup

fix: tracking account_transaction for input accounts

feat: distinct output and input queues in the `get_queue_elements`

cleanup
@sergeytimoshin force-pushed the sergey/get-queue-info-grpc branch from 94b3688 to a56acb4 on November 21, 2025 18:15
coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (6)
src/ingester/persist/spend.rs (1)

72-141: Optimize batched nullifier handling and avoid silently masking DB errors

Logically this does the right thing, but the DB access pattern on the hot ingestion path is expensive and a bit fragile:

  • Each account incurs an extra find().one() after the update_many(), so you now do 2 queries per account. This was previously flagged and a HashMap<account_hash, tree_pubkey> prefetch was suggested; reintroducing the prefetch (single is_in query) would avoid the N extra round trips.
  • After aggregating tree_nullifier_counts, you issue a separate count() per tree and call unwrap_or(0). Any DB error becomes a silent queue_size = 0 and you still publish an event, which can generate bogus “queue drained” signals.

Consider:

  • Prefetching (hash -> tree_pubkey) once and using it in the loop instead of per-account find().one(); a sketch follows this list.
  • Replacing unwrap_or(0) with explicit error handling (log and skip publishing for that tree, or propagate if you want strictness).
  • If queue sizes are needed only for observability, also think about batching the counts with a single GROUP BY to avoid per-tree COUNT cost, as in earlier review feedback.
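
A sketch of the prefetch suggested above: one is_in query builds a (hash -> tree) map before the loop, replacing the per-account find().one(). The helper is hypothetical and the Hash column name is an assumption; only the accounts entity and its Tree column appear in the snippets quoted in this review.

use sea_orm::{ColumnTrait, DatabaseTransaction, DbErr, EntityTrait, QueryFilter, QuerySelect};
use std::collections::HashMap;

// Hypothetical helper: resolve each spent account hash to its tree in one query.
async fn prefetch_trees_by_hash(
    txn: &DatabaseTransaction,
    hashes: Vec<Vec<u8>>,
) -> Result<HashMap<Vec<u8>, Vec<u8>>, DbErr> {
    let rows: Vec<(Vec<u8>, Vec<u8>)> = accounts::Entity::find()
        .select_only()
        .column(accounts::Column::Hash) // column name assumed
        .column(accounts::Column::Tree)
        .filter(accounts::Column::Hash.is_in(hashes))
        .into_tuple()
        .all(txn)
        .await?;
    Ok(rows.into_iter().collect())
}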
src/ingester/persist/mod.rs (2)

401-455: Address queue events: avoid per-tree COUNT + unwrap_or(0) on the hot path

After inserting into address_queues, you:

  • Aggregate addresses_by_tree, then
  • For each tree, issue a count() over address_queues and
  • Use unwrap_or(0) on the result while still publishing AddressQueueInsert.

Concerns:

  • One COUNT per tree per batch adds serialized DB round trips in the ingestion path (previously flagged as a throughput issue).
  • unwrap_or(0) silently maps any DB error to queue_size = 0, so consumers see a spurious “queue emptied” event with no indication of the underlying failure.

Suggested direction (same as earlier feedback for other queues):

  • Either maintain queue sizes incrementally from the known per-tree delta, or batch counts via a single GROUP BY query instead of per-tree COUNTs.
  • Replace unwrap_or(0) with explicit error handling (log at least at warn/error and skip publishing for that tree on failure).

526-559: Output queue events: same COUNT/error-handling issues as address queue path

The accounts_by_tree_queue aggregation mirrors the address-queue logic and has the same issues:

  • One count() per (tree, queue) on accounts (filtered only by Tree and InOutputQueue) can add noticeable latency under load.
  • unwrap_or(0) on the COUNT result means DB errors become silent queue_size = 0 updates, but you still emit OutputQueueInsert with that size.

It would be good to align this with whatever mitigation you choose for the address and nullifier queues: batch or cache counts, and surface COUNT errors via logging instead of silently zeroing.

proto/photon.proto (1)

65-70: Add a dedicated heartbeat/no-op value to UpdateType

The QueueService and message shapes look good, but UpdateType still only has UNSPECIFIED, INITIAL, ITEM_ADDED, and ITEM_REMOVED. In QueueMonitor, unchanged non-empty queues are currently reported as ItemAdded heartbeats, which causes every heartbeat to look like a real enqueue to consumers.

Consider adding an explicit heartbeat variant with the next free value and updating producers accordingly, for example:

enum UpdateType {
    UPDATE_TYPE_UNSPECIFIED = 0;
    UPDATE_TYPE_INITIAL = 1;      // Initial state sent at subscription
    UPDATE_TYPE_ITEM_ADDED = 2;   // Item added to queue
    UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
    UPDATE_TYPE_HEARTBEAT = 4;    // Periodic heartbeat without a size change
}

Then use UPDATE_TYPE_HEARTBEAT for steady-state updates where only a heartbeat is intended.

src/grpc/queue_service.rs (1)

72-135: Handle broadcast::RecvError::Lagged without closing the stream

while let Ok(update) = rx.recv().await will break the loop on the first RecvError::Lagged(_), so any subscriber that briefly falls behind gets its gRPC stream closed instead of resynchronising.

Switch to an explicit match on rx.recv().await and only break on Closed, continuing (and optionally logging) on Lagged, e.g.:

-        let stream = async_stream::stream! {
-            for update in initial_updates {
-                yield Ok(update);
-            }
-
-            while let Ok(update) = rx.recv().await {
+        let stream = async_stream::stream! {
+            for update in initial_updates {
+                yield Ok(update);
+            }
+
+            use tokio::sync::broadcast::error::RecvError;
+            loop {
+                match rx.recv().await {
+                    Ok(update) => {
                         if let Some(ref trees) = trees_filter {
                             if let Some(ref queue_info) = update.queue_info {
                                 if !trees.contains(&queue_info.tree) {
                                     continue;
                                 }
                             }
                         }
                         yield Ok(update);
-            }
+                    }
+                    Err(RecvError::Lagged(skipped)) => {
+                        tracing::warn!(
+                            "queue update subscriber lagged; skipped {} messages",
+                            skipped
+                        );
+                    }
+                    Err(RecvError::Closed) => break,
+                }
+            }
         };
src/grpc/queue_monitor.rs (1)

34-98: Don’t label heartbeats as ItemAdded

The heartbeat logic sends an update whenever the queue is non-empty and HEARTBEAT_INTERVAL_SECS has elapsed, but when queue.queue_size == previous_size it still sets:

// Heartbeat for unchanged non-empty queue
UpdateType::ItemAdded

That makes every heartbeat indistinguishable from a real enqueue for subscribers that key off update_type.

Once UPDATE_TYPE_HEARTBEAT (or similar) exists in the proto, use it here instead of ItemAdded. Until then, it would be safer to skip emitting an update when the size is unchanged (i.e., only send when it actually grows or shrinks) to avoid misleading consumers.
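
One minimal way to express that behavior until the proto gains a heartbeat variant (illustrative only; UpdateType is the generated proto enum and queue sizes are assumed to be u64):

use std::cmp::Ordering;

// Hypothetical helper: None means "steady state, emit nothing" rather than a fake ItemAdded.
fn classify_change(current: u64, previous: u64) -> Option<UpdateType> {
    match current.cmp(&previous) {
        Ordering::Greater => Some(UpdateType::ItemAdded),
        Ordering::Less => Some(UpdateType::ItemRemoved),
        Ordering::Equal => None,
    }
}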

🧹 Nitpick comments (2)
src/api/method/get_queue_elements.rs (1)

17-29: Tighten queue request validation and nullifier decoding edge cases

The overall refactor looks solid, but a few edge behaviors are worth tightening:

  • Limit / start index semantics

    • Only *_queue_limit is checked to decide whether a queue is requested. If a caller sets *_queue_start_index but leaves *_queue_limit as None, that start index is silently ignored. Consider either:
      • Treating this as a validation error, or
      • Inferring a sensible default limit when start_index is provided.
    • limit is only bounded above (limit > MAX_QUEUE_ELEMENTS). A limit of 0 will generate a query with .limit(0), immediately hit the queue_elements.is_empty() branch, and return ([], 0). That’s legal but slightly odd; it may be clearer to reject limit == 0 as invalid and require 1..=MAX_QUEUE_ELEMENTS.
  • Empty-queue index sentinel

    • When queue_elements is empty you return (vec![], 0) and propagate 0 as the *_queue_index. Since 0 can also be a valid queue index, clients must remember to combine *_queue_index with *_queue_elements.is_empty() to distinguish “no results” from “first index is 0”. Consider using None for the index when the elements vector is empty, or at least documenting this invariant clearly.
  • Nullifier decoding robustness

    • The new nullifier field is decoded via:
      let nullifier = queue_element
          .nullifier
          .as_ref()
          .map(|nullifier| Hash::new(nullifier.as_slice()).unwrap());
      If the stored bytes are ever malformed, this unwrap() will panic and take down the request handler. It's safer to map this into a PhotonApiError (as you already do for DB and proof mismatches) instead of panicking; a sketch follows below.

None of these is a blocker, but tightening them would make the API behavior more predictable and resilient.

Also applies to: 75-82, 89-127, 130-142, 195-207, 224-231, 233-261
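As a sketch of that nullifier mapping (the `Hash` constructor and the `PhotonApiError::UnexpectedError` variant below are stand-ins, not the crate's real definitions):

```rust
// Stand-in types for illustration only; the real `Hash` and `PhotonApiError`
// live in this crate and may have different constructors and variants.
struct Hash([u8; 32]);

impl Hash {
    fn new(bytes: &[u8]) -> Result<Self, String> {
        bytes
            .try_into()
            .map(Hash)
            .map_err(|_| format!("expected 32 bytes, got {}", bytes.len()))
    }
}

#[derive(Debug)]
enum PhotonApiError {
    UnexpectedError(String), // hypothetical variant name
}

/// Decode an optional stored nullifier, surfacing malformed bytes as an API
/// error instead of panicking inside the request handler.
fn decode_nullifier(raw: Option<&Vec<u8>>) -> Result<Option<Hash>, PhotonApiError> {
    raw.map(|bytes| {
        Hash::new(bytes.as_slice())
            .map_err(|e| PhotonApiError::UnexpectedError(format!("invalid nullifier: {e}")))
    })
    .transpose()
}
```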

src/grpc/queue_service.rs (1)

34-67: get_queue_info gRPC mapping is clean; consider distinguishing client errors

The unary get_queue_info implementation is straightforward and correctly maps API QueueInfo into the proto type. To improve API ergonomics, you may want to map known validation errors (e.g., invalid pubkeys) to Status::invalid_argument and reserve Status::internal for genuine server faults, rather than treating all failures as internal.
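For instance (the error variants below are assumptions about how the API error type partitions client vs. server faults; adjust to the crate's real enum):

```rust
use tonic::Status;

// Hypothetical split of the API error type.
#[derive(Debug)]
enum PhotonApiError {
    ValidationError(String), // bad input from the caller (e.g., a malformed pubkey)
    DatabaseError(String),   // genuine server-side failure
}

fn to_status(err: PhotonApiError) -> Status {
    match err {
        // Caller-side problems surface as INVALID_ARGUMENT.
        PhotonApiError::ValidationError(msg) => Status::invalid_argument(msg),
        // Server faults stay INTERNAL.
        PhotonApiError::DatabaseError(msg) => Status::internal(msg),
    }
}
```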

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 94b3688 and a56acb4.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • proto/photon_descriptor.bin is excluded by !**/*.bin
📒 Files selected for processing (25)
  • .github/workflows/ci.yml (1 hunks)
  • Cargo.toml (1 hunks)
  • build.rs (1 hunks)
  • proto/photon.proto (1 hunks)
  • src/api/api.rs (2 hunks)
  • src/api/method/get_queue_elements.rs (6 hunks)
  • src/api/method/get_queue_info.rs (1 hunks)
  • src/api/method/mod.rs (1 hunks)
  • src/api/rpc_server.rs (1 hunks)
  • src/events.rs (1 hunks)
  • src/grpc/event_subscriber.rs (1 hunks)
  • src/grpc/mod.rs (1 hunks)
  • src/grpc/queue_monitor.rs (1 hunks)
  • src/grpc/queue_service.rs (1 hunks)
  • src/grpc/server.rs (1 hunks)
  • src/ingester/fetchers/grpc.rs (1 hunks)
  • src/ingester/parser/indexer_events.rs (1 hunks)
  • src/ingester/parser/state_update.rs (2 hunks)
  • src/ingester/parser/tx_event_parser_v2.rs (3 hunks)
  • src/ingester/persist/mod.rs (7 hunks)
  • src/ingester/persist/spend.rs (3 hunks)
  • src/lib.rs (1 hunks)
  • src/main.rs (4 hunks)
  • src/openapi/specs/api.yaml (2 hunks)
  • tests/integration_tests/utils.rs (2 hunks)
✅ Files skipped from review due to trivial changes (2)
  • src/ingester/parser/indexer_events.rs
  • src/ingester/fetchers/grpc.rs
🚧 Files skipped from review as they are similar to previous changes (11)
  • src/api/method/mod.rs
  • build.rs
  • tests/integration_tests/utils.rs
  • src/grpc/mod.rs
  • src/api/api.rs
  • src/grpc/server.rs
  • src/ingester/parser/state_update.rs
  • src/events.rs
  • src/api/method/get_queue_info.rs
  • Cargo.toml
  • .github/workflows/ci.yml
🧰 Additional context used
🧬 Code graph analysis (8)
src/grpc/queue_service.rs (3)
src/api/method/get_queue_info.rs (1)
  • get_queue_info (101-159)
src/grpc/event_subscriber.rs (1)
  • new (16-24)
src/grpc/queue_monitor.rs (1)
  • new (22-32)
src/ingester/persist/spend.rs (1)
src/events.rs (1)
  • publish (72-98)
src/main.rs (1)
src/grpc/server.rs (1)
  • run_grpc_server (13-50)
src/ingester/persist/mod.rs (2)
src/ingester/persist/spend.rs (1)
  • spend_input_accounts_batched (73-144)
src/events.rs (1)
  • publish (72-98)
src/grpc/event_subscriber.rs (2)
src/grpc/queue_monitor.rs (2)
  • new (22-32)
  • start (34-98)
src/grpc/queue_service.rs (1)
  • new (22-25)
src/grpc/queue_monitor.rs (3)
src/api/method/get_queue_info.rs (1)
  • get_queue_info (101-159)
src/grpc/queue_service.rs (2)
  • get_queue_info (34-67)
  • new (22-25)
src/grpc/event_subscriber.rs (2)
  • new (16-24)
  • start (26-101)
src/lib.rs (1)
src/ingester/parser/tx_event_parser_v2.rs (1)
  • events (42-120)
src/api/method/get_queue_elements.rs (3)
src/api/api.rs (2)
  • get_queue_elements (273-278)
  • new (111-121)
src/api/mod.rs (1)
  • set_transaction_isolation_if_needed (12-28)
src/ingester/persist/leaf_node_proof.rs (2)
  • indices (36-36)
  • get_multiple_compressed_leaf_proofs_by_indices (14-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run tests
🔇 Additional comments (6)
src/lib.rs (1)

5-6: New events and grpc modules export looks good

Publicly exposing these modules from the crate root is straightforward and consistent with the rest of the layout. No further changes needed here.

src/ingester/parser/tx_event_parser_v2.rs (1)

18-21: Pubkey conversion refactor is clean and preserves behavior

Extracting to_light_pubkey and reusing it for both program_ids and accounts keeps the parsing pipeline readable and avoids duplicated conversion logic, without changing semantics. Looks good.

Also applies to: 28-37, 87-88

src/grpc/event_subscriber.rs (1)

2-3: The core concern in this review comment is incorrect

The comment claims that using TreeType::AddressV2 as u32 risks producing inconsistent numeric values for address queues compared to polling-based updates. However, both TreeType::AddressV2 and QueueType::AddressV2 have identical numeric values of 4. The proto definition confirms this, as it documents that 4 represents AddressV2 in the queue_type field.

The code in event_subscriber.rs correctly produces the value 4 for address queue events, which matches the value 4 produced by QueueType::AddressV2 used in other code paths (like get_queue_info). There is no numeric divergence risk between event-driven and polling-based updates.

While using TreeType::AddressV2 for a queue-type value is semantically inconsistent with using QueueType variants for the other queue types, it does not cause the data consistency problem described in the review.

Likely an incorrect or invalid review comment.
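If you do want to tidy the semantics anyway, the swap is a one-liner. The enums below are stand-ins that only demonstrate the numeric equivalence the comment relies on:

```rust
// Stand-in enums mirroring the discriminants discussed above (both AddressV2 == 4).
#[derive(Clone, Copy)]
enum TreeType { AddressV2 = 4 }
#[derive(Clone, Copy)]
enum QueueType { AddressV2 = 4 }

fn main() {
    let from_tree = TreeType::AddressV2 as u32;   // what event_subscriber.rs emits today
    let from_queue = QueueType::AddressV2 as u32; // what the polling path (get_queue_info) uses
    assert_eq!(from_tree, from_queue); // both 4, so no divergence on the wire
}
```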

src/api/rpc_server.rs (1)

197-201: New getQueueInfo RPC handler looks correct

Handler matches the existing pattern: parses params, delegates to PhotonApi::get_queue_info, and maps errors via Into::into. No issues spotted.

src/openapi/specs/api.yaml (1)

3739-3765: /getQueueElements schema matches the new dual-queue design

The updated request params (outputQueue* / inputQueue*) and response fields (outputQueueElements / inputQueueElements with GetQueueElementsResponseValue) are consistent with the new per-queue semantics and optional indexing. No structural issues; just keep in mind that the arrays are nullable, so clients should treat both null and [] as “no elements”.

Also applies to: 3796-3822, 4762-4794
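A minimal client-side sketch of that null-vs-empty handling, assuming serde with the derive feature; the struct is a simplified stand-in for the generated response type, with `String` elements for brevity:

```rust
use serde::Deserialize;

// Simplified stand-in for the /getQueueElements response value.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct QueueElementsResult {
    output_queue_elements: Option<Vec<String>>,
    input_queue_elements: Option<Vec<String>>,
}

/// Both JSON `null` (or an absent field) and `[]` mean "no elements".
fn has_output_elements(result: &QueueElementsResult) -> bool {
    result
        .output_queue_elements
        .as_deref()
        .map_or(false, |elements| !elements.is_empty())
}
```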

src/ingester/persist/mod.rs (1)

101-103: Confirm slot invariants for queue events

You derive slot from transactions.iter().next().map(|tx| tx.slot).unwrap_or(0) and use it for all queue insert events and nullifier events. This is fine if each StateUpdate is guaranteed to contain only a single slot and at least one transaction; otherwise you may emit events with slot = 0 or an ambiguous slot.

Please double-check those invariants and, if they don’t always hold, consider one of the following (a sketch of the guard is shown after this list):

  • Skipping queue events when transactions is empty, or
  • Deriving the slot from a more authoritative context (e.g., the Context used elsewhere).

Also applies to: 123-124, 186-190
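A sketch of that guard, under the assumption that StateUpdate exposes a list of transactions each carrying a slot (type names here are illustrative, not the real crate types):

```rust
// Illustrative stand-in for a transaction entry carrying its slot.
struct Tx {
    slot: u64,
}

/// Return a slot for queue events only when it is unambiguous:
/// `None` when there are no transactions or when they span multiple slots.
fn slot_for_queue_events(transactions: &[Tx]) -> Option<u64> {
    let first = transactions.first()?.slot;
    if transactions.iter().all(|tx| tx.slot == first) {
        Some(first)
    } else {
        None // ambiguous; caller should fall back to a more authoritative context
    }
}
```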

Comment on lines +46 to +49
/// Port for the gRPC API server (optional, if not provided gRPC server won't start)
#[arg(long)]
grpc_port: Option<u16>,


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Avoid panic on shutdown if the gRPC task has already exited

The gRPC server task is spawned as:

let grpc_handle = if let Some(grpc_port) = args.grpc_port {
    info!("Starting gRPC server with port {}...", grpc_port);
    Some(tokio::spawn(async move {
        if let Err(e) =
            photon_indexer::grpc::server::run_grpc_server(db_conn.clone(), grpc_port).await
        {
            error!("gRPC server error: {}", e);
        }
    }))
} else {
    None
};

If run_grpc_server fails early (e.g., bind error on a busy port), the error is logged and the task returns (). Later on shutdown you do:

if let Some(grpc_handle) = grpc_handle {
    info!("Shutting down gRPC server...");
    grpc_handle.abort();
    grpc_handle
        .await
        .expect_err("gRPC server should have been aborted");
}

If the task already finished, abort() is a no-op and grpc_handle.await returns Ok(()), causing expect_err to panic during a normal Ctrl‑C shutdown.

A simple way to avoid this is to drop the assertion and just ignore the join result:

-            if let Some(grpc_handle) = grpc_handle {
-                info!("Shutting down gRPC server...");
-                grpc_handle.abort();
-                grpc_handle
-                    .await
-                    .expect_err("gRPC server should have been aborted");
-            }
+            if let Some(grpc_handle) = grpc_handle {
+                info!("Shutting down gRPC server...");
+                grpc_handle.abort();
+                let _ = grpc_handle.await;
+            }

This keeps shutdown behavior robust even if the gRPC server task terminates on its own before the signal handler runs.

Also applies to: 370-381, 397-403



---

🏁 Script executed:

```shell
#!/bin/bash
# Inspect the gRPC server setup and shutdown code
cat src/main.rs | head -60 | tail -20
echo "---"
cat src/main.rs | head -410 | tail -50

Length of output: 2403


Panic risk on shutdown when gRPC task exits early is confirmed

The concern is valid. If run_grpc_server fails during initialization (e.g., port binding error), the error is logged and the spawned task completes normally. During shutdown, abort() becomes a no-op on an already-completed task, and await returns Ok(()), causing expect_err to panic during normal Ctrl‑C shutdown.

The suggested fix is appropriate:

-            if let Some(grpc_handle) = grpc_handle {
-                info!("Shutting down gRPC server...");
-                grpc_handle.abort();
-                grpc_handle
-                    .await
-                    .expect_err("gRPC server should have been aborted");
-            }
+            if let Some(grpc_handle) = grpc_handle {
+                info!("Shutting down gRPC server...");
+                grpc_handle.abort();
+                let _ = grpc_handle.await;
+            }

Note: the same pattern affects indexer_handle and monitor_handle with identical expect_err assertions. Consider applying the same fix to those as well.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/main.rs around lines 46-49, the shutdown logic currently calls
handle.abort() and then uses expect_err on handle.await which will panic if the
task already completed (e.g., failed during init); change this to abort the
handle, then await it and match the result: if Ok(()) log/debug that the task
already finished, if Err(join_error) check join_error.is_cancelled() (treat as
expected cancellation), otherwise propagate or panic for unexpected join errors;
apply the same pattern to indexer_handle and monitor_handle to avoid panics on
normal Ctrl-C shutdown.
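A sketch of that match-on-join-result pattern as a small helper; the helper itself, its name, and the tracing macros are illustrative, and main.rs may structure the shutdown differently:

```rust
use tokio::task::JoinHandle;

/// Abort a background task and treat both "already finished" and "cancelled by
/// abort" as normal shutdown outcomes instead of panicking on the join result.
async fn shutdown_task(handle: JoinHandle<()>, name: &str) {
    handle.abort();
    match handle.await {
        // Task had already exited on its own (e.g., a bind error at startup).
        Ok(()) => tracing::debug!("{} task had already finished", name),
        // Expected when abort() actually cancelled the task.
        Err(e) if e.is_cancelled() => tracing::debug!("{} task aborted", name),
        // Anything else (e.g., a panic inside the task) is worth surfacing.
        Err(e) => tracing::error!("{} task failed: {}", name, e),
    }
}
```

The same helper could then cover grpc_handle, indexer_handle, and monitor_handle on Ctrl-C shutdown.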
