Skip to content

feat: Typescript implementation#6

Open
bjing94 wants to merge 29 commits intomainfrom
without-child-jobs
Open

feat: Typescript implementation#6
bjing94 wants to merge 29 commits intomainfrom
without-child-jobs

Conversation

@bjing94
Copy link
Copy Markdown
Collaborator

@bjing94 bjing94 commented Jul 15, 2025

No description provided.

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jul 15, 2025

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

Attention: Patch coverage is 65.66265% with 228 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/worker.ts 60.57% 192 Missing ⚠️
src/queue.ts 75.67% 35 Missing and 1 partial ⚠️

📢 Thoughts on this report? Let us know!

Comment thread src/job.ts
this.data = data.data
this.meta = {
retryCount: 0,
startTime: Date.now() + (data.delay ?? 0),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лучше вообще не писать свойство, если это можно сделать.
И в целом нейминг свойст можно более ужатый

Comment thread src/job.ts
startTime: Date.now() + (data.delay ?? 0),
failed: false,
// TODO: Is this correct?
timeout: data.timeout ?? 0,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это нафиг

Comment thread src/job.ts
this.meta = {
retryCount: 0,
startTime: Date.now() + (data.delay ?? 0),
failed: false,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это по принципу отсутствие failed === false

Comment thread src/queue.ts
subjects: subjects,
duplicate_window: nanos(this.duplicateWindow),
})
console.log(`Stream '${this.name}' created successfully.`)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Давай логгирование уберем и перейдем на евен емиттинг.
Возможно самое простое - это отнаследоваться от EventEmitter. Реализацию опять таки можно подсмотреть в BullMQ, вроде удобно.

Comment thread package.json
"@nats-io/kv": "3.0.2",
"@nats-io/jetstream": "3.0.2",
"@nats-io/nats-core": "3.0.2",
"@nats-io/transport-node": "3.0.2"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот эти зависимости надо указывать в peerDependecies.
И в дев вернуть.

Comment thread src/queue.ts
}

// TODO: I think this is not needed
public async close(): Promise<void> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если коннекшн внешний, то можно close убирать.

Comment thread src/queue.ts

public async addJob(job: Job, priority: number = 1): Promise<void> {
if (this.connection.isClosed()) {
throw new Error('Cannot add job when NATS connection is closed.')
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Наверное, надо хотя начать с того, чтобы все ошибки либы были от кастомного класса
NatsQueueError extends Error типа

Comment thread src/queue.ts
}

public async addJob(job: Job, priority: number = 1): Promise<void> {
if (this.connection.isClosed()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нахер

Comment thread src/queue.ts
const DEFAULT_DEDUPLICATE_WINDOW = 2000
const MIN_DUPLICATE_WINDOW = 100

export type QueueOpts = {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Подумай насчет префикса

Comment thread src/job.ts
@@ -0,0 +1,29 @@
import { JobCreateData } from './types'

export class Job {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Надо джобу иметь для консьюмера.
И это дает возможность что-то попытаться сделать lazy.

Comment thread src/queue.ts
const msgHeaders = headers()
msgHeaders.set('Nats-Msg-Id', job.id)

await this.client.publish(`${job.queueName}.${priority}`, jobData, {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Давайте попробуем
queueName.PRIORITY.jobName

Comment thread src/worker.ts
export type WorkerOpts = {
client: JetStreamClient
name: string
processor: (job: JsMsg, timeout: number) => Promise<void>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout минус
Здесь job - то уже наш класс, а не JsMsg

Comment thread src/worker.ts
for (let i = 1; i <= this.priorities; i++) {
// TODO: Naming might be wrong, independent of the queue name
const consumerName = `worker_group_${i}`
const subject = `${this.name}.${i}`
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это name.${i}.*

Comment thread src/worker.ts
`Job: name=${data.name} id=${data.id} is started with data=${data.data} in queue=${data.queueName}`,
)

const timeout = data.meta.timeout
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это минус

Comment thread src/worker.ts
)
}

const newId = `${crypto.randomUUID()}_${Date.now()}`
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Надо явно это вынести либо в отдельный метод, либо в метод класса джобы.

Comment thread src/worker.ts

protected async fetch(consumer: Consumer, count: number): Promise<JsMsg[]> {
// TODO: Maybe fail to fetch consumer info
const consumerInfo = await consumer.info()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется не нужно

Comment thread src/worker.ts
filter_subject: subject,
name: consumerName,
durable_name: consumerName,
ack_policy: AckPolicy.All,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Глобальный конкаренси НЕ ДЕЛАЕМ!!

Comment thread src/worker.ts
const awaitedMessages: JsMsg[] = []

for await (const msg of msgs) {
awaitedMessages.push(msg)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется, нечего тут массив собирать, давай процессить сразу.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants