

the infrastructure shared by mastermind and figurehead



the common message bus

(ns core.bus
  (:require (core [state :as state]))
  (:require [clojure.core.async
             :as async
             :refer [chan
                     buffer sliding-buffer
                     pub sub unsub
                     mult tap untap
                     >!! <!!]]))
(declare sub-topic unsub-topic get-subscribers
         register-listener unregister-listener get-listeners
         get-message-topic remove-message-topic build-message
         say say!! well-saying?
         what-is-said what-is-said!!)

the defaults

(def defaults
  "Default settings of the message bus, held in an atom so they can be
  read with @defaults.

  :bus-main-buffer-size / :bus-pub-buffer-size  buffer sizes
  :pub-buf-fn            topic -> buffer, passed to clojure.core.async/pub
  :get-message-topic     message -> topic
  :remove-message-topic  message -> content (the message minus its topic)
  :build-message         topic, content -> message map
  :well-saying?          truthy when `said` is a map with :topic and :content"
  (atom
   {:bus-main-buffer-size 10000
    :bus-pub-buffer-size 10000
    :pub-buf-fn (fn [topic]
                  ;; deref happens at call time, so referring to the var
                  ;; being defined here is fine
                  (buffer (:bus-pub-buffer-size @defaults)))
    :get-message-topic (fn [message]
                         (cond
                           (map? message) (:topic message)
                           (sequential? message) (first message)
                           :else message))
    :remove-message-topic (fn [message]
                            (cond
                              (map? message) (:content message)
                              (sequential? message) (rest message)
                              :else nil))
    :build-message (fn [topic content]
                     {:topic topic
                      :content content})
    :well-saying? (fn [said]
                    (and said
                         (map? said)
                         (:topic said)
                         (:content said)))}))

subscribe the ch(an) to the topic on bus-pub, with close? as in clojure.core.async/sub

unsubscribe the ch(an) from the topic on bus-pub

get subscribers

register as a listener of all messages over bus-chan

undo register-listener

get listeners

get all said topics

get the topic of the message

remove the topic from the message

build message from topic and content

say content with the topic

say content with the topic (>!! as chan-op)

Is said a well saying?

get what is said from the sub-ch

get what is said on sub-ch (<!! as chan-op)

