šŸ“Clojure: Manifold

šŸ“Clojure: Manifold

November 28, 2022

šŸ“ManifoldćØćÆ #

éžåŒęœŸćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°ć®ćŸć‚ć®éƒØå“ć‚’ęä¾›.

2ć¤ć®ęŠ½č±”ćŒćƒć‚¤ćƒ³ćƒˆ.

  • deferreds
  • streams

deferredsćØćÆčžćę…£ć‚ŒćŖ恄恌, å»¶ęœŸ, å…ˆå»¶ć°ć—ć®ę„å‘³. é…å»¶č©•ä¾”ćØ恄恆恓ćØ恋ćŖ? streamćÆäø¦č”Œćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°ć«ćŠć‘ć‚‹Streams恮ꦂåæµ.

Manifold恮Streamć‚’åˆ©ē”Øć—ć¦ä½œć‚‰ć‚ŒćŸé€šäæ”ćƒ©ć‚¤ćƒ–ćƒ©ćƒŖ恌aleph. alephć®ę–¹ć«ć‚‚Manifoldć«é–¢ć™ć‚‹ćƒ‰ć‚­ćƒ„ćƒ”ćƒ³ćƒˆćÆ恂悋.

Base Concepts #

deferred, ć¾ćŸćÆdeferred valuesćÆé…å»¶ć•ć‚ŒćŸå€¤ćØć„ć†ę„å‘³. č©•ä¾”ć•ć‚Œć¦ć„ćŖć„å€¤. «… »ćØ恄恆悈恆ćŖäŗŒé‡ć‚«ćƒƒć‚³ć§č”Øē¾ć•ć‚Œć‚‹. asynchronous valuećØ悂恄悏悌悋. Clojure恮promise恫callback悒čØ­å®šć§ćć‚‹ć‚ˆć†ćŖę‹”å¼µę©Ÿčƒ½.

deferred value恫ćŖć«ć‹ćŒbind恕悌悋ćØ realized ćØ恄恆č”Øē¾ć‚’ć¤ć‹ć£ć¦ć„ć‚‹. deferred恮åÆ¾ę¦‚åæµćŒrealized.

stream ćÆ, deferred values恮ćƒŖć‚¹ćƒˆ. sink ćÆ ęƒ…å ±ć‚’ę¶ˆč²»(consume)恙悋stream, source ćÆęƒ…å ±ć‚’ē”Ÿćæå‡ŗ恙(produce)stream.

ć“ć®ć‚¹ćƒˆćƒŖćƒ¼ćƒ ć®äø€čˆ¬ēš„ćŖ惑ć‚æćƒ¼ćƒ³ćÆāš™Pipeline惑ć‚æćƒ¼ćƒ³ć«ć¾ćØ悁恟.

deferred: ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•° #

https://github.com/clj-commons/manifold/blob/master/doc/deferred.md

deferred valuećÆ@ć ć£ćŸć‚Šderef恧čŖ­ć‚€.

deferred valuesćØćÆć„ć‚ć°ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć§ć‚ć‚‹(šŸ¤”Clojure恮promise/deliverćÆćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć®ć“ćØ).

d/success! ć§ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć«å€¤ć‚’binding恙悋. 恓悌ćÆpromise恫恊恑悋deliverćØåŒć˜ćØē†č§£ć—恦恄恄. 悂恗恏ćÆ, d/error! ć§ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć«ä¾‹å¤–ć‚’binding恙悋恓ćØ悂恧恍悋.

success!/error!恌ē™ŗå‹•ć•ć‚Œć¦ćÆć˜ć‚ć¦realizedćØ恄恆ēŠ¶ę…‹ć«ćŖ悊, on-realized恫ē™»éŒ²ć•ć‚ŒćŸcallbacké–¢ę•°ćŒćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć«åÆ¾ć—ć¦ē™ŗå‹•ć™ć‚‹. 恗恋恗, README恧 “no one should ever need to use on-realized"ćØčؘčæ°ć•ć‚Œć¦ć„ć‚‹ć‚ˆć†ć«, å®Ÿéš›ć«callback悒ē™»éŒ²ć™ć‚‹é–¢ę•°ćÆd/chain恌ä½æ悏悌悋.

chain/let-flow: ę±ŗå®šę€§ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼ć®ę§‹ēƉ #

chain 悒恤恋恆ćØ, é…å»¶å®Ÿč”Œé–¢ę•°(deferred value恌bind恕悌恟ćØćć®callbacké–¢ę•°)ć‚’é–¢ę•°åˆęˆ(composing)恧恍悋. ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć‚’å¼•ę•°ć«ćØ悋Threading Macros恌꧋ēÆ‰ć§ćć‚‹.

d/chainćÆäø€ć¤ć®ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć«åÆ¾ć—ć¦callbacké–¢ę•°ć‚’é€£ć­ć¦ć„ćć“ćØ恌恧恍悋. čˆˆå‘³ę·±ć„ę€§č³ŖćÆ, 途äø­ć§deferred valuećŒé–¢ę•°ć®äø­ć§ęˆ»ć‚‰ć•ć‚Œć‚‹å “合ćÆ途äø­ć§å‡¦ē†ćŒę­¢ć¾ć‚Šćć®ę™‚ē‚¹ć®deferred valuećŒęˆ»ć‚Šå€¤ć§ęˆ»ć‚‹.

