chore: refactor concurrency model in the bgitable session protocol #13485
chore: refactor concurrency model in the bgitable session protocol #13485igorbernstein2 wants to merge 25 commits into
Conversation
Document lock ownership before later steps remove the lock. Add @GuardedBy to fields under the lock, move heartbeatInterval read into the synchronized block in startRpc(), and comment fields intentionally read outside the lock. No runtime change.
Replace ScheduledExecutorService with a BigtableTimer (Netty hashed wheel, will move in-tree later) for heartbeat, deadline, watchdog, AFE-prune, retry-create-session, and retry-delay. Owned by Client and shared across pools.
Replaces CancellableVRpc with a VOperation layer that sits above the VRpc chain rather than inside it. VOperationImpl owns the gRPC Context cancellation listener and constructs the per-op VRpcCallContext; downstream middleware just sees the chain.
VRpcCallContext.getExecutor() returns OpExecutor (thin wrapper with runningThread affinity tracking). VOperationImpl constructs the per-call SynchronizationContext + OpExecutor; RetryingVRpc drops its own SyncContext and dispatches via ctx.getExecutor(). The uncaught-handler safety net moves from RetryingVRpc up to VOperationImpl.
VRpcImpl.handle*() methods now dispatch listener callbacks via ctx.getExecutor(), with CAS STARTED->CLOSED in all three (handleError no longer proceeds from NEW) and decode moved into the executor task. RetryingVRpc.Active drops its own wrap since callbacks already arrive on the op executor. start() publishes ctx/listener only after winning the CAS so a racing duplicate can't corrupt the winner's fields. SessionPoolImpl's three direct listener.onClose paths also dispatch via ctx.getExecutor().
There was a problem hiding this comment.
Code Review
This pull request refactors the threading and execution model of the Bigtable client by introducing a per-operation serializing executor (OpExecutor), a unified operation wrapper (VOperationImpl), and a hashed-wheel timer (BigtableTimer/NettyWheelTimer) to handle scheduling tasks like heartbeats and deadlines without stalling Netty I/O threads. Additionally, SessionImpl and SessionPoolImpl are refactored to use SynchronizationContext instead of synchronized blocks. The review feedback highlights critical thread-safety and robustness issues, including potential visibility bugs from removing volatile on exposed session parameters, a potential NullPointerException in PendingVRpc.cancel when ctx is null, and the need to guard PendingVRpc.start against already-cancelled operations.
| private OpenParams openParams; | ||
|
|
||
| private volatile boolean openParamsUpdated; | ||
| private boolean openParamsUpdated; |
There was a problem hiding this comment.
Removing the volatile keyword from openParams and openParamsUpdated introduces a Java Memory Model (JMM) visibility and data-race issue. Even if there are no off-context production readers today, these fields are exposed via public getters (such as getOpenParams() and isOpenParamsUpdated()). Any external thread (like SessionPoolImpl or background threads) calling these getters directly without executing on the sessionSyncContext could read stale values or fail to see updates entirely. Keeping them volatile is crucial for thread safety and defensive programming.
| private OpenParams openParams; | |
| private volatile boolean openParamsUpdated; | |
| private boolean openParamsUpdated; | |
| private volatile OpenParams openParams; | |
| private volatile boolean openParamsUpdated; |
| private void cancel(Status status, boolean onlyCancelPendingCall) { | ||
| boolean delegateToRealCall = true; | ||
| synchronized (SessionPoolImpl.this) { | ||
| if (isCancelled) { | ||
| return; | ||
| } | ||
| pendingRpcs.remove(this); // eager removal; no-op if already drained | ||
| } | ||
| ctx.getExecutor().execute(() -> { |
There was a problem hiding this comment.
If cancel is called on a PendingVRpc before start() has been invoked, ctx will be null, leading to a NullPointerException when calling ctx.getExecutor(). To prevent this, we should check if ctx is null under the SessionPoolImpl.this monitor lock, set isCancelled = true, and return early. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.
private void cancel(Status status, boolean onlyCancelPendingCall) {
synchronized (SessionPoolImpl.this) {
pendingRpcs.remove(this); // eager removal; no-op if already drained
if (ctx == null) {
isCancelled = true;
return;
}
}
ctx.getExecutor().execute(() -> {References
- In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
| synchronized (SessionPoolImpl.this) { | ||
| if (SessionPoolImpl.this.poolState != PoolState.STARTED) { | ||
| listener.onClose( | ||
| VRpcResult result = | ||
| VRpcResult.createUncommitedError( | ||
| Status.UNAVAILABLE.withCause( | ||
| new IllegalStateException("SessionPool is closed")))); | ||
| new IllegalStateException("SessionPool is closed"))); | ||
| ctx.getExecutor().execute(() -> listener.onClose(result)); | ||
| return; | ||
| } |
There was a problem hiding this comment.
To complement the pre-start cancellation guard, PendingVRpc.start should check if isCancelled is already true when called, and if so, fast-fail immediately instead of queueing the RPC in pendingRpcs. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.
| synchronized (SessionPoolImpl.this) { | |
| if (SessionPoolImpl.this.poolState != PoolState.STARTED) { | |
| listener.onClose( | |
| VRpcResult result = | |
| VRpcResult.createUncommitedError( | |
| Status.UNAVAILABLE.withCause( | |
| new IllegalStateException("SessionPool is closed")))); | |
| new IllegalStateException("SessionPool is closed"))); | |
| ctx.getExecutor().execute(() -> listener.onClose(result)); | |
| return; | |
| } | |
| synchronized (SessionPoolImpl.this) { | |
| if (SessionPoolImpl.this.poolState != PoolState.STARTED) { | |
| VRpcResult result = | |
| VRpcResult.createUncommitedError( | |
| Status.UNAVAILABLE.withCause( | |
| new IllegalStateException("SessionPool is closed"))); | |
| ctx.getExecutor().execute(() -> listener.onClose(result)); | |
| return; | |
| } | |
| if (isCancelled) { | |
| VRpcResult result = VRpcResult.createRejectedError(Status.CANCELLED); | |
| ctx.getExecutor().execute(() -> listener.onClose(result)); | |
| return; | |
| } |
References
- In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
The pool-wide heartbeat monitor on main only inspected sessions in inUseSessions (i.e. with an active vRPC). When heartbeat scheduling moved into SessionImpl, the per-session timer was instead armed once on ready and self-rescheduled indefinitely. The "no active vRPC" case was reduced to a sentinel (nextHeartbeat = now + FUTURE_TIME), which made checkHeartbeat no-op but kept the wheel ticking. It also left a latent force-close on idle sessions: handleHeartBeatResponse unconditionally resets nextHeartbeat to now + heartbeatInterval, so an idle session that received a server heartbeat became eligible for force-close on the next missed beat. Tie the timer to the vRPC lifecycle to match main's semantic: - handleOpenSessionResponse no longer schedules the tick. - startRpc arms the tick after setting nextHeartbeat. - handleVRpcResponse / handleVRpcErrorResponse cancel the tick. - updateState (already cancels on transition past READY). Add SessionImplTest coverage with a counting BigtableTimer wrapper that asserts (a) no schedule before any vRPC and (b) exactly one schedule/cancel per vRPC lifecycle.
SessionImpl gains sessionSyncContext that serializes stream callbacks. onMessage/onClose dispatch onto it, and the per-session heartbeat tick trampolines through it. synchronized(lock) blocks remain inside the handlers — the two coexist for now. Affinity asserts added at boundary methods and every handle*.
SessionImpl.startRpc and cancelRpc now submit to sessionSyncContext rather than running synchronously on the caller. VRpcSessionApi.startRpc is void — errors flow through rpc.handleError() onto ctx.getExecutor(). VRpcImpl drops its synchronous post-startRpc error branch.
All session state mutations now run on sessionSyncContext, so the per-session Object lock is no longer needed. Public methods (start, close, forceClose, startRpc, cancelRpc) submit onto sessionSyncContext; nextRpcId becomes AtomicLong for the cross-thread newCall() caller. handleVRpcResponse and handleVRpcErrorResponse drop the localCancel/localRpc capture-and-recheck dance — sessionSyncContext serializes them now. Stale lock-era comments on the fields are replaced with a sessionSyncContext-ownership note. SessionImplTest.testHeartbeat polls for the now-async nextHeartbeat update.
Split terminal close into notifyTerminalClose (per-target try/catch fan-out) and abortFromUncaughtException (global handler). Uncaught syncContext exceptions always set closeReason to ERROR — the prior reason is folded into the description so tracer/metrics correctly attribute aborts. notifyTerminalClose synthesizes a fallback closeReason if missing: every caller sets it today (forceClose, startGracefulClose, dispatchStreamClosed, abortFromUncaughtException), but a future writer who forgets would NPE inside the fan-out — and the throw escapes to the syncContext uncaught handler, which early-returns on the already-CLOSED state and silently skips the remaining cleanup. The synthesizer mirrors startGracefulClose: log a warning with an IllegalStateException for stack-trace observability, then build a fallback CloseSessionRequest so the rest of the fan-out runs. Adds three regression tests (listener.onReady throws, onClose throws, both throw).
Client (and ShimImpl) own a dedicated bigtable-callback-%d cached pool, plumbed through *Async / TableBase. A blocked user callback can no longer starve heartbeats, retry delays, or pool bookkeeping (all of which run on backgroundExecutor). The op-level SerializingExecutor in a later commit will dispatch onto this pool.
VOperationImpl now constructs OpExecutor over a per-call SequentialExecutor on the shared userCallbackExecutor, replacing the per-call SynchronizationContext. OpExecutor gains an UncaughtExceptionHandler ctor arg — the safety net that the removed RetryingVRpc-owned SyncContext provided. The 3-arg VRpcCallContext.create defaults to a no-op handler for tests; production callers go through VOperationImpl.
CallOptions.withExecutor(MoreExecutors.directExecutor()) on the session stream so Netty I/O threads deliver SessionStream.Listener callbacks directly; sessionSyncContext immediately trampolines off them.
Tests that call Future.get() on vRpc chains can hang indefinitely if an exception breaks the callback dispatch chain and orphans the future (see ISSUE-006). A class-level JUnit 5 @timeout(30) converts a silent hang into a clear test failure within a bounded time. Applied to: RetryingVRpcTest, VRpcTracerTest, ClientTest, TableBaseTest, SessionImplTest, SessionPoolImplTest.
PendingVRpc.cancel and drainTo move isCancelled/realCall onto ctx.getExecutor(); the pool lock now covers only queue / poolState / session list. close() switches to cancelWithResult to honor the new op-executor contract. NOOP_CALL sentinel removed. PendingVRpc.start arms the deadline monitor only AFTER committing to the queue (inside the synchronized block). Previously the timer was scheduled before the pool-state check, so the closed-pool fast-fail path returned without cancelling it — the timer then fired later and called listener.onClose a second time with DEADLINE_EXCEEDED. RetryingVRpc.Active suppressed the duplicate at the user-facing layer, but tracer.onAttemptFinish still ran twice and corrupted per-attempt metrics. Adds SessionPoolImplTest#pendingVRpcOnClosedPoolDoesNotLeakDeadlineMonitor.
VOperationImpl captures opExecutor in start() and trampolines start/cancel via it. RetryingVRpc.start runs synchronously on the op-executor task RetryingVRpc.cancel no longer wraps in execute. Tracer.onOperationStart reordered before started=true (a throwing tracer short-circuits to direct listener.onClose). listener.onMessage failures classify as USER_FAILURE. CleanupListener tracks a closed flag to prevent gRPC-context listener leaks on synchronous chain close.
…en Client.close OpExecutor switches from a SequentialExecutor backing to an internal ArrayDeque + drain loop, and gains runInline() — runs the task synchronously on the caller thread when the executor is idle, otherwise queues it. VOperationImpl uses runInline for chain.start so the start dispatch skips the queue+drain round-trip. Drain rejections (e.g. during shutdown) reset drainScheduled so subsequent submissions can retry. Client.close drains userCallbackExecutor first via a 5s-bounded shutdownAndAwait so pool.close's cancelWithResult onClose notifications complete before the executor is torn down. Resource.close becomes idempotent. Drive-by cleanups: replace fully-qualified type names with imports across touched files, and swap Guava Objects.hashCode(id) for Long.hashCode(id) in ChannelPoolDpImpl.AfeId to avoid per-call boxing and array allocation.
…lient.close
Client.close shut down userCallbackExecutor before draining the SessionPools
that depend on it, so late listener.onClose tasks from in-flight RPCs
arrived after backing was dead and got RejectedExecutionException — silently
stranding the user's terminal callbacks. The earlier fix sprinkled
inline-drain fallbacks inside OpExecutor; restructure shutdown instead so
the race can't happen.
SessionPool gains awaitTerminated(Duration), backed by a CompletableFuture
SessionPoolImpl completes from onSessionClose once the pool is CLOSED and
the last session has drained. close() no longer kills the watchdog —
awaitTerminated takes ownership of that, so the watchdog stays alive during
shutdown and can escalate any session stuck in WAIT_SERVER_CLOSE longer
than its tick interval (5 min) via forceClose.
Client.close becomes three explicit phases: (1) initiate graceful close on
each pool, (2) awaitTerminated on each with a 6-minute per-pool budget
(one full watchdog tick plus buffer), (3) tear down userCallbackExecutor /
channelPool / timers in the existing order, now safely because all
listener.onClose tasks are queued or drained before backing dies.
VOperationImpl.start queues grpcContext.addListener through the op executor
via runInline. Without this, an async-queued onClose from chain.start
(PendingVRpc pool-closed fast-fail, VRpcImpl deadline-exceeded short-circuit)
could drain between the CleanupListener.closed read and addListener:
CleanupListener.onClose would call removeListener as a no-op pre-registration,
and the caller would then register a listener with nothing to remove it.
The leak is per-RPC and permanent until grpcContext cancels — for long-lived
application contexts it accumulates indefinitely. FIFO ordering through the
op executor makes the closed-check sound: any onClose chain.start enqueued
drains first, so the check is accurate by the time we evaluate it.
Add a `closed` AtomicBoolean + checkNotClosed() guard on the three openers
so concurrent opens during shutdown can't create pools the close path
won't see. close() is now idempotent via CAS on that flag.
Tests:
- ClientTest#openAfterCloseThrows / closeIsIdempotent
- SessionPoolImplTest awaitTerminated* coverage
- VOperationImplTest covering async onClose / context cancel ordering
- SessionPoolImplTest tearDown now calls awaitTerminated so the watchdog
is closed before testTimer.stop races its self-reschedule
- FakeSessionPool in TableBaseTest gains a no-op awaitTerminated stub
Drive-by: remove the spurious @nested annotation from SessionPoolImplTest's
top-level class. @nested is meaningful only on non-static inner classes;
on the outer class it caused Surefire to mis-attribute test counts.
…close Scheduled RetryingVRpcs hold no session reference, so a long-delay retry (server-driven RetryInfo.retryDelay) outlives Phase 2 drain and is silently discarded when sessionTimer.stop() runs in Phase 3. The user's listener never fires. Add an onStop hook primitive to BigtableTimer. Scheduled.onStart registers a hook on entry and unregisters on every exit path (normal fire, cancel, hook fire). NettyWheelTimer.stop() runs every hook synchronously before discarding pending wheel timeouts; hooks trampoline back through the op executor to drive Scheduled to a CANCELLED Done. Reorder Client.close Phase 3 so sessionTimer.stop() runs before userCallbackExecutor.close(), giving the hook-fired onClose tasks a live op-executor backing to land on. Also replace Scheduled.onStart's dead RejectedExecutionException catch with IllegalStateException, matching BigtableTimer.stop()'s documented post-condition.
openTableAsync / openAuthorizedViewAsync / openMaterializedViewAsync read 'closed' lock-free, then constructed the pool, then inserted it into sessionPools — close() could CAS closed=true and snapshot sessionPools in between, leaving the new pool orphaned: never closed, its callbacks landing on shut-down executors. Hold sessionPools' monitor across the closed check, the construction, and the insert. Opens are infrequent (typically once per table at app startup) so the monitor cost is negligible. Move close()'s closed flip inside the same monitor too. With every access now under the lock, downgrade 'closed' from AtomicBoolean to a plain boolean — the CAS provided no value over a plain read+write under the lock.
ShimImpl registered its userCallbackExecutor with a bare shutdown() closer, while Client.create's path uses shutdownAndAwait (which gives in-flight listener.onClose tasks a 5-second drain window before shutdownNow). On ShimImpl close, queued callbacks were abandoned mid-flight — fine for quiescent shutdowns but a regression for fast-close patterns (test boundaries, dynamic config reloads) where in-flight callbacks have not yet drained. Promote Client.shutdownAndAwait from private-static to public-static so ShimImpl (different package) can reuse the same shutdown semantics, and update ShimImpl to call it.
ReadRowShim and MutateRowShim cache per-target handles in a Guava LoadingCache whose loader calls Client.openTableAsync (and friends). After the Client-close hardening, that loader throws IllegalStateException post-close — and getUnchecked wraps it in UncheckedExecutionException, so callers see an unchecked exception thrown out of readRow / mutateRow instead of a failed CompletableFuture as the surface contract promises. Introduce SessionPoolMap, a small wrapper over the existing Util.createSessionMap cache that owns the conversion: get() unwraps UncheckedExecutionException to surface the original cause, and apply() converts a loader throw into a failed CompletableFuture for the async call paths. Replace the inline LoadingCache fields in both shim ops files. Covered by a new focused unit test.
… Active State machine now has an onExit hook called from onStateChange before the swap. States that hold cleanup-worthy resources override it. Active owns its own tracer-pairing flag + finishAttempt helper (no longer a RetryingVRpc-level bool). Listener paths and onCancel call finishAttempt with the right result; onExit is a safety net that guarantees the pairing is balanced even if a future exit path forgets. Fixes the attempt.start synchronous-throw leak. Also resolves two latent tracer hazards: - tracer.onAttemptFinish now fires when an in-flight attempt is cancelled (previously the late server onClose was dropped by the stale-state guard, so the cancelled attempt's tracer span leaked). - The listener-path tracer.onAttemptFinish is gated on the stale-state check first, matching onMessage above — a discarded onClose can no longer double-fire the tracer. Scheduled cleanup (timer + stop-hook unregister) consolidates from three parallel sites (onCancel, timer-fire body, stop-hook body) into one onExit. onCancel drops to the default no-op. Done.onStart no longer balances per-attempt tracer state — that lives on Active now.
VOperationImpl.start needs to detect whether chain.start synchronously delivered a terminal onClose, so it can skip registering the gRPC cancellation listener (which would otherwise leak onto grpcContext). Previously this was tracked via a 'closed' flag on CleanupListener — a piggyback bookkeeping field on the listener wrapper that exists only for VOperationImpl's coordination. Add isDone() to the VRpc interface and ask the chain directly. The chain is the natural source of truth for its own terminal state. CleanupListener shrinks back to its single concern: relay events and unhook the gRPC cancellation listener on close. Implementations: RetryingVRpc delegates to currentState.isDone(); VRpcImpl reports state==CLOSED; ForwardingVRpc forwards; PendingVRpc defers to realCall once handed off, otherwise reports isCancelled. Test fakes (DelayedVRpc, FakeVRpc, anonymous VOperationImplTest chains) implement the new method. Drive-by: drop the defensive handling of tracer.onOperationStart throws from RetryingVRpc.start, and the symmetric `!started` early-return in Done.onStart that paired with it. CompositeVRpcTracer catches throws from every child tracer, so the only way tracer.onOperationStart reaches RetryingVRpc with a real throw is a test that bypasses Composite. Dead code in production; relying on the existing chain.cancel cascade is simpler than maintaining a separate short-circuit path. Also: the already-started error in RetryingVRpc.start now dispatches listener.onClose through ctx.getExecutor() rather than invoking it synchronously on the caller, matching the dispatch convention used everywhere else in the chain.
Inline doc: clarify why OpExecutor is not SynchronizationContext. They look superficially similar (FIFO + drain + affinity assertion + uncaught handler) but have opposite drain-thread policies. SynchronizationContext drains on whichever thread first calls execute() while idle — appropriate for state serialization on threads that should do that work anyway. OpExecutor always hands off to the backing user-callback pool so chain callbacks never run on transport / session-sync / timer-dispatch threads. TODO entries for deferred review findings: - closeReason synthesizer triplication in SessionImpl - drainedFuture completed from two unrelated sites - shutdownAndAwait as public-static helper in the wrong place - RetryingVRpc relies on unenforced op-executor affinity (more important now that chain.isDone is externally observable) - per-op tracking for graceful shutdown of pending Scheduled retries (explicitly chose cancel-on-close; this is the path if requirements change) - SessionImpl.checkHeartbeat re-arms a 100ms wheel tick unconditionally; for idle sessions this is 10 wakeups/sec/session of noise Each entry has file pointer, symptom, fix sketch, and risk.
5e619a4 to
fe812c6
Compare
No description provided.