Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Pedestal 0.7 #152

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
com.pitch/uix.dom {:mvn/version "1.1.0"}
datascript-transit/datascript-transit {:mvn/version "0.3.0" :exclusions [com.cognitect/transit-clj com.cognitect/transit-cljs]}
datascript/datascript {:mvn/version "1.5.3"}
io.pedestal/pedestal.jetty {:mvn/version "0.5.10"}
io.pedestal/pedestal.service {:mvn/version "0.5.10"}
io.pedestal/pedestal.jetty {:mvn/version "0.7.0"}
io.pedestal/pedestal.service {:mvn/version "0.7.0"}
jstrutz/hashids {:mvn/version "1.0.1"}
org.clojure/clojure {:mvn/version "1.11.1"}
org.clojure/clojurescript {:mvn/version "1.11.60"}
org.clojure/core.async {:mvn/version "1.5.648"}
org.slf4j/slf4j-simple {:mvn/version "1.7.28"}
org.slf4j/slf4j-simple {:mvn/version "2.0.13"}
thheller/shadow-cljs {:mvn/version "2.23.3"}}
:aliases {:server/dev {:exec-fn ogres.server.core/run-development}
:uberjar {:extra-deps {com.github.seancorfield/depstar {:mvn/version "2.1.303"}}}}}
241 changes: 108 additions & 133 deletions src/main/ogres/server/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
(:import [java.io ByteArrayOutputStream ByteArrayInputStream]
[java.time Instant])
(:require [clojure.string :refer [upper-case]]
[clojure.core.async :as async :refer [go <! >! >!! close! timeout]]
[clojure.core.async :as async :refer [>!! close!]]
[cognitect.transit :as transit]
[datascript.core :as ds]
[datascript.transit :refer [read-handlers write-handlers]]
[io.pedestal.http :as server]
[io.pedestal.http.jetty.websockets :as ws]))
[io.pedestal.http :as http]
[io.pedestal.websocket :as ws]))

(def conns-chans-xf
(comp (map val) (map :chan) (filter identity)))
Expand All @@ -18,7 +18,7 @@
(defn room-create-key []
(let [keys (:rooms @sessions)]
(loop []
(let [code (->> (datascript.core/squuid) (str) (take-last 4) (apply str) (upper-case))]
(let [code (->> (ds/squuid) (str) (take-last 4) (apply str) (upper-case))]
(if (contains? keys code) (recur) code)))))

(defn room-create
Expand All @@ -43,6 +43,11 @@
(not= conn host) (update-in [:rooms room :conns] disj conn))
(update sessions :conns dissoc conn))))

(defn uuid->room
[sessions uuid]
(let [room (get-in sessions [:conns uuid :room])]
(get-in sessions [:rooms room])))

(defn marshall
"Serializes the given value as JSON compressed EDN."
[value]
Expand All @@ -51,158 +56,128 @@
(transit/write writer value)
(.toString stream)))

(defn on-connect
[uuid]
(fn [session chan]
(defn on-open
"Handles newly created WebSocket connections with the server."
[session _]
(let [para (.getRequestParameterMap session)
host (some-> para (.get "host") (.get 0))
join (some-> para (.get "join") (.get 0) (upper-case))
time (.getEpochSecond (Instant/now))
uuid (.getId session)
meta {:type :event :time time :src uuid}
chan (ws/start-ws-connection session {})]

;; Increase the max text message size to 10MB to allow for large files to
;; be exchanged over the socket, such as image data URLs.
(.setMaxTextMessageSize (.getPolicy session) 1e7)

(let [params (.. session (getUpgradeRequest) (getParameterMap))
host (some-> params (.get "host") (.get 0))
join (some-> params (.get "join") (.get 0) (upper-case))
time (.getEpochSecond (Instant/now))
meta {:type :event :time time :src uuid}]
(cond (string? host)
(let [_ (swap! sessions room-create host uuid chan)]
(go (<! (timeout 32))
(->> (assoc meta :data {:name :session/created :room host :uuid uuid} :dst uuid)
(marshall)
(>! chan))))

(string? join)
(let [sessions (swap! sessions room-join join uuid chan)]
(go (<! (timeout 32))
(->> (assoc meta :data {:name :session/joined :room join :uuid uuid} :dst uuid)
(marshall)
(>! chan))
(let [conns (disj (get-in sessions [:rooms join :conns]) uuid)
chans (into [] conns-chans-xf (select-keys (:conns sessions) conns))]
(doseq [chan chans]
(->> (assoc meta :data {:name :session/join :room join :uuid uuid})
(marshall)
(>! chan))))))

:else
(let [room (room-create-key)
_ (swap! sessions room-create room uuid chan)]
(go (<! (timeout 32))
(->> (assoc meta :data {:name :session/created :room room :uuid uuid} :dst uuid)
(marshall)
(>! chan))))))))
(.setMaxTextMessageBufferSize session 1e7)

(cond host
(do (swap! sessions room-create host uuid chan)
(->> (assoc meta :data {:name :session/created :room host :uuid uuid} :dst uuid)
(marshall)
(>!! chan)))

join
(let [state (swap! sessions room-join join uuid chan)
data (assoc meta :data {:name :session/joined :room join :uuid uuid} :dst uuid)]
(>!! chan (marshall data))
(let [conns (disj (get-in state [:rooms join :conns]) uuid)
chans (into [] conns-chans-xf (select-keys (:conns state) conns))]
(doseq [chan chans]
(->> (assoc meta :data {:name :session/join :room join :uuid uuid})
(marshall)
(>!! chan)))))

(defn uuid->room
[sessions uuid]
(let [room (get-in sessions [:conns uuid :room])]
(get-in sessions [:rooms room])))
:else
(let [room (room-create-key)]
(swap! sessions room-create room uuid chan)
(->> (assoc meta :data {:name :session/created :room room :uuid uuid} :dst uuid)
(marshall)
(>!! chan))))
{:uuid uuid}))