恓悌ćÆchain恮途äø­ć§å®Ÿč”Œć«ę™‚é–“ćŒć‹ć‹ć‚‹å‡¦ē†ćŒęŒŸć¾ć£ć¦ć„ćŸć‚Š, ćŖ恫恋恮č؈ē®—ēµęžœć‚’å¾…ć”åˆć‚ć›ć¦ć„ć‚‹ćØćć«, ćć®å…ˆć®ćƒ•ćƒ­ćƒ¼å‡¦ē†ćŒé€²ć¾ćšć«é€”äø­ēµęžœćŒęˆ»ć£ć¦ćć‚‹ć‚ˆć†ćŖ悂恮.

chain returns a deferred representing the return value of the right-most callback. If any of the functions returns a deferred or a value that can be coerced into a deferred, the chain will be paused until the deferred yields a value.

ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å®Ÿč”Œć®åˆęˆćŒchain, ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ć®åˆęˆćŒlet-flowćØč§£é‡ˆć™ć‚‹ć“ćØ恌恧恍悋.

streams: é…å»¶ć‚¹ćƒˆćƒŖćƒ¼ćƒ  #

ref. https://github.com/clj-commons/manifold/blob/master/doc/stream.md

streamć®åž‹ć‚’type恧čŖæć¹ć‚‹ćØ, SplicedStreamćØćŖ悋. splicećÆä½™ć‚Šćć‹ćŖć„č‹±å˜čŖžć ćŒ, 連ēµć•ć‚ŒćŸć®ę„.


ćŖć«ć‚‚å–ć‚Šå‡ŗ恛ćŖ恄ēŠ¶ę…‹, ē©ŗ恮sourcećÆ drained (ęžÆęø‡)ćØ恄恆ēŠ¶ę…‹ć«ćŖ悋.

恓恮ćØ恍, stream恋悉take!恙悋ćØdeferred 恕悌恟nil恌åø°ć£ć¦ćć‚‹(« nil » ćæ恟恄ćŖꄟ恘). nilćŒę„å›³ć—ć¦å…„åŠ›ć•ć‚ŒćŸå€¤(valid message)ćŖ恮恋, ęžÆęø‡ć‚’ē¤ŗ恙悂恮ćŖć®ć‹ć‚’ę˜Žē¢ŗ恫åŒŗåˆ„ć—ćŸć„ćØ恍ćÆ, take!恮optionå¼•ę•°ć§::drained悒ęø”恙.

> @(s/take! s ::drained)
::drained

;; ć‚µćƒ³ćƒ—ćƒ«ć§ćÆidentical?恧ęÆ”č¼ƒć—ć¦ć„ćŸ.
(when (identical? ::drained msg)
  :a)

connect/connect-via/map/filter: å¤‰ę›å™ØćØćƒ‘ć‚¤ćƒ—ćƒ©ć‚¤ćƒ³ę§‹ēƉ #

connectć‚’ć¤ć‹ć£ć¦streamé€šć—ć‚’é€£ēµć™ć‚‹ćØäø€é€£ć®ē¹‹ćŒć‚ŠćŒćƒ‘ć‚¤ćƒ—ćƒ©ć‚¤ćƒ³ć«ćŖ悋.

é–“ć«å¤‰ę›å‡¦ē†ć‚’ęŒŸć‚€ćØ恝悌ćÆtransducerćØå‘¼ć°ć‚Œć‚‹å¤‰ę›å™Ø恫ćŖ悋. connect-via恫ęø”恙ē„”åé–¢ę•°ć ć£ćŸć‚Š, map/filterć®é–¢ę•°ćŒē”Øę„ć•ć‚Œć¦ć‚‹(恄悍恄悍恂悋).


connect-via恫ęø”ć™é–¢ę•°ćÆå¼•ę•°ćØ恗恦source->x->sink間恮äø€ć¤ć®å€¤ć‚’悂悉恆. é–¢ę•°ć®ćŖ恋恧sink恫put恙悋åæ…č¦ćŒć‚ć‚‹. #(s/put! b (inc %)).

connect-via恫ćÆ恗恰恗ćÆ, backpressure ćØ恄恆čØ€č‘‰ćŒä½æ悏悌悋. 恓悌ćÆčƒŒåœ§ćØčØ³ć•ć‚Œć¦streamć®ę–‡č„ˆć§ćÆ, streamć«ćƒ‡ćƒ¼ć‚æ悒putå‡ŗę„ćŖćć™ć‚‹ę©Ÿčƒ½. deferred valuećŒęˆ»ć‚‹ćØćć®å€¤ćŒrealizedć•ć‚Œć‚‹ć¾ć§ä»–ć®å€¤ćŒęø”恕悌ćŖ恄.

go-off/loop/recur: Agent恮ē”Ÿęˆ #

Stream恫čŖ­ćæę›øćć™ć‚‹ć‚¹ćƒ¬ćƒƒćƒ‰ć‚’AgentćØ定ē¾©ć™ć‚‹ćŖ悉恰, 再åø°ć«ć‚ˆć‚‹ćƒ«ćƒ¼ćƒ—悒꧋ēÆ‰ć™ć‚‹ć“ćØćÆć‚Øćƒ¼ć‚øć‚§ćƒ³ćƒˆć‚’ę§‹ēÆ‰ć™ć‚‹ćØč§£é‡ˆć™ć‚‹ć“ćØ恌恧恍悋.