(let [bus-chan (chan (:bus-main-buffer-size @defaults))
      bus-chan-mult (mult bus-chan)
      ;; tap #1 feeds the topic-keyed pub used by sub-topic/unsub-topic
      bus-chan-1 (chan)
      _1 (tap bus-chan-mult bus-chan-1)
      bus-pub (pub bus-chan-1
                   (:get-message-topic @defaults)
                   (:pub-buf-fn @defaults))
      ;; tap #2 feeds a pub whose topic-fn is (constantly true), so a
      ;; subscription on topic `true` receives every message
      bus-chan-2 (chan)
      _2 (tap bus-chan-mult bus-chan-2)
      listener-pub (pub bus-chan-2
                        (constantly true)
                        (:pub-buf-fn @defaults))
      topics (atom #{})
      subscribers (atom {})
      listeners (atom #{})]
  (defn sub-topic
    "subscribe the ch(an) to the topic on bus-pub, with close? as in
  clojure.core.async/sub (defaults to true)"
    ([ch topic] (sub-topic ch topic true))
    ([ch topic close?]
       (swap! subscribers update-in [topic]
              (comp set conj) ch)
       (sub bus-pub topic ch close?)))
  (defn unsub-topic
    "unsubscribe the ch(an) from the topic on bus-pub"
    [ch topic]
    (swap! subscribers update-in [topic]
           disj ch)
    (unsub bus-pub topic ch))
  (defn get-subscribers
    "get subscribers: a map of topic -> set of subscribed channels"
    []
    @subscribers)
  (defn register-listener
    "register as a listener of all messages over bus-chan"
    ([ch] (register-listener ch true))
    ([ch close?]
       (swap! listeners
              (comp set conj) ch)
       (sub listener-pub true ch close?)))
  (defn unregister-listener
    "undo register-listener"
    [ch]
    (swap! listeners
           disj ch)
    (unsub listener-pub true ch))
  (defn get-listeners
    "get listeners: the set of registered listener channels"
    []
    @listeners)
  (defn get-topics
    "get all said topics"
    []
    @topics)
  (defn get-message-topic
    "get the topic of the message"
    [message]
    ((:get-message-topic @defaults) message))
  (defn remove-message-topic
    "remove the topic from the message"
    [message]
    ((:remove-message-topic @defaults) message))
  (defn build-message
    "build message from topic and content"
    [topic content]
    ((:build-message @defaults) topic content))
  (defn say
    "say content with the topic; chan-op is called as (chan-op bus-chan message)"
    ([topic content chan-op] (say topic content false chan-op))
    ([topic content verbose? chan-op]
       (swap! topics conj topic)
       (let [message (build-message topic content)]
         (chan-op bus-chan message)
         (when verbose?
           (prn {:message message
                 :topics (get-topics)
                 :subscribers (get-subscribers)
                 :listeners (get-listeners)})))))
  ;; say! is omitted because >! may not work with Clojure on Android for SDK 18
  ;; !!! use say! within go block may deadlock
  ;; (defn say!
  ;;   "say content with the topic (>! as chan-op)"
  ;;   ([topic content] (say! topic content false))
  ;;   ([topic content verbose?]
  ;;      (let [chan-op (fn [ch val]
  ;;                      (go >! ch val))]
  ;;        (say topic content verbose? chan-op))))
  (defn say!!
    "say content with the topic (>!! as chan-op)"
    ([topic content] (say!! topic content false))
    ([topic content verbose?]
       (let [chan-op (fn [ch val]
                       (>!! ch val))]
         (say topic content verbose? chan-op))))
  (defn well-saying?
    "Is said a well saying?"
    [said]
    ;; defaults is an atom: it must be dereferenced before the key lookup,
    ;; otherwise the lookup yields nil and calling nil throws
    ((:well-saying? @defaults) said))
  (defn what-is-said
    "get what is said from the sub-ch; chan-op is called as (chan-op sub-ch)
  and the topic is removed from whatever it returns"
    ([sub-ch chan-op] (what-is-said sub-ch false chan-op))
    ([sub-ch verbose? chan-op]
       (let [said (chan-op sub-ch)]
         (when verbose?
           (prn [:said said]))
         (remove-message-topic said))))
  (defn what-is-said!!
    "get what is said on sub-ch (<!! as chan-op)"
    ([sub-ch] (what-is-said!! sub-ch false))
    ([sub-ch verbose?]
       (let [chan-op (fn [ch]
                       (<!! ch))]
         (what-is-said sub-ch verbose? chan-op)))))

initialization procedures

;; initialization procedures: owns the option-spec vector that is later
;; handed to the command-line parser
(ns core.init
  (:require (core [state :as state]
                  [plugin :as plugin])))
;; forward declarations for the functions defined later in this namespace
(declare add-to-parse-opts-vector get-parse-opts-vector
         set-default-plugins)

the vector to feed into clojure.tools.cli/parse-opts

(def ^:private parse-opts-vector
  "the vector of option specs to feed into parse-opts; each entry is
  [short-opt long-opt description & more]"
  (atom [["-h" "--help" "show help"]
         ["-v" "--verbose" "be verbose and show debug info"]
         ["-B" "--batch" "batch mode: no block after loading plugins"]]))

lines are added into the parse-opts vector

(defn add-to-parse-opts-vector
  "lines (a collection of option specs) are appended to the parse-opts vector"
  [lines]
  (swap! parse-opts-vector into lines))

return the parse-opts vector

(defn get-parse-opts-vector
  "return the current value of the parse-opts vector"
  []
  @parse-opts-vector)

help parse the parse-opts vector

(def parse-opts-vector-helper
  "helpers for building parse-opts entries; parse functions live under
  :parse-fn and are looked up as (get-in @parse-opts-vector-helper
  [:parse-fn <name>])"
  (atom {:parse-fn
         {;; string -> int, or nil when outside the port range 1..65535
          :inet-port (comp #(when (and (> % 0)
                                       (< % 65536))
                              %)
                           #(Integer/parseInt %))
          ;; NOTE(review): the callee was truncated in the source;
          ;; clojure.java.io/file is an assumption -- confirm
          :file #(clojure.java.io/file %)}}))
(defn set-default-plugins
  "append a --plugin option spec to the parse-opts vector, with plugins as
  its default value; each --plugin given on the command line is parsed to
  a symbol and conj'ed onto the accumulated value"
  [& plugins]
  (swap! parse-opts-vector
         conj
         (let [option :plugin]
           [nil ; no short option
            (str "--"
                 (name option)
                 " [PLUGIN]")
            "[m] plugin to load"
            :default plugins
            :parse-fn symbol
            :assoc-fn (fn [m k v]
                        (update-in m [k]
                                   (comp vec conj) v))])))

set default plugin namespaces to be required/loaded

(defmacro require-and-set-default-plugins
  "set default plugin namespaces to be required/loaded: requires each
  plugin's main entry while the macro expands, then expands to a
  set-default-plugins call on the quoted plugin names"
  [& plugins]
  ;; compile-time require will put the plugin on classpath
  (doseq [p plugins]
    (-> p plugin/get-plugin-main-entry require))
  `(set-default-plugins ~@(for [p plugins]
                            (list 'quote p))))

main procedures

;; main procedures
(ns core.main
  (:require (core [init :as init]
                  [plugin :as plugin]
                  [state :as state]))
  (:require [clojure.tools.cli :refer [parse-opts]]
            [clojure.stacktrace :refer [print-stack-trace]]
            [clojure.core.async :as async :refer [chan <!!]])
  ;; ns references must be keywords; a bare (import ...) is rejected by
  ;; spec-checked versions of the ns macro
  (:import (java.util UUID)))

the main entry

(defn main
  "the main entry: parse args, load the requested plugins, let them extend
  the option specs, parse again, then init and run every plugin.  Unless
  --batch is given the calling thread blocks forever afterwards."
  [& args]
  (try
    (state/add-state :instance-id
                     (-> (UUID/randomUUID) str keyword))
    ;; load the plugins and populate the parse-opts vector
    (let [{:keys [options arguments errors summary]}
          (parse-opts args (init/get-parse-opts-vector))
          verbose (:verbose options)]
      (let [plugins (:plugin options)]
        (when verbose
          (prn (list :load-plugins plugins)))
        (doseq [plugin plugins]
          (plugin/load-plugin plugin))
        (doseq [plugin plugins]
          (when verbose
            (prn (list :populate-parse-opts-vector plugin)))
          (plugin/populate-parse-opts-vector plugin
                                             (init/get-parse-opts-vector)))))
    ;; parse-opts again; initialize and run the plugins
    (let [{:keys [options arguments errors summary]}
          (parse-opts args (init/get-parse-opts-vector))
          verbose (:verbose options)]
      (reset! plugin/current-options options)
      (cond
       ;; ask for help
       (:help options)
       (println summary)

       :else
       (let [plugins (plugin/list-all-plugins-by-priority)]
         (doseq [plugin plugins]
           (plugin/init-and-run-plugin plugin options))
         (when-not (:batch options)
           ;; block the main Thread
           (when verbose
             (prn {:batch (:batch options)}))
           ;; nothing ever writes to this fresh channel
           (<!! (chan))))))
    (catch Throwable e
      (print-stack-trace e))
    (finally
      ;; NOTE(review): the tail of this function was truncated in the
      ;; source; running the exit hooks here is an assumption -- confirm
      (plugin/execute-all-exit-hooks))))

the plugin infrastructure

;; the plugin infrastructure
(ns core.plugin
  (:require (core [bus :as bus]))
  (:require [clojure.stacktrace :refer [print-stack-trace]]
            [clojure.core.async
             :as async
             :refer [thread <!! chan timeout]]))
(declare list-all-plugins list-all-plugins-by-priority
         get-plugin get-plugin-main-entry
         get-config-map get-config-map-entry update-config-map-entry set-config-map-entry
         get-param get-param-entry update-param-entry set-param-entry
         get-state get-state-entry update-state-entry set-state-entry
         load-plugin unload-plugin
         init-plugin run-plugin stop-plugin init-and-run-plugin load-init-and-run-plugin restart-plugin
         block-thread unblock-thread
         register-exit-hook unregister-exit-hook execute-all-exit-hooks)
(def ^:dynamic *current-plugin*
  "bound to current plugin by the context"
  ;; with only two arguments to def the string would be the root VALUE,
  ;; not the docstring; an explicit nil root keeps it a docstring
  nil)

the defaults

(def defaults
  "the defaults; :auto-restart-retry-interval is passed to Thread/sleep
  (milliseconds) between auto-restart attempts"
  (atom {:auto-restart-retry-interval 1000}))

all loaded plugins and their config map

;; all loaded plugins and their config maps: an atom holding a map keyed
;; by (get-plugin plugin), i.e. the plugin name as a keyword
(def plugins
  (atom {}))

current options

;; current options: nil until reset!; read as the default `options`
;; by the one-argument arities of the plugin lifecycle functions
(def current-options
  (atom nil))

exit hooks that are executed on main-thread exit

;; exit hooks that are executed on main-thread exit: a map of key -> hook,
;; maintained through register-exit-hook / unregister-exit-hook
(def exit-hooks
  (atom {}))

list all plugins

(defn list-all-plugins
  "list all plugins (the keys of the plugin registry)"
  []
  (keys @plugins))

list all plugins from highest (larger number) to lowest priority

the convention is that 'I do not care'-priority is 1, and 'absolute first'-priority is 99; 0 and 100 are reserved for core

(defn list-all-plugins-by-priority
  "list all plugins from highest (larger number) to lowest priority, read
  from [:param :priority] of each plugin's config map"
  []
  (->> @plugins
       (sort-by #(or (get-in (second %)
                             [:param :priority])
                     ;; assign low priority if missing :priority spec
                     ;; NOTE(review): the fallback value was truncated in
                     ;; the source; 1 is the documented 'I do not care'
                     ;; priority -- confirm
                     1)
                ;; descending order
                #(compare %2 %1))
       (map first)))

list all non-stop plugins from highest to lowest priority

(defn list-all-nonstop-plugins-by-priority
  "list all non-stop plugins (those whose :stop state entry is falsy) from
  highest to lowest priority"
  []
  (filter #(not (get-state-entry % :stop))
          (list-all-plugins-by-priority)))

get the named plugin

(defn get-plugin
  "get the named plugin as a keyword, the key type of the plugin registry;
  defaults to *current-plugin*"
  ([] (get-plugin *current-plugin*))
  ([plugin]
     (keyword plugin)))

get the main entry to the plugin

(defn get-plugin-main-entry
  "get the main entry to the plugin: the symbol <plugin>.main;
  defaults to *current-plugin*"
  ([] (get-plugin-main-entry *current-plugin*))
  ([plugin]
     (symbol (str plugin ".main"))))


get the config map

(defn get-config-map
  "get the config map of the plugin; defaults to *current-plugin*"
  ([] (get-config-map *current-plugin*))
  ([plugin]
     (get @plugins (get-plugin plugin))))

get a config entry

(defn get-config-map-entry
  "get a config entry of the plugin; with only key, the plugin is
  *current-plugin*"
  ([key]
     (get-config-map-entry *current-plugin* key))
  ([plugin key]
     (-> plugin
         get-config-map
         (get key))))

update a config entry to (f & args)

(defn update-config-map-entry
  "update a config entry of the plugin to (f old-value & args)"
  [plugin key f & args]
  (let [path [(get-plugin plugin) key]]
    (apply swap! plugins update-in path f args)))

set a config entry to val

(defn set-config-map-entry
  "set a config entry to val; with only key and val, the plugin is
  *current-plugin*"
  ([key val]
     (set-config-map-entry *current-plugin* key val))
  ([plugin key val]
     ;; the updater ignores the old value
     (update-config-map-entry plugin key (fn [_] val))))

get the params


(defn get-param
  "get the params (the :param entry of the plugin's config map);
  defaults to *current-plugin*"
  ([] (get-param *current-plugin*))
  ([plugin]
     (get-config-map-entry plugin :param)))

get a param entry

(defn get-param-entry
  "get a param entry of the plugin; with only key, the plugin is
  *current-plugin*"
  ;; the short arity takes the key, like get-config-map-entry and
  ;; get-state-entry; a zero-argument arity delegating to a one-argument
  ;; call could only throw an arity error
  ([key] (get-param-entry *current-plugin* key))
  ([plugin key]
     (get (get-param plugin) key)))

update a param entry to (f & args)

(defn update-param-entry
  "update a param entry of the plugin to (f old-value & args)"
  [plugin key f & args]
  (let [path [(get-plugin plugin) :param key]]
    (apply swap! plugins update-in path f args)))

set a parameter entry to val

(defn set-param-entry
  "set a parameter entry to val; with only key and val, the plugin is
  *current-plugin*"
  ([key val]
     (set-param-entry *current-plugin* key val))
  ([plugin key val]
     ;; the updater ignores the old value
     (update-param-entry plugin key (fn [_] val))))

get the states


(defn get-state
  "get the states (the :state entry of the plugin's config map);
  defaults to *current-plugin*"
  ([] (get-state *current-plugin*))
  ([plugin]
     (get-config-map-entry plugin :state)))

get a state entry

(defn get-state-entry
  "get a state entry of the plugin; with only key, the plugin is
  *current-plugin*"
  ([key]
     (get-state-entry *current-plugin* key))
  ([plugin key]
     (-> plugin
         get-state
         (get key))))

update a state entry to (f & args)

(defn update-state-entry
  "update a state entry of the plugin to (f old-value & args)"
  [plugin key f & args]
  (let [path [(get-plugin plugin) :state key]]
    (apply swap! plugins update-in path f args)))

set a state entry to val

(defn set-state-entry
  "set a state entry to val; with only key and val, the plugin is
  *current-plugin*"
  ([key val]
     (set-state-entry *current-plugin* key val))
  ([plugin key val]
     ;; the updater ignores the old value
     (update-state-entry plugin key (fn [_] val))))

load/unload plugin

load a plugin

(defn load-plugin
  "load a plugin: require its main entry namespace and register the value
  of that namespace's config-map var under the plugin's keyword"
  [plugin]
  (binding [*current-plugin* plugin]
    (let [plugin-main-entry (get-plugin-main-entry plugin)]
      (require plugin-main-entry)
      (swap! plugins
             assoc
             (get-plugin plugin)
             @(ns-resolve plugin-main-entry 'config-map)))))

unload the plugins

(defn unload-plugin
  "unload the plugin: call its :unload config entry, if any, then remove
  it from the plugin registry"
  [plugin]
  (binding [*current-plugin* plugin]
    (when-let [unload (get-config-map-entry plugin :unload)]
      ;; NOTE(review): the call was truncated in the source; invoking the
      ;; hook without arguments is an assumption -- confirm
      (unload))
    (swap! plugins
           dissoc
           (get-plugin plugin))))

populate parse-opts vector for the plugin

populate parse-opts vector

(defn populate-parse-opts-vector
  "populate parse-opts vector for the plugin: with *current-plugin* bound
  to plugin, call the plugin's :populate-parse-opts-vector config entry,
  if any, on current-parse-opts-vector"
  [plugin current-parse-opts-vector]
  (binding [*current-plugin* plugin]
    (if-let [populate (get-config-map-entry plugin :populate-parse-opts-vector)]
      (populate current-parse-opts-vector)
      nil)))

initialize the plugin with the options; return false to abort running the plugin

init/run plugin

(defn init-plugin
  "initialize the plugin with the options (default: @current-options) by
  calling its :init config entry; return false to abort running the plugin"
  ([plugin] (init-plugin plugin @current-options))
  ([plugin options]
     (binding [*current-plugin* plugin]
       (let [verbose (:verbose options)]
         (when verbose
           (prn [:init-plugin plugin options]))
         (let [result (if-let [init (get-config-map-entry plugin :init)]
                        (init options)
                        ;; NOTE(review): the else branch was truncated in
                        ;; the source; treating a missing :init as success
                        ;; is an assumption -- confirm
                        true)]
           (when verbose
             (prn [:init-plugin plugin :result result]))
           result)))))

run the plugin with the options in a separate thread

(defn run-plugin
  "run the plugin with the options (default: @current-options) by calling
  its :run config entry.  A plugin whose :sync param is truthy runs on the
  calling thread; any other plugin runs in a separate thread and is re-run
  after a pause while its :auto-restart param is truthy and its :stop
  state is falsy.  Exceptions from run are printed only when verbose."
  ([plugin] (run-plugin plugin @current-options))
  ([plugin options]
     (binding [*current-plugin* plugin]
       (let [verbose (:verbose options)]
         (when-let [run (get-config-map-entry plugin :run)]
           (when verbose
             (prn [:run-plugin plugin options]))
           (set-state-entry plugin :stop false)
           (if (get-param-entry plugin :sync)
             (when-not (get-state-entry plugin :stop)
               (try
                 (run options)
                 (catch Exception e
                   (when verbose
                     (print-stack-trace e)))))
             ;; NOTE(review): the wrapper form was truncated in the
             ;; source; core.async `thread` is inferred from the ns
             ;; :refer list -- confirm
             (thread
               ;; only :async plugin can auto-restart
               (loop []
                 (try
                   (run options)
                   (catch Exception e
                     (when verbose
                       (print-stack-trace e))))
                 (when (and
                        ;; auto-restart has been requested and...
                        (get-param-entry plugin :auto-restart)
                        ;; plugin has NOT been explicitly stopped
                        (not (get-state-entry plugin :stop)))
                   (let [auto-restart-retry-interval (:auto-restart-retry-interval @defaults)]
                     (when verbose
                       (prn [:plugin plugin
                             :retry-interval auto-restart-retry-interval]))
                     (Thread/sleep auto-restart-retry-interval)
                     (recur)))))))))))

stop the plugin

(defn stop-plugin
  "stop the plugin by calling its :stop config entry, if any, with the
  options (default: @current-options)"
  ([plugin]
     (stop-plugin plugin @current-options))
  ([plugin options]
     (when-let [stop-fn (get-config-map-entry plugin :stop)]
       (when (:verbose options)
         (prn [:stop-plugin plugin options]))
       (stop-fn options))))

init the plugin and, if successful, run it

(defn init-and-run-plugin
  "init the plugin and, if successful, run it; afterwards, when the plugin
  has a :wait param, block on a timeout channel of that duration"
  ([plugin]
     (init-and-run-plugin plugin @current-options))
  ([plugin options]
     (when (init-plugin plugin options)
       (run-plugin plugin options)
       (when-let [pause (get-param-entry plugin :wait)]
         (when (:verbose options)
           (prn [:wait pause]))
         (<!! (timeout pause))))))

load-plugin + init-and-run-plugin; mainly for dynamic loading

(defn load-init-and-run-plugin
  "load-plugin + init-and-run-plugin; mainly for dynamic loading"
  [plugin options]
  (load-plugin plugin)
  (init-and-run-plugin plugin options))

stop-plugin + init-and-run-plugin

(defn restart-plugin
  "stop-plugin + init-and-run-plugin, with the options
  (default: @current-options)"
  ([plugin]
     (restart-plugin plugin @current-options))
  ([plugin options]
     (stop-plugin plugin options)
     (init-and-run-plugin plugin options)))

block the plugin thread so that the plugin will not keep restarting; optionally with a timeout and a tag (the thread subscribes to the :unblock-thread topic and waits for that tag)

(defn block-thread
  "block the calling thread.  With a numeric timeout the thread waits on a
  core.async timeout channel of that duration; with a tag it additionally
  subscribes to the :unblock-thread topic and returns once that tag is
  said.  A single argument is taken as the timeout when it is a number and
  as the tag otherwise."
  ([] (block-thread nil nil))
  ([timeout-or-tag]
     (if (number? timeout-or-tag)
       (block-thread timeout-or-tag nil)
       (block-thread nil timeout-or-tag)))
  ([timeout tag]
     (let [ch (if (number? timeout)
                (async/timeout timeout)
                (chan))]
       (try
         (when tag
           (bus/sub-topic ch :unblock-thread))
         (loop [said (bus/what-is-said!! ch)]
           (when (and said tag (not= tag said))
             ;; only recur when <!! return from :unblock-thread (and val tag) and not having a matching tag
             (recur (bus/what-is-said!! ch))))
         (finally
           (when tag
             ;; must name the topic subscribed above; a misspelled topic
             ;; leaves the subscription in place
             (bus/unsub-topic ch :unblock-thread)))))))

unblock thread with the given unblock-tag

(defn unblock-thread
  "unblock thread with the given unblock-tag by saying it on the
  :unblock-thread topic"
  [unblock-tag]
  (bus/say!! :unblock-thread unblock-tag))

jail body by blocking

(defmacro blocking-jail
  "jail body by blocking: evaluate body, then block-thread with timeout
  and unblock-tag.  Exceptions are printed when verbose and rethrown.
  The finalization form is evaluated in a finally clause."
  [[timeout unblock-tag finalization verbose] & body]
  `(let [timeout# ~timeout
         unblock-tag# ~unblock-tag
         verbose# ~verbose]
     (try
       ~@body
       (block-thread timeout# unblock-tag#)
       (catch Exception e#
         (when verbose#
           (clojure.stacktrace/print-stack-trace e#))
         (throw e#))
       (finally
         (when verbose#
           (prn [:final :blocking-jail
                 :timeout timeout#
                 :unblock-tag unblock-tag#
                 :finalization '~finalization]))
         ~finalization))))

jail body by looping

(defmacro looping-jail
  "jail body by looping: evaluate body repeatedly until stop-condition,
  re-evaluated before every iteration, is truthy.  Exceptions are printed
  when verbose and rethrown.  The finalization form is evaluated in a
  finally clause."
  [[stop-condition finalization verbose] & body]
  `(let [verbose# ~verbose]
     (try
       (loop []
         (when-not ~stop-condition
           ~@body
           (recur)))
       (catch Exception e#
         (when verbose#
           (clojure.stacktrace/print-stack-trace e#))
         (throw e#))
       (finally
         (when verbose#
           (prn [:final *current-plugin* :looping-jail
                 :stop-condition '~stop-condition
                 :finalization '~finalization]))
         ~finalization))))

register exit hook on the main thread

(defn register-exit-hook
  "register exit hook on the main thread: store hook under key in
  exit-hooks"
  [key hook]
  (swap! exit-hooks
         (fn [hooks]
           (assoc hooks key hook))))

undo register-exit-hook

(defn unregister-exit-hook
  "undo register-exit-hook: remove the hook stored under key"
  [key]
  (swap! exit-hooks dissoc key))

execute all registered exit hooks

(defn execute-all-exit-hooks
  "execute all registered exit hooks"
  []
  (doseq [[_ hook] @exit-hooks]
    ;; NOTE(review): the call was truncated in the source; invoking each
    ;; hook without arguments is an assumption -- confirm
    (hook)))

listen for commands on bus and execute them

(ns core.plugin.command-executor.main
  (:require (core [init :as init]
                  [state :as state]
                  [bus :as bus]
                  [plugin :as plugin]))
  (:require [clojure.core.async :as async
             :refer [chan <!!]]))
(def defaults
  "the defaults; :stop-unblock-tag is the tag `stop` says to unblock the
  plugin thread"
  (atom {:stop-unblock-tag :stop-core.plugin.command-executor}))
(defn populate-parse-opts-vector
  "add this plugin's command-line option to the parse-opts vector"
  [current-parse-opts-vector]
  (init/add-to-parse-opts-vector [[nil ; no short option
                                   ;; parsed into the :no-command-executor
                                   ;; option that init reads
                                   "--no-command-executor"
                                   "disable command executor"]]))
(defn init
  "return true (run the plugin) unless the :no-command-executor option is
  set"
  [options]
  (let [no-command-executor (:no-command-executor options)]
    (when-not no-command-executor
      true)))
(defn run
  "listen for commands on the :command topic and execute them: look the
  command up with state/get-command and, when an implementation exists,
  say {:command .. :result ..} on the :response topic"
  [options]
  (let [verbose (:verbose options)
        ch (chan)]
    (plugin/blocking-jail [
                           ;; timeout
                           nil
                           ;; unblock-tag
                           (:stop-unblock-tag @defaults)
                           ;; finalization
                           (do
                             (bus/unsub-topic ch :command))
                           ;; verbose
                           verbose
                           ]
                          (bus/sub-topic ch :command)
                          ;; listen for commands
                          ;; NOTE(review): several lines of this loop were
                          ;; truncated in the source.  As reconstructed it
                          ;; runs until ch is closed, so the jail's
                          ;; block-thread is reached only after that --
                          ;; confirm against the intended stop behavior
                          (loop [said (<!! ch)]
                            ;; nil means ch was closed; stop instead of
                            ;; spinning on a closed channel
                            (when said
                              (let [topic (bus/get-message-topic said)
                                    content (bus/remove-message-topic said)]
                                (case topic
                                  :command
                                  (let [command (:command content)
                                        param (:param content)
                                        command-impl (state/get-command command)]
                                    (when command-impl
                                      (bus/say!! :response
                                                 {:command command
                                                  :result (command-impl param)})))
                                  ;; ignore any other topic
                                  nil))
                              (recur (<!! ch)))))))
(defn stop
  "mark the plugin as stopped and say its unblock tag so a thread blocked
  on that tag is released"
  [options]
  (plugin/set-state-entry :core.plugin.command-executor
                          :stop true)
  (plugin/unblock-thread (:stop-unblock-tag @defaults)))

the config map

(def config-map
  "the config map"
  {:populate-parse-opts-vector populate-parse-opts-vector
   :init init
   :run run
   :stop stop
   :param {:priority 1
           :auto-restart true}})

echo bus messages

;; echo bus messages
(ns core.plugin.echo.main
  (:require (core [init :as init]
                  [state :as state]
                  [bus :as bus]
                  [plugin :as plugin]))
  (:require [clojure.stacktrace :refer [print-stack-trace]]
            [clojure.core.async
             :as async
             :refer [chan <!!]]))
(def defaults
  "the defaults; :echo-buffer is the buffer size of the listening channel"
  (atom {:echo-buffer 100}))
(defn populate-parse-opts-vector
  "add this plugin's command-line option to the parse-opts vector"
  [current-parse-opts-vector]
  (init/add-to-parse-opts-vector [[nil ; no short option
                                   ;; parsed into the :echo option that
                                   ;; init reads
                                   "--echo"
                                   "enable echo"]]))
(defn init
  "return true (run the plugin) only when the :echo option is set"
  [options]
  (when (:echo options)
    true))

archetype of looping-jail

(defn run
  "archetype of looping-jail: register a channel as a listener of all bus
  messages and prn each one until the plugin's :stop state is set; the
  listener is unregistered on the way out"
  [options]
  (let [verbose (:verbose options)
        ch (chan (:echo-buffer @defaults))]
    (bus/register-listener ch)
    (plugin/looping-jail [
                          ;; stop condition
                          (plugin/get-state-entry :stop)
                          ;; finalization
                          (do
                            (bus/unregister-listener ch))
                          ;; verbose
                          verbose
                          ]
                         (prn (<!! ch)))))

archetype of stopping looping-jail

(defn stop
  "archetype of stopping looping-jail: set the plugin's :stop state, which
  the jail's stop condition reads"
  [options]
  (plugin/set-state-entry :core.plugin.echo
                          :stop true))

the config map

(def config-map
  "the config map"
  {:populate-parse-opts-vector populate-parse-opts-vector
   :init init
   :run run
   :stop stop
   :param {:priority 100                ; echo needs to be run first in order to capture all transcripts
           :auto-restart false}})

start nREPL server

;; start nREPL server
(ns core.plugin.nrepl.main
  (:require (core [init :as init]
                  [state :as state]
                  [bus :as bus]
                  [plugin :as plugin]))
  ;; NOTE(review): the nREPL server namespace was truncated in the source;
  ;; clojure.tools.nrepl.server is an assumption -- confirm against the
  ;; project's dependencies
  (:require [clojure.tools.nrepl.server :as nrepl-server]
            [clojure.stacktrace :refer [print-stack-trace]]
            [cider.nrepl :refer [cider-nrepl-handler]]
            [clojure.core.async
             :as async
             :refer [<!! chan]]))
(def defaults
  "the defaults; :stop-unblock-tag is the tag `stop` says to unblock the
  plugin thread"
  (atom {:stop-unblock-tag :stop-core.plugin.nrepl}))
(defn populate-parse-opts-vector
  "add the --nrepl-port option, parsed with the shared :inet-port parse
  function, to the parse-opts vector"
  [current-parse-opts-vector]
  (init/add-to-parse-opts-vector [
                                  (let [option :nrepl-port]
                                    [nil ; no short option
                                     (str "--"
                                          (name option)
                                          " [PORT]")
                                     (str "nREPL port")
                                     :parse-fn (get-in @init/parse-opts-vector-helper
                                                       [:parse-fn :inet-port])])
                                  ]))
(defn init
  "return true (run the plugin) only when an :nrepl-port option is given"
  [options]
  (when (:nrepl-port options)
    true))

archetype of blocking-jail

(defn run
  "archetype of blocking-jail: with *ns* bound to the user namespace, start
  an nREPL server on :nrepl-port with the cider handler, keep it in the
  plugin's :nrepl-server state entry, then block until unblocked; the
  server is stopped on the way out"
  [options]
  (let [verbose (:verbose options)
        nrepl-port (:nrepl-port options)]
    (plugin/blocking-jail [
                           ;; timeout
                           nil
                           ;; unblock-tag
                           (:stop-unblock-tag @defaults)
                           ;; finalization
                           (nrepl-server/stop-server (plugin/get-state-entry :nrepl-server))
                           ;; verbose
                           verbose
                           ]
                          (binding [*ns* (create-ns 'user)]
                            (use 'clojure.repl)
                            (use 'clojure.pprint)
                            ;; NOTE(review): a third (use '...) form was
                            ;; truncated in the source and its target
                            ;; namespace is unknown -- restore it
                            (require '(core [bus :as bus]
                                            [plugin :as plugin]
                                            [state :as state]))
                            ;; NOTE(review): namespace name reconstructed
                            ;; -- confirm
                            (require '[clojure.tools.nrepl.server :as nrepl-server])
                            (plugin/set-state-entry :nrepl-server
                                                    ((resolve 'nrepl-server/start-server)
                                                     :port nrepl-port
                                                     :handler cider-nrepl-handler))))))

archetype of stopping blocking-jail

(defn stop
  "archetype of stopping blocking-jail: mark the plugin as stopped and say
  its unblock tag so the blocked plugin thread is released"
  [options]
  (plugin/set-state-entry :core.plugin.nrepl
                          :stop true)
  (plugin/unblock-thread (:stop-unblock-tag @defaults)))

the config map

(def config-map
  "the config map"
  {:populate-parse-opts-vector populate-parse-opts-vector
   :init init
   :run run
   :stop stop
   :param {:priority 0
           :auto-restart true}})

global (rather than plugin) state

;; global (rather than plugin) state
(ns core.state)
(declare
 ;; state management
 add-state get-state reset-state update-state remove-state list-states
 ;; command management
 register-command unregister-command
 get-command list-commands)

global state

;; global state: a dynamic var whose root value is an atom holding a map
;; of state name -> value
(def ^:dynamic *state*
  (atom {}))

add the state with the init

(defn add-state
  "add the state with the init value"
  [state init]
  (swap! *state*
         (fn [m]
           (assoc m state init))))

get the state

(defn get-state
  "get the value of the state"
  [state]
  (get @*state* state))

reset the state to the value

(defn reset-state
  "reset the state to the value"
  [state value]
  (swap! *state*
         (fn [m]
           (assoc m state value))))

update state to (f state & args)

(defn update-state
  "update state to (f state & args)"
  [state f & args]
  (let [path [state]]
    (apply swap! *state* update-in path f args)))

remove the state

(defn remove-state
  "remove the state"
  [state]
  (swap! *state* dissoc state))

list all states

(defn list-states
  "list all states (the keys of the global state map)"
  []
  (keys @*state*))

special states

register command to command dispatcher; undo register-command; get command; list all commands; define and register the command

command dispatcher

;; command dispatcher: a map of command -> implementation kept under the
;; :command-dispatch global state
(let [state-name :command-dispatch]
  (defn register-command
    "register command to command dispatcher"
    [command command-impl]
    (update-state state-name
                  assoc command command-impl))
  (defn unregister-command
    "undo register-command"
    [command]
    (update-state state-name
                  dissoc command))
  (defn get-command
    "get the implementation registered for command, or nil"
    [command]
    (get (get-state state-name) command))
  (defn list-commands
    "list all commands"
    []
    (keys (get-state state-name)))
  (defmacro defcommand
    "define and register the command: body is passed to defn under the name
  command, and the resulting function is registered under the keyword form
  of that name"
    [command & body]
    `(do
       (defn ~command
         ~@body)
       (register-command ~(keyword command) ~command))))