if (upgrade != nullptr) { const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(), upgrade_map, *this)) { filter_manager_callbacks_.upgradeFilterChainCreated(); returntrue; } else { upgrade_rejected = true; // Fall through to the default filter chain. The function calling this // will send a local reply indicating that the upgrade failed. } } // create here ! filter_chain_factory_.createFilterChain(*this); return !upgrade_rejected; }
boolLocalRateLimiterImpl::requestAllowedHelper(const TokenState& tokens)const{ // Relaxed consistency is used for all operations because we don't care about ordering, just the // final atomic correctness. uint32_t expected_tokens = tokens.tokens_.load(std::memory_order_relaxed); do { // expected_tokens is either initialized above or reloaded during the CAS failure below. if (expected_tokens == 0) { returnfalse; }
// Loop while the weak CAS fails trying to subtract 1 from expected. } while (!tokens.tokens_.compare_exchange_weak(expected_tokens, expected_tokens - 1, std::memory_order_relaxed));
// We successfully decremented the counter by 1. returntrue; }
如果 expected_tokens == 0,则直接返回失败(false);否则通过 CAS 循环将 Token 数减 1,成功后返回 true。
classDispatcher : public DispatcherBase, public ScopeTracker { public: /** * Returns the name that identifies this dispatcher, such as "worker_2" or "main_thread". * @return const std::string& the name that identifies this dispatcher. */ virtualconst std::string& name() PURE;
/** * Creates a file event that will signal when a file is readable or writable. On UNIX systems this * can be used for any file like interface (files, sockets, etc.). * @param fd supplies the fd to watch. * @param cb supplies the callback to fire when the file is ready. * @param trigger specifies whether to edge or level trigger. * @param events supplies a logical OR of FileReadyType events that the file event should * initially listen on. */ virtual FileEventPtr createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events) PURE;
/** * Allocates a timer. @see Timer for docs on how to use the timer. * @param cb supplies the callback to invoke when the timer fires. */ virtual Event::TimerPtr createTimer(TimerCb cb) PURE;
/** * Allocates a scaled timer. @see Timer for docs on how to use the timer. * @param timer_type the type of timer to create. * @param cb supplies the callback to invoke when the timer fires. */ virtual Event::TimerPtr createScaledTimer(Event::ScaledTimerType timer_type, TimerCb cb) PURE;
/** * Allocates a scaled timer. @see Timer for docs on how to use the timer. * @param minimum the rule for computing the minimum value of the timer. * @param cb supplies the callback to invoke when the timer fires. */ virtual Event::TimerPtr createScaledTimer(Event::ScaledTimerMinimum minimum, TimerCb cb) PURE;
/** * Allocates a schedulable callback. @see SchedulableCallback for docs on how to use the wrapped * callback. * @param cb supplies the callback to invoke when the SchedulableCallback is triggered on the * event loop. */ virtual Event::SchedulableCallbackPtr createSchedulableCallback(std::function<void()> cb) PURE;
/** * Register a watchdog for this dispatcher. The dispatcher is responsible for touching the * watchdog at least once per touch interval. Dispatcher implementations may choose to touch more * often to avoid spurious miss events when processing long callback queues. * @param min_touch_interval Touch interval for the watchdog. */ virtualvoidregisterWatchdog(const Server::WatchDogSharedPtr& watchdog, std::chrono::milliseconds min_touch_interval) PURE;
/** * Returns a time-source to use with this dispatcher. */ virtual TimeSource& timeSource() PURE;
/** * Initializes stats for this dispatcher. Note that this can't generally be done at construction * time, since the main and worker thread dispatchers are constructed before * ThreadLocalStoreImpl::initializeThreading. * @param scope the scope to contain the new per-dispatcher stats created here. * @param prefix the stats prefix to identify this dispatcher. If empty, the dispatcher will be * identified by its name. */ virtualvoidinitializeStats(Stats::Scope& scope, const absl::optional<std::string>& prefix = absl::nullopt) PURE;
/** * Clears any items in the deferred deletion queue. */ virtualvoidclearDeferredDeleteList() PURE;
/** * Wraps an already-accepted socket in an instance of Envoy's server Network::Connection. * @param socket supplies an open file descriptor and connection metadata to use for the * connection. Takes ownership of the socket. * @param transport_socket supplies a transport socket to be used by the connection. * @param stream_info info object for the server connection * @return Network::ConnectionPtr a server connection that is owned by the caller. */ virtual Network::ServerConnectionPtr createServerConnection(Network::ConnectionSocketPtr&& socket, Network::TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info) PURE;
/** * Creates an instance of Envoy's Network::ClientConnection. Does NOT initiate the connection; * the caller must then call connect() on the returned Network::ClientConnection. * @param address supplies the address to connect to. * @param source_address supplies an address to bind to or nullptr if no bind is necessary. * @param transport_socket supplies a transport socket to be used by the connection. * @param options the socket options to be set on the underlying socket before anything is sent * on the socket. * @return Network::ClientConnectionPtr a client connection that is owned by the caller. */ virtual Network::ClientConnectionPtr createClientConnection(Network::Address::InstanceConstSharedPtr address, Network::Address::InstanceConstSharedPtr source_address, Network::TransportSocketPtr&& transport_socket, const Network::ConnectionSocket::OptionsSharedPtr& options) PURE;
/** * @return Filesystem::WatcherPtr a filesystem watcher owned by the caller. */ virtual Filesystem::WatcherPtr createFilesystemWatcher() PURE;
/** * Creates a listener on a specific port. * @param socket supplies the socket to listen on. * @param cb supplies the callbacks to invoke for listener events. * @param runtime supplies the runtime for this server. * @param bind_to_port controls whether the listener binds to a transport port or not. * @param ignore_global_conn_limit controls whether the listener is limited by the global * connection limit. * @return Network::ListenerPtr a new listener that is owned by the caller. */ virtual Network::ListenerPtr createListener(Network::SocketSharedPtr&& socket, Network::TcpListenerCallbacks& cb, Runtime::Loader& runtime, bool bind_to_port, bool ignore_global_conn_limit) PURE;
/** * Creates a logical udp listener on a specific port. * @param socket supplies the socket to listen on. * @param cb supplies the udp listener callbacks to invoke for listener events. * @param config provides the UDP socket configuration. * @return Network::ListenerPtr a new listener that is owned by the caller. */ virtual Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket, Network::UdpListenerCallbacks& cb, const envoy::config::core::v3::UdpSocketConfig& config) PURE; /** * Submits an item for deferred delete. @see DeferredDeletable. */ virtualvoiddeferredDelete(DeferredDeletablePtr&& to_delete) PURE;
/** * Exits the event loop. */ virtualvoidexit() PURE;
/** * Listens for a signal event. Only a single dispatcher in the process can listen for signals. * If more than one dispatcher calls this routine in the process the behavior is undefined. * * @param signal_num supplies the signal to listen on. * @param cb supplies the callback to invoke when the signal fires. * @return SignalEventPtr a signal event that is owned by the caller. */ virtual SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) PURE;
/** * Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on * the dispatcher's thread before dispatcher destroy. This is safe cross thread. */ virtualvoiddeleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE;
/** * Runs the event loop. This will not return until exit() is called either from within a callback * or from a different thread. * @param type specifies whether to run in blocking mode (run() will not return until exit() is * called) or non-blocking mode where only active events will be executed and then * run() will return. */ enum classRunType { Block, // Runs the event-loop until there are no pending events. NonBlock, // Checks for any pending events to activate, executes them, // then exits. Exits immediately if there are no pending or // active events. RunUntilExit // Runs the event-loop until loopExit() is called, blocking // until there are pending or active events. }; virtualvoidrun(RunType type) PURE;
/** * Returns a factory which connections may use for watermark buffer creation. * @return the watermark buffer factory for this dispatcher. */ virtual Buffer::WatermarkFactory& getWatermarkFactory() PURE;
/** * Updates approximate monotonic time to current value. */ virtualvoidupdateApproximateMonotonicTime() PURE;
/** * Shutdown the dispatcher by clear dispatcher thread deletable. */ virtualvoidshutdown() PURE; };
// If there is no more data to send and we saw trailers, we need to continue iteration to release // the trailers to further filters. if (buffer_.length() == 0 && saw_trailers_) { continue_cb_(); } }