go-offćÆcore.async恮goć‚’ć‚³ćƒ”ćƒ¼ć—ć¦ć„ć‚‹. go blockå†…ć§ćÆćƒ«ćƒ¼ćƒ—ć™ć‚‹ę§‹é€ ćÆć‚¹ćƒ¬ćƒƒćƒ‰ć‚ˆć‚Šć‚‚åŠ¹ēŽ‡ēš„ćŖč»½é‡ć‚¹ćƒ¬ćƒƒćƒ‰ćØć„ć†å®Ÿč”Œå˜ä½ć«ćŖ悋.

goćØgo-off恮違恄ćÆ, <!? ćØć„ć†ć‚·ćƒ³ć‚æ惃ć‚Æć‚¹. 通åøøgoć®å†…éƒØć§ä¾‹å¤–ć‚’catch恙悋čؘčæ°ć‚’ć—ć‚ć™ć‚Œć‚‹ćØnil恌čæ”ć£ć¦ćŖ悓恮ć‚Øćƒ©ćƒ¼ćŒē™ŗē”Ÿć—ćŸć®ć‹ć‚ć‹ć‚‰ćŖ恄. <!?ćÆ例外(Throwable)恌čæ”ć‚‹ćØ恝悌悒ꊕ恒ē›“恙(throw恙悋. ćć®ēµęžœć‚’try-catchć§ćƒćƒ³ćƒ‰ćƒŖćƒ³ć‚°).

namespace恌ē‹¬ē«‹ć—恦恄悋. ćć—ć¦ä½æ恆恫ćÆcore.async恮install悂åæ…要.