(defn on-text
[uuid]
(fn [body]
(let [state @sessions]
(if-let [room (uuid->room state uuid)]
(let [recip (-> (ByteArrayInputStream. (.getBytes body))
(transit/reader :json {:handlers read-handlers})
(transit/read)
(:dst))
uuids (if (uuid? recip) #{recip} (disj (:conns room) uuid))
chans (sequence (comp (map (:conns state)) (map :chan) (filter identity)) uuids)]
(doseq [chan chans]
(>!! chan body)))
(close! (get-in state [:conns uuid :chan]))))))
"Handles incoming messages."
[{uuid :uuid} message]
(let [state @sessions]
(if-let [room (uuid->room state uuid)]
(let [recip (-> (ByteArrayInputStream. (.getBytes message))
(transit/reader :json {:handlers read-handlers})
(transit/read)
(:dst))
uuids (if (some? recip) #{recip} (disj (:conns room) uuid))
chans (sequence (comp (map (:conns state)) (map :chan) (filter identity)) uuids)]
(doseq [chan chans]
(>!! chan message)))
(close! (get-in state [:conns uuid :chan])))))

(defn on-close
[uuid]
(fn [_ _]
(let [sess @sessions
room (get-in sess [:conns uuid :room])
self (get-in sess [:conns uuid :chan])
host (get-in sess [:rooms room :host])
rest (->> (disj (get-in sess [:rooms room :conns]) uuid)
(select-keys (:conns sess))
(into [] conns-chans-xf))]

;; The connection has been closed; close the associated channel.
(close! self)

(if (= uuid host)
;; The host has left, destroying the session entirely. Find and close
;; all remaining connections.
(doseq [chan rest]
(close! chan))

;; Notify all other connections in the same session that a connection
;; has been closed.
(doseq [chan rest]
(->> {:type :event :time "" :data {:name :session/leave :uuid uuid}}
(marshall)
(>!! chan))))

;; Update the sessions to remove the closing connection, potentially
;; also removing the room and closing all related connections within.
(swap! sessions room-leave uuid))))
"Handles closed connections."
[_ session _]
(let [uuid (.getId session)
sess @sessions
room (get-in sess [:conns uuid :room])
self (get-in sess [:conns uuid :chan])
host (get-in sess [:rooms room :host])
rest (->> (disj (get-in sess [:rooms room :conns]) uuid)
(select-keys (:conns sess))
(into [] conns-chans-xf))]

;; The connection has been closed; close the associated channel.
(if (some? self)
(close! self))

(if (= uuid host)
;; The host has left, destroying the session entirely. Find and close
;; all remaining connections.
(doseq [chan rest]
(close! chan))

;; Notify all other connections in the same session that a connection
;; has been closed.
(doseq [chan rest]
(->> {:type :event :time "" :data {:name :session/leave :uuid uuid}}
(marshall)
(>!! chan))))

;; Update the sessions to remove the closing connection, potentially
;; also removing the room and closing all related connections within.
(swap! sessions room-leave uuid)))

(defn on-error
[uuid]
(fn [ex]
(println ex)
((on-close uuid) nil nil)))

(defn actions [uuid]
{:on-connect (ws/start-ws-connection (on-connect uuid))
:on-text (on-text uuid)
:on-error (on-error uuid)
:on-close (on-close uuid)})

(defn create-listener [req res handlers-fn]
(let [params (.. req getParameterMap)
host (some-> params (.get "host") (.get 0))
join (some-> params (.get "join") (.get 0) (upper-case))]
(cond (and host join)
(.sendForbidden res "Query parameters cannot contain host and join parameters.")

(and host (get-in @sessions [:rooms host]))
(.sendForbidden res "Cannot create room with that key; it already exists.")

(and join (nil? (get-in @sessions [:rooms join])))
(.sendForbidden res "Cannot join room with that key; it does not exist.")

:else
(ws/make-ws-listener (handlers-fn (ds/squuid))))))
[_ _ ex]
(prn ex))

(defn root-handler [_]
{:status 200 :body "This server manages multiplayer sessions for https://ogres.app, a free and open-source virtual tabletop application."})

(defn create-server
([] (create-server {}))
([{:keys [port] :or {port 5000}}]
(->> {:env :prod
::server/routes #{["/" :get `root-handler]}
::server/type :jetty
::server/host "0.0.0.0"
::server/port port
::server/container-options
{:context-configurator
(fn [context]
(ws/add-ws-endpoints
context
{"/ws" actions}
{:listener-fn create-listener}))}}
(server/default-interceptors))))
(->> {::http/routes #{["/" :get `root-handler]}
::http/type :jetty
::http/host "0.0.0.0"
::http/port port
::http/websockets
{"/ws"
{:on-open on-open
:on-text on-text
:on-close on-close
:on-error on-error}}}
(http/default-interceptors))))

(defn create-dev-server []
(-> (create-server)
(merge {:env :dev ::server/join? false})
(server/dev-interceptors)
(server/create-server)))
(merge {::http/join? false})
(http/dev-interceptors)
(http/create-server)))

(defn ^:export run-development [& _args]
(let [server (create-dev-server)]
(println "Starting the development server on port" (::server/port server))
(server/start server)))
(println "Starting the development server on port" (::http/port server))
(http/start server)))

(defn -main [port]
(-> (create-server {:port (Integer/parseInt port)})
(server/create-server)
(server/start)))
(http/create-server)
(http/start)))