Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions liveobjects/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ tasks.withType<Test>().configureEach {
tasks.register<Test>("runLiveObjectUnitTests") {
filter {
includeTestsMatching("io.ably.lib.objects.unit.*")
// unit tests for the path-based public API implementation (io.ably.lib.object)
includeTestsMatching("io.ably.lib.object.unit.*")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.ably.lib.`object`

import io.ably.lib.`object`.value.LiveCounter

/**
* Implementation of the [LiveCounter] value type: an immutable holder for the
* initial count of a new LiveCounter object to be created by a mutation.
*
* Instantiated reflectively by `io.ably.lib.object.value.LiveCounter#create` —
* the class name and the single `(java.lang.Number)` constructor are a frozen
* contract with the `lib` module and must not change.
*
* Spec: RTLCV1, RTLCV2a, RTLCV3b, RTLCV3d
*/
public class DefaultLiveCounter(count: Number) : LiveCounter() {
/** Internal initial count (RTLCV2a). */
internal val count: Number = count
}
19 changes: 19 additions & 0 deletions liveobjects/src/main/kotlin/io/ably/lib/object/DefaultLiveMap.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.ably.lib.`object`

import io.ably.lib.`object`.value.LiveMap
import io.ably.lib.`object`.value.LiveMapValue

/**
* Implementation of the [LiveMap] value type: an immutable holder for the
* initial entries of a new LiveMap object to be created by a mutation.
*
* Instantiated reflectively by `io.ably.lib.object.value.LiveMap#create` —
* the class name and the single `(java.util.Map)` constructor are a frozen
* contract with the `lib` module and must not change.
*
* Spec: RTLMV1, RTLMV2a, RTLMV3b, RTLMV3d
*/
public class DefaultLiveMap(entries: Map<String, LiveMapValue>) : LiveMap() {
/** Internal initial entries (RTLMV2a); defensively copied for immutability (RTLMV3d). */
internal val entries: Map<String, LiveMapValue> = HashMap(entries)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.ably.lib.`object`

import io.ably.lib.`object`.Subscription

/**
* Implementation of the public [Subscription] handle returned by the
* `subscribe` methods of the path/instance APIs.
*
* Spec: SUB1, SUB2a, SUB2b (idempotent unsubscribe)
*/
internal class DefaultSubscription(private val onUnsubscribe: () -> Unit) : Subscription {

@Volatile
private var unsubscribed = false

override fun unsubscribe() {
if (unsubscribed) return // SUB2b - subsequent calls are no-ops
unsubscribed = true
onUnsubscribe()
}
}
50 changes: 50 additions & 0 deletions liveobjects/src/main/kotlin/io/ably/lib/object/Errors.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.ably.lib.`object`

import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo

/**
* Error codes and helpers for the path-based public API implementation.
* Copied (and extended with the path-API codes) from the legacy package so
* this package has no dependency on `io.ably.lib.objects`.
*/
internal enum class ObjectErrorCode(val code: Int) {
BadRequest(40_000),
InternalError(50_000),
InvalidObject(92_000),
InvalidInputParams(40_003),
MapValueDataTypeUnsupported(40_013),
PathNotResolved(92_005), // RTPO3c2 - write operation on a path that does not resolve
ObjectsTypeMismatch(92_007), // RTTS5d2/RTTS9d2 - operation on a cast wrapper with mismatched resolved type
}

internal enum class ObjectHttpStatusCode(val code: Int) {
BadRequest(400),
InternalServerError(500),
}

internal fun objectsException(
errorMessage: String,
errorCode: ObjectErrorCode,
statusCode: ObjectHttpStatusCode = ObjectHttpStatusCode.BadRequest,
cause: Throwable? = null,
): AblyException {
val errorInfo = ErrorInfo(errorMessage, statusCode.code, errorCode.code)
return cause?.let { AblyException.fromErrorInfo(it, errorInfo) } ?: AblyException.fromErrorInfo(errorInfo)
}

/** ErrorInfo 400 / 40003 - invalid input (RTLMV4a/b, RTLCV4a, key validation). */
internal fun invalidInputError(message: String) =
objectsException(message, ObjectErrorCode.InvalidInputParams)

/** ErrorInfo 400 / 92005 - write operation on an unresolvable path (RTPO3c2). */
internal fun pathNotResolvedError(path: String) =
objectsException("Path could not be resolved: \"$path\"", ObjectErrorCode.PathNotResolved)

/** ErrorInfo 400 / 92007 - resolved/wrapped type does not match the typed wrapper (RTTS5d2/RTTS9d2). */
internal fun typeMismatchError(message: String) =
objectsException(message, ObjectErrorCode.ObjectsTypeMismatch)

/** ErrorInfo 500 / 92000 - invalid internal object state. */
internal fun objectStateError(message: String) =
objectsException(message, ObjectErrorCode.InvalidObject, ObjectHttpStatusCode.InternalServerError)
113 changes: 113 additions & 0 deletions liveobjects/src/main/kotlin/io/ably/lib/object/ObjectsBridge.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.ably.lib.`object`

import io.ably.lib.util.Log
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import io.ably.lib.types.AblyException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.launch

/**
* The single abstract seam between this package and the realtime objects
* system. The path/instance implementation classes depend ONLY on this
* contract; a bridge (implemented outside this package, alongside the
* realtime internals) provides the graph views, preconditions, publishing and
* update fan-in. This keeps `io.ably.lib.object` free of any dependency on
* `io.ably.lib.objects`.
*/
internal abstract class ObjectsBridge {

/** The channel this objects instance belongs to (used for PAOM2e/PAOM3b). */
internal abstract val channelName: String

/** The root InternalLiveMap view (objectId `root`), or null if unavailable. Spec: RTO3, RTPO2b */
internal abstract fun getRootNode(): MapNode?

/** Looks up a non-tombstoned object view by id, or null. Spec: RTO3a */
internal abstract fun getNode(objectId: String): ObjectsNode?

/** Access API preconditions; throws ErrorInfo-carrying AblyException on failure. Spec: RTO25 */
internal abstract fun throwIfInvalidAccessApiConfiguration()

/** Write API preconditions; throws ErrorInfo-carrying AblyException on failure. Spec: RTO26 */
internal abstract fun throwIfInvalidWriteApiConfiguration()

/** Publishes the messages and applies them locally on ACK. Spec: RTO15, RTO20 */
internal abstract suspend fun publish(messages: List<WireObjectMessage>)

/** Current server time in epoch milliseconds. Spec: RTO16 */
internal abstract suspend fun getServerTime(): Long

/** Ensures the channel is attached and objects are SYNCED. Spec: RTO23b, RTO23c, RTO23e */
internal abstract suspend fun ensureAttachedAndSynced()

/**
* Registry for path-based subscriptions (RTPO19). Bridge implementations
* feed it via [notifyUpdated].
*/
internal val pathSubscriptionRegister = PathObjectSubscriptionRegister(this)

/** Scope used to expose suspend write operations as CompletableFutures. */
private val asyncScope =
CoroutineScope(Dispatchers.Default + CoroutineName("ObjectsBridge") + SupervisorJob())

/** Per-object message-carrying update listeners (instance subscriptions, RTINS16). */
private val updateListeners =
ConcurrentHashMap<String, CopyOnWriteArrayList<(Set<String>, WireObjectMessage?) -> Unit>>()

/**
* Subscribes to updates applied to the object with [objectId]. The listener
* receives the set of updated map keys (empty for counters) and the source
* message when the update originated from an operation. Returns an
* unsubscribe handle.
*/
internal fun subscribeToUpdates(objectId: String, listener: (Set<String>, WireObjectMessage?) -> Unit): () -> Unit {
val listeners = updateListeners.computeIfAbsent(objectId) { CopyOnWriteArrayList() }
listeners.add(listener)
return { listeners.remove(listener) }
}

/**
* Entry point for bridge implementations: call after an update has been
* applied to an object, with the keys that changed (empty for counters) and
* the source ObjectMessage when the update came from an operation (null for
* sync-induced changes). Fans out to instance subscriptions (RTINS16) and
* path subscriptions (RTPO19).
*/
internal fun notifyUpdated(objectId: String, updatedKeys: Set<String>, message: WireObjectMessage?) {
updateListeners[objectId]?.forEach { listener ->
try {
listener(updatedKeys, message)
} catch (t: Throwable) {
Log.e("ObjectsBridge", "Error in update listener for objectId=$objectId", t)
}
}
pathSubscriptionRegister.notifyObjectUpdated(objectId, updatedKeys, message)
}

/**
* Runs a suspend write and exposes it as a CompletableFuture<Void>;
* failures complete exceptionally with the underlying AblyException.
*/
internal fun launchWithVoidFuture(block: suspend () -> Unit): CompletableFuture<Void> {
val future = CompletableFuture<Void>()
asyncScope.launch {
try {
block()
future.complete(null)
} catch (throwable: Throwable) {
when (throwable) {
is AblyException -> future.completeExceptionally(throwable)
else -> future.completeExceptionally(
objectsException("Error executing operation", ObjectErrorCode.BadRequest, cause = throwable)
)
}
}
}
return future
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.ably.lib.`object`

import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.Base64

/** Object type discriminator used in objectId generation. Spec: RTO14 */
internal enum class WireObjectType(val value: String) {
Map("map"),
Counter("counter"),
}

/**
* ObjectId generation for client-created objects. Copied from the legacy
* `io.ably.lib.objects.ObjectId` so this package has no dependency on it -
* the format `type:base64url(sha256(initialValue:nonce))@msTimestamp` is a
* wire contract.
*
* Spec: RTO14, RTO6b1
*/
internal object ObjectsIdentifier {
internal fun fromInitialValue(
objectType: WireObjectType,
initialValue: String,
nonce: String,
msTimestamp: Long,
): String {
val valueForHash = "$initialValue:$nonce".toByteArray(StandardCharsets.UTF_8)
// RTO14b - hash the initial value and nonce to create a unique identifier
val hashBytes = MessageDigest.getInstance("SHA-256").digest(valueForHash)
val urlSafeHash = Base64.getUrlEncoder().withoutPadding().encodeToString(hashBytes)
return "${objectType.value}:$urlSafeHash@$msTimestamp"
}
}

/**
* Generates a random nonce string for object creation (16 alphanumeric chars).
* Copied from the legacy `generateNonce`. Spec: RTLMV4g, RTLCV4d
*/
internal fun generateObjectNonce(): String {
val chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
return (1..16).map { chars.random() }.joinToString("")
}
32 changes: 32 additions & 0 deletions liveobjects/src/main/kotlin/io/ably/lib/object/ObjectsNode.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.ably.lib.`object`

/**
* Abstract view over the live objects graph, implemented by the bridge that
* connects this package to the realtime objects system (kept abstract so this
* package has no dependency on `io.ably.lib.objects`).
*
* Contract for implementations:
* - tombstoned objects are never returned by [ObjectsBridge.getNode] /
* [ObjectsBridge.getRootNode];
* - [MapNode.entries] / [MapNode.get] expose only non-tombstoned entries that
* carry data; values referencing other objects carry `objectId` in their
* [WireObjectData].
*/
internal interface ObjectsNode {
val objectId: String
}

/** View over an InternalLiveMap (RTLM1). */
internal interface MapNode : ObjectsNode {
/** Snapshot of the current non-tombstoned entries. */
fun entries(): Map<String, WireObjectData>

/** The current non-tombstoned entry for [key], or null. */
fun get(key: String): WireObjectData?
}

/** View over an InternalLiveCounter (RTLC1). */
internal interface CounterNode : ObjectsNode {
/** The current counter value (RTLC5). */
fun count(): Double
}
49 changes: 49 additions & 0 deletions liveobjects/src/main/kotlin/io/ably/lib/object/PathFinder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.ably.lib.`object`

/**
* Computes every path from the root map to a target object by walking the
* objects graph on demand (over objectId references), instead of maintaining
* incremental parent references like ably-js does (RTLO3f/RTLO4g/RTLO4h).
* RTLO4f-equivalent observable behavior; cycle-safe.
*
* Spec: RTLO4f (equivalent)
*/
internal object PathFinder {

/**
* Returns all paths (as segment lists) from the root map to the object with
* [targetObjectId]. The root itself yields a single empty path.
*/
internal fun findFullPaths(bridge: ObjectsBridge, targetObjectId: String): List<List<String>> {
val root = bridge.getRootNode() ?: return emptyList()
if (targetObjectId == root.objectId) return listOf(emptyList())
val result = mutableListOf<List<String>>()
walk(bridge, root, targetObjectId, currentPath = mutableListOf(), visited = mutableSetOf(), result)
return result
}

private fun walk(
bridge: ObjectsBridge,
map: MapNode,
targetObjectId: String,
currentPath: MutableList<String>,
visited: MutableSet<String>,
result: MutableList<List<String>>,
) {
if (!visited.add(map.objectId)) return // cycle guard
for ((key, data) in map.entries()) {
val refId = data.objectId ?: continue
if (refId == targetObjectId) {
result.add(currentPath + key)
continue
}
val refNode = bridge.getNode(refId)
if (refNode is MapNode) {
currentPath.add(key)
walk(bridge, refNode, targetObjectId, currentPath, visited, result)
currentPath.removeAt(currentPath.size - 1)
}
}
visited.remove(map.objectId)
}
}
Loading
Loading