我正在使用Swift Combine处理我的API请求。现在我正面临这样一种情况,我想要有4个以上的并行请求,我想要将它们压缩在一起。在此之前,我使用Zip4()运算符将4个请求压缩在一起。我可以想象你在多个步骤中进行压缩,但我不知道如何为它编写receiveValue。
这是我当前代码的一个简化,包含了4个并行请求:
Publishers.Zip4(request1, request2, request3, request4)
.sink(receiveCompletion: { completion in
// completion code if all 4 requests completed
}, receiveValue: { request1Response, request2Response, request3Response, request4Response in
// do something with request1Response
// do something with request2Response
// do something with request3Response
// do something with request4Response
}
)
.store(in: &state.subscriptions)发布于 2020-02-22 09:33:45
阻止您压缩任意数量的出版商的是一个非常不幸的事实,即Apple选择将zip运算符的输出设置为一个元组。元组是非常不灵活的,并且它们的能力有限。您不能拥有一个由十个元素组成的元组;您甚至不能将一个元素附加到一个元组,因为这会导致您获得一个不同的类型。因此,我们需要的是一种新的运算符,它可以完成与zip相同的工作,但会发出一些更强大、更灵活的结果,例如数组。
我们可以做一个!幸运的是,zip操作符本身有一个transform参数,可以让我们指定想要的输出类型。
好的,为了说明,我将把10个出版商压缩在一起。首先,我将创建一个包含10个发布者的数组;它们仅仅是发布者,但这足以说明这一点,并且为了证明我没有作弊,我将对每个发布者附加一个任意的延迟:
let justs = (1...10).map {
Just($0)
.delay(for: .seconds(Int.random(in:1...3)), scheduler: DispatchQueue.main)
.eraseToAnyPublisher() }好了,现在我已经有了一组发布者,我将把它们压缩在一个循环中:
let result = justs.dropFirst().reduce(into: AnyPublisher(justs[0].map{[$0]})) {
res, just in
res = res.zip(just) {
i1, i2 -> [Int] in
return i1 + [i2]
}.eraseToAnyPublisher()
}注意zip操作符后面的闭包!这确保了我的输出将是一个Array<Int>而不是元组。与元组不同,我可以创建任意大小的数组,只需在每次循环中添加元素即可。
好的,result现在是一家Zip出版商,将十家出版商整合在一起。为了证明这一点,我只需给它附加一个订阅者并打印输出:
result.sink {print($0)}.store(in: &self.storage)我们运行代码。有一个令人心跳停止的暂停--这是正确的,因为每个发布者都有不同的随机延迟,压缩的规则是在我们得到任何输出之前,他们都需要发布。它们迟早都会这样做,并且输出会出现在控制台中:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]完全正确的答案!我已经证明,实际上我确实将10个发布者压缩在一起,以产生由每个发布者的单个贡献组成的输出。
将任意数量的数据任务发布者(或您正在使用的任何内容)压缩在一起也没什么不同。
(有关如何序列化任意数量的数据任务发布者的相关问题,请参阅Combine framework serialize async operations。)
发布于 2020-07-07 01:53:24
extension Publishers {
struct ZipMany<Element, F: Error>: Publisher {
typealias Output = [Element]
typealias Failure = F
private let upstreams: [AnyPublisher<Element, F>]
init(_ upstreams: [AnyPublisher<Element, F>]) {
self.upstreams = upstreams
}
func receive<S: Subscriber>(subscriber: S) where Self.Failure == S.Failure, Self.Output == S.Input {
let initial = Just<[Element]>([])
.setFailureType(to: F.self)
.eraseToAnyPublisher()
let zipped = upstreams.reduce(into: initial) { result, upstream in
result = result.zip(upstream) { elements, element in
elements + [element]
}
.eraseToAnyPublisher()
}
zipped.subscribe(subscriber)
}
}
}单元测试可以使用以下内容作为输入:
let upstreams: [AnyPublisher<String, Never>] = [
Just("first")
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher(),
Just("second").eraseToAnyPublisher()
].receive(on:)将该事件的发射放在主队列的末尾,以便它将在"second"之后发出。
发布于 2020-02-22 05:11:09
我认为你可以这样做:
let zipped1 = Publishers.Zip4(request1, request2, request3, request4)
let zipped2 = Publishers.Zip4(request5, request6, request7, request8)
Publishers.Zip(zipped1, zipped2)
.sink(receiveCompletion: { completion in
// completion code if all 8 requests completed
}, receiveValue: { response1, response2 in
// do something with response1.0
// do something with response1.1
// do something with response1.2, response1.3, response2.0, response2.1, response2.2, response2.3
}
)
.store(in: &state.subscriptions)https://stackoverflow.com/questions/60345806
复制相似问题