(require '[manifold.go-off :refer [go-off]])

go-offćÆdeferred value悒čæ”恙恮恧, @恙悋ćØå®Ÿč”Œć•ć‚Œć‚‹.

Event-Bus: Pub/Subę‹”å¼µ #

āš™Publisher-Subscriber惑ć‚æćƒ¼ćƒ³ć‚’å®Ÿē¾ć‚‹ć™ćŸć‚ć®ę©Ÿčƒ½.

ć‚ć¾ć‚Šęƒ…å ±ćŒćŖ恄恌, 仄äø‹ć®čؘäŗ‹ćÆč©³ć—ć„.

A Tour of Manifoldā€™s Deferred, Stream and Event Bus API | by Functional Human | Medium

IFćÆć‚·ćƒ³ćƒ—ćƒ«ć§ć‚ć‚Šę™®é€šć®pub/subć®ćƒ‘ć‚æćƒ¼ćƒ³ćŖ恮恧core.async恮 pub/sub恋悉悂锞ęŽØ恧恍悋.

Zach Tellman - Everything Will Flow - YouTube

悂恗恏ćÆć‚³ćƒ¼ćƒ‰ć‚’čŖ­ć‚€.

https://github.com/clj-commons/manifold/blob/master/src/manifold/bus.clj

ć‚„ć£ć¦ć„ć‚‹ć“ćØćÆ, subscribers恮streamć‚’å—ć‘å–ć£ć¦ćć‚Œć‚’ē®”ē†ć—恦, äøŠęµć‹ć‚‰ęµć‚Œć¦ććŸć‚‚恮悒ē®”ē†åŒ–恮subscribers恫ćŖ恌恗ē¶šć‘悋恮恧, connectćŒé€²åŒ–ć—ćŸć‚ˆć†ćŖ悂恮. č€ƒćˆę–¹ćŒę­£ć—ć„ć‹ćÆ悏恋悉ćŖ恄恑恩Pub/Subć®ćƒ‘ć‚æćƒ¼ćƒ³ć‹ć‚‰ćÆćæå‡ŗ恟core.async恮mult/tapć®ä»£ę›æćØ恗恦ä½æ恈悋恋悂.

event-busć®å®Ÿä½“ćÆCuncurrentHashMap+custom methodsć®ćƒ‡ćƒ¼ć‚æ꧋造ćŖ恮恧closećØ恋ćÆäøč¦.

unsubscribe #

unsubscribe恙悋恫ćÆsubscribe恧čæ”ć•ć‚ŒćŸstream恫åÆ¾ć—ć¦close!恙悌恰恄恄.

(require '[manifold.bus :as b])
(require '[manifold.stream :as s])

(def sub (b/subscribe bus :test))
(s/close! sub)

closed? #

s/closed?恧ćÆćŖ恏, s/drained?恧ē¢ŗčŖć™ć‚‹.

for debug #

惇惐惃ć‚Æ恮ē†ē”±ć«ć‚ˆć‚Š, websocketēµŒē”±ć§ęƒ…å ±ć‚’å—äæ”恙悋ćØ恍ćÆ恟ćØćˆåˆ©ē”Øć‚·ćƒ¼ćƒ³ćŒāš™Producer-Consumer惑ć‚æćƒ¼ćƒ³ć®å “åˆć§ć‚‚, PubSub惑ć‚æćƒ¼ćƒ³ć®event-busć‚’ć¤ć‹ć£ćŸć»ć†ćŒć„ć„ć‹ć‚‚.

ćØ恄恆恮悂, ćƒ”ć‚¤ćƒ³ć®streamćØćØć‚‚ć«ćƒ‡ćƒćƒƒć‚°ē”Ø恮streamć‚’åˆ„ć§ć¤ćć‚‹ć“ćØ恧websocketć«ęµć‚Œć‚‹ęƒ…å ±ć‚’č¦—ćč¦‹ć™ć‚‹ć“ćØ恌恧恍悋恋悉.

šŸ“Clojure: aleph #

éžåŒęœŸé€šäæ”ć®ćŸć‚ć®ćƒ©ć‚¤ćƒ–ćƒ©ćƒŖ.

šŸ“Manifold恮Stream悒Baseć«ć—ćŸäø¦č”Œåˆ¶å¾”ćØNettyć«ć‚ˆć‚‹éžåŒęœŸé€šäæ”悒Base恫恗恦恄悋(Java恮Nettyćƒ©ć‚¤ćƒ–ćƒ©ćƒŖćƒ©ćƒƒćƒ‘ćƒ¼).

å‚č€ƒčؘäŗ‹.

Aleph HTTP Client #

Clojure: Ringć®ćƒ—ćƒ­ćƒˆć‚³ćƒ«åŠć³, clj-httpć®ę‹”å¼µć‚Ŗćƒ—ć‚·ćƒ§ćƒ³ć«ęŗ–ę‹ . READMEć«ć‚ˆć‚‹ćØ, HTTP通äæ”ćÆclj-http恮mimic悒ē›®ęŒ‡ć™. mimicćØćÆćŖ悓恠?čŖæć¹ćŸęؔ倣ćØć„ć†ę„å‘³ć‚‰ć—ć„.

ćƒ‰ć‚­ćƒ„ćƒ”ćƒ³ćƒˆć«ä¹ć—ć„ć®ć ćŒ, requestć®ćƒ‘ćƒ©ćƒ”ćƒ¼ć‚æ恧ćŖć«ć‚’ęŒ‡å®šć™ć‚‹ć‹čæ·ć£ćŸć‚‰ring 恮docć‚’č¦‹ć‚ćØ恄恆恓ćØ.


http/get恧ćƒŖć‚Æć‚Øć‚¹ćƒˆć‚’å‡ŗ恙ćØ, Java InputStream恌čæ”ć£ć¦ćć‚‹ć®ć§ćć‚Œć‚’stringć«å¤‰ę›ć—ć¦ć¤ć‹ć†.

d/chainćØ連ęŗ恕恛悋ćØ get requestć‹ć‚‰ć®ć‚³ćƒ¼ćƒ«ćƒćƒƒć‚Æé–¢ę•°ć‚’ē™»éŒ²ć—恤恤, å‘¼ć³å‡ŗć—å…ƒć‚¹ćƒ¬ćƒƒćƒ‰ćÆåˆ„å‹•ä½œć§ćć‚‹ć‚ˆć†ćŖéžåŒęœŸćƒŖć‚Æć‚Øć‚¹ćƒˆćŒć§ćć‚‹. d/chainćÆdeferred valueć‚’å¤‰ćˆć™ć®ć§, @恧čŖ­ćæå‡ŗ恕ćŖć„é™ć‚Šå‘¼ć³å‡ŗć—å…ƒć‚’blockingćÆ恗ćŖ恄.

(-> @(http/get "https://google.com/")
    :body
    bs/to-string)

@(d/chain (http/get "https://google.com/")
          :body
          bs/to-string)

Aleph Websocket Clinet #

ć‚Æćƒ©ć‚¤ć‚¢ćƒ³ćƒˆå“ć®Websocketå®Ÿč£…ä¾‹.

(def ws-url "wss://ws.lightstream.bitflyer.com/json-rpc")
(def sock @(websocket-client ws-url))

(let [msg
      {:method "subscribe"
       :params {:channel "lightning_ticker_FX_BTC_JPY"}}]
  (s/put! sock (generate-string msg)))

Manifold Topics #

stream恮ēŠ¶ę…‹ć‚’čŖæć¹ć‚‹é–¢ę•° #

恄悍恄悍恂悋恌, descriptionćØć„ć†é–¢ę•°ć§å…ØéƒØMapć§ęƒ…å ±ćŒå–ć‚Œć‚‹.

  • drained?
  • sink?
  • source?
  • closed?
  • stream?

core.asyncćØć®é€£ęŗ悒恙悋恫ćÆ?(core.async Interop) #

manifold恮streamćÆcore.async恮channelć«å¤‰ę›ć§ćć‚‹(core.async channel-> manifold stream).

(def c (async/chan))
(def s (s/->source c))
(def s (s/->sink c))

connectć‚’ć¤ć‹ć£ć¦é€£ēµć™ć‚‹ć“ćØ悂恧恍悋(manifold stream->core.async channel)

(s/connect s c)

stream恋悉takeć§å–ć‚Šå‡ŗ恗ē¶šć‘悋恫ćÆ? #

loop-recurć®å†åø°ć®ćŖ恋恧takeć‚’å‘¼ć³å‡ŗ恗ē¶šć‘悋恓ćØć§å–ć‚Šå‡ŗ恙.

d/loopé–¢ę•°ćÆclojure.core/loopć‚’ę‹”å¼µć—ćŸé…å»¶ćƒ«ćƒ¼ćƒ—ć®ćƒžć‚Æćƒ­ć§ć“ć®äø­ć§defered stream恋悉take!恙悋恓ćØ恌恧恍悋.

s/comsumećÆstreamć‹ć‚‰å€¤ć‚’å–ć‚Šå‡ŗ恗ē¶šć‘callbacké–¢ę•°ć‚’ć²ćŸć™ć‚‰å½“ć¦ć‚‹loop悒ę›ø恑悋.

ä½æ恄恓ćŖć—ć¦ć„ć‚‹ć‚ć‘ć§ćÆćŖ恄恌, comsume恌ē°”å˜ć«ę›ø恑悋čØ˜ę³•ć§ć‚ˆć‚Šč©³ć—ćå‹•ä½œć‚’ć‚«ć‚¹ć‚æ惞悤ć‚ŗ恗恟恄ćŖ悉恰loop/recurć‚’ć¤ć‹ć†å°č±”.

s/consume #

consume ćÆstreamć«å…„ć£ć¦ć„ć‚‹å€¤ć‚’å…ØéƒØå–ć‚Šå‡ŗ恗恦äø€ć¤ćšć¤callbacké–¢ę•°ć‚’é©ē”Ø恙悋.

仄äø‹ć‚’č©•ä¾”恙悋ćØ, deferred valuećŒå³ę™‚ć«čæ”ć£ć¦ćć‚‹ćŒ, å–ć‚Šå‡ŗ恗恮loop/recur恌äø­ć§ē™ŗē”Ÿć—ē¶šć‘ć‚‹ćŸć‚, sockć‹ć‚‰ęµć‚Œć¦ćć‚‹msgćÆēµ¶ćˆćšprintln恗ē¶šć‘ć‚‹å‹•ä½œćØćŖ悋. ć“ć‚Œć‚’ę­¢ć‚ć‚‹ć«ćÆsock悒閉恘悋.

(s/consume #(println %) sock)

(s/close! sock)

loop/recur #

take! ć‚’ć§åŒć˜ć“ćØ悒恙悋恫ćÆ, loop/recur恧äø€ć¤ćšć¤å–ć‚Šå‡ŗ恗恟äøŠć§é–¢ę•°ć‚’適ē”Ø恙悋åæ…č¦ćŒć‚ć‚‹.

(defn my-consume [sock]
  (d/loop []
    (d/chain (s/take! sock ::drained)
             (fn [msg]
               (if (identical? ::drained msg)
                 ::drained
                 (println msg)))

             (fn [result]
               (when-not (identical? ::drained result)
                 (d/recur))))))

ꎄē¶šćŒć•ć‚Œć¦ć„ć‚‹ćØ恍ćÆdrained?ćÆfalse恫ćŖ悋. 恓悌恌alephć®ę©Ÿčƒ½ćŖ恮恋Manifoldć®ę©Ÿčƒ½ćŖ恮恋ćÆčŖæęŸ»äø­. 少ćŖ恏ćØ悂äøŠć®ä¾‹ć ćØē„”é™ćƒ«ćƒ¼ćƒ—ć«ćŖ悋(i.e. ꬔ恮chain恮callbackå‰ć§ę­¢ć¾ć‚‹). ę°øé ć«å¾…ć”ē¶šć‘ć‚‹ć®ć‚’å›žéæć—ćŸć„å “åˆćÆꎄē¶šć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆć‚’timeoutć‚Ŗćƒ—ć‚·ćƒ§ćƒ³ć§ęŒ‡å®šć™ć‚‹.

core.async goćØć®é€£ęŗ #

Manifold streamćÆcore.async恮goroutineć®ć‚ˆć†ćŖč»½é‡ć‚¹ćƒ¬ćƒƒćƒ‰ćŖ悏恑恧ćÆćŖ恄. ManifoldćÆé…å»¶ć‚¹ćƒˆćƒŖćƒ¼ćƒ ćŒćƒ”ć‚¤ćƒ³ę©Ÿčƒ½ć§, CSPäø¦č”Œćƒ¢ćƒ‡ćƒ«ć§ćÆćŖ恄.

ć“ć£ć”ć®ę©Ÿčƒ½ć‚‚åæ…要ćŖ悉恰, defered stream悒core.async恮channelć«å¤‰ę›ć—ć¦go-loop悒恤恋恆恋, go-off(+clojure.core/loop, recur)é–¢ę•°ć‚’ć¤ć‹ć†.

šŸ’”ę€§čƒ½ć®ćƒœćƒˆćƒ«ćƒćƒƒć‚Æ恌CPUćƒć‚¦ćƒ³ćƒ‰ć‹IOćƒć‚¦ćƒ³ćƒ‰ć‹

ꦂ恗恦goć‚’ć¤ć‹ć†č»½é‡ć‚¹ćƒ¬ćƒƒćƒ‰ćÆCPUćƒć‚¦ćƒ³ćƒ‰ć«é©ć—ć¦ć„ć‚‹ćŸć‚IOćƒć‚¦ćƒ³ćƒ‰ćŖalephć®å‡¦ē†ć§ć‚‚ē„”ē†ć‚„ć‚Šä½æć†ć¹ććŖ恮恋ćÆčŖæęŸ»äø­.

ę³Øꄏē‚¹ćÆ, loop恮ćŖ恋恧(s/take! s)悒恤恋恆ćØē„”é™ćƒ«ćƒ¼ćƒ—ć§ćƒćƒ³ć‚°ć™ć‚‹. take!ćÆpark恙悋悏恑恧ćÆćŖ恄. go-loop恮park恌åæ…要ćŖ悉恰 <! 恌åæ…要.

chain + doto #

chainćÆå…„åŠ›ć•ć‚ŒćŸdeferred悒恤ćŖ恄恧恄恏. dotoćÆ兄力xćÆ途äø­ć§ćŖć«ć‚’ć—ć¦ć‚‚ęˆ»ć‚Šå€¤ć§x悒čæ”恙. 仄äø‹ć®ć‚ˆć†ć«ę›ø恑悋ćØ恄恆Tips.

(d/chain d
  #(doto % println)
  #(doto % println))

dirigiste #

Manifoldć®å†…éƒØć§åˆ©ē”Øć•ć‚Œć¦ć„ć‚‹. Java Thread Poolć®ę”¹é€ .

https://github.com/clj-commons/dirigiste

d/chain恮äø­ć§ä¾‹å¤–ćŒē™ŗē”Ÿć™ć‚‹ćØ恩恆ćŖ悋? #

d/chain恮äø­ć§ä¾‹å¤–ćŒē™ŗē”Ÿć™ć‚‹ćØ, ä»„å¾Œć®é–¢ę•°ćÆå…Øć¦ć‚¹ć‚­ćƒƒćƒ—ć—ć¦ęˆ»ć‚Šå€¤ćØ恗恦deferredć•ć‚ŒćŸä¾‹å¤–ć‚Ŗ惖ć‚ø悧ć‚Æ惈恌čæ”ć‚‹.

(def d (d/deferred))
(def z (-> d
           (d/chain
              dec
              #(/ 1 %))))
(d/success! d 1)

z
;; deferred ć•ć‚ŒćŸä¾‹å¤–ć‚Ŗ惖ć‚ø悧ć‚Æ惈.
;;=> << ERROR: #error (.. )

@z
;; 例外ē™ŗ動
;;=> java.lang.ArithmeticException

恕悉恫d/catchć®ćƒžć‚Æ惭悒恤恋恆ćØä¾‹å¤–ć«åÆ¾ć™ć‚‹catchć‚’ć¤ć‹ć£ćŸćƒćƒ³ćƒ‰ćƒŖćƒ³ć‚°å‡¦ē†ć‚’chainć®å‡¦ē†ćØēµ„ćæåˆć‚ć›ć‚‹ć“ćØ恌恧恍悋.

(-> d
   (d/chain dec #(/ 1 %))
   (d/catch Exception #(println "whoops, that didn't work:" %)))

Threading MacroćÆꉋē¶šćć®é€”äø­ć§ä¾‹å¤–ćŒē™ŗē”Ÿć—ćŸćØćć®ćƒćƒ³ćƒ‰ćƒŖćƒ³ć‚°ćŒčŖ²é”Œć ćŒ, deferredćØć„ć†ć‚³ćƒ³ć‚»ćƒ—ćƒˆćÆć“ć‚Œć‚’č§£ę±ŗć™ć‚‹ćŸć‚ć®ć‚ˆć„ę–¹ę³•ć®ć‚ˆć†ć«ę€ćˆć‚‹.

Railway oriented programming(é‰„é“ęŒ‡å‘ćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°)

stream恮closećØćć®ćƒćƒ³ćƒ‰ćƒŖćƒ³ć‚° #

streamćÆs/close!恧꘎ē¤ŗēš„恫close恧恍悋. s/closed?恧ēŠ¶ę…‹ć‚’ē¢ŗčŖć§ćć‚‹.

ćć®ä»–, stream処ē†ć®ćŖ恋恮callbacké–¢ę•°ćŖć®ć‹ć§ä¾‹å¤–ćŒē™ŗē”Ÿć—恦Throwable Object悒catch恗ćŖ恄ćØstream悒closeć™ć‚‹ć¤ćć‚Šć®ć‚ˆć†ć (ćƒ‰ć‚­ćƒ„ćƒ”ćƒ³ćƒˆć«ćŖć„ć‘ć©ć‚³ćƒ¼ćƒ‰ć‚’čŖ­ć‚“恠).

ćŖ恮恧streamć‚’é–‰ć˜ć‚‹å‰ć®å¾Œå‡¦ē†(惭悰ćØ恋)恌åæ…要ćŖ堓合ćÆchainćØćØ悂恫catch悒ę›ø恏.

let-flow vs chain #

go-off恮docstringć«ć‹ć‚“ćŸć‚“ćŖč§£čŖ¬ć‚’ē™ŗč¦‹ć—ćŸ.

https://github.com/clj-commons/manifold/blob/master/src/manifold/go_off.clj

deferred/let-flow presumes that every deferrable needs to be resolved. This prevents more complex handling of parallelism or being able to pass deferreds into other functions from within the `let-flow` block.

deferred/chain only works with single deferreds, which means having to write code in unnatural ways to handle multiple deferreds.”

ć¤ć¾ć‚Š, let-flowćÆ複ꕰ恮deferred valuećØćć®åˆęˆć‚’å‰ęć«ć—ć¦ć„ć‚‹ćŒ, chainćÆć²ćØ恤恮deferred value恫åÆ¾ć™ć‚‹č¤‡ę•°ć®ę“ä½œć‚’å‰ęć«ć—ć¦ć„ć‚‹.


let-flowćÆ Barrier pattern.

let-flowćÆzipć®ć‚·ćƒ³ć‚æ惃ć‚Æć‚¹ć‚·ćƒ„ć‚¬ćƒ¼ćØč§£é‡ˆć™ć‚‹ćØ悈恄. ć¤ć¾ć‚Š, å…Ø恦恮deferred恌realizedć•ć‚Œć‚‹ć®ć‚’å¾…ć”åˆć‚ć›ć‚‹.

(defn deferred-sum []
  (let [a (call-service-a)
        b (call-service-b)]
    (chain (zip a b) ;; ā˜… 恓恓
           (fn [[a b]]
             (+ a b)))))

tips: ć‚ć‚‹ę”ä»¶ć«äø€č‡“ć—ćŸć‚‚ć®ć ć‘ć‚’é€šć™ć«ćÆ? #

stream/filter悒恤恋恆.

(->> (b/subscribe bus :executions)
              (s/buffer 1)
              (s/map :price)
              (s/filter #(= 0 (mod (int %) 2))))

å¶ę•°ć®ę•°ć—ć‹ćŠć—ć‚Šć‹ć‚‰å‡ŗ恦恓ćŖć„ć‚¹ćƒˆćƒŖćƒ¼ćƒ ćŒć§ćć‚‹.

tips: ć‚æć‚¤ćƒ ć‚¢ć‚¦ćƒˆć«ć‚ˆć£ć¦ćƒ•ćƒ­ćƒ¼ć®ęŒ™å‹•ć‚’å¤‰ę›“ć™ć‚‹ć«ćÆ? #

d/timeout!ć‚’ć¤ć‹ć£ć¦timeoutć—ćŸć‹ć©ć†ć‹ć®å¤‰ę•°ć‚’å®šē¾©. d/realized?ć«ć‚ˆć£ć¦ifę–‡ć§åˆ¤å®šć™ć‚‹.

(def timeout-d (d/timeout! (d/deferred) 5000 :timeout))

(if (d/realized? timeout-d)
    (do-something)
    (do-else))

ć“ć®ę–¹ę³•ćÆć”ć‚‡ć£ćØä¾æ利. 恓悌恌ćŖ恄ćØäø€ę™‚å¤‰ę•°ć«timestammp悒äæå­˜ć—恦ē¾åœØę™‚åˆ»ćØäæå­˜ę™‚åˆ»ć®å·®åˆ†ć§ę™‚é–“ēµŒéŽć‚’åˆ¤å®šć—ć¦ć„ćŸ.

aleph Topics #

alephꎄē¶šćć‚Œć‚‹å•é”Œ #

恩恆悂aleph恫ćÆ途äø­ć§ęŽ„ē¶šćŒé€”åˆ‡ć‚Œć‚‹ć‚ˆć†ćŖ恓ćØćŒč¦‹å—ć‘ć‚‰ć‚Œć‚‹.

clojure - How to ensure websocket connection is kept alive in Aleph - Stack Overflow Aleph - can it handle client disconnect? : Clojure

恓恮reddit恫ćÆä½œč€…ęœ¬äŗŗē™»å “恗恦workaroundć‚’č§£čŖ¬.

Managing Websocket Stability issues with Aleph? : Clojure

sente悒ä½æ恈ćØć„ć†ę„č¦‹ć‚‚? ęŽØęø¬ć ćŒ, websocketćŒćć†ć„ć†ć‚‚ć®ć§ć‚ć‚Šćć‚Œć‚’ę”¹č‰Æć—ćŸć‚‚ć®ćŒSocket.IOćŖ恮恋悂(socket.io恮clojure libraryć‚’ęŽ¢ć—äø­).

ćƒ‡ćƒ¼ć‚æćŒåˆ°ē€ć™ć‚‹ćØManifold恮d/success!ćŒå‘¼ć°ć‚Œć‚‹? #

ćƒ©ć‚¤ćƒ–ćƒ©ćƒŖć‚’å‘¼ć³å‡ŗ恙ćØć‚¹ćƒˆćƒŖćƒ¼ćƒ ćŒčæ”ć•ć‚Œć‚‹. 内éƒØčØ­čØˆć®ä»•ēµ„ćæćØ恗恦, ćƒ‡ćƒ¼ć‚æćŒåˆ°ē€ć™ć‚‹ćØ, success!悒恙悋恓ćØć«ć‚ˆć£ć¦stream(deferred values恮ćƒŖć‚¹ćƒˆ)ć«å€¤ć‚’ę”¾ć‚Šč¾¼ć‚“ć§ć„ć‚‹ć‚ˆć†ć«č¦‹ćˆćŸ.

tips: socketć‹ć‚‰ć®ćƒ‡ćƒ¼ć‚æ受äæ”Ꙃ恮ē•°åøøć‚’ę•ć¾ćˆć‚‹ #

go-offćØ<!?ć‚’ę“»ē”Ø恙悋.

socketäø­ć§ć®ä¾‹å¤–ćŒē™ŗē”Ÿć™ć‚‹ćØ, <!?ć‚’ę“»ē”Ø恙悋恓ćØ恧deferred ć•ć‚ŒćŸä¾‹å¤–ćÆå–ć‚Šå‡ŗ恙ćØćć«ęŠ•ć’ē›“ć—ć¦ćć‚Œć‚‹ć®ć§, try-catchć§ę•ć¾ćˆć‚‹ć“ćØ恌å‡ŗę„ć‚‹.

(defmacro go-loop* [s & callback]
  `(go-off
    (try
      (loop []
        (if-let [msg# (<!? ~s)]
          (do
            (~@callback msg#)
            (recur))
          (s/close! ~s)))
      (catch Exception e#
        (log/error "Exception occured, " (print-stack-trace e#))
        (s/close! ~s)))))

tips: ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ć‚’å¾…ć”åˆć‚ć›ć¦ę¬”ć«é€²ć‚ć‚‹ #

d/chainć§ć‚·ćƒ¼ć‚±ćƒ³ć‚¹ć‚’å®šē¾©ć—ćŸćØćć«, ēµ‚ć‚ć‚Šć«(fn [_] ws)ćæ恟恄ćŖ処ē†ęŒŸć‚€ćØ, chainć§å®šē¾©ć—ćŸäø€é€£ć®ć‚·ćƒ¼ć‚±ćƒ³ć‚¹å‡¦ē†ć‚’完äŗ†ć•ć›ćŸć‚‰ws悒čæ”恙ćæ恟恄ćŖ処ē†ćŒę›ø恑悋.

(d/chain ws
         (lib/auth! ex ws)
         (fn [ws] (println "authed"))
         (fn [_] ws))

ć“ć‚Œć«ć‚ˆć‚Šchain恮äø€é€£ć®å‡¦ē†ć®ćŠć—ć‚Šć§å…ˆé ­ć«å…„悌恟deferred valuećŒęˆ»ć‚Š, ć“ć®å‡¦ē†ćØåˆ„ć®å‡¦ē†ć‚’chain恙悋恓ćØ恌恧恍悋. ćŸć„ć¦ć„aleph悒ä½æ恆ćØ恍ćÆ外éƒØćØ恮通äæ”, ć¤ć¾ć‚Šå‰Æ作ē”Ø恌ē™ŗē”Ÿć™ć‚‹ć®ć§, ćć®å¾…ć”åˆć‚ć›å‡¦ē†ć‚’ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼ć§äøŠę‰‹ććƒćƒ³ćƒ‰ćƒŖćƒ³ć‚°ć§ćć‚‹.

Manifold Insights #

šŸ¤”é…å»¶å®Ÿč”Œé–¢ę•°åˆęˆćØ恄恆Ꙃē©ŗć‚’č¶…ćˆćŸę±ŗå®šč«–ēš„ćŖę“ä½œęŠ½č±” #

Manifold chainćŒé¢ē™½ć„恮ćÆ, é…å»¶å®Ÿč”Œé–¢ę•°ćŒé€£ē¶šć—恦恄悋ćØ恍.

仄äø‹ć®1č”Œē›®, 2č”Œē›®, 3č”Œē›®ćÆ恩恮ć‚æć‚¤ćƒŸćƒ³ć‚°ć§realized恕悌悋恋ćÆę±ŗć¾ć£ć¦ć„ćŖ恄悂恮恮, Ꙃē©ŗć‚’č¶…ćˆć¦å€¤ć‚’é †ē•Ŗé€šć‚Šć«ę“ä½œć§ćć‚‹. ć“ć‚ŒćŒćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å®Ÿč”Œ, é…å»¶å®Ÿč”Œé–¢ę•°åˆęˆć®é¢ē™½ć•. 1č”Œē›®ć®d恌realizedć•ć‚Œć‚‹ć®ćŒ10å¹“å¾Œć ć‚ć†ćŒ, 2č”Œē›®ć®futureć®å®Ÿč”Œć«100äø‡å¹“恋恋悍恆恌, ć‚·ćƒ³ćƒ—ćƒ«ćŖć‚¹ćƒ¬ćƒƒćƒ‡ć‚£ćƒ³ć‚°ćƒžć‚Æćƒ­ć§å‡¦ē†ć‚’ć¾ćØ悁悉悌悋. ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼å¤‰ę•°ćÆäø¦č”Œå‡¦ē†ć‚’ć‚·ćƒ³ćƒ—ćƒ«ć«ć™ć‚‹.

(d/chain d
    #(future (inc %))
    #(future (inc %))
    #(println "the future returned" %))

恓悌悒恙ćŖć‚ć”, šŸ“ę±ŗå®šę€§ćƒ‡ćƒ¼ć‚æćƒ•ćƒ­ćƒ¼ćƒ—ćƒ­ć‚°ćƒ©ćƒŸćƒ³ć‚°ćØ恄恆.

šŸ”—References #

See